redis API

redis

package

API reference for the redis package.

function

init

v1/syncbus/redis/redis.go:19-21
func init() {
	rand.Seed(time.Now().UnixNano())
}
struct

BatchPayload

v1/syncbus/redis/redis.go:30-36
type BatchPayload struct

Fields

Name Type JSON Tag
Nonce string json:"n"
Keys []string json:"k"
Regions []string json:"r,omitempty"
Vectors []map[string]uint64 json:"v,omitempty"
Scopes []syncbus.Scope json:"s,omitempty"
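
BatchPayload is the wire format used on the global update channel: runBatcher JSON-encodes and gzip-compresses it, and dispatchGlobal reverses the process. A minimal round-trip sketch of that encoding (key values are illustrative):

// Round-trip sketch: encode a BatchPayload the way runBatcher does,
// then decode it the way dispatchGlobal does. Key names are illustrative.
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
_ = json.NewEncoder(gz).Encode(BatchPayload{
	Nonce: uuid.NewString(),
	Keys:  []string{"user:42", "user:43"},
})
gz.Close()

r, _ := gzip.NewReader(bytes.NewReader(buf.Bytes()))
var decoded BatchPayload
_ = json.NewDecoder(r).Decode(&decoded)
r.Close()
// decoded.Keys is now []string{"user:42", "user:43"}
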
struct

redisSubscription

v1/syncbus/redis/redis.go:38-41
type redisSubscription struct

Fields

Name Type Description
pubsub *redis.PubSub Underlying Redis Pub/Sub subscription for the key.
chans []chan syncbus.Event Subscriber channels that receive events for the key.
struct

publishReq

v1/syncbus/redis/redis.go:43-48
type publishReq struct

Fields

Name Type Description
ctx context.Context Caller context for the publish request.
key string Key being published.
opts syncbus.PublishOptions Resolved publish options (region, vector clock, scope).
resp chan error Receives the result of the batch flush.
struct

RedisBus

RedisBus implements Bus using a Redis backend with batching support.

v1/syncbus/redis/redis.go:51-64
type RedisBus struct

Methods

Publish
Method

Publish implements Bus.Publish.

Parameters

key string
opts ...syncbus.PublishOption

Returns

error
func (b *RedisBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error {
	options := syncbus.PublishOptions{}
	for _, opt := range opts {
		opt(&options)
	}

	b.mu.Lock()
	if _, ok := b.pending[key]; ok {
		b.mu.Unlock()
		return nil
	}
	b.pending[key] = struct{}{}
	b.mu.Unlock()

	cleanup := func() {
		b.mu.Lock()
		delete(b.pending, key)
		b.mu.Unlock()
	}

	resp := make(chan error, 1)
	select {
	case b.publishCh <- publishReq{ctx: ctx, key: key, opts: options, resp: resp}:
		select {
		case err := <-resp:
			cleanup()
			if err == nil {
				b.published.Add(1)
			}
			return err
		case <-ctx.Done():
			cleanup()
			if ctx.Err() == context.DeadlineExceeded {
				return warperrors.ErrTimeout
			}
			return ctx.Err()
		}
	case <-ctx.Done():
		cleanup()
		if ctx.Err() == context.DeadlineExceeded {
			return warperrors.ErrTimeout
		}
		return ctx.Err()
	}
}
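
A short usage sketch for Publish with a deadline; bus is assumed to be a *RedisBus obtained from NewRedisBus and the key is illustrative. On deadline expiry Publish returns warperrors.ErrTimeout rather than context.DeadlineExceeded:

// Sketch: publish with a 200 ms deadline (bus and key are assumptions).
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()

if err := bus.Publish(ctx, "user:42"); err != nil {
	if errors.Is(err, warperrors.ErrTimeout) {
		// The batcher did not flush the request before the deadline.
	}
	// Handle other publish errors here.
}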

PublishAndAwait
Method

PublishAndAwait implements Bus.PublishAndAwait using Redis publish counts.

Parameters

key string
replicas int
opts ...syncbus.PublishOption

Returns

error
func (b *RedisBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error {
	if replicas <= 0 {
		replicas = 1
	}

	val, err := b.client.Publish(ctx, key, uuid.NewString()).Result()
	if err != nil {
		return err
	}
	if int(val) < replicas {
		return syncbus.ErrQuorumNotSatisfied
	}
	b.published.Add(1)
	return nil
}
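
A usage sketch for PublishAndAwait, which bypasses the batcher and treats the Redis receiver count as the acknowledgement; bus, ctx, and the key are assumed from the surrounding code:

// Sketch: require at least 2 subscribers to have received the publish.
if err := bus.PublishAndAwait(ctx, "user:42", 2); err != nil {
	if errors.Is(err, syncbus.ErrQuorumNotSatisfied) {
		// Fewer than 2 Redis subscribers received the message.
	}
	// Handle other errors here.
}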

PublishAndAwaitTopology
Method

PublishAndAwaitTopology implements Bus.PublishAndAwaitTopology.

Parameters

key string
minZones int
opts ...syncbus.PublishOption

Returns

error
func (b *RedisBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error {
	return b.PublishAndAwait(ctx, key, minZones, opts...)
}
Subscribe
Method

Subscribe implements Bus.Subscribe.

Parameters

key string

Returns

<-chan syncbus.Event
error
func (b *RedisBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error) {
	ch := make(chan syncbus.Event, 1)
	b.mu.Lock()
	defer b.mu.Unlock()

	sub := b.subs[key]
	if sub == nil {
		ps := b.client.Subscribe(ctx, key)
		if _, err := ps.Receive(ctx); err != nil {
			return nil, err
		}
		sub = &redisSubscription{pubsub: ps, chans: []chan syncbus.Event{ch}}
		b.subs[key] = sub
		go b.dispatch(key, sub)
	} else {
		sub.chans = append(sub.chans, ch)
	}

	go func() {
		<-ctx.Done()
		_ = b.Unsubscribe(context.Background(), key, ch)
	}()
	return ch, nil
}
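
A usage sketch for Subscribe; cancelling the subscribing context unsubscribes and closes the event channel, so a range loop over the channel terminates cleanly (bus is assumed as above):

// Sketch: consume events until the subscription context is cancelled.
subCtx, cancel := context.WithCancel(context.Background())
events, err := bus.Subscribe(subCtx, "user:42")
if err != nil {
	// Handle subscription error.
}

go func() {
	for evt := range events { // ends when the channel is closed
		fmt.Println("event for key:", evt.Key)
	}
}()

// Later: cancel to trigger Unsubscribe and close the channel.
cancel()
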
checkSeen
Method

checkSeen reports whether a payload has already been processed, recording it as seen if not.

Parameters

payload string

Returns

bool
func (b *RedisBus) checkSeen(payload string) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, ok := b.seen[payload]; ok {
		return true
	}
	b.seen[payload] = time.Now()
	return false
}
cleanupSeen
Method

cleanupSeen periodically evicts entries older than one minute from the seen map until the bus is closed.

func (b *RedisBus) cleanupSeen() {
	defer b.wg.Done()
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			b.mu.Lock()
			now := time.Now()
			for k, t := range b.seen {
				if now.Sub(t) > 1*time.Minute {
					delete(b.seen, k)
				}
			}
			b.mu.Unlock()
		case <-b.closeCh:
			return
		}
	}
}
dispatch
Method

dispatch forwards messages from the key's Pub/Sub channel to that key's subscriber channels, skipping payloads that were already seen.

Parameters

key string
sub *redisSubscription
func (b *RedisBus) dispatch(key string, sub *redisSubscription) {
	ch := sub.pubsub.Channel()
	for msg := range ch { // Loop terminates when channel is closed
		if b.checkSeen(msg.Payload) {
			continue
		}

		b.mu.Lock()
		chans := append([]chan syncbus.Event(nil), sub.chans...)
		b.mu.Unlock()

		evt := syncbus.Event{Key: key}
		for _, c := range chans {
			select {
			case c <- evt:
				b.delivered.Add(1)
			default:
			}
		}
	}
}
dispatchGlobal
Method

dispatchGlobal subscribes to the global update channel, decodes the gzip-compressed batch payloads, and fans the contained events out to per-key subscribers, resubscribing when the connection is lost.

func (b *RedisBus) dispatchGlobal() {
	defer b.wg.Done()
	b.mu.Lock()
	client := b.client
	b.mu.Unlock()

	// Robust subscription loop
	for {
		// Refresh client in case of reconnection
		b.mu.Lock()
		client = b.client
		b.mu.Unlock()

		ps := client.Subscribe(context.Background(), globalUpdateChannel)

		ch := ps.Channel()

	loop:
		for {
			select {
			case msg, ok := <-ch:
				if !ok {
					// Channel closed, connection lost
					break loop
				}
				if msg == nil {
					continue
				}
				if b.checkSeen(msg.Payload) {
					continue
				}

				// Decode payload
				reader := bytes.NewReader([]byte(msg.Payload))
				gz, err := gzip.NewReader(reader)
				if err != nil {
					continue
				}
				var batch BatchPayload
				if err := json.NewDecoder(gz).Decode(&batch); err != nil {
					gz.Close()
					continue
				}
				gz.Close()

				for i, key := range batch.Keys {
					b.mu.Lock()
					sub, ok := b.subs[key]
					if !ok {
						b.mu.Unlock()
						continue
					}
					chans := append([]chan syncbus.Event(nil), sub.chans...)
					b.mu.Unlock()

					evt := syncbus.Event{
						Key: key,
					}
					if i < len(batch.Regions) {
						evt.Region = batch.Regions[i]
					}
					if i < len(batch.Vectors) {
						evt.VectorClock = batch.Vectors[i]
					}
					if i < len(batch.Scopes) {
						evt.Scope = batch.Scopes[i]
					}

					for _, c := range chans {
						select {
						case c <- evt:
							b.delivered.Add(1)
						default:
						}
					}
				}

			case <-b.closeCh:
				ps.Close()
				return
			}
		}

		ps.Close()

		// If we are here, connection was lost. Wait a bit and try to get new client.
		select {
		case <-b.closeCh:
			return
		case <-time.After(10 * time.Millisecond): // Reduced delay
			// Refresh client pointer
			b.mu.Lock()
			client = b.client
			b.mu.Unlock()
		}
	}
}
runBatcher
Method

runBatcher collects publish requests, deduplicates keys within a batch, and flushes the batch as a single gzip-compressed payload when the batch threshold is reached or the ticker fires, retrying once after a reconnect on failure.

func (b *RedisBus) runBatcher() {
	defer b.wg.Done()
	ticker := time.NewTicker(batchTicker)
	defer ticker.Stop()

	var batch []publishReq

	flush := func() {
		if len(batch) == 0 {
			return
		}

		// Deduplicate keys in the batch
		seen := make(map[string]struct{})
		uniqueBatch := make([]publishReq, 0, len(batch))

		for _, req := range batch {
			if _, ok := seen[req.key]; !ok {
				seen[req.key] = struct{}{}
				uniqueBatch = append(uniqueBatch, req)
			}
		}

		payload := BatchPayload{
			Nonce:   uuid.NewString(), // Ensure payload is unique even for same keys
			Keys:    make([]string, len(uniqueBatch)),
			Regions: make([]string, len(uniqueBatch)),
			Vectors: make([]map[string]uint64, len(uniqueBatch)),
			Scopes:  make([]syncbus.Scope, len(uniqueBatch)),
		}

		for i, req := range uniqueBatch {
			payload.Keys[i] = req.key
			payload.Regions[i] = req.opts.Region
			payload.Vectors[i] = req.opts.VectorClock
			payload.Scopes[i] = req.opts.Scope
		}

		// Compress and send
		var buf bytes.Buffer
		gz := gzip.NewWriter(&buf)
		if err := json.NewEncoder(gz).Encode(payload); err != nil {
			// Fail all requests
			for _, req := range batch {
				req.resp <- err
			}
			batch = nil
			gz.Close() // Ensure gzip writer is closed even on encode error
			return
		}
		gz.Close()

		b.mu.Lock()
		client := b.client
		b.mu.Unlock()

		// Publish to global channel
		err := client.Publish(context.Background(), globalUpdateChannel, buf.String()).Err()

		// If error (e.g. closed), try to reconnect once
		if err != nil {
			_ = b.reconnect()
			// Refresh client
			b.mu.Lock()
			client = b.client
			b.mu.Unlock()

			// Add jitter to increase deduplication window and reduce stampedes
			if j := rand.Int63n(int64(100 * time.Millisecond)); j > 0 {
				select {
				case <-b.closeCh:
					// If closeCh is signaled during jitter, return early
					return
				case <-time.After(time.Duration(j)):
					// Continue after jitter
				}
			}
			err = client.Publish(context.Background(), globalUpdateChannel, buf.String()).Err()
		}

		for _, req := range batch {
			req.resp <- err
		}
		batch = nil
	}

	for {
		select {
		case req := <-b.publishCh:
			batch = append(batch, req)
			if len(batch) >= batchThreshold {
				flush()
			}
		case <-ticker.C:
			flush()
		case <-b.closeCh:
			flush() // Flush remaining
			return
		}
	}
}
Unsubscribe
Method

Unsubscribe implements Bus.Unsubscribe.

Parameters

key string
ch <-chan syncbus.Event

Returns

error
func (b *RedisBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error {
	b.mu.Lock()
	defer b.mu.Unlock()

	sub := b.subs[key]
	if sub == nil {
		return nil
	}
	for i, c := range sub.chans {
		if c == ch {
			sub.chans[i] = sub.chans[len(sub.chans)-1]
			sub.chans = sub.chans[:len(sub.chans)-1]
			close(c)
			break
		}
	}
	if len(sub.chans) == 0 {
		delete(b.subs, key)
		if sub.pubsub != nil {
			return sub.pubsub.Close()
		}
	}
	return nil
}
RevokeLease
Method

RevokeLease publishes a lease revocation event.

Parameters

id string

Returns

error
func (b *RedisBus) RevokeLease(ctx context.Context, id string) error {
	return b.Publish(ctx, "lease:"+id)
}

SubscribeLease
Method

SubscribeLease subscribes to lease revocation events.

Parameters

id string

Returns

<-chan syncbus.Event
error
func (b *RedisBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error) {
	return b.Subscribe(ctx, "lease:"+id)
}

UnsubscribeLease
Method

UnsubscribeLease cancels a lease revocation subscription.

Parameters

id string
ch <-chan syncbus.Event

Returns

error
func (b *RedisBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error {
	return b.Unsubscribe(ctx, "lease:"+id, ch)
}
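
The lease helpers are thin wrappers that prefix the id with "lease:". A small sketch (bus, ctx, and the lease id are assumptions):

// Sketch: watch a lease for revocation, then revoke it elsewhere.
revocations, err := bus.SubscribeLease(ctx, "lease-123")
if err != nil {
	// Handle subscription error.
}

go func() {
	if _, ok := <-revocations; ok {
		fmt.Println("lease-123 was revoked")
	}
}()

// Elsewhere: publish the revocation (goes out on "lease:lease-123").
_ = bus.RevokeLease(ctx, "lease-123")
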
Metrics
Method

Metrics returns the published and delivered counts.

Returns

syncbus.Metrics
func (b *RedisBus) Metrics() syncbus.Metrics {
	return syncbus.Metrics{
		Published: b.published.Load(),
		Delivered: b.delivered.Load(),
	}
}
reconnect
Method

reconnect establishes a new connection/subscription (stub for tests).

Returns

error
func (b *RedisBus) reconnect() error {
	// In this implementation using go-redis, the client handles reconnection mostly.
	// But we might need to resubscribe.
	// For tests that close the client, we need to try to recreate it if we have options.
	// Note: b.client might be closed, so Options() might behave weirdly if internal state is gone,
	// but usually Options() just returns the config struct.

	b.mu.Lock()
	defer b.mu.Unlock()

	if b.client != nil {
		opts := b.client.Options()
		b.client = redis.NewClient(opts)
	}

	for key, sub := range b.subs {
		if sub.pubsub != nil {
			_ = sub.pubsub.Close()
		}
		ps := b.client.Subscribe(context.Background(), key)
		// Wait for subscription to be established to avoid race with Publish
		if _, err := ps.Receive(context.Background()); err != nil {
			_ = ps.Close()
			continue
		}
		sub.pubsub = ps
		go b.dispatch(key, sub)
	}
	return nil
}
IsHealthy
Method

IsHealthy implements Bus.IsHealthy.

Returns

bool
func (b *RedisBus) IsHealthy() bool {
	return true
}
Peers
Method

Peers implements Bus.Peers.

Returns

[]string
func (b *RedisBus) Peers() []string {
	return nil
}
Close
Method

Close releases resources used by the RedisBus.

Returns

error
func (b *RedisBus) Close() error {
	close(b.closeCh)

	b.mu.Lock()
	var subs []*redis.PubSub
	for _, sub := range b.subs {
		if sub.pubsub != nil {
			subs = append(subs, sub.pubsub)
		}
	}
	// Clear the map to ensure no further operations can use the subscriptions
	b.subs = make(map[string]*redisSubscription)
	b.mu.Unlock()

	for _, ps := range subs {
		_ = ps.Close()
	}

	b.wg.Wait()
	return nil
}

Fields

Name Type Description
client *redis.Client Underlying Redis client.
region string Region configured via RedisBusOptions.Region.
mu sync.Mutex Guards client, subs, pending, and seen.
subs map[string]*redisSubscription Active subscriptions keyed by channel name.
pending map[string]struct{} Keys with an in-flight Publish, used for deduplication.
seen map[string]time.Time Recently seen payloads, used to suppress duplicates.
published atomic.Uint64 Count of successful publishes.
delivered atomic.Uint64 Count of events delivered to subscriber channels.
publishCh chan publishReq Queue of publish requests consumed by the batcher.
closeCh chan struct{} Closed by Close to stop background goroutines.
wg sync.WaitGroup Tracks the batcher, global dispatcher, and seen-cleanup goroutines.
struct

RedisBusOptions

RedisBusOptions configures the RedisBus.

v1/syncbus/redis/redis.go:67-70
type RedisBusOptions struct

Fields

Name Type Description
Client *redis.Client Redis client used by the bus.
Region string Region identifier for this bus instance.
function

NewRedisBus

NewRedisBus returns a new RedisBus using the provided client.

Parameters

opts RedisBusOptions

Returns

*RedisBus

v1/syncbus/redis/redis.go:73-88
func NewRedisBus(opts RedisBusOptions) *RedisBus {
	b := &RedisBus{
		client:    opts.Client,
		region:    opts.Region,
		subs:      make(map[string]*redisSubscription),
		pending:   make(map[string]struct{}),
		seen:      make(map[string]time.Time),
		publishCh: make(chan publishReq, 1000),
		closeCh:   make(chan struct{}),
	}
	b.wg.Add(3)
	go b.runBatcher()
	go b.dispatchGlobal()
	go b.cleanupSeen()
	return b
}
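
A minimal end-to-end sketch of constructing and using the bus. The go-redis major version and the syncbus module import path are assumptions; substitute the real ones:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9" // go-redis version assumed

	// Illustrative import path; replace with the real module path.
	syncredis "example.com/project/v1/syncbus/redis"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	bus := syncredis.NewRedisBus(syncredis.RedisBusOptions{Client: client, Region: "eu-west"})
	defer bus.Close()

	ctx := context.Background()

	// Subscribe before publishing so the event is not missed.
	events, err := bus.Subscribe(ctx, "user:42")
	if err != nil {
		panic(err)
	}

	// Publish is batched internally and returns once its batch is flushed.
	if err := bus.Publish(ctx, "user:42"); err != nil {
		panic(err)
	}

	select {
	case evt := <-events:
		fmt.Println("received event for", evt.Key)
	case <-time.After(time.Second):
		fmt.Println("no event received")
	}

	m := bus.Metrics()
	fmt.Printf("published=%d delivered=%d\n", m.Published, m.Delivered)
}
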
function

newRedisBus

newRedisBus is a test helper that builds a RedisBus backed by a real Redis instance when WARP_TEST_REDIS_ADDR is set, or by miniredis otherwise, and registers cleanup.

Parameters

t *testing.T

Returns

*RedisBus
context.Context

v1/syncbus/redis/redis_test.go:17-54
func newRedisBus(t *testing.T) (*RedisBus, context.Context) {
	t.Helper()
	addr := os.Getenv("WARP_TEST_REDIS_ADDR")
	forceReal := os.Getenv("WARP_TEST_FORCE_REAL") == "true"
	var client *redis.Client
	var mr *miniredis.Miniredis

	if forceReal && addr == "" {
		t.Fatal("WARP_TEST_FORCE_REAL is true but WARP_TEST_REDIS_ADDR is empty")
	}

	if addr != "" {
		t.Logf("TestRedisBus: using real Redis at %s", addr)
		client = redis.NewClient(&redis.Options{Addr: addr})
	} else {
		t.Log("TestRedisBus: using miniredis")
		var err error
		mr, err = miniredis.Run()
		if err != nil {
			t.Fatalf("miniredis run: %v", err)
		}
		client = redis.NewClient(&redis.Options{Addr: mr.Addr()})
	}

	bus := NewRedisBus(RedisBusOptions{Client: client})
	ctx := context.Background()
	t.Cleanup(func() {
		_ = bus.Close()
		if addr != "" {
			_ = client.FlushAll(context.Background()).Err()
			_ = client.Close()
		} else {
			_ = client.Close()
			mr.Close()
		}
	})
	return bus, ctx
}
function

TestRedisBusPublishSubscribeFlowAndMetrics

Parameters

t *testing.T

v1/syncbus/redis/redis_test.go:56-77
func TestRedisBusPublishSubscribeFlowAndMetrics(t *testing.T) {
	bus, ctx := newRedisBus(t)
	ch, err := bus.Subscribe(ctx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	if err := bus.Publish(ctx, "key"); err != nil {
		t.Fatalf("publish: %v", err)
	}
	select {
	case <-ch:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for publish")
	}
	metrics := bus.Metrics()
	if metrics.Published != 1 {
		t.Fatalf("expected published 1 got %d", metrics.Published)
	}
	if metrics.Delivered != 1 {
		t.Fatalf("expected delivered 1 got %d", metrics.Delivered)
	}
}
function

TestRedisBusContextBasedUnsubscribe

Parameters

t *testing.T

v1/syncbus/redis/redis_test.go:79-100
func TestRedisBusContextBasedUnsubscribe(t *testing.T) {
	bus, _ := newRedisBus(t)
	subCtx, cancel := context.WithCancel(context.Background())
	ch, err := bus.Subscribe(subCtx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	cancel()
	select {
	case _, ok := <-ch:
		if ok {
			t.Fatal("expected channel closed")
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for unsubscribe")
	}
	bus.mu.Lock()
	defer bus.mu.Unlock()
	if _, ok := bus.subs["key"]; ok {
		t.Fatal("subscription still present after context cancel")
	}
}
function

TestRedisBusDeduplicatePendingKeys

Parameters

t *testing.T

v1/syncbus/redis/redis_test.go:102-126
func TestRedisBusDeduplicatePendingKeys(t *testing.T) {
	bus, ctx := newRedisBus(t)
	ch, err := bus.Subscribe(ctx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	bus.mu.Lock()
	bus.pending["key"] = struct{}{}
	bus.mu.Unlock()
	if err := bus.Publish(ctx, "key"); err != nil {
		t.Fatalf("publish: %v", err)
	}
	select {
	case <-ch:
		t.Fatal("unexpected publish when key pending")
	default:
	}
	metrics := bus.Metrics()
	if metrics.Published != 0 {
		t.Fatalf("expected published 0 got %d", metrics.Published)
	}
	if metrics.Delivered != 0 {
		t.Fatalf("expected delivered 0 got %d", metrics.Delivered)
	}
}
function

TestRedisBusReconnectAfterClose

Parameters

t *testing.T

v1/syncbus/redis/redis_test.go:128-150
func TestRedisBusReconnectAfterClose(t *testing.T) {
	bus, ctx := newRedisBus(t)
	ch, err := bus.Subscribe(ctx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	_ = bus.client.Close()
	if err := bus.Publish(ctx, "key"); err != nil {
		t.Fatalf("publish: %v", err)
	}
	select {
	case <-ch:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for publish")
	}
	metrics := bus.Metrics()
	if metrics.Published != 1 {
		t.Fatalf("expected published 1 got %d", metrics.Published)
	}
	if metrics.Delivered != 1 {
		t.Fatalf("expected delivered 1 got %d", metrics.Delivered)
	}
}
function

TestRedisBusReconnectLoop

Parameters

t *testing.T

v1/syncbus/redis/redis_test.go:152-174
func TestRedisBusReconnectLoop(t *testing.T) {
	bus, ctx := newRedisBus(t)
	ch, err := bus.Subscribe(ctx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	_ = bus.client.Close()
	done := make(chan error, 1)
	go func() { done <- bus.Publish(ctx, "key") }()
	select {
	case err := <-done:
		if err != nil {
			t.Fatalf("publish: %v", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("publish timeout")
	}
	select {
	case <-ch:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for publish")
	}
}
function

TestRedisBusIdempotentAfterReconnect

Parameters

t *testing.T

v1/syncbus/redis/redis_test.go:176-203
func TestRedisBusIdempotentAfterReconnect(t *testing.T) {
	bus, ctx := newRedisBus(t)
	ch, err := bus.Subscribe(ctx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	id := uuid.NewString()
	if err := bus.client.Publish(ctx, "key", id).Err(); err != nil {
		t.Fatalf("direct publish: %v", err)
	}
	select {
	case <-ch:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for publish")
	}
	_ = bus.client.Close()
	if err := bus.reconnect(); err != nil {
		t.Fatalf("reconnect: %v", err)
	}
	if err := bus.client.Publish(ctx, "key", id).Err(); err != nil {
		t.Fatalf("dup publish: %v", err)
	}
	select {
	case <-ch:
		t.Fatal("duplicate delivered")
	case <-time.After(200 * time.Millisecond):
	}
}
function

TestRedisBusTimeout

Parameters

t *testing.T

v1/syncbus/redis/redis_test.go:205-213
func TestRedisBusTimeout(t *testing.T) {
	bus, ctx := newRedisBus(t)
	tCtx, cancel := context.WithTimeout(ctx, time.Nanosecond)
	defer cancel()
	time.Sleep(time.Millisecond)
	if err := bus.Publish(tCtx, "key"); !errors.Is(err, warperrors.ErrTimeout) {
		t.Fatalf("expected timeout error, got %v", err)
	}
}