6 Commits

Author SHA1 Message Date
jar3b
282b5f5ff0 fix: add subscriptions and closeHandler nil check 2020-11-03 19:47:15 +03:00
jar3b
fabfbfe174 chore: update readme, add NATS example 2020-09-14 16:07:48 +03:00
jar3b
d24c89dc41 bump: update libs in go.mod 2020-09-14 15:52:26 +03:00
jar3b
d159a8e392 fix: NatsClient in FinalizeNats 2020-02-21 16:08:15 +03:00
jar3b
b4635386bb feat: add FinalizeNats function; chore: add checks for non-nil connections before close 2020-02-21 15:47:48 +03:00
jar3b
c1708a5308 bump: nats-streaming-server v0.16.2 -> v0.17.0 2020-02-21 14:50:02 +03:00
3 changed files with 79 additions and 13 deletions

View File

@@ -1,2 +1,42 @@
# nacl
Client helper for NATS/STAN
## examples
connect to NATS
```go
package main
import (
"fmt"
"github.com/jar3b/grawt"
"github.com/jar3b/nacl"
"github.com/kelseyhightower/envconfig"
)
func main() {
// get params
conf := config.NewConfig()
// init waiter (github.com/jar3b/grawt)
var waiter = grawt.NewWaiter()
// init nats
var subscriptions []*nacl.NatsSubscription
if err := nacl.SetupNats(conf.Nats.Host, conf.Nats.Port, conf.Nats.User, conf.Nats.Pass,
// handler called before app closed
// we need to terminate sub's properly and sometimes doing another actions (finalizers)
waiter.AddCloseHandler(func() {
nacl.FinalizeNats(&subscriptions)
}, false),
); err != nil {
waiter.Halt(fmt.Errorf("cannot connect to nats: %v", err))
}
defer nacl.NatsClient.Close()
// here we add some subscriptions and wait (using blocking call)
// ...
}
```

15
go.mod
View File

@@ -1,12 +1,13 @@
module github.com/jar3b/nacl
go 1.13
go 1.15
require (
github.com/jar3b/grawt v0.1.5
github.com/nats-io/nats-server/v2 v2.1.2 // indirect
github.com/nats-io/nats-streaming-server v0.16.2 // indirect
github.com/nats-io/nats.go v1.9.1
github.com/nats-io/stan.go v0.6.0
github.com/sirupsen/logrus v1.4.2
github.com/golang/protobuf v1.4.2 // indirect
github.com/jar3b/grawt v0.1.6
github.com/nats-io/nats-server/v2 v2.1.8 // indirect
github.com/nats-io/nats-streaming-server v0.18.0 // indirect
github.com/nats-io/nats.go v1.10.0
github.com/nats-io/stan.go v0.7.0
google.golang.org/protobuf v1.25.0 // indirect
)

37
nacl.go
View File

@@ -14,9 +14,10 @@ var (
)
type (
Msg = stan.Msg
NatsMsg = nats.Msg
Subscription = stan.Subscription
Msg = stan.Msg
NatsMsg = nats.Msg
Subscription = stan.Subscription
NatsSubscription = nats.Subscription
)
func SetupNats(host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) error {
@@ -33,7 +34,9 @@ func SetupNats(host string, port int, user string, pass string, closeHandler *gr
NatsClient, err = nats.Connect(
connectionString,
nats.ClosedHandler(func(conn *nats.Conn) {
closeHandler.Halt(nil)
if closeHandler != nil {
closeHandler.Halt(nil)
}
}),
nats.MaxReconnects(5),
nats.ReconnectWait(time.Second*2),
@@ -64,8 +67,14 @@ func SetupStan(clusterName string, clientId string, host string, port int, user
}
func FinalizeStan(subscriptions *[]Subscription) error {
for _, subscription := range *subscriptions {
_ = subscription.Unsubscribe()
if StanClient == nil {
return fmt.Errorf("stan client is not initialized")
}
if subscriptions != nil {
for _, subscription := range *subscriptions {
_ = subscription.Unsubscribe()
}
}
if err := StanClient.Close(); err != nil {
@@ -74,3 +83,19 @@ func FinalizeStan(subscriptions *[]Subscription) error {
return nil
}
func FinalizeNats(subscriptions *[]*NatsSubscription) error {
if NatsClient == nil {
return fmt.Errorf("stan client is not initialized")
}
if subscriptions != nil {
for _, subscription := range *subscriptions {
_ = subscription.Unsubscribe()
}
}
NatsClient.Close()
return nil
}