lock
packageAPI reference for the lock
package.
Imports
(8)newRedisLocker
Parameters
Returns
func newRedisLocker(t *testing.T) (*Redis, syncbus.Bus, context.Context, func())
{
t.Helper()
mr, err := miniredis.Run()
if err != nil {
t.Fatalf("miniredis run: %v", err)
}
client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
bus := syncbus.NewInMemoryBus()
locker := NewRedis(client, bus)
ctx := context.Background()
cleanup := func() {
_ = client.Close()
mr.Close()
}
return locker, bus, ctx, cleanup
}
TestRedisTryLockAcquireReleaseAndBus
Parameters
func TestRedisTryLockAcquireReleaseAndBus(t *testing.T)
{
l, bus, ctx, cleanup := newRedisLocker(t)
defer cleanup()
lockCh, err := bus.Subscribe(ctx, "lock:k")
if err != nil {
t.Fatalf("subscribe lock: %v", err)
}
unlockCh, err := bus.Subscribe(ctx, "unlock:k")
if err != nil {
t.Fatalf("subscribe unlock: %v", err)
}
if err := l.Acquire(ctx, "k", time.Second); err != nil {
t.Fatalf("acquire: %v", err)
}
select {
case <-lockCh:
case <-time.After(time.Second):
t.Fatal("timeout waiting for lock publish")
}
if err := l.Release(ctx, "k"); err != nil {
t.Fatalf("release: %v", err)
}
select {
case <-unlockCh:
case <-time.After(time.Second):
t.Fatal("timeout waiting for unlock publish")
}
l.mu.Lock()
if _, ok := l.tokens["k"]; ok {
t.Fatal("token not cleaned up on release")
}
l.mu.Unlock()
ok, err := l.TryLock(ctx, "k", time.Second)
if err != nil || !ok {
t.Fatalf("trylock: %v ok %v", err, ok)
}
if ok, err := l.TryLock(ctx, "k", time.Second); err != nil || ok {
t.Fatalf("expected lock held, ok %v err %v", ok, err)
}
if err := l.Release(ctx, "k"); err != nil {
t.Fatalf("release: %v", err)
}
}
TestRedisAcquireTimeout
Parameters
func TestRedisAcquireTimeout(t *testing.T)
{
l1, bus, ctx, cleanup := newRedisLocker(t)
defer cleanup()
l2 := NewRedis(l1.client, bus)
if ok, err := l1.TryLock(ctx, "k", 0); err != nil || !ok {
t.Fatalf("initial trylock: %v ok %v", err, ok)
}
cctx, cancel := context.WithTimeout(ctx, 5*time.Millisecond)
defer cancel()
start := time.Now()
if err := l2.Acquire(cctx, "k", 0); err == nil {
t.Fatal("expected timeout error")
}
if time.Since(start) > 20*time.Millisecond {
t.Fatal("acquire did not respect context timeout")
}
}
TestInMemoryTryLockAcquireRelease
Parameters
func TestInMemoryTryLockAcquireRelease(t *testing.T)
{
l := NewInMemory(nil)
ctx := context.Background()
ok, err := l.TryLock(ctx, "k", time.Second)
if err != nil || !ok {
t.Fatalf("trylock: %v ok %v", err, ok)
}
if ok, err := l.TryLock(ctx, "k", time.Second); err != nil || ok {
t.Fatalf("expected lock held, got ok %v err %v", ok, err)
}
if err := l.Release(ctx, "k"); err != nil {
t.Fatalf("release: %v", err)
}
if ok, err := l.TryLock(ctx, "k", time.Second); err != nil || !ok {
t.Fatalf("expected lock re-acquired, ok %v err %v", ok, err)
}
}
TestInMemoryAcquireTimeout
Parameters
func TestInMemoryAcquireTimeout(t *testing.T)
{
l := NewInMemory(nil)
ctx := context.Background()
_, _ = l.TryLock(ctx, "k", 0)
cctx, cancel := context.WithTimeout(ctx, 5*time.Millisecond)
defer cancel()
start := time.Now()
err := l.Acquire(cctx, "k", 0)
if err == nil {
t.Fatal("expected timeout error")
}
if time.Since(start) > 20*time.Millisecond {
t.Fatal("acquire did not respect context timeout")
}
}
TestInMemoryLockTTLExpires
Parameters
func TestInMemoryLockTTLExpires(t *testing.T)
{
l := NewInMemory(nil)
ctx := context.Background()
if ok, err := l.TryLock(ctx, "k", 10*time.Millisecond); err != nil || !ok {
t.Fatalf("trylock: %v ok %v", err, ok)
}
time.Sleep(20 * time.Millisecond)
if ok, err := l.TryLock(ctx, "k", 0); err != nil || !ok {
t.Fatalf("lock should expire, ok %v err %v", ok, err)
}
}
lockState
type lockState struct
Fields
| Name | Type | Description |
|---|---|---|
| timer | *time.Timer | |
| notify | chan struct{} |
InMemory
InMemory implements Locker using local memory. Lock and unlock events are
propagated through a syncbus Bus allowing multiple nodes to coordinate.
type InMemory struct
Methods
Parameters
Returns
func (*InMemory) ensureSubscriptions(key string) error
{
l.mu.Lock()
if _, ok := l.subs[key]; ok {
l.mu.Unlock()
return nil
}
l.subs[key] = struct{}{}
l.mu.Unlock()
cleanup := func() {
l.mu.Lock()
delete(l.subs, key)
l.mu.Unlock()
}
lockCh, err := l.bus.Subscribe(context.Background(), "lock:"+key)
if err != nil {
cleanup()
return err
}
unlockCh, err := l.bus.Subscribe(context.Background(), "unlock:"+key)
if err != nil {
_ = l.bus.Unsubscribe(context.Background(), "lock:"+key, lockCh)
cleanup()
return err
}
go func() {
for range lockCh {
l.mu.Lock()
if l.pending["lock:"+key] > 0 {
l.pending["lock:"+key]--
l.mu.Unlock()
continue
}
if _, ok := l.locks[key]; !ok {
l.locks[key] = &lockState{notify: make(chan struct{})}
}
l.mu.Unlock()
}
}()
go func() {
for range unlockCh {
l.mu.Lock()
if l.pending["unlock:"+key] > 0 {
l.pending["unlock:"+key]--
l.mu.Unlock()
continue
}
if st, ok := l.locks[key]; ok {
if st.timer != nil {
st.timer.Stop()
}
close(st.notify)
delete(l.locks, key)
}
l.mu.Unlock()
}
}()
return nil
}
TryLock attempts to obtain the lock without waiting. It returns true on success.
Parameters
Returns
func (*InMemory) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error)
{
if err := l.ensureSubscriptions(key); err != nil {
return false, err
}
l.mu.Lock()
if _, ok := l.locks[key]; ok {
l.mu.Unlock()
return false, nil
}
st := &lockState{notify: make(chan struct{})}
if ttl > 0 {
st.timer = time.AfterFunc(ttl, func() {
_ = l.Release(context.Background(), key)
})
}
l.locks[key] = st
l.pending["lock:"+key]++
l.mu.Unlock()
_ = l.bus.Publish(ctx, "lock:"+key)
return true, nil
}
Acquire blocks until the lock is obtained or the context is cancelled.
Parameters
Returns
func (*InMemory) Acquire(ctx context.Context, key string, ttl time.Duration) error
{
if err := l.ensureSubscriptions(key); err != nil {
return err
}
for {
ok, err := l.TryLock(ctx, key, ttl)
if err != nil {
return err
}
if ok {
_ = l.bus.Publish(ctx, "lock:"+key)
return nil
}
l.mu.Lock()
st := l.locks[key]
ch := st.notify
l.mu.Unlock()
select {
case <-ch:
case <-ctx.Done():
return ctx.Err()
}
}
}
Release frees the lock for the given key.
Parameters
Returns
func (*InMemory) Release(ctx context.Context, key string) error
{
l.mu.Lock()
st, ok := l.locks[key]
if ok {
if st.timer != nil {
st.timer.Stop()
}
close(st.notify)
delete(l.locks, key)
l.pending["unlock:"+key]++
}
l.mu.Unlock()
if ok {
_ = l.bus.Publish(ctx, "unlock:"+key)
}
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| mu | sync.Mutex | |
| bus | syncbus.Bus | |
| locks | map[string]*lockState | |
| subs | map[string]struct{} | |
| pending | map[string]int |
NewInMemory
NewInMemory returns a new in-memory locker that uses bus to propagate events.
Parameters
Returns
func NewInMemory(bus syncbus.Bus) *InMemory
{
if bus == nil {
bus = syncbus.NewInMemoryBus()
}
return &InMemory{
bus: bus,
locks: make(map[string]*lockState),
subs: make(map[string]struct{}),
pending: make(map[string]int),
}
}
Redis
Redis implements Locker using a Redis backend.
type Redis struct
Methods
TryLock attempts to obtain the lock without waiting.
Parameters
Returns
func (*Redis) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error)
{
token := uuid.NewString()
ok, err := r.client.SetNX(ctx, key, token, ttl).Result()
if err != nil {
return false, err
}
if ok {
r.mu.Lock()
r.tokens[key] = token
r.mu.Unlock()
}
return ok, nil
}
Acquire blocks until the lock is obtained or the context is cancelled.
Parameters
Returns
func (*Redis) Acquire(ctx context.Context, key string, ttl time.Duration) error
{
for {
ok, err := r.TryLock(ctx, key, ttl)
if err != nil {
return err
}
if ok {
_ = r.bus.Publish(ctx, "lock:"+key)
return nil
}
ch, err := r.bus.Subscribe(ctx, "unlock:"+key)
if err != nil {
return err
}
select {
case <-ch:
case <-ctx.Done():
_ = r.bus.Unsubscribe(context.Background(), "unlock:"+key, ch)
return ctx.Err()
}
_ = r.bus.Unsubscribe(context.Background(), "unlock:"+key, ch)
}
}
Release frees the lock for the given key.
Parameters
Returns
func (*Redis) Release(ctx context.Context, key string) error
{
r.mu.Lock()
token, ok := r.tokens[key]
r.mu.Unlock()
if !ok {
return nil
}
_, err := delScript.Run(ctx, r.client, []string{key}, token).Result()
if err == redis.Nil {
err = nil
}
if err == nil {
r.mu.Lock()
delete(r.tokens, key)
r.mu.Unlock()
_ = r.bus.Publish(ctx, "unlock:"+key)
}
return err
}
Fields
| Name | Type | Description |
|---|---|---|
| client | *redis.Client | |
| bus | syncbus.Bus | |
| mu | sync.Mutex | |
| tokens | map[string]string |
NewRedis
NewRedis returns a new Redis locker using the provided client.
Parameters
Returns
func NewRedis(client *redis.Client, bus syncbus.Bus) *Redis
{
if bus == nil {
bus = syncbus.NewInMemoryBus()
}
return &Redis{client: client, bus: bus, tokens: make(map[string]string)}
}