2 Commits

Author SHA1 Message Date
jar3b
5cf7a062b9 fix: add SetupJetStream() 2021-06-07 14:02:59 +03:00
jar3b
e8cba551c7 fix: remove own types 2021-06-07 13:45:26 +03:00

30
nacl.go
View File

@@ -12,17 +12,11 @@ import (
var ( var (
NatsClient *nats.Conn NatsClient *nats.Conn
StanClient stan.Conn StanClient stan.Conn
JsClient nats.JetStreamContext
natsLock = sync.Mutex{} natsLock = sync.Mutex{}
stanLock = sync.Mutex{} stanLock = sync.Mutex{}
) )
type (
Msg = stan.Msg
NatsMsg = nats.Msg
Subscription = stan.Subscription
NatsSubscription = nats.Subscription
)
func SetupNatsWithCreds(host string, port int, credsFile string, closeHandler *grawt.CloseHandler) error { func SetupNatsWithCreds(host string, port int, credsFile string, closeHandler *grawt.CloseHandler) error {
natsLock.Lock() natsLock.Lock()
defer natsLock.Unlock() defer natsLock.Unlock()
@@ -47,6 +41,20 @@ func SetupNatsWithCreds(host string, port int, credsFile string, closeHandler *g
return nil return nil
} }
func SetupJetStream(host string, port int, credsFile string, closeHandler *grawt.CloseHandler, opts ...nats.JSOpt) error {
var err error
if err := SetupNatsWithCreds(host, port, credsFile, closeHandler); err != nil {
return err
}
JsClient, err = NatsClient.JetStream(opts...)
if err != nil {
return fmt.Errorf("Cannot init JS: %v", err)
}
return nil
}
func SetupNats(host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) error { func SetupNats(host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) error {
natsLock.Lock() natsLock.Lock()
defer natsLock.Unlock() defer natsLock.Unlock()
@@ -98,7 +106,7 @@ func SetupStan(clusterName string, clientId string, host string, port int, user
return nil return nil
} }
func FinalizeStan(subscriptions *[]Subscription) error { func FinalizeStan(subscriptions *[]stan.Subscription) error {
stanLock.Lock() stanLock.Lock()
defer stanLock.Unlock() defer stanLock.Unlock()
if StanClient == nil { if StanClient == nil {
@@ -118,7 +126,7 @@ func FinalizeStan(subscriptions *[]Subscription) error {
return nil return nil
} }
func FinalizeNats(subscriptions *[]*NatsSubscription) error { func FinalizeNats(subscriptions *[]*nats.Subscription) error {
natsLock.Lock() natsLock.Lock()
defer natsLock.Unlock() defer natsLock.Unlock()
if NatsClient == nil { if NatsClient == nil {
@@ -135,3 +143,7 @@ func FinalizeNats(subscriptions *[]*NatsSubscription) error {
return nil return nil
} }
func FinalizeJetStream(subscriptions *[]*nats.Subscription) error {
return FinalizeNats(subscriptions)
}