Compare commits
18 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
a9ecf835c5 | ||
|
25689b1e5b | ||
|
b60e33a718 | ||
|
76b13f3833 | ||
|
d66cc04f04 | ||
|
1e18f34ee2 | ||
|
5cf7a062b9 | ||
|
e8cba551c7 | ||
|
15e6c19222 | ||
|
4a7f5a637a | ||
|
f6ffcbb4d7 | ||
|
15e01680a2 | ||
|
282b5f5ff0 | ||
|
fabfbfe174 | ||
|
d24c89dc41 | ||
|
d159a8e392 | ||
|
b4635386bb | ||
|
c1708a5308 |
2
LICENSE
2
LICENSE
@ -1,6 +1,6 @@
|
|||||||
MIT License
|
MIT License
|
||||||
|
|
||||||
Copyright (c) 2020 jar3b
|
Copyright (c) 2022 jar3b
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
of this software and associated documentation files (the "Software"), to deal
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
40
README.md
40
README.md
@ -1,2 +1,42 @@
|
|||||||
# nacl
|
# nacl
|
||||||
|
|
||||||
Client helper for NATS/STAN
|
Client helper for NATS/STAN
|
||||||
|
|
||||||
|
## examples
|
||||||
|
|
||||||
|
connect to NATS
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/jar3b/grawt"
|
||||||
|
"github.com/jar3b/nacl"
|
||||||
|
"github.com/kelseyhightower/envconfig"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// get params
|
||||||
|
conf := config.NewConfig()
|
||||||
|
|
||||||
|
// init waiter (github.com/jar3b/grawt)
|
||||||
|
var waiter = grawt.NewWaiter()
|
||||||
|
|
||||||
|
// init nats
|
||||||
|
var subscriptions []*nacl.NatsSubscription
|
||||||
|
if err := nacl.SetupNats(conf.Nats.Host, conf.Nats.Port, conf.Nats.User, conf.Nats.Pass,
|
||||||
|
// handler called before app closed
|
||||||
|
// we need to terminate sub's properly and sometimes doing another actions (finalizers)
|
||||||
|
waiter.AddCloseHandler(func() {
|
||||||
|
nacl.FinalizeNats(&subscriptions)
|
||||||
|
}, false),
|
||||||
|
); err != nil {
|
||||||
|
waiter.Halt(fmt.Errorf("cannot connect to nats: %v", err))
|
||||||
|
}
|
||||||
|
defer nacl.NatsClient.Close()
|
||||||
|
|
||||||
|
// here we add some subscriptions and wait (using blocking call)
|
||||||
|
// ...
|
||||||
|
}
|
||||||
|
```
|
21
go.mod
21
go.mod
@ -1,12 +1,19 @@
|
|||||||
module github.com/jar3b/nacl
|
module github.com/jar3b/nacl
|
||||||
|
|
||||||
go 1.13
|
go 1.17
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/jar3b/grawt v0.1.5
|
github.com/jar3b/grawt v0.1.10
|
||||||
github.com/nats-io/nats-server/v2 v2.1.2 // indirect
|
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d
|
||||||
github.com/nats-io/nats-streaming-server v0.16.2 // indirect
|
)
|
||||||
github.com/nats-io/nats.go v1.9.1
|
|
||||||
github.com/nats-io/stan.go v0.6.0
|
require (
|
||||||
github.com/sirupsen/logrus v1.4.2
|
github.com/golang/protobuf v1.5.2 // indirect
|
||||||
|
github.com/nats-io/nats-server/v2 v2.7.2 // indirect
|
||||||
|
github.com/nats-io/nkeys v0.3.0 // indirect
|
||||||
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
|
github.com/sirupsen/logrus v1.8.1 // indirect
|
||||||
|
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect
|
||||||
|
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect
|
||||||
|
google.golang.org/protobuf v1.27.1 // indirect
|
||||||
)
|
)
|
||||||
|
67
go.sum
Normal file
67
go.sum
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||||
|
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
|
||||||
|
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
|
||||||
|
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
|
||||||
|
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
|
||||||
|
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
|
||||||
|
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||||
|
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
||||||
|
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||||
|
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||||
|
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||||
|
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||||
|
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
|
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
|
github.com/jar3b/grawt v0.1.10 h1:PwdPFJx6PFNdbQOk04VMWMGuqAwCuk0INvjdMA53MCA=
|
||||||
|
github.com/jar3b/grawt v0.1.10/go.mod h1:C9fy+5804sm2Z/kO78ENgHntWTWtqLcrRtN9Q7tPScE=
|
||||||
|
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
|
||||||
|
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
||||||
|
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
|
||||||
|
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
|
||||||
|
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
|
||||||
|
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
|
||||||
|
github.com/nats-io/nats-server/v2 v2.7.2 h1:+LEN8m0+jdCkiGc884WnDuxR+qj80/5arj+szKuRpRI=
|
||||||
|
github.com/nats-io/nats-server/v2 v2.7.2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
|
||||||
|
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d h1:GRSmEJutHkdoxKsRypP575IIdoXe7Bm6yHQF6GcDBnA=
|
||||||
|
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||||
|
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||||
|
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||||
|
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||||
|
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
|
||||||
|
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
|
||||||
|
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
|
||||||
|
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||||
|
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||||
|
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce h1:Roh6XWxHFKrPgC/EQhVubSAGQ6Ozk6IdxHSzt1mR0EI=
|
||||||
|
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||||
|
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||||
|
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||||
|
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 h1:0jf+tOCoZ3LyutmCOWpVni1chK4VfFLhRsDK7MhqGRY=
|
||||||
|
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||||
|
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
|
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
|
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M=
|
||||||
|
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||||
|
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||||
|
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||||
|
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||||
|
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
|
||||||
|
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
|
||||||
|
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||||
|
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||||
|
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||||
|
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
|
||||||
|
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
75
jetstream.go
Normal file
75
jetstream.go
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
package nacl
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/jar3b/grawt"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
"log"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
JsClient nats.JetStreamContext
|
||||||
|
)
|
||||||
|
|
||||||
|
func SetupJetStream(host string, port int, credsFile string, closeHandler *grawt.CloseHandler, opts ...nats.JSOpt) error {
|
||||||
|
var err error
|
||||||
|
if err := SetupNatsWithCreds(host, port, credsFile, closeHandler); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
JsClient, err = NatsClient.JetStream(opts...)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot init JS: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func FinalizeJetStream(subscriptions *[]*nats.Subscription) error {
|
||||||
|
return FinalizeNats(subscriptions)
|
||||||
|
}
|
||||||
|
|
||||||
|
func AddOrUpdateStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) {
|
||||||
|
_, err := JsClient.StreamInfo(cfg.Name)
|
||||||
|
if err != nil && err.Error() != "nats: stream not found" {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
sc, err := JsClient.UpdateStream(cfg, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return sc, nil
|
||||||
|
} else {
|
||||||
|
sc, err := JsClient.AddStream(cfg, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return sc, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func AddOrUpdateConsumer(stream string, consumer string, cfg *nats.ConsumerConfig, allowDelete bool, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) {
|
||||||
|
ci0, err := JsClient.ConsumerInfo(stream, consumer, opts...)
|
||||||
|
if err != nil && err.Error() != "nats: consumer not found" {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err == nil && allowDelete {
|
||||||
|
if err := JsClient.DeleteConsumer(stream, consumer, opts...); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ci, err := JsClient.AddConsumer(stream, cfg, opts...)
|
||||||
|
if err != nil {
|
||||||
|
if !allowDelete {
|
||||||
|
log.Printf("Consumer '%s' was exists", consumer)
|
||||||
|
return ci0, nil
|
||||||
|
} else {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ci, nil
|
||||||
|
}
|
81
nacl.go
81
nacl.go
@ -4,22 +4,49 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"github.com/jar3b/grawt"
|
"github.com/jar3b/grawt"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/nats-io/stan.go"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
NatsClient *nats.Conn
|
NatsClient *nats.Conn
|
||||||
StanClient stan.Conn
|
natsLock = sync.Mutex{}
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
func SetupNatsWithCreds(host string, port int, credsFile string, closeHandler *grawt.CloseHandler) error {
|
||||||
Msg = stan.Msg
|
natsLock.Lock()
|
||||||
NatsMsg = nats.Msg
|
defer natsLock.Unlock()
|
||||||
Subscription = stan.Subscription
|
var err error
|
||||||
)
|
|
||||||
|
|
||||||
func SetupNats(host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) error {
|
// connect
|
||||||
|
options := []nats.Option{
|
||||||
|
nats.ClosedHandler(func(conn *nats.Conn) {
|
||||||
|
if closeHandler != nil {
|
||||||
|
closeHandler.Halt(nil)
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
nats.MaxReconnects(5),
|
||||||
|
nats.ReconnectWait(time.Second * 2),
|
||||||
|
}
|
||||||
|
|
||||||
|
if credsFile != "" {
|
||||||
|
options = append(options, nats.UserCredentials(credsFile))
|
||||||
|
}
|
||||||
|
|
||||||
|
NatsClient, err = nats.Connect(
|
||||||
|
fmt.Sprintf("nats://%s:%d", host, port),
|
||||||
|
options...,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot connect to NATS: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetupNatsWithPass(host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) error {
|
||||||
|
natsLock.Lock()
|
||||||
|
defer natsLock.Unlock()
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
// init connection string
|
// init connection string
|
||||||
@ -33,7 +60,9 @@ func SetupNats(host string, port int, user string, pass string, closeHandler *gr
|
|||||||
NatsClient, err = nats.Connect(
|
NatsClient, err = nats.Connect(
|
||||||
connectionString,
|
connectionString,
|
||||||
nats.ClosedHandler(func(conn *nats.Conn) {
|
nats.ClosedHandler(func(conn *nats.Conn) {
|
||||||
closeHandler.Halt(nil)
|
if closeHandler != nil {
|
||||||
|
closeHandler.Halt(nil)
|
||||||
|
}
|
||||||
}),
|
}),
|
||||||
nats.MaxReconnects(5),
|
nats.MaxReconnects(5),
|
||||||
nats.ReconnectWait(time.Second*2),
|
nats.ReconnectWait(time.Second*2),
|
||||||
@ -45,32 +74,20 @@ func SetupNats(host string, port int, user string, pass string, closeHandler *gr
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetupStan(clusterName string, clientId string, host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) (err error) {
|
func FinalizeNats(subscriptions *[]*nats.Subscription) error {
|
||||||
if err = SetupNats(host, port, user, pass, closeHandler); err != nil {
|
natsLock.Lock()
|
||||||
return err
|
defer natsLock.Unlock()
|
||||||
|
if NatsClient == nil {
|
||||||
|
return fmt.Errorf("stan client is not initialized")
|
||||||
}
|
}
|
||||||
|
|
||||||
// stan connection
|
if subscriptions != nil {
|
||||||
StanClient, err = stan.Connect(
|
for _, subscription := range *subscriptions {
|
||||||
clusterName,
|
_ = subscription.Unsubscribe()
|
||||||
clientId,
|
}
|
||||||
stan.NatsConn(NatsClient),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("cannot connect to STAN: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func FinalizeStan(subscriptions *[]Subscription) error {
|
|
||||||
for _, subscription := range *subscriptions {
|
|
||||||
_ = subscription.Unsubscribe()
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := StanClient.Close(); err != nil {
|
|
||||||
return fmt.Errorf("cannot disconnect from STAN")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
NatsClient.Close()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user