nats
packageAPI reference for the nats
package.
Imports
(12)natsSubscription
type natsSubscription struct
Fields
| Name | Type | Description |
|---|---|---|
| sub | *nats.Subscription | |
| chans | []chan syncbus.Event |
NATSBus
NATSBus implements Bus using a NATS backend.
type NATSBus struct
Methods
Publish implements Bus.Publish.
Parameters
Returns
func (*NATSBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
b.mu.Lock()
if _, ok := b.pending[key]; ok {
b.mu.Unlock()
return nil // deduplicate
}
b.pending[key] = struct{}{}
b.mu.Unlock()
if j := rand.Int63n(int64(10 * time.Millisecond)); j > 0 {
select {
case <-ctx.Done():
b.mu.Lock()
delete(b.pending, key)
b.mu.Unlock()
return ctx.Err()
case <-time.After(time.Duration(j)):
}
}
id := uuid.NewString()
backoff := 100 * time.Millisecond
var err error
for {
err = b.conn.Publish(key, []byte(id))
if err == nil {
b.published.Add(1)
break
}
_ = b.reconnect()
select {
case <-ctx.Done():
b.mu.Lock()
delete(b.pending, key)
b.mu.Unlock()
return ctx.Err()
default:
}
jitter := time.Duration(rand.Int63n(int64(backoff)))
time.Sleep(backoff + jitter)
if backoff < time.Second {
backoff *= 2
if backoff > time.Second {
backoff = time.Second
}
}
}
time.AfterFunc(time.Millisecond, func() {
b.mu.Lock()
delete(b.pending, key)
b.mu.Unlock()
})
return err
}
PublishAndAwait implements Bus.PublishAndAwait. NATS core subjects do not expose subscriber counts, so only a quorum of 1 is supported.
Parameters
Returns
func (*NATSBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
if replicas <= 0 {
replicas = 1
}
if replicas > 1 {
return syncbus.ErrQuorumUnsupported
}
if err := b.Publish(ctx, key, opts...); err != nil {
return err
}
if deadline, ok := ctx.Deadline(); ok {
timeout := time.Until(deadline)
if timeout <= 0 {
return ctx.Err()
}
return b.conn.FlushTimeout(timeout)
}
return b.conn.Flush()
}
PublishAndAwaitTopology implements Bus.PublishAndAwaitTopology.
Parameters
Returns
func (*NATSBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
// NATS does not support topology awareness in Core NATS.
return syncbus.ErrQuorumUnsupported
}
Subscribe implements Bus.Subscribe.
Parameters
Returns
func (*NATSBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
ch := make(chan syncbus.Event, 1)
backoff := 100 * time.Millisecond
for {
b.mu.Lock()
sub := b.subs[key]
b.mu.Unlock()
if sub != nil {
b.mu.Lock()
sub.chans = append(sub.chans, ch)
b.mu.Unlock()
break
}
ns, err := b.conn.Subscribe(key, b.natsHandler(key))
if err == nil {
b.mu.Lock()
b.subs[key] = &natsSubscription{sub: ns, chans: []chan syncbus.Event{ch}}
b.mu.Unlock()
break
}
_ = b.reconnect()
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
jitter := time.Duration(rand.Int63n(int64(backoff)))
time.Sleep(backoff + jitter)
if backoff < time.Second {
backoff *= 2
if backoff > time.Second {
backoff = time.Second
}
}
}
go func() {
<-ctx.Done()
_ = b.Unsubscribe(context.Background(), key, ch)
}()
return ch, nil
}
Unsubscribe implements Bus.Unsubscribe.
Parameters
Returns
func (*NATSBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
b.mu.Lock()
sub := b.subs[key]
if sub == nil {
b.mu.Unlock()
return nil
}
for i, c := range sub.chans {
if c == ch {
sub.chans[i] = sub.chans[len(sub.chans)-1]
sub.chans = sub.chans[:len(sub.chans)-1]
close(c)
break
}
}
if len(sub.chans) == 0 {
delete(b.subs, key)
b.mu.Unlock()
return sub.sub.Unsubscribe()
}
b.mu.Unlock()
return nil
}
RevokeLease publishes a lease revocation event.
Parameters
Returns
func (*NATSBus) RevokeLease(ctx context.Context, id string) error
{
return b.Publish(ctx, "lease:"+id)
}
IsHealthy implements Bus.IsHealthy.
Returns
func (*NATSBus) IsHealthy() bool
{
return true
}
Peers implements Bus.Peers.
Returns
func (*NATSBus) Peers() []string
{
return nil
}
SubscribeLease subscribes to lease revocation events.
Parameters
Returns
func (*NATSBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
return b.Subscribe(ctx, "lease:"+id)
}
UnsubscribeLease cancels a lease revocation subscription.
Parameters
Returns
func (*NATSBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
return b.Unsubscribe(ctx, "lease:"+id, ch)
}
Metrics returns the published and delivered counts.
Returns
func (*NATSBus) Metrics() syncbus.Metrics
{
return syncbus.Metrics{
Published: b.published.Load(),
Delivered: b.delivered.Load(),
}
}
Parameters
Returns
func (*NATSBus) natsHandler(key string) nats.MsgHandler
{
return func(m *nats.Msg) {
id := string(m.Data)
b.mu.Lock()
if _, ok := b.processed[id]; ok {
b.mu.Unlock()
return
}
b.processed[id] = struct{}{}
chans := append([]chan syncbus.Event(nil), b.subs[key].chans...)
b.mu.Unlock()
evt := syncbus.Event{Key: key}
for _, c := range chans {
select {
case c <- evt:
b.delivered.Add(1)
default:
}
}
}
}
Returns
func (*NATSBus) reconnect() error
{
if b.conn != nil && b.conn.IsConnected() {
return nil
}
newConn, err := b.conn.Opts.Connect()
if err != nil {
return err
}
b.mu.Lock()
b.conn = newConn
for key, sub := range b.subs {
ns, err := b.conn.Subscribe(key, b.natsHandler(key))
if err != nil {
continue
}
sub.sub = ns
}
b.mu.Unlock()
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| conn | *nats.Conn | |
| mu | sync.Mutex | |
| subs | map[string]*natsSubscription | |
| pending | map[string]struct{} | |
| processed | map[string]struct{} | |
| published | atomic.Uint64 | |
| delivered | atomic.Uint64 |
NewNATSBus
NewNATSBus returns a new NATSBus using the provided connection.
func NewNATSBus(conn *nats.Conn) *NATSBus
{
return &NATSBus{
conn: conn,
subs: make(map[string]*natsSubscription),
pending: make(map[string]struct{}),
processed: make(map[string]struct{}),
}
}
newNATSBus
Parameters
Returns
func newNATSBus(t *testing.T) (*NATSBus, context.Context)
{
t.Helper()
addr := os.Getenv("WARP_TEST_NATS_ADDR")
forceReal := os.Getenv("WARP_TEST_FORCE_REAL") == "true"
if forceReal && addr == "" {
t.Fatal("WARP_TEST_FORCE_REAL is true but WARP_TEST_NATS_ADDR is empty")
}
var conn *nats.Conn
var s *server.Server
var err error
if addr != "" {
t.Logf("TestNATSBus: using real NATS at %s", addr)
conn, err = nats.Connect(addr)
if err != nil {
t.Fatalf("connect: %v", err)
}
} else {
t.Log("TestNATSBus: using embedded NATS server")
s = natsserver.RunRandClientPortServer()
conn, err = nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("connect: %v", err)
}
}
bus := NewNATSBus(conn)
ctx := context.Background()
t.Cleanup(func() {
conn.Close()
if s != nil {
s.Shutdown()
}
})
return bus, ctx
}
TestNATSBusPublishSubscribeFlowAndMetrics
Parameters
func TestNATSBusPublishSubscribeFlowAndMetrics(t *testing.T)
{
bus, ctx := newNATSBus(t)
ch, err := bus.Subscribe(ctx, "key")
if err != nil {
t.Fatalf("subscribe: %v", err)
}
if err := bus.Publish(ctx, "key"); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case <-ch:
case <-time.After(time.Second):
t.Fatal("timeout waiting for publish")
}
metrics := bus.Metrics()
if metrics.Published != 1 {
t.Fatalf("expected published 1 got %d", metrics.Published)
}
if metrics.Delivered != 1 {
t.Fatalf("expected delivered 1 got %d", metrics.Delivered)
}
}
TestNATSBusContextBasedUnsubscribe
Parameters
func TestNATSBusContextBasedUnsubscribe(t *testing.T)
{
bus, _ := newNATSBus(t)
subCtx, cancel := context.WithCancel(context.Background())
ch, err := bus.Subscribe(subCtx, "key")
if err != nil {
t.Fatalf("subscribe: %v", err)
}
cancel()
select {
case _, ok := <-ch:
if ok {
t.Fatal("expected channel closed")
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for unsubscribe")
}
bus.mu.Lock()
defer bus.mu.Unlock()
if _, ok := bus.subs["key"]; ok {
t.Fatal("subscription still present after context cancel")
}
}
TestNATSBusDeduplicatePendingKeys
Parameters
func TestNATSBusDeduplicatePendingKeys(t *testing.T)
{
bus, ctx := newNATSBus(t)
ch, err := bus.Subscribe(ctx, "key")
if err != nil {
t.Fatalf("subscribe: %v", err)
}
bus.mu.Lock()
bus.pending["key"] = struct{}{}
bus.mu.Unlock()
if err := bus.Publish(ctx, "key"); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case <-ch:
t.Fatal("unexpected publish when key pending")
default:
}
metrics := bus.Metrics()
if metrics.Published != 0 {
t.Fatalf("expected published 0 got %d", metrics.Published)
}
if metrics.Delivered != 0 {
t.Fatalf("expected delivered 0 got %d", metrics.Delivered)
}
}
TestNATSBusReconnectAfterClose
Parameters
func TestNATSBusReconnectAfterClose(t *testing.T)
{
bus, ctx := newNATSBus(t)
ch, err := bus.Subscribe(ctx, "key")
if err != nil {
t.Fatalf("subscribe: %v", err)
}
bus.conn.Close()
if err := bus.Publish(ctx, "key"); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case <-ch:
case <-time.After(time.Second):
t.Fatal("timeout waiting for publish")
}
metrics := bus.Metrics()
if metrics.Published != 1 {
t.Fatalf("expected published 1 got %d", metrics.Published)
}
if metrics.Delivered != 1 {
t.Fatalf("expected delivered 1 got %d", metrics.Delivered)
}
}
TestNATSBusReconnectLoop
Parameters
func TestNATSBusReconnectLoop(t *testing.T)
{
bus, ctx := newNATSBus(t)
ch, err := bus.Subscribe(ctx, "key")
if err != nil {
t.Fatalf("subscribe: %v", err)
}
bus.conn.Close()
done := make(chan error, 1)
go func() { done <- bus.Publish(ctx, "key") }()
select {
case err := <-done:
if err != nil {
t.Fatalf("publish: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("publish timeout")
}
select {
case <-ch:
case <-time.After(time.Second):
t.Fatal("timeout waiting for publish")
}
}
TestNATSBusIdempotentAfterReconnect
Parameters
func TestNATSBusIdempotentAfterReconnect(t *testing.T)
{
bus, ctx := newNATSBus(t)
ch, err := bus.Subscribe(ctx, "key")
if err != nil {
t.Fatalf("subscribe: %v", err)
}
id := uuid.NewString()
if err := bus.conn.Publish("key", []byte(id)); err != nil {
t.Fatalf("direct publish: %v", err)
}
select {
case <-ch:
case <-time.After(time.Second):
t.Fatal("timeout waiting for publish")
}
bus.conn.Close()
if err := bus.reconnect(); err != nil {
t.Fatalf("reconnect: %v", err)
}
if err := bus.conn.Publish("key", []byte(id)); err != nil {
t.Fatalf("dup publish: %v", err)
}
select {
case <-ch:
t.Fatal("duplicate delivered")
case <-time.After(200 * time.Millisecond):
}
}