From eb4942729f8bab3d787aba4d06fe38678d7d6123 Mon Sep 17 00:00:00 2001 From: jar3b Date: Thu, 28 Mar 2019 18:02:02 +0300 Subject: [PATCH] First working version --- README.md | 35 +++++++++ go.mod | 9 +++ src/helpers/helpers.go | 28 +++++++ src/main.go | 74 ++++++++++++++++++ src/tasks/models.go | 167 +++++++++++++++++++++++++++++++++++++++++ src/tasks/scheduler.go | 54 +++++++++++++ src/tasks/tasks.go | 32 ++++++++ tasks.yaml | 19 +++++ 8 files changed, 418 insertions(+) create mode 100644 README.md create mode 100644 go.mod create mode 100644 src/helpers/helpers.go create mode 100644 src/main.go create mode 100644 src/tasks/models.go create mode 100644 src/tasks/scheduler.go create mode 100644 src/tasks/tasks.go create mode 100644 tasks.yaml diff --git a/README.md b/README.md new file mode 100644 index 0000000..3ad0108 --- /dev/null +++ b/README.md @@ -0,0 +1,35 @@ +# concron + +**Con**tainerized **cron**. Golang native scheduler to run repeated command inside containers (Docker, k8s) + +## command-line + +examples: +``` +# run with config file 'tasks.yaml' located in current directory +concron -c tasks.yaml +# show options +concron -h +``` + +arguments: +``` +-c : config file, YAML format +-h : show help +-p : http port for http server (default: 8080) +-debug : show debug logs +``` + +## http server endpoint + +- `/healthz` - health endpoint, returns code `200` with text `OK`. Useful for kubernetes pods ready/live probes. + +## config format + +example: + +see [tasks.yaml](tasks.yaml) + +description: + +_in work..._ \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..fd9aee4 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module github.com/jar3b/concron + +require ( + github.com/jar3b/logrus-levelpad-formatter v1.0.0 + github.com/julienschmidt/httprouter v1.2.0 + github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967 + github.com/sirupsen/logrus v1.4.0 + gopkg.in/yaml.v2 v2.2.2 +) diff --git a/src/helpers/helpers.go b/src/helpers/helpers.go new file mode 100644 index 0000000..0ed4b73 --- /dev/null +++ b/src/helpers/helpers.go @@ -0,0 +1,28 @@ +package helpers + +import ( + "bufio" + "os" +) + +func ReadBinFile(filename string) ([]byte, error) { + file, err := os.Open(filename) + + if err != nil { + return nil, err + } + defer file.Close() + + stats, statsErr := file.Stat() + if statsErr != nil { + return nil, statsErr + } + + var size int64 = stats.Size() + bytes := make([]byte, size) + + bufr := bufio.NewReader(file) + _, err = bufr.Read(bytes) + + return bytes, err +} diff --git a/src/main.go b/src/main.go new file mode 100644 index 0000000..717779c --- /dev/null +++ b/src/main.go @@ -0,0 +1,74 @@ +package main + +import ( + "flag" + "fmt" + "github.com/jar3b/concron/src/tasks" + "github.com/jar3b/logrus-levelpad-formatter" + "github.com/julienschmidt/httprouter" + log "github.com/sirupsen/logrus" + "net/http" +) + +func healthHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { + w.WriteHeader(200) + w.Write([]byte("OK")) +} + +func initLog(debug bool) { + // Log as JSON instead of the default ASCII formatter. + log.SetFormatter(&levelpad.Formatter{ + TimestampFormat: "2006-01-02 15:04:05.000", + LogFormat: "[%lvl%][%time%] %msg%\n", + LevelPad: 8, + }) + + if debug { + log.SetLevel(log.DebugLevel) + } else { + log.SetLevel(log.InfoLevel) + } +} + +func main() { + // command arguments + port := flag.Int("p", 8080, "HTTP server port") + debug := flag.Bool("debug", false, "debug mode") + configFile := flag.String("c", "tasks.yaml", "config file location") + flag.Parse() + + initLog(*debug) + + // manage tasks + taskList, err := tasks.LoadTasks(*configFile) + if err != nil { + log.Fatalf("cannot load %s: %v", *configFile, err) + return + } + + // start scheduler + sched, err := tasks.NewScheduler() + if err != nil { + log.Fatalf("cannot initialize scheduler: %v", err) + return + } + if err = sched.AddTasks(taskList.Tasks); err != nil { + log.Fatalf("cannot add task list to scheduler: %v", err) + return + } + if err = sched.Start(); err != nil { + log.Fatalf("cannot start scheduler: %v", err) + return + } + + // setup http router + router := httprouter.New() + router.GET("/healthz", healthHandler) + + // start HTTP server + log.Infof("concron was started on :%d", *port) + err = http.ListenAndServe(fmt.Sprintf(":%d", *port), router) + if err != nil { + log.Errorf("cannot start concron http server", err) + } +} diff --git a/src/tasks/models.go b/src/tasks/models.go new file mode 100644 index 0000000..e919c4a --- /dev/null +++ b/src/tasks/models.go @@ -0,0 +1,167 @@ +package tasks + +import ( + "bufio" + "bytes" + "errors" + "fmt" + log "github.com/sirupsen/logrus" + "io" + "os" + "os/exec" + "strings" + "time" +) + +type taskExecution struct { + id int + time time.Time + cmd *exec.Cmd + stop chan error +} + +func (te *taskExecution) Wait() { + te.stop <- te.cmd.Wait() +} + +func (te *taskExecution) Stop() { + _ = te.cmd.Process.Kill() + te.stop <- errors.New("interrupted") +} + +func newExecution(id int, cmd *exec.Cmd) *taskExecution { + return &taskExecution{ + id: id, + time: time.Now(), + cmd: cmd, + stop: make(chan error, 1), + } +} + +type Task struct { + Name string `yaml:"name"` + Crontab string `yaml:"crontab"` + Dir string `yaml:"dir"` + UseShell bool `yaml:"useShell"` + Command string `yaml:"cmd"` + Args []string `yaml:"args"` + Deadline uint32 `yaml:"deadline"` + ConcurrencyPolicy string `yaml:"concurrencyPolicy"` + + // internal values + path string + cmdExe string + cmdArg []string + buf bytes.Buffer + writer io.Writer + + // task executions + execCount int + execMap map[int]*taskExecution +} + +func (t *Task) init(shell string, systemPath string) error { + t.execCount = 0 + t.path = fmt.Sprintf("PATH=%s", systemPath) + if shell != "" && t.UseShell { + t.cmdExe = shell + t.cmdArg = []string{"-c", t.Command + " " + strings.Join(t.Args, " ")} + } else { + t.cmdExe = t.Command + t.cmdArg = t.Args + } + + if t.ConcurrencyPolicy == "" { + t.ConcurrencyPolicy = "Allow" + } else if t.ConcurrencyPolicy != "Allow" && t.ConcurrencyPolicy != "Forbid" && t.ConcurrencyPolicy != "Replace" { + return errors.New(fmt.Sprintf("invalid value '%s' for concurrencyPolicy, allowed: Allow, Forbid, Replace", t.ConcurrencyPolicy)) + } + + t.execMap = make(map[int]*taskExecution, 0) + + return nil +} + +func (t *Task) getCmd() *exec.Cmd { + cmd := exec.Command(t.cmdExe, t.cmdArg...) + if t.Dir != "" { + cmd.Dir = t.Dir + } + cmd.Env = append(cmd.Env, t.path) + + // setup out pipe + t.buf.Reset() + t.writer = bufio.NewWriter(&t.buf) + cmd.Stdout = t.writer + cmd.Stderr = t.writer + + return cmd +} + +func (t *Task) Run() { + // work with running executions + runningTaskCount := len(t.execMap) + if runningTaskCount > 0 { + if t.ConcurrencyPolicy == "Forbid" { + log.Errorf("[%-20s][%d] CANNOT RUN, another %d running executions", t.Name, t.execCount+1, runningTaskCount) + return + } else if t.ConcurrencyPolicy == "Replace" { + log.Infof("[%-20s][%d] found %d running executions, cleaning...", t.Name, t.execCount+1, runningTaskCount) + for _, ex := range t.execMap { + ex.Stop() + } + log.Infof("[%-20s][%d] %d running executions was cleaned", t.Name, t.execCount+1, runningTaskCount) + } + } + t.execCount++ + + // create execution + execution := newExecution(t.execCount, t.getCmd()) + // start command + if err := execution.cmd.Start(); err != nil { + log.Errorf("[%-20s][%d] cannot start: %v", t.Name, execution.id, err) + return + } + // wait for execution ends + go execution.Wait() + // add execution to list + t.execMap[execution.id] = execution + + log.Infof("[%-20s][%d] started", t.Name, execution.id) + select { + case err := <-execution.stop: + if err == nil { + log.Infof("[%-20s][%d] SUCCESS", t.Name, execution.id) + log.Debugf("[%-20s][%d] SUCCESS, output: %s", t.Name, execution.id, t.buf.String()) + } else { + log.Infof("[%-20s][%d] ERROR, err: %v, output: %s", t.Name, execution.id, err, t.buf.String()) + } + delete(t.execMap, execution.id) + } +} + +// ConfigDescriptiveInfo +type ConfigDescriptiveInfo struct { + Shell string `yaml:"shell"` + Tasks []*Task `yaml:"tasks"` +} + +func (di *ConfigDescriptiveInfo) InitTasks() []error { + var errList = make([]error, 0) + var err error + + // get os PATH env + osPath := os.Getenv("PATH") + + var taskNames []string + for _, t := range di.Tasks { + if err = t.init(di.Shell, osPath); err != nil { + errList = append(errList, err) + } + taskNames = append(taskNames, t.Name) + } + + log.Infof("%d tasks loaded - %s", len(di.Tasks), strings.Join(taskNames, ", ")) + + return errList +} diff --git a/src/tasks/scheduler.go b/src/tasks/scheduler.go new file mode 100644 index 0000000..f72207e --- /dev/null +++ b/src/tasks/scheduler.go @@ -0,0 +1,54 @@ +package tasks + +import ( + "errors" + "github.com/robfig/cron" + log "github.com/sirupsen/logrus" +) + +type Scheduler struct { + tasks []*Task + cron *cron.Cron + started bool +} + +func (s *Scheduler) AddTasks(taskList []*Task) error { + s.tasks = taskList + for _, task := range s.tasks { + if err := s.cron.AddFunc(task.Crontab, task.Run); err != nil { + return err + } + } + + return nil +} + +func (s *Scheduler) Start() error { + if s.started { + return errors.New("cannot start scheduler, already started") + } + s.cron.Start() + log.Info("scheduler was started") + s.started = true + + return nil +} + +func (s *Scheduler) Stop() error { + if s.started { + return errors.New("cannot stop scheduler, already stopped") + } + s.cron.Stop() + log.Info("scheduler was stopped") + s.started = false + + return nil +} + +func NewScheduler() (*Scheduler, error) { + sched := Scheduler{} + sched.cron = cron.New() + sched.started = false + + return &sched, nil +} diff --git a/src/tasks/tasks.go b/src/tasks/tasks.go new file mode 100644 index 0000000..350acf7 --- /dev/null +++ b/src/tasks/tasks.go @@ -0,0 +1,32 @@ +package tasks + +import ( + "errors" + "fmt" + "github.com/jar3b/concron/src/helpers" + log "github.com/sirupsen/logrus" + "gopkg.in/yaml.v2" +) + +func LoadTasks(configFileName string) (*ConfigDescriptiveInfo, error) { + config := ConfigDescriptiveInfo{} + data, err := helpers.ReadBinFile(configFileName) + if err != nil { + return nil, err + } + + if err := yaml.Unmarshal([]byte(data), &config); err != nil { + return nil, err + } + + // init tasks + errList := config.InitTasks() + for _, err := range errList { + log.Errorf(fmt.Sprintf("parse task error: %v", err)) + } + if len(errList) > 0 { + return nil, errors.New("cannot parse tasks, errors was shown above") + } + + return &config, nil +} diff --git a/tasks.yaml b/tasks.yaml new file mode 100644 index 0000000..79b4058 --- /dev/null +++ b/tasks.yaml @@ -0,0 +1,19 @@ +shell: "/bin/sh" +tasks: + - name: reservation + crontab: "*/5 * * * * *" + dir: "/tmp" + useShell: true + cmd: "sleep" + args: ["60"] + concurrencyPolicy: Replace + - name: test + crontab: "*/10 * * * * *" + dir: "/tmp" + useShell: false + cmd: "/bin/echo" + args: ["HELLO WORLD"] + - name: show_env + crontab: "*/15 * * * * *" + useShell: true + cmd: "env" \ No newline at end of file