syncbus_test API

syncbus_test

package

API reference for the syncbus_test package: the external test suite for v1/syncbus. It covers cluster-wide cache invalidation over a shared Redis bus, the same scenario over a peer-to-peer mesh bus, and error-path behaviour exercised through a failing bus stub.
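
Every test on this page assembles a node the same way: an L1 in-memory cache, an L2 store, a bus, and a merge engine are passed to core.New, and each key is registered with a consistency mode and a TTL. The sketch below distills that wiring for a single Redis-backed node. It is illustrative only: buildRedisNode is not part of the package, the package aliases (core, cache, adapter, merge, busredis, redis) are the ones used in the tests on this page, and import paths are omitted because this reference does not show them.

// buildRedisNode is an illustrative helper, not part of the package under test.
// It wires one Warp[string] node against Redis the same way
// TestRedisCluster_Gossip does below.
func buildRedisNode(ctx context.Context, addr, key string) (*core.Warp[string], error) {
	client := redis.NewClient(&redis.Options{Addr: addr})
	if err := client.Ping(ctx).Err(); err != nil {
		return nil, err // Redis not reachable
	}

	bus := busredis.NewRedisBus(busredis.RedisBusOptions{Client: client}) // invalidation bus
	c := cache.NewInMemory[merge.Value[string]]()                         // L1: per-node cache
	s := adapter.NewRedisStore[string](client)                            // L2: shared Redis store
	e := merge.NewEngine[string]()

	w := core.New(c, s, bus, e)
	w.Register(key, core.ModeEventualDistributed, time.Hour)
	return w, nil
}

Writes go through w.Set and reads through w.Get; peers drop their L1 entry when an invalidation arrives on the bus and refetch from the shared store, which is exactly what the gossip tests below assert.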

function

TestRedisCluster_Gossip

v1/syncbus/redis_cluster_test.go:17-124
func TestRedisCluster_Gossip(t *testing.T)

{
	redisAddr := "localhost:6379"
	testKey := "test-key-" + uuid.NewString()

	nodeCount := 10
	nodes := make([]*core.Warp[string], nodeCount)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Track resources for cleanup
	var buses []*busredis.RedisBus
	var clients []*redis.Client

	defer func() {
		for _, b := range buses {
			_ = b.Close()
		}
		for _, c := range clients {
			_ = c.Close()
		}
	}()

	// All nodes connect to the same Redis (shared Store and Bus)
	for i := 0; i < nodeCount; i++ {
		client := redis.NewClient(&redis.Options{Addr: redisAddr})
		if err := client.Ping(ctx).Err(); err != nil {
			t.Skipf("Skipping Redis test, connection failed: %v", err)
			return
		}
		clients = append(clients, client)

		bus := busredis.NewRedisBus(busredis.RedisBusOptions{Client: client})
		buses = append(buses, bus)

		c := cache.NewInMemory[merge.Value[string]]()
		s := adapter.NewRedisStore[string](client) // Shared Redis Store
		e := merge.NewEngine[string]()

		w := core.New(c, s, bus, e)
		w.Register(testKey, core.ModeEventualDistributed, time.Hour)
		nodes[i] = w
	}

	// Clean up previous test data using the first client
	_ = clients[0].Del(ctx, testKey).Err()

	// Wait for all nodes to fully subscribe before testing
	time.Sleep(2 * time.Second)

	// Step 1: Node 0 sets initial value
	if err := nodes[0].Set(ctx, testKey, "valueA"); err != nil {
		t.Fatalf("node 0 initial set failed: %v", err)
	}

	// Verify the value is in Redis L2 before proceeding
	val, err := clients[0].Get(ctx, testKey).Result()
	if err != nil {
		t.Fatalf("failed to verify initial value in Redis: %v", err)
	}
	t.Logf("Verified: %s is set in Redis L2 (raw value exists, len=%d)", testKey, len(val))

	time.Sleep(300 * time.Millisecond)

	// Step 2: All other nodes Get -> They should get "valueA"
	for i := 1; i < nodeCount; i++ {
		val, err := nodes[i].Get(ctx, testKey)
		if err != nil || val != "valueA" {
			t.Errorf("node %d initial Get: expected valueA, got %v, err %v", i, val, err)
		}
	}

	t.Log("All Redis nodes have valueA in L1. Node 0 updating to valueB...")

	// Step 3: Node 0 updates to "valueB"
	if err := nodes[0].Set(ctx, testKey, "valueB"); err != nil {
		t.Fatalf("node 0 update set failed: %v", err)
	}

	// Verify the updated value is in Redis L2
	val, err = clients[0].Get(ctx, testKey).Result()
	if err != nil {
		t.Fatalf("failed to verify updated value in Redis: %v", err)
	}
	t.Logf("Verified: %s updated in Redis L2 (raw value exists, len=%d)", testKey, len(val))

	// Step 4: Wait for invalidation propagation
	time.Sleep(2 * time.Second)

	// Step 5: Verify
	successCount := 0
	for i := 1; i < nodeCount; i++ {
		val, err := nodes[i].Get(ctx, testKey)
		if val == "valueB" {
			successCount++
		} else if val == "valueA" {
			t.Errorf("Node %d STALE! Still has valueA", i)
		} else {
			t.Errorf("Node %d unexpected: val=%q err=%v", i, val, err)
		}
	}

	if successCount == nodeCount-1 {
		t.Logf("SUCCESS: All %d Redis peer nodes received invalidation and have valueB.", successCount)
	} else {
		t.Fatalf("FAILED: Only %d/%d Redis nodes received invalidation.", successCount, nodeCount-1)
	}
}
S
struct

failingBus

v1/syncbus/warp_errors_test.go:15-18
type failingBus struct

Methods

Publish
Method

Parameters

key string
opts ...syncbus.PublishOption

Returns

error
func (f *failingBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
	return f.publishErr
}

PublishAndAwait
Method

Parameters

key string
replicas int
opts ...syncbus.PublishOption

Returns

error
func (b *failingBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
	return b.publishErr
}

PublishAndAwaitTopology
Method

Parameters

key string
minZones int
opts ...syncbus.PublishOption

Returns

error
func (b *failingBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
	return b.publishErr
}
Subscribe
Method

Parameters

key string

Returns

<-chan syncbus.Event
error
func (f *failingBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
	return nil, f.subscribeErr
}
Unsubscribe
Method

Parameters

key string
ch <-chan syncbus.Event

Returns

error
func (*failingBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
	return nil
}
RevokeLease
Method

Parameters

id string

Returns

error
func (f *failingBus) RevokeLease(ctx context.Context, id string) error
{
	return f.publishErr
}

SubscribeLease
Method

Parameters

id string

Returns

<-chan syncbus.Event
error
func (f *failingBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
	return nil, f.subscribeErr
}

UnsubscribeLease
Method

Parameters

id string
ch <-chan syncbus.Event

Returns

error
func (*failingBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
	return nil
}
IsHealthy
Method

Returns

bool
func (*failingBus) IsHealthy() bool
{
	return true
}
Peers
Method

Returns

[]string
func (*failingBus) Peers() []string
{
	return nil
}

Fields

Name          Type   Description
publishErr    error  Returned by Publish, PublishAndAwait, PublishAndAwaitTopology, and RevokeLease.
subscribeErr  error  Returned by Subscribe and SubscribeLease.
function

TestWarpPublishError

v1/syncbus/warp_errors_test.go:53-68
func TestWarpPublishError(t *testing.T)

{
	bus := &failingBus{publishErr: errors.New("publish failed")}
	w := core.New[string](cache.NewInMemory[merge.Value[string]](), nil, bus, merge.NewEngine[string]())
	w.Register("key", core.ModeEventualDistributed, time.Minute)
	if err := w.Invalidate(context.Background(), "key"); err != nil {
		t.Fatalf("unexpected invalidate error: %v", err)
	}
	select {
	case err := <-w.PublishErrors():
		if !errors.Is(err, bus.publishErr) {
			t.Fatalf("expected %v got %v", bus.publishErr, err)
		}
	case <-time.After(100 * time.Millisecond):
		t.Fatal("expected publish error")
	}
}
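
TestWarpPublishError shows that a failed bus publish does not surface from Invalidate itself; it is delivered asynchronously on the channel returned by PublishErrors(). In application code that channel should be drained, or failures go unnoticed. A minimal sketch, assuming only what the test above shows (that PublishErrors() yields the underlying publish errors); the watchPublishErrors helper, its stop channel, and the logging are illustrative:

// watchPublishErrors is an illustrative helper (not part of the package):
// it drains Warp's asynchronous publish errors and logs them until stopped.
func watchPublishErrors(w *core.Warp[string], stop <-chan struct{}) {
	errs := w.PublishErrors() // same channel the test above reads from
	go func() {
		for {
			select {
			case err := <-errs:
				log.Printf("syncbus: async publish failed: %v", err)
			case <-stop:
				return
			}
		}
	}()
}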
function

TestWarpSubscribeError

v1/syncbus/warp_errors_test.go:70-76
func TestWarpSubscribeError(t *testing.T)

{
	bus := &failingBus{subscribeErr: errors.New("subscribe failed")}
	w := core.New[string](cache.NewInMemory[merge.Value[string]](), nil, bus, merge.NewEngine[string]())
	if _, err := w.GrantLease(context.Background(), time.Minute); err != nil {
		t.Fatalf("grant lease returned error: %v", err)
	}
}
function

newRedisBus

v1/syncbus/warp_integration_test.go:20-57
func newRedisBus(t *testing.T) (*busredis.RedisBus, context.Context)

{
	t.Helper()
	addr := os.Getenv("WARP_TEST_REDIS_ADDR")
	forceReal := os.Getenv("WARP_TEST_FORCE_REAL") == "true"
	var client *redis.Client
	var mr *miniredis.Miniredis

	if forceReal && addr == "" {
		t.Fatal("WARP_TEST_FORCE_REAL is true but WARP_TEST_REDIS_ADDR is empty")
	}

	if addr != "" {
		t.Logf("TestRedisBus: using real Redis at %s", addr)
		client = redis.NewClient(&redis.Options{Addr: addr})
	} else {
		t.Log("TestRedisBus: using miniredis")
		var err error
		mr, err = miniredis.Run()
		if err != nil {
			t.Fatalf("miniredis run: %v", err)
		}
		client = redis.NewClient(&redis.Options{Addr: mr.Addr()})
	}

	bus := busredis.NewRedisBus(busredis.RedisBusOptions{Client: client})
	ctx := context.Background()
	t.Cleanup(func() {
		_ = bus.Close()
		if addr != "" {
			_ = client.FlushAll(context.Background()).Err()
			_ = client.Close()
		} else {
			_ = client.Close()
			mr.Close()
		}
	})
	return bus, ctx
}
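
newRedisBus is the shared fixture for the integration tests: it prefers a real Redis at WARP_TEST_REDIS_ADDR and falls back to miniredis, registering cleanup either way. New integration tests can build on it directly. The sketch below is illustrative rather than an existing test; it mirrors the Subscribe/Unsubscribe/Metrics pattern of TestRedisBusConcurrentInvalidations and additionally assumes that RedisBus exposes the same Publish(ctx, key, opts...) method required of the bus interface (as failingBus above does) and that direct publishes are counted in Metrics().Published:

// Illustrative test built on the newRedisBus helper: publish directly on the
// bus and confirm the publish counter moved. Not part of the existing suite.
func TestRedisBusPublishMetrics(t *testing.T) {
	bus, ctx := newRedisBus(t)

	sub, err := bus.Subscribe(ctx, "metrics-key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	t.Cleanup(func() { _ = bus.Unsubscribe(context.Background(), "metrics-key", sub) })
	go func() {
		for range sub { // drain events, as the concurrent-invalidation test does
		}
	}()

	if err := bus.Publish(ctx, "metrics-key"); err != nil { // assumed direct-publish API
		t.Fatalf("publish: %v", err)
	}

	time.Sleep(50 * time.Millisecond)
	if got := bus.Metrics().Published; got < 1 {
		t.Fatalf("expected at least 1 publish recorded, got %d", got)
	}
}

To run the integration tests against a real instance, set WARP_TEST_REDIS_ADDR (for example localhost:6379), and WARP_TEST_FORCE_REAL=true to fail instead of silently falling back to miniredis.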
function

TestRedisBusConcurrentInvalidations

v1/syncbus/warp_integration_test.go:59-99
func TestRedisBusConcurrentInvalidations(t *testing.T)

{
	bus, ctx := newRedisBus(t)
	store := adapter.NewInMemoryStore[int]()
	w1 := core.New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
	w2 := core.New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
	w1.Register("key", core.ModeStrongDistributed, time.Minute)
	w2.Register("key", core.ModeStrongDistributed, time.Minute)

	sub, err := bus.Subscribe(ctx, "key")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	t.Cleanup(func() { _ = bus.Unsubscribe(context.Background(), "key", sub) })
	go func() {
		for range sub {
		}
	}()

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		if err := w1.Invalidate(ctx, "key"); err != nil {
			t.Errorf("w1 invalidate: %v", err)
		}
	}()
	go func() {
		defer wg.Done()
		if err := w2.Invalidate(ctx, "key"); err != nil {
			t.Errorf("w2 invalidate: %v", err)
		}
	}()
	wg.Wait()
	time.Sleep(50 * time.Millisecond)
	metrics := bus.Metrics()
	// Deduplication is best-effort. If concurrent invalidations overlap, we expect 1 publish.
	// If they happen sequentially (due to scheduler/networking), we expect 2. Both are valid.
	if metrics.Published < 1 || metrics.Published > 2 {
		t.Fatalf("expected 1 or 2 published got %d", metrics.Published)
	}
}
function

TestMeshCluster_Gossip

v1/syncbus/mesh_cluster_test.go:17-114
func TestMeshCluster_Gossip(t *testing.T)

{
	nodeCount := 10
	basePort := 8100 // Use different port range to avoid conflicts
	nodes := make([]*core.Warp[string], nodeCount)

	// Prepare peer list
	var allPeers []string
	for i := 0; i < nodeCount; i++ {
		allPeers = append(allPeers, fmt.Sprintf("127.0.0.1:%d", basePort+i))
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// IMPORTANT: All nodes share the SAME InMemoryStore to simulate L2 consistency
	sharedStore := adapter.NewInMemoryStore[string]()

	var readyWg sync.WaitGroup
	readyWg.Add(nodeCount)

	for i := 0; i < nodeCount; i++ {
		port := basePort + i
		addr := fmt.Sprintf("127.0.0.1:%d", port)

		var peers []string
		for _, p := range allPeers {
			if p != addr {
				peers = append(peers, p)
			}
		}

		opts := mesh.MeshOptions{
			Port:          port,
			Peers:         peers,
			AdvertiseAddr: addr,
			Heartbeat:     200 * time.Millisecond,
		}

		bus, err := mesh.NewMeshBus(opts)
		if err != nil {
			t.Fatalf("node %d bus setup failed: %v", i, err)
		}

		c := cache.NewInMemory[merge.Value[string]]()
		e := merge.NewEngine[string]()
		w := core.New(c, sharedStore, bus, e)
		w.Register("test-key", core.ModeEventualDistributed, time.Hour)

		nodes[i] = w
		readyWg.Done()
	}

	readyWg.Wait()
	time.Sleep(500 * time.Millisecond) // Let mesh settle

	// Step 1: Node 0 sets initial value
	if err := nodes[0].Set(ctx, "test-key", "valueA"); err != nil {
		t.Fatalf("node 0 initial set failed: %v", err)
	}
	time.Sleep(300 * time.Millisecond)

	// Step 2: All other nodes Get -> They should get "valueA" from shared Store (and cache it in L1)
	for i := 1; i < nodeCount; i++ {
		val, err := nodes[i].Get(ctx, "test-key")
		if err != nil || val != "valueA" {
			t.Errorf("node %d initial Get: expected valueA, got %v, err %v", i, val, err)
		}
	}

	t.Log("All nodes have valueA in L1. Node 0 updating to valueB...")

	// Step 3: Node 0 updates to "valueB" -> This publishes invalidation
	if err := nodes[0].Set(ctx, "test-key", "valueB"); err != nil {
		t.Fatalf("node 0 update set failed: %v", err)
	}

	// Step 4: Wait for invalidation propagation
	time.Sleep(1 * time.Second)

	// Step 5: All other nodes Get -> They should get "valueB" (L1 invalidated, refetch from Store)
	successCount := 0
	for i := 1; i < nodeCount; i++ {
		val, err := nodes[i].Get(ctx, "test-key")
		if err == nil && val == "valueB" {
			successCount++
		} else if val == "valueA" {
			t.Errorf("Node %d STALE! Still has valueA (invalidation not received)", i)
		} else {
			t.Errorf("Node %d unexpected: val=%v, err=%v", i, val, err)
		}
	}

	if successCount == nodeCount-1 {
		t.Logf("SUCCESS: All %d peer nodes received invalidation and have valueB.", successCount)
	} else {
		t.Fatalf("FAILED: Only %d/%d nodes received invalidation.", successCount, nodeCount-1)
	}
}
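
TestMeshCluster_Gossip exercises the same invalidation flow as the Redis test, but over mesh.NewMeshBus: each node gets its own port and a peer list of every other node, while the L2 store is shared in memory. The sketch below condenses that setup to the smallest useful topology, two loopback nodes; buildMeshPair is illustrative and not part of the suite, and the ports and key name are arbitrary:

// buildMeshPair is an illustrative helper (not part of the suite): two mesh
// nodes on loopback sharing one in-memory L2 store, wired as in the test above.
func buildMeshPair(t *testing.T) (*core.Warp[string], *core.Warp[string]) {
	t.Helper()
	sharedStore := adapter.NewInMemoryStore[string]() // shared L2, as in the test

	addrs := []string{"127.0.0.1:8200", "127.0.0.1:8201"}
	nodes := make([]*core.Warp[string], 2)
	for i, addr := range addrs {
		bus, err := mesh.NewMeshBus(mesh.MeshOptions{
			Port:          8200 + i,
			Peers:         []string{addrs[1-i]}, // the other node
			AdvertiseAddr: addr,
			Heartbeat:     200 * time.Millisecond,
		})
		if err != nil {
			t.Fatalf("node %d bus setup failed: %v", i, err)
		}

		w := core.New(cache.NewInMemory[merge.Value[string]](), sharedStore, bus, merge.NewEngine[string]())
		w.Register("test-key", core.ModeEventualDistributed, time.Hour)
		nodes[i] = w
	}
	return nodes[0], nodes[1]
}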