nats API

nats

package

API reference for the nats package.

S
struct

natsSubscription

v1/syncbus/nats/nats.go:15-18
type natsSubscription struct

Fields

Name Type Description
sub *nats.Subscription
chans []chan syncbus.Event
S
struct

NATSBus

NATSBus implements Bus using a NATS backend.

v1/syncbus/nats/nats.go:21-29
type NATSBus struct

Methods

Publish
Method

Publish implements Bus.Publish.

Parameters

key string
opts ...syncbus.PublishOption

Returns

error
func (*NATSBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
	b.mu.Lock()
	if _, ok := b.pending[key]; ok {
		b.mu.Unlock()
		return nil // deduplicate
	}
	b.pending[key] = struct{}{}
	b.mu.Unlock()

	if j := rand.Int63n(int64(10 * time.Millisecond)); j > 0 {
		select {
		case <-ctx.Done():
			b.mu.Lock()
			delete(b.pending, key)
			b.mu.Unlock()
			return ctx.Err()
		case <-time.After(time.Duration(j)):
		}
	}

	id := uuid.NewString()
	backoff := 100 * time.Millisecond
	var err error
	for {
		err = b.conn.Publish(key, []byte(id))
		if err == nil {
			b.published.Add(1)
			break
		}
		_ = b.reconnect()
		select {
		case <-ctx.Done():
			b.mu.Lock()
			delete(b.pending, key)
			b.mu.Unlock()
			return ctx.Err()
		default:
		}
		jitter := time.Duration(rand.Int63n(int64(backoff)))
		time.Sleep(backoff + jitter)
		if backoff < time.Second {
			backoff *= 2
			if backoff > time.Second {
				backoff = time.Second
			}
		}
	}

	time.AfterFunc(time.Millisecond, func() {
		b.mu.Lock()
		delete(b.pending, key)
		b.mu.Unlock()
	})
	return err
}

PublishAndAwait implements Bus.PublishAndAwait. NATS core subjects do not expose subscriber counts, so only a quorum of 1 is supported.

Parameters

key string
replicas int
opts ...syncbus.PublishOption

Returns

error
func (*NATSBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
	if replicas <= 0 {
		replicas = 1
	}
	if replicas > 1 {
		return syncbus.ErrQuorumUnsupported
	}
	if err := b.Publish(ctx, key, opts...); err != nil {
		return err
	}
	if deadline, ok := ctx.Deadline(); ok {
		timeout := time.Until(deadline)
		if timeout <= 0 {
			return ctx.Err()
		}
		return b.conn.FlushTimeout(timeout)
	}
	return b.conn.Flush()
}

PublishAndAwaitTopology implements Bus.PublishAndAwaitTopology.

Parameters

key string
minZones int
opts ...syncbus.PublishOption

Returns

error
func (*NATSBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
	// NATS does not support topology awareness in Core NATS.
	return syncbus.ErrQuorumUnsupported
}
Subscribe
Method

Subscribe implements Bus.Subscribe.

Parameters

key string

Returns

<-chan syncbus.Event
error
func (*NATSBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
	ch := make(chan syncbus.Event, 1)
	backoff := 100 * time.Millisecond

	for {
		b.mu.Lock()
		sub := b.subs[key]
		b.mu.Unlock()
		if sub != nil {
			b.mu.Lock()
			sub.chans = append(sub.chans, ch)
			b.mu.Unlock()
			break
		}
		ns, err := b.conn.Subscribe(key, b.natsHandler(key))
		if err == nil {
			b.mu.Lock()
			b.subs[key] = &natsSubscription{sub: ns, chans: []chan syncbus.Event{ch}}
			b.mu.Unlock()
			break
		}
		_ = b.reconnect()
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}
		jitter := time.Duration(rand.Int63n(int64(backoff)))
		time.Sleep(backoff + jitter)
		if backoff < time.Second {
			backoff *= 2
			if backoff > time.Second {
				backoff = time.Second
			}
		}
	}

	go func() {
		<-ctx.Done()
		_ = b.Unsubscribe(context.Background(), key, ch)
	}()
	return ch, nil
}
Unsubscribe
Method

Unsubscribe implements Bus.Unsubscribe.

Parameters

key string
ch <-chan syncbus.Event

Returns

error
func (*NATSBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
	b.mu.Lock()
	sub := b.subs[key]
	if sub == nil {
		b.mu.Unlock()
		return nil
	}
	for i, c := range sub.chans {
		if c == ch {
			sub.chans[i] = sub.chans[len(sub.chans)-1]
			sub.chans = sub.chans[:len(sub.chans)-1]
			close(c)
			break
		}
	}
	if len(sub.chans) == 0 {
		delete(b.subs, key)
		b.mu.Unlock()
		return sub.sub.Unsubscribe()
	}
	b.mu.Unlock()
	return nil
}
RevokeLease
Method

RevokeLease publishes a lease revocation event.

Parameters

id string

Returns

error
func (*NATSBus) RevokeLease(ctx context.Context, id string) error
{
	return b.Publish(ctx, "lease:"+id)
}
IsHealthy
Method

IsHealthy implements Bus.IsHealthy.

Returns

bool
func (*NATSBus) IsHealthy() bool
{
	return true
}
Peers
Method

Peers implements Bus.Peers.

Returns

[]string
func (*NATSBus) Peers() []string
{
	return nil
}

SubscribeLease subscribes to lease revocation events.

Parameters

id string

Returns

<-chan syncbus.Event
error
func (*NATSBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
	return b.Subscribe(ctx, "lease:"+id)
}

UnsubscribeLease cancels a lease revocation subscription.

Parameters

id string
ch <-chan syncbus.Event

Returns

error
func (*NATSBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
	return b.Unsubscribe(ctx, "lease:"+id, ch)
}
Metrics
Method

Metrics returns the published and delivered counts.

Returns

func (*NATSBus) Metrics() syncbus.Metrics
{
	return syncbus.Metrics{
		Published: b.published.Load(),
		Delivered: b.delivered.Load(),
	}
}
natsHandler
Method

Parameters

key string

Returns

func (*NATSBus) natsHandler(key string) nats.MsgHandler
{
	return func(m *nats.Msg) {
		id := string(m.Data)
		b.mu.Lock()
		if _, ok := b.processed[id]; ok {
			b.mu.Unlock()
			return
		}
		b.processed[id] = struct{}{}
		chans := append([]chan syncbus.Event(nil), b.subs[key].chans...)
		b.mu.Unlock()

		evt := syncbus.Event{Key: key}
		for _, c := range chans {
			select {
			case c <- evt:
				b.delivered.Add(1)
			default:
			}
		}
	}
}
reconnect
Method

Returns

error
func (*NATSBus) reconnect() error
{
	if b.conn != nil && b.conn.IsConnected() {
		return nil
	}
	newConn, err := b.conn.Opts.Connect()
	if err != nil {
		return err
	}
	b.mu.Lock()
	b.conn = newConn
	for key, sub := range b.subs {
		ns, err := b.conn.Subscribe(key, b.natsHandler(key))
		if err != nil {
			continue
		}
		sub.sub = ns
	}
	b.mu.Unlock()
	return nil
}

Fields

Name Type Description
conn *nats.Conn
mu sync.Mutex
subs map[string]*natsSubscription
pending map[string]struct{}
processed map[string]struct{}
published atomic.Uint64
delivered atomic.Uint64
F
function

NewNATSBus

NewNATSBus returns a new NATSBus using the provided connection.

Parameters

conn

Returns

v1/syncbus/nats/nats.go:32-39
func NewNATSBus(conn *nats.Conn) *NATSBus

{
	return &NATSBus{
		conn:      conn,
		subs:      make(map[string]*natsSubscription),
		pending:   make(map[string]struct{}),
		processed: make(map[string]struct{}),
	}
}
F
function

newNATSBus

Parameters

v1/syncbus/nats/nats_test.go:15-52
func newNATSBus(t *testing.T) (*NATSBus, context.Context)

{
	t.Helper()
	addr := os.Getenv("WARP_TEST_NATS_ADDR")
	forceReal := os.Getenv("WARP_TEST_FORCE_REAL") == "true"

	if forceReal && addr == "" {
		t.Fatal("WARP_TEST_FORCE_REAL is true but WARP_TEST_NATS_ADDR is empty")
	}

	var conn *nats.Conn
	var s *server.Server
	var err error

	if addr != "" {
		t.Logf("TestNATSBus: using real NATS at %s", addr)
		conn, err = nats.Connect(addr)
		if err != nil {
			t.Fatalf("connect: %v", err)
		}
	} else {
		t.Log("TestNATSBus: using embedded NATS server")
		s = natsserver.RunRandClientPortServer()
		conn, err = nats.Connect(s.ClientURL())
		if err != nil {
			t.Fatalf("connect: %v", err)
		}
	}

	bus := NewNATSBus(conn)
	ctx := context.Background()
	t.Cleanup(func() {
		conn.Close()
		if s != nil {
			s.Shutdown()
		}
	})
	return bus, ctx
}
F
function

TestNATSBusPublishSubscribeFlowAndMetrics

Parameters

v1/syncbus/nats/nats_test.go:54-75
func TestNATSBusPublishSubscribeFlowAndMetrics(t *testing.T)

{
	bus, ctx := newNATSBus(t)
	ch, err := bus.Subscribe(ctx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	if err := bus.Publish(ctx, "key"); err != nil {
		t.Fatalf("publish: %v", err)
	}
	select {
	case <-ch:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for publish")
	}
	metrics := bus.Metrics()
	if metrics.Published != 1 {
		t.Fatalf("expected published 1 got %d", metrics.Published)
	}
	if metrics.Delivered != 1 {
		t.Fatalf("expected delivered 1 got %d", metrics.Delivered)
	}
}
F
function

TestNATSBusContextBasedUnsubscribe

Parameters

v1/syncbus/nats/nats_test.go:77-98
func TestNATSBusContextBasedUnsubscribe(t *testing.T)

{
	bus, _ := newNATSBus(t)
	subCtx, cancel := context.WithCancel(context.Background())
	ch, err := bus.Subscribe(subCtx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	cancel()
	select {
	case _, ok := <-ch:
		if ok {
			t.Fatal("expected channel closed")
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for unsubscribe")
	}
	bus.mu.Lock()
	defer bus.mu.Unlock()
	if _, ok := bus.subs["key"]; ok {
		t.Fatal("subscription still present after context cancel")
	}
}
F
function

TestNATSBusDeduplicatePendingKeys

Parameters

v1/syncbus/nats/nats_test.go:100-124
func TestNATSBusDeduplicatePendingKeys(t *testing.T)

{
	bus, ctx := newNATSBus(t)
	ch, err := bus.Subscribe(ctx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	bus.mu.Lock()
	bus.pending["key"] = struct{}{}
	bus.mu.Unlock()
	if err := bus.Publish(ctx, "key"); err != nil {
		t.Fatalf("publish: %v", err)
	}
	select {
	case <-ch:
		t.Fatal("unexpected publish when key pending")
	default:
	}
	metrics := bus.Metrics()
	if metrics.Published != 0 {
		t.Fatalf("expected published 0 got %d", metrics.Published)
	}
	if metrics.Delivered != 0 {
		t.Fatalf("expected delivered 0 got %d", metrics.Delivered)
	}
}
F
function

TestNATSBusReconnectAfterClose

Parameters

v1/syncbus/nats/nats_test.go:126-148
func TestNATSBusReconnectAfterClose(t *testing.T)

{
	bus, ctx := newNATSBus(t)
	ch, err := bus.Subscribe(ctx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	bus.conn.Close()
	if err := bus.Publish(ctx, "key"); err != nil {
		t.Fatalf("publish: %v", err)
	}
	select {
	case <-ch:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for publish")
	}
	metrics := bus.Metrics()
	if metrics.Published != 1 {
		t.Fatalf("expected published 1 got %d", metrics.Published)
	}
	if metrics.Delivered != 1 {
		t.Fatalf("expected delivered 1 got %d", metrics.Delivered)
	}
}
F
function

TestNATSBusReconnectLoop

Parameters

v1/syncbus/nats/nats_test.go:150-172
func TestNATSBusReconnectLoop(t *testing.T)

{
	bus, ctx := newNATSBus(t)
	ch, err := bus.Subscribe(ctx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	bus.conn.Close()
	done := make(chan error, 1)
	go func() { done <- bus.Publish(ctx, "key") }()
	select {
	case err := <-done:
		if err != nil {
			t.Fatalf("publish: %v", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("publish timeout")
	}
	select {
	case <-ch:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for publish")
	}
}
F
function

TestNATSBusIdempotentAfterReconnect

Parameters

v1/syncbus/nats/nats_test.go:174-201
func TestNATSBusIdempotentAfterReconnect(t *testing.T)

{
	bus, ctx := newNATSBus(t)
	ch, err := bus.Subscribe(ctx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	id := uuid.NewString()
	if err := bus.conn.Publish("key", []byte(id)); err != nil {
		t.Fatalf("direct publish: %v", err)
	}
	select {
	case <-ch:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for publish")
	}
	bus.conn.Close()
	if err := bus.reconnect(); err != nil {
		t.Fatalf("reconnect: %v", err)
	}
	if err := bus.conn.Publish("key", []byte(id)); err != nil {
		t.Fatalf("dup publish: %v", err)
	}
	select {
	case <-ch:
		t.Fatal("duplicate delivered")
	case <-time.After(200 * time.Millisecond):
	}
}