syncbus API

syncbus

package

API reference for the syncbus package.

S
struct

mockBus

v1/syncbus/circuit_breaker_test.go:10-14
type mockBus struct

Methods

Publish
Method

Parameters

key string
opts ...PublishOption

Returns

error
func (*mockBus) Publish(ctx context.Context, key string, opts ...PublishOption) error
{
	if m.publishFunc != nil {
		return m.publishFunc(ctx, key, opts...)
	}
	return m.InMemoryBus.Publish(ctx, key, opts...)
}
IsHealthy
Method

Returns

bool
func (*mockBus) IsHealthy() bool
{ return m.isHealthy }

Fields

Name Type Description
publishFunc func(ctx context.Context, key string, opts ...PublishOption) error
isHealthy bool
F
function

TestCircuitBreaker_StateTransitions

Parameters

v1/syncbus/circuit_breaker_test.go:25-83
func TestCircuitBreaker_StateTransitions(t *testing.T)

{
	mb := &mockBus{InMemoryBus: NewInMemoryBus(), isHealthy: true}
	threshold := 2
	timeout := 50 * time.Millisecond
	cb := NewCircuitBreaker(mb, threshold, timeout)

	ctx := context.Background()
	failErr := errors.New("fail")

	if !cb.IsHealthy() {
		t.Fatal("expected healthy initially")
	}

	mb.publishFunc = func(ctx context.Context, key string, opts ...PublishOption) error { return failErr }
	if err := cb.Publish(ctx, "key"); err != failErr {
		t.Fatalf("expected failErr, got %v", err)
	}
	if !cb.IsHealthy() {
		t.Fatal("expected healthy after 1 failure (threshold 2)")
	}

	if err := cb.Publish(ctx, "key"); err != failErr {
		t.Fatalf("expected failErr, got %v", err)
	}
	if cb.IsHealthy() {
		t.Fatal("expected unhealthy/open after threshold reached")
	}
	if err := cb.Publish(ctx, "key"); err != ErrCircuitOpen {
		t.Fatalf("expected ErrCircuitOpen, got %v", err)
	}

	time.Sleep(timeout + 10*time.Millisecond)

	mb.publishFunc = func(ctx context.Context, key string, opts ...PublishOption) error { return nil }
	if err := cb.Publish(ctx, "key"); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if !cb.IsHealthy() {
		t.Fatal("expected healthy after success")
	}

	mb.publishFunc = func(ctx context.Context, key string, opts ...PublishOption) error { return failErr }
	cb.Publish(ctx, "key")
	cb.Publish(ctx, "key")
	if cb.IsHealthy() {
		t.Fatal("expected open")
	}

	time.Sleep(timeout + 10*time.Millisecond)
	if err := cb.Publish(ctx, "key"); err != failErr {
		t.Fatalf("expected failErr, got %v", err)
	}
	if cb.IsHealthy() {
		t.Fatal("expected open after half-open failure")
	}
	if err := cb.Publish(ctx, "key"); err != ErrCircuitOpen {
		t.Fatalf("expected ErrCircuitOpen, got %v", err)
	}
}
F
function

TestCircuitBreaker_Passthrough

Parameters

v1/syncbus/circuit_breaker_test.go:85-106
func TestCircuitBreaker_Passthrough(t *testing.T)

{
	// Re-using InMemoryBus directly embedded in mock
	mb := &mockBus{InMemoryBus: NewInMemoryBus(), isHealthy: true}
	cb := NewCircuitBreaker(mb, 5, time.Minute)

	ctx := context.Background()
	// Test Publish
	if err := cb.Publish(ctx, "foo"); err != nil {
		t.Fatal(err)
	}

	// Ensure it went through to underlying bus
	sub, _ := mb.InMemoryBus.Subscribe(ctx, "foo")
	go func() {
		cb.Publish(ctx, "foo")
	}()
	select {
	case <-sub:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for message on underlying bus")
	}
}
T
type

Scope

Scope defines the propagation scope of an event.

v1/syncbus/syncbus.go:17-17
type Scope uint8
S
struct

PublishOptions

v1/syncbus/syncbus.go:24-28
type PublishOptions struct

Fields

Name Type Description
Region string
VectorClock map[string]uint64
Scope Scope
T
type

PublishOption

v1/syncbus/syncbus.go:30-30
type PublishOption func(*PublishOptions)
F
function

WithRegion

Parameters

region
string

Returns

v1/syncbus/syncbus.go:32-36
func WithRegion(region string) PublishOption

{
	return func(o *PublishOptions) {
		o.Region = region
	}
}
F
function

WithVectorClock

Parameters

vc
map[string]uint64

Returns

v1/syncbus/syncbus.go:38-42
func WithVectorClock(vc map[string]uint64) PublishOption

{
	return func(o *PublishOptions) {
		o.VectorClock = vc
	}
}
F
function

WithScope

Parameters

scope

Returns

v1/syncbus/syncbus.go:44-48
func WithScope(scope Scope) PublishOption

{
	return func(o *PublishOptions) {
		o.Scope = scope
	}
}
S
struct

Event

Event represents a bus event carrying metadata.

v1/syncbus/syncbus.go:51-56
type Event struct

Fields

Name Type Description
Key string
Region string
VectorClock map[string]uint64
Scope Scope
I
interface

Bus

Bus provides a simple pub/sub mechanism used by warp to propagate
invalidation events across nodes.

v1/syncbus/syncbus.go:60-71
type Bus interface

Methods

Publish
Method

Parameters

key string
opts ...PublishOption

Returns

error
func Publish(...)

Parameters

key string
replicas int
opts ...PublishOption

Returns

error
func PublishAndAwait(...)

Parameters

key string
minZones int
opts ...PublishOption

Returns

error
func PublishAndAwaitTopology(...)
Subscribe
Method

Parameters

key string

Returns

<-chan Event
error
func Subscribe(...)
Unsubscribe
Method

Parameters

key string
ch <-chan Event

Returns

error
func Unsubscribe(...)
RevokeLease
Method

Parameters

id string

Returns

error
func RevokeLease(...)

Parameters

id string

Returns

<-chan Event
error
func SubscribeLease(...)

Parameters

id string
ch <-chan Event

Returns

error
func UnsubscribeLease(...)
IsHealthy
Method

Returns

bool
func IsHealthy(...)
Peers
Method

Returns

[]string
func Peers(...)
S
struct
Implements: Bus

InMemoryBus

InMemoryBus is a local implementation of Bus mainly for testing.

v1/syncbus/syncbus.go:74-79
type InMemoryBus struct

Methods

IsHealthy
Method

IsHealthy implements Bus.IsHealthy.

Returns

bool
func (*InMemoryBus) IsHealthy() bool
{
	return true
}
Peers
Method

Peers implements Bus.Peers.

Returns

[]string
func (*InMemoryBus) Peers() []string
{
	return nil
}
Publish
Method

Publish implements Bus.Publish.

Parameters

key string
opts ...PublishOption

Returns

error
func (*InMemoryBus) Publish(ctx context.Context, key string, opts ...PublishOption) error
{
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	options := PublishOptions{}
	for _, opt := range opts {
		opt(&options)
	}

	// Check and Set Pending (Deduplication)
	var alreadyPending bool
	b.pending.Compute(key, func(v bool, exists bool) bool {
		alreadyPending = exists
		return true
	})

	if alreadyPending {
		return nil
	}
	defer b.pending.Delete(key)

	chans, _ := b.subs.Get(key)
	if len(chans) == 0 {
		return nil
	}

	evt := Event{
		Key:         key,
		Region:      options.Region,
		VectorClock: options.VectorClock,
	}

	b.published.Add(1)
	for _, ch := range chans {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ch <- evt:
			b.delivered.Add(1)
		default:
		}
	}

	return nil
}

PublishAndAwaitTopology implements Bus.PublishAndAwaitTopology.

Parameters

key string
minZones int
opts ...PublishOption

Returns

error
func (*InMemoryBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...PublishOption) error
{
	// In-memory simulation: treat zones as simple replica count for now
	return b.PublishAndAwait(ctx, key, minZones, opts...)
}

PublishAndAwait implements Bus.PublishAndAwait.

Parameters

key string
replicas int
opts ...PublishOption

Returns

error
func (*InMemoryBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...PublishOption) error
{
	if replicas <= 0 {
		replicas = 1
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	options := PublishOptions{}
	for _, opt := range opts {
		opt(&options)
	}

	var alreadyPending bool
	b.pending.Compute(key, func(v bool, exists bool) bool {
		alreadyPending = exists
		return true
	})

	if alreadyPending {
		return nil
	}
	defer b.pending.Delete(key)

	chans, _ := b.subs.Get(key)
	if len(chans) < replicas {
		return ErrQuorumNotSatisfied
	}

	evt := Event{
		Key:         key,
		Region:      options.Region,
		VectorClock: options.VectorClock,
	}

	delivered := 0
	for _, ch := range chans {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ch <- evt:
			b.delivered.Add(1)
			delivered++
		default:
		}
	}

	if delivered < replicas {
		return ErrQuorumNotSatisfied
	}
	b.published.Add(1)
	return nil
}
Subscribe
Method

Subscribe implements Bus.Subscribe.

Parameters

key string

Returns

<-chan Event
error
func (*InMemoryBus) Subscribe(ctx context.Context, key string) (<-chan Event, error)
{
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

	ch := make(chan Event, 1) // Buffer 1
	b.subs.Compute(key, func(v []chan Event, exists bool) []chan Event {
		return append(v, ch)
	})

	go func() {
		<-ctx.Done()
		_ = b.Unsubscribe(context.Background(), key, ch)
	}()
	return ch, nil
}
Unsubscribe
Method

Unsubscribe implements Bus.Unsubscribe.

Parameters

key string
ch <-chan Event

Returns

error
func (*InMemoryBus) Unsubscribe(ctx context.Context, key string, ch <-chan Event) error
{
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	b.subs.Compute(key, func(subs []chan Event, exists bool) []chan Event {
		if !exists {
			return nil
		}
		for i, c := range subs {
			if c == ch {
				// Remove (swap with last)
				subs[i] = subs[len(subs)-1]
				subs = subs[:len(subs)-1]
				close(c)
				break
			}
		}
		if len(subs) == 0 {
			return subs
		}
		return subs
	})

	return nil
}
RevokeLease
Method

RevokeLease publishes a lease revocation event.

Parameters

id string

Returns

error
func (*InMemoryBus) RevokeLease(ctx context.Context, id string) error
{
	return b.Publish(ctx, "lease:"+id)
}

SubscribeLease subscribes to lease revocation events.

Parameters

id string

Returns

<-chan Event
error
func (*InMemoryBus) SubscribeLease(ctx context.Context, id string) (<-chan Event, error)
{
	return b.Subscribe(ctx, "lease:"+id)
}

UnsubscribeLease cancels a lease revocation subscription.

Parameters

id string
ch <-chan Event

Returns

error
func (*InMemoryBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan Event) error
{
	return b.Unsubscribe(ctx, "lease:"+id, ch)
}
Metrics
Method

Returns

func (*InMemoryBus) Metrics() Metrics
{
	return Metrics{
		Published: b.published.Load(),
		Delivered: b.delivered.Load(),
	}
}

Fields

F
function

NewInMemoryBus

NewInMemoryBus returns a new InMemoryBus.

Returns

v1/syncbus/syncbus.go:82-87
func NewInMemoryBus() *InMemoryBus

{
	return &InMemoryBus{
		subs:    safemap.NewSharded[string, []chan Event](safemap.StringHasher, 32),
		pending: safemap.NewSharded[string, bool](safemap.StringHasher, 32),
	}
}
S
struct

Metrics

v1/syncbus/syncbus.go:277-280
type Metrics struct

Fields

Name Type Description
Published uint64
Delivered uint64
F
function

BenchmarkPublish_InMemory

Parameters

v1/syncbus/syncbus_bench_test.go:9-24
func BenchmarkPublish_InMemory(b *testing.B)

{
	bus := NewInMemoryBus()
	ctx := context.Background()
	key := "bench-key"

	// Setup subscribers
	numSubs := 10
	for i := 0; i < numSubs; i++ {
		_, _ = bus.Subscribe(ctx, key)
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = bus.Publish(ctx, key)
	}
}
F
function

BenchmarkPublish_Parallel_InMemory

Parameters

v1/syncbus/syncbus_bench_test.go:26-43
func BenchmarkPublish_Parallel_InMemory(b *testing.B)

{
	bus := NewInMemoryBus()
	ctx := context.Background()
	key := "bench-key"

	// Setup subscribers
	numSubs := 10
	for i := 0; i < numSubs; i++ {
		_, _ = bus.Subscribe(ctx, key)
	}

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			_ = bus.Publish(ctx, key)
		}
	})
}
F
function

BenchmarkSubscribe_InMemory

Parameters

v1/syncbus/syncbus_bench_test.go:45-54
func BenchmarkSubscribe_InMemory(b *testing.B)

{
	bus := NewInMemoryBus()
	ctx := context.Background()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		key := fmt.Sprintf("key-%d", i)
		_, _ = bus.Subscribe(ctx, key)
	}
}
F
function

TestPublishSubscribeFlowAndMetrics

Parameters

v1/syncbus/syncbus_test.go:9-36
func TestPublishSubscribeFlowAndMetrics(t *testing.T)

{
	bus := NewInMemoryBus()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch, err := bus.Subscribe(ctx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}

	if err := bus.Publish(context.Background(), "key"); err != nil {
		t.Fatalf("publish: %v", err)
	}

	select {
	case <-ch:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for publish")
	}

	metrics := bus.Metrics()
	if metrics.Published != 1 {
		t.Fatalf("expected published 1 got %d", metrics.Published)
	}
	if metrics.Delivered != 1 {
		t.Fatalf("expected delivered 1 got %d", metrics.Delivered)
	}
}
F
function

TestPublishAndAwaitQuorumSatisfied

Parameters

v1/syncbus/syncbus_test.go:38-73
func TestPublishAndAwaitQuorumSatisfied(t *testing.T)

{
	bus := NewInMemoryBus()
	ctx := context.Background()
	ch, err := bus.Subscribe(ctx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}

	done := make(chan struct{})
	go func() {
		if err := bus.PublishAndAwait(ctx, "key", 1); err != nil {
			t.Errorf("publish and await: %v", err)
		}
		close(done)
	}()

	select {
	case <-ch:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for quorum publish")
	}

	select {
	case <-done:
	case <-time.After(time.Second):
		t.Fatal("publish and await did not return")
	}

	metrics := bus.Metrics()
	if metrics.Published != 1 {
		t.Fatalf("expected published 1 got %d", metrics.Published)
	}
	if metrics.Delivered != 1 {
		t.Fatalf("expected delivered 1 got %d", metrics.Delivered)
	}
}
F
function

TestPublishAndAwaitQuorumNotSatisfied

Parameters

v1/syncbus/syncbus_test.go:75-88
func TestPublishAndAwaitQuorumNotSatisfied(t *testing.T)

{
	bus := NewInMemoryBus()
	ctx := context.Background()
	if err := bus.PublishAndAwait(ctx, "key", 2); err != ErrQuorumNotSatisfied {
		t.Fatalf("expected quorum error, got %v", err)
	}
	metrics := bus.Metrics()
	if metrics.Published != 0 {
		t.Fatalf("expected published 0 got %d", metrics.Published)
	}
	if metrics.Delivered != 0 {
		t.Fatalf("expected delivered 0 got %d", metrics.Delivered)
	}
}
F
function

TestContextBasedUnsubscribe

Parameters

v1/syncbus/syncbus_test.go:90-124
func TestContextBasedUnsubscribe(t *testing.T)

{
	bus := NewInMemoryBus()
	ctx, cancel := context.WithCancel(context.Background())
	ch, err := bus.Subscribe(ctx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}

	cancel()

	select {
	case _, ok := <-ch:
		if ok {
			t.Fatal("expected channel closed")
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for unsubscribe")
	}

	// Poll for cleanup
	deadline := time.Now().Add(time.Second)
	cleaned := false
	for time.Now().Before(deadline) {
		subs, _ := bus.subs.Get("key")
		if len(subs) == 0 {
			cleaned = true
			break
		}
		time.Sleep(10 * time.Millisecond)
	}

	if !cleaned {
		t.Fatal("subscription still present after context cancel")
	}
}
F
function

TestDeduplicatePendingKeys

Parameters

v1/syncbus/syncbus_test.go:126-154
func TestDeduplicatePendingKeys(t *testing.T)

{
	bus := NewInMemoryBus()
	ctx := context.Background()
	ch, err := bus.Subscribe(ctx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}

	// Manually set pending
	bus.pending.Set("key", true)

	if err := bus.Publish(context.Background(), "key"); err != nil {
		t.Fatalf("publish: %v", err)
	}

	select {
	case <-ch:
		t.Fatal("unexpected publish when key pending")
	default:
	}

	metrics := bus.Metrics()
	if metrics.Published != 0 {
		t.Fatalf("expected published 0 got %d", metrics.Published)
	}
	if metrics.Delivered != 0 {
		t.Fatalf("expected delivered 0 got %d", metrics.Delivered)
	}
}
F
function

TestPublishContextCanceled

Parameters

v1/syncbus/syncbus_test.go:156-170
func TestPublishContextCanceled(t *testing.T)

{
	bus := NewInMemoryBus()
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	if err := bus.Publish(ctx, "key"); err == nil {
		t.Fatal("expected publish error due to canceled context")
	}
	metrics := bus.Metrics()
	if metrics.Published != 0 {
		t.Fatalf("expected published 0 got %d", metrics.Published)
	}
	if metrics.Delivered != 0 {
		t.Fatalf("expected delivered 0 got %d", metrics.Delivered)
	}
}
F
function

TestSubscribeContextCanceled

Parameters

v1/syncbus/syncbus_test.go:172-186
func TestSubscribeContextCanceled(t *testing.T)

{
	bus := NewInMemoryBus()
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	if _, err := bus.Subscribe(ctx, "key"); err == nil {
		t.Fatal("expected subscribe error due to canceled context")
	}

	if bus.subs.Has("key") {
		subs, _ := bus.subs.Get("key")
		if len(subs) > 0 {
			t.Fatal("subscription should not be added when context is canceled")
		}
	}
}
F
function

TestUnsubscribeContextCanceled

Parameters

v1/syncbus/syncbus_test.go:188-218
func TestUnsubscribeContextCanceled(t *testing.T)

{
	bus := NewInMemoryBus()
	ch, err := bus.Subscribe(context.Background(), "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	if err := bus.Unsubscribe(ctx, "key", ch); err == nil {
		t.Fatal("expected unsubscribe error due to canceled context")
	}

	if !bus.subs.Has("key") {
		t.Fatal("subscription subscription should remain when unsubscribe context is canceled")
	}
	subs, _ := bus.subs.Get("key")
	found := false
	for _, c := range subs {
		if c == ch {
			found = true
			break
		}
	}
	if !found {
		t.Fatal("subscription channel missing")
	}

	if err := bus.Unsubscribe(context.Background(), "key", ch); err != nil {
		t.Fatalf("cleanup unsubscribe: %v", err)
	}
}
F
function

TestLeaseRevokeFlowAndMetrics

Parameters

v1/syncbus/syncbus_test.go:220-245
func TestLeaseRevokeFlowAndMetrics(t *testing.T)

{
	bus := NewInMemoryBus()
	ctx := context.Background()
	ch, err := bus.SubscribeLease(ctx, "id")
	if err != nil {
		t.Fatalf("subscribe lease: %v", err)
	}

	if err := bus.RevokeLease(context.Background(), "id"); err != nil {
		t.Fatalf("revoke lease: %v", err)
	}

	select {
	case <-ch:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for revoke lease")
	}

	metrics := bus.Metrics()
	if metrics.Published != 1 {
		t.Fatalf("expected published 1 got %d", metrics.Published)
	}
	if metrics.Delivered != 1 {
		t.Fatalf("expected delivered 1 got %d", metrics.Delivered)
	}
}
F
function

TestSubscribeLeaseContextCanceled

Parameters

v1/syncbus/syncbus_test.go:247-259
func TestSubscribeLeaseContextCanceled(t *testing.T)

{
	bus := NewInMemoryBus()
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	if _, err := bus.SubscribeLease(ctx, "id"); err == nil {
		t.Fatal("expected subscribe lease error due to canceled context")
	}

	subs, _ := bus.subs.Get("lease:id")
	if len(subs) > 0 {
		t.Fatal("subscription should not be added when context is canceled")
	}
}
F
function

TestUnsubscribeLeaseClosesChannel

Parameters

v1/syncbus/syncbus_test.go:261-292
func TestUnsubscribeLeaseClosesChannel(t *testing.T)

{
	bus := NewInMemoryBus()
	ch, err := bus.SubscribeLease(context.Background(), "id")
	if err != nil {
		t.Fatalf("subscribe lease: %v", err)
	}
	if err := bus.UnsubscribeLease(context.Background(), "id", ch); err != nil {
		t.Fatalf("unsubscribe lease: %v", err)
	}

	select {
	case _, ok := <-ch:
		if ok {
			t.Fatal("expected channel closed")
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for unsubscribe lease")
	}

	subs, _ := bus.subs.Get("lease:id")
	if len(subs) > 0 {
		t.Fatal("subscription still present after unsubscribe lease")
	}

	metrics := bus.Metrics()
	if metrics.Published != 0 {
		t.Fatalf("expected published 0 got %d", metrics.Published)
	}
	if metrics.Delivered != 0 {
		t.Fatalf("expected delivered 0 got %d", metrics.Delivered)
	}
}
F
function

TestRevokeLeaseContextCanceled

Parameters

v1/syncbus/syncbus_test.go:294-308
func TestRevokeLeaseContextCanceled(t *testing.T)

{
	bus := NewInMemoryBus()
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	if err := bus.RevokeLease(ctx, "id"); err == nil {
		t.Fatal("expected revoke lease error due to canceled context")
	}
	metrics := bus.Metrics()
	if metrics.Published != 0 {
		t.Fatalf("expected published 0 got %d", metrics.Published)
	}
	if metrics.Delivered != 0 {
		t.Fatalf("expected delivered 0 got %d", metrics.Delivered)
	}
}
F
function

TestUnsubscribeLeaseContextCanceled

Parameters

v1/syncbus/syncbus_test.go:310-330
func TestUnsubscribeLeaseContextCanceled(t *testing.T)

{
	bus := NewInMemoryBus()
	ch, err := bus.SubscribeLease(context.Background(), "id")
	if err != nil {
		t.Fatalf("subscribe lease: %v", err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	if err := bus.UnsubscribeLease(ctx, "id", ch); err == nil {
		t.Fatal("expected unsubscribe lease error due to canceled context")
	}

	subs, _ := bus.subs.Get("lease:id")
	if len(subs) == 0 {
		t.Fatal("subscription should remain when unsubscribe lease context is canceled")
	}

	if err := bus.UnsubscribeLease(context.Background(), "id", ch); err != nil {
		t.Fatalf("cleanup unsubscribe lease: %v", err)
	}
}
S
struct

CircuitBreakerBus

CircuitBreakerBus decorates a Bus with circuit breaker logic.

v1/syncbus/circuit_breaker.go:14-17
type CircuitBreakerBus struct

Methods

IsHealthy
Method

IsHealthy returns true if the circuit is closed.

Returns

bool
func (*CircuitBreakerBus) IsHealthy() bool
{
	return cb.breaker.State() == resiliency.StateClosed
}
Publish
Method

Publish implements Bus.Publish with circuit breaker logic.

Parameters

key string
opts ...PublishOption

Returns

error
func (*CircuitBreakerBus) Publish(ctx context.Context, key string, opts ...PublishOption) error
{
	return cb.breaker.Execute(func() error {
		return cb.bus.Publish(ctx, key, opts...)
	})
}

PublishAndAwait implements Bus.PublishAndAwait with circuit breaker logic.

Parameters

key string
replicas int
opts ...PublishOption

Returns

error
func (*CircuitBreakerBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...PublishOption) error
{
	return cb.breaker.Execute(func() error {
		return cb.bus.PublishAndAwait(ctx, key, replicas, opts...)
	})
}

PublishAndAwaitTopology implements Bus.PublishAndAwaitTopology with circuit breaker logic.

Parameters

key string
minZones int
opts ...PublishOption

Returns

error
func (*CircuitBreakerBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...PublishOption) error
{
	return cb.breaker.Execute(func() error {
		return cb.bus.PublishAndAwaitTopology(ctx, key, minZones, opts...)
	})
}
Subscribe
Method

Parameters

key string

Returns

<-chan Event
error
func (*CircuitBreakerBus) Subscribe(ctx context.Context, key string) (<-chan Event, error)
{
	return cb.bus.Subscribe(ctx, key)
}
Unsubscribe
Method

Parameters

key string
ch <-chan Event

Returns

error
func (*CircuitBreakerBus) Unsubscribe(ctx context.Context, key string, ch <-chan Event) error
{
	return cb.bus.Unsubscribe(ctx, key, ch)
}
RevokeLease
Method

Parameters

id string

Returns

error
func (*CircuitBreakerBus) RevokeLease(ctx context.Context, id string) error
{
	return cb.breaker.Execute(func() error {
		return cb.bus.RevokeLease(ctx, id)
	})
}

Parameters

id string

Returns

<-chan Event
error
func (*CircuitBreakerBus) SubscribeLease(ctx context.Context, id string) (<-chan Event, error)
{
	return cb.bus.SubscribeLease(ctx, id)
}

Parameters

id string
ch <-chan Event

Returns

error
func (*CircuitBreakerBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan Event) error
{
	return cb.bus.UnsubscribeLease(ctx, id, ch)
}

Fields

Name Type Description
bus Bus
breaker *resiliency.CircuitBreaker
F
function

NewCircuitBreaker

NewCircuitBreaker returns a new CircuitBreakerBus.

Parameters

bus
threshold
int
timeout
v1/syncbus/circuit_breaker.go:20-25
func NewCircuitBreaker(bus Bus, threshold int, timeout time.Duration) *CircuitBreakerBus

{
	return &CircuitBreakerBus{
		bus:     bus,
		breaker: resiliency.NewCircuitBreaker(threshold, timeout),
	}
}