Compare commits
	
		
			13 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 25689b1e5b | ||
|  | b60e33a718 | ||
|  | 76b13f3833 | ||
|  | d66cc04f04 | ||
|  | 1e18f34ee2 | ||
|  | 5cf7a062b9 | ||
|  | e8cba551c7 | ||
|  | 15e6c19222 | ||
|  | 4a7f5a637a | ||
|  | f6ffcbb4d7 | ||
|  | 15e01680a2 | ||
|  | 282b5f5ff0 | ||
|  | fabfbfe174 | 
							
								
								
									
										2
									
								
								LICENSE
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								LICENSE
									
									
									
									
									
								
							| @@ -1,6 +1,6 @@ | |||||||
| MIT License | MIT License | ||||||
|  |  | ||||||
| Copyright (c) 2020 jar3b | Copyright (c) 2021 jar3b | ||||||
|  |  | ||||||
| Permission is hereby granted, free of charge, to any person obtaining a copy | Permission is hereby granted, free of charge, to any person obtaining a copy | ||||||
| of this software and associated documentation files (the "Software"), to deal | of this software and associated documentation files (the "Software"), to deal | ||||||
|   | |||||||
							
								
								
									
										40
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										40
									
								
								README.md
									
									
									
									
									
								
							| @@ -1,2 +1,42 @@ | |||||||
| # nacl | # nacl | ||||||
|  |  | ||||||
| Client helper for NATS/STAN | Client helper for NATS/STAN | ||||||
|  |  | ||||||
|  | ## examples | ||||||
|  |  | ||||||
|  | connect to NATS | ||||||
|  |  | ||||||
|  | ```go | ||||||
|  | package main | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  |     "fmt" | ||||||
|  |     "github.com/jar3b/grawt" | ||||||
|  |     "github.com/jar3b/nacl" | ||||||
|  |     "github.com/kelseyhightower/envconfig" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func main() { | ||||||
|  |     // get params | ||||||
|  |     conf := config.NewConfig() | ||||||
|  |  | ||||||
|  | 	// init waiter (github.com/jar3b/grawt) | ||||||
|  | 	var waiter = grawt.NewWaiter() | ||||||
|  |  | ||||||
|  | 	// init nats | ||||||
|  | 	var subscriptions []*nacl.NatsSubscription | ||||||
|  | 	if err := nacl.SetupNats(conf.Nats.Host, conf.Nats.Port, conf.Nats.User, conf.Nats.Pass, | ||||||
|  | 		// handler called before app closed | ||||||
|  |         // we need to terminate sub's properly and sometimes doing another actions (finalizers) | ||||||
|  |         waiter.AddCloseHandler(func() { | ||||||
|  | 			nacl.FinalizeNats(&subscriptions) | ||||||
|  | 		}, false), | ||||||
|  | 	); err != nil { | ||||||
|  | 		waiter.Halt(fmt.Errorf("cannot connect to nats: %v", err)) | ||||||
|  | 	} | ||||||
|  | 	defer nacl.NatsClient.Close() | ||||||
|  | 	 | ||||||
|  |     // here we add some subscriptions and wait (using blocking call) | ||||||
|  |     // ... | ||||||
|  | } | ||||||
|  | ``` | ||||||
							
								
								
									
										19
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										19
									
								
								go.mod
									
									
									
									
									
								
							| @@ -1,13 +1,16 @@ | |||||||
| module github.com/jar3b/nacl | module github.com/jar3b/nacl | ||||||
|  |  | ||||||
| go 1.15 | go 1.17 | ||||||
|  |  | ||||||
| require ( | require ( | ||||||
| 	github.com/golang/protobuf v1.4.2 // indirect | 	github.com/jar3b/grawt v0.1.10 | ||||||
| 	github.com/jar3b/grawt v0.1.6 | 	github.com/nats-io/nats.go v1.13.0 | ||||||
| 	github.com/nats-io/nats-server/v2 v2.1.8 // indirect | ) | ||||||
| 	github.com/nats-io/nats-streaming-server v0.18.0 // indirect |  | ||||||
| 	github.com/nats-io/nats.go v1.10.0 | require ( | ||||||
| 	github.com/nats-io/stan.go v0.7.0 | 	github.com/nats-io/nkeys v0.3.0 // indirect | ||||||
| 	google.golang.org/protobuf v1.25.0 // indirect | 	github.com/nats-io/nuid v1.0.1 // indirect | ||||||
|  | 	github.com/sirupsen/logrus v1.8.1 // indirect | ||||||
|  | 	golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b // indirect | ||||||
|  | 	golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 // indirect | ||||||
| ) | ) | ||||||
|   | |||||||
							
								
								
									
										22
									
								
								go.sum
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										22
									
								
								go.sum
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,22 @@ | |||||||
|  | github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||||||
|  | github.com/jar3b/grawt v0.1.10 h1:PwdPFJx6PFNdbQOk04VMWMGuqAwCuk0INvjdMA53MCA= | ||||||
|  | github.com/jar3b/grawt v0.1.10/go.mod h1:C9fy+5804sm2Z/kO78ENgHntWTWtqLcrRtN9Q7tPScE= | ||||||
|  | github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE= | ||||||
|  | github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= | ||||||
|  | github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= | ||||||
|  | github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= | ||||||
|  | github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= | ||||||
|  | github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= | ||||||
|  | github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||||||
|  | github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= | ||||||
|  | github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= | ||||||
|  | github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= | ||||||
|  | golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= | ||||||
|  | golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= | ||||||
|  | golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= | ||||||
|  | golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | ||||||
|  | golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw= | ||||||
|  | golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | ||||||
|  | golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= | ||||||
|  | golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= | ||||||
|  | golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= | ||||||
							
								
								
									
										75
									
								
								jetstream.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										75
									
								
								jetstream.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,75 @@ | |||||||
|  | package nacl | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"github.com/jar3b/grawt" | ||||||
|  | 	"github.com/nats-io/nats.go" | ||||||
|  | 	"log" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | var ( | ||||||
|  | 	JsClient nats.JetStreamContext | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func SetupJetStream(host string, port int, credsFile string, closeHandler *grawt.CloseHandler, opts ...nats.JSOpt) error { | ||||||
|  | 	var err error | ||||||
|  | 	if err := SetupNatsWithCreds(host, port, credsFile, closeHandler); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	JsClient, err = NatsClient.JetStream(opts...) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("cannot init JS: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func FinalizeJetStream(subscriptions *[]*nats.Subscription) error { | ||||||
|  | 	return FinalizeNats(subscriptions) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func AddOrUpdateStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) { | ||||||
|  | 	_, err := JsClient.StreamInfo(cfg.Name) | ||||||
|  | 	if err != nil && err.Error() != "nats: stream not found" { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err == nil { | ||||||
|  | 		sc, err := JsClient.UpdateStream(cfg, opts...) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 		return sc, nil | ||||||
|  | 	} else { | ||||||
|  | 		sc, err := JsClient.AddStream(cfg, opts...) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 		return sc, nil | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func AddOrUpdateConsumer(stream string, consumer string, cfg *nats.ConsumerConfig, allowDelete bool, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) { | ||||||
|  | 	ci0, err := JsClient.ConsumerInfo(stream, consumer, opts...) | ||||||
|  | 	if err != nil && err.Error() != "nats: consumer not found" { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err == nil && allowDelete { | ||||||
|  | 		if err := JsClient.DeleteConsumer(stream, consumer, opts...); err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	ci, err := JsClient.AddConsumer(stream, cfg, opts...) | ||||||
|  | 	if err != nil { | ||||||
|  | 		if !allowDelete { | ||||||
|  | 			log.Printf("Consumer '%s' was exists", consumer) | ||||||
|  | 			return ci0, nil | ||||||
|  | 		} else { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return ci, nil | ||||||
|  | } | ||||||
							
								
								
									
										75
									
								
								nacl.go
									
									
									
									
									
								
							
							
						
						
									
										75
									
								
								nacl.go
									
									
									
									
									
								
							| @@ -4,23 +4,42 @@ import ( | |||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"github.com/jar3b/grawt" | 	"github.com/jar3b/grawt" | ||||||
| 	"github.com/nats-io/nats.go" | 	"github.com/nats-io/nats.go" | ||||||
| 	"github.com/nats-io/stan.go" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var ( | var ( | ||||||
| 	NatsClient *nats.Conn | 	NatsClient *nats.Conn | ||||||
| 	StanClient stan.Conn | 	natsLock   = sync.Mutex{} | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type ( | func SetupNatsWithCreds(host string, port int, credsFile string, closeHandler *grawt.CloseHandler) error { | ||||||
| 	Msg              = stan.Msg | 	natsLock.Lock() | ||||||
| 	NatsMsg          = nats.Msg | 	defer natsLock.Unlock() | ||||||
| 	Subscription     = stan.Subscription | 	var err error | ||||||
| 	NatsSubscription = nats.Subscription |  | ||||||
|  | 	// connect | ||||||
|  | 	NatsClient, err = nats.Connect( | ||||||
|  | 		fmt.Sprintf("nats://%s:%d", host, port), | ||||||
|  | 		nats.UserCredentials(credsFile), | ||||||
|  | 		nats.ClosedHandler(func(conn *nats.Conn) { | ||||||
|  | 			if closeHandler != nil { | ||||||
|  | 				closeHandler.Halt(nil) | ||||||
|  | 			} | ||||||
|  | 		}), | ||||||
|  | 		nats.MaxReconnects(5), | ||||||
|  | 		nats.ReconnectWait(time.Second*2), | ||||||
| 	) | 	) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("cannot connect to NATS: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
| func SetupNats(host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) error { | func SetupNats(host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) error { | ||||||
|  | 	natsLock.Lock() | ||||||
|  | 	defer natsLock.Unlock() | ||||||
| 	var err error | 	var err error | ||||||
|  |  | ||||||
| 	// init connection string | 	// init connection string | ||||||
| @@ -34,7 +53,9 @@ func SetupNats(host string, port int, user string, pass string, closeHandler *gr | |||||||
| 	NatsClient, err = nats.Connect( | 	NatsClient, err = nats.Connect( | ||||||
| 		connectionString, | 		connectionString, | ||||||
| 		nats.ClosedHandler(func(conn *nats.Conn) { | 		nats.ClosedHandler(func(conn *nats.Conn) { | ||||||
|  | 			if closeHandler != nil { | ||||||
| 				closeHandler.Halt(nil) | 				closeHandler.Halt(nil) | ||||||
|  | 			} | ||||||
| 		}), | 		}), | ||||||
| 		nats.MaxReconnects(5), | 		nats.MaxReconnects(5), | ||||||
| 		nats.ReconnectWait(time.Second*2), | 		nats.ReconnectWait(time.Second*2), | ||||||
| @@ -46,46 +67,18 @@ func SetupNats(host string, port int, user string, pass string, closeHandler *gr | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func SetupStan(clusterName string, clientId string, host string, port int, user string, pass string, closeHandler *grawt.CloseHandler) (err error) { | func FinalizeNats(subscriptions *[]*nats.Subscription) error { | ||||||
| 	if err = SetupNats(host, port, user, pass, closeHandler); err != nil { | 	natsLock.Lock() | ||||||
| 		return err | 	defer natsLock.Unlock() | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// stan connection |  | ||||||
| 	StanClient, err = stan.Connect( |  | ||||||
| 		clusterName, |  | ||||||
| 		clientId, |  | ||||||
| 		stan.NatsConn(NatsClient), |  | ||||||
| 	) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return fmt.Errorf("cannot connect to STAN: %v", err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func FinalizeStan(subscriptions *[]Subscription) error { |  | ||||||
| 	if StanClient == nil { |  | ||||||
| 		return fmt.Errorf("stan client is not initialized") |  | ||||||
| 	} |  | ||||||
| 	for _, subscription := range *subscriptions { |  | ||||||
| 		_ = subscription.Unsubscribe() |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if err := StanClient.Close(); err != nil { |  | ||||||
| 		return fmt.Errorf("cannot disconnect from STAN") |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func FinalizeNats(subscriptions *[]*NatsSubscription) error { |  | ||||||
| 	if NatsClient == nil { | 	if NatsClient == nil { | ||||||
| 		return fmt.Errorf("stan client is not initialized") | 		return fmt.Errorf("stan client is not initialized") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	if subscriptions != nil { | ||||||
| 		for _, subscription := range *subscriptions { | 		for _, subscription := range *subscriptions { | ||||||
| 			_ = subscription.Unsubscribe() | 			_ = subscription.Unsubscribe() | ||||||
| 		} | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	NatsClient.Close() | 	NatsClient.Close() | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user