Compare commits
14 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
25689b1e5b | ||
|
b60e33a718 | ||
|
76b13f3833 | ||
|
d66cc04f04 | ||
|
1e18f34ee2 | ||
|
5cf7a062b9 | ||
|
e8cba551c7 | ||
|
15e6c19222 | ||
|
4a7f5a637a | ||
|
f6ffcbb4d7 | ||
|
15e01680a2 | ||
|
282b5f5ff0 | ||
|
fabfbfe174 | ||
|
d24c89dc41 |
2
LICENSE
2
LICENSE
@@ -1,6 +1,6 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2020 jar3b
|
||||
Copyright (c) 2021 jar3b
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
40
README.md
40
README.md
@@ -1,2 +1,42 @@
|
||||
# nacl
|
||||
|
||||
Client helper for NATS/STAN
|
||||
|
||||
## examples
|
||||
|
||||
connect to NATS
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/jar3b/grawt"
|
||||
"github.com/jar3b/nacl"
|
||||
"github.com/kelseyhightower/envconfig"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// get params
|
||||
conf := config.NewConfig()
|
||||
|
||||
// init waiter (github.com/jar3b/grawt)
|
||||
var waiter = grawt.NewWaiter()
|
||||
|
||||
// init nats
|
||||
var subscriptions []*nacl.NatsSubscription
|
||||
if err := nacl.SetupNats(conf.Nats.Host, conf.Nats.Port, conf.Nats.User, conf.Nats.Pass,
|
||||
// handler called before app closed
|
||||
// we need to terminate sub's properly and sometimes doing another actions (finalizers)
|
||||
waiter.AddCloseHandler(func() {
|
||||
nacl.FinalizeNats(&subscriptions)
|
||||
}, false),
|
||||
); err != nil {
|
||||
waiter.Halt(fmt.Errorf("cannot connect to nats: %v", err))
|
||||
}
|
||||
defer nacl.NatsClient.Close()
|
||||
|
||||
// here we add some subscriptions and wait (using blocking call)
|
||||
// ...
|
||||
}
|
||||
```
|
17
go.mod
17
go.mod
@@ -1,11 +1,16 @@
|
||||
module github.com/jar3b/nacl
|
||||
|
||||
go 1.13
|
||||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/golang/protobuf v1.3.3 // indirect
|
||||
github.com/jar3b/grawt v0.1.5
|
||||
github.com/nats-io/nats-streaming-server v0.17.0 // indirect
|
||||
github.com/nats-io/nats.go v1.9.1
|
||||
github.com/nats-io/stan.go v0.6.0
|
||||
github.com/jar3b/grawt v0.1.10
|
||||
github.com/nats-io/nats.go v1.13.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/nats-io/nkeys v0.3.0 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/sirupsen/logrus v1.8.1 // indirect
|
||||
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b // indirect
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 // indirect
|
||||
)
|
||||
|
22
go.sum
Normal file
22
go.sum
Normal file
@@ -0,0 +1,22 @@
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/jar3b/grawt v0.1.10 h1:PwdPFJx6PFNdbQOk04VMWMGuqAwCuk0INvjdMA53MCA=
|
||||
github.com/jar3b/grawt v0.1.10/go.mod h1:C9fy+5804sm2Z/kO78ENgHntWTWtqLcrRtN9Q7tPScE=
|
||||
github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE=
|
||||
github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
|
||||
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as=
|
||||
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
75
jetstream.go
Normal file
75
jetstream.go
Normal file
@@ -0,0 +1,75 @@
|
||||
package nacl
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/jar3b/grawt"
|
||||
"github.com/nats-io/nats.go"
|
||||
"log"
|
||||
)
|
||||
|
||||
var (
|
||||
JsClient nats.JetStreamContext
|
||||
)
|
||||
|
||||
func SetupJetStream(host string, port int, credsFile string, closeHandler *grawt.CloseHandler, opts ...nats.JSOpt) error {
|
||||
var err error
|
||||
if err := SetupNatsWithCreds(host, port, credsFile, closeHandler); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
JsClient, err = NatsClient.JetStream(opts...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot init JS: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func FinalizeJetStream(subscriptions *[]*nats.Subscription) error {
|
||||
return FinalizeNats(subscriptions)
|
||||
}
|
||||
|
||||
func AddOrUpdateStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) {
|
||||
_, err := JsClient.StreamInfo(cfg.Name)
|
||||
if err != nil && err.Error() != "nats: stream not found" {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
sc, err := JsClient.UpdateStream(cfg, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sc, nil
|
||||
} else {
|
||||
sc, err := JsClient.AddStream(cfg, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sc, nil
|
||||
}
|
||||
}
|
||||
|
||||
func AddOrUpdateConsumer(stream string, consumer string, cfg *nats.ConsumerConfig, allowDelete bool, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) {
|
||||
ci0, err := JsClient.ConsumerInfo(stream, consumer, opts...)
|
||||
if err != nil && err.Error() != "nats: consumer not found" {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err == nil && allowDelete {
|
||||
if err := JsClient.DeleteConsumer(stream, consumer, opts...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
ci, err := JsClient.AddConsumer(stream, cfg, opts...)
|
||||
if err != nil {
|
||||
if !allowDelete {
|
||||
log.Printf("Consumer '%s' was exists", consumer)
|
||||
return ci0, nil
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return ci, nil
|
||||
}
|
83
nacl.go
83
nacl.go
@@ -4,23 +4,42 @@ import (
|
||||
"fmt"
|
||||
"github.com/jar3b/grawt"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/stan.go"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
NatsClient *nats.Conn
|
||||
StanClient stan.Conn
|
||||
natsLock = sync.Mutex{}
|
||||
)
|
||||
|
||||
type (
|
||||
Msg = stan.Msg
|
||||
NatsMsg = nats.Msg
|
||||
Subscription = stan.Subscription
|
||||
NatsSubscription = nats.Subscription
|
||||
)
|
||||
func SetupNatsWithCreds(host string, port int, credsFile string, closeHandler *grawt.CloseHandler) error {
|
||||
natsLock.Lock()
|
||||
defer natsLock.Unlock()
|
||||
var err error
|
||||
|
||||
// connect
|
||||
NatsClient, err = nats.Connect(
|
||||
fmt.Sprintf("nats://%s:%d", host, port),
|
||||
nats.UserCredentials(credsFile),
|
||||
nats.ClosedHandler(func(conn *nats.Conn) {
|
||||
if closeHandler != nil {
|
||||
closeHandler.Halt(nil)
|
||||
}
|
||||
}),
|
||||
nats.MaxReconnects(5),
|
||||
nats.ReconnectWait(time.Second*2),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot connect to NATS: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func SetupNats(host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) error {
|
||||
natsLock.Lock()
|
||||
defer natsLock.Unlock()
|
||||
var err error
|
||||
|
||||
// init connection string
|
||||
@@ -34,7 +53,9 @@ func SetupNats(host string, port int, user string, pass string, closeHandler *gr
|
||||
NatsClient, err = nats.Connect(
|
||||
connectionString,
|
||||
nats.ClosedHandler(func(conn *nats.Conn) {
|
||||
closeHandler.Halt(nil)
|
||||
if closeHandler != nil {
|
||||
closeHandler.Halt(nil)
|
||||
}
|
||||
}),
|
||||
nats.MaxReconnects(5),
|
||||
nats.ReconnectWait(time.Second*2),
|
||||
@@ -46,45 +67,17 @@ func SetupNats(host string, port int, user string, pass string, closeHandler *gr
|
||||
return nil
|
||||
}
|
||||
|
||||
func SetupStan(clusterName string, clientId string, host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) (err error) {
|
||||
if err = SetupNats(host, port, user, pass, closeHandler); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// stan connection
|
||||
StanClient, err = stan.Connect(
|
||||
clusterName,
|
||||
clientId,
|
||||
stan.NatsConn(NatsClient),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot connect to STAN: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func FinalizeStan(subscriptions *[]Subscription) error {
|
||||
if StanClient == nil {
|
||||
return fmt.Errorf("stan client is not initialized")
|
||||
}
|
||||
for _, subscription := range *subscriptions {
|
||||
_ = subscription.Unsubscribe()
|
||||
}
|
||||
|
||||
if err := StanClient.Close(); err != nil {
|
||||
return fmt.Errorf("cannot disconnect from STAN")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func FinalizeNats(subscriptions *[]*NatsSubscription) error {
|
||||
func FinalizeNats(subscriptions *[]*nats.Subscription) error {
|
||||
natsLock.Lock()
|
||||
defer natsLock.Unlock()
|
||||
if NatsClient == nil {
|
||||
return fmt.Errorf("stan client is not initialized")
|
||||
}
|
||||
for _, subscription := range *subscriptions {
|
||||
_ = subscription.Unsubscribe()
|
||||
|
||||
if subscriptions != nil {
|
||||
for _, subscription := range *subscriptions {
|
||||
_ = subscription.Unsubscribe()
|
||||
}
|
||||
}
|
||||
|
||||
NatsClient.Close()
|
||||
|
Reference in New Issue
Block a user