From b25d454be370cbe7bb3ddfa01be6e03fe1ac3a33 Mon Sep 17 00:00:00 2001 From: jar3b Date: Mon, 19 Aug 2019 17:03:18 +0300 Subject: [PATCH] Initial commit --- README.md | 111 ++++++++++++++++++ connection.go | 83 ++++++++++++++ go.mod | 10 ++ migrations.go | 266 ++++++++++++++++++++++++++++++++++++++++++++ migrations_table.go | 51 +++++++++ 5 files changed, 521 insertions(+) create mode 100644 README.md create mode 100644 connection.go create mode 100644 go.mod create mode 100644 migrations.go create mode 100644 migrations_table.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..95a121a --- /dev/null +++ b/README.md @@ -0,0 +1,111 @@ +# clickhouse-migrator + +## Install + +```bash +go get https://github.com/jar3b/clickhouse-migrator +``` + +## Migrations + +You need subfolders with migration files in format `-..sql`, where: + +1. `id` is migration identifier 4-digit including trailing `0`, example: `0001` +2. `name` is migration name. Text field, must pass a regex `[A-z_0-9]+`, example `initial` +3. `role` is role of node which need this migrations. Choices are: `master` or `node`. Be care! +`node` migrations also applied in `master`; is common case, `master` migrations contains `Distributed` +table descriptions. + +Example of `0001-initial.master.sql`: + +```sql +DROP TABLE IF EXISTS %Table:test%; +CREATE TABLE %Table:test%( + event_date Date, + event_time DateTime, + event_type UInt8, + event_data FixedString(17), +) engine=Distributed(%CLUSTER_NAME%, %DB_NAME%, %ChildTable:test%, event_time); +``` + +Example of `0001-initial.node.sql` + +```sql +DROP TABLE IF EXISTS %Table:test%; +CREATE TABLE %Table:test%( + event_date Date, + event_time DateTime, + event_type UInt8, + event_data FixedString(17), +) engine=MergeTree(event_date, (event_time, event_type, event_data), 8192); +``` + +As you can see, you can use these "helpers": + +- `%Table:%` table name, good for per-node tables if you use `Distributed` tables. These tables will be postfixed +with value, described below, BUT only if table is referenced by `Distributed` table. If you dont use them, you must always use + `%Table:%` definition or specify table name as-is (`DROP TABLE IF EXISTS test;`) +- `%ChildTable:test%` - reference for a regular table name in `Distributed` table definition. This is a kind of link to a regular table. +- `%CLUSTER_NAME%` - name of ClickHouse cluster (if it was used). +- `%DB_NAME%` - database name. + +## Code usage + +```go +package main + +import ( + clickhouseMigrator "github.com/jar3b/clickhouse-migrator" + log "github.com/sirupsen/logrus" +) + +func main() { + mig, err := clickhouseMigrator.NewMigrator( + conf.Clickhouse.NodeList, // list of node strings (host:port) + conf.Clickhouse.MasterNode, // master node address (host:port) + conf.Clickhouse.User, + conf.Clickhouse.Pass, + conf.Clickhouse.ClusterName, + conf.Clickhouse.DbName, + "migrations", // directory with migrations + "_impl", // it is a regular table postfix, if "Distributed" was used + ) + if err != nil { + log.Fatalf("cannot create migrator: %v", err) + } + if err = mig.Apply(); err != nil { + log.Fatalf("cannot apply migrations: %v", err) + } + log.Info("Migrations applied") +} +``` + +where conf is + +``` +Clickhouse struct { + NodeList string `envconfig:"NODE_LIST" required:"true"` + MasterNode string `envconfig:"MASTER_NODE" required:"true"` + User string `envconfig:"USER" required:"false"` + Pass string `envconfig:"PASS" required:"false"` + ClusterName string `envconfig:"CLUSTER_NAME" required:"true"` + DbName string `envconfig:"DB" required:"true"` + ReplicaCount int `envconfig:"REPLICA_COUNT" required:"true"` +} +``` + +and env + +``` +CLICKHOUSE_NODE_LIST=clickhouse:8090,clickhouse:8091,clickhouse:8092 +CLICKHOUSE_MASTER_NODE=clickhouse:8090 +CLICKHOUSE_USER=default +CLICKHOUSE_PASS= +CLICKHOUSE_DB=my_database +CLICKHOUSE_REPLICA_COUNT=3 +CLICKHOUSE_CLUSTER_NAME=default_cluster +``` + +## Limitations + +1. Only one master node is supported \ No newline at end of file diff --git a/connection.go b/connection.go new file mode 100644 index 0000000..c414b70 --- /dev/null +++ b/connection.go @@ -0,0 +1,83 @@ +package click_mig + +import ( + "fmt" + "github.com/jmoiron/sqlx" + _ "github.com/kshvakov/clickhouse" + "strconv" + "strings" +) + +type NodeConnection struct { + NodeName string + IsMaster bool + Url string +} + +func splitHostName(node string) (string, int, error) { + if node == "" { + return "", 0, nil + } + sList := strings.Split(node, ":") + if len(sList) != 2 { + return "", 0, fmt.Errorf("invalid node string '%s'", node) + } + + port, err := strconv.Atoi(sList[1]) + if err != nil { + return "", 0, err + } + return sList[0], port, nil +} + +func makeConnectionString(host string, port int, user string, pass string, dbName string) string { + connectStr := fmt.Sprintf("tcp://%s:%d?database=%s", host, port, dbName) + if user != "" { + connectStr += "&username=" + user + } + if pass != "" { + connectStr += "&password=" + pass + } + + return connectStr +} + +func getNodeConnections(nodeList string, masterNode string, user string, pass string, clusterName string, dbName string) (*[]NodeConnection, error) { + hosts := make([]NodeConnection, 0) + + // get master cnn + masterHost, masterPort, err := splitHostName(masterNode) + if err != nil { + return nil, err + } + + nodeStrings := strings.Split(nodeList, ",") + for _, nodeStr := range nodeStrings { + nodeHost, nodePort, err := splitHostName(nodeStr) + if err != nil { + return nil, err + } + + nodeIsMaster := (nodeHost == masterHost) && (nodePort == masterPort) + hosts = append(hosts, NodeConnection{ + fmt.Sprintf("%s, master:%t", nodeStr, nodeIsMaster), + nodeIsMaster, + makeConnectionString(nodeHost, nodePort, user, pass, dbName), + }) + } + + return &hosts, nil +} + +func pingNode(client client) error { + return client.Ping() +} + +func connectToClickHouse(connectionString string) (client, error) { + chClient, err := sqlx.Open("clickhouse", connectionString) + if err != nil { + return nil, fmt.Errorf("cannot connect to Clickhouse: %v", err) + } + + return chClient, nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d51eb43 --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module github.com/jar3b/clickhouse-migrator + +go 1.12 + +require ( + github.com/jmoiron/sqlx v1.2.0 + github.com/kshvakov/clickhouse v1.3.9 + github.com/sirupsen/logrus v1.4.2 + google.golang.org/appengine v1.6.1 // indirect +) diff --git a/migrations.go b/migrations.go new file mode 100644 index 0000000..1e6c5f9 --- /dev/null +++ b/migrations.go @@ -0,0 +1,266 @@ +package click_mig + +import ( + "fmt" + "github.com/jmoiron/sqlx" + log "github.com/sirupsen/logrus" + "io/ioutil" + "os" + "path/filepath" + "regexp" + "strconv" + "strings" +) + +type client = *sqlx.DB + +type Migrator struct { + nodeConnections *[]NodeConnection + clusterName string + dbName string + basePath string + tablePostfix string +} + +func NewMigrator(nodeList string, masterNode string, user string, password string, clusterName string, dbName string, migrationsDir string, tablePostfix string) (*Migrator, error) { + hosts, err := getNodeConnections(nodeList, masterNode, user, password, clusterName, dbName) + if err != nil { + return nil, err + } + return &Migrator{ + nodeConnections: hosts, + clusterName: clusterName, + dbName: dbName, + basePath: migrationsDir, + tablePostfix: tablePostfix, + }, nil +} + +type migrationDescriptor struct { + Number int32 + Name string + Role string + NodeData []byte + MasterData []byte +} + +type connectionBundle struct { + client client + nodeName string + isMaster bool + existentMigrations map[int32]bool +} + +var ( + tableRegex = regexp.MustCompile(`%Table:([A-z_]+)%`) + childTableRegex = regexp.MustCompile(`%ChildTable:([A-z_]+)%`) +) + +func (migrator *Migrator) getParsedQuery(query string, clusterName string, dbName string, isRunOnMaster bool, masterCount int) string { + query = strings.Replace(query, "%DB_NAME%", dbName, -1) + query = strings.Replace(query, "%CLUSTER_NAME%", clusterName, -1) + + postfix := "" + childPostfix := "" + if masterCount > 0 { + if isRunOnMaster { + childPostfix = migrator.tablePostfix + } else { + postfix = migrator.tablePostfix + } + } + + query = tableRegex.ReplaceAllString(query, `${1}`+postfix) + query = childTableRegex.ReplaceAllString(query, `${1}`+childPostfix) + + return query +} + +func (migrator *Migrator) Apply() error { + migrationList, err := parseMigrations(migrator.basePath) + if err != nil { + return fmt.Errorf("cannot apply migrations: %v", err) + } + + cb := make([]connectionBundle, 0) + + // function to fill ConnectionBundle + nodeCount := 0 + masterCount := 0 + for _, host := range *migrator.nodeConnections { + c, err := connectToClickHouse(host.Url) + if err != nil { + return fmt.Errorf("cannot connect to node: %v", err) + } + if err = pingNode(c); err != nil { + return fmt.Errorf("cannot get access to node: %v", err) + } + + cb = append(cb, connectionBundle{ + c, + host.NodeName, + host.IsMaster, + map[int32]bool{}, + }) + + if host.IsMaster { + masterCount++ + } else { + nodeCount++ + } + } + + // close all connections + finalizeConnections := func(cbs *[]connectionBundle) { + for _, cbE := range *cbs { + if cbE.client != nil { + if err := cbE.client.Close(); err != nil { + log.Debugf("Cannot close connection to '%s': %v", cbE.nodeName, err) + } else { + log.Debugf("Connection to '%s' was closed", cbE.nodeName) + } + } + } + } + defer finalizeConnections(&cb) + + // check if migrations exists and create tables for migrations if needed + for _, connectionBundle := range cb { + // check and create tables + if !isMigrationTableExists(connectionBundle.client) { + if err = createMigrationTable(connectionBundle.client); err != nil { + return err + } else { + log.Infof("Created 'migrations' table for node '%s'", connectionBundle.nodeName) + } + } + // list all migrations + migrations, err := listMigrationEntries(connectionBundle.client) + if err != nil { + return fmt.Errorf("cannot get migrations list for node '%s': %v", connectionBundle.nodeName, err) + } + // enum migrations + for _, migE := range *migrations { + connectionBundle.existentMigrations[migE.Id] = true + } + } + + // apply migrations + for _, migRation := range *migrationList { + for _, connectionBundle := range cb { + if !connectionBundle.existentMigrations[migRation.Number] { + log.Debugf("Migration '%d_%s' isn't exists for node '%s'", migRation.Number, migRation.Name, connectionBundle.nodeName) + // start transaction + log.Infof("Applying migration '%d_%s' for node '%s'...", migRation.Number, migRation.Name, connectionBundle.nodeName) + + applyContent := func(content []byte, migTypeIsMaster bool) error { + if len(content) > 0 { + tx, _ := connectionBundle.client.Beginx() + migrationBody := string(content) + for _, subQuery := range strings.Split(migrationBody, ";") { + if subQuery != "" { + subQuery = migrator.getParsedQuery(subQuery, migrator.clusterName, migrator.dbName, migTypeIsMaster, masterCount) + _, err := tx.Exec(subQuery) + if err != nil { + _ = tx.Rollback() + return fmt.Errorf("error applying '%d_%s' for '%s'(master:%t): %v", migRation.Number, migRation.Name, connectionBundle.nodeName, migTypeIsMaster, err) + } + } + } + + // Add migration to migrations table if this is not a master + if !migTypeIsMaster { + if err := setMigrationApplied(migRation.Number, migRation.Name, tx); err != nil { + _ = tx.Rollback() + return fmt.Errorf("error set-apply '%d_%s' for '%s'(master:%t): %v", migRation.Number, migRation.Name, connectionBundle.nodeName, migTypeIsMaster, err) + } + } + + // commit transaction + if err := tx.Commit(); err != nil { + _ = tx.Rollback() + log.Errorf("Cannot apply migration '%d_%s' for '%s'(master:%t): %v", migRation.Number, migRation.Name, connectionBundle.nodeName, migTypeIsMaster, err) + return err + } + + log.Infof("Migration '%d_%s' was APPLIED for '%s'(master:%t)!", migRation.Number, migRation.Name, connectionBundle.nodeName, migTypeIsMaster) + } else { + log.Warnf("Migration '%d_%s' isn't correct for node '%s'(master:%t)", migRation.Number, migRation.Name, connectionBundle.nodeName, migTypeIsMaster) + } + + return nil + } + + err = applyContent(migRation.NodeData, false) + if err != nil { + return err + } + + if connectionBundle.isMaster { + err = applyContent(migRation.MasterData, true) + if err != nil { + return err + } + } + } + } + } + + return nil +} + +func parseMigrations(basePath string) (*[]*migrationDescriptor, error) { + files, err := filepath.Glob(basePath + "/*.node.sql") + if err != nil { + return nil, err + } + + // regex + migrationFileRegex := regexp.MustCompile("(?P[0-9]{4})_(?P[a-z]+).(?Pnode|master).(?Psql)") + + // create output array + migrationList := make([]*migrationDescriptor, 0) + + for _, migrationFilePath := range files { + nodeContent, err := ioutil.ReadFile(migrationFilePath) + if err != nil { + return nil, fmt.Errorf("cannot read n-mig content of '%s': %v", migrationFilePath, err) + } + + migrationFileName := filepath.Base(migrationFilePath) + + // check migration name validity + fNameParts := migrationFileRegex.FindStringSubmatch(migrationFileName) + if fNameParts == nil || len(fNameParts) != 5 { + return nil, fmt.Errorf("cannot parse migration filename: '%s'", migrationFileName) + } + migrationNumber, err := strconv.Atoi(fNameParts[1]) + if err != nil { + return nil, err + } + + //create basic migration entry + migrationDescriptor := migrationDescriptor{ + Number: int32(migrationNumber), + Name: fNameParts[2], + Role: fNameParts[3], + NodeData: nodeContent, + } + + // check if master + migrationMasterFileName := fmt.Sprintf("%s_%s.master.%s", fNameParts[1], fNameParts[2], fNameParts[4]) + if _, err := os.Stat(basePath + "/" + migrationMasterFileName); err == nil { + masterContent, err := ioutil.ReadFile(basePath + "/" + migrationMasterFileName) + if err != nil { + return nil, fmt.Errorf("cannot read m-mig content of '%s': %v", migrationFilePath, err) + } + + migrationDescriptor.MasterData = masterContent + } + + // add to list + migrationList = append(migrationList, &migrationDescriptor) + } + + return &migrationList, nil +} diff --git a/migrations_table.go b/migrations_table.go new file mode 100644 index 0000000..e2e9865 --- /dev/null +++ b/migrations_table.go @@ -0,0 +1,51 @@ +package click_mig + +import ( + "github.com/jmoiron/sqlx" + "log" + "time" +) + +type migrationEntry struct { + Id int32 `db:"id"` + Name string `db:"name"` + DateApplied time.Time `db:"applied"` +} + +func createMigrationTable(client client) error { + _, err := client.Exec(` + CREATE TABLE migrations( + id Int32, + name String, + applied DateTime + ) engine=TinyLog() + `) + + return err +} + +func setMigrationApplied(id int32, name string, tx *sqlx.Tx) error { + mig := migrationEntry{Id: id, Name: name, DateApplied: time.Now()} + + _, err := tx.NamedExec("INSERT INTO migrations (id, name, applied) VALUES (:id, :name, :applied)", &mig) + return err +} + +func isMigrationTableExists(client client) bool { + r, err := client.Queryx("SHOW TABLES LIKE 'migrations'") + if err != nil { + log.Fatalf("Cannot connect to node: %v", err) + return false + } + defer r.Close() + return r.Next() +} + +func listMigrationEntries(client client) (*[]migrationEntry, error) { + migrations := []migrationEntry{} + if err := client.Select(&migrations, "SELECT * FROM migrations ORDER BY id ASC"); err != nil { + return nil, err + } + + return &migrations, nil +}