15 Commits

Author SHA1 Message Date
jar3b
25689b1e5b fix: downgrade nats.go@v1.13.0 2022-02-06 19:09:06 +03:00
jar3b
b60e33a718 bump: packages; feat: remove STAN (nats-streaming) 2022-02-06 18:53:02 +03:00
jar3b
76b13f3833 bump: nats.go v1.13.0 2021-10-13 14:06:39 +03:00
jar3b
d66cc04f04 fix: stream name in AddOrUpdateStream() 2021-10-13 14:05:47 +03:00
jar3b
1e18f34ee2 fix: add basic JetStream management; bump: nats.go v1.12.3 (BREAKING to use with nack) 2021-09-29 14:37:13 +03:00
jar3b
5cf7a062b9 fix: add SetupJetStream() 2021-06-07 14:02:59 +03:00
jar3b
e8cba551c7 fix: remove own types 2021-06-07 13:45:26 +03:00
jar3b
15e6c19222 feat: add SetupNatsWithCreds 2021-06-03 15:43:57 +03:00
jar3b
4a7f5a637a feat: update deps 2021-06-03 15:43:43 +03:00
jar3b
f6ffcbb4d7 frat: update deps; feat: use locks for clients init;bump: up to v0.1.1 2021-03-12 20:05:00 +03:00
jar3b
15e01680a2 bump: deps, up to v0.1.0 2020-11-19 16:22:01 +03:00
jar3b
282b5f5ff0 fix: add subscriptions and closeHandler nil check 2020-11-03 19:47:15 +03:00
jar3b
fabfbfe174 chore: update readme, add NATS example 2020-09-14 16:07:48 +03:00
jar3b
d24c89dc41 bump: update libs in go.mod 2020-09-14 15:52:26 +03:00
jar3b
d159a8e392 fix: NatsClient in FinalizeNats 2020-02-21 16:08:15 +03:00
6 changed files with 187 additions and 52 deletions

View File

@@ -1,6 +1,6 @@
MIT License
Copyright (c) 2020 jar3b
Copyright (c) 2021 jar3b
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@@ -1,2 +1,42 @@
# nacl
Client helper for NATS/STAN
## examples
connect to NATS
```go
package main
import (
"fmt"
"github.com/jar3b/grawt"
"github.com/jar3b/nacl"
"github.com/kelseyhightower/envconfig"
)
func main() {
// get params
conf := config.NewConfig()
// init waiter (github.com/jar3b/grawt)
var waiter = grawt.NewWaiter()
// init nats
var subscriptions []*nacl.NatsSubscription
if err := nacl.SetupNats(conf.Nats.Host, conf.Nats.Port, conf.Nats.User, conf.Nats.Pass,
// handler called before app closed
// we need to terminate sub's properly and sometimes doing another actions (finalizers)
waiter.AddCloseHandler(func() {
nacl.FinalizeNats(&subscriptions)
}, false),
); err != nil {
waiter.Halt(fmt.Errorf("cannot connect to nats: %v", err))
}
defer nacl.NatsClient.Close()
// here we add some subscriptions and wait (using blocking call)
// ...
}
```

17
go.mod
View File

@@ -1,11 +1,16 @@
module github.com/jar3b/nacl
go 1.13
go 1.17
require (
github.com/golang/protobuf v1.3.3 // indirect
github.com/jar3b/grawt v0.1.5
github.com/nats-io/nats-streaming-server v0.17.0 // indirect
github.com/nats-io/nats.go v1.9.1
github.com/nats-io/stan.go v0.6.0
github.com/jar3b/grawt v0.1.10
github.com/nats-io/nats.go v1.13.0
)
require (
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b // indirect
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 // indirect
)

22
go.sum Normal file
View File

@@ -0,0 +1,22 @@
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jar3b/grawt v0.1.10 h1:PwdPFJx6PFNdbQOk04VMWMGuqAwCuk0INvjdMA53MCA=
github.com/jar3b/grawt v0.1.10/go.mod h1:C9fy+5804sm2Z/kO78ENgHntWTWtqLcrRtN9Q7tPScE=
github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE=
github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

75
jetstream.go Normal file
View File

@@ -0,0 +1,75 @@
package nacl
import (
"fmt"
"github.com/jar3b/grawt"
"github.com/nats-io/nats.go"
"log"
)
var (
JsClient nats.JetStreamContext
)
func SetupJetStream(host string, port int, credsFile string, closeHandler *grawt.CloseHandler, opts ...nats.JSOpt) error {
var err error
if err := SetupNatsWithCreds(host, port, credsFile, closeHandler); err != nil {
return err
}
JsClient, err = NatsClient.JetStream(opts...)
if err != nil {
return fmt.Errorf("cannot init JS: %v", err)
}
return nil
}
func FinalizeJetStream(subscriptions *[]*nats.Subscription) error {
return FinalizeNats(subscriptions)
}
func AddOrUpdateStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) {
_, err := JsClient.StreamInfo(cfg.Name)
if err != nil && err.Error() != "nats: stream not found" {
return nil, err
}
if err == nil {
sc, err := JsClient.UpdateStream(cfg, opts...)
if err != nil {
return nil, err
}
return sc, nil
} else {
sc, err := JsClient.AddStream(cfg, opts...)
if err != nil {
return nil, err
}
return sc, nil
}
}
func AddOrUpdateConsumer(stream string, consumer string, cfg *nats.ConsumerConfig, allowDelete bool, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) {
ci0, err := JsClient.ConsumerInfo(stream, consumer, opts...)
if err != nil && err.Error() != "nats: consumer not found" {
return nil, err
}
if err == nil && allowDelete {
if err := JsClient.DeleteConsumer(stream, consumer, opts...); err != nil {
return nil, err
}
}
ci, err := JsClient.AddConsumer(stream, cfg, opts...)
if err != nil {
if !allowDelete {
log.Printf("Consumer '%s' was exists", consumer)
return ci0, nil
} else {
return nil, err
}
}
return ci, nil
}

83
nacl.go
View File

@@ -4,23 +4,42 @@ import (
"fmt"
"github.com/jar3b/grawt"
"github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
"sync"
"time"
)
var (
NatsClient *nats.Conn
StanClient stan.Conn
natsLock = sync.Mutex{}
)
type (
Msg = stan.Msg
NatsMsg = nats.Msg
Subscription = stan.Subscription
NatsSubscription = nats.Subscription
)
func SetupNatsWithCreds(host string, port int, credsFile string, closeHandler *grawt.CloseHandler) error {
natsLock.Lock()
defer natsLock.Unlock()
var err error
// connect
NatsClient, err = nats.Connect(
fmt.Sprintf("nats://%s:%d", host, port),
nats.UserCredentials(credsFile),
nats.ClosedHandler(func(conn *nats.Conn) {
if closeHandler != nil {
closeHandler.Halt(nil)
}
}),
nats.MaxReconnects(5),
nats.ReconnectWait(time.Second*2),
)
if err != nil {
return fmt.Errorf("cannot connect to NATS: %v", err)
}
return nil
}
func SetupNats(host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) error {
natsLock.Lock()
defer natsLock.Unlock()
var err error
// init connection string
@@ -34,7 +53,9 @@ func SetupNats(host string, port int, user string, pass string, closeHandler *gr
NatsClient, err = nats.Connect(
connectionString,
nats.ClosedHandler(func(conn *nats.Conn) {
closeHandler.Halt(nil)
if closeHandler != nil {
closeHandler.Halt(nil)
}
}),
nats.MaxReconnects(5),
nats.ReconnectWait(time.Second*2),
@@ -46,45 +67,17 @@ func SetupNats(host string, port int, user string, pass string, closeHandler *gr
return nil
}
func SetupStan(clusterName string, clientId string, host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) (err error) {
if err = SetupNats(host, port, user, pass, closeHandler); err != nil {
return err
}
// stan connection
StanClient, err = stan.Connect(
clusterName,
clientId,
stan.NatsConn(NatsClient),
)
if err != nil {
return fmt.Errorf("cannot connect to STAN: %v", err)
}
return nil
}
func FinalizeStan(subscriptions *[]Subscription) error {
if StanClient == nil {
func FinalizeNats(subscriptions *[]*nats.Subscription) error {
natsLock.Lock()
defer natsLock.Unlock()
if NatsClient == nil {
return fmt.Errorf("stan client is not initialized")
}
for _, subscription := range *subscriptions {
_ = subscription.Unsubscribe()
}
if err := StanClient.Close(); err != nil {
return fmt.Errorf("cannot disconnect from STAN")
}
return nil
}
func FinalizeNats(subscriptions *[]*NatsSubscription) error {
if StanClient == nil {
return fmt.Errorf("stan client is not initialized")
}
for _, subscription := range *subscriptions {
_ = subscription.Unsubscribe()
if subscriptions != nil {
for _, subscription := range *subscriptions {
_ = subscription.Unsubscribe()
}
}
NatsClient.Close()