syncbus_test
API
syncbus_test
package
API reference for the syncbus_test package.
Imports
(17)
STD
context
STD
testing
STD
time
PKG
github.com/google/uuid
INT
github.com/mirkobrombin/go-warp/v1/adapter
INT
github.com/mirkobrombin/go-warp/v1/cache
INT
github.com/mirkobrombin/go-warp/v1/core
INT
github.com/mirkobrombin/go-warp/v1/merge
INT
github.com/mirkobrombin/go-warp/v1/syncbus/redis
PKG
github.com/redis/go-redis/v9
STD
errors
INT
github.com/mirkobrombin/go-warp/v1/syncbus
STD
os
STD
sync
PKG
github.com/alicebob/miniredis/v2
STD
fmt
INT
github.com/mirkobrombin/go-warp/v1/syncbus/mesh
F
function
TestRedisCluster_Gossip
Parameters
t
v1/syncbus/redis_cluster_test.go:17-124
func TestRedisCluster_Gossip(t *testing.T)
{
redisAddr := "localhost:6379"
testKey := "test-key-" + uuid.NewString()
nodeCount := 10
nodes := make([]*core.Warp[string], nodeCount)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Track resources for cleanup
var buses []*busredis.RedisBus
var clients []*redis.Client
defer func() {
for _, b := range buses {
_ = b.Close()
}
for _, c := range clients {
_ = c.Close()
}
}()
// All nodes connect to the same Redis (shared Store and Bus)
for i := 0; i < nodeCount; i++ {
client := redis.NewClient(&redis.Options{Addr: redisAddr})
if err := client.Ping(ctx).Err(); err != nil {
t.Skipf("Skipping Redis test, connection failed: %v", err)
return
}
clients = append(clients, client)
bus := busredis.NewRedisBus(busredis.RedisBusOptions{Client: client})
buses = append(buses, bus)
c := cache.NewInMemory[merge.Value[string]]()
s := adapter.NewRedisStore[string](client) // Shared Redis Store
e := merge.NewEngine[string]()
w := core.New(c, s, bus, e)
w.Register(testKey, core.ModeEventualDistributed, time.Hour)
nodes[i] = w
}
// Clean up previous test data using the first client
_ = clients[0].Del(ctx, testKey).Err()
// Wait for all nodes to fully subscribe before testing
time.Sleep(2 * time.Second)
// Step 1: Node 0 sets initial value
if err := nodes[0].Set(ctx, testKey, "valueA"); err != nil {
t.Fatalf("node 0 initial set failed: %v", err)
}
// Verify the value is in Redis L2 before proceeding
val, err := clients[0].Get(ctx, testKey).Result()
if err != nil {
t.Fatalf("failed to verify initial value in Redis: %v", err)
}
t.Logf("Verified: %s is set in Redis L2 (raw value exists, len=%d)", testKey, len(val))
time.Sleep(300 * time.Millisecond)
// Step 2: All other nodes Get -> They should get "valueA"
for i := 1; i < nodeCount; i++ {
val, err := nodes[i].Get(ctx, testKey)
if err != nil || val != "valueA" {
t.Errorf("node %d initial Get: expected valueA, got %v, err %v", i, val, err)
}
}
t.Log("All Redis nodes have valueA in L1. Node 0 updating to valueB...")
// Step 3: Node 0 updates to "valueB"
if err := nodes[0].Set(ctx, testKey, "valueB"); err != nil {
t.Fatalf("node 0 update set failed: %v", err)
}
// Verify the updated value is in Redis L2
val, err = clients[0].Get(ctx, testKey).Result()
if err != nil {
t.Fatalf("failed to verify updated value in Redis: %v", err)
}
t.Logf("Verified: %s updated in Redis L2 (raw value exists, len=%d)", testKey, len(val))
// Step 4: Wait for invalidation propagation
time.Sleep(2 * time.Second)
// Step 5: Verify
successCount := 0
for i := 1; i < nodeCount; i++ {
val, err := nodes[i].Get(ctx, testKey)
if val == "valueB" {
successCount++
} else if val == "valueA" {
t.Errorf("Node %d STALE! Still has valueA", i)
} else {
t.Errorf("Node %d unexpected: val=%q err=%v", i, val, err)
}
}
if successCount == nodeCount-1 {
t.Logf("SUCCESS: All %d Redis peer nodes received invalidation and have valueB.", successCount)
} else {
t.Fatalf("FAILED: Only %d/%d Redis nodes received invalidation.", successCount, nodeCount-1)
}
}
S
struct
failingBus
v1/syncbus/warp_errors_test.go:15-18
// failingBus is a bus stub used by the error-path tests. Publish,
// PublishAndAwait, PublishAndAwaitTopology and RevokeLease return publishErr;
// Subscribe and SubscribeLease return a nil channel together with subscribeErr;
// Unsubscribe and UnsubscribeLease always return nil.
type failingBus struct
Methods
Publish
Method
Parameters
Returns
error
func (*failingBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
return f.publishErr
}
PublishAndAwait
Method
Parameters
Returns
error
func (*failingBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
return b.publishErr
}
PublishAndAwaitTopology
Method
Parameters
Returns
error
func (*failingBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
return b.publishErr
}
Subscribe
Method
Parameters
ctx
context.Context
key
string
Returns
<-chan
syncbus.Event
error
func (*failingBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
return nil, f.subscribeErr
}
Unsubscribe
Method
Parameters
Returns
error
func (*failingBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
return nil
}
RevokeLease
Method
Parameters
ctx
context.Context
id
string
Returns
error
func (*failingBus) RevokeLease(ctx context.Context, id string) error
{ return f.publishErr }
SubscribeLease
Method
Parameters
ctx
context.Context
id
string
Returns
<-chan
syncbus.Event
error
func (*failingBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
return nil, f.subscribeErr
}
UnsubscribeLease
Method
Parameters
Returns
error
func (*failingBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| publishErr | error | |
| subscribeErr | error | |
F
function
TestWarpPublishError
Parameters
t
v1/syncbus/warp_errors_test.go:53-68
func TestWarpPublishError(t *testing.T)
{
bus := &failingBus{publishErr: errors.New("publish failed")}
w := core.New[string](cache.NewInMemory[merge.Value[string]](), nil, bus, merge.NewEngine[string]())
w.Register("key", core.ModeEventualDistributed, time.Minute)
if err := w.Invalidate(context.Background(), "key"); err != nil {
t.Fatalf("unexpected invalidate error: %v", err)
}
select {
case err := <-w.PublishErrors():
if !errors.Is(err, bus.publishErr) {
t.Fatalf("expected %v got %v", bus.publishErr, err)
}
case <-time.After(100 * time.Millisecond):
t.Fatal("expected publish error")
}
}
F
function
TestWarpSubscribeError
Parameters
t
v1/syncbus/warp_errors_test.go:70-76
func TestWarpSubscribeError(t *testing.T)
{
bus := &failingBus{subscribeErr: errors.New("subscribe failed")}
w := core.New[string](cache.NewInMemory[merge.Value[string]](), nil, bus, merge.NewEngine[string]())
if _, err := w.GrantLease(context.Background(), time.Minute); err != nil {
t.Fatalf("grant lease returned error: %v", err)
}
}
F
function
newRedisBus
Parameters
t
Returns
v1/syncbus/warp_integration_test.go:20-57
func newRedisBus(t *testing.T) (*busredis.RedisBus, context.Context)
{
t.Helper()
addr := os.Getenv("WARP_TEST_REDIS_ADDR")
forceReal := os.Getenv("WARP_TEST_FORCE_REAL") == "true"
var client *redis.Client
var mr *miniredis.Miniredis
if forceReal && addr == "" {
t.Fatal("WARP_TEST_FORCE_REAL is true but WARP_TEST_REDIS_ADDR is empty")
}
if addr != "" {
t.Logf("TestRedisBus: using real Redis at %s", addr)
client = redis.NewClient(&redis.Options{Addr: addr})
} else {
t.Log("TestRedisBus: using miniredis")
var err error
mr, err = miniredis.Run()
if err != nil {
t.Fatalf("miniredis run: %v", err)
}
client = redis.NewClient(&redis.Options{Addr: mr.Addr()})
}
bus := busredis.NewRedisBus(busredis.RedisBusOptions{Client: client})
ctx := context.Background()
t.Cleanup(func() {
_ = bus.Close()
if addr != "" {
_ = client.FlushAll(context.Background()).Err()
_ = client.Close()
} else {
_ = client.Close()
mr.Close()
}
})
return bus, ctx
}
F
function
TestRedisBusConcurrentInvalidations
Parameters
t
v1/syncbus/warp_integration_test.go:59-99
func TestRedisBusConcurrentInvalidations(t *testing.T)
{
bus, ctx := newRedisBus(t)
store := adapter.NewInMemoryStore[int]()
w1 := core.New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
w2 := core.New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
w1.Register("key", core.ModeStrongDistributed, time.Minute)
w2.Register("key", core.ModeStrongDistributed, time.Minute)
sub, err := bus.Subscribe(ctx, "key")
if err != nil {
t.Fatalf("subscribe: %v", err)
}
t.Cleanup(func() { _ = bus.Unsubscribe(context.Background(), "key", sub) })
go func() {
for range sub {
}
}()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
if err := w1.Invalidate(ctx, "key"); err != nil {
t.Errorf("w1 invalidate: %v", err)
}
}()
go func() {
defer wg.Done()
if err := w2.Invalidate(ctx, "key"); err != nil {
t.Errorf("w2 invalidate: %v", err)
}
}()
wg.Wait()
time.Sleep(50 * time.Millisecond)
metrics := bus.Metrics()
// Deduplication is best-effort. If concurrent invalidations overlap, we expect 1 publish.
// If they happen sequentially (due to scheduler/networking), we expect 2. Both are valid.
if metrics.Published < 1 || metrics.Published > 2 {
t.Fatalf("expected 1 or 2 published got %d", metrics.Published)
}
}
F
function
TestMeshCluster_Gossip
Parameters
t
v1/syncbus/mesh_cluster_test.go:17-114
func TestMeshCluster_Gossip(t *testing.T)
{
nodeCount := 10
basePort := 8100 // Use different port range to avoid conflicts
nodes := make([]*core.Warp[string], nodeCount)
// Prepare peer list
var allPeers []string
for i := 0; i < nodeCount; i++ {
allPeers = append(allPeers, fmt.Sprintf("127.0.0.1:%d", basePort+i))
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// IMPORTANT: All nodes share the SAME InMemoryStore to simulate L2 consistency
sharedStore := adapter.NewInMemoryStore[string]()
var readyWg sync.WaitGroup
readyWg.Add(nodeCount)
for i := 0; i < nodeCount; i++ {
port := basePort + i
addr := fmt.Sprintf("127.0.0.1:%d", port)
var peers []string
for _, p := range allPeers {
if p != addr {
peers = append(peers, p)
}
}
opts := mesh.MeshOptions{
Port: port,
Peers: peers,
AdvertiseAddr: addr,
Heartbeat: 200 * time.Millisecond,
}
bus, err := mesh.NewMeshBus(opts)
if err != nil {
t.Fatalf("node %d bus setup failed: %v", i, err)
}
c := cache.NewInMemory[merge.Value[string]]()
e := merge.NewEngine[string]()
w := core.New(c, sharedStore, bus, e)
w.Register("test-key", core.ModeEventualDistributed, time.Hour)
nodes[i] = w
readyWg.Done()
}
readyWg.Wait()
time.Sleep(500 * time.Millisecond) // Let mesh settle
// Step 1: Node 0 sets initial value
if err := nodes[0].Set(ctx, "test-key", "valueA"); err != nil {
t.Fatalf("node 0 initial set failed: %v", err)
}
time.Sleep(300 * time.Millisecond)
// Step 2: All other nodes Get -> They should get "valueA" from shared Store (and cache it in L1)
for i := 1; i < nodeCount; i++ {
val, err := nodes[i].Get(ctx, "test-key")
if err != nil || val != "valueA" {
t.Errorf("node %d initial Get: expected valueA, got %v, err %v", i, val, err)
}
}
t.Log("All nodes have valueA in L1. Node 0 updating to valueB...")
// Step 3: Node 0 updates to "valueB" -> This publishes invalidation
if err := nodes[0].Set(ctx, "test-key", "valueB"); err != nil {
t.Fatalf("node 0 update set failed: %v", err)
}
// Step 4: Wait for invalidation propagation
time.Sleep(1 * time.Second)
// Step 5: All other nodes Get -> They should get "valueB" (L1 invalidated, refetch from Store)
successCount := 0
for i := 1; i < nodeCount; i++ {
val, err := nodes[i].Get(ctx, "test-key")
if err == nil && val == "valueB" {
successCount++
} else if val == "valueA" {
t.Errorf("Node %d STALE! Still has valueA (invalidation not received)", i)
} else {
t.Errorf("Node %d unexpected: val=%v, err=%v", i, val, err)
}
}
if successCount == nodeCount-1 {
t.Logf("SUCCESS: All %d peer nodes received invalidation and have valueB.", successCount)
} else {
t.Fatalf("FAILED: Only %d/%d nodes received invalidation.", successCount, nodeCount-1)
}
}