diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3563390 --- /dev/null +++ b/go.mod @@ -0,0 +1,12 @@ +module github.com/jar3b/nacl + +go 1.13 + +require ( + github.com/jar3b/grawt v0.1.5 + github.com/nats-io/nats-server/v2 v2.1.2 // indirect + github.com/nats-io/nats-streaming-server v0.16.2 // indirect + github.com/nats-io/nats.go v1.9.1 + github.com/nats-io/stan.go v0.6.0 + github.com/sirupsen/logrus v1.4.2 +) diff --git a/nacl.go b/nacl.go new file mode 100644 index 0000000..fd30ff4 --- /dev/null +++ b/nacl.go @@ -0,0 +1,76 @@ +package nacl + +import ( + "fmt" + "github.com/jar3b/grawt" + "github.com/nats-io/nats.go" + "github.com/nats-io/stan.go" + "time" +) + +var ( + NatsClient *nats.Conn + StanClient stan.Conn +) + +type ( + Msg = stan.Msg + NatsMsg = nats.Msg + Subscription = stan.Subscription +) + +func SetupNats(host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) error { + var err error + + // init connection string + connectionString := fmt.Sprintf("%s:%d", host, port) + if user != "" && pass != "" { + connectionString = fmt.Sprintf("%s:%s@%s", user, pass, connectionString) + } + connectionString = fmt.Sprintf("nats://%s", connectionString) + + // connect + NatsClient, err = nats.Connect( + connectionString, + nats.ClosedHandler(func(conn *nats.Conn) { + closeHandler.Halt(nil) + }), + nats.MaxReconnects(5), + nats.ReconnectWait(time.Second*2), + ) + if err != nil { + return fmt.Errorf("cannot connect to NATS: %v", err) + } + + return nil +} + +func SetupStan(clusterName string, clientId string, host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) (err error) { + if err = SetupNats(host, port, user, pass, closeHandler); err != nil { + return err + } + + // stan connection + StanClient, err = stan.Connect( + clusterName, + clientId, + stan.NatsConn(NatsClient), + ) + if err != nil { + return fmt.Errorf("cannot connect to STAN: %v", err) + } + + return nil +} + +func FinalizeStan(subscriptions *[]Subscription) error { + for _, subscription := range *subscriptions { + _ = subscription.Unsubscribe() + } + + if err := StanClient.Close(); err != nil { + return fmt.Errorf("cannot disconnect from STAN") + } + + return nil +}