syncbus
packageAPI reference for the syncbus
package.
Imports
(8)mockBus
type mockBus struct
Methods
Parameters
Returns
func (*mockBus) Publish(ctx context.Context, key string, opts ...PublishOption) error
{
if m.publishFunc != nil {
return m.publishFunc(ctx, key, opts...)
}
return m.InMemoryBus.Publish(ctx, key, opts...)
}
Fields
| Name | Type | Description |
|---|---|---|
| publishFunc | func(ctx context.Context, key string, opts ...PublishOption) error | |
| isHealthy | bool |
TestCircuitBreaker_StateTransitions
Parameters
func TestCircuitBreaker_StateTransitions(t *testing.T)
{
mb := &mockBus{InMemoryBus: NewInMemoryBus(), isHealthy: true}
threshold := 2
timeout := 50 * time.Millisecond
cb := NewCircuitBreaker(mb, threshold, timeout)
ctx := context.Background()
failErr := errors.New("fail")
if !cb.IsHealthy() {
t.Fatal("expected healthy initially")
}
mb.publishFunc = func(ctx context.Context, key string, opts ...PublishOption) error { return failErr }
if err := cb.Publish(ctx, "key"); err != failErr {
t.Fatalf("expected failErr, got %v", err)
}
if !cb.IsHealthy() {
t.Fatal("expected healthy after 1 failure (threshold 2)")
}
if err := cb.Publish(ctx, "key"); err != failErr {
t.Fatalf("expected failErr, got %v", err)
}
if cb.IsHealthy() {
t.Fatal("expected unhealthy/open after threshold reached")
}
if err := cb.Publish(ctx, "key"); err != ErrCircuitOpen {
t.Fatalf("expected ErrCircuitOpen, got %v", err)
}
time.Sleep(timeout + 10*time.Millisecond)
mb.publishFunc = func(ctx context.Context, key string, opts ...PublishOption) error { return nil }
if err := cb.Publish(ctx, "key"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !cb.IsHealthy() {
t.Fatal("expected healthy after success")
}
mb.publishFunc = func(ctx context.Context, key string, opts ...PublishOption) error { return failErr }
cb.Publish(ctx, "key")
cb.Publish(ctx, "key")
if cb.IsHealthy() {
t.Fatal("expected open")
}
time.Sleep(timeout + 10*time.Millisecond)
if err := cb.Publish(ctx, "key"); err != failErr {
t.Fatalf("expected failErr, got %v", err)
}
if cb.IsHealthy() {
t.Fatal("expected open after half-open failure")
}
if err := cb.Publish(ctx, "key"); err != ErrCircuitOpen {
t.Fatalf("expected ErrCircuitOpen, got %v", err)
}
}
TestCircuitBreaker_Passthrough
Parameters
func TestCircuitBreaker_Passthrough(t *testing.T)
{
// Re-using InMemoryBus directly embedded in mock
mb := &mockBus{InMemoryBus: NewInMemoryBus(), isHealthy: true}
cb := NewCircuitBreaker(mb, 5, time.Minute)
ctx := context.Background()
// Test Publish
if err := cb.Publish(ctx, "foo"); err != nil {
t.Fatal(err)
}
// Ensure it went through to underlying bus
sub, _ := mb.InMemoryBus.Subscribe(ctx, "foo")
go func() {
cb.Publish(ctx, "foo")
}()
select {
case <-sub:
case <-time.After(time.Second):
t.Fatal("timeout waiting for message on underlying bus")
}
}
Scope
Scope defines the propagation scope of an event.
type Scope uint8
PublishOptions
type PublishOptions struct
Fields
| Name | Type | Description |
|---|---|---|
| Region | string | |
| VectorClock | map[string]uint64 | |
| Scope | Scope |
Uses
PublishOption
type PublishOption func(*PublishOptions)
WithRegion
Parameters
Returns
func WithRegion(region string) PublishOption
{
return func(o *PublishOptions) {
o.Region = region
}
}
WithVectorClock
Parameters
Returns
func WithVectorClock(vc map[string]uint64) PublishOption
{
return func(o *PublishOptions) {
o.VectorClock = vc
}
}
WithScope
Parameters
Returns
func WithScope(scope Scope) PublishOption
{
return func(o *PublishOptions) {
o.Scope = scope
}
}
Event
Event represents a bus event carrying metadata.
type Event struct
Fields
| Name | Type | Description |
|---|---|---|
| Key | string | |
| Region | string | |
| VectorClock | map[string]uint64 | |
| Scope | Scope |
Uses
Bus
Bus provides a simple pub/sub mechanism used by warp to propagate
invalidation events across nodes.
type Bus interface
Methods
Parameters
Returns
func Subscribe(...)
Parameters
Returns
func SubscribeLease(...)
InMemoryBus
InMemoryBus is a local implementation of Bus mainly for testing.
type InMemoryBus struct
Methods
IsHealthy implements Bus.IsHealthy.
Returns
func (*InMemoryBus) IsHealthy() bool
{
return true
}
Peers implements Bus.Peers.
Returns
func (*InMemoryBus) Peers() []string
{
return nil
}
Publish implements Bus.Publish.
Parameters
Returns
func (*InMemoryBus) Publish(ctx context.Context, key string, opts ...PublishOption) error
{
select {
case <-ctx.Done():
return ctx.Err()
default:
}
options := PublishOptions{}
for _, opt := range opts {
opt(&options)
}
// Check and Set Pending (Deduplication)
var alreadyPending bool
b.pending.Compute(key, func(v bool, exists bool) bool {
alreadyPending = exists
return true
})
if alreadyPending {
return nil
}
defer b.pending.Delete(key)
chans, _ := b.subs.Get(key)
if len(chans) == 0 {
return nil
}
evt := Event{
Key: key,
Region: options.Region,
VectorClock: options.VectorClock,
}
b.published.Add(1)
for _, ch := range chans {
select {
case <-ctx.Done():
return ctx.Err()
case ch <- evt:
b.delivered.Add(1)
default:
}
}
return nil
}
PublishAndAwaitTopology implements Bus.PublishAndAwaitTopology.
Parameters
Returns
func (*InMemoryBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...PublishOption) error
{
// In-memory simulation: treat zones as simple replica count for now
return b.PublishAndAwait(ctx, key, minZones, opts...)
}
PublishAndAwait implements Bus.PublishAndAwait.
Parameters
Returns
func (*InMemoryBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...PublishOption) error
{
if replicas <= 0 {
replicas = 1
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
options := PublishOptions{}
for _, opt := range opts {
opt(&options)
}
var alreadyPending bool
b.pending.Compute(key, func(v bool, exists bool) bool {
alreadyPending = exists
return true
})
if alreadyPending {
return nil
}
defer b.pending.Delete(key)
chans, _ := b.subs.Get(key)
if len(chans) < replicas {
return ErrQuorumNotSatisfied
}
evt := Event{
Key: key,
Region: options.Region,
VectorClock: options.VectorClock,
}
delivered := 0
for _, ch := range chans {
select {
case <-ctx.Done():
return ctx.Err()
case ch <- evt:
b.delivered.Add(1)
delivered++
default:
}
}
if delivered < replicas {
return ErrQuorumNotSatisfied
}
b.published.Add(1)
return nil
}
Subscribe implements Bus.Subscribe.
Parameters
Returns
func (*InMemoryBus) Subscribe(ctx context.Context, key string) (<-chan Event, error)
{
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
ch := make(chan Event, 1) // Buffer 1
b.subs.Compute(key, func(v []chan Event, exists bool) []chan Event {
return append(v, ch)
})
go func() {
<-ctx.Done()
_ = b.Unsubscribe(context.Background(), key, ch)
}()
return ch, nil
}
Unsubscribe implements Bus.Unsubscribe.
Parameters
Returns
func (*InMemoryBus) Unsubscribe(ctx context.Context, key string, ch <-chan Event) error
{
select {
case <-ctx.Done():
return ctx.Err()
default:
}
b.subs.Compute(key, func(subs []chan Event, exists bool) []chan Event {
if !exists {
return nil
}
for i, c := range subs {
if c == ch {
// Remove (swap with last)
subs[i] = subs[len(subs)-1]
subs = subs[:len(subs)-1]
close(c)
break
}
}
if len(subs) == 0 {
return subs
}
return subs
})
return nil
}
RevokeLease publishes a lease revocation event.
Parameters
Returns
func (*InMemoryBus) RevokeLease(ctx context.Context, id string) error
{
return b.Publish(ctx, "lease:"+id)
}
SubscribeLease subscribes to lease revocation events.
Parameters
Returns
func (*InMemoryBus) SubscribeLease(ctx context.Context, id string) (<-chan Event, error)
{
return b.Subscribe(ctx, "lease:"+id)
}
UnsubscribeLease cancels a lease revocation subscription.
Parameters
Returns
func (*InMemoryBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan Event) error
{
return b.Unsubscribe(ctx, "lease:"+id, ch)
}
Fields
| Name | Type | Description |
|---|---|---|
| subs | *safemap.ShardedMap[string, []chan Event] | |
| pending | *safemap.ShardedMap[string, bool] | |
| published | atomic.Uint64 | |
| delivered | atomic.Uint64 |
NewInMemoryBus
NewInMemoryBus returns a new InMemoryBus.
Returns
func NewInMemoryBus() *InMemoryBus
{
return &InMemoryBus{
subs: safemap.NewSharded[string, []chan Event](safemap.StringHasher, 32),
pending: safemap.NewSharded[string, bool](safemap.StringHasher, 32),
}
}
Metrics
type Metrics struct
Fields
| Name | Type | Description |
|---|---|---|
| Published | uint64 | |
| Delivered | uint64 |
BenchmarkPublish_InMemory
Parameters
func BenchmarkPublish_InMemory(b *testing.B)
{
bus := NewInMemoryBus()
ctx := context.Background()
key := "bench-key"
// Setup subscribers
numSubs := 10
for i := 0; i < numSubs; i++ {
_, _ = bus.Subscribe(ctx, key)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = bus.Publish(ctx, key)
}
}
BenchmarkPublish_Parallel_InMemory
Parameters
func BenchmarkPublish_Parallel_InMemory(b *testing.B)
{
bus := NewInMemoryBus()
ctx := context.Background()
key := "bench-key"
// Setup subscribers
numSubs := 10
for i := 0; i < numSubs; i++ {
_, _ = bus.Subscribe(ctx, key)
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = bus.Publish(ctx, key)
}
})
}
BenchmarkSubscribe_InMemory
Parameters
func BenchmarkSubscribe_InMemory(b *testing.B)
{
bus := NewInMemoryBus()
ctx := context.Background()
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := fmt.Sprintf("key-%d", i)
_, _ = bus.Subscribe(ctx, key)
}
}
TestPublishSubscribeFlowAndMetrics
Parameters
func TestPublishSubscribeFlowAndMetrics(t *testing.T)
{
bus := NewInMemoryBus()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch, err := bus.Subscribe(ctx, "key")
if err != nil {
t.Fatalf("subscribe: %v", err)
}
if err := bus.Publish(context.Background(), "key"); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case <-ch:
case <-time.After(time.Second):
t.Fatal("timeout waiting for publish")
}
metrics := bus.Metrics()
if metrics.Published != 1 {
t.Fatalf("expected published 1 got %d", metrics.Published)
}
if metrics.Delivered != 1 {
t.Fatalf("expected delivered 1 got %d", metrics.Delivered)
}
}
TestPublishAndAwaitQuorumSatisfied
Parameters
func TestPublishAndAwaitQuorumSatisfied(t *testing.T)
{
bus := NewInMemoryBus()
ctx := context.Background()
ch, err := bus.Subscribe(ctx, "key")
if err != nil {
t.Fatalf("subscribe: %v", err)
}
done := make(chan struct{})
go func() {
if err := bus.PublishAndAwait(ctx, "key", 1); err != nil {
t.Errorf("publish and await: %v", err)
}
close(done)
}()
select {
case <-ch:
case <-time.After(time.Second):
t.Fatal("timeout waiting for quorum publish")
}
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("publish and await did not return")
}
metrics := bus.Metrics()
if metrics.Published != 1 {
t.Fatalf("expected published 1 got %d", metrics.Published)
}
if metrics.Delivered != 1 {
t.Fatalf("expected delivered 1 got %d", metrics.Delivered)
}
}
TestPublishAndAwaitQuorumNotSatisfied
Parameters
func TestPublishAndAwaitQuorumNotSatisfied(t *testing.T)
{
bus := NewInMemoryBus()
ctx := context.Background()
if err := bus.PublishAndAwait(ctx, "key", 2); err != ErrQuorumNotSatisfied {
t.Fatalf("expected quorum error, got %v", err)
}
metrics := bus.Metrics()
if metrics.Published != 0 {
t.Fatalf("expected published 0 got %d", metrics.Published)
}
if metrics.Delivered != 0 {
t.Fatalf("expected delivered 0 got %d", metrics.Delivered)
}
}
TestContextBasedUnsubscribe
Parameters
func TestContextBasedUnsubscribe(t *testing.T)
{
bus := NewInMemoryBus()
ctx, cancel := context.WithCancel(context.Background())
ch, err := bus.Subscribe(ctx, "key")
if err != nil {
t.Fatalf("subscribe: %v", err)
}
cancel()
select {
case _, ok := <-ch:
if ok {
t.Fatal("expected channel closed")
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for unsubscribe")
}
// Poll for cleanup
deadline := time.Now().Add(time.Second)
cleaned := false
for time.Now().Before(deadline) {
subs, _ := bus.subs.Get("key")
if len(subs) == 0 {
cleaned = true
break
}
time.Sleep(10 * time.Millisecond)
}
if !cleaned {
t.Fatal("subscription still present after context cancel")
}
}
TestDeduplicatePendingKeys
Parameters
func TestDeduplicatePendingKeys(t *testing.T)
{
bus := NewInMemoryBus()
ctx := context.Background()
ch, err := bus.Subscribe(ctx, "key")
if err != nil {
t.Fatalf("subscribe: %v", err)
}
// Manually set pending
bus.pending.Set("key", true)
if err := bus.Publish(context.Background(), "key"); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case <-ch:
t.Fatal("unexpected publish when key pending")
default:
}
metrics := bus.Metrics()
if metrics.Published != 0 {
t.Fatalf("expected published 0 got %d", metrics.Published)
}
if metrics.Delivered != 0 {
t.Fatalf("expected delivered 0 got %d", metrics.Delivered)
}
}
TestPublishContextCanceled
Parameters
func TestPublishContextCanceled(t *testing.T)
{
bus := NewInMemoryBus()
ctx, cancel := context.WithCancel(context.Background())
cancel()
if err := bus.Publish(ctx, "key"); err == nil {
t.Fatal("expected publish error due to canceled context")
}
metrics := bus.Metrics()
if metrics.Published != 0 {
t.Fatalf("expected published 0 got %d", metrics.Published)
}
if metrics.Delivered != 0 {
t.Fatalf("expected delivered 0 got %d", metrics.Delivered)
}
}
TestSubscribeContextCanceled
Parameters
func TestSubscribeContextCanceled(t *testing.T)
{
bus := NewInMemoryBus()
ctx, cancel := context.WithCancel(context.Background())
cancel()
if _, err := bus.Subscribe(ctx, "key"); err == nil {
t.Fatal("expected subscribe error due to canceled context")
}
if bus.subs.Has("key") {
subs, _ := bus.subs.Get("key")
if len(subs) > 0 {
t.Fatal("subscription should not be added when context is canceled")
}
}
}
TestUnsubscribeContextCanceled
Parameters
func TestUnsubscribeContextCanceled(t *testing.T)
{
bus := NewInMemoryBus()
ch, err := bus.Subscribe(context.Background(), "key")
if err != nil {
t.Fatalf("subscribe: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
if err := bus.Unsubscribe(ctx, "key", ch); err == nil {
t.Fatal("expected unsubscribe error due to canceled context")
}
if !bus.subs.Has("key") {
t.Fatal("subscription subscription should remain when unsubscribe context is canceled")
}
subs, _ := bus.subs.Get("key")
found := false
for _, c := range subs {
if c == ch {
found = true
break
}
}
if !found {
t.Fatal("subscription channel missing")
}
if err := bus.Unsubscribe(context.Background(), "key", ch); err != nil {
t.Fatalf("cleanup unsubscribe: %v", err)
}
}
TestLeaseRevokeFlowAndMetrics
Parameters
func TestLeaseRevokeFlowAndMetrics(t *testing.T)
{
bus := NewInMemoryBus()
ctx := context.Background()
ch, err := bus.SubscribeLease(ctx, "id")
if err != nil {
t.Fatalf("subscribe lease: %v", err)
}
if err := bus.RevokeLease(context.Background(), "id"); err != nil {
t.Fatalf("revoke lease: %v", err)
}
select {
case <-ch:
case <-time.After(time.Second):
t.Fatal("timeout waiting for revoke lease")
}
metrics := bus.Metrics()
if metrics.Published != 1 {
t.Fatalf("expected published 1 got %d", metrics.Published)
}
if metrics.Delivered != 1 {
t.Fatalf("expected delivered 1 got %d", metrics.Delivered)
}
}
TestSubscribeLeaseContextCanceled
Parameters
func TestSubscribeLeaseContextCanceled(t *testing.T)
{
bus := NewInMemoryBus()
ctx, cancel := context.WithCancel(context.Background())
cancel()
if _, err := bus.SubscribeLease(ctx, "id"); err == nil {
t.Fatal("expected subscribe lease error due to canceled context")
}
subs, _ := bus.subs.Get("lease:id")
if len(subs) > 0 {
t.Fatal("subscription should not be added when context is canceled")
}
}
TestUnsubscribeLeaseClosesChannel
Parameters
func TestUnsubscribeLeaseClosesChannel(t *testing.T)
{
bus := NewInMemoryBus()
ch, err := bus.SubscribeLease(context.Background(), "id")
if err != nil {
t.Fatalf("subscribe lease: %v", err)
}
if err := bus.UnsubscribeLease(context.Background(), "id", ch); err != nil {
t.Fatalf("unsubscribe lease: %v", err)
}
select {
case _, ok := <-ch:
if ok {
t.Fatal("expected channel closed")
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for unsubscribe lease")
}
subs, _ := bus.subs.Get("lease:id")
if len(subs) > 0 {
t.Fatal("subscription still present after unsubscribe lease")
}
metrics := bus.Metrics()
if metrics.Published != 0 {
t.Fatalf("expected published 0 got %d", metrics.Published)
}
if metrics.Delivered != 0 {
t.Fatalf("expected delivered 0 got %d", metrics.Delivered)
}
}
TestRevokeLeaseContextCanceled
Parameters
func TestRevokeLeaseContextCanceled(t *testing.T)
{
bus := NewInMemoryBus()
ctx, cancel := context.WithCancel(context.Background())
cancel()
if err := bus.RevokeLease(ctx, "id"); err == nil {
t.Fatal("expected revoke lease error due to canceled context")
}
metrics := bus.Metrics()
if metrics.Published != 0 {
t.Fatalf("expected published 0 got %d", metrics.Published)
}
if metrics.Delivered != 0 {
t.Fatalf("expected delivered 0 got %d", metrics.Delivered)
}
}
TestUnsubscribeLeaseContextCanceled
Parameters
func TestUnsubscribeLeaseContextCanceled(t *testing.T)
{
bus := NewInMemoryBus()
ch, err := bus.SubscribeLease(context.Background(), "id")
if err != nil {
t.Fatalf("subscribe lease: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
if err := bus.UnsubscribeLease(ctx, "id", ch); err == nil {
t.Fatal("expected unsubscribe lease error due to canceled context")
}
subs, _ := bus.subs.Get("lease:id")
if len(subs) == 0 {
t.Fatal("subscription should remain when unsubscribe lease context is canceled")
}
if err := bus.UnsubscribeLease(context.Background(), "id", ch); err != nil {
t.Fatalf("cleanup unsubscribe lease: %v", err)
}
}
CircuitBreakerBus
CircuitBreakerBus decorates a Bus with circuit breaker logic.
type CircuitBreakerBus struct
Methods
IsHealthy returns true if the circuit is closed.
Returns
func (*CircuitBreakerBus) IsHealthy() bool
{
return cb.breaker.State() == resiliency.StateClosed
}
Publish implements Bus.Publish with circuit breaker logic.
Parameters
Returns
func (*CircuitBreakerBus) Publish(ctx context.Context, key string, opts ...PublishOption) error
{
return cb.breaker.Execute(func() error {
return cb.bus.Publish(ctx, key, opts...)
})
}
PublishAndAwait implements Bus.PublishAndAwait with circuit breaker logic.
Parameters
Returns
func (*CircuitBreakerBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...PublishOption) error
{
return cb.breaker.Execute(func() error {
return cb.bus.PublishAndAwait(ctx, key, replicas, opts...)
})
}
PublishAndAwaitTopology implements Bus.PublishAndAwaitTopology with circuit breaker logic.
Parameters
Returns
func (*CircuitBreakerBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...PublishOption) error
{
return cb.breaker.Execute(func() error {
return cb.bus.PublishAndAwaitTopology(ctx, key, minZones, opts...)
})
}
Parameters
Returns
func (*CircuitBreakerBus) Subscribe(ctx context.Context, key string) (<-chan Event, error)
{
return cb.bus.Subscribe(ctx, key)
}
Parameters
Returns
func (*CircuitBreakerBus) Unsubscribe(ctx context.Context, key string, ch <-chan Event) error
{
return cb.bus.Unsubscribe(ctx, key, ch)
}
Parameters
Returns
func (*CircuitBreakerBus) RevokeLease(ctx context.Context, id string) error
{
return cb.breaker.Execute(func() error {
return cb.bus.RevokeLease(ctx, id)
})
}
Parameters
Returns
func (*CircuitBreakerBus) SubscribeLease(ctx context.Context, id string) (<-chan Event, error)
{
return cb.bus.SubscribeLease(ctx, id)
}
Parameters
Returns
func (*CircuitBreakerBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan Event) error
{
return cb.bus.UnsubscribeLease(ctx, id, ch)
}
Fields
| Name | Type | Description |
|---|---|---|
| bus | Bus | |
| breaker | *resiliency.CircuitBreaker |
Uses
NewCircuitBreaker
NewCircuitBreaker returns a new CircuitBreakerBus.
Parameters
Returns
func NewCircuitBreaker(bus Bus, threshold int, timeout time.Duration) *CircuitBreakerBus
{
return &CircuitBreakerBus{
bus: bus,
breaker: resiliency.NewCircuitBreaker(threshold, timeout),
}
}