11 Commits

Author SHA1 Message Date
jar3b
25689b1e5b fix: downgrade nats.go@v1.13.0 2022-02-06 19:09:06 +03:00
jar3b
b60e33a718 bump: packages; feat: remove STAN (nats-streaming) 2022-02-06 18:53:02 +03:00
jar3b
76b13f3833 bump: nats.go v1.13.0 2021-10-13 14:06:39 +03:00
jar3b
d66cc04f04 fix: stream name in AddOrUpdateStream() 2021-10-13 14:05:47 +03:00
jar3b
1e18f34ee2 fix: add basic JetStream management; bump: nats.go v1.12.3 (BREAKING to use with nack) 2021-09-29 14:37:13 +03:00
jar3b
5cf7a062b9 fix: add SetupJetStream() 2021-06-07 14:02:59 +03:00
jar3b
e8cba551c7 fix: remove own types 2021-06-07 13:45:26 +03:00
jar3b
15e6c19222 feat: add SetupNatsWithCreds 2021-06-03 15:43:57 +03:00
jar3b
4a7f5a637a feat: update deps 2021-06-03 15:43:43 +03:00
jar3b
f6ffcbb4d7 frat: update deps; feat: use locks for clients init;bump: up to v0.1.1 2021-03-12 20:05:00 +03:00
jar3b
15e01680a2 bump: deps, up to v0.1.0 2020-11-19 16:22:01 +03:00
5 changed files with 139 additions and 54 deletions

View File

@@ -1,6 +1,6 @@
MIT License MIT License
Copyright (c) 2020 jar3b Copyright (c) 2021 jar3b
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal

19
go.mod
View File

@@ -1,13 +1,16 @@
module github.com/jar3b/nacl module github.com/jar3b/nacl
go 1.15 go 1.17
require ( require (
github.com/golang/protobuf v1.4.2 // indirect github.com/jar3b/grawt v0.1.10
github.com/jar3b/grawt v0.1.6 github.com/nats-io/nats.go v1.13.0
github.com/nats-io/nats-server/v2 v2.1.8 // indirect )
github.com/nats-io/nats-streaming-server v0.18.0 // indirect
github.com/nats-io/nats.go v1.10.0 require (
github.com/nats-io/stan.go v0.7.0 github.com/nats-io/nkeys v0.3.0 // indirect
google.golang.org/protobuf v1.25.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b // indirect
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 // indirect
) )

22
go.sum Normal file
View File

@@ -0,0 +1,22 @@
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jar3b/grawt v0.1.10 h1:PwdPFJx6PFNdbQOk04VMWMGuqAwCuk0INvjdMA53MCA=
github.com/jar3b/grawt v0.1.10/go.mod h1:C9fy+5804sm2Z/kO78ENgHntWTWtqLcrRtN9Q7tPScE=
github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE=
github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

75
jetstream.go Normal file
View File

@@ -0,0 +1,75 @@
package nacl
import (
"fmt"
"github.com/jar3b/grawt"
"github.com/nats-io/nats.go"
"log"
)
var (
JsClient nats.JetStreamContext
)
func SetupJetStream(host string, port int, credsFile string, closeHandler *grawt.CloseHandler, opts ...nats.JSOpt) error {
var err error
if err := SetupNatsWithCreds(host, port, credsFile, closeHandler); err != nil {
return err
}
JsClient, err = NatsClient.JetStream(opts...)
if err != nil {
return fmt.Errorf("cannot init JS: %v", err)
}
return nil
}
func FinalizeJetStream(subscriptions *[]*nats.Subscription) error {
return FinalizeNats(subscriptions)
}
func AddOrUpdateStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) {
_, err := JsClient.StreamInfo(cfg.Name)
if err != nil && err.Error() != "nats: stream not found" {
return nil, err
}
if err == nil {
sc, err := JsClient.UpdateStream(cfg, opts...)
if err != nil {
return nil, err
}
return sc, nil
} else {
sc, err := JsClient.AddStream(cfg, opts...)
if err != nil {
return nil, err
}
return sc, nil
}
}
func AddOrUpdateConsumer(stream string, consumer string, cfg *nats.ConsumerConfig, allowDelete bool, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) {
ci0, err := JsClient.ConsumerInfo(stream, consumer, opts...)
if err != nil && err.Error() != "nats: consumer not found" {
return nil, err
}
if err == nil && allowDelete {
if err := JsClient.DeleteConsumer(stream, consumer, opts...); err != nil {
return nil, err
}
}
ci, err := JsClient.AddConsumer(stream, cfg, opts...)
if err != nil {
if !allowDelete {
log.Printf("Consumer '%s' was exists", consumer)
return ci0, nil
} else {
return nil, err
}
}
return ci, nil
}

75
nacl.go
View File

@@ -4,23 +4,42 @@ import (
"fmt" "fmt"
"github.com/jar3b/grawt" "github.com/jar3b/grawt"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/nats-io/stan.go" "sync"
"time" "time"
) )
var ( var (
NatsClient *nats.Conn NatsClient *nats.Conn
StanClient stan.Conn natsLock = sync.Mutex{}
) )
type ( func SetupNatsWithCreds(host string, port int, credsFile string, closeHandler *grawt.CloseHandler) error {
Msg = stan.Msg natsLock.Lock()
NatsMsg = nats.Msg defer natsLock.Unlock()
Subscription = stan.Subscription var err error
NatsSubscription = nats.Subscription
) // connect
NatsClient, err = nats.Connect(
fmt.Sprintf("nats://%s:%d", host, port),
nats.UserCredentials(credsFile),
nats.ClosedHandler(func(conn *nats.Conn) {
if closeHandler != nil {
closeHandler.Halt(nil)
}
}),
nats.MaxReconnects(5),
nats.ReconnectWait(time.Second*2),
)
if err != nil {
return fmt.Errorf("cannot connect to NATS: %v", err)
}
return nil
}
func SetupNats(host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) error { func SetupNats(host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) error {
natsLock.Lock()
defer natsLock.Unlock()
var err error var err error
// init connection string // init connection string
@@ -48,43 +67,9 @@ func SetupNats(host string, port int, user string, pass string, closeHandler *gr
return nil return nil
} }
func SetupStan(clusterName string, clientId string, host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) (err error) { func FinalizeNats(subscriptions *[]*nats.Subscription) error {
if err = SetupNats(host, port, user, pass, closeHandler); err != nil { natsLock.Lock()
return err defer natsLock.Unlock()
}
// stan connection
StanClient, err = stan.Connect(
clusterName,
clientId,
stan.NatsConn(NatsClient),
)
if err != nil {
return fmt.Errorf("cannot connect to STAN: %v", err)
}
return nil
}
func FinalizeStan(subscriptions *[]Subscription) error {
if StanClient == nil {
return fmt.Errorf("stan client is not initialized")
}
if subscriptions != nil {
for _, subscription := range *subscriptions {
_ = subscription.Unsubscribe()
}
}
if err := StanClient.Close(); err != nil {
return fmt.Errorf("cannot disconnect from STAN")
}
return nil
}
func FinalizeNats(subscriptions *[]*NatsSubscription) error {
if NatsClient == nil { if NatsClient == nil {
return fmt.Errorf("stan client is not initialized") return fmt.Errorf("stan client is not initialized")
} }