feat: add logic
This commit is contained in:
parent
e2c24fe695
commit
0263f499fd
12
go.mod
Normal file
12
go.mod
Normal file
@ -0,0 +1,12 @@
|
||||
module github.com/jar3b/nacl
|
||||
|
||||
go 1.13
|
||||
|
||||
require (
|
||||
github.com/jar3b/grawt v0.1.5
|
||||
github.com/nats-io/nats-server/v2 v2.1.2 // indirect
|
||||
github.com/nats-io/nats-streaming-server v0.16.2 // indirect
|
||||
github.com/nats-io/nats.go v1.9.1
|
||||
github.com/nats-io/stan.go v0.6.0
|
||||
github.com/sirupsen/logrus v1.4.2
|
||||
)
|
76
nacl.go
Normal file
76
nacl.go
Normal file
@ -0,0 +1,76 @@
|
||||
package nacl
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/jar3b/grawt"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/stan.go"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
NatsClient *nats.Conn
|
||||
StanClient stan.Conn
|
||||
)
|
||||
|
||||
type (
|
||||
Msg = stan.Msg
|
||||
NatsMsg = nats.Msg
|
||||
Subscription = stan.Subscription
|
||||
)
|
||||
|
||||
func SetupNats(host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) error {
|
||||
var err error
|
||||
|
||||
// init connection string
|
||||
connectionString := fmt.Sprintf("%s:%d", host, port)
|
||||
if user != "" && pass != "" {
|
||||
connectionString = fmt.Sprintf("%s:%s@%s", user, pass, connectionString)
|
||||
}
|
||||
connectionString = fmt.Sprintf("nats://%s", connectionString)
|
||||
|
||||
// connect
|
||||
NatsClient, err = nats.Connect(
|
||||
connectionString,
|
||||
nats.ClosedHandler(func(conn *nats.Conn) {
|
||||
closeHandler.Halt(nil)
|
||||
}),
|
||||
nats.MaxReconnects(5),
|
||||
nats.ReconnectWait(time.Second*2),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot connect to NATS: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func SetupStan(clusterName string, clientId string, host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) (err error) {
|
||||
if err = SetupNats(host, port, user, pass, closeHandler); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// stan connection
|
||||
StanClient, err = stan.Connect(
|
||||
clusterName,
|
||||
clientId,
|
||||
stan.NatsConn(NatsClient),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot connect to STAN: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func FinalizeStan(subscriptions *[]Subscription) error {
|
||||
for _, subscription := range *subscriptions {
|
||||
_ = subscription.Unsubscribe()
|
||||
}
|
||||
|
||||
if err := StanClient.Close(); err != nil {
|
||||
return fmt.Errorf("cannot disconnect from STAN")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user