lock API

lock

package

API reference for the lock package.

S
struct

lockState

v1/lock/memory.go:11-14
type lockState struct

Fields

Name Type Description
timer *time.Timer
notify chan struct{}
S
struct
Implements: Locker

InMemory

InMemory implements Locker using local memory. Lock and unlock events are
propagated through a syncbus Bus allowing multiple nodes to coordinate.

v1/lock/memory.go:18-24
type InMemory struct

Methods

Parameters

key string

Returns

error
// ensureSubscriptions lazily installs the bus subscriptions for key, exactly
// once per key. It subscribes to "lock:<key>" and "unlock:<key>" and starts
// two goroutines that mirror remote lock/unlock events into the local state,
// so every node sharing the bus agrees on which keys are held.
func (*InMemory) ensureSubscriptions(key string) error
{
	l.mu.Lock()
	// Fast path: this key is already wired up.
	if _, ok := l.subs[key]; ok {
		l.mu.Unlock()
		return nil
	}
	// Mark the key before dropping the mutex so a concurrent caller cannot
	// subscribe the same key twice.
	l.subs[key] = struct{}{}
	l.mu.Unlock()

	// cleanup undoes the marker above if either Subscribe below fails.
	cleanup := func() {
		l.mu.Lock()
		delete(l.subs, key)
		l.mu.Unlock()
	}

	lockCh, err := l.bus.Subscribe(context.Background(), "lock:"+key)
	if err != nil {
		cleanup()
		return err
	}
	unlockCh, err := l.bus.Subscribe(context.Background(), "unlock:"+key)
	if err != nil {
		// Roll back the first subscription as well; best-effort.
		_ = l.bus.Unsubscribe(context.Background(), "lock:"+key, lockCh)
		cleanup()
		return err
	}

	// Mirror remote lock events: record a lock taken by another node.
	// NOTE(review): the goroutine exits when lockCh is closed — confirm the
	// Bus implementation closes subscriber channels on unsubscribe/shutdown.
	go func() {
		for range lockCh {
			l.mu.Lock()
			// Skip events this node published itself (counted in TryLock).
			if l.pending["lock:"+key] > 0 {
				l.pending["lock:"+key]--
				l.mu.Unlock()
				continue
			}
			if _, ok := l.locks[key]; !ok {
				l.locks[key] = &lockState{notify: make(chan struct{})}
			}
			l.mu.Unlock()
		}
	}()
	// Mirror remote unlock events: wake local waiters and drop the state.
	go func() {
		for range unlockCh {
			l.mu.Lock()
			// Skip events this node published itself (counted in Release).
			if l.pending["unlock:"+key] > 0 {
				l.pending["unlock:"+key]--
				l.mu.Unlock()
				continue
			}
			if st, ok := l.locks[key]; ok {
				if st.timer != nil {
					st.timer.Stop()
				}
				// Closing notify unblocks anyone holding this lockState.
				close(st.notify)
				delete(l.locks, key)
			}
			l.mu.Unlock()
		}
	}()
	return nil
}
TryLock
Method

TryLock attempts to obtain the lock without waiting. It returns true on success.

Parameters

key string

Returns

bool
error
func (*InMemory) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error)
{
	if err := l.ensureSubscriptions(key); err != nil {
		return false, err
	}
	l.mu.Lock()
	if _, ok := l.locks[key]; ok {
		l.mu.Unlock()
		return false, nil
	}
	st := &lockState{notify: make(chan struct{})}
	if ttl > 0 {
		st.timer = time.AfterFunc(ttl, func() {
			_ = l.Release(context.Background(), key)
		})
	}
	l.locks[key] = st
	l.pending["lock:"+key]++
	l.mu.Unlock()
	_ = l.bus.Publish(ctx, "lock:"+key)
	return true, nil
}
Acquire
Method

Acquire blocks until the lock is obtained or the context is cancelled. The subscription to "unlock:<key>" is established before each TryLock attempt so that a Release that fires in the window between a failed TryLock and a later Subscribe call cannot be missed.

Parameters

key string

Returns

error
// Acquire blocks until the lock for key is obtained or ctx is cancelled.
// ttl is forwarded to every TryLock attempt. The "unlock:<key>" subscription
// is (re-)established BEFORE each TryLock so that a Release firing between a
// failed TryLock and a later Subscribe cannot be missed (lost-wakeup race).
func (*InMemory) Acquire(ctx context.Context, key string, ttl time.Duration) error
{
	for {
		// Subscribe BEFORE attempting the lock so we cannot miss a release
		// notification that fires between a failed TryLock and the subscribe.
		ch, err := l.bus.Subscribe(ctx, "unlock:"+key)
		if err != nil {
			return err
		}

		ok, err := l.TryLock(ctx, key, ttl)
		if err != nil {
			// Best-effort unsubscribe; the TryLock error takes precedence.
			_ = l.bus.Unsubscribe(context.Background(), "unlock:"+key, ch)
			return err
		}
		if ok {
			_ = l.bus.Unsubscribe(context.Background(), "unlock:"+key, ch)
			return nil
		}

		// Wait for a release notification or cancellation, then retry.
		select {
		case <-ch:
		case <-ctx.Done():
			_ = l.bus.Unsubscribe(context.Background(), "unlock:"+key, ch)
			return ctx.Err()
		}
		_ = l.bus.Unsubscribe(context.Background(), "unlock:"+key, ch)
	}
}
Release
Method

Release frees the lock for the given key.

Parameters

key string

Returns

error
func (*InMemory) Release(ctx context.Context, key string) error
{
	l.mu.Lock()
	st, ok := l.locks[key]
	if ok {
		if st.timer != nil {
			st.timer.Stop()
		}
		close(st.notify)
		delete(l.locks, key)
		l.pending["unlock:"+key]++
	}
	l.mu.Unlock()
	if ok {
		_ = l.bus.Publish(ctx, "unlock:"+key)
	}
	return nil
}

Fields

Name Type Description
mu sync.Mutex
bus syncbus.Bus
locks map[string]*lockState
subs map[string]struct{}
pending map[string]int
F
function

NewInMemory

NewInMemory returns a new in-memory locker that uses bus to propagate events.

Parameters

Returns

*InMemory
v1/lock/memory.go:27-37
func NewInMemory(bus syncbus.Bus) *InMemory

{
	if bus == nil {
		bus = syncbus.NewInMemoryBus()
	}
	return &InMemory{
		bus:     bus,
		locks:   make(map[string]*lockState),
		subs:    make(map[string]struct{}),
		pending: make(map[string]int),
	}
}
S
struct

lockEntry

v1/lock/nats.go:13-16
type lockEntry struct

Fields

Name Type Description
token string
timer *time.Timer
S
struct
Implements: Locker

NATS

NATS implements Locker using NATS JetStream Key-Value as the coordination
backend. This provides distributed locks without requiring Redis.

v1/lock/nats.go:20-28
type NATS struct

Methods

getKV
Method

Returns

jetstream.KeyValue
error
func (*NATS) getKV() (jetstream.KeyValue, error)
{
	n.kvOnce.Do(func() {
		kv, err := n.js.CreateKeyValue(context.Background(), jetstream.KeyValueConfig{
			Bucket:  n.bucket,
			History: 1,
		})
		if err == nil {
			n.kv = kv
			return
		}
		if errors.Is(err, jetstream.ErrStreamNameAlreadyInUse) {
			n.kv, n.kvErr = n.js.KeyValue(context.Background(), n.bucket)
			return
		}
		n.kvErr = err
	})
	return n.kv, n.kvErr
}
TryLock
Method

TryLock attempts to obtain the lock without waiting. It returns true on success. If ttl is greater than zero, a goroutine is scheduled to release the lock after the duration (per-key TTL, since JetStream KV TTL is bucket-level only).

Parameters

key string

Returns

bool
error
func (*NATS) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error)
{
	kv, err := n.getKV()
	if err != nil {
		return false, err
	}
	token := uuid.NewString()
	_, err = kv.Create(ctx, key, []byte(token))
	if err != nil {
		if errors.Is(err, jetstream.ErrKeyExists) {
			return false, nil
		}
		return false, err
	}
	entry := &lockEntry{token: token}
	if ttl > 0 {
		// Bind the timer to this specific token so a re-acquired lock held
		// by another caller cannot be released when this timer fires.
		entry.timer = time.AfterFunc(ttl, func() {
			_ = n.releaseWithToken(context.Background(), key, token)
		})
	}
	n.mu.Lock()
	n.locks[key] = entry
	n.mu.Unlock()
	return true, nil
}
Acquire
Method

Acquire blocks until the lock for key is obtained or the context is cancelled.

Parameters

key string

Returns

error
func (*NATS) Acquire(ctx context.Context, key string, ttl time.Duration) error
{
	for {
		ok, err := n.TryLock(ctx, key, ttl)
		if err != nil {
			return err
		}
		if ok {
			return nil
		}
		kv, err := n.getKV()
		if err != nil {
			return err
		}
		watcher, err := kv.Watch(ctx, key, jetstream.UpdatesOnly())
		if err != nil {
			return err
		}
		waiting := true
		for waiting {
			select {
			case entry, open := <-watcher.Updates():
				if !open {
					waiting = false
					break
				}
				if entry == nil {
					continue
				}
				if entry.Operation() == jetstream.KeyValueDelete {
					waiting = false
				}
			case <-ctx.Done():
				_ = watcher.Stop()
				return ctx.Err()
			}
		}
		_ = watcher.Stop()
	}
}
Release
Method

Release frees the lock for the given key if this instance still holds it.

Parameters

key string

Returns

error
func (*NATS) Release(ctx context.Context, key string) error
{
	n.mu.Lock()
	entry, ok := n.locks[key]
	n.mu.Unlock()
	if !ok {
		return nil
	}
	return n.releaseWithToken(ctx, key, entry.token)
}

releaseWithToken deletes the lock only when the KV store still holds the expected token, using an optimistic last-revision check to prevent stealing a lock re-acquired by another caller after expiry or a network partition.

Parameters

key string
token string

Returns

error
func (*NATS) releaseWithToken(ctx context.Context, key, token string) error
{
	kv, err := n.getKV()
	if err != nil {
		return err
	}
	current, err := kv.Get(ctx, key)
	if err != nil {
		if errors.Is(err, jetstream.ErrKeyNotFound) {
			n.cleanupLocal(key, token)
			return nil
		}
		return err
	}
	if string(current.Value()) != token {
		// Another caller holds the lock; do not delete.
		return nil
	}
	// Atomic ownership-checked delete: fails if entry was modified between
	// our Get and this Delete (JSErrCodeStreamWrongLastSequence).
	if err := kv.Delete(ctx, key, jetstream.LastRevision(current.Revision())); err != nil {
		var apiErr *jetstream.APIError
		if errors.As(err, &apiErr) && apiErr.ErrorCode == jetstream.JSErrCodeStreamWrongLastSequence {
			// Lost a race — entry changed concurrently; our lock is already gone.
			return nil
		}
		return err
	}
	n.cleanupLocal(key, token)
	return nil
}
cleanupLocal
Method

Parameters

key string
token string
func (*NATS) cleanupLocal(key, token string)
{
	n.mu.Lock()
	defer n.mu.Unlock()
	if e, ok := n.locks[key]; ok && e.token == token {
		if e.timer != nil {
			e.timer.Stop()
		}
		delete(n.locks, key)
	}
}

Fields

Name Type Description
js jetstream.JetStream
bucket string
mu sync.Mutex
locks map[string]*lockEntry
kv jetstream.KeyValue
kvOnce sync.Once
kvErr error
F
function

NewNATS

NewNATS returns a NATS JetStream-backed distributed locker.
bucket is the JetStream KV bucket name; it is created automatically if absent.

Parameters

bucket
string

Returns

*NATS
v1/lock/nats.go:32-38
func NewNATS(js jetstream.JetStream, bucket string) *NATS

{
	return &NATS{
		js:     js,
		bucket: bucket,
		locks:  make(map[string]*lockEntry),
	}
}
F
function

newNATSLocker

Parameters

Returns

*NATS
context.Context
func()
v1/lock/nats_test.go:14-49
func newNATSLocker(t *testing.T) (*NATS, context.Context, func())

{
	t.Helper()
	s := natsserver.RunRandClientPortServer()
	if s == nil {
		t.Skip("requires NATS server")
	}
	// Restart with JetStream enabled.
	s.Shutdown()
	opts := natsserver.DefaultTestOptions
	opts.Port = -1
	opts.JetStream = true
	opts.StoreDir = t.TempDir()
	s, err := server.NewServer(&opts)
	if err != nil {
		t.Fatalf("new server: %v", err)
	}
	go s.Start()
	if !s.ReadyForConnections(5 * time.Second) {
		t.Fatal("NATS server not ready")
	}
	nc, err := nats.Connect(s.ClientURL())
	if err != nil {
		t.Fatalf("connect: %v", err)
	}
	js, err := jetstream.New(nc)
	if err != nil {
		t.Fatalf("jetstream: %v", err)
	}
	locker := NewNATS(js, "warp-lock-test")
	ctx := context.Background()
	cleanup := func() {
		nc.Close()
		s.Shutdown()
	}
	return locker, ctx, cleanup
}
F
function

TestNATSTryLockDoubleAcquire

Parameters

v1/lock/nats_test.go:51-82
func TestNATSTryLockDoubleAcquire(t *testing.T)

{
	l, ctx, cleanup := newNATSLocker(t)
	defer cleanup()

	ok, err := l.TryLock(ctx, "k", time.Second)
	if err != nil {
		t.Fatalf("trylock: %v", err)
	}
	if !ok {
		t.Fatal("expected lock acquired")
	}

	ok2, err := l.TryLock(ctx, "k", time.Second)
	if err != nil {
		t.Fatalf("second trylock: %v", err)
	}
	if ok2 {
		t.Fatal("expected lock to be held by first holder")
	}

	if err := l.Release(ctx, "k"); err != nil {
		t.Fatalf("release: %v", err)
	}

	ok3, err := l.TryLock(ctx, "k", time.Second)
	if err != nil {
		t.Fatalf("trylock after release: %v", err)
	}
	if !ok3 {
		t.Fatal("expected lock re-acquired after release")
	}
}
F
function

TestNATSAcquireTimeout

Parameters

v1/lock/nats_test.go:84-101
func TestNATSAcquireTimeout(t *testing.T)

{
	l, ctx, cleanup := newNATSLocker(t)
	defer cleanup()

	if ok, err := l.TryLock(ctx, "k", 0); err != nil || !ok {
		t.Fatalf("initial trylock: %v ok %v", err, ok)
	}

	cctx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
	defer cancel()
	start := time.Now()
	if err := l.Acquire(cctx, "k", 0); err == nil {
		t.Fatal("expected timeout error")
	}
	if time.Since(start) > 200*time.Millisecond {
		t.Fatal("acquire did not respect context timeout")
	}
}
S
struct

redisLockEntry

redisLockEntry tracks the ownership token and optional TTL timer for a held lock.

v1/lock/redis.go:22-25
type redisLockEntry struct

Fields

Name Type Description
token string
timer *time.Timer
S
struct
Implements: Locker

Redis

Redis implements Locker using a Redis backend.
Locks are acquired with SET NX EX and released via a Lua script that verifies
ownership before deletion, so only the current holder can free a lock.
Acquire subscribes to keyspace notifications for efficient wake-up; when the
server has notifications disabled it falls back to exponential-backoff polling.

v1/lock/redis.go:32-37
type Redis struct

Methods

TryLock
Method

TryLock attempts to obtain the lock without waiting. On success it stores a UUID token locally and, when ttl > 0, arms a timer that releases the lock automatically to clean up local state after expiry.

Parameters

key string

Returns

bool
error
func (*Redis) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error)
{
	token := uuid.NewString()
	ok, err := r.client.SetNX(ctx, key, token, ttl).Result()
	if err != nil {
		return false, err
	}
	if !ok {
		return false, nil
	}
	entry := &redisLockEntry{token: token}
	if ttl > 0 {
		// Capture token by value so the closure always releases the right lock,
		// even if the entry is replaced by a concurrent re-acquisition.
		capturedToken := token
		entry.timer = time.AfterFunc(ttl, func() {
			_ = r.releaseWithToken(context.Background(), key, capturedToken)
		})
	}
	r.mu.Lock()
	r.entries[key] = entry
	r.mu.Unlock()
	return true, nil
}
Release
Method

Release frees the lock for the given key if the caller currently holds it.

Parameters

key string

Returns

error
func (*Redis) Release(ctx context.Context, key string) error
{
	r.mu.Lock()
	entry, ok := r.entries[key]
	r.mu.Unlock()
	if !ok {
		return nil
	}
	if entry.timer != nil {
		entry.timer.Stop()
	}
	return r.releaseWithToken(ctx, key, entry.token)
}

releaseWithToken runs the ownership-checked Lua delete script and removes the local entry only when the stored token matches, preventing races with concurrent re-acquisitions from overwriting a newer holder's entry.

Parameters

key string
token string

Returns

error
func (*Redis) releaseWithToken(ctx context.Context, key, token string) error
{
	_, err := redisDelScript.Run(ctx, r.client, []string{key}, token).Result()
	if err == redis.Nil {
		err = nil
	}
	if err == nil {
		r.cleanupLocal(key, token)
	}
	return err
}
cleanupLocal
Method

cleanupLocal removes the entry for key only when the stored token matches, guarding against a timer firing after a concurrent re-acquisition.

Parameters

key string
token string
func (*Redis) cleanupLocal(key, token string)
{
	r.mu.Lock()
	defer r.mu.Unlock()
	if e, ok := r.entries[key]; ok && e.token == token {
		delete(r.entries, key)
	}
}
Acquire
Method

Acquire blocks until the lock for key is obtained or ctx is cancelled. It subscribes once to keyspace notifications (__keyevent@0__:del and :expired) before the retry loop so that a Release firing between a failed TryLock and the wait cannot be missed (lost-wakeup prevention). When the server does not publish those events the select falls through on a timer and retries with exponential back-off (5 ms → 500 ms).

Parameters

key string

Returns

error
func (*Redis) Acquire(ctx context.Context, key string, ttl time.Duration) error
{
	const baseInterval = 5 * time.Millisecond
	const maxInterval = 500 * time.Millisecond
	interval := baseInterval

	// Subscribe ONCE before the retry loop — a single subscription avoids the
	// overhead of re-subscribing on every failed TryLock and prevents missing
	// release notifications that arrive between TryLock and the wait.
	pubsub := r.client.Subscribe(ctx,
		"__keyevent@0__:del",
		"__keyevent@0__:expired",
	)
	defer pubsub.Close()
	msgCh := pubsub.Channel()

	for {
		ok, err := r.TryLock(ctx, key, ttl)
		if err != nil {
			return err
		}
		if ok {
			return nil
		}

		timer := time.NewTimer(interval)
	waitForEvent:
		for {
			select {
			case msg, open := <-msgCh:
				if !open {
					break waitForEvent
				}
				if msg.Payload == key {
					interval = baseInterval // reset after a real wake-up
					break waitForEvent
				}
			case <-timer.C:
				if interval < maxInterval {
					interval *= 2
				}
				break waitForEvent
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			}
		}
		timer.Stop()
	}
}

Fields

Name Type Description
client redis.UniversalClient
mu sync.Mutex
entries map[string]*redisLockEntry
F
function

NewRedis

NewRedis returns a new Redis locker backed by client.
client may be a standalone, Sentinel, or cluster client.

Parameters

Returns

*Redis
v1/lock/redis.go:41-46
func NewRedis(client redis.UniversalClient) *Redis

{
	return &Redis{
		client:  client,
		entries: make(map[string]*redisLockEntry),
	}
}
F
function

newTestRedisLocker

Parameters

Returns

*Redis
*miniredis.Miniredis
func()
v1/lock/redis_test.go:12-24
func newTestRedisLocker(t *testing.T) (*Redis, *miniredis.Miniredis, func())

{
	t.Helper()
	mr, err := miniredis.Run()
	if err != nil {
		t.Fatalf("miniredis.Run: %v", err)
	}
	client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
	locker := NewRedis(client)
	return locker, mr, func() {
		_ = client.Close()
		mr.Close()
	}
}
F
function

TestTryLock_basic

TestTryLock_basic verifies the acquire → contention → release → re-acquire
cycle using TryLock.

Parameters

v1/lock/redis_test.go:28-53
func TestTryLock_basic(t *testing.T)

{
	l, _, cleanup := newTestRedisLocker(t)
	defer cleanup()
	ctx := context.Background()

	ok, err := l.TryLock(ctx, "k", time.Second)
	if err != nil || !ok {
		t.Fatalf("first TryLock: ok=%v err=%v", ok, err)
	}

	// Lock is held — second attempt must fail.
	ok, err = l.TryLock(ctx, "k", time.Second)
	if err != nil || ok {
		t.Fatalf("second TryLock should fail while lock is held: ok=%v err=%v", ok, err)
	}

	if err := l.Release(ctx, "k"); err != nil {
		t.Fatalf("Release: %v", err)
	}

	// After release the key must be acquirable again.
	ok, err = l.TryLock(ctx, "k", time.Second)
	if err != nil || !ok {
		t.Fatalf("TryLock after Release: ok=%v err=%v", ok, err)
	}
}
F
function

TestRelease_ownershipCheck

TestRelease_ownershipCheck verifies that a locker without the token for a key
cannot release a lock held by another locker instance.

Parameters

v1/lock/redis_test.go:57-83
func TestRelease_ownershipCheck(t *testing.T)

{
	l1, _, cleanup := newTestRedisLocker(t)
	defer cleanup()
	// l2 shares the same Redis but has no entry for "k".
	l2 := NewRedis(l1.client)
	ctx := context.Background()

	ok, err := l1.TryLock(ctx, "k", 5*time.Second)
	if err != nil || !ok {
		t.Fatalf("l1.TryLock: ok=%v err=%v", ok, err)
	}

	// l2 has no token for "k"; Release must be a no-op.
	if err := l2.Release(ctx, "k"); err != nil {
		t.Fatalf("l2.Release: %v", err)
	}

	// l1 must still hold the lock.
	ok, err = l1.TryLock(ctx, "k", time.Second)
	if err != nil || ok {
		t.Fatalf("lock should still be held by l1 after l2.Release: ok=%v err=%v", ok, err)
	}

	if err := l1.Release(ctx, "k"); err != nil {
		t.Fatalf("l1.Release: %v", err)
	}
}
F
function

TestAcquire_waits

TestAcquire_waits verifies that Acquire blocks while the lock is held and
returns promptly after the holder calls Release.
miniredis does not publish keyspace notifications, so this test exercises
the polling fallback path only.

Parameters

v1/lock/redis_test.go:89-121
func TestAcquire_waits(t *testing.T)

{
	l, _, cleanup := newTestRedisLocker(t)
	defer cleanup()
	ctx := context.Background()

	// Goroutine A acquires the lock.
	ok, err := l.TryLock(ctx, "k", 5*time.Second)
	if err != nil || !ok {
		t.Fatalf("TryLock: ok=%v err=%v", ok, err)
	}

	// Goroutine B tries to Acquire — must block.
	done := make(chan error, 1)
	go func() {
		done <- l.Acquire(ctx, "k", 5*time.Second)
	}()

	// Give B time to enter the wait loop.
	time.Sleep(20 * time.Millisecond)

	if err := l.Release(ctx, "k"); err != nil {
		t.Fatalf("Release: %v", err)
	}

	select {
	case err := <-done:
		if err != nil {
			t.Fatalf("B.Acquire returned error: %v", err)
		}
	case <-time.After(500 * time.Millisecond):
		t.Fatal("B did not acquire the lock within 500 ms after Release")
	}
}
F
function

TestTTL_expiry

TestTTL_expiry verifies that a lock with a short TTL is released automatically
by Redis, allowing a subsequent TryLock to succeed.

Parameters

v1/lock/redis_test.go:125-143
func TestTTL_expiry(t *testing.T)

{
	l, mr, cleanup := newTestRedisLocker(t)
	defer cleanup()
	ctx := context.Background()

	ok, err := l.TryLock(ctx, "k", 100*time.Millisecond)
	if err != nil || !ok {
		t.Fatalf("TryLock: ok=%v err=%v", ok, err)
	}

	// Advance miniredis internal clock past the TTL without real sleeping.
	mr.FastForward(200 * time.Millisecond)

	// The Redis key has expired; a new TryLock must succeed.
	ok, err = l.TryLock(ctx, "k", time.Second)
	if err != nil || !ok {
		t.Fatalf("TryLock after TTL expiry: ok=%v err=%v", ok, err)
	}
}
I
interface

Locker

Locker defines basic distributed lock operations.

v1/lock/lock.go:9-19
type Locker interface

Methods

Acquire
Method

Parameters

key string

Returns

error
func Acquire(...)
TryLock
Method

Parameters

key string

Returns

bool
error
func TryLock(...)
Release
Method

Parameters

key string

Returns

error
func Release(...)
F
function

TestInMemoryTryLockAcquireRelease

Parameters

v1/lock/lock_test.go:9-25
func TestInMemoryTryLockAcquireRelease(t *testing.T)

{
	l := NewInMemory(nil)
	ctx := context.Background()
	ok, err := l.TryLock(ctx, "k", time.Second)
	if err != nil || !ok {
		t.Fatalf("trylock: %v ok %v", err, ok)
	}
	if ok, err := l.TryLock(ctx, "k", time.Second); err != nil || ok {
		t.Fatalf("expected lock held, got ok %v err %v", ok, err)
	}
	if err := l.Release(ctx, "k"); err != nil {
		t.Fatalf("release: %v", err)
	}
	if ok, err := l.TryLock(ctx, "k", time.Second); err != nil || !ok {
		t.Fatalf("expected lock re-acquired, ok %v err %v", ok, err)
	}
}
F
function

TestInMemoryAcquireNoLostWakeup

TestInMemoryAcquireNoLostWakeup verifies that Acquire never misses a Release
notification even when Release fires in the narrow window between a failed
TryLock and the Subscribe call (the lost-wakeup race).

The test runs many iterations: in each one goroutine A holds the lock while
goroutine B calls Acquire. A releases immediately, racing with B’s subscribe
step. With the fix in place, B is guaranteed to receive the notification and
must unblock well within the 100 ms deadline.

Parameters

v1/lock/lock_test.go:35-67
func TestInMemoryAcquireNoLostWakeup(t *testing.T)

{
	const iterations = 200
	const deadline = 100 * time.Millisecond

	for i := 0; i < iterations; i++ {
		l := NewInMemory(nil)
		ctx := context.Background()

		ok, err := l.TryLock(ctx, "k", 0)
		if err != nil || !ok {
			t.Fatalf("iter %d: initial TryLock: ok=%v err=%v", i, ok, err)
		}

		done := make(chan error, 1)
		go func() {
			done <- l.Acquire(ctx, "k", 0)
		}()

		// Release immediately, deliberately racing with B's subscribe step.
		if err := l.Release(ctx, "k"); err != nil {
			t.Fatalf("iter %d: release: %v", i, err)
		}

		select {
		case err := <-done:
			if err != nil {
				t.Fatalf("iter %d: Acquire returned unexpected error: %v", i, err)
			}
		case <-time.After(deadline):
			t.Fatalf("iter %d: Acquire did not complete within %v — lost wakeup", i, deadline)
		}
	}
}
F
function

TestInMemoryAcquireTimeout

Parameters

v1/lock/lock_test.go:69-84
func TestInMemoryAcquireTimeout(t *testing.T)

{
	l := NewInMemory(nil)
	ctx := context.Background()
	_, _ = l.TryLock(ctx, "k", 0)

	cctx, cancel := context.WithTimeout(ctx, 5*time.Millisecond)
	defer cancel()
	start := time.Now()
	err := l.Acquire(cctx, "k", 0)
	if err == nil {
		t.Fatal("expected timeout error")
	}
	if time.Since(start) > 20*time.Millisecond {
		t.Fatal("acquire did not respect context timeout")
	}
}
F
function

TestInMemoryLockTTLExpires

Parameters

v1/lock/lock_test.go:86-96
func TestInMemoryLockTTLExpires(t *testing.T)

{
	l := NewInMemory(nil)
	ctx := context.Background()
	if ok, err := l.TryLock(ctx, "k", 10*time.Millisecond); err != nil || !ok {
		t.Fatalf("trylock: %v ok %v", err, ok)
	}
	time.Sleep(20 * time.Millisecond)
	if ok, err := l.TryLock(ctx, "k", 0); err != nil || !ok {
		t.Fatalf("lock should expire, ok %v err %v", ok, err)
	}
}