core package
API reference for the core package.
Imports (32)
| Kind | Packages |
|---|---|
| STD | context, errors, fmt, log/slog, reflect, runtime, sync, sync/atomic, testing, time |
| INT | github.com/mirkobrombin/go-warp/v1/adapter, github.com/mirkobrombin/go-warp/v1/cache, github.com/mirkobrombin/go-warp/v1/cache/versioned, github.com/mirkobrombin/go-warp/v1/merge, github.com/mirkobrombin/go-warp/v1/metrics, github.com/mirkobrombin/go-warp/v1/syncbus, github.com/mirkobrombin/go-warp/v1/syncbus/nats, github.com/mirkobrombin/go-warp/v1/syncbus/redis, github.com/mirkobrombin/go-warp/v1/validator, github.com/mirkobrombin/go-warp/v1/watchbus |
| PKG | github.com/alicebob/miniredis/v2, github.com/hashicorp/go-uuid, github.com/nats-io/nats-server/v2/test, github.com/nats-io/nats.go, github.com/prometheus/client_golang/prometheus, github.com/prometheus/client_golang/prometheus/testutil, github.com/redis/go-redis/v9, go.opentelemetry.io/otel, go.opentelemetry.io/otel/attribute, go.opentelemetry.io/otel/trace, golang.org/x/sync/errgroup, golang.org/x/sync/singleflight |
TestLeaseGrantAutoRenewAttachRevoke
Parameters
func TestLeaseGrantAutoRenewAttachRevoke(t *testing.T)
{
ctx := context.Background()
c := cache.NewInMemory[merge.Value[string]]()
bus := syncbus.NewInMemoryBus()
w := New[string](c, nil, bus, merge.NewEngine[string]())
lm := w.leases
ttl := 20 * time.Millisecond
id, err := lm.Grant(ctx, ttl)
if err != nil {
t.Fatalf("grant: %v", err)
}
key := "foo"
lm.Attach(id, key)
_ = w.cache.Set(ctx, key, merge.Value[string]{Data: "bar", Timestamp: time.Now()}, time.Minute)
time.Sleep(3 * ttl)
lm.mu.Lock()
_, ok := lm.leases[id]
lm.mu.Unlock()
if !ok {
t.Fatalf("lease expired before renewal")
}
lm.Revoke(ctx, id)
lm.mu.Lock()
if _, ok := lm.leases[id]; ok {
t.Fatalf("lease not removed after revoke")
}
lm.mu.Unlock()
if _, ok, _ := w.cache.Get(ctx, key); ok {
t.Fatalf("cache key not invalidated on revoke")
}
}
TestLeaseGrantRejectsNonPositiveTTL
Parameters
func TestLeaseGrantRejectsNonPositiveTTL(t *testing.T)
{
ctx := context.Background()
c := cache.NewInMemory[merge.Value[string]]()
w := New[string](c, nil, nil, merge.NewEngine[string]())
if _, err := w.leases.Grant(ctx, 0); !errors.Is(err, ErrInvalidLeaseTTL) {
t.Fatalf("expected ErrInvalidLeaseTTL, got %v", err)
}
w.leases.mu.Lock()
if len(w.leases.leases) != 0 {
t.Fatalf("expected no leases to be created for invalid ttl")
}
w.leases.mu.Unlock()
}
TestLeaseBusRevocation
Parameters
func TestLeaseBusRevocation(t *testing.T)
{
ctx := context.Background()
c := cache.NewInMemory[merge.Value[string]]()
bus := syncbus.NewInMemoryBus()
w := New[string](c, nil, bus, merge.NewEngine[string]())
lm := w.leases
id, err := lm.Grant(ctx, time.Minute)
if err != nil {
t.Fatalf("grant: %v", err)
}
lm.Attach(id, "a")
_ = w.cache.Set(ctx, "a", merge.Value[string]{Data: "1", Timestamp: time.Now()}, time.Minute)
ghostID := "ghost"
lm.Attach(ghostID, "b")
_ = w.cache.Set(ctx, "b", merge.Value[string]{Data: "2", Timestamp: time.Now()}, time.Minute)
_ = bus.RevokeLease(ctx, id)
_ = bus.RevokeLease(ctx, ghostID)
time.Sleep(20 * time.Millisecond)
lm.mu.Lock()
if _, ok := lm.leases[id]; ok {
lm.mu.Unlock()
t.Fatalf("lease not revoked via bus")
}
if _, ok := lm.leases[ghostID]; ok {
lm.mu.Unlock()
t.Fatalf("placeholder lease not revoked via bus")
}
lm.mu.Unlock()
if _, ok, _ := w.cache.Get(ctx, "a"); ok {
t.Fatalf("key a not invalidated via bus revocation")
}
if _, ok, _ := w.cache.Get(ctx, "b"); ok {
t.Fatalf("key b not invalidated via placeholder revocation")
}
}
TestWatchPrefixMetricsAndEvents
Parameters
func TestWatchPrefixMetricsAndEvents(t *testing.T)
{
bus := watchbus.NewInMemory()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
metrics.WatcherGauge.Set(0)
ch, err := WatchPrefix(ctx, bus, "foo")
if err != nil {
t.Fatalf("watch prefix: %v", err)
}
if v := testutil.ToFloat64(metrics.WatcherGauge); v != 1 {
t.Fatalf("expected gauge 1 got %v", v)
}
if err := bus.Publish(ctx, "foobar", []byte("hello")); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case msg := <-ch:
if string(msg) != "hello" {
t.Fatalf("unexpected message %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
cancel()
time.Sleep(20 * time.Millisecond)
if v := testutil.ToFloat64(metrics.WatcherGauge); v != 0 {
t.Fatalf("expected gauge 0 got %v", v)
}
}
errStore
type errStore[T any] struct
Fields
| Name | Type | Description |
|---|---|---|
| err | error |
errBus
type errBus struct
Methods
Parameters
Returns
func (b errBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
return b.err
}
Parameters
Returns
func (b errBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
return b.err
}
Parameters
Returns
func (b errBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
return b.err
}
Parameters
Returns
func (b errBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
return nil, b.err
}
Parameters
Returns
func (b errBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
return b.err
}
Parameters
Returns
func (b errBus) RevokeLease(ctx context.Context, id string) error
{ return b.err }
Parameters
Returns
func (b errBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
return nil, b.err
}
Parameters
Returns
func (errBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| err | error |
slowBus
type slowBus struct
Methods
Parameters
Returns
func (b *slowBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
time.Sleep(b.delay)
if b.done != nil {
b.done <- struct{}{}
}
return b.InMemoryBus.Publish(ctx, key, opts...)
}
Parameters
Returns
func (b *slowBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
time.Sleep(b.delay)
return b.InMemoryBus.PublishAndAwait(ctx, key, replicas, opts...)
}
Parameters
Returns
func (b *slowBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
time.Sleep(b.delay)
return b.InMemoryBus.PublishAndAwaitTopology(ctx, key, minZones, opts...)
}
Parameters
Returns
func (*slowBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
return nil, nil
}
Parameters
Returns
func (*slowBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
return nil
}
Parameters
Returns
func (*slowBus) RevokeLease(ctx context.Context, id string) error
{ return nil }
Parameters
Returns
func (*slowBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
return nil, nil
}
Parameters
Returns
func (*slowBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| delay | time.Duration | |
| done | chan struct{} |
ttlCache
type ttlCache[T any] struct
Fields
| Name | Type | Description |
|---|---|---|
| mu | sync.Mutex | |
| items | map[string]T | |
| ttls | map[string]time.Duration |
newTTLCache
Returns
func newTTLCache[T any]() *ttlCache[T]
{
return &ttlCache[T]{
items: make(map[string]T),
ttls: make(map[string]time.Duration),
}
}
mockStrategy
type mockStrategy struct
Methods
Fields
| Name | Type | Description |
|---|---|---|
| mu | sync.Mutex | |
| ttl | time.Duration | |
| records | []string |
slowStore
type slowStore[T any] struct
Fields
| Name | Type | Description |
|---|---|---|
| data | map[string]T | |
| delay | time.Duration | |
| mu | sync.Mutex | |
| calls | int | |
| started | chan struct{} | |
| once | sync.Once |
TestWarpSetGet
Parameters
func TestWarpSetGet(t *testing.T)
{
ctx := context.Background()
w := New[string](cache.NewInMemory[merge.Value[string]](), nil, syncbus.NewInMemoryBus(), merge.NewEngine[string]())
w.Register("foo", ModeStrongLocal, time.Minute)
if err := w.Set(ctx, "foo", "bar"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
v, err := w.Get(ctx, "foo")
if err != nil || v != "bar" {
t.Fatalf("unexpected value: %v, err: %v", v, err)
}
if err := w.Invalidate(ctx, "foo"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if _, err := w.Get(ctx, "foo"); err == nil {
t.Fatalf("expected error after invalidate")
}
}
TestWarpConcurrentAccess
Parameters
func TestWarpConcurrentAccess(t *testing.T)
{
ctx := context.Background()
c := cache.NewInMemory[merge.Value[int]]()
s := adapter.NewInMemoryStore[int]()
w := New(c, s, syncbus.NewInMemoryBus(), merge.NewEngine[int]())
if !w.Register(
"counter",
ModeStrongLocal,
time.Second,
cache.WithSliding(),
cache.WithDynamicTTL(10*time.Millisecond, 5*time.Millisecond, 1*time.Millisecond, 5*time.Millisecond, time.Second),
) {
t.Fatalf("expected registration to succeed")
}
if err := w.Set(ctx, "counter", 0); err != nil {
t.Fatalf("unexpected error: %v", err)
}
const goroutines = 32
const iterations = 64
start := make(chan struct{})
errCh := make(chan error, 1)
var wg sync.WaitGroup
for i := 0; i < goroutines; i++ {
wg.Add(1)
go func(base int) {
defer wg.Done()
<-start
for j := 0; j < iterations; j++ {
if err := w.Set(ctx, "counter", base*iterations+j); err != nil {
select {
case errCh <- err:
default:
}
return
}
}
}(i)
wg.Add(1)
go func() {
defer wg.Done()
<-start
for j := 0; j < iterations; j++ {
if _, err := w.Get(ctx, "counter"); err != nil {
select {
case errCh <- err:
default:
}
return
}
time.Sleep(time.Millisecond)
}
}()
}
close(start)
wg.Wait()
select {
case err := <-errCh:
t.Fatalf("concurrent access failed: %v", err)
default:
}
}
TestWarpGetAtRollback
Parameters
func TestWarpGetAtRollback(t *testing.T)
{
ctx := context.Background()
base := cache.NewInMemory[merge.VersionedValue[string]]()
c := versioned.New[string](base, 5)
w := New[string](c, nil, nil, merge.NewEngine[string]())
w.Register("foo", ModeStrongLocal, time.Minute)
if err := w.Set(ctx, "foo", "v1"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
t1 := time.Now()
time.Sleep(time.Millisecond)
if err := w.Set(ctx, "foo", "v2"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
v, err := w.GetAt(ctx, "foo", t1)
if err != nil || v != "v1" {
t.Fatalf("expected v1, got %v err %v", v, err)
}
}
TestWarpGetAtExpire
Parameters
func TestWarpGetAtExpire(t *testing.T)
{
ctx := context.Background()
base := cache.NewInMemory[merge.VersionedValue[string]]()
c := versioned.New[string](base, 5)
w := New[string](c, nil, nil, merge.NewEngine[string]())
w.Register("foo", ModeStrongLocal, 5*time.Millisecond)
if err := w.Set(ctx, "foo", "v1"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
time.Sleep(10 * time.Millisecond)
if _, err := w.GetAt(ctx, "foo", time.Now()); !errors.Is(err, ErrNotFound) {
t.Fatalf("expected not found after ttl, got %v", err)
}
}
TestWarpMerge
Parameters
func TestWarpMerge(t *testing.T)
{
ctx := context.Background()
w := New(cache.NewInMemory[merge.Value[int]](), adapter.NewInMemoryStore[int](), nil, merge.NewEngine[int]())
w.Register("cnt", ModeStrongLocal, time.Minute)
w.Merge("cnt", func(old, new int) (int, error) {
return old + new, nil
})
if err := w.Set(ctx, "cnt", 1); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if err := w.Set(ctx, "cnt", 2); err != nil {
t.Fatalf("unexpected error: %v", err)
}
v, err := w.Get(ctx, "cnt")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if v != 3 {
t.Fatalf("expected 3, got %v", v)
}
}
TestWarpFallbackAndWarmup
Parameters
func TestWarpFallbackAndWarmup(t *testing.T)
{
ctx := context.Background()
store := adapter.NewInMemoryStore[string]()
_ = store.Set(ctx, "foo", "bar")
w := New[string](cache.NewInMemory[merge.Value[string]](), store, nil, merge.NewEngine[string]())
w.Register("foo", ModeStrongLocal, time.Minute)
// fallback
v, err := w.Get(ctx, "foo")
if err != nil || v != "bar" {
t.Fatalf("unexpected fallback value: %v err: %v", v, err)
}
// warmup
if err := w.Invalidate(ctx, "foo"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
w.Warmup(ctx)
v, err = w.Get(ctx, "foo")
if err != nil || v != "bar" {
t.Fatalf("expected warmup to load value, got %v err %v", v, err)
}
}
TestWarpWarmupContextCancel
Parameters
func TestWarpWarmupContextCancel(t *testing.T)
{
ctx, cancel := context.WithCancel(context.Background())
store := &slowStore[string]{
data: map[string]string{"a": "1", "b": "2"},
delay: 50 * time.Millisecond,
started: make(chan struct{}),
}
w := New[string](cache.NewInMemory[merge.Value[string]](), store, nil, merge.NewEngine[string]())
w.Register("a", ModeStrongLocal, time.Minute)
w.Register("b", ModeStrongLocal, time.Minute)
done := make(chan struct{})
go func() {
w.Warmup(ctx)
close(done)
}()
<-store.started
cancel()
<-done
store.mu.Lock()
calls := store.calls
store.mu.Unlock()
if calls == 0 {
t.Fatalf("expected store to be called at least once")
}
if _, ok, _ := w.cache.Get(context.Background(), "b"); ok {
t.Fatalf("expected key b not to be warmed up")
}
}
TestWarpUnregister
Parameters
func TestWarpUnregister(t *testing.T)
{
w := New[string](cache.NewInMemory[merge.Value[string]](), nil, nil, merge.NewEngine[string]())
w.Register("foo", ModeStrongLocal, time.Minute)
if _, ok := w.getReg("foo"); !ok {
t.Fatalf("expected foo to be registered")
}
w.Unregister("foo")
if _, ok := w.getReg("foo"); ok {
t.Fatalf("expected foo to be unregistered")
}
}
TestWarpGetStoreError
Parameters
func TestWarpGetStoreError(t *testing.T)
{
ctx := context.Background()
expected := errors.New("boom")
w := New[string](cache.NewInMemory[merge.Value[string]](), errStore[string]{err: expected}, nil, merge.NewEngine[string]())
w.Register("foo", ModeStrongLocal, time.Minute)
if _, err := w.Get(ctx, "foo"); !errors.Is(err, expected) {
t.Fatalf("expected %v, got %v", expected, err)
}
}
TestWarpRegisterDuplicate
Parameters
func TestWarpRegisterDuplicate(t *testing.T)
{
w := New[string](cache.NewInMemory[merge.Value[string]](), nil, nil, merge.NewEngine[string]())
if !w.Register("foo", ModeStrongLocal, time.Minute) {
t.Fatalf("expected first registration to succeed")
}
if w.Register("foo", ModeEventualDistributed, 2*time.Minute) {
t.Fatalf("expected duplicate registration to fail")
}
reg, _ := w.getReg("foo")
if reg.mode != ModeStrongLocal || reg.ttl != time.Minute {
t.Fatalf("registration should not be overwritten")
}
}
TestWarpSetPublishError
Parameters
func TestWarpSetPublishError(t *testing.T)
{
ctx := context.Background()
expected := errors.New("boom")
w := New[string](cache.NewInMemory[merge.Value[string]](), nil, errBus{err: expected}, merge.NewEngine[string]())
w.Register("foo", ModeEventualDistributed, time.Minute)
if err := w.Set(ctx, "foo", "bar"); err != nil {
t.Fatalf("unexpected set error: %v", err)
}
select {
case err := <-w.PublishErrors():
if !errors.Is(err, expected) {
t.Fatalf("expected %v, got %v", expected, err)
}
case <-time.After(100 * time.Millisecond):
t.Fatalf("expected publish error")
}
}
TestWarpInvalidatePublishError
Parameters
func TestWarpInvalidatePublishError(t *testing.T)
{
ctx := context.Background()
expected := errors.New("boom")
w := New[string](cache.NewInMemory[merge.Value[string]](), nil, errBus{err: expected}, merge.NewEngine[string]())
w.Register("foo", ModeEventualDistributed, time.Minute)
if err := w.Invalidate(ctx, "foo"); err != nil {
t.Fatalf("unexpected invalidate error: %v", err)
}
select {
case err := <-w.PublishErrors():
if !errors.Is(err, expected) {
t.Fatalf("expected %v, got %v", expected, err)
}
case <-time.After(100 * time.Millisecond):
t.Fatalf("expected publish error")
}
}
TestWarpUnregisteredKey
Parameters
func TestWarpUnregisteredKey(t *testing.T)
{
ctx := context.Background()
w := New[string](cache.NewInMemory[merge.Value[string]](), nil, nil, merge.NewEngine[string]())
if _, err := w.Get(ctx, "foo"); !errors.Is(err, ErrUnregistered) {
t.Fatalf("expected ErrUnregistered, got %v", err)
}
if err := w.Set(ctx, "foo", "bar"); !errors.Is(err, ErrUnregistered) {
t.Fatalf("expected ErrUnregistered, got %v", err)
}
if err := w.Invalidate(ctx, "foo"); !errors.Is(err, ErrUnregistered) {
t.Fatalf("expected ErrUnregistered, got %v", err)
}
}
TestWarpValidatorAutoHealTTL
Parameters
func TestWarpValidatorAutoHealTTL(t *testing.T)
{
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := newTTLCache[merge.Value[string]]()
store := adapter.NewInMemoryStore[string]()
_ = store.Set(ctx, "k", "v1")
w := New[string](c, store, nil, merge.NewEngine[string]())
ttl := time.Minute
w.Register("k", ModeStrongLocal, ttl)
mv := merge.Value[string]{Data: "v0", Timestamp: time.Now()}
_ = c.Set(ctx, "k", mv, ttl)
orig := c.TTL("k")
v := w.Validator(validator.ModeAutoHeal, time.Millisecond)
go v.Run(ctx)
time.Sleep(5 * time.Millisecond)
healed, ok, err := c.Get(ctx, "k")
if err != nil || !ok || healed.Data != "v1" {
t.Fatalf("expected value healed to v1, got %v err %v", healed.Data, err)
}
if newTTL := c.TTL("k"); newTTL != orig {
t.Fatalf("expected TTL %v, got %v", orig, newTTL)
}
}
TestWarpRegisterDynamicTTL
Parameters
func TestWarpRegisterDynamicTTL(t *testing.T)
{
ctx := context.Background()
c := newTTLCache[merge.Value[string]]()
strat := &mockStrategy{ttl: 2 * time.Second}
w := New[string](c, nil, nil, merge.NewEngine[string]())
if !w.RegisterDynamicTTL("foo", ModeStrongLocal, strat) {
t.Fatalf("expected registration success")
}
if err := w.Set(ctx, "foo", "bar"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if ttl := c.TTL("foo"); ttl != 2*time.Second {
t.Fatalf("expected ttl 2s, got %v", ttl)
}
if _, err := w.Get(ctx, "foo"); err != nil {
t.Fatalf("unexpected get error: %v", err)
}
strat.mu.Lock()
defer strat.mu.Unlock()
if len(strat.records) != 2 {
t.Fatalf("expected 2 record calls, got %d", len(strat.records))
}
}
TestWarpTxnCASMismatch
Parameters
func TestWarpTxnCASMismatch(t *testing.T)
{
ctx := context.Background()
w := New[string](cache.NewInMemory[merge.Value[string]](), adapter.NewInMemoryStore[string](), nil, merge.NewEngine[string]())
w.Register("foo", ModeStrongLocal, time.Minute)
if err := w.Set(ctx, "foo", "a"); err != nil {
t.Fatalf("set: %v", err)
}
txn := w.Txn(ctx)
txn.CompareAndSwap("foo", "b", "c")
if err := txn.Commit(); !errors.Is(err, ErrCASMismatch) {
t.Fatalf("expected ErrCASMismatch got %v", err)
}
}
TestConcurrentRegisterGetSet
Parameters
func TestConcurrentRegisterGetSet(t *testing.T)
{
ctx := context.Background()
w := New[string](cache.NewInMemory[merge.Value[string]](), nil, syncbus.NewInMemoryBus(), merge.NewEngine[string]())
const n = 50
var wg sync.WaitGroup
for i := 0; i < n; i++ {
i := i
wg.Add(1)
go func() {
defer wg.Done()
key := fmt.Sprintf("k%d", i)
if !w.Register(key, ModeStrongLocal, time.Minute) {
t.Errorf("register failed for %s", key)
return
}
val := fmt.Sprintf("v%d", i)
if err := w.Set(ctx, key, val); err != nil {
t.Errorf("set failed for %s: %v", key, err)
return
}
if got, err := w.Get(ctx, key); err != nil || got != val {
t.Errorf("get failed for %s: %v %v", key, got, err)
}
}()
}
wg.Wait()
}
TestWarpSetAsyncPublish
Parameters
func TestWarpSetAsyncPublish(t *testing.T)
{
bus := &slowBus{delay: 100 * time.Millisecond, done: make(chan struct{}, 1), InMemoryBus: syncbus.NewInMemoryBus()}
c := cache.NewInMemory[merge.Value[string]]()
w := New[string](c, nil, bus, merge.NewEngine[string]())
w.Register("foo", ModeEventualDistributed, time.Minute)
start := time.Now()
if err := w.Set(context.Background(), "foo", "bar"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if d := time.Since(start); d >= 100*time.Millisecond {
t.Fatalf("set blocked on publish: %v", d)
}
select {
case <-bus.done:
case <-time.After(200 * time.Millisecond):
t.Fatalf("publish did not complete")
}
}
TestWarpInvalidateAsyncPublish
Parameters
func TestWarpInvalidateAsyncPublish(t *testing.T)
{
bus := &slowBus{delay: 100 * time.Millisecond, done: make(chan struct{}, 1), InMemoryBus: syncbus.NewInMemoryBus()}
c := cache.NewInMemory[merge.Value[string]]()
w := New[string](c, nil, bus, merge.NewEngine[string]())
w.Register("foo", ModeEventualDistributed, time.Minute)
start := time.Now()
if err := w.Invalidate(context.Background(), "foo"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if d := time.Since(start); d >= 100*time.Millisecond {
t.Fatalf("invalidate blocked on publish: %v", d)
}
select {
case <-bus.done:
case <-time.After(200 * time.Millisecond):
t.Fatalf("publish did not complete")
}
}
TestWarpGetSingleflight
Parameters
func TestWarpGetSingleflight(t *testing.T)
{
ctx := context.Background()
store := &slowStore[string]{
data: map[string]string{"foo": "bar"},
delay: 50 * time.Millisecond,
started: make(chan struct{}),
}
w := New[string](cache.NewInMemory[merge.Value[string]](), store, nil, merge.NewEngine[string]())
w.Register("foo", ModeStrongLocal, time.Minute)
const numCallers = 10
var wg sync.WaitGroup
wg.Add(numCallers)
for i := 0; i < numCallers; i++ {
go func() {
defer wg.Done()
v, err := w.Get(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if v != "bar" {
t.Errorf("expected value 'bar', got '%s'", v)
}
}()
}
wg.Wait()
if store.calls != 1 {
t.Fatalf("expected store.Get to be called once, got %d", store.calls)
}
}
TestGetOrSet_Hit
Parameters
func TestGetOrSet_Hit(t *testing.T)
{
c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
w := New[string](c, nil, nil, nil)
w.Register("gos-key-1", ModeStrongLocal, 10*time.Minute)
// Pre-populate
w.Set(context.Background(), "gos-key-1", "cached-val")
loaderCalled := false
loader := func(ctx context.Context) (string, error) {
loaderCalled = true
return "loaded-val", nil
}
val, err := w.GetOrSet(context.Background(), "gos-key-1", loader)
if err != nil {
t.Fatalf("Expected success, got: %v", err)
}
if val != "cached-val" {
t.Errorf("Expected 'cached-val', got '%v'", val)
}
if loaderCalled {
t.Error("Loader should not be called on hit")
}
}
TestGetOrSet_Miss
Parameters
func TestGetOrSet_Miss(t *testing.T)
{
c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
w := New[string](c, nil, nil, nil)
w.Register("gos-key-2", ModeStrongLocal, 10*time.Minute)
loader := func(ctx context.Context) (string, error) {
return "loaded-val", nil
}
val, err := w.GetOrSet(context.Background(), "gos-key-2", loader)
if err != nil {
t.Fatalf("Expected success, got: %v", err)
}
if val != "loaded-val" {
t.Errorf("Expected 'loaded-val', got '%v'", val)
}
// Verify it's now in cache
cached, err := w.Get(context.Background(), "gos-key-2")
if err != nil || cached != "loaded-val" {
t.Errorf("Value was not cached correctly. Got: %v, Err: %v", cached, err)
}
}
TestGetOrSet_Singleflight
Parameters
func TestGetOrSet_Singleflight(t *testing.T)
{
c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
w := New[string](c, nil, nil, nil)
w.Register("gos-key-3", ModeStrongLocal, 10*time.Minute)
var calls atomic.Int32
ready := make(chan struct{})
block := make(chan struct{})
var once sync.Once
loader := func(ctx context.Context) (string, error) {
calls.Add(1)
once.Do(func() {
close(ready) // Signal we started
})
<-block // Wait to finish
return "concurrent-val", nil
}
var wg sync.WaitGroup
wg.Add(2)
// Routine 1
go func() {
defer wg.Done()
w.GetOrSet(context.Background(), "gos-key-3", loader)
}()
// Wait for Routine 1 to start loader
<-ready
// Routine 2 (Should join the flight)
go func() {
defer wg.Done()
w.GetOrSet(context.Background(), "gos-key-3", loader)
}()
// Let them finish
close(block)
wg.Wait()
if calls.Load() != 1 {
t.Errorf("Expected 1 loader call, got %d", calls.Load())
}
}
TestGetOrSet_FailSafe
Parameters
func TestGetOrSet_FailSafe(t *testing.T)
{
c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
w := New[string](c, nil, nil, nil)
key := "gos-key-fs"
// Register with FailSafe
w.Register(key, ModeStrongLocal, 10*time.Millisecond, cache.WithFailSafe(1*time.Hour))
// Populate
w.Set(context.Background(), key, "stale-val")
// Wait for expiration
time.Sleep(20 * time.Millisecond)
// Loader fails
loader := func(ctx context.Context) (string, error) {
return "", errors.New("loader boom")
}
// Should return stale value
val, err := w.GetOrSet(context.Background(), key, loader)
if err != nil {
t.Fatalf("Expected success (FailSafe), got error: %v", err)
}
if val != "stale-val" {
t.Errorf("Expected 'stale-val', got '%v'", val)
}
}
TestGetOrSet_LoaderError
Parameters
func TestGetOrSet_LoaderError(t *testing.T)
{
c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
w := New[string](c, nil, nil, nil)
w.Register("gos-key-err", ModeStrongLocal, 10*time.Minute)
boom := errors.New("boom")
loader := func(ctx context.Context) (string, error) {
return "", boom
}
_, err := w.GetOrSet(context.Background(), "gos-key-err", loader)
if !errors.Is(err, boom) {
t.Errorf("Expected 'boom' error, got: %v", err)
}
}
lease
type lease struct
Fields
| Name | Type | Description |
|---|---|---|
| keys | map[string]struct{} | |
| timer | *time.Timer | |
| ticker | *time.Ticker | |
| stop | chan struct{} | |
| sub | <-chan syncbus.Event |
LeaseManager
LeaseManager manages active leases and their renewal.
type LeaseManager[T any] struct
Fields
| Name | Type | Description |
|---|---|---|
| w | *Warp[T] | |
| bus | syncbus.Bus | |
| mu | sync.Mutex | |
| leases | map[string]*lease |
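The lease tests at the top of this file exercise the full lifecycle. Below is a minimal sketch of the same flow, assuming package-internal access to the unexported leases field, exactly as those tests do.

```go
ctx := context.Background()
w := New[string](cache.NewInMemory[merge.Value[string]](), nil, syncbus.NewInMemoryBus(), merge.NewEngine[string]())
w.Register("session:42", ModeStrongLocal, time.Minute)

// Grant a lease; it is renewed automatically until revoked.
id, err := w.leases.Grant(ctx, 30*time.Second)
if err != nil {
	// e.g. ErrInvalidLeaseTTL for non-positive TTLs
}
w.leases.Attach(id, "session:42") // tie the cached key to the lease
_ = w.Set(ctx, "session:42", "token")

// Revoking the lease also invalidates every attached key.
w.leases.Revoke(ctx, id)
```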
newLeaseManager
Parameters
Returns
func newLeaseManager[T any](w *Warp[T], bus syncbus.Bus) *LeaseManager[T]
{
return &LeaseManager[T]{w: w, bus: bus, leases: make(map[string]*lease)}
}
sequentialWarmup
Parameters
func sequentialWarmup(w *Warp[string], ctx context.Context)
{
if w.store == nil {
return
}
keys := make([]string, 0)
for i := 0; i < regShardCount; i++ {
shard := &w.shards[i]
shard.RLock()
for k := range shard.regs {
keys = append(keys, k)
}
shard.RUnlock()
}
for _, k := range keys {
select {
case <-ctx.Done():
return
default:
}
v, ok, err := w.store.Get(ctx, k)
if err != nil || !ok {
continue
}
shard := w.shard(k)
shard.RLock()
reg := shard.regs[k]
shard.RUnlock()
now := time.Now()
mv := merge.Value[string]{Data: v, Timestamp: now}
ttl := reg.ttl
if reg.ttlStrategy != nil {
ttl = reg.ttlStrategy.TTL(k)
}
reg.currentTTL = ttl
reg.lastAccess = now
_ = w.cache.Set(ctx, k, mv, ttl)
}
}
BenchmarkWarmup
Parameters
func BenchmarkWarmup(b *testing.B)
{
const numKeys = 500
ctx := context.Background()
store := adapter.NewInMemoryStore[string]()
keys := make([]string, numKeys)
for i := 0; i < numKeys; i++ {
k := fmt.Sprintf("key-%d", i)
keys[i] = k
_ = store.Set(ctx, k, fmt.Sprintf("val-%d", i))
}
b.Run("sequential", func(b *testing.B) {
for i := 0; i < b.N; i++ {
cache := cache.NewInMemory[merge.Value[string]]()
bus := syncbus.NewInMemoryBus()
w := New[string](cache, store, bus, nil)
for _, k := range keys {
w.Register(k, ModeStrongLocal, time.Minute)
}
b.StartTimer()
sequentialWarmup(w, ctx)
b.StopTimer()
}
})
b.Run("parallel", func(b *testing.B) {
for i := 0; i < b.N; i++ {
cache := cache.NewInMemory[merge.Value[string]]()
bus := syncbus.NewInMemoryBus()
w := New[string](cache, store, bus, nil)
for _, k := range keys {
w.Register(k, ModeStrongLocal, time.Minute)
}
b.StartTimer()
w.Warmup(ctx)
b.StopTimer()
}
})
}
Mode
Mode represents the consistency mode for a key. See docs/core.md for the full mode table.
type Mode int
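A minimal sketch of how a key is bound to a mode at registration time; the behavioral comments reflect what the tests in this file assert (async publishes for eventual mode, quorum waits for strong distributed mode), see docs/core.md for the authoritative table.

```go
w := New[string](cache.NewInMemory[merge.Value[string]](), nil, syncbus.NewInMemoryBus(), merge.NewEngine[string]())
w.Register("local-key", ModeStrongLocal, time.Minute)             // no distributed coordination in the tests above
w.Register("eventual-key", ModeEventualDistributed, time.Minute)  // Set/Invalidate publish to the bus in the background
w.Register("strong-key", ModeStrongDistributed, time.Minute)      // Set/Invalidate block until quorum acks
```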
registration
type registration struct
Methods
Returns
func (r *registration) quorumSize() int
{
r.mu.Lock()
defer r.mu.Unlock()
if r.quorum < 1 {
return 1
}
return r.quorum
}
Fields
| Name | Type | Description |
|---|---|---|
| mu | sync.Mutex | |
| ttl | time.Duration | |
| ttlStrategy | cache.TTLStrategy | |
| ttlOpts | cache.TTLOptions | |
| mode | Mode | |
| currentTTL | time.Duration | |
| lastAccess | time.Time | |
| quorum | int |
regShard
type regShard struct
Fields
| Name | Type | Description |
|---|---|---|
| regs | map[string]*registration |
Warp
Warp orchestrates the interaction between cache, merge engine and sync bus.
type Warp[T any] struct
Fields
| Name | Type | Description |
|---|---|---|
| cache | cache.Cache[merge.Value[T]] | |
| store | adapter.Store[T] | |
| bus | syncbus.Bus | |
| merges | *merge.Engine[T] | |
| leases | *LeaseManager[T] | |
| publishErrCh | chan error | |
| shards | [regShardCount]regShard | |
| group | singleflight.Group | |
| hitCounter | prometheus.Counter | |
| missCounter | prometheus.Counter | |
| evictionCounter | prometheus.Counter | |
| latencyHist | prometheus.Histogram | |
| traceEnabled | bool | |
| resilient | bool | |
| publishTimeout | time.Duration |
Txn
Txn represents a batch of operations to be applied atomically.
type Txn[T any] struct
Fields
| Name | Type | Description |
|---|---|---|
| w | *Warp[T] | |
| ctx | context.Context | |
| sets | map[string]T | |
| deletes | map[string]struct{} | |
| cas | map[string]T |
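A minimal usage sketch of the transaction API, based on TestWarpTxnCASMismatch above and the quorum rollback tests later in this file:

```go
ctx := context.Background()
w := New[string](cache.NewInMemory[merge.Value[string]](), adapter.NewInMemoryStore[string](), nil, merge.NewEngine[string]())
for _, k := range []string{"a", "b", "c"} {
	w.Register(k, ModeStrongLocal, time.Minute)
}
_ = w.Set(ctx, "b", "old")
_ = w.Set(ctx, "c", "v1")

txn := w.Txn(ctx)
txn.Set("a", "new")                 // buffered write
txn.Delete("b")                     // buffered invalidation
txn.CompareAndSwap("c", "v1", "v2") // applied only if "c" still equals "v1"
if err := txn.Commit(); err != nil {
	// e.g. ErrCASMismatch or a quorum error; the rollback tests below show that
	// cache and store keep their previous values on failure
}
```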
versionedCache
versionedCache extends Cache with the ability to retrieve values at a specific time.
type versionedCache[T any] interface
Methods
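No methods are listed here, but this interface presumably backs Warp.GetAt. A minimal time-travel sketch, following TestWarpGetAtRollback earlier in this file:

```go
ctx := context.Background()
base := cache.NewInMemory[merge.VersionedValue[string]]()
w := New[string](versioned.New[string](base, 5), nil, nil, merge.NewEngine[string]())
w.Register("cfg", ModeStrongLocal, time.Minute)

_ = w.Set(ctx, "cfg", "v1")
t1 := time.Now()
time.Sleep(time.Millisecond) // ensure the next write has a later timestamp
_ = w.Set(ctx, "cfg", "v2")

old, err := w.GetAt(ctx, "cfg", t1) // "v1": the version that was current at t1
_, _ = old, err
```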
Option
Option configures a Warp instance.
type Option[T any] func(*Warp[T])
WithMetrics
WithMetrics enables Prometheus metrics collection for core operations.
Parameters
Returns
func WithMetrics[T any](reg prometheus.Registerer) Option[T]
{
return func(w *Warp[T]) {
w.hitCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "warp_core_hits_total",
Help: "Total number of cache hits",
})
w.missCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "warp_core_misses_total",
Help: "Total number of cache misses",
})
w.evictionCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "warp_core_evictions_total",
Help: "Total number of evictions",
})
w.latencyHist = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "warp_core_latency_seconds",
Help: "Latency of core operations",
Buckets: prometheus.DefBuckets,
})
reg.MustRegister(w.hitCounter, w.missCounter, w.evictionCounter, w.latencyHist)
}
}
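A short usage sketch, mirroring how TestDistributedInvalidation below wires metrics: pass a dedicated registry so each instance exposes its own counters.

```go
reg := prometheus.NewRegistry()
w := New[string](
	cache.NewInMemory[merge.Value[string]](),
	nil, nil, merge.NewEngine[string](),
	WithMetrics[string](reg),
)
_ = w
// reg now serves warp_core_hits_total, warp_core_misses_total,
// warp_core_evictions_total and warp_core_latency_seconds.
```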
WithCacheResiliency
WithCacheResiliency enables the L2 Fail-Safe pattern.
If the cache (L1 or L2) returns an error (e.g. connection down),
Warp will suppress the error and proceed as if it were a cache miss (for Get)
or a successful operation (for Set/Invalidate), ensuring application stability.
Returns
func WithCacheResiliency[T any]() Option[T]
{
return func(w *Warp[T]) {
w.resilient = true
}
}
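A minimal sketch: wrap whichever cache.Cache backend you use; when it errors, Get falls through to the store instead of failing the call (the TestResiliency_* tests below assert this behavior).

```go
store := adapter.NewInMemoryStore[string]()
w := New[string](
	cache.NewInMemory[merge.Value[string]](), // in production this might be a remote L2 cache
	store, nil, merge.NewEngine[string](),
	WithCacheResiliency[string](),
)
w.Register("user:1", ModeStrongLocal, 10*time.Minute)
v, err := w.Get(context.Background(), "user:1") // cache errors are treated as misses
_, _ = v, err
```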
WithPublishTimeout
WithPublishTimeout sets a timeout for background syncbus publish operations
in ModeEventualDistributed. This prevents goroutine leaks if the bus is slow or down.
Parameters
Returns
func WithPublishTimeout[T any](d time.Duration) Option[T]
{
return func(w *Warp[T]) {
w.publishTimeout = d
}
}
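A minimal sketch, following TestBackplane_TimeoutPreventsLeak below: bound background publishes and drain PublishErrors to observe timeouts.

```go
w := New[string](
	cache.NewInMemory[merge.Value[string]](),
	nil, syncbus.NewInMemoryBus(), merge.NewEngine[string](),
	WithPublishTimeout[string](200*time.Millisecond),
)
w.Register("feed", ModeEventualDistributed, time.Minute)
go func() {
	for err := range w.PublishErrors() { // surfaces context.DeadlineExceeded when the bus stalls
		slog.Warn("background publish failed", "err", err)
	}
}()
_ = w.Set(context.Background(), "feed", "v1")
```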
hashString
Parameters
Returns
func hashString(key string) uint32
{
hash := uint32(offset32)
for i := 0; i < len(key); i++ {
hash ^= uint32(key[i])
hash *= prime32
}
return hash
}
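hashString is an FNV-1a style hash over the key bytes. A plausible reading, not shown in this file, is that the registration shard for a key is selected by taking the hash modulo regShardCount; the helper below is hypothetical and only illustrates that mapping (the real selection lives in Warp.shard).

```go
// shardIndex is a hypothetical illustration of hash-based shard selection.
func shardIndex(key string) uint32 {
	return hashString(key) % regShardCount
}
```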
New
New creates a new Warp instance.
Parameters
Returns
func New[T any](c cache.Cache[merge.Value[T]], s adapter.Store[T], bus syncbus.Bus, m *merge.Engine[T], opts ...Option[T]) *Warp[T]
{
if m == nil {
m = merge.NewEngine[T]()
}
w := &Warp[T]{
cache: c,
store: s,
bus: bus,
merges: m,
publishErrCh: make(chan error, 1),
}
for i := 0; i < regShardCount; i++ {
w.shards[i].regs = make(map[string]*registration)
}
w.leases = newLeaseManager[T](w, bus)
for _, opt := range opts {
opt(w)
}
if w.resilient {
w.cache = cache.NewResilient(w.cache)
}
return w
}
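A compact quick-start assembled from the tests in this file: construct the instance, register a key, then use Set/Get/Invalidate.

```go
ctx := context.Background()
c := cache.NewInMemory[merge.Value[string]]()
store := adapter.NewInMemoryStore[string]()
bus := syncbus.NewInMemoryBus()

w := New[string](c, store, bus, merge.NewEngine[string]())
w.Register("greeting", ModeStrongLocal, time.Minute)

_ = w.Set(ctx, "greeting", "hello")
v, err := w.Get(ctx, "greeting") // "hello"
_ = w.Invalidate(ctx, "greeting")
_, _ = v, err
```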
validatorCache
validatorCache adapts the Warp cache to the Validator interface by operating on raw values.
type validatorCache[T any] struct
Fields
| Name | Type | Description |
|---|---|---|
| w | *Warp[T] |
WithTracing
WithTracing enables OpenTelemetry tracing for core operations.
Returns
func WithTracing[T any]() Option[T]
{
return func(w *Warp[T]) {
w.traceEnabled = true
}
}
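Tracing composes with the other options at construction time; a minimal sketch (span names and attributes are internal and not documented here):

```go
w := New[string](
	cache.NewInMemory[merge.Value[string]](),
	nil, nil, merge.NewEngine[string](),
	WithTracing[string](),
)
_ = w // spans are presumably emitted via the globally registered OpenTelemetry tracer
```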
MockStoreER
MockStoreER implements adapter.Store and helps track eager refreshes
type MockStoreER[T any] struct
Fields
| Name | Type | Description |
|---|---|---|
| mu | sync.Mutex | |
| data | map[string]T | |
| delay | time.Duration | |
| refreshCall | int |
TestEagerRefresh_TriggersBackgroundUpdate
Parameters
func TestEagerRefresh_TriggersBackgroundUpdate(t *testing.T)
{
c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
// Store takes 100ms to respond
store := &MockStoreER[string]{
data: make(map[string]string),
delay: 100 * time.Millisecond,
}
w := New[string](c, store, nil, nil)
key := "er-key-1"
val1 := "value-1"
val2 := "value-2"
store.data[key] = val1
// Register with:
// TTL: 200ms
// EagerRefreshThreshold: 0.5 (Refresh when 50% of TTL (100ms) is remaining)
w.Register(key, ModeStrongLocal, 200*time.Millisecond,
cache.WithEagerRefresh(0.5),
)
// 1. Initial Populate (Will be slow, 100ms, as it's a miss)
start := time.Now()
got, err := w.Get(context.Background(), key)
elapsed := time.Since(start)
if err != nil {
t.Fatalf("Initial Get failed: %v", err)
}
if got != val1 {
t.Errorf("Initial Get = %v, want %v", got, val1)
}
if elapsed < 100*time.Millisecond {
t.Errorf("Initial Get was too fast (%v), expected > 100ms", elapsed)
}
if store.refreshCall != 1 {
t.Errorf("Expected 1 store call, got %d", store.refreshCall)
}
// 2. Wait until just before Eager Refresh threshold (e.g., 90ms elapsed)
// TTL = 200ms. 50% is 100ms remaining. So trigger point is after 100ms.
// Current timestamp for item is now + 100ms from start of test roughly.
// So, we want current time to be (creationTime + 100ms).
// Let's sleep 90ms from the last Get finish (which took 100ms).
time.Sleep(90 * time.Millisecond) // Total elapsed: 100ms (fetch) + 90ms (sleep) = 190ms.
// Remaining: 200ms - 190ms = 10ms. 10/200 = 0.05. Below 0.5 threshold.
// Change backend data to verify refresh
store.data[key] = val2
store.mu.Lock()
store.refreshCall = 0 // Reset counter for next phase
store.mu.Unlock()
// 3. Get Again (Should trigger Eager Refresh)
// This call should return value-1 immediately (from cache)
start = time.Now()
got, err = w.Get(context.Background(), key)
elapsed = time.Since(start)
if err != nil {
t.Fatalf("Second Get failed: %v", err)
}
if got != val1 {
t.Errorf("Second Get = %v, want %v (stale during refresh)", got, val1)
}
if elapsed > 10*time.Millisecond { // Should be fast, from cache
t.Errorf("Second Get took too long (%v), expected fast", elapsed)
}
// 4. Wait for background refresh to complete
time.Sleep(150 * time.Millisecond) // Enough for store.delay (100ms) + some buffer
// 5. Get one more time (Should get fresh data from eager refresh)
got, err = w.Get(context.Background(), key)
if err != nil {
t.Fatalf("Third Get failed: %v", err)
}
if got != val2 {
t.Errorf("Third Get = %v, want %v (fresh after eager refresh)", got, val2)
}
if store.refreshCall != 1 {
t.Errorf("Expected 1 store call for eager refresh, got %d", store.refreshCall)
}
}
TestEagerRefresh_DoesNotTriggerIfTooFresh
Parameters
func TestEagerRefresh_DoesNotTriggerIfTooFresh(t *testing.T)
{
c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
store := &MockStoreER[string]{
data: make(map[string]string),
delay: 10 * time.Millisecond,
}
w := New[string](c, store, nil, nil)
key := "er-key-2"
store.data[key] = "val-fresh"
w.Register(key, ModeStrongLocal, 100*time.Millisecond,
cache.WithEagerRefresh(0.2), // Refresh when 20% TTL (20ms) is remaining
)
// 1. Initial Populate
w.Get(context.Background(), key)
store.mu.Lock()
if store.refreshCall != 1 {
t.Errorf("Expected 1 store call, got %d", store.refreshCall)
}
store.refreshCall = 0
store.mu.Unlock()
// 2. Get again immediately (should not trigger eager refresh)
w.Get(context.Background(), key)
store.mu.Lock()
if store.refreshCall != 0 {
t.Errorf("Expected 0 store calls for eager refresh (too fresh), got %d", store.refreshCall)
}
store.mu.Unlock()
// 3. Wait until after eager refresh threshold but before TTL
// Total TTL 100ms. Eager refresh at 80ms elapsed.
time.Sleep(85 * time.Millisecond) // Item should be 85ms old, remaining 15ms. 15/100 = 0.15, less than 0.2 threshold.
// 4. Get again (should trigger eager refresh)
w.Get(context.Background(), key)
time.Sleep(20 * time.Millisecond) // Give background goroutine time to complete
store.mu.Lock()
if store.refreshCall != 1 {
t.Errorf("Expected 1 store call for eager refresh (now within window), got %d", store.refreshCall)
}
store.mu.Unlock()
}
TestEagerRefresh_DoesNotTriggerIfStoreNil
Parameters
func TestEagerRefresh_DoesNotTriggerIfStoreNil(t *testing.T)
{
c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
// No store provided
w := New[string](c, nil, nil, nil)
key := "er-key-3"
val := "val-no-store"
// Manually set into cache as there is no store to fetch from
now := time.Now()
mv := merge.Value[string]{Data: val, Timestamp: now}
_ = c.Set(context.Background(), key, mv, 100*time.Millisecond)
w.Register(key, ModeStrongLocal, 100*time.Millisecond,
cache.WithEagerRefresh(0.1), // Refresh when 10% TTL (10ms) is remaining
)
// Wait for eager refresh window
time.Sleep(95 * time.Millisecond)
// Get (should not panic or error, should not try to refresh from nil store)
got, err := w.Get(context.Background(), key)
if err != nil {
t.Fatalf("Get with nil store failed: %v", err)
}
if got != val {
t.Errorf("Expected %v, got %v", val, got)
}
}
MockStoreFS
MockStoreFS implements adapter.Store for testing FailSafe
type MockStoreFS[T any] struct
Fields
| Name | Type | Description |
|---|---|---|
| data | map[string]T | |
| fail | bool | |
| failErr | error |
TestFailSafe_ReturnsStaleOnError
Parameters
func TestFailSafe_ReturnsStaleOnError(t *testing.T)
{
// Setup
c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
store := &MockStoreFS[string]{
data: make(map[string]string),
failErr: errors.New("db boom"),
}
w := New[string](c, store, nil, nil)
key := "fs-key-1"
val := "initial-value"
store.data[key] = val
// Register with short TTL (50ms) and long Grace Period (1h)
ttl := 50 * time.Millisecond
w.Register(key, ModeStrongLocal, ttl, cache.WithFailSafe(1*time.Hour))
// 1. Initial Get (Populate Cache)
got, err := w.Get(context.Background(), key)
if err != nil {
t.Fatalf("Initial Get failed: %v", err)
}
if got != val {
t.Errorf("Initial Get = %v, want %v", got, val)
}
// 2. Wait for TTL to expire (50ms)
time.Sleep(100 * time.Millisecond)
// 3. Make Store Fail
store.fail = true
// 4. Get again -> Should return Stale Data (Fail-Safe)
got, err = w.Get(context.Background(), key)
if err != nil {
t.Fatalf("Expected stale value but got error: %v", err)
}
if got != val {
t.Errorf("Expected stale value %v, got %v", val, got)
}
}
TestFailSafe_RecoversWhenStoreRecover
Parameters
func TestFailSafe_RecoversWhenStoreRecover(t *testing.T)
{
c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
store := &MockStoreFS[string]{
data: make(map[string]string),
}
w := New[string](c, store, nil, nil)
key := "fs-key-2"
val1 := "value-1"
store.data[key] = val1
w.Register(key, ModeStrongLocal, 50*time.Millisecond, cache.WithFailSafe(1*time.Hour))
// 1. Populate
w.Get(context.Background(), key)
// 2. Expire
time.Sleep(100 * time.Millisecond)
// 3. Update Store + Fail
val2 := "value-2"
store.data[key] = val2
store.fail = true
store.failErr = errors.New("temp error")
// 4. Get -> Stale "value-1"
got, err := w.Get(context.Background(), key)
if err != nil || got != val1 {
t.Errorf("Expected stale 'value-1', got %v (err: %v)", got, err)
}
// 5. Recover Store
store.fail = false
// 6. Get -> Fresh "value-2"
got, err = w.Get(context.Background(), key)
if err != nil {
t.Fatalf("Post-recovery Get failed: %v", err)
}
if got != val2 {
t.Errorf("Expected fresh 'value-2' after recovery, got %v", got)
}
}
TestFailSafe_RespectsGracePeriod
Parameters
func TestFailSafe_RespectsGracePeriod(t *testing.T)
{
c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
store := &MockStoreFS[string]{
data: make(map[string]string),
failErr: errors.New("db boom"),
}
w := New[string](c, store, nil, nil)
key := "fs-key-3"
store.data[key] = "val"
// Register with short TTL (20ms) and SHORT Grace Period (50ms)
// Total life = 70ms
w.Register(key, ModeStrongLocal, 20*time.Millisecond, cache.WithFailSafe(50*time.Millisecond))
// 1. Populate
w.Get(context.Background(), key)
// 2. Wait for TTL + Grace Period to expire (Wait 150ms > 70ms)
time.Sleep(150 * time.Millisecond)
// 3. Make Store Fail
store.fail = true
// 4. Get -> Should Fail (Item evicted)
_, err := w.Get(context.Background(), key)
if err == nil {
t.Fatal("Expected error because grace period expired, but got success")
}
}
TestFailSafe_ReturnsErrorIfNotFound
Parameters
func TestFailSafe_ReturnsErrorIfNotFound(t *testing.T)
{
c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
store := &MockStoreFS[string]{
data: make(map[string]string),
}
w := New[string](c, store, nil, nil)
key := "fs-key-4"
store.data[key] = "val"
w.Register(key, ModeStrongLocal, 50*time.Millisecond, cache.WithFailSafe(1*time.Hour))
w.Get(context.Background(), key)
time.Sleep(100 * time.Millisecond)
// Simulate deletion from DB (Get returns !ok or ErrNotFound)
delete(store.data, key)
// Get -> Should return ErrNotFound (or zero/false), NOT stale data.
_, err := w.Get(context.Background(), key)
if !errors.Is(err, ErrNotFound) {
t.Errorf("Expected ErrNotFound when DB deletes item, got: %v", err)
}
}
MockCacheFailing
MockCacheFailing implements cache.Cache and always fails
type MockCacheFailing[T any] struct
Fields
| Name | Type | Description |
|---|---|---|
| err | error |
TestResiliency_GetSupressesError
Parameters
func TestResiliency_GetSupressesError(t *testing.T)
{
// Setup failing cache
failingCache := &MockCacheFailing[merge.Value[string]]{
err: errors.New("redis down"),
}
store := &MockStoreFS[string]{
data: map[string]string{
"res-key": "db-value",
},
}
// Create Warp with Resiliency
w := New[string](failingCache, store, nil, nil, WithCacheResiliency[string]())
w.Register("res-key", ModeStrongLocal, 10*time.Minute)
// Get should NOT fail, but treat as miss and fetch from DB
val, err := w.Get(context.Background(), "res-key")
if err != nil {
t.Fatalf("Expected no error with resiliency, got: %v", err)
}
if val != "db-value" {
t.Errorf("Expected value from DB 'db-value', got '%v'", val)
}
}
TestResiliency_SetSupressesError
Parameters
func TestResiliency_SetSupressesError(t *testing.T)
{
failingCache := &MockCacheFailing[merge.Value[string]]{
err: errors.New("redis down"),
}
store := &MockStoreFS[string]{data: make(map[string]string)}
w := New[string](failingCache, store, nil, nil, WithCacheResiliency[string]())
w.Register("res-key-set", ModeStrongLocal, 10*time.Minute)
// Set should NOT fail
err := w.Set(context.Background(), "res-key-set", "value")
if err != nil {
t.Fatalf("Expected no error on Set with resiliency, got: %v", err)
}
// Verify it reached the store (Set ensures DB consistency even if cache fails)
if store.data["res-key-set"] != "value" {
t.Errorf("Expected value to be written to store despite cache failure")
}
}
TestResiliency_InvalidateSupressesError
Parameters
func TestResiliency_InvalidateSupressesError(t *testing.T)
{
failingCache := &MockCacheFailing[merge.Value[string]]{
err: errors.New("redis down"),
}
store := &MockStoreFS[string]{data: make(map[string]string)}
w := New[string](failingCache, store, nil, nil, WithCacheResiliency[string]())
w.Register("res-key-inv", ModeStrongLocal, 10*time.Minute)
// Invalidate should NOT fail
err := w.Invalidate(context.Background(), "res-key-inv")
if err != nil {
t.Fatalf("Expected no error on Invalidate with resiliency, got: %v", err)
}
}
TestResiliency_DisabledByDefault
Parameters
func TestResiliency_DisabledByDefault(t *testing.T)
{
failingCache := &MockCacheFailing[merge.Value[string]]{
err: errors.New("redis down"),
}
store := &MockStoreFS[string]{data: make(map[string]string)}
// Default Warp (No Resiliency)
w := New[string](failingCache, store, nil, nil)
w.Register("res-key-def", ModeStrongLocal, 10*time.Minute)
// Get SHOULD fail
_, err := w.Get(context.Background(), "res-key-def")
if err == nil {
t.Fatal("Expected error when resiliency is disabled, got nil")
}
}
WatchPrefix
WatchPrefix wraps watchbus.SubscribePrefix to expose prefix watching from core.
Parameters
Returns
func WatchPrefix(ctx context.Context, bus watchbus.WatchBus, prefix string) (chan []byte, error)
{
ch, err := bus.SubscribePrefix(ctx, prefix)
if err != nil {
return nil, err
}
metrics.WatcherGauge.Inc()
go func() {
<-ctx.Done()
metrics.WatcherGauge.Dec()
}()
return ch, nil
}
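A minimal usage sketch, based on TestWatchPrefixMetricsAndEvents above: subscribe to a key prefix, consume raw payloads, and stop by cancelling the context.

```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bus := watchbus.NewInMemory()
ch, err := WatchPrefix(ctx, bus, "orders/")
if err != nil {
	// handle subscription error
}
go func() {
	for msg := range ch {
		slog.Info("watch event", "payload", string(msg))
	}
}()
_ = bus.Publish(ctx, "orders/42", []byte("created"))
```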
MockBusBlocked
MockBusBlocked implements syncbus.Bus and blocks on Publish until signaled
type MockBusBlocked struct
Methods
Parameters
Returns
func (m *MockBusBlocked) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
select {
case <-m.blockCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
Parameters
Returns
func (*MockBusBlocked) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
return nil, nil
}
Parameters
Returns
func (*MockBusBlocked) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
return nil
}
Parameters
Returns
func (*MockBusBlocked) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
return nil
}
Parameters
Returns
func (*MockBusBlocked) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
return nil
}
Parameters
Returns
func (*MockBusBlocked) RevokeLease(ctx context.Context, id string) error
{
return nil
}
Parameters
Returns
func (*MockBusBlocked) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
return nil, nil
}
Parameters
Returns
func (*MockBusBlocked) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| blockCh | chan struct{} |
TestBackplane_TimeoutPreventsLeak
Parameters
func TestBackplane_TimeoutPreventsLeak(t *testing.T)
{
c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
// Create a bus that blocks forever
bus := &MockBusBlocked{
blockCh: make(chan struct{}), // Never closed in this test path
}
w := New[string](c, nil, bus, nil, WithPublishTimeout[string](50*time.Millisecond))
w.Register("bp-key-timeout", ModeEventualDistributed, 10*time.Minute)
// Set triggers background publish
err := w.Set(context.Background(), "bp-key-timeout", "val")
if err != nil {
t.Fatalf("Expected success on Set, got: %v", err)
}
// Wait for timeout error on channel
select {
case err := <-w.PublishErrors():
if !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("Expected DeadlineExceeded, got: %v", err)
}
case <-time.After(500 * time.Millisecond):
t.Error("Timed out waiting for publish error")
}
}
TestBackplane_NoTimeoutByDefault
Parameters
func TestBackplane_NoTimeoutByDefault(t *testing.T)
{
c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
bus := &MockBusBlocked{
blockCh: make(chan struct{}),
}
// No timeout configured
w := New[string](c, nil, bus, nil)
w.Register("bp-key-default", ModeEventualDistributed, 10*time.Minute)
w.Set(context.Background(), "bp-key-default", "val")
select {
case err := <-w.PublishErrors():
t.Errorf("Unexpected error received (should be blocked): %v", err)
case <-time.After(100 * time.Millisecond):
// Success: the publish goroutine is still blocked, which is the expected behavior when no timeout is configured
}
}
fakeQuorumBus
type fakeQuorumBus struct
Methods
Parameters
Returns
func (*fakeQuorumBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
return nil
}
Parameters
Returns
func (b *fakeQuorumBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
if replicas <= 0 {
replicas = 1
}
b.mu.Lock()
var nextErr error
if len(b.errSeq) > 0 {
nextErr = b.errSeq[0]
b.errSeq = b.errSeq[1:]
} else {
nextErr = b.err
}
b.mu.Unlock()
if nextErr != nil {
return nextErr
}
for i := 0; i < replicas; i++ {
select {
case <-ctx.Done():
return ctx.Err()
case <-b.ackCh:
}
}
return nil
}
Parameters
Returns
func (b *fakeQuorumBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
if minZones <= 0 {
minZones = 1
}
b.mu.Lock()
var nextErr error
if len(b.errSeq) > 0 {
nextErr = b.errSeq[0]
b.errSeq = b.errSeq[1:]
} else {
nextErr = b.err
}
b.mu.Unlock()
if nextErr != nil {
return nextErr
}
// fakeQuorumBus reuses ackCh to simulate topology acks,
// assuming one ack per zone for simplicity in this fake
for i := 0; i < minZones; i++ {
select {
case <-ctx.Done():
return ctx.Err()
case <-b.ackCh:
}
}
return nil
}
Parameters
Returns
func (*fakeQuorumBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
return nil, nil
}
Parameters
Returns
func (*fakeQuorumBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
return nil
}
Parameters
Returns
func (*fakeQuorumBus) RevokeLease(ctx context.Context, id string) error
{ return nil }
Parameters
Returns
func (*fakeQuorumBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
return nil, nil
}
Parameters
Returns
func (*fakeQuorumBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| ackCh | chan struct{} | |
| err | error | |
| mu | sync.Mutex | |
| errSeq | []error |
newFakeQuorumBus
Returns
func newFakeQuorumBus() *fakeQuorumBus
{
return &fakeQuorumBus{ackCh: make(chan struct{})}
}
TestDistributedInvalidation
Parameters
func TestDistributedInvalidation(t *testing.T)
{
ctx := context.Background()
mr, err := miniredis.Run()
if err != nil {
t.Fatalf("miniredis run: %v", err)
}
client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
bus := busredis.NewRedisBus(busredis.RedisBusOptions{Client: client})
store := adapter.NewInMemoryStore[int]()
reg1 := prometheus.NewRegistry()
reg2 := prometheus.NewRegistry()
w1 := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int](), WithMetrics[int](reg1))
w2 := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int](), WithMetrics[int](reg2))
w1.Register("counter", ModeStrongDistributed, time.Minute)
w2.Register("counter", ModeStrongDistributed, time.Minute)
mergeFn := func(old, new int) (int, error) { return old + new, nil }
w1.Merge("counter", mergeFn)
w2.Merge("counter", mergeFn)
ch, err := bus.Subscribe(ctx, "counter")
if err != nil {
t.Fatalf("subscribe: %v", err)
}
t.Cleanup(func() {
_ = bus.Unsubscribe(context.Background(), "counter", ch)
_ = client.Close()
mr.Close()
})
go func() {
for range ch {
_ = w2.cache.Invalidate(ctx, "counter")
}
}()
if err := w1.Set(ctx, "counter", 1); err != nil {
t.Fatalf("set1: %v", err)
}
if err := w1.Set(ctx, "counter", 2); err != nil {
t.Fatalf("set2: %v", err)
}
time.Sleep(50 * time.Millisecond)
v, err := w2.Get(ctx, "counter")
if err != nil {
t.Fatalf("get: %v", err)
}
if v != 3 {
t.Fatalf("expected 3 got %d", v)
}
before := bus.Metrics()
start := make(chan struct{})
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
<-start
_ = w1.Invalidate(ctx, "counter")
}()
go func() {
defer wg.Done()
<-start
_ = w2.Invalidate(ctx, "counter")
}()
close(start)
wg.Wait()
time.Sleep(50 * time.Millisecond)
after := bus.Metrics()
// Deduplication is best-effort. If concurrent invalidations overlap, we expect 1 publish.
// If they happen sequentially (due to scheduler/networking), we expect 2. Both are valid.
if diff := after.Published - before.Published; diff < 1 || diff > 2 {
t.Fatalf("expected 1 or 2 publishes, got %d", diff)
}
ev := testutil.ToFloat64(w1.evictionCounter) + testutil.ToFloat64(w2.evictionCounter)
if ev != 2 {
t.Fatalf("expected 2 evictions got %v", ev)
}
}
TestWarpDistributedNATS
Parameters
func TestWarpDistributedNATS(t *testing.T)
{
ctx := context.Background()
s := natsserver.RunRandClientPortServer()
conn, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("connect: %v", err)
}
bus := busnats.NewNATSBus(conn)
store := adapter.NewInMemoryStore[int]()
reg1 := prometheus.NewRegistry()
reg2 := prometheus.NewRegistry()
w1 := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int](), WithMetrics[int](reg1))
w2 := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int](), WithMetrics[int](reg2))
w1.Register("counter", ModeStrongDistributed, time.Minute)
w2.Register("counter", ModeStrongDistributed, time.Minute)
mergeFn := func(old, new int) (int, error) { return old + new, nil }
w1.Merge("counter", mergeFn)
w2.Merge("counter", mergeFn)
ch, err := bus.Subscribe(ctx, "counter")
if err != nil {
t.Fatalf("subscribe: %v", err)
}
t.Cleanup(func() {
_ = bus.Unsubscribe(context.Background(), "counter", ch)
conn.Close()
s.Shutdown()
})
go func() {
for range ch {
_ = w2.cache.Invalidate(ctx, "counter")
}
}()
if err := w1.Set(ctx, "counter", 1); err != nil {
t.Fatalf("set1: %v", err)
}
if err := w1.Set(ctx, "counter", 2); err != nil {
t.Fatalf("set2: %v", err)
}
time.Sleep(50 * time.Millisecond)
v, err := w2.Get(ctx, "counter")
if err != nil {
t.Fatalf("get: %v", err)
}
if v != 3 {
t.Fatalf("expected 3 got %d", v)
}
before := bus.Metrics()
start := make(chan struct{})
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
<-start
_ = w1.Invalidate(ctx, "counter")
}()
go func() {
defer wg.Done()
<-start
_ = w2.Invalidate(ctx, "counter")
}()
close(start)
wg.Wait()
time.Sleep(50 * time.Millisecond)
after := bus.Metrics()
if after.Published-before.Published != 1 {
t.Fatalf("expected 1 publish, got %d", after.Published-before.Published)
}
ev := testutil.ToFloat64(w1.evictionCounter) + testutil.ToFloat64(w2.evictionCounter)
if ev != 2 {
t.Fatalf("expected 2 evictions got %v", ev)
}
}
TestWarpStrongDistributedWaitsForQuorum
Parameters
func TestWarpStrongDistributedWaitsForQuorum(t *testing.T)
{
ctx := context.Background()
bus := newFakeQuorumBus()
store := adapter.NewInMemoryStore[int]()
w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
w.Register("counter", ModeStrongDistributed, time.Minute)
if !w.SetQuorum("counter", 2) {
t.Fatalf("expected quorum configuration to succeed")
}
done := make(chan error, 1)
go func() {
done <- w.Set(ctx, "counter", 1)
}()
select {
case err := <-done:
t.Fatalf("set returned before quorum: %v", err)
case <-time.After(20 * time.Millisecond):
}
bus.ackCh <- struct{}{}
select {
case err := <-done:
t.Fatalf("set returned after single ack: %v", err)
case <-time.After(20 * time.Millisecond):
}
bus.ackCh <- struct{}{}
select {
case err := <-done:
if err != nil {
t.Fatalf("set error: %v", err)
}
case <-time.After(time.Second):
t.Fatalf("set timed out waiting for quorum")
}
}
TestWarpStrongDistributedQuorumTimeout
Parameters
func TestWarpStrongDistributedQuorumTimeout(t *testing.T)
{
bus := newFakeQuorumBus()
store := adapter.NewInMemoryStore[int]()
w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
w.Register("counter", ModeStrongDistributed, time.Minute)
w.SetQuorum("counter", 2)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()
if err := w.Set(ctx, "counter", 1); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("expected deadline exceeded, got %v", err)
}
}
TestWarpStrongDistributedQuorumError
Parameters
func TestWarpStrongDistributedQuorumError(t *testing.T)
{
bus := newFakeQuorumBus()
bus.err = syncbus.ErrQuorumNotSatisfied
store := adapter.NewInMemoryStore[int]()
w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
w.Register("counter", ModeStrongDistributed, time.Minute)
w.SetQuorum("counter", 3)
if err := w.Set(context.Background(), "counter", 1); !errors.Is(err, syncbus.ErrQuorumNotSatisfied) {
t.Fatalf("expected quorum error, got %v", err)
}
}
TestWarpSetRollbackOnQuorumFailure
Parameters
func TestWarpSetRollbackOnQuorumFailure(t *testing.T)
{
ctx := context.Background()
bus := newFakeQuorumBus()
store := adapter.NewInMemoryStore[int]()
w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
w.Register("counter", ModeStrongDistributed, time.Minute)
go func() {
bus.ackCh <- struct{}{}
}()
if err := w.Set(ctx, "counter", 1); err != nil {
t.Fatalf("set baseline: %v", err)
}
bus.err = syncbus.ErrQuorumNotSatisfied
if err := w.Set(ctx, "counter", 2); !errors.Is(err, syncbus.ErrQuorumNotSatisfied) {
t.Fatalf("expected quorum failure, got %v", err)
}
if got, ok, err := store.Get(ctx, "counter"); err != nil || !ok || got != 1 {
t.Fatalf("store value changed after failure: ok=%v err=%v got=%d", ok, err, got)
}
if mv, ok, err := w.cache.Get(ctx, "counter"); err != nil {
t.Fatalf("cache get: %v", err)
} else if !ok || mv.Data != 1 {
t.Fatalf("cache value changed after failure: ok=%v data=%v", ok, mv.Data)
}
}
TestWarpInvalidateRollbackOnQuorumFailure
Parameters
func TestWarpInvalidateRollbackOnQuorumFailure(t *testing.T)
{
ctx := context.Background()
bus := newFakeQuorumBus()
store := adapter.NewInMemoryStore[int]()
w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
w.Register("counter", ModeStrongDistributed, time.Minute)
go func() {
bus.ackCh <- struct{}{}
}()
if err := w.Set(ctx, "counter", 1); err != nil {
t.Fatalf("set baseline: %v", err)
}
bus.err = syncbus.ErrQuorumNotSatisfied
if err := w.Invalidate(ctx, "counter"); !errors.Is(err, syncbus.ErrQuorumNotSatisfied) {
t.Fatalf("expected quorum failure, got %v", err)
}
if mv, ok, err := w.cache.Get(ctx, "counter"); err != nil {
t.Fatalf("cache get: %v", err)
} else if !ok || mv.Data != 1 {
t.Fatalf("cache value removed after failure: ok=%v data=%v", ok, mv.Data)
}
if got, err := w.Get(ctx, "counter"); err != nil || got != 1 {
t.Fatalf("warp get mismatch after failure: val=%d err=%v", got, err)
}
}
TestWarpTxnRollbackOnQuorumFailure
Parameters
func TestWarpTxnRollbackOnQuorumFailure(t *testing.T)
{
ctx := context.Background()
bus := newFakeQuorumBus()
store := adapter.NewInMemoryStore[int]()
w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
w.Register("counter", ModeStrongDistributed, time.Minute)
go func() {
bus.ackCh <- struct{}{}
}()
if err := w.Set(ctx, "counter", 1); err != nil {
t.Fatalf("set baseline: %v", err)
}
txn := w.Txn(ctx)
txn.Set("counter", 2)
bus.err = syncbus.ErrQuorumNotSatisfied
if err := txn.Commit(); !errors.Is(err, syncbus.ErrQuorumNotSatisfied) {
t.Fatalf("expected quorum failure, got %v", err)
}
if mv, ok, err := w.cache.Get(ctx, "counter"); err != nil {
t.Fatalf("cache get: %v", err)
} else if !ok || mv.Data != 1 {
t.Fatalf("cache value changed after commit failure: ok=%v data=%v", ok, mv.Data)
}
if got, ok, err := store.Get(ctx, "counter"); err != nil || !ok || got != 1 {
t.Fatalf("store value changed after commit failure: ok=%v err=%v val=%d", ok, err, got)
}
}
TestWarpTxnDeleteRollbackOnQuorumFailure
Parameters
func TestWarpTxnDeleteRollbackOnQuorumFailure(t *testing.T)
{
ctx := context.Background()
bus := newFakeQuorumBus()
store := adapter.NewInMemoryStore[int]()
w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
w.Register("counter", ModeStrongDistributed, time.Minute)
go func() {
bus.ackCh <- struct{}{}
}()
if err := w.Set(ctx, "counter", 1); err != nil {
t.Fatalf("set baseline: %v", err)
}
txn := w.Txn(ctx)
txn.Delete("counter")
bus.err = syncbus.ErrQuorumNotSatisfied
if err := txn.Commit(); !errors.Is(err, syncbus.ErrQuorumNotSatisfied) {
t.Fatalf("expected quorum failure, got %v", err)
}
if mv, ok, err := w.cache.Get(ctx, "counter"); err != nil {
t.Fatalf("cache get: %v", err)
} else if !ok || mv.Data != 1 {
t.Fatalf("cache value removed after delete failure: ok=%v data=%v", ok, mv.Data)
}
if got, ok, err := store.Get(ctx, "counter"); err != nil || !ok || got != 1 {
t.Fatalf("store value removed after delete failure: ok=%v err=%v val=%d", ok, err, got)
}
}
TestWarpTxnRollbackMultiKeyOnQuorumFailure
Parameters
func TestWarpTxnRollbackMultiKeyOnQuorumFailure(t *testing.T)
{
ctx := context.Background()
bus := newFakeQuorumBus()
store := adapter.NewInMemoryStore[int]()
w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
w.Register("counter", ModeStrongDistributed, time.Minute)
w.Register("mirror", ModeStrongDistributed, time.Minute)
go func() {
bus.ackCh <- struct{}{}
}()
if err := w.Set(ctx, "counter", 1); err != nil {
t.Fatalf("set counter baseline: %v", err)
}
go func() {
bus.ackCh <- struct{}{}
}()
if err := w.Set(ctx, "mirror", 1); err != nil {
t.Fatalf("set mirror baseline: %v", err)
}
txn := w.Txn(ctx)
txn.Set("counter", 2)
txn.Set("mirror", 5)
bus.mu.Lock()
bus.errSeq = []error{nil, syncbus.ErrQuorumNotSatisfied}
bus.mu.Unlock()
go func() {
bus.ackCh <- struct{}{}
}()
if err := txn.Commit(); !errors.Is(err, syncbus.ErrQuorumNotSatisfied) {
t.Fatalf("expected quorum failure, got %v", err)
}
if mv, ok, err := w.cache.Get(ctx, "counter"); err != nil {
t.Fatalf("cache get counter: %v", err)
} else if !ok || mv.Data != 1 {
t.Fatalf("counter cache changed after failure: ok=%v val=%v", ok, mv.Data)
}
if mv, ok, err := w.cache.Get(ctx, "mirror"); err != nil {
t.Fatalf("cache get mirror: %v", err)
} else if !ok || mv.Data != 1 {
t.Fatalf("mirror cache changed after failure: ok=%v val=%v", ok, mv.Data)
}
if got, ok, err := store.Get(ctx, "counter"); err != nil || !ok || got != 1 {
t.Fatalf("store counter changed after failure: ok=%v err=%v val=%d", ok, err, got)
}
if got, ok, err := store.Get(ctx, "mirror"); err != nil || !ok || got != 1 {
t.Fatalf("store mirror changed after failure: ok=%v err=%v val=%d", ok, err, got)
}
}
MockStoreSlow
MockStoreSlow implements adapter.Store and simulates a slow backing store through a configurable delay.
type MockStoreSlow[T any] struct
Fields
| Name | Type | Description |
|---|---|---|
| data | map[string]T | |
| delay | time.Duration | |
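The methods are not reproduced here. As an illustration only, a Get that matches how store.Get is consumed in the quorum tests above (value, found, error) and honours the configured delay could look like the sketch below; the context handling is an assumption, not taken from the package.
// Get sleeps for the configured delay before reading the map, so every
// read simulates a slow backing store.
func (m *MockStoreSlow[T]) Get(ctx context.Context, key string) (T, bool, error) {
	select {
	case <-time.After(m.delay):
	case <-ctx.Done():
		var zero T
		return zero, false, ctx.Err()
	}
	v, ok := m.data[key]
	return v, ok, nil
}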
TestSoftTimeout
Parameters
func TestSoftTimeout(t *testing.T)
{
// Setup
c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
// Store takes 200ms to respond
store := &MockStoreSlow[string]{
data: make(map[string]string),
delay: 200 * time.Millisecond,
}
w := New[string](c, store, nil, nil)
key := "slow-key"
val := "fast-value"
store.data[key] = val
// Register with:
// TTL: 1s (Longer TTL for refresh verification)
// FailSafe: 1h (So we have a stale backup)
// SoftTimeout: 50ms (Much shorter than store delay)
w.Register(key, ModeStrongLocal, 1*time.Second,
cache.WithFailSafe(1*time.Hour),
cache.WithSoftTimeout(50*time.Millisecond),
)
// 1. Initial Populate (Will be slow, 200ms, because no stale data yet)
start := time.Now()
got, err := w.Get(context.Background(), key)
elapsed := time.Since(start)
if err != nil {
t.Fatalf("Initial Get failed: %v", err)
}
if got != val {
t.Errorf("Initial Get = %v, want %v", got, val)
}
if elapsed < 200*time.Millisecond {
t.Errorf("Initial Get was too fast (%v), expected > 200ms", elapsed)
}
// 2. Wait for TTL to expire (1s + buffer)
time.Sleep(1100 * time.Millisecond)
// 3. Get Again (Stale exists + Store is slow)
// Should hit SoftTimeout at 50ms and return stale data immediately
start = time.Now()
got, err = w.Get(context.Background(), key)
elapsed = time.Since(start)
if err != nil {
t.Fatalf("Soft Timeout Get failed: %v", err)
}
if got != val {
t.Errorf("Soft Timeout Get = %v, want %v", got, val)
}
// Check timing: Should be around 50ms (SoftTimeout), definitely NOT 200ms
if elapsed > 150*time.Millisecond {
t.Errorf("Soft Timeout Get took too long (%v), expected ~50ms", elapsed)
}
// 4. Wait for background refresh to finish (remaining 150ms of 200ms delay + buffer)
time.Sleep(250 * time.Millisecond)
// 5. Get Again - Should be a fresh Hit now (0ms latency from cache)
start = time.Now()
got, err = w.Get(context.Background(), key)
elapsed = time.Since(start)
if err != nil {
t.Fatalf("Third Get failed: %v", err)
}
if got != val {
t.Errorf("Third Get = %v, want %v", got, val)
}
if elapsed > 10*time.Millisecond {
t.Errorf("Third Get was slow (%v), background refresh likely failed", elapsed)
}
}
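Outside the test harness, the same behaviour is configured with the options exercised above. A minimal application-side sketch, assuming adapter.Store is generic over the value type and reusing only the constructors and options that appear in this test:
func serveWithSoftTimeout(ctx context.Context, store adapter.Store[string]) (string, error) {
	c := cache.NewInMemory[merge.Value[string]]()
	w := New[string](c, store, nil, nil)
	// Keep a stale copy for up to an hour and fall back to it whenever the
	// store takes longer than 50ms; a background refresh repopulates the cache.
	w.Register("slow-key", ModeStrongLocal, time.Second,
		cache.WithFailSafe(time.Hour),
		cache.WithSoftTimeout(50*time.Millisecond),
	)
	return w.Get(ctx, "slow-key")
}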