First working version

This commit is contained in:
jar3b 2019-03-28 18:02:02 +03:00
parent ce01279e08
commit eb4942729f
8 changed files with 418 additions and 0 deletions

35
README.md Normal file
View File

@ -0,0 +1,35 @@
# concron
**Con**tainerized **cron**. Golang native scheduler to run repeated command inside containers (Docker, k8s)
## command-line
examples:
```
# run with config file 'tasks.yaml' located in current directory
concron -c tasks.yaml
# show options
concron -h
```
arguments:
```
-c <config file> : config file, YAML format
-h : show help
-p <http port> : http port for http server (default: 8080)
-debug : show debug logs
```
## http server endpoint
- `/healthz` - health endpoint, returns code `200` with text `OK`. Useful for kubernetes pods ready/live probes.
## config format
example:
see [tasks.yaml](tasks.yaml)
description:
_in work..._

9
go.mod Normal file
View File

@ -0,0 +1,9 @@
module github.com/jar3b/concron
require (
github.com/jar3b/logrus-levelpad-formatter v1.0.0
github.com/julienschmidt/httprouter v1.2.0
github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967
github.com/sirupsen/logrus v1.4.0
gopkg.in/yaml.v2 v2.2.2
)

28
src/helpers/helpers.go Normal file
View File

@ -0,0 +1,28 @@
package helpers
import (
"bufio"
"os"
)
func ReadBinFile(filename string) ([]byte, error) {
file, err := os.Open(filename)
if err != nil {
return nil, err
}
defer file.Close()
stats, statsErr := file.Stat()
if statsErr != nil {
return nil, statsErr
}
var size int64 = stats.Size()
bytes := make([]byte, size)
bufr := bufio.NewReader(file)
_, err = bufr.Read(bytes)
return bytes, err
}

74
src/main.go Normal file
View File

@ -0,0 +1,74 @@
package main
import (
"flag"
"fmt"
"github.com/jar3b/concron/src/tasks"
"github.com/jar3b/logrus-levelpad-formatter"
"github.com/julienschmidt/httprouter"
log "github.com/sirupsen/logrus"
"net/http"
)
func healthHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
w.WriteHeader(200)
w.Write([]byte("OK"))
}
func initLog(debug bool) {
// Log as JSON instead of the default ASCII formatter.
log.SetFormatter(&levelpad.Formatter{
TimestampFormat: "2006-01-02 15:04:05.000",
LogFormat: "[%lvl%][%time%] %msg%\n",
LevelPad: 8,
})
if debug {
log.SetLevel(log.DebugLevel)
} else {
log.SetLevel(log.InfoLevel)
}
}
func main() {
// command arguments
port := flag.Int("p", 8080, "HTTP server port")
debug := flag.Bool("debug", false, "debug mode")
configFile := flag.String("c", "tasks.yaml", "config file location")
flag.Parse()
initLog(*debug)
// manage tasks
taskList, err := tasks.LoadTasks(*configFile)
if err != nil {
log.Fatalf("cannot load %s: %v", *configFile, err)
return
}
// start scheduler
sched, err := tasks.NewScheduler()
if err != nil {
log.Fatalf("cannot initialize scheduler: %v", err)
return
}
if err = sched.AddTasks(taskList.Tasks); err != nil {
log.Fatalf("cannot add task list to scheduler: %v", err)
return
}
if err = sched.Start(); err != nil {
log.Fatalf("cannot start scheduler: %v", err)
return
}
// setup http router
router := httprouter.New()
router.GET("/healthz", healthHandler)
// start HTTP server
log.Infof("concron was started on :%d", *port)
err = http.ListenAndServe(fmt.Sprintf(":%d", *port), router)
if err != nil {
log.Errorf("cannot start concron http server", err)
}
}

167
src/tasks/models.go Normal file
View File

@ -0,0 +1,167 @@
package tasks
import (
"bufio"
"bytes"
"errors"
"fmt"
log "github.com/sirupsen/logrus"
"io"
"os"
"os/exec"
"strings"
"time"
)
type taskExecution struct {
id int
time time.Time
cmd *exec.Cmd
stop chan error
}
func (te *taskExecution) Wait() {
te.stop <- te.cmd.Wait()
}
func (te *taskExecution) Stop() {
_ = te.cmd.Process.Kill()
te.stop <- errors.New("interrupted")
}
func newExecution(id int, cmd *exec.Cmd) *taskExecution {
return &taskExecution{
id: id,
time: time.Now(),
cmd: cmd,
stop: make(chan error, 1),
}
}
type Task struct {
Name string `yaml:"name"`
Crontab string `yaml:"crontab"`
Dir string `yaml:"dir"`
UseShell bool `yaml:"useShell"`
Command string `yaml:"cmd"`
Args []string `yaml:"args"`
Deadline uint32 `yaml:"deadline"`
ConcurrencyPolicy string `yaml:"concurrencyPolicy"`
// internal values
path string
cmdExe string
cmdArg []string
buf bytes.Buffer
writer io.Writer
// task executions
execCount int
execMap map[int]*taskExecution
}
func (t *Task) init(shell string, systemPath string) error {
t.execCount = 0
t.path = fmt.Sprintf("PATH=%s", systemPath)
if shell != "" && t.UseShell {
t.cmdExe = shell
t.cmdArg = []string{"-c", t.Command + " " + strings.Join(t.Args, " ")}
} else {
t.cmdExe = t.Command
t.cmdArg = t.Args
}
if t.ConcurrencyPolicy == "" {
t.ConcurrencyPolicy = "Allow"
} else if t.ConcurrencyPolicy != "Allow" && t.ConcurrencyPolicy != "Forbid" && t.ConcurrencyPolicy != "Replace" {
return errors.New(fmt.Sprintf("invalid value '%s' for concurrencyPolicy, allowed: Allow, Forbid, Replace", t.ConcurrencyPolicy))
}
t.execMap = make(map[int]*taskExecution, 0)
return nil
}
func (t *Task) getCmd() *exec.Cmd {
cmd := exec.Command(t.cmdExe, t.cmdArg...)
if t.Dir != "" {
cmd.Dir = t.Dir
}
cmd.Env = append(cmd.Env, t.path)
// setup out pipe
t.buf.Reset()
t.writer = bufio.NewWriter(&t.buf)
cmd.Stdout = t.writer
cmd.Stderr = t.writer
return cmd
}
func (t *Task) Run() {
// work with running executions
runningTaskCount := len(t.execMap)
if runningTaskCount > 0 {
if t.ConcurrencyPolicy == "Forbid" {
log.Errorf("[%-20s][%d] CANNOT RUN, another %d running executions", t.Name, t.execCount+1, runningTaskCount)
return
} else if t.ConcurrencyPolicy == "Replace" {
log.Infof("[%-20s][%d] found %d running executions, cleaning...", t.Name, t.execCount+1, runningTaskCount)
for _, ex := range t.execMap {
ex.Stop()
}
log.Infof("[%-20s][%d] %d running executions was cleaned", t.Name, t.execCount+1, runningTaskCount)
}
}
t.execCount++
// create execution
execution := newExecution(t.execCount, t.getCmd())
// start command
if err := execution.cmd.Start(); err != nil {
log.Errorf("[%-20s][%d] cannot start: %v", t.Name, execution.id, err)
return
}
// wait for execution ends
go execution.Wait()
// add execution to list
t.execMap[execution.id] = execution
log.Infof("[%-20s][%d] started", t.Name, execution.id)
select {
case err := <-execution.stop:
if err == nil {
log.Infof("[%-20s][%d] SUCCESS", t.Name, execution.id)
log.Debugf("[%-20s][%d] SUCCESS, output: %s", t.Name, execution.id, t.buf.String())
} else {
log.Infof("[%-20s][%d] ERROR, err: %v, output: %s", t.Name, execution.id, err, t.buf.String())
}
delete(t.execMap, execution.id)
}
}
// ConfigDescriptiveInfo
type ConfigDescriptiveInfo struct {
Shell string `yaml:"shell"`
Tasks []*Task `yaml:"tasks"`
}
func (di *ConfigDescriptiveInfo) InitTasks() []error {
var errList = make([]error, 0)
var err error
// get os PATH env
osPath := os.Getenv("PATH")
var taskNames []string
for _, t := range di.Tasks {
if err = t.init(di.Shell, osPath); err != nil {
errList = append(errList, err)
}
taskNames = append(taskNames, t.Name)
}
log.Infof("%d tasks loaded - %s", len(di.Tasks), strings.Join(taskNames, ", "))
return errList
}

54
src/tasks/scheduler.go Normal file
View File

@ -0,0 +1,54 @@
package tasks
import (
"errors"
"github.com/robfig/cron"
log "github.com/sirupsen/logrus"
)
type Scheduler struct {
tasks []*Task
cron *cron.Cron
started bool
}
func (s *Scheduler) AddTasks(taskList []*Task) error {
s.tasks = taskList
for _, task := range s.tasks {
if err := s.cron.AddFunc(task.Crontab, task.Run); err != nil {
return err
}
}
return nil
}
func (s *Scheduler) Start() error {
if s.started {
return errors.New("cannot start scheduler, already started")
}
s.cron.Start()
log.Info("scheduler was started")
s.started = true
return nil
}
func (s *Scheduler) Stop() error {
if s.started {
return errors.New("cannot stop scheduler, already stopped")
}
s.cron.Stop()
log.Info("scheduler was stopped")
s.started = false
return nil
}
func NewScheduler() (*Scheduler, error) {
sched := Scheduler{}
sched.cron = cron.New()
sched.started = false
return &sched, nil
}

32
src/tasks/tasks.go Normal file
View File

@ -0,0 +1,32 @@
package tasks
import (
"errors"
"fmt"
"github.com/jar3b/concron/src/helpers"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)
func LoadTasks(configFileName string) (*ConfigDescriptiveInfo, error) {
config := ConfigDescriptiveInfo{}
data, err := helpers.ReadBinFile(configFileName)
if err != nil {
return nil, err
}
if err := yaml.Unmarshal([]byte(data), &config); err != nil {
return nil, err
}
// init tasks
errList := config.InitTasks()
for _, err := range errList {
log.Errorf(fmt.Sprintf("parse task error: %v", err))
}
if len(errList) > 0 {
return nil, errors.New("cannot parse tasks, errors was shown above")
}
return &config, nil
}

19
tasks.yaml Normal file
View File

@ -0,0 +1,19 @@
shell: "/bin/sh"
tasks:
- name: reservation
crontab: "*/5 * * * * *"
dir: "/tmp"
useShell: true
cmd: "sleep"
args: ["60"]
concurrencyPolicy: Replace
- name: test
crontab: "*/10 * * * * *"
dir: "/tmp"
useShell: false
cmd: "/bin/echo"
args: ["HELLO WORLD"]
- name: show_env
crontab: "*/15 * * * * *"
useShell: true
cmd: "env"