core API

API reference for the core package.

F
function

TestLeaseGrantAutoRenewAttachRevoke

Parameters

v1/core/lease_test.go:14-50
func TestLeaseGrantAutoRenewAttachRevoke(t *testing.T)

{
	ctx := context.Background()
	c := cache.NewInMemory[merge.Value[string]]()
	bus := syncbus.NewInMemoryBus()
	w := New[string](c, nil, bus, merge.NewEngine[string]())
	lm := w.leases

	ttl := 20 * time.Millisecond
	id, err := lm.Grant(ctx, ttl)
	if err != nil {
		t.Fatalf("grant: %v", err)
	}

	key := "foo"
	lm.Attach(id, key)
	_ = w.cache.Set(ctx, key, merge.Value[string]{Data: "bar", Timestamp: time.Now()}, time.Minute)

	time.Sleep(3 * ttl)
	lm.mu.Lock()
	_, ok := lm.leases[id]
	lm.mu.Unlock()
	if !ok {
		t.Fatalf("lease expired before renewal")
	}

	lm.Revoke(ctx, id)

	lm.mu.Lock()
	if _, ok := lm.leases[id]; ok {
		t.Fatalf("lease not removed after revoke")
	}
	lm.mu.Unlock()

	if _, ok, _ := w.cache.Get(ctx, key); ok {
		t.Fatalf("cache key not invalidated on revoke")
	}
}
F
function

TestLeaseGrantRejectsNonPositiveTTL

Parameters

v1/core/lease_test.go:52-66
func TestLeaseGrantRejectsNonPositiveTTL(t *testing.T)

{
	ctx := context.Background()
	c := cache.NewInMemory[merge.Value[string]]()
	w := New[string](c, nil, nil, merge.NewEngine[string]())

	if _, err := w.leases.Grant(ctx, 0); !errors.Is(err, ErrInvalidLeaseTTL) {
		t.Fatalf("expected ErrInvalidLeaseTTL, got %v", err)
	}

	w.leases.mu.Lock()
	if len(w.leases.leases) != 0 {
		t.Fatalf("expected no leases to be created for invalid ttl")
	}
	w.leases.mu.Unlock()
}
F
function

TestLeaseBusRevocation

Parameters

v1/core/lease_test.go:68-108
func TestLeaseBusRevocation(t *testing.T)

{
	ctx := context.Background()
	c := cache.NewInMemory[merge.Value[string]]()
	bus := syncbus.NewInMemoryBus()
	w := New[string](c, nil, bus, merge.NewEngine[string]())
	lm := w.leases

	id, err := lm.Grant(ctx, time.Minute)
	if err != nil {
		t.Fatalf("grant: %v", err)
	}
	lm.Attach(id, "a")
	_ = w.cache.Set(ctx, "a", merge.Value[string]{Data: "1", Timestamp: time.Now()}, time.Minute)

	ghostID := "ghost"
	lm.Attach(ghostID, "b")
	_ = w.cache.Set(ctx, "b", merge.Value[string]{Data: "2", Timestamp: time.Now()}, time.Minute)

	_ = bus.RevokeLease(ctx, id)
	_ = bus.RevokeLease(ctx, ghostID)

	time.Sleep(20 * time.Millisecond)

	lm.mu.Lock()
	if _, ok := lm.leases[id]; ok {
		lm.mu.Unlock()
		t.Fatalf("lease not revoked via bus")
	}
	if _, ok := lm.leases[ghostID]; ok {
		lm.mu.Unlock()
		t.Fatalf("placeholder lease not revoked via bus")
	}
	lm.mu.Unlock()

	if _, ok, _ := w.cache.Get(ctx, "a"); ok {
		t.Fatalf("key a not invalidated via bus revocation")
	}
	if _, ok, _ := w.cache.Get(ctx, "b"); ok {
		t.Fatalf("key b not invalidated via placeholder revocation")
	}
}
F
function

TestWatchPrefixMetricsAndEvents

Parameters

v1/core/watch_test.go:13-45
func TestWatchPrefixMetricsAndEvents(t *testing.T)

{
	bus := watchbus.NewInMemory()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	metrics.WatcherGauge.Set(0)

	ch, err := WatchPrefix(ctx, bus, "foo")
	if err != nil {
		t.Fatalf("watch prefix: %v", err)
	}
	if v := testutil.ToFloat64(metrics.WatcherGauge); v != 1 {
		t.Fatalf("expected gauge 1 got %v", v)
	}

	if err := bus.Publish(ctx, "foobar", []byte("hello")); err != nil {
		t.Fatalf("publish: %v", err)
	}
	select {
	case msg := <-ch:
		if string(msg) != "hello" {
			t.Fatalf("unexpected message %s", msg)
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for event")
	}

	cancel()
	time.Sleep(20 * time.Millisecond)
	if v := testutil.ToFloat64(metrics.WatcherGauge); v != 0 {
		t.Fatalf("expected gauge 0 got %v", v)
	}
}
S
struct

errStore

v1/core/core_test.go:19-21
type errStore struct

Fields

Name Type Description
err error
S
struct

errBus

v1/core/core_test.go:32-32
type errBus struct

Methods

Publish
Method

Parameters

key string
opts ...syncbus.PublishOption

Returns

error
func (b errBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
	return b.err
}

Parameters

key string
replicas int
opts ...syncbus.PublishOption

Returns

error
func (b errBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
	return b.err
}

Parameters

key string
minZones int
opts ...syncbus.PublishOption

Returns

error
func (b errBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
	return b.err
}
Subscribe
Method

Parameters

key string

Returns

<-chan syncbus.Event
error
func (b errBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
	return nil, b.err
}
Unsubscribe
Method

Parameters

key string
ch <-chan syncbus.Event

Returns

error
func (b errBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
	return b.err
}
RevokeLease
Method

Parameters

id string

Returns

error
func (b errBus) RevokeLease(ctx context.Context, id string) error
{ return b.err }

Parameters

id string

Returns

<-chan syncbus.Event
error
func (b errBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
	return nil, b.err
}

Parameters

id string
ch <-chan syncbus.Event

Returns

error
func (errBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
	return nil
}
IsHealthy
Method

Returns

bool
func (errBus) IsHealthy() bool
{
	return true
}
Peers
Method

Returns

[]string
func (errBus) Peers() []string
{
	return nil
}

Fields

Name Type Description
err error
S
struct

slowBus

v1/core/core_test.go:67-71
type slowBus struct

Methods

Publish
Method

Parameters

key string
opts ...syncbus.PublishOption

Returns

error
func (b *slowBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
	time.Sleep(b.delay)
	if b.done != nil {
		b.done <- struct{}{}
	}
	return b.InMemoryBus.Publish(ctx, key, opts...)
}

Parameters

key string
replicas int
opts ...syncbus.PublishOption

Returns

error
func (b *slowBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
	time.Sleep(b.delay)
	return b.InMemoryBus.PublishAndAwait(ctx, key, replicas, opts...)
}

Parameters

key string
minZones int
opts ...syncbus.PublishOption

Returns

error
func (b *slowBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
	time.Sleep(b.delay)
	return b.InMemoryBus.PublishAndAwaitTopology(ctx, key, minZones, opts...)
}
Subscribe
Method

Parameters

key string

Returns

<-chan syncbus.Event
error
func (*slowBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
	return nil, nil
}
Unsubscribe
Method

Parameters

key string
ch <-chan syncbus.Event

Returns

error
func (*slowBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
	return nil
}
RevokeLease
Method

Parameters

id string

Returns

error
func (*slowBus) RevokeLease(ctx context.Context, id string) error
{ return nil }

Parameters

id string

Returns

<-chan syncbus.Event
error
func (*slowBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
	return nil, nil
}

Parameters

id string
ch <-chan syncbus.Event

Returns

error
func (*slowBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
	return nil
}
Peers
Method

Returns

[]string
func (*slowBus) Peers() []string
{
	return nil
}

Fields

Name Type Description
delay time.Duration
done chan struct{}
S
struct

ttlCache

v1/core/core_test.go:109-113
type ttlCache struct

Fields

Name Type Description
mu sync.Mutex
items map[string]T
ttls map[string]time.Duration
F
function

newTTLCache

Returns

*ttlCache[T]
v1/core/core_test.go:115-120
func newTTLCache[T any]() *ttlCache[T]

{
	return &ttlCache[T]{
		items: make(map[string]T),
		ttls:  make(map[string]time.Duration),
	}
}
S
struct

mockStrategy

v1/core/core_test.go:151-155
type mockStrategy struct

Methods

Record
Method

Parameters

key string
func (m *mockStrategy) Record(key string)
{
	m.mu.Lock()
	m.records = append(m.records, key)
	m.mu.Unlock()
}
TTL
Method

Parameters

key string

Returns

time.Duration
func (m *mockStrategy) TTL(key string) time.Duration
{
	return m.ttl
}

Fields

Name Type Description
mu sync.Mutex
ttl time.Duration
records []string
S
struct

slowStore

v1/core/core_test.go:167-174
type slowStore struct

Fields

Name Type Description
data map[string]T
delay time.Duration
mu sync.Mutex
calls int
started chan struct{}
once sync.Once
F
function

TestWarpSetGet

Parameters

v1/core/core_test.go:210-227
func TestWarpSetGet(t *testing.T)

{
	ctx := context.Background()
	w := New[string](cache.NewInMemory[merge.Value[string]](), nil, syncbus.NewInMemoryBus(), merge.NewEngine[string]())
	w.Register("foo", ModeStrongLocal, time.Minute)
	if err := w.Set(ctx, "foo", "bar"); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	v, err := w.Get(ctx, "foo")
	if err != nil || v != "bar" {
		t.Fatalf("unexpected value: %v, err: %v", v, err)
	}
	if err := w.Invalidate(ctx, "foo"); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if _, err := w.Get(ctx, "foo"); err == nil {
		t.Fatalf("expected error after invalidate")
	}
}
F
function

TestWarpConcurrentAccess

Parameters

v1/core/core_test.go:229-295
func TestWarpConcurrentAccess(t *testing.T)

{
	ctx := context.Background()
	c := cache.NewInMemory[merge.Value[int]]()
	s := adapter.NewInMemoryStore[int]()
	w := New(c, s, syncbus.NewInMemoryBus(), merge.NewEngine[int]())
	if !w.Register(
		"counter",
		ModeStrongLocal,
		time.Second,
		cache.WithSliding(),
		cache.WithDynamicTTL(10*time.Millisecond, 5*time.Millisecond, 1*time.Millisecond, 5*time.Millisecond, time.Second),
	) {
		t.Fatalf("expected registration to succeed")
	}
	if err := w.Set(ctx, "counter", 0); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	const goroutines = 32
	const iterations = 64

	start := make(chan struct{})
	errCh := make(chan error, 1)
	var wg sync.WaitGroup

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func(base int) {
			defer wg.Done()
			<-start
			for j := 0; j < iterations; j++ {
				if err := w.Set(ctx, "counter", base*iterations+j); err != nil {
					select {
					case errCh <- err:
					default:
					}
					return
				}
			}
		}(i)

		wg.Add(1)
		go func() {
			defer wg.Done()
			<-start
			for j := 0; j < iterations; j++ {
				if _, err := w.Get(ctx, "counter"); err != nil {
					select {
					case errCh <- err:
					default:
					}
					return
				}
				time.Sleep(time.Millisecond)
			}
		}()
	}

	close(start)
	wg.Wait()

	select {
	case err := <-errCh:
		t.Fatalf("concurrent access failed: %v", err)
	default:
	}
}
F
function

TestWarpGetAtRollback

Parameters

v1/core/core_test.go:297-317
func TestWarpGetAtRollback(t *testing.T)

{
	ctx := context.Background()
	base := cache.NewInMemory[merge.VersionedValue[string]]()
	c := versioned.New[string](base, 5)
	w := New[string](c, nil, nil, merge.NewEngine[string]())
	w.Register("foo", ModeStrongLocal, time.Minute)

	if err := w.Set(ctx, "foo", "v1"); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	t1 := time.Now()
	time.Sleep(time.Millisecond)
	if err := w.Set(ctx, "foo", "v2"); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	v, err := w.GetAt(ctx, "foo", t1)
	if err != nil || v != "v1" {
		t.Fatalf("expected v1, got %v err %v", v, err)
	}
}
F
function

TestWarpGetAtExpire

Parameters

v1/core/core_test.go:319-333
func TestWarpGetAtExpire(t *testing.T)

{
	ctx := context.Background()
	base := cache.NewInMemory[merge.VersionedValue[string]]()
	c := versioned.New[string](base, 5)
	w := New[string](c, nil, nil, merge.NewEngine[string]())
	w.Register("foo", ModeStrongLocal, 5*time.Millisecond)

	if err := w.Set(ctx, "foo", "v1"); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	time.Sleep(10 * time.Millisecond)
	if _, err := w.GetAt(ctx, "foo", time.Now()); !errors.Is(err, ErrNotFound) {
		t.Fatalf("expected not found after ttl, got %v", err)
	}
}
F
function

TestWarpMerge

Parameters

v1/core/core_test.go:335-355
func TestWarpMerge(t *testing.T)

{
	ctx := context.Background()
	w := New(cache.NewInMemory[merge.Value[int]](), adapter.NewInMemoryStore[int](), nil, merge.NewEngine[int]())
	w.Register("cnt", ModeStrongLocal, time.Minute)
	w.Merge("cnt", func(old, new int) (int, error) {
		return old + new, nil
	})
	if err := w.Set(ctx, "cnt", 1); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if err := w.Set(ctx, "cnt", 2); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	v, err := w.Get(ctx, "cnt")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if v != 3 {
		t.Fatalf("expected 3, got %v", v)
	}
}
F
function

TestWarpFallbackAndWarmup

Parameters

v1/core/core_test.go:357-377
func TestWarpFallbackAndWarmup(t *testing.T)

{
	ctx := context.Background()
	store := adapter.NewInMemoryStore[string]()
	_ = store.Set(ctx, "foo", "bar")
	w := New[string](cache.NewInMemory[merge.Value[string]](), store, nil, merge.NewEngine[string]())
	w.Register("foo", ModeStrongLocal, time.Minute)
	// fallback
	v, err := w.Get(ctx, "foo")
	if err != nil || v != "bar" {
		t.Fatalf("unexpected fallback value: %v err: %v", v, err)
	}
	// warmup
	if err := w.Invalidate(ctx, "foo"); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	w.Warmup(ctx)
	v, err = w.Get(ctx, "foo")
	if err != nil || v != "bar" {
		t.Fatalf("expected warmup to load value, got %v err %v", v, err)
	}
}
F
function

TestWarpWarmupContextCancel

Parameters

v1/core/core_test.go:379-410
func TestWarpWarmupContextCancel(t *testing.T)

{
	ctx, cancel := context.WithCancel(context.Background())
	store := &slowStore[string]{
		data:    map[string]string{"a": "1", "b": "2"},
		delay:   50 * time.Millisecond,
		started: make(chan struct{}),
	}
	w := New[string](cache.NewInMemory[merge.Value[string]](), store, nil, merge.NewEngine[string]())
	w.Register("a", ModeStrongLocal, time.Minute)
	w.Register("b", ModeStrongLocal, time.Minute)

	done := make(chan struct{})
	go func() {
		w.Warmup(ctx)
		close(done)
	}()

	<-store.started
	cancel()
	<-done

	store.mu.Lock()
	calls := store.calls
	store.mu.Unlock()
	if calls == 0 {
		t.Fatalf("expected store to be called at least once")
	}

	if _, ok, _ := w.cache.Get(context.Background(), "b"); ok {
		t.Fatalf("expected key b not to be warmed up")
	}
}
F
function

TestWarpUnregister

Parameters

v1/core/core_test.go:412-422
func TestWarpUnregister(t *testing.T)

{
	w := New[string](cache.NewInMemory[merge.Value[string]](), nil, nil, merge.NewEngine[string]())
	w.Register("foo", ModeStrongLocal, time.Minute)
	if _, ok := w.getReg("foo"); !ok {
		t.Fatalf("expected foo to be registered")
	}
	w.Unregister("foo")
	if _, ok := w.getReg("foo"); ok {
		t.Fatalf("expected foo to be unregistered")
	}
}
F
function

TestWarpGetStoreError

Parameters

v1/core/core_test.go:424-432
func TestWarpGetStoreError(t *testing.T)

{
	ctx := context.Background()
	expected := errors.New("boom")
	w := New[string](cache.NewInMemory[merge.Value[string]](), errStore[string]{err: expected}, nil, merge.NewEngine[string]())
	w.Register("foo", ModeStrongLocal, time.Minute)
	if _, err := w.Get(ctx, "foo"); !errors.Is(err, expected) {
		t.Fatalf("expected %v, got %v", expected, err)
	}
}
F
function

TestWarpRegisterDuplicate

Parameters

v1/core/core_test.go:434-446
func TestWarpRegisterDuplicate(t *testing.T)

{
	w := New[string](cache.NewInMemory[merge.Value[string]](), nil, nil, merge.NewEngine[string]())
	if !w.Register("foo", ModeStrongLocal, time.Minute) {
		t.Fatalf("expected first registration to succeed")
	}
	if w.Register("foo", ModeEventualDistributed, 2*time.Minute) {
		t.Fatalf("expected duplicate registration to fail")
	}
	reg, _ := w.getReg("foo")
	if reg.mode != ModeStrongLocal || reg.ttl != time.Minute {
		t.Fatalf("registration should not be overwritten")
	}
}
F
function

TestWarpSetPublishError

Parameters

v1/core/core_test.go:448-464
func TestWarpSetPublishError(t *testing.T)

{
	ctx := context.Background()
	expected := errors.New("boom")
	w := New[string](cache.NewInMemory[merge.Value[string]](), nil, errBus{err: expected}, merge.NewEngine[string]())
	w.Register("foo", ModeEventualDistributed, time.Minute)
	if err := w.Set(ctx, "foo", "bar"); err != nil {
		t.Fatalf("unexpected set error: %v", err)
	}
	select {
	case err := <-w.PublishErrors():
		if !errors.Is(err, expected) {
			t.Fatalf("expected %v, got %v", expected, err)
		}
	case <-time.After(100 * time.Millisecond):
		t.Fatalf("expected publish error")
	}
}
F
function

TestWarpInvalidatePublishError

Parameters

v1/core/core_test.go:466-482
func TestWarpInvalidatePublishError(t *testing.T)

{
	ctx := context.Background()
	expected := errors.New("boom")
	w := New[string](cache.NewInMemory[merge.Value[string]](), nil, errBus{err: expected}, merge.NewEngine[string]())
	w.Register("foo", ModeEventualDistributed, time.Minute)
	if err := w.Invalidate(ctx, "foo"); err != nil {
		t.Fatalf("unexpected invalidate error: %v", err)
	}
	select {
	case err := <-w.PublishErrors():
		if !errors.Is(err, expected) {
			t.Fatalf("expected %v, got %v", expected, err)
		}
	case <-time.After(100 * time.Millisecond):
		t.Fatalf("expected publish error")
	}
}
F
function

TestWarpUnregisteredKey

Parameters

v1/core/core_test.go:484-499
func TestWarpUnregisteredKey(t *testing.T)

{
	ctx := context.Background()
	w := New[string](cache.NewInMemory[merge.Value[string]](), nil, nil, merge.NewEngine[string]())

	if _, err := w.Get(ctx, "foo"); !errors.Is(err, ErrUnregistered) {
		t.Fatalf("expected ErrUnregistered, got %v", err)
	}

	if err := w.Set(ctx, "foo", "bar"); !errors.Is(err, ErrUnregistered) {
		t.Fatalf("expected ErrUnregistered, got %v", err)
	}

	if err := w.Invalidate(ctx, "foo"); !errors.Is(err, ErrUnregistered) {
		t.Fatalf("expected ErrUnregistered, got %v", err)
	}
}
F
function

TestWarpValidatorAutoHealTTL

Parameters

v1/core/core_test.go:501-528
func TestWarpValidatorAutoHealTTL(t *testing.T)

{
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := newTTLCache[merge.Value[string]]()
	store := adapter.NewInMemoryStore[string]()
	_ = store.Set(ctx, "k", "v1")

	w := New[string](c, store, nil, merge.NewEngine[string]())
	ttl := time.Minute
	w.Register("k", ModeStrongLocal, ttl)

	mv := merge.Value[string]{Data: "v0", Timestamp: time.Now()}
	_ = c.Set(ctx, "k", mv, ttl)
	orig := c.TTL("k")

	v := w.Validator(validator.ModeAutoHeal, time.Millisecond)
	go v.Run(ctx)
	time.Sleep(5 * time.Millisecond)

	healed, ok, err := c.Get(ctx, "k")
	if err != nil || !ok || healed.Data != "v1" {
		t.Fatalf("expected value healed to v1, got %v err %v", healed.Data, err)
	}
	if newTTL := c.TTL("k"); newTTL != orig {
		t.Fatalf("expected TTL %v, got %v", orig, newTTL)
	}
}
F
function

TestWarpRegisterDynamicTTL

Parameters

v1/core/core_test.go:529-551
func TestWarpRegisterDynamicTTL(t *testing.T)

{
	ctx := context.Background()
	c := newTTLCache[merge.Value[string]]()
	strat := &mockStrategy{ttl: 2 * time.Second}
	w := New[string](c, nil, nil, merge.NewEngine[string]())
	if !w.RegisterDynamicTTL("foo", ModeStrongLocal, strat) {
		t.Fatalf("expected registration success")
	}
	if err := w.Set(ctx, "foo", "bar"); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if ttl := c.TTL("foo"); ttl != 2*time.Second {
		t.Fatalf("expected ttl 2s, got %v", ttl)
	}
	if _, err := w.Get(ctx, "foo"); err != nil {
		t.Fatalf("unexpected get error: %v", err)
	}
	strat.mu.Lock()
	defer strat.mu.Unlock()
	if len(strat.records) != 2 {
		t.Fatalf("expected 2 record calls, got %d", len(strat.records))
	}
}
F
function

TestWarpTxnCASMismatch

Parameters

v1/core/core_test.go:553-565
func TestWarpTxnCASMismatch(t *testing.T)

{
	ctx := context.Background()
	w := New[string](cache.NewInMemory[merge.Value[string]](), adapter.NewInMemoryStore[string](), nil, merge.NewEngine[string]())
	w.Register("foo", ModeStrongLocal, time.Minute)
	if err := w.Set(ctx, "foo", "a"); err != nil {
		t.Fatalf("set: %v", err)
	}
	txn := w.Txn(ctx)
	txn.CompareAndSwap("foo", "b", "c")
	if err := txn.Commit(); !errors.Is(err, ErrCASMismatch) {
		t.Fatalf("expected ErrCASMismatch got %v", err)
	}
}
F
function

TestConcurrentRegisterGetSet

Parameters

v1/core/core_test.go:567-593
func TestConcurrentRegisterGetSet(t *testing.T)

{
	ctx := context.Background()
	w := New[string](cache.NewInMemory[merge.Value[string]](), nil, syncbus.NewInMemoryBus(), merge.NewEngine[string]())
	const n = 50
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			key := fmt.Sprintf("k%d", i)
			if !w.Register(key, ModeStrongLocal, time.Minute) {
				t.Errorf("register failed for %s", key)
				return
			}
			val := fmt.Sprintf("v%d", i)
			if err := w.Set(ctx, key, val); err != nil {
				t.Errorf("set failed for %s: %v", key, err)
				return
			}
			if got, err := w.Get(ctx, key); err != nil || got != val {
				t.Errorf("get failed for %s: %v %v", key, got, err)
			}
		}()
	}
	wg.Wait()
}
F
function

TestWarpSetAsyncPublish

Parameters

v1/core/core_test.go:595-613
func TestWarpSetAsyncPublish(t *testing.T)

{
	bus := &slowBus{delay: 100 * time.Millisecond, done: make(chan struct{}, 1), InMemoryBus: syncbus.NewInMemoryBus()}
	c := cache.NewInMemory[merge.Value[string]]()
	w := New[string](c, nil, bus, merge.NewEngine[string]())
	w.Register("foo", ModeEventualDistributed, time.Minute)

	start := time.Now()
	if err := w.Set(context.Background(), "foo", "bar"); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if d := time.Since(start); d >= 100*time.Millisecond {
		t.Fatalf("set blocked on publish: %v", d)
	}
	select {
	case <-bus.done:
	case <-time.After(200 * time.Millisecond):
		t.Fatalf("publish did not complete")
	}
}
F
function

TestWarpInvalidateAsyncPublish

Parameters

v1/core/core_test.go:615-633
func TestWarpInvalidateAsyncPublish(t *testing.T)

{
	bus := &slowBus{delay: 100 * time.Millisecond, done: make(chan struct{}, 1), InMemoryBus: syncbus.NewInMemoryBus()}
	c := cache.NewInMemory[merge.Value[string]]()
	w := New[string](c, nil, bus, merge.NewEngine[string]())
	w.Register("foo", ModeEventualDistributed, time.Minute)

	start := time.Now()
	if err := w.Invalidate(context.Background(), "foo"); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if d := time.Since(start); d >= 100*time.Millisecond {
		t.Fatalf("invalidate blocked on publish: %v", d)
	}
	select {
	case <-bus.done:
	case <-time.After(200 * time.Millisecond):
		t.Fatalf("publish did not complete")
	}
}
F
function

TestWarpGetSingleflight

Parameters

v1/core/core_test.go:635-666
func TestWarpGetSingleflight(t *testing.T)

{
	ctx := context.Background()
	store := &slowStore[string]{
		data:    map[string]string{"foo": "bar"},
		delay:   50 * time.Millisecond,
		started: make(chan struct{}),
	}
	w := New[string](cache.NewInMemory[merge.Value[string]](), store, nil, merge.NewEngine[string]())
	w.Register("foo", ModeStrongLocal, time.Minute)

	const numCallers = 10
	var wg sync.WaitGroup
	wg.Add(numCallers)

	for i := 0; i < numCallers; i++ {
		go func() {
			defer wg.Done()
			v, err := w.Get(ctx, "foo")
			if err != nil {
				t.Errorf("unexpected error: %v", err)
			}
			if v != "bar" {
				t.Errorf("expected value 'bar', got '%s'", v)
			}
		}()
	}
	wg.Wait()

	if store.calls != 1 {
		t.Fatalf("expected store.Get to be called once, got %d", store.calls)
	}
}
F
function

TestGetOrSet_Hit

Parameters

v1/core/factory_test.go:15-39
func TestGetOrSet_Hit(t *testing.T)

{
	c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
	w := New[string](c, nil, nil, nil)
	w.Register("gos-key-1", ModeStrongLocal, 10*time.Minute)

	// Pre-populate
	w.Set(context.Background(), "gos-key-1", "cached-val")

	loaderCalled := false
	loader := func(ctx context.Context) (string, error) {
		loaderCalled = true
		return "loaded-val", nil
	}

	val, err := w.GetOrSet(context.Background(), "gos-key-1", loader)
	if err != nil {
		t.Fatalf("Expected success, got: %v", err)
	}
	if val != "cached-val" {
		t.Errorf("Expected 'cached-val', got '%v'", val)
	}
	if loaderCalled {
		t.Error("Loader should not be called on hit")
	}
}
F
function

TestGetOrSet_Miss

Parameters

v1/core/factory_test.go:41-63
func TestGetOrSet_Miss(t *testing.T)

{
	c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
	w := New[string](c, nil, nil, nil)
	w.Register("gos-key-2", ModeStrongLocal, 10*time.Minute)

	loader := func(ctx context.Context) (string, error) {
		return "loaded-val", nil
	}

	val, err := w.GetOrSet(context.Background(), "gos-key-2", loader)
	if err != nil {
		t.Fatalf("Expected success, got: %v", err)
	}
	if val != "loaded-val" {
		t.Errorf("Expected 'loaded-val', got '%v'", val)
	}

	// Verify it's now in cache
	cached, err := w.Get(context.Background(), "gos-key-2")
	if err != nil || cached != "loaded-val" {
		t.Errorf("Value was not cached correctly. Got: %v, Err: %v", cached, err)
	}
}
F
function

TestGetOrSet_Singleflight

Parameters

v1/core/factory_test.go:65-109
func TestGetOrSet_Singleflight(t *testing.T)

{
	c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
	w := New[string](c, nil, nil, nil)
	w.Register("gos-key-3", ModeStrongLocal, 10*time.Minute)

	var calls atomic.Int32
	ready := make(chan struct{})
	block := make(chan struct{})

	var once sync.Once
	loader := func(ctx context.Context) (string, error) {
		calls.Add(1)
		once.Do(func() {
			close(ready) // Signal we started
		})
		<-block // Wait to finish
		return "concurrent-val", nil
	}

	var wg sync.WaitGroup
	wg.Add(2)

	// Routine 1
	go func() {
		defer wg.Done()
		w.GetOrSet(context.Background(), "gos-key-3", loader)
	}()

	// Wait for Routine 1 to start loader
	<-ready

	// Routine 2 (Should join the flight)
	go func() {
		defer wg.Done()
		w.GetOrSet(context.Background(), "gos-key-3", loader)
	}()

	// Let them finish
	close(block)
	wg.Wait()

	if calls.Load() != 1 {
		t.Errorf("Expected 1 loader call, got %d", calls.Load())
	}
}
F
function

TestGetOrSet_FailSafe

Parameters

v1/core/factory_test.go:111-138
func TestGetOrSet_FailSafe(t *testing.T)

{
	c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
	w := New[string](c, nil, nil, nil)

	key := "gos-key-fs"
	// Register with FailSafe
	w.Register(key, ModeStrongLocal, 10*time.Millisecond, cache.WithFailSafe(1*time.Hour))

	// Populate
	w.Set(context.Background(), key, "stale-val")

	// Wait for expiration
	time.Sleep(20 * time.Millisecond)

	// Loader fails
	loader := func(ctx context.Context) (string, error) {
		return "", errors.New("loader boom")
	}

	// Should return stale value
	val, err := w.GetOrSet(context.Background(), key, loader)
	if err != nil {
		t.Fatalf("Expected success (FailSafe), got error: %v", err)
	}
	if val != "stale-val" {
		t.Errorf("Expected 'stale-val', got '%v'", val)
	}
}
F
function

TestGetOrSet_LoaderError

Parameters

v1/core/factory_test.go:140-154
func TestGetOrSet_LoaderError(t *testing.T)

{
	c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
	w := New[string](c, nil, nil, nil)
	w.Register("gos-key-err", ModeStrongLocal, 10*time.Minute)

	boom := errors.New("boom")
	loader := func(ctx context.Context) (string, error) {
		return "", boom
	}

	_, err := w.GetOrSet(context.Background(), "gos-key-err", loader)
	if !errors.Is(err, boom) {
		t.Errorf("Expected 'boom' error, got: %v", err)
	}
}
S
struct

lease

v1/core/lease.go:16-22
type lease struct

Fields

Name Type Description
keys map[string]struct{}
timer *time.Timer
ticker *time.Ticker
stop chan struct{}
sub <-chan syncbus.Event
S
struct

LeaseManager

LeaseManager manages active leases and their renewal.

v1/core/lease.go:25-31
type LeaseManager struct

Fields

Name Type Description
w *Warp[T]
bus syncbus.Bus
mu sync.Mutex
leases map[string]*lease
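
A minimal in-package usage sketch, condensed from TestLeaseGrantAutoRenewAttachRevoke above. The manager is only reachable through the unexported leases field, so code outside this package cannot call it directly:

	ctx := context.Background()
	w := New[string](cache.NewInMemory[merge.Value[string]](), nil, syncbus.NewInMemoryBus(), merge.NewEngine[string]())

	// Grant a lease (the TTL must be positive, otherwise ErrInvalidLeaseTTL),
	// attach a cache key to it, and revoke. Revoking removes the lease and
	// invalidates every attached key.
	id, err := w.leases.Grant(ctx, time.Minute)
	if err != nil {
		// handle error
	}
	w.leases.Attach(id, "foo")
	w.leases.Revoke(ctx, id)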
F
function

newLeaseManager

Parameters

w
*Warp[T]
bus
syncbus.Bus

Returns

*LeaseManager[T]
v1/core/lease.go:33-35
func newLeaseManager[T any](w *Warp[T], bus syncbus.Bus) *LeaseManager[T]

{
	return &LeaseManager[T]{w: w, bus: bus, leases: make(map[string]*lease)}
}
F
function

sequentialWarmup

Parameters

w
*Warp[string]
v1/core/warmup_bench_test.go:15-52
func sequentialWarmup(w *Warp[string], ctx context.Context)

{
	if w.store == nil {
		return
	}
	keys := make([]string, 0)
	for i := 0; i < regShardCount; i++ {
		shard := &w.shards[i]
		shard.RLock()
		for k := range shard.regs {
			keys = append(keys, k)
		}
		shard.RUnlock()
	}
	for _, k := range keys {
		select {
		case <-ctx.Done():
			return
		default:
		}
		v, ok, err := w.store.Get(ctx, k)
		if err != nil || !ok {
			continue
		}
		shard := w.shard(k)
		shard.RLock()
		reg := shard.regs[k]
		shard.RUnlock()
		now := time.Now()
		mv := merge.Value[string]{Data: v, Timestamp: now}
		ttl := reg.ttl
		if reg.ttlStrategy != nil {
			ttl = reg.ttlStrategy.TTL(k)
		}
		reg.currentTTL = ttl
		reg.lastAccess = now
		_ = w.cache.Set(ctx, k, mv, ttl)
	}
}
F
function

BenchmarkWarmup

Parameters

v1/core/warmup_bench_test.go:54-92
func BenchmarkWarmup(b *testing.B)

{
	const numKeys = 500
	ctx := context.Background()
	store := adapter.NewInMemoryStore[string]()
	keys := make([]string, numKeys)
	for i := 0; i < numKeys; i++ {
		k := fmt.Sprintf("key-%d", i)
		keys[i] = k
		_ = store.Set(ctx, k, fmt.Sprintf("val-%d", i))
	}

	b.Run("sequential", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			cache := cache.NewInMemory[merge.Value[string]]()
			bus := syncbus.NewInMemoryBus()
			w := New[string](cache, store, bus, nil)
			for _, k := range keys {
				w.Register(k, ModeStrongLocal, time.Minute)
			}
			b.StartTimer()
			sequentialWarmup(w, ctx)
			b.StopTimer()
		}
	})

	b.Run("parallel", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			cache := cache.NewInMemory[merge.Value[string]]()
			bus := syncbus.NewInMemoryBus()
			w := New[string](cache, store, bus, nil)
			for _, k := range keys {
				w.Register(k, ModeStrongLocal, time.Minute)
			}
			b.StartTimer()
			w.Warmup(ctx)
			b.StopTimer()
		}
	})
}
T
type

Mode

Mode represents the consistency mode for a key. See docs/core.md for the full mode table.

v1/core/core.go:29-29
type Mode int
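
The tests in this package exercise three modes: ModeStrongLocal (works without a bus), ModeEventualDistributed (Set and Invalidate publish to the bus asynchronously, see TestWarpSetAsyncPublish), and ModeStrongDistributed (used with the quorum-style buses in the distributed tests). A sketch, assuming a Warp instance w:

	w.Register("session", ModeStrongLocal, time.Minute)
	w.Register("feed", ModeEventualDistributed, time.Minute)
	w.Register("counter", ModeStrongDistributed, time.Minute)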
S
struct

registration

v1/core/core.go:37-46
type registration struct

Methods

quorumSize
Method

Returns

int
func (r *registration) quorumSize() int
{
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.quorum < 1 {
		return 1
	}
	return r.quorum
}

Fields

Name Type Description
mu sync.Mutex
ttl time.Duration
ttlStrategy cache.TTLStrategy
ttlOpts cache.TTLOptions
mode Mode
currentTTL time.Duration
lastAccess time.Time
quorum int
S
struct

regShard

v1/core/core.go:48-51
type regShard struct

Fields

Name Type Description
regs map[string]*registration
S
struct

Warp

Warp orchestrates the interaction between cache, merge engine and sync bus.

v1/core/core.go:56-76
type Warp struct

Fields

Name Type Description
cache cache.Cache[merge.Value[T]]
store adapter.Store[T]
bus syncbus.Bus
merges *merge.Engine[T]
leases *LeaseManager[T]
publishErrCh chan error
shards [regShardCount]regShard
group singleflight.Group
hitCounter prometheus.Counter
missCounter prometheus.Counter
evictionCounter prometheus.Counter
latencyHist prometheus.Histogram
traceEnabled bool
resilient bool
publishTimeout time.Duration
S
struct

Txn

Txn represents a batch of operations to be applied atomically.

v1/core/core.go:79-85
type Txn struct

Fields

Name Type Description
w *Warp[T]
ctx context.Context
sets map[string]T
deletes map[string]struct{}
cas map[string]T
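
A usage sketch based on TestWarpTxnCASMismatch above; only the CompareAndSwap/Commit path is exercised there, so the batch operations implied by the sets and deletes fields are not illustrated:

	txn := w.Txn(ctx)
	txn.CompareAndSwap("foo", "a", "c") // replace with "c" only if the current value is "a"
	if err := txn.Commit(); errors.Is(err, ErrCASMismatch) {
		// the current value was not "a"
	}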
I
interface

versionedCache

versionedCache extends Cache with the ability to retrieve values at a specific time.

v1/core/core.go:88-91
type versionedCache interface

Methods

GetAt
Method

Parameters

key string
at time.Time

Returns

bool
error
func GetAt(...)
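
Time-travel reads surface through Warp.GetAt when the underlying cache supports versioning; a sketch condensed from TestWarpGetAtRollback above (versioned.New wraps a base cache and keeps a bounded version history, 5 entries in the tests):

	base := cache.NewInMemory[merge.VersionedValue[string]]()
	w := New[string](versioned.New[string](base, 5), nil, nil, merge.NewEngine[string]())
	w.Register("foo", ModeStrongLocal, time.Minute)

	_ = w.Set(ctx, "foo", "v1")
	t1 := time.Now()
	_ = w.Set(ctx, "foo", "v2")
	v, err := w.GetAt(ctx, "foo", t1) // v == "v1": the value as of t1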
T
type

Option

Option configures a Warp instance.

v1/core/core.go:94-94
type Option func(*Warp[T])
F
function

WithMetrics

WithMetrics enables Prometheus metrics collection for core operations.

Parameters

reg
prometheus.Registerer

Returns

Option[T]
v1/core/core.go:97-118
func WithMetrics[T any](reg prometheus.Registerer) Option[T]

{
	return func(w *Warp[T]) {
		w.hitCounter = prometheus.NewCounter(prometheus.CounterOpts{
			Name: "warp_core_hits_total",
			Help: "Total number of cache hits",
		})
		w.missCounter = prometheus.NewCounter(prometheus.CounterOpts{
			Name: "warp_core_misses_total",
			Help: "Total number of cache misses",
		})
		w.evictionCounter = prometheus.NewCounter(prometheus.CounterOpts{
			Name: "warp_core_evictions_total",
			Help: "Total number of evictions",
		})
		w.latencyHist = prometheus.NewHistogram(prometheus.HistogramOpts{
			Name:    "warp_core_latency_seconds",
			Help:    "Latency of core operations",
			Buckets: prometheus.DefBuckets,
		})
		reg.MustRegister(w.hitCounter, w.missCounter, w.evictionCounter, w.latencyHist)
	}
}
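
A construction sketch matching how the distributed tests wire metrics in; each instance gets its own registry so the counters do not collide:

	reg := prometheus.NewRegistry()
	w := New[string](cache.NewInMemory[merge.Value[string]](), nil, nil, nil, WithMetrics[string](reg))
	// warp_core_hits_total, warp_core_misses_total, warp_core_evictions_total
	// and warp_core_latency_seconds are now registered on reg.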
F
function

WithCacheResiliency

WithCacheResiliency enables the L2 Fail-Safe pattern.
If the cache (L1 or L2) returns an error (e.g. connection down),
Warp will suppress the error and proceed as if it were a cache miss (for Get)
or a successful operation (for Set/Invalidate), ensuring application stability.

Returns

Option[T]
v1/core/core.go:124-128
func WithCacheResiliency[T any]() Option[T]

{
	return func(w *Warp[T]) {
		w.resilient = true
	}
}
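
A sketch of the option in use, condensed from TestResiliency_GetSupressesError above (failingCache and store stand in for the mocks defined in resiliency_test.go):

	w := New[string](failingCache, store, nil, nil, WithCacheResiliency[string]())
	w.Register("res-key", ModeStrongLocal, 10*time.Minute)

	// The cache error is suppressed: the read is treated as a miss and
	// served from the store instead.
	v, err := w.Get(ctx, "res-key") // err == nil; v comes from the store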
F
function

WithPublishTimeout

WithPublishTimeout sets a timeout for background syncbus publish operations
in ModeEventualDistributed. This prevents goroutine leaks if the bus is slow or down.

Parameters

d
time.Duration

Returns

Option[T]
v1/core/core.go:132-136
func WithPublishTimeout[T any](d time.Duration) Option[T]

{
	return func(w *Warp[T]) {
		w.publishTimeout = d
	}
}
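
A sketch condensed from TestBackplane_TimeoutPreventsLeak above: Set returns immediately, and a timed-out background publish surfaces on PublishErrors instead of leaking a goroutine:

	w := New[string](c, nil, bus, nil, WithPublishTimeout[string](50*time.Millisecond))
	w.Register("bp-key", ModeEventualDistributed, 10*time.Minute)
	_ = w.Set(ctx, "bp-key", "val") // returns immediately

	select {
	case err := <-w.PublishErrors():
		// context.DeadlineExceeded if the bus did not finish within 50ms
		_ = err
	case <-time.After(500 * time.Millisecond):
		// no publish error
	}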
F
function

hashString

Parameters

key
string

Returns

uint32
v1/core/core.go:143-150
func hashString(key string) uint32

{
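	// 32-bit FNV-1a: XOR each byte into the hash, then multiply by the FNV prime.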
	hash := uint32(offset32)
	for i := 0; i < len(key); i++ {
		hash ^= uint32(key[i])
		hash *= prime32
	}
	return hash
}
F
function

New

New creates a new Warp instance.

Parameters

c
cache.Cache[merge.Value[T]]
s
adapter.Store[T]
bus
syncbus.Bus
m
*merge.Engine[T]
opts
...Option[T]

Returns

*Warp[T]
v1/core/core.go:171-195
func New[T any](c cache.Cache[merge.Value[T]], s adapter.Store[T], bus syncbus.Bus, m *merge.Engine[T], opts ...Option[T]) *Warp[T]

{
	if m == nil {
		m = merge.NewEngine[T]()
	}
	w := &Warp[T]{
		cache:        c,
		store:        s,
		bus:          bus,
		merges:       m,
		publishErrCh: make(chan error, 1),
	}
	for i := 0; i < regShardCount; i++ {
		w.shards[i].regs = make(map[string]*registration)
	}
	w.leases = newLeaseManager[T](w, bus)
	for _, opt := range opts {
		opt(w)
	}

	if w.resilient {
		w.cache = cache.NewResilient(w.cache)
	}

	return w
}
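
A minimal construction sketch, mirroring TestWarpSetGet above. As the tests show, the store, bus and merge engine may each be nil (a nil engine is replaced with merge.NewEngine internally, per the body above):

	ctx := context.Background()
	w := New[string](cache.NewInMemory[merge.Value[string]](), nil, syncbus.NewInMemoryBus(), merge.NewEngine[string]())
	w.Register("foo", ModeStrongLocal, time.Minute)
	_ = w.Set(ctx, "foo", "bar")
	v, err := w.Get(ctx, "foo") // v == "bar", err == nil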
S
struct

validatorCache

validatorCache adapts the Warp cache to the Validator interface by operating on raw values.

v1/core/core.go:1446-1448
type validatorCache struct

Fields

Name Type Description
w *Warp[T]
F
function

WithTracing

WithTracing enables OpenTelemetry tracing for core operations.

Returns

Option[T]
v1/core/core.go:1493-1497
func WithTracing[T any]() Option[T]

{
	return func(w *Warp[T]) {
		w.traceEnabled = true
	}
}
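
A sketch enabling tracing at construction time; the option simply flips the traceEnabled flag shown in the Warp struct above:

	w := New[string](c, nil, nil, nil, WithTracing[string]())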
S
struct

MockStoreER

MockStoreER implements adapter.Store and helps track eager refreshes

v1/core/eager_refresh_test.go:14-19
type MockStoreER struct

Fields

Name Type Description
mu sync.Mutex
data map[string]T
delay time.Duration
refreshCall int
F
function

TestEagerRefresh_TriggersBackgroundUpdate

Parameters

v1/core/eager_refresh_test.go:60-142
func TestEagerRefresh_TriggersBackgroundUpdate(t *testing.T)

{
	c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))

	// Store takes 100ms to respond
	store := &MockStoreER[string]{
		data:  make(map[string]string),
		delay: 100 * time.Millisecond,
	}
	w := New[string](c, store, nil, nil)

	key := "er-key-1"
	val1 := "value-1"
	val2 := "value-2"
	store.data[key] = val1

	// Register with:
	// TTL: 200ms
	// EagerRefreshThreshold: 0.5 (Refresh when 50% of TTL (100ms) is remaining)
	w.Register(key, ModeStrongLocal, 200*time.Millisecond,
		cache.WithEagerRefresh(0.5),
	)

	// 1. Initial Populate (Will be slow, 100ms, as it's a miss)
	start := time.Now()
	got, err := w.Get(context.Background(), key)
	elapsed := time.Since(start)
	if err != nil {
		t.Fatalf("Initial Get failed: %v", err)
	}
	if got != val1 {
		t.Errorf("Initial Get = %v, want %v", got, val1)
	}
	if elapsed < 100*time.Millisecond {
		t.Errorf("Initial Get was too fast (%v), expected > 100ms", elapsed)
	}
	if store.refreshCall != 1 {
		t.Errorf("Expected 1 store call, got %d", store.refreshCall)
	}

	// 2. Wait until the eager-refresh window opens.
	// TTL = 200ms with a 0.5 threshold, so a refresh triggers once less than
	// 100ms of TTL remains. Sleep 90ms after the last Get (which took 100ms).
	time.Sleep(90 * time.Millisecond) // Total elapsed: 100ms (fetch) + 90ms (sleep) = 190ms.
	// Remaining: 200ms - 190ms = 10ms. 10/200 = 0.05, below the 0.5 threshold.

	// Change backend data to verify refresh
	store.data[key] = val2
	store.mu.Lock()
	store.refreshCall = 0 // Reset counter for next phase
	store.mu.Unlock()

	// 3. Get Again (Should trigger Eager Refresh)
	// This call should return value-1 immediately (from cache)
	start = time.Now()
	got, err = w.Get(context.Background(), key)
	elapsed = time.Since(start)
	if err != nil {
		t.Fatalf("Second Get failed: %v", err)
	}
	if got != val1 {
		t.Errorf("Second Get = %v, want %v (stale during refresh)", got, val1)
	}
	if elapsed > 10*time.Millisecond { // Should be fast, from cache
		t.Errorf("Second Get took too long (%v), expected fast", elapsed)
	}

	// 4. Wait for background refresh to complete
	time.Sleep(150 * time.Millisecond) // Enough for store.delay (100ms) + some buffer

	// 5. Get one more time (Should get fresh data from eager refresh)
	got, err = w.Get(context.Background(), key)
	if err != nil {
		t.Fatalf("Third Get failed: %v", err)
	}
	if got != val2 {
		t.Errorf("Third Get = %v, want %v (fresh after eager refresh)", got, val2)
	}
	if store.refreshCall != 1 {
		t.Errorf("Expected 1 store call for eager refresh, got %d", store.refreshCall)
	}
}
F
function

TestEagerRefresh_DoesNotTriggerIfTooFresh

Parameters

v1/core/eager_refresh_test.go:144-189
func TestEagerRefresh_DoesNotTriggerIfTooFresh(t *testing.T)

{
	c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))

	store := &MockStoreER[string]{
		data:  make(map[string]string),
		delay: 10 * time.Millisecond,
	}
	w := New[string](c, store, nil, nil)

	key := "er-key-2"
	store.data[key] = "val-fresh"

	w.Register(key, ModeStrongLocal, 100*time.Millisecond,
		cache.WithEagerRefresh(0.2), // Refresh when 20% TTL (20ms) is remaining
	)

	// 1. Initial Populate
	w.Get(context.Background(), key)
	store.mu.Lock()
	if store.refreshCall != 1 {
		t.Errorf("Expected 1 store call, got %d", store.refreshCall)
	}
	store.refreshCall = 0
	store.mu.Unlock()

	// 2. Get again immediately (should not trigger eager refresh)
	w.Get(context.Background(), key)
	store.mu.Lock()
	if store.refreshCall != 0 {
		t.Errorf("Expected 0 store calls for eager refresh (too fresh), got %d", store.refreshCall)
	}
	store.mu.Unlock()

	// 3. Wait until after eager refresh threshold but before TTL
	// Total TTL 100ms. Eager refresh at 80ms elapsed.
	time.Sleep(85 * time.Millisecond) // Item should be 85ms old, remaining 15ms. 15/100 = 0.15, less than 0.2 threshold.

	// 4. Get again (should trigger eager refresh)
	w.Get(context.Background(), key)
	time.Sleep(20 * time.Millisecond) // Give background goroutine time to complete
	store.mu.Lock()
	if store.refreshCall != 1 {
		t.Errorf("Expected 1 store call for eager refresh (now within window), got %d", store.refreshCall)
	}
	store.mu.Unlock()
}
F
function

TestEagerRefresh_DoesNotTriggerIfStoreNil

Parameters

v1/core/eager_refresh_test.go:191-220
func TestEagerRefresh_DoesNotTriggerIfStoreNil(t *testing.T)

{
	c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))

	// No store provided
	w := New[string](c, nil, nil, nil)

	key := "er-key-3"
	val := "val-no-store"

	// Manually set into cache as there is no store to fetch from
	now := time.Now()
	mv := merge.Value[string]{Data: val, Timestamp: now}
	_ = c.Set(context.Background(), key, mv, 100*time.Millisecond)

	w.Register(key, ModeStrongLocal, 100*time.Millisecond,
		cache.WithEagerRefresh(0.1), // Refresh when 10% TTL (10ms) is remaining
	)

	// Wait for eager refresh window
	time.Sleep(95 * time.Millisecond)

	// Get (should not panic or error, should not try to refresh from nil store)
	got, err := w.Get(context.Background(), key)
	if err != nil {
		t.Fatalf("Get with nil store failed: %v", err)
	}
	if got != val {
		t.Errorf("Expected %v, got %v", val, got)
	}
}
S
struct

MockStoreFS

MockStoreFS implements adapter.Store for testing FailSafe

v1/core/failsafe_test.go:14-18
type MockStoreFS struct

Fields

Name Type Description
data map[string]T
fail bool
failErr error
F
function

TestFailSafe_ReturnsStaleOnError

Parameters

v1/core/failsafe_test.go:50-90
func TestFailSafe_ReturnsStaleOnError(t *testing.T)

{
	// Setup
	c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
	store := &MockStoreFS[string]{
		data:    make(map[string]string),
		failErr: errors.New("db boom"),
	}
	w := New[string](c, store, nil, nil)

	key := "fs-key-1"
	val := "initial-value"
	store.data[key] = val

	// Register with short TTL (50ms) and long Grace Period (1h)
	ttl := 50 * time.Millisecond
	w.Register(key, ModeStrongLocal, ttl, cache.WithFailSafe(1*time.Hour))

	// 1. Initial Get (Populate Cache)
	got, err := w.Get(context.Background(), key)
	if err != nil {
		t.Fatalf("Initial Get failed: %v", err)
	}
	if got != val {
		t.Errorf("Initial Get = %v, want %v", got, val)
	}

	// 2. Wait for TTL to expire (50ms)
	time.Sleep(100 * time.Millisecond)

	// 3. Make Store Fail
	store.fail = true

	// 4. Get again -> Should return Stale Data (Fail-Safe)
	got, err = w.Get(context.Background(), key)
	if err != nil {
		t.Fatalf("Expected stale value but got error: %v", err)
	}
	if got != val {
		t.Errorf("Expected stale value %v, got %v", val, got)
	}
}
F
function

TestFailSafe_RecoversWhenStoreRecover

Parameters

v1/core/failsafe_test.go:92-134
func TestFailSafe_RecoversWhenStoreRecover(t *testing.T)

{
	c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
	store := &MockStoreFS[string]{
		data: make(map[string]string),
	}
	w := New[string](c, store, nil, nil)

	key := "fs-key-2"
	val1 := "value-1"
	store.data[key] = val1

	w.Register(key, ModeStrongLocal, 50*time.Millisecond, cache.WithFailSafe(1*time.Hour))

	// 1. Populate
	w.Get(context.Background(), key)

	// 2. Expire
	time.Sleep(100 * time.Millisecond)

	// 3. Update Store + Fail
	val2 := "value-2"
	store.data[key] = val2
	store.fail = true
	store.failErr = errors.New("temp error")

	// 4. Get -> Stale "value-1"
	got, err := w.Get(context.Background(), key)
	if err != nil || got != val1 {
		t.Errorf("Expected stale 'value-1', got %v (err: %v)", got, err)
	}

	// 5. Recover Store
	store.fail = false

	// 6. Get -> Fresh "value-2"
	got, err = w.Get(context.Background(), key)
	if err != nil {
		t.Fatalf("Post-recovery Get failed: %v", err)
	}
	if got != val2 {
		t.Errorf("Expected fresh 'value-2' after recovery, got %v", got)
	}
}
F
function

TestFailSafe_RespectsGracePeriod

Parameters

v1/core/failsafe_test.go:136-165
func TestFailSafe_RespectsGracePeriod(t *testing.T)

{
	c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
	store := &MockStoreFS[string]{
		data:    make(map[string]string),
		failErr: errors.New("db boom"),
	}
	w := New[string](c, store, nil, nil)

	key := "fs-key-3"
	store.data[key] = "val"

	// Register with short TTL (20ms) and SHORT Grace Period (50ms)
	// Total life = 70ms
	w.Register(key, ModeStrongLocal, 20*time.Millisecond, cache.WithFailSafe(50*time.Millisecond))

	// 1. Populate
	w.Get(context.Background(), key)

	// 2. Wait for TTL + Grace Period to expire (Wait 150ms > 70ms)
	time.Sleep(150 * time.Millisecond)

	// 3. Make Store Fail
	store.fail = true

	// 4. Get -> Should Fail (Item evicted)
	_, err := w.Get(context.Background(), key)
	if err == nil {
		t.Fatal("Expected error because grace period expired, but got success")
	}
}
F
function

TestFailSafe_ReturnsErrorIfNotFound

Parameters

v1/core/failsafe_test.go:167-190
func TestFailSafe_ReturnsErrorIfNotFound(t *testing.T)

{
	c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))
	store := &MockStoreFS[string]{
		data: make(map[string]string),
	}
	w := New[string](c, store, nil, nil)

	key := "fs-key-4"
	store.data[key] = "val"

	w.Register(key, ModeStrongLocal, 50*time.Millisecond, cache.WithFailSafe(1*time.Hour))
	w.Get(context.Background(), key)
	
	time.Sleep(100 * time.Millisecond)

	// Simulate deletion from DB (Get returns !ok or ErrNotFound)
	delete(store.data, key)
	
	// Get -> Should return ErrNotFound (or zero/false), NOT stale data.
	_, err := w.Get(context.Background(), key)
	if !errors.Is(err, ErrNotFound) {
		t.Errorf("Expected ErrNotFound when DB deletes item, got: %v", err)
	}
}
S
struct

MockCacheFailing

MockCacheFailing implements cache.Cache and always fails

v1/core/resiliency_test.go:13-15
type MockCacheFailing struct

Fields

Name Type Description
err error
F
function

TestResiliency_GetSupressesError

Parameters

v1/core/resiliency_test.go:30-54
func TestResiliency_GetSupressesError(t *testing.T)

{
	// Setup failing cache
	failingCache := &MockCacheFailing[merge.Value[string]]{
		err: errors.New("redis down"),
	}
	
	store := &MockStoreFS[string]{
		data: map[string]string{
			"res-key": "db-value",
		},
	}

	// Create Warp with Resiliency
	w := New[string](failingCache, store, nil, nil, WithCacheResiliency[string]())
	w.Register("res-key", ModeStrongLocal, 10*time.Minute)

	// Get should NOT fail, but treat as miss and fetch from DB
	val, err := w.Get(context.Background(), "res-key")
	if err != nil {
		t.Fatalf("Expected no error with resiliency, got: %v", err)
	}
	if val != "db-value" {
		t.Errorf("Expected value from DB 'db-value', got '%v'", val)
	}
}
F
function

TestResiliency_SetSupressesError

Parameters

v1/core/resiliency_test.go:56-75
func TestResiliency_SetSupressesError(t *testing.T)

{
	failingCache := &MockCacheFailing[merge.Value[string]]{
		err: errors.New("redis down"),
	}
	store := &MockStoreFS[string]{data: make(map[string]string)}

	w := New[string](failingCache, store, nil, nil, WithCacheResiliency[string]())
	w.Register("res-key-set", ModeStrongLocal, 10*time.Minute)

	// Set should NOT fail
	err := w.Set(context.Background(), "res-key-set", "value")
	if err != nil {
		t.Fatalf("Expected no error on Set with resiliency, got: %v", err)
	}

	// Verify it reached the store (Set ensures DB consistency even if cache fails)
	if store.data["res-key-set"] != "value" {
		t.Errorf("Expected value to be written to store despite cache failure")
	}
}
F
function

TestResiliency_InvalidateSupressesError

Parameters

v1/core/resiliency_test.go:77-91
func TestResiliency_InvalidateSupressesError(t *testing.T)

{
	failingCache := &MockCacheFailing[merge.Value[string]]{
		err: errors.New("redis down"),
	}
	store := &MockStoreFS[string]{data: make(map[string]string)}

	w := New[string](failingCache, store, nil, nil, WithCacheResiliency[string]())
	w.Register("res-key-inv", ModeStrongLocal, 10*time.Minute)

	// Invalidate should NOT fail
	err := w.Invalidate(context.Background(), "res-key-inv")
	if err != nil {
		t.Fatalf("Expected no error on Invalidate with resiliency, got: %v", err)
	}
}
F
function

TestResiliency_DisabledByDefault

Parameters

v1/core/resiliency_test.go:93-108
func TestResiliency_DisabledByDefault(t *testing.T)

{
	failingCache := &MockCacheFailing[merge.Value[string]]{
		err: errors.New("redis down"),
	}
	store := &MockStoreFS[string]{data: make(map[string]string)}

	// Default Warp (No Resiliency)
	w := New[string](failingCache, store, nil, nil)
	w.Register("res-key-def", ModeStrongLocal, 10*time.Minute)

	// Get SHOULD fail
	_, err := w.Get(context.Background(), "res-key-def")
	if err == nil {
		t.Fatal("Expected error when resiliency is disabled, got nil")
	}
}
F
function

WatchPrefix

WatchPrefix wraps watchbus.SubscribePrefix to expose prefix watching from core.

Parameters

bus
watchbus.WatchBus
prefix
string

Returns

chan []byte
error
v1/core/watch.go:11-22
func WatchPrefix(ctx context.Context, bus watchbus.WatchBus, prefix string) (chan []byte, error)

{
	ch, err := bus.SubscribePrefix(ctx, prefix)
	if err != nil {
		return nil, err
	}
	metrics.WatcherGauge.Inc()
	go func() {
		<-ctx.Done()
		metrics.WatcherGauge.Dec()
	}()
	return ch, nil
}
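
A usage sketch condensed from TestWatchPrefixMetricsAndEvents above; cancelling the context is what decrements metrics.WatcherGauge:

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	bus := watchbus.NewInMemory()
	ch, err := WatchPrefix(ctx, bus, "foo")
	if err != nil {
		// handle error
	}
	_ = bus.Publish(ctx, "foobar", []byte("hello"))
	msg := <-ch // []byte("hello"): any key under the "foo" prefix matches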
S
struct

MockBusBlocked

MockBusBlocked implements syncbus.Bus and blocks on Publish until signaled

v1/core/backplane_test.go:15-17
type MockBusBlocked struct

Methods

Publish
Method

Parameters

key string
opts ...syncbus.PublishOption

Returns

error
func (m *MockBusBlocked) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
	select {
	case <-m.blockCh:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
Subscribe
Method

Parameters

key string

Returns

<-chan syncbus.Event
error
func (*MockBusBlocked) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
	return nil, nil
}
Unsubscribe
Method

Parameters

key string
ch <-chan syncbus.Event

Returns

error
func (*MockBusBlocked) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
	return nil
}
IsHealthy
Method

Returns

bool
func (*MockBusBlocked) IsHealthy() bool
{
	return true
}

Parameters

key string
replicas int
opts ...syncbus.PublishOption

Returns

error
func (*MockBusBlocked) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
	return nil
}

Parameters

key string
minZones int
opts ...syncbus.PublishOption

Returns

error
func (*MockBusBlocked) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
	return nil
}
RevokeLease
Method

Parameters

id string

Returns

error
func (*MockBusBlocked) RevokeLease(ctx context.Context, id string) error
{
	return nil
}

Parameters

id string

Returns

<-chan syncbus.Event
error
func (*MockBusBlocked) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
	return nil, nil
}

Parameters

id string
ch <-chan syncbus.Event

Returns

error
func (*MockBusBlocked) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
	return nil
}
SetTopology
Method

Parameters

minZones int
func (*MockBusBlocked) SetTopology(minZones int)
{}
Peers
Method

Returns

[]string
func (*MockBusBlocked) Peers() []string
{
	return nil
}

Fields

Name Type Description
blockCh chan struct{}
F
function

TestBackplane_TimeoutPreventsLeak

Parameters

v1/core/backplane_test.go:66-92
func TestBackplane_TimeoutPreventsLeak(t *testing.T)

{
	c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))

	// Create a bus that blocks forever
	bus := &MockBusBlocked{
		blockCh: make(chan struct{}), // Never closed in this test path
	}

	w := New[string](c, nil, bus, nil, WithPublishTimeout[string](50*time.Millisecond))
	w.Register("bp-key-timeout", ModeEventualDistributed, 10*time.Minute)

	// Set triggers background publish
	err := w.Set(context.Background(), "bp-key-timeout", "val")
	if err != nil {
		t.Fatalf("Expected success on Set, got: %v", err)
	}

	// Wait for timeout error on channel
	select {
	case err := <-w.PublishErrors():
		if !errors.Is(err, context.DeadlineExceeded) {
			t.Errorf("Expected DeadlineExceeded, got: %v", err)
		}
	case <-time.After(500 * time.Millisecond):
		t.Error("Timed out waiting for publish error")
	}
}
F
function

TestBackplane_NoTimeoutByDefault

Parameters

v1/core/backplane_test.go:94-113
func TestBackplane_NoTimeoutByDefault(t *testing.T)

{
	c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))

	bus := &MockBusBlocked{
		blockCh: make(chan struct{}),
	}

	// No timeout configured
	w := New[string](c, nil, bus, nil)
	w.Register("bp-key-default", ModeEventualDistributed, 10*time.Minute)

	w.Set(context.Background(), "bp-key-default", "val")

	select {
	case err := <-w.PublishErrors():
		t.Errorf("Unexpected error received (should be blocked): %v", err)
	case <-time.After(100 * time.Millisecond):
		// Success: it's still blocked/running, logic is correct for no timeout
	}
}
S
struct

fakeQuorumBus

v1/core/distributed_test.go:25-30
type fakeQuorumBus struct

Methods

Publish
Method

Parameters

key string
opts ...syncbus.PublishOption

Returns

error
func (*fakeQuorumBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
	return nil
}

Parameters

key string
replicas int
opts ...syncbus.PublishOption

Returns

error
func (b *fakeQuorumBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
	if replicas <= 0 {
		replicas = 1
	}
	b.mu.Lock()
	var nextErr error
	if len(b.errSeq) > 0 {
		nextErr = b.errSeq[0]
		b.errSeq = b.errSeq[1:]
	} else {
		nextErr = b.err
	}
	b.mu.Unlock()
	if nextErr != nil {
		return nextErr
	}
	for i := 0; i < replicas; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-b.ackCh:
		}
	}
	return nil
}

Parameters

key string
minZones int
opts ...syncbus.PublishOption

Returns

error
func (b *fakeQuorumBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
	if minZones <= 0 {
		minZones = 1
	}
	b.mu.Lock()
	var nextErr error
	if len(b.errSeq) > 0 {
		nextErr = b.errSeq[0]
		b.errSeq = b.errSeq[1:]
	} else {
		nextErr = b.err
	}
	b.mu.Unlock()
	if nextErr != nil {
		return nextErr
	}
	// fakeQuorumBus uses ackCh to simulate acks; it is reused for topology
	// acks, assuming one ack per zone for simplicity in this fake.
	for i := 0; i < minZones; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-b.ackCh:
		}
	}
	return nil
}
Subscribe
Method

Parameters

key string

Returns

<-chan syncbus.Event
error
func (*fakeQuorumBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
	return nil, nil
}
Unsubscribe
Method

Parameters

key string
ch <-chan syncbus.Event

Returns

error
func (*fakeQuorumBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
	return nil
}
RevokeLease
Method

Parameters

id string

Returns

error
func (*fakeQuorumBus) RevokeLease(ctx context.Context, id string) error
{ return nil }

Parameters

id string

Returns

<-chan syncbus.Event
error
func (*fakeQuorumBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
	return nil, nil
}

Parameters

id string
ch <-chan syncbus.Event

Returns

error
func (*fakeQuorumBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
	return nil
}
IsHealthy
Method

Returns

bool
func (*fakeQuorumBus) IsHealthy() bool
{
	return true
}
Peers
Method

Returns

[]string
func (*fakeQuorumBus) Peers() []string
{
	return nil
}

Fields

Name Type Description
ackCh chan struct{}
err error
mu sync.Mutex
errSeq []error
F
function

newFakeQuorumBus

Returns

*fakeQuorumBus
v1/core/distributed_test.go:32-34
func newFakeQuorumBus() *fakeQuorumBus

{
	return &fakeQuorumBus{ackCh: make(chan struct{})}
}
F
function

TestDistributedInvalidation

Parameters

v1/core/distributed_test.go:120-200
func TestDistributedInvalidation(t *testing.T)

{
	ctx := context.Background()
	mr, err := miniredis.Run()
	if err != nil {
		t.Fatalf("miniredis run: %v", err)
	}
	client := redis.NewClient(&redis.Options{Addr: mr.Addr()})

	bus := busredis.NewRedisBus(busredis.RedisBusOptions{Client: client})
	store := adapter.NewInMemoryStore[int]()

	reg1 := prometheus.NewRegistry()
	reg2 := prometheus.NewRegistry()
	w1 := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int](), WithMetrics[int](reg1))
	w2 := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int](), WithMetrics[int](reg2))

	w1.Register("counter", ModeStrongDistributed, time.Minute)
	w2.Register("counter", ModeStrongDistributed, time.Minute)
	mergeFn := func(old, new int) (int, error) { return old + new, nil }
	w1.Merge("counter", mergeFn)
	w2.Merge("counter", mergeFn)

	ch, err := bus.Subscribe(ctx, "counter")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	t.Cleanup(func() {
		_ = bus.Unsubscribe(context.Background(), "counter", ch)
		_ = client.Close()
		mr.Close()
	})
	go func() {
		for range ch {
			_ = w2.cache.Invalidate(ctx, "counter")
		}
	}()

	if err := w1.Set(ctx, "counter", 1); err != nil {
		t.Fatalf("set1: %v", err)
	}
	if err := w1.Set(ctx, "counter", 2); err != nil {
		t.Fatalf("set2: %v", err)
	}
	time.Sleep(50 * time.Millisecond)
	v, err := w2.Get(ctx, "counter")
	if err != nil {
		t.Fatalf("get: %v", err)
	}
	if v != 3 {
		t.Fatalf("expected 3 got %d", v)
	}

	before := bus.Metrics()
	start := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		<-start
		_ = w1.Invalidate(ctx, "counter")
	}()
	go func() {
		defer wg.Done()
		<-start
		_ = w2.Invalidate(ctx, "counter")
	}()
	close(start)
	wg.Wait()
	time.Sleep(50 * time.Millisecond)
	after := bus.Metrics()
	// Deduplication is best-effort. If concurrent invalidations overlap, we expect 1 publish.
	// If they happen sequentially (due to scheduler/networking), we expect 2. Both are valid.
	if diff := after.Published - before.Published; diff < 1 || diff > 2 {
		t.Fatalf("expected 1 or 2 publishes, got %d", diff)
	}

	ev := testutil.ToFloat64(w1.evictionCounter) + testutil.ToFloat64(w2.evictionCounter)
	if ev != 2 {
		t.Fatalf("expected 2 evictions got %v", ev)
	}
}
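
The merge function registered via Merge receives the current and incoming values and returns the merged result; the additive counter above is the simplest case. Any function of that shape works, e.g. a last-writer-wins variant (illustrative only, not part of the package):

	lww := func(old, incoming int) (int, error) { return incoming, nil }
	w1.Merge("counter", lww)
	w2.Merge("counter", lww)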
F
function

TestWarpDistributedNATS

Parameters

v1/core/distributed_test.go:202-279
func TestWarpDistributedNATS(t *testing.T)

{
	ctx := context.Background()
	s := natsserver.RunRandClientPortServer()
	conn, err := nats.Connect(s.ClientURL())
	if err != nil {
		t.Fatalf("connect: %v", err)
	}
	bus := busnats.NewNATSBus(conn)
	store := adapter.NewInMemoryStore[int]()

	reg1 := prometheus.NewRegistry()
	reg2 := prometheus.NewRegistry()
	w1 := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int](), WithMetrics[int](reg1))
	w2 := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int](), WithMetrics[int](reg2))

	w1.Register("counter", ModeStrongDistributed, time.Minute)
	w2.Register("counter", ModeStrongDistributed, time.Minute)
	mergeFn := func(old, new int) (int, error) { return old + new, nil }
	w1.Merge("counter", mergeFn)
	w2.Merge("counter", mergeFn)

	ch, err := bus.Subscribe(ctx, "counter")
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	t.Cleanup(func() {
		_ = bus.Unsubscribe(context.Background(), "counter", ch)
		conn.Close()
		s.Shutdown()
	})
	go func() {
		for range ch {
			_ = w2.cache.Invalidate(ctx, "counter")
		}
	}()

	if err := w1.Set(ctx, "counter", 1); err != nil {
		t.Fatalf("set1: %v", err)
	}
	if err := w1.Set(ctx, "counter", 2); err != nil {
		t.Fatalf("set2: %v", err)
	}
	time.Sleep(50 * time.Millisecond)
	v, err := w2.Get(ctx, "counter")
	if err != nil {
		t.Fatalf("get: %v", err)
	}
	if v != 3 {
		t.Fatalf("expected 3 got %d", v)
	}

	before := bus.Metrics()
	start := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		<-start
		_ = w1.Invalidate(ctx, "counter")
	}()
	go func() {
		defer wg.Done()
		<-start
		_ = w2.Invalidate(ctx, "counter")
	}()
	close(start)
	wg.Wait()
	time.Sleep(50 * time.Millisecond)
	after := bus.Metrics()
	if after.Published-before.Published != 1 {
		t.Fatalf("expected 1 publish, got %d", after.Published-before.Published)
	}

	ev := testutil.ToFloat64(w1.evictionCounter) + testutil.ToFloat64(w2.evictionCounter)
	if ev != 2 {
		t.Fatalf("expected 2 evictions got %v", ev)
	}
}
F
function

TestWarpStrongDistributedWaitsForQuorum

Parameters

v1/core/distributed_test.go:281-320
func TestWarpStrongDistributedWaitsForQuorum(t *testing.T)

{
	ctx := context.Background()
	bus := newFakeQuorumBus()
	store := adapter.NewInMemoryStore[int]()
	w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
	w.Register("counter", ModeStrongDistributed, time.Minute)
	if !w.SetQuorum("counter", 2) {
		t.Fatalf("expected quorum configuration to succeed")
	}

	done := make(chan error, 1)
	go func() {
		done <- w.Set(ctx, "counter", 1)
	}()

	select {
	case err := <-done:
		t.Fatalf("set returned before quorum: %v", err)
	case <-time.After(20 * time.Millisecond):
	}

	bus.ackCh <- struct{}{}

	select {
	case err := <-done:
		t.Fatalf("set returned after single ack: %v", err)
	case <-time.After(20 * time.Millisecond):
	}

	bus.ackCh <- struct{}{}

	select {
	case err := <-done:
		if err != nil {
			t.Fatalf("set error: %v", err)
		}
	case <-time.After(time.Second):
		t.Fatalf("set timed out waiting for quorum")
	}
}
F
function

TestWarpStrongDistributedQuorumTimeout

Parameters

v1/core/distributed_test.go:322-334
func TestWarpStrongDistributedQuorumTimeout(t *testing.T)

{
	bus := newFakeQuorumBus()
	store := adapter.NewInMemoryStore[int]()
	w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
	w.Register("counter", ModeStrongDistributed, time.Minute)
	w.SetQuorum("counter", 2)

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	if err := w.Set(ctx, "counter", 1); !errors.Is(err, context.DeadlineExceeded) {
		t.Fatalf("expected deadline exceeded, got %v", err)
	}
}
F
function

TestWarpStrongDistributedQuorumError

Parameters

v1/core/distributed_test.go:336-347
func TestWarpStrongDistributedQuorumError(t *testing.T)

{
	bus := newFakeQuorumBus()
	bus.err = syncbus.ErrQuorumNotSatisfied
	store := adapter.NewInMemoryStore[int]()
	w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
	w.Register("counter", ModeStrongDistributed, time.Minute)
	w.SetQuorum("counter", 3)

	if err := w.Set(context.Background(), "counter", 1); !errors.Is(err, syncbus.ErrQuorumNotSatisfied) {
		t.Fatalf("expected quorum error, got %v", err)
	}
}
F
function

TestWarpSetRollbackOnQuorumFailure

Parameters

v1/core/distributed_test.go:349-376
func TestWarpSetRollbackOnQuorumFailure(t *testing.T)

{
	ctx := context.Background()
	bus := newFakeQuorumBus()
	store := adapter.NewInMemoryStore[int]()
	w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
	w.Register("counter", ModeStrongDistributed, time.Minute)

	go func() {
		bus.ackCh <- struct{}{}
	}()
	if err := w.Set(ctx, "counter", 1); err != nil {
		t.Fatalf("set baseline: %v", err)
	}

	bus.err = syncbus.ErrQuorumNotSatisfied
	if err := w.Set(ctx, "counter", 2); !errors.Is(err, syncbus.ErrQuorumNotSatisfied) {
		t.Fatalf("expected quorum failure, got %v", err)
	}

	if got, ok, err := store.Get(ctx, "counter"); err != nil || !ok || got != 1 {
		t.Fatalf("store value changed after failure: ok=%v err=%v got=%d", ok, err, got)
	}
	if mv, ok, err := w.cache.Get(ctx, "counter"); err != nil {
		t.Fatalf("cache get: %v", err)
	} else if !ok || mv.Data != 1 {
		t.Fatalf("cache value changed after failure: ok=%v data=%v", ok, mv.Data)
	}
}
F
function

TestWarpInvalidateRollbackOnQuorumFailure

Parameters

v1/core/distributed_test.go:378-405
func TestWarpInvalidateRollbackOnQuorumFailure(t *testing.T)

{
	ctx := context.Background()
	bus := newFakeQuorumBus()
	store := adapter.NewInMemoryStore[int]()
	w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
	w.Register("counter", ModeStrongDistributed, time.Minute)

	go func() {
		bus.ackCh <- struct{}{}
	}()
	if err := w.Set(ctx, "counter", 1); err != nil {
		t.Fatalf("set baseline: %v", err)
	}

	bus.err = syncbus.ErrQuorumNotSatisfied
	if err := w.Invalidate(ctx, "counter"); !errors.Is(err, syncbus.ErrQuorumNotSatisfied) {
		t.Fatalf("expected quorum failure, got %v", err)
	}

	if mv, ok, err := w.cache.Get(ctx, "counter"); err != nil {
		t.Fatalf("cache get: %v", err)
	} else if !ok || mv.Data != 1 {
		t.Fatalf("cache value removed after failure: ok=%v data=%v", ok, mv.Data)
	}
	if got, err := w.Get(ctx, "counter"); err != nil || got != 1 {
		t.Fatalf("warp get mismatch after failure: val=%d err=%v", got, err)
	}
}
F
function

TestWarpTxnRollbackOnQuorumFailure

Parameters

v1/core/distributed_test.go:407-436
func TestWarpTxnRollbackOnQuorumFailure(t *testing.T)

{
	ctx := context.Background()
	bus := newFakeQuorumBus()
	store := adapter.NewInMemoryStore[int]()
	w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
	w.Register("counter", ModeStrongDistributed, time.Minute)

	go func() {
		bus.ackCh <- struct{}{}
	}()
	if err := w.Set(ctx, "counter", 1); err != nil {
		t.Fatalf("set baseline: %v", err)
	}

	txn := w.Txn(ctx)
	txn.Set("counter", 2)
	bus.err = syncbus.ErrQuorumNotSatisfied
	if err := txn.Commit(); !errors.Is(err, syncbus.ErrQuorumNotSatisfied) {
		t.Fatalf("expected quorum failure, got %v", err)
	}

	if mv, ok, err := w.cache.Get(ctx, "counter"); err != nil {
		t.Fatalf("cache get: %v", err)
	} else if !ok || mv.Data != 1 {
		t.Fatalf("cache value changed after commit failure: ok=%v data=%v", ok, mv.Data)
	}
	if got, ok, err := store.Get(ctx, "counter"); err != nil || !ok || got != 1 {
		t.Fatalf("store value changed after commit failure: ok=%v err=%v val=%d", ok, err, got)
	}
}
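
The transaction API exercised here follows a stage-then-commit shape: Txn(ctx) opens a staging buffer, Set and Delete record mutations, and Commit applies them, rolling every staged key back if quorum fails. A hedged happy-path sketch against the same fake bus (ack plumbing as in the tests above):

	txn := w.Txn(ctx)
	txn.Set("counter", 2)
	go func() { bus.ackCh <- struct{}{} }() // satisfy the default single-replica wait
	if err := txn.Commit(); err != nil {
		t.Fatalf("commit: %v", err) // not expected: no error was configured on the fake
	}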
F
function

TestWarpTxnDeleteRollbackOnQuorumFailure

Parameters

v1/core/distributed_test.go:438-467
func TestWarpTxnDeleteRollbackOnQuorumFailure(t *testing.T)

{
	ctx := context.Background()
	bus := newFakeQuorumBus()
	store := adapter.NewInMemoryStore[int]()
	w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
	w.Register("counter", ModeStrongDistributed, time.Minute)

	go func() {
		bus.ackCh <- struct{}{}
	}()
	if err := w.Set(ctx, "counter", 1); err != nil {
		t.Fatalf("set baseline: %v", err)
	}

	txn := w.Txn(ctx)
	txn.Delete("counter")
	bus.err = syncbus.ErrQuorumNotSatisfied
	if err := txn.Commit(); !errors.Is(err, syncbus.ErrQuorumNotSatisfied) {
		t.Fatalf("expected quorum failure, got %v", err)
	}

	if mv, ok, err := w.cache.Get(ctx, "counter"); err != nil {
		t.Fatalf("cache get: %v", err)
	} else if !ok || mv.Data != 1 {
		t.Fatalf("cache value removed after delete failure: ok=%v data=%v", ok, mv.Data)
	}
	if got, ok, err := store.Get(ctx, "counter"); err != nil || !ok || got != 1 {
		t.Fatalf("store value removed after delete failure: ok=%v err=%v val=%d", ok, err, got)
	}
}
F
function

TestWarpTxnRollbackMultiKeyOnQuorumFailure

Parameters

v1/core/distributed_test.go:469-524
func TestWarpTxnRollbackMultiKeyOnQuorumFailure(t *testing.T)

{
	ctx := context.Background()
	bus := newFakeQuorumBus()
	store := adapter.NewInMemoryStore[int]()
	w := New(cache.NewInMemory[merge.Value[int]](), store, bus, merge.NewEngine[int]())
	w.Register("counter", ModeStrongDistributed, time.Minute)
	w.Register("mirror", ModeStrongDistributed, time.Minute)

	go func() {
		bus.ackCh <- struct{}{}
	}()
	if err := w.Set(ctx, "counter", 1); err != nil {
		t.Fatalf("set counter baseline: %v", err)
	}

	go func() {
		bus.ackCh <- struct{}{}
	}()
	if err := w.Set(ctx, "mirror", 1); err != nil {
		t.Fatalf("set mirror baseline: %v", err)
	}

	txn := w.Txn(ctx)
	txn.Set("counter", 2)
	txn.Set("mirror", 5)

	bus.mu.Lock()
	bus.errSeq = []error{nil, syncbus.ErrQuorumNotSatisfied}
	bus.mu.Unlock()

	go func() {
		bus.ackCh <- struct{}{}
	}()

	if err := txn.Commit(); !errors.Is(err, syncbus.ErrQuorumNotSatisfied) {
		t.Fatalf("expected quorum failure, got %v", err)
	}

	if mv, ok, err := w.cache.Get(ctx, "counter"); err != nil {
		t.Fatalf("cache get counter: %v", err)
	} else if !ok || mv.Data != 1 {
		t.Fatalf("counter cache changed after failure: ok=%v val=%v", ok, mv.Data)
	}
	if mv, ok, err := w.cache.Get(ctx, "mirror"); err != nil {
		t.Fatalf("cache get mirror: %v", err)
	} else if !ok || mv.Data != 1 {
		t.Fatalf("mirror cache changed after failure: ok=%v val=%v", ok, mv.Data)
	}

	if got, ok, err := store.Get(ctx, "counter"); err != nil || !ok || got != 1 {
		t.Fatalf("store counter changed after failure: ok=%v err=%v val=%d", ok, err, got)
	}
	if got, ok, err := store.Get(ctx, "mirror"); err != nil || !ok || got != 1 {
		t.Fatalf("store mirror changed after failure: ok=%v err=%v val=%d", ok, err, got)
	}
}
S
struct

MockStoreSlow

MockStoreSlow implements adapter.Store and simulates latency

v1/core/soft_timeout_test.go:13-16
type MockStoreSlow[T any] struct

Fields

Name Type Description
data map[string]T In-memory backing map of stored values.
delay time.Duration Simulated latency applied before each store operation completes.
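
Only the struct appears in the reference. A minimal sketch of the read path, assuming adapter.Store exposes Get(ctx, key) (T, bool, error) as the store.Get calls in the rollback tests above suggest, would wait out delay before serving from data:

func (s *MockStoreSlow[T]) Get(ctx context.Context, key string) (T, bool, error) {
	// Simulate a slow backend while honoring context cancellation.
	select {
	case <-time.After(s.delay):
	case <-ctx.Done():
		var zero T
		return zero, false, ctx.Err()
	}
	v, ok := s.data[key]
	return v, ok, nil
}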
F
function

TestSoftTimeout

Parameters

v1/core/soft_timeout_test.go:47-125
func TestSoftTimeout(t *testing.T)

{
	// Setup
	c := cache.NewInMemory[merge.Value[string]](cache.WithSweepInterval[merge.Value[string]](10 * time.Millisecond))

	// Store takes 200ms to respond
	store := &MockStoreSlow[string]{
		data:  make(map[string]string),
		delay: 200 * time.Millisecond,
	}

	w := New[string](c, store, nil, nil)

	key := "slow-key"
	val := "fast-value"
	store.data[key] = val

	// Register with:
	// TTL: 1s (Longer TTL for refresh verification)
	// FailSafe: 1h (So we have a stale backup)
	// SoftTimeout: 50ms (Much shorter than store delay)
	w.Register(key, ModeStrongLocal, 1*time.Second,
		cache.WithFailSafe(1*time.Hour),
		cache.WithSoftTimeout(50*time.Millisecond),
	)

	// 1. Initial Populate (Will be slow, 200ms, because no stale data yet)
	start := time.Now()
	got, err := w.Get(context.Background(), key)
	elapsed := time.Since(start)

	if err != nil {
		t.Fatalf("Initial Get failed: %v", err)
	}
	if got != val {
		t.Errorf("Initial Get = %v, want %v", got, val)
	}
	if elapsed < 200*time.Millisecond {
		t.Errorf("Initial Get was too fast (%v), expected > 200ms", elapsed)
	}

	// 2. Wait for TTL to expire (1s + buffer)
	time.Sleep(1100 * time.Millisecond)

	// 3. Get Again (Stale exists + Store is slow)
	// Should hit SoftTimeout at 50ms and return stale data immediately
	start = time.Now()
	got, err = w.Get(context.Background(), key)
	elapsed = time.Since(start)

	if err != nil {
		t.Fatalf("Soft Timeout Get failed: %v", err)
	}
	if got != val {
		t.Errorf("Soft Timeout Get = %v, want %v", got, val)
	}

	// Check timing: Should be around 50ms (SoftTimeout), definitely NOT 200ms
	if elapsed > 150*time.Millisecond {
		t.Errorf("Soft Timeout Get took too long (%v), expected ~50ms", elapsed)
	}

	// 4. Wait for background refresh to finish (remaining 150ms of 200ms delay + buffer)
	time.Sleep(250 * time.Millisecond)

	// 5. Get Again - Should be a fresh Hit now (0ms latency from cache)
	start = time.Now()
	got, err = w.Get(context.Background(), key)
	elapsed = time.Since(start)

	if err != nil {
		t.Fatalf("Third Get failed: %v", err)
	}
	if got != val {
		t.Errorf("Third Get = %v, want %v", got, val)
	}
	if elapsed > 10*time.Millisecond {
		t.Errorf("Third Get was slow (%v), background refresh likely failed", elapsed)
	}
}
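
Outside of tests, the same pattern applies: register a key with a fail-safe window plus a soft timeout, and Get serves the stale copy within roughly the soft-timeout budget while the slow store refreshes in the background. A hedged sketch of that wiring (w, ctx, and the option names as used above; the surrounding application code is hypothetical):

	w.Register("slow-key", ModeStrongLocal, time.Second,
		cache.WithFailSafe(time.Hour),              // keep a stale copy available for up to 1h
		cache.WithSoftTimeout(50*time.Millisecond), // wait at most ~50ms on the store before serving stale
	)
	v, err := w.Get(ctx, "slow-key") // stale-but-fast when the store exceeds the soft timeout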