fix: add basic JetStream management; bump: nats.go v1.12.3 (BREAKING to use with nack)
This commit is contained in:
75
jetstream.go
Normal file
75
jetstream.go
Normal file
@@ -0,0 +1,75 @@
|
||||
package nacl
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/jar3b/grawt"
|
||||
"github.com/nats-io/nats.go"
|
||||
"log"
|
||||
)
|
||||
|
||||
var (
|
||||
JsClient nats.JetStreamContext
|
||||
)
|
||||
|
||||
func SetupJetStream(host string, port int, credsFile string, closeHandler *grawt.CloseHandler, opts ...nats.JSOpt) error {
|
||||
var err error
|
||||
if err := SetupNatsWithCreds(host, port, credsFile, closeHandler); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
JsClient, err = NatsClient.JetStream(opts...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot init JS: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func FinalizeJetStream(subscriptions *[]*nats.Subscription) error {
|
||||
return FinalizeNats(subscriptions)
|
||||
}
|
||||
|
||||
func AddOrUpdateStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) {
|
||||
_, err := JsClient.StreamInfo("metric-stream")
|
||||
if err != nil && err.Error() != "nats: stream not found" {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
sc, err := JsClient.UpdateStream(cfg, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sc, nil
|
||||
} else {
|
||||
sc, err := JsClient.AddStream(cfg, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sc, nil
|
||||
}
|
||||
}
|
||||
|
||||
func AddOrUpdateConsumer(stream string, consumer string, cfg *nats.ConsumerConfig, allowDelete bool, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) {
|
||||
ci0, err := JsClient.ConsumerInfo(stream, consumer, opts...)
|
||||
if err != nil && err.Error() != "nats: consumer not found" {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err == nil && allowDelete {
|
||||
if err := JsClient.DeleteConsumer(stream, consumer, opts...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
ci, err := JsClient.AddConsumer(stream, cfg, opts...)
|
||||
if err != nil {
|
||||
if !allowDelete {
|
||||
log.Printf("Consumer '%s' was exists", consumer)
|
||||
return ci0, nil
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return ci, nil
|
||||
}
|
||||
Reference in New Issue
Block a user