lock API

lock

package

API reference for the lock package.

F
function

newRedisLocker

Parameters

v1/lock/redis_test.go:14-29
func newRedisLocker(t *testing.T) (*Redis, syncbus.Bus, context.Context, func())

{
	t.Helper()
	mr, err := miniredis.Run()
	if err != nil {
		t.Fatalf("miniredis run: %v", err)
	}
	client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
	bus := syncbus.NewInMemoryBus()
	locker := NewRedis(client, bus)
	ctx := context.Background()
	cleanup := func() {
		_ = client.Close()
		mr.Close()
	}
	return locker, bus, ctx, cleanup
}
F
function

TestRedisTryLockAcquireReleaseAndBus

Parameters

v1/lock/redis_test.go:31-76
func TestRedisTryLockAcquireReleaseAndBus(t *testing.T)

{
	l, bus, ctx, cleanup := newRedisLocker(t)
	defer cleanup()

	lockCh, err := bus.Subscribe(ctx, "lock:k")
	if err != nil {
		t.Fatalf("subscribe lock: %v", err)
	}
	unlockCh, err := bus.Subscribe(ctx, "unlock:k")
	if err != nil {
		t.Fatalf("subscribe unlock: %v", err)
	}

	if err := l.Acquire(ctx, "k", time.Second); err != nil {
		t.Fatalf("acquire: %v", err)
	}
	select {
	case <-lockCh:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for lock publish")
	}
	if err := l.Release(ctx, "k"); err != nil {
		t.Fatalf("release: %v", err)
	}
	select {
	case <-unlockCh:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for unlock publish")
	}
	l.mu.Lock()
	if _, ok := l.tokens["k"]; ok {
		t.Fatal("token not cleaned up on release")
	}
	l.mu.Unlock()

	ok, err := l.TryLock(ctx, "k", time.Second)
	if err != nil || !ok {
		t.Fatalf("trylock: %v ok %v", err, ok)
	}
	if ok, err := l.TryLock(ctx, "k", time.Second); err != nil || ok {
		t.Fatalf("expected lock held, ok %v err %v", ok, err)
	}
	if err := l.Release(ctx, "k"); err != nil {
		t.Fatalf("release: %v", err)
	}
}
F
function

TestRedisAcquireTimeout

Parameters

v1/lock/redis_test.go:78-96
func TestRedisAcquireTimeout(t *testing.T)

{
	l1, bus, ctx, cleanup := newRedisLocker(t)
	defer cleanup()
	l2 := NewRedis(l1.client, bus)

	if ok, err := l1.TryLock(ctx, "k", 0); err != nil || !ok {
		t.Fatalf("initial trylock: %v ok %v", err, ok)
	}

	cctx, cancel := context.WithTimeout(ctx, 5*time.Millisecond)
	defer cancel()
	start := time.Now()
	if err := l2.Acquire(cctx, "k", 0); err == nil {
		t.Fatal("expected timeout error")
	}
	if time.Since(start) > 20*time.Millisecond {
		t.Fatal("acquire did not respect context timeout")
	}
}
I
interface

Locker

Locker defines basic distributed lock operations.

v1/lock/lock.go:9-19
type Locker interface

Methods

Acquire
Method

Parameters

key string

Returns

error
func Acquire(...)
TryLock
Method

Parameters

key string

Returns

bool
error
func TryLock(...)
Release
Method

Parameters

key string

Returns

error
func Release(...)
F
function

TestInMemoryTryLockAcquireRelease

Parameters

v1/lock/lock_test.go:9-25
func TestInMemoryTryLockAcquireRelease(t *testing.T)

{
	l := NewInMemory(nil)
	ctx := context.Background()
	ok, err := l.TryLock(ctx, "k", time.Second)
	if err != nil || !ok {
		t.Fatalf("trylock: %v ok %v", err, ok)
	}
	if ok, err := l.TryLock(ctx, "k", time.Second); err != nil || ok {
		t.Fatalf("expected lock held, got ok %v err %v", ok, err)
	}
	if err := l.Release(ctx, "k"); err != nil {
		t.Fatalf("release: %v", err)
	}
	if ok, err := l.TryLock(ctx, "k", time.Second); err != nil || !ok {
		t.Fatalf("expected lock re-acquired, ok %v err %v", ok, err)
	}
}
F
function

TestInMemoryAcquireTimeout

Parameters

v1/lock/lock_test.go:27-42
func TestInMemoryAcquireTimeout(t *testing.T)

{
	l := NewInMemory(nil)
	ctx := context.Background()
	_, _ = l.TryLock(ctx, "k", 0)

	cctx, cancel := context.WithTimeout(ctx, 5*time.Millisecond)
	defer cancel()
	start := time.Now()
	err := l.Acquire(cctx, "k", 0)
	if err == nil {
		t.Fatal("expected timeout error")
	}
	if time.Since(start) > 20*time.Millisecond {
		t.Fatal("acquire did not respect context timeout")
	}
}
F
function

TestInMemoryLockTTLExpires

Parameters

v1/lock/lock_test.go:44-54
func TestInMemoryLockTTLExpires(t *testing.T)

{
	l := NewInMemory(nil)
	ctx := context.Background()
	if ok, err := l.TryLock(ctx, "k", 10*time.Millisecond); err != nil || !ok {
		t.Fatalf("trylock: %v ok %v", err, ok)
	}
	time.Sleep(20 * time.Millisecond)
	if ok, err := l.TryLock(ctx, "k", 0); err != nil || !ok {
		t.Fatalf("lock should expire, ok %v err %v", ok, err)
	}
}
S
struct

lockState

v1/lock/memory.go:11-14
type lockState struct

Fields

Name Type Description
timer *time.Timer
notify chan struct{}
S
struct
Implements: Locker

InMemory

InMemory implements Locker using local memory. Lock and unlock events are
propagated through a syncbus Bus allowing multiple nodes to coordinate.

v1/lock/memory.go:18-24
type InMemory struct

Methods

Parameters

key string

Returns

error
func (*InMemory) ensureSubscriptions(key string) error
{
	l.mu.Lock()
	if _, ok := l.subs[key]; ok {
		l.mu.Unlock()
		return nil
	}
	l.subs[key] = struct{}{}
	l.mu.Unlock()

	cleanup := func() {
		l.mu.Lock()
		delete(l.subs, key)
		l.mu.Unlock()
	}

	lockCh, err := l.bus.Subscribe(context.Background(), "lock:"+key)
	if err != nil {
		cleanup()
		return err
	}
	unlockCh, err := l.bus.Subscribe(context.Background(), "unlock:"+key)
	if err != nil {
		_ = l.bus.Unsubscribe(context.Background(), "lock:"+key, lockCh)
		cleanup()
		return err
	}

	go func() {
		for range lockCh {
			l.mu.Lock()
			if l.pending["lock:"+key] > 0 {
				l.pending["lock:"+key]--
				l.mu.Unlock()
				continue
			}
			if _, ok := l.locks[key]; !ok {
				l.locks[key] = &lockState{notify: make(chan struct{})}
			}
			l.mu.Unlock()
		}
	}()
	go func() {
		for range unlockCh {
			l.mu.Lock()
			if l.pending["unlock:"+key] > 0 {
				l.pending["unlock:"+key]--
				l.mu.Unlock()
				continue
			}
			if st, ok := l.locks[key]; ok {
				if st.timer != nil {
					st.timer.Stop()
				}
				close(st.notify)
				delete(l.locks, key)
			}
			l.mu.Unlock()
		}
	}()
	return nil
}
TryLock
Method

TryLock attempts to obtain the lock without waiting. It returns true on success.

Parameters

key string

Returns

bool
error
func (*InMemory) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error)
{
	if err := l.ensureSubscriptions(key); err != nil {
		return false, err
	}
	l.mu.Lock()
	if _, ok := l.locks[key]; ok {
		l.mu.Unlock()
		return false, nil
	}
	st := &lockState{notify: make(chan struct{})}
	if ttl > 0 {
		st.timer = time.AfterFunc(ttl, func() {
			_ = l.Release(context.Background(), key)
		})
	}
	l.locks[key] = st
	l.pending["lock:"+key]++
	l.mu.Unlock()
	_ = l.bus.Publish(ctx, "lock:"+key)
	return true, nil
}
Acquire
Method

Acquire blocks until the lock is obtained or the context is cancelled.

Parameters

key string

Returns

error
func (*InMemory) Acquire(ctx context.Context, key string, ttl time.Duration) error
{
	if err := l.ensureSubscriptions(key); err != nil {
		return err
	}
	for {
		ok, err := l.TryLock(ctx, key, ttl)
		if err != nil {
			return err
		}
		if ok {
			_ = l.bus.Publish(ctx, "lock:"+key)
			return nil
		}
		l.mu.Lock()
		st := l.locks[key]
		ch := st.notify
		l.mu.Unlock()
		select {
		case <-ch:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
Release
Method

Release frees the lock for the given key.

Parameters

key string

Returns

error
func (*InMemory) Release(ctx context.Context, key string) error
{
	l.mu.Lock()
	st, ok := l.locks[key]
	if ok {
		if st.timer != nil {
			st.timer.Stop()
		}
		close(st.notify)
		delete(l.locks, key)
		l.pending["unlock:"+key]++
	}
	l.mu.Unlock()
	if ok {
		_ = l.bus.Publish(ctx, "unlock:"+key)
	}
	return nil
}

Fields

Name Type Description
mu sync.Mutex
bus syncbus.Bus
locks map[string]*lockState
subs map[string]struct{}
pending map[string]int
F
function

NewInMemory

NewInMemory returns a new in-memory locker that uses bus to propagate events.

Parameters

Returns

v1/lock/memory.go:27-37
func NewInMemory(bus syncbus.Bus) *InMemory

{
	if bus == nil {
		bus = syncbus.NewInMemoryBus()
	}
	return &InMemory{
		bus:     bus,
		locks:   make(map[string]*lockState),
		subs:    make(map[string]struct{}),
		pending: make(map[string]int),
	}
}
S
struct
Implements: Locker

Redis

Redis implements Locker using a Redis backend.

v1/lock/redis.go:23-29
type Redis struct

Methods

TryLock
Method

TryLock attempts to obtain the lock without waiting.

Parameters

key string

Returns

bool
error
func (*Redis) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error)
{
	token := uuid.NewString()
	ok, err := r.client.SetNX(ctx, key, token, ttl).Result()
	if err != nil {
		return false, err
	}
	if ok {
		r.mu.Lock()
		r.tokens[key] = token
		r.mu.Unlock()
	}
	return ok, nil
}
Acquire
Method

Acquire blocks until the lock is obtained or the context is cancelled.

Parameters

key string

Returns

error
func (*Redis) Acquire(ctx context.Context, key string, ttl time.Duration) error
{
	for {
		ok, err := r.TryLock(ctx, key, ttl)
		if err != nil {
			return err
		}
		if ok {
			_ = r.bus.Publish(ctx, "lock:"+key)
			return nil
		}
		ch, err := r.bus.Subscribe(ctx, "unlock:"+key)
		if err != nil {
			return err
		}
		select {
		case <-ch:
		case <-ctx.Done():
			_ = r.bus.Unsubscribe(context.Background(), "unlock:"+key, ch)
			return ctx.Err()
		}
		_ = r.bus.Unsubscribe(context.Background(), "unlock:"+key, ch)
	}
}
Release
Method

Release frees the lock for the given key.

Parameters

key string

Returns

error
func (*Redis) Release(ctx context.Context, key string) error
{
	r.mu.Lock()
	token, ok := r.tokens[key]
	r.mu.Unlock()
	if !ok {
		return nil
	}
	_, err := delScript.Run(ctx, r.client, []string{key}, token).Result()
	if err == redis.Nil {
		err = nil
	}
	if err == nil {
		r.mu.Lock()
		delete(r.tokens, key)
		r.mu.Unlock()
		_ = r.bus.Publish(ctx, "unlock:"+key)
	}
	return err
}

Fields

Name Type Description
client *redis.Client
bus syncbus.Bus
mu sync.Mutex
tokens map[string]string
F
function

NewRedis

NewRedis returns a new Redis locker using the provided client.

Parameters

Returns

v1/lock/redis.go:32-37
func NewRedis(client *redis.Client, bus syncbus.Bus) *Redis

{
	if bus == nil {
		bus = syncbus.NewInMemoryBus()
	}
	return &Redis{client: client, bus: bus, tokens: make(map[string]string)}
}