watchbus API

watchbus

package

API reference for the watchbus package.

F
function

SSEHandler

SSEHandler streams WatchBus events over Server-Sent Events.
The watched key is taken from the “key” query parameter.

Parameters

bus
v1/watchbus/http.go:13-54
func SSEHandler(bus WatchBus) http.HandlerFunc

{
	return func(w http.ResponseWriter, r *http.Request) {
		key := r.URL.Query().Get("key")
		if key == "" {
			http.Error(w, "missing key", http.StatusBadRequest)
			return
		}
		ctx, cancel := context.WithCancel(r.Context())
		ch, err := bus.Watch(ctx, key)
		if err != nil {
			cancel()
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		defer func() {
			cancel()
			_ = bus.Unwatch(context.Background(), key, ch)
		}()
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "stream unsupported", http.StatusInternalServerError)
			return
		}
		for {
			select {
			case msg, ok := <-ch:
				if !ok {
					return
				}
				if _, err := fmt.Fprintf(w, "data: %s\n\n", msg); err != nil {
					return
				}
				flusher.Flush()
			case <-ctx.Done():
				return
			}
		}
	}
}
F
function

WebSocketHandler

WebSocketHandler streams WatchBus events over WebSocket.
The watched key is taken from the “key” query parameter.

Parameters

bus
v1/watchbus/http.go:60-96
func WebSocketHandler(bus WatchBus) http.HandlerFunc

{
	return func(w http.ResponseWriter, r *http.Request) {
		key := r.URL.Query().Get("key")
		if key == "" {
			http.Error(w, "missing key", http.StatusBadRequest)
			return
		}
		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			return
		}
		defer conn.Close()
		ctx, cancel := context.WithCancel(r.Context())
		ch, err := bus.Watch(ctx, key)
		if err != nil {
			cancel()
			return
		}
		defer func() {
			cancel()
			_ = bus.Unwatch(context.Background(), key, ch)
		}()
		for {
			select {
			case msg, ok := <-ch:
				if !ok {
					return
				}
				if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}
}
S
struct

prefixNode

prefixNode represents a node in the prefix trie used for prefix
subscriptions and prefix lookups for key watchers. Each node keeps the
watchers of keys that share its prefix as well as subscribers that have
explicitly subscribed to that prefix.

v1/watchbus/inmemory_watchbus.go:12-16
type prefixNode struct

Fields

Name Type Description
subs []chan []byte
watchers map[chan []byte]struct{}
children map[rune]*prefixNode
F
function

newPrefixNode

Returns

v1/watchbus/inmemory_watchbus.go:18-23
func newPrefixNode() *prefixNode

{
	return &prefixNode{
		watchers: make(map[chan []byte]struct{}),
		children: make(map[rune]*prefixNode),
	}
}
S
struct
Implements: WatchBus

InMemoryWatchBus

InMemoryWatchBus is an in-memory implementation of WatchBus.

v1/watchbus/inmemory_watchbus.go:26-30
type InMemoryWatchBus struct

Methods

Publish
Method

Publish sends data to all watchers of key.

Parameters

key string
data []byte

Returns

error
func (*InMemoryWatchBus) Publish(ctx context.Context, key string, data []byte) error
{
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	targets := make(map[chan []byte]struct{})

	b.mu.RLock()
	for _, ch := range b.subs[key] {
		targets[ch] = struct{}{}
	}
	node := b.root
	for _, r := range key {
		next, ok := node.children[r]
		if !ok {
			break
		}
		node = next
		for _, ch := range node.subs {
			targets[ch] = struct{}{}
		}
	}
	b.mu.RUnlock()

	for ch := range targets {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		safeSend(ch, data)
	}
	return nil
}
PublishPrefix
Method

PublishPrefix sends data to all watchers of keys with the given prefix.

Parameters

prefix string
data []byte

Returns

error
func (*InMemoryWatchBus) PublishPrefix(ctx context.Context, prefix string, data []byte) error
{
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	targets := make(map[chan []byte]struct{})

	b.mu.RLock()
	node := b.root
	for _, r := range prefix {
		next, ok := node.children[r]
		if !ok {
			node = nil
			break
		}
		node = next
	}
	if node != nil {
		for ch := range node.watchers {
			targets[ch] = struct{}{}
		}
		for _, ch := range node.subs {
			targets[ch] = struct{}{}
		}
	}
	b.mu.RUnlock()

	for ch := range targets {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		safeSend(ch, data)
	}
	return nil
}
Watch
Method

Watch subscribes to key and returns a channel receiving messages.

Parameters

key string

Returns

chan []byte
error
func (*InMemoryWatchBus) Watch(ctx context.Context, key string) (chan []byte, error)
{
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

	ch := make(chan []byte, 1)
	b.mu.Lock()
	b.subs[key] = append(b.subs[key], ch)
	node := b.root
	for _, r := range key {
		if node.children[r] == nil {
			node.children[r] = newPrefixNode()
		}
		node = node.children[r]
		node.watchers[ch] = struct{}{}
	}
	b.mu.Unlock()

	go func() {
		<-ctx.Done()
		_ = b.Unwatch(context.Background(), key, ch)
	}()
	return ch, nil
}

SubscribePrefix subscribes to all keys with the given prefix.

Parameters

prefix string

Returns

chan []byte
error
func (*InMemoryWatchBus) SubscribePrefix(ctx context.Context, prefix string) (chan []byte, error)
{
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

	ch := make(chan []byte, 1)
	b.mu.Lock()
	node := b.root
	for _, r := range prefix {
		if node.children[r] == nil {
			node.children[r] = newPrefixNode()
		}
		node = node.children[r]
	}
	node.subs = append(node.subs, ch)
	b.mu.Unlock()

	go func() {
		<-ctx.Done()
		_ = b.Unwatch(context.Background(), prefix, ch)
	}()
	return ch, nil
}
Unwatch
Method

Unwatch removes the channel from key watchers.

Parameters

key string
ch chan []byte

Returns

error
func (*InMemoryWatchBus) Unwatch(ctx context.Context, key string, ch chan []byte) error
{
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	b.mu.Lock()
	subs := b.subs[key]
	removed := false
	for i, c := range subs {
		if c == ch {
			subs[i] = subs[len(subs)-1]
			subs = subs[:len(subs)-1]
			b.subs[key] = subs
			removed = true
			break
		}
	}
	if removed {
		if len(subs) == 0 {
			delete(b.subs, key)
		}
		node := b.root
		for _, r := range key {
			next, ok := node.children[r]
			if !ok {
				break
			}
			delete(next.watchers, ch)
			node = next
		}
		b.mu.Unlock()
		return nil
	}

	node := b.root
	for _, r := range key {
		next, ok := node.children[r]
		if !ok {
			node = nil
			break
		}
		node = next
	}
	if node != nil {
		subs := node.subs
		for i, c := range subs {
			if c == ch {
				subs[i] = subs[len(subs)-1]
				subs = subs[:len(subs)-1]
				node.subs = subs
				break
			}
		}
	}
	b.mu.Unlock()
	return nil
}

Fields

Name Type Description
mu sync.RWMutex
subs map[string][]chan []byte
root *prefixNode
F
function

NewInMemory

NewInMemory creates a new InMemoryWatchBus.

Returns

v1/watchbus/inmemory_watchbus.go:33-38
func NewInMemory() *InMemoryWatchBus

{
	return &InMemoryWatchBus{
		subs: make(map[string][]chan []byte),
		root: newPrefixNode(),
	}
}
F
function

safeSend

Parameters

ch
chan []byte
data
[]byte
v1/watchbus/inmemory_watchbus.go:119-129
func safeSend(ch chan []byte, data []byte)

{
	defer func() {
		if r := recover(); r != nil {
			// channel has been closed concurrently; drop the message
		}
	}()
	select {
	case ch <- data:
	default:
	}
}
S
struct
Implements: WatchBus

RedisWatchBus

RedisWatchBus uses Redis Streams to implement WatchBus.

v1/watchbus/redis_watchbus.go:12-17
type RedisWatchBus struct

Methods

Publish
Method

Publish adds a new message to the Redis stream identified by key.

Parameters

key string
data []byte

Returns

error
func (*RedisWatchBus) Publish(ctx context.Context, key string, data []byte) error
{
	if err := b.client.XAdd(ctx, &redis.XAddArgs{Stream: key, Values: map[string]any{"data": data}}).Err(); err != nil {
		return err
	}
	return b.client.Publish(ctx, key, data).Err()
}
PublishPrefix
Method

PublishPrefix publishes the message to all keys having the given prefix.

Parameters

prefix string
data []byte

Returns

error
func (*RedisWatchBus) PublishPrefix(ctx context.Context, prefix string, data []byte) error
{
	var cursor uint64
	for {
		keys, next, err := b.client.SScan(ctx, "watchbus:index", cursor, prefix+"*", 100).Result()
		if err != nil {
			return err
		}
		for _, k := range keys {
			if err := b.Publish(ctx, k, data); err != nil {
				return err
			}
		}
		cursor = next
		if cursor == 0 {
			break
		}
	}
	return b.client.Publish(ctx, prefix, data).Err()
}
Watch
Method

Watch reads messages from the Redis stream.

Parameters

key string

Returns

chan []byte
error
func (*RedisWatchBus) Watch(ctx context.Context, key string) (chan []byte, error)
{
	ctx, cancel := context.WithCancel(ctx)
	ch := make(chan []byte, 1)

	b.mu.Lock()
	m := b.cancels[key]
	if m == nil {
		m = make(map[chan []byte]context.CancelFunc)
		b.cancels[key] = m
	}
	m[ch] = cancel
	if len(m) == 1 {
		_ = b.client.SAdd(context.Background(), "watchbus:index", key).Err()
	}
	b.mu.Unlock()

	go func() {
		defer close(ch)
		lastID := "$"
		for {
			res, err := b.client.XRead(ctx, &redis.XReadArgs{
				Streams: []string{key, lastID},
				Block:   0,
				Count:   1,
			}).Result()
			if err != nil {
				if ctx.Err() != nil {
					return
				}
				time.Sleep(time.Second)
				continue
			}
			for _, s := range res {
				for _, msg := range s.Messages {
					lastID = msg.ID
					if v, ok := msg.Values["data"].(string); ok {
						select {
						case ch <- []byte(v):
						case <-ctx.Done():
							return
						}
					}
				}
			}
		}
	}()

	return ch, nil
}

SubscribePrefix subscribes to all channels matching the given prefix.

Parameters

prefix string

Returns

chan []byte
error
func (*RedisWatchBus) SubscribePrefix(ctx context.Context, prefix string) (chan []byte, error)
{
	ctx, cancel := context.WithCancel(ctx)
	ch := make(chan []byte, 1)

	ps := b.client.PSubscribe(ctx, prefix+"*")
	b.mu.Lock()
	m := b.prefixCancels[prefix]
	if m == nil {
		m = make(map[chan []byte]context.CancelFunc)
		b.prefixCancels[prefix] = m
	}
	m[ch] = func() {
		cancel()
		_ = ps.Close()
	}
	b.mu.Unlock()

	go func() {
		defer close(ch)
		for {
			msg, err := ps.ReceiveMessage(ctx)
			if err != nil {
				return
			}
			select {
			case ch <- []byte(msg.Payload):
			case <-ctx.Done():
				return
			}
		}
	}()
	return ch, nil
}
Unwatch
Method

Unwatch stops watching the given key and channel.

Parameters

key string
ch chan []byte

Returns

error
func (*RedisWatchBus) Unwatch(ctx context.Context, key string, ch chan []byte) error
{
	b.mu.Lock()
	if m, ok := b.cancels[key]; ok {
		if cancel, ok := m[ch]; ok {
			delete(m, ch)
			if len(m) == 0 {
				delete(b.cancels, key)
				_ = b.client.SRem(context.Background(), "watchbus:index", key).Err()
			}
			b.mu.Unlock()
			cancel()
			return nil
		}
	}
	if m, ok := b.prefixCancels[key]; ok {
		if cancel, ok := m[ch]; ok {
			delete(m, ch)
			if len(m) == 0 {
				delete(b.prefixCancels, key)
			}
			b.mu.Unlock()
			cancel()
			return nil
		}
	}
	b.mu.Unlock()
	return nil
}

Fields

Name Type Description
client *redis.Client
mu sync.Mutex
cancels map[string]map[chan []byte]context.CancelFunc
prefixCancels map[string]map[chan []byte]context.CancelFunc
F
function

NewRedisWatchBus

NewRedisWatchBus creates a new RedisWatchBus using the provided client.

Parameters

client

Returns

v1/watchbus/redis_watchbus.go:20-26
func NewRedisWatchBus(client *redis.Client) *RedisWatchBus

{
	return &RedisWatchBus{
		client:        client,
		cancels:       make(map[string]map[chan []byte]context.CancelFunc),
		prefixCancels: make(map[string]map[chan []byte]context.CancelFunc),
	}
}
F
function

TestRedisWatchBus

Parameters

v1/watchbus/redis_watchbus_test.go:12-95
func TestRedisWatchBus(t *testing.T)

{
	mr, err := miniredis.Run()
	if err != nil {
		t.Fatalf("miniredis: %v", err)
	}
	defer mr.Close()

	client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
	bus := NewRedisWatchBus(client)
	ctx := context.Background()

	chKey, err := bus.Watch(ctx, "foo1")
	if err != nil {
		t.Fatalf("watch: %v", err)
	}
	chPrefix, err := bus.SubscribePrefix(ctx, "foo")
	if err != nil {
		t.Fatalf("sub prefix: %v", err)
	}

	if err := bus.Publish(ctx, "foo1", []byte("a")); err != nil {
		t.Fatalf("publish: %v", err)
	}
	select {
	case msg := <-chKey:
		if string(msg) != "a" {
			t.Fatalf("unexpected %s", msg)
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for key message")
	}
	select {
	case msg := <-chPrefix:
		if string(msg) != "a" {
			t.Fatalf("unexpected %s", msg)
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for prefix message")
	}

	member, err := client.SIsMember(ctx, "watchbus:index", "foo1").Result()
	if err != nil {
		t.Fatalf("sismember: %v", err)
	}
	if !member {
		t.Fatalf("expected key in index")
	}

	if err := bus.PublishPrefix(ctx, "foo", []byte("b")); err != nil {
		t.Fatalf("publish prefix: %v", err)
	}
	select {
	case msg := <-chKey:
		if string(msg) != "b" {
			t.Fatalf("unexpected %s", msg)
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for key prefix message")
	}
	// prefix subscriber may receive multiple messages; consume at least one
	select {
	case msg := <-chPrefix:
		if string(msg) != "b" {
			t.Fatalf("unexpected %s", msg)
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for prefix message from publish prefix")
	}

	if err := bus.Unwatch(ctx, "foo1", chKey); err != nil {
		t.Fatalf("unwatch: %v", err)
	}
	if err := bus.Unwatch(ctx, "foo", chPrefix); err != nil {
		t.Fatalf("unwatch prefix: %v", err)
	}

	member, err = client.SIsMember(ctx, "watchbus:index", "foo1").Result()
	if err != nil {
		t.Fatalf("sismember: %v", err)
	}
	if member {
		t.Fatalf("expected key removed from index")
	}
}
I
interface

WatchBus

WatchBus provides a simple message bus for streaming events.
Clients can publish messages to a key and watch for updates.

v1/watchbus/watchbus.go:7-19
type WatchBus interface

Methods

Publish
Method

Parameters

key string
data []byte

Returns

error
func Publish(...)
PublishPrefix
Method

Parameters

prefix string
data []byte

Returns

error
func PublishPrefix(...)
Watch
Method

Parameters

key string

Returns

chan []byte
error
func Watch(...)

Parameters

prefix string

Returns

chan []byte
error
func SubscribePrefix(...)
Unwatch
Method

Parameters

key string
ch chan []byte

Returns

error
func Unwatch(...)
F
function

TestInMemoryWatchBus

Parameters

v1/watchbus/watchbus_test.go:10-31
func TestInMemoryWatchBus(t *testing.T)

{
	bus := NewInMemory()
	ctx := context.Background()
	ch, err := bus.Watch(ctx, "foo")
	if err != nil {
		t.Fatalf("watch: %v", err)
	}
	if err := bus.Publish(ctx, "foo", []byte("hello")); err != nil {
		t.Fatalf("publish: %v", err)
	}
	select {
	case msg := <-ch:
		if string(msg) != "hello" {
			t.Fatalf("unexpected %s", msg)
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for message")
	}
	if err := bus.Unwatch(ctx, "foo", ch); err != nil {
		t.Fatalf("unwatch: %v", err)
	}
}
F
function

TestInMemoryWatchBusPrefix

Parameters

v1/watchbus/watchbus_test.go:33-85
func TestInMemoryWatchBusPrefix(t *testing.T)

{
	bus := NewInMemory()
	ctx := context.Background()
	chKey, err := bus.Watch(ctx, "foo1")
	if err != nil {
		t.Fatalf("watch: %v", err)
	}
	chPrefix, err := bus.SubscribePrefix(ctx, "foo")
	if err != nil {
		t.Fatalf("sub prefix: %v", err)
	}
	if err := bus.Publish(ctx, "foo1", []byte("a")); err != nil {
		t.Fatalf("publish: %v", err)
	}
	select {
	case msg := <-chKey:
		if string(msg) != "a" {
			t.Fatalf("unexpected %s", msg)
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for key msg")
	}
	select {
	case msg := <-chPrefix:
		if string(msg) != "a" {
			t.Fatalf("unexpected %s", msg)
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for prefix msg")
	}

	if err := bus.PublishPrefix(ctx, "foo", []byte("b")); err != nil {
		t.Fatalf("publish prefix: %v", err)
	}
	select {
	case msg := <-chKey:
		if string(msg) != "b" {
			t.Fatalf("unexpected %s", msg)
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for key msg")
	}
	select {
	case msg := <-chPrefix:
		if string(msg) != "b" {
			t.Fatalf("unexpected %s", msg)
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for prefix msg")
	}
	_ = bus.Unwatch(ctx, "foo1", chKey)
	_ = bus.Unwatch(ctx, "foo", chPrefix)
}
F
function

TestInMemoryWatchBusContextCanceled

Parameters

v1/watchbus/watchbus_test.go:87-101
func TestInMemoryWatchBusContextCanceled(t *testing.T)

{
	bus := NewInMemory()
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	if _, err := bus.Watch(ctx, "k"); err == nil {
		t.Fatal("expected watch error on canceled context")
	}
	if err := bus.Publish(ctx, "k", nil); err == nil {
		t.Fatal("expected publish error on canceled context")
	}
	ch := make(chan []byte)
	if err := bus.Unwatch(ctx, "k", ch); err == nil {
		t.Fatal("expected unwatch error on canceled context")
	}
}
F
function

TestInMemoryWatchBusUnwatchUnknown

Parameters

v1/watchbus/watchbus_test.go:103-111
func TestInMemoryWatchBusUnwatchUnknown(t *testing.T)

{
	bus := NewInMemory()
	ctx := context.Background()
	ch := make(chan []byte)
	// Should not panic or error when channel not registered
	if err := bus.Unwatch(ctx, "missing", ch); err != nil {
		t.Fatalf("unwatch unexpected error: %v", err)
	}
}
F
function

TestInMemoryWatchBusConcurrentUnwatch

Parameters

v1/watchbus/watchbus_test.go:113-140
func TestInMemoryWatchBusConcurrentUnwatch(t *testing.T)

{
	bus := NewInMemory()
	ctx := context.Background()
	ch, err := bus.Watch(ctx, "foo")
	if err != nil {
		t.Fatalf("watch: %v", err)
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			if err := bus.Publish(ctx, "foo", []byte("data")); err != nil {
				t.Errorf("publish: %v", err)
				return
			}
		}
	}()

	time.Sleep(10 * time.Millisecond)

	if err := bus.Unwatch(ctx, "foo", ch); err != nil {
		t.Fatalf("unwatch: %v", err)
	}

	wg.Wait()
}
F
function

TestSSEHandlerStream

Parameters

v1/watchbus/http_test.go:17-64
func TestSSEHandlerStream(t *testing.T)

{
	bus := NewInMemory()
	srv := httptest.NewServer(SSEHandler(bus))
	defer srv.Close()

	respCh := make(chan *http.Response, 1)
	go func() {
		resp, err := http.Get(srv.URL + "?key=foo")
		if err != nil {
			t.Errorf("get: %v", err)
			return
		}
		respCh <- resp
	}()

	// wait for watcher registration
	for i := 0; i < 100; i++ {
		bus.mu.Lock()
		if len(bus.subs["foo"]) == 1 {
			bus.mu.Unlock()
			break
		}
		bus.mu.Unlock()
		time.Sleep(10 * time.Millisecond)
	}

	if err := bus.Publish(context.Background(), "foo", []byte("hello")); err != nil {
		t.Fatalf("publish: %v", err)
	}

	var resp *http.Response
	select {
	case resp = <-respCh:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for response")
	}
	defer resp.Body.Close()

	reader := bufio.NewReader(resp.Body)
	line, err := reader.ReadString('\n')
	if err != nil {
		t.Fatalf("read: %v", err)
	}
	line = strings.TrimSpace(line)
	if line != "data: hello" {
		t.Fatalf("unexpected line %q", line)
	}
}
F
function

TestSSEHandlerMissingKey

Parameters

v1/watchbus/http_test.go:66-80
func TestSSEHandlerMissingKey(t *testing.T)

{
	bus := NewInMemory()
	srv := httptest.NewServer(SSEHandler(bus))
	defer srv.Close()

	resp, err := http.Get(srv.URL)
	if err != nil {
		t.Fatalf("get: %v", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusBadRequest {
		t.Fatalf("expected 400, got %d", resp.StatusCode)
	}
}
F
function

TestSSEHandlerContextCancel

Parameters

v1/watchbus/http_test.go:82-124
func TestSSEHandlerContextCancel(t *testing.T)

{
	bus := NewInMemory()
	srv := httptest.NewServer(SSEHandler(bus))
	defer srv.Close()

	ctx, cancel := context.WithCancel(context.Background())
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, srv.URL+"?key=foo", nil)
	if err != nil {
		t.Fatalf("request: %v", err)
	}

	respCh := make(chan struct{})
	go func() {
		_, _ = http.DefaultClient.Do(req)
		close(respCh)
	}()

	// wait for watcher registration
	for i := 0; i < 100; i++ {
		bus.mu.Lock()
		if len(bus.subs["foo"]) == 1 {
			bus.mu.Unlock()
			break
		}
		bus.mu.Unlock()
		time.Sleep(10 * time.Millisecond)
	}

	cancel()
	select {
	case <-respCh:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for request to end")
	}

	time.Sleep(50 * time.Millisecond)
	bus.mu.Lock()
	if len(bus.subs["foo"]) != 0 {
		bus.mu.Unlock()
		t.Fatalf("expected watcher removed")
	}
	bus.mu.Unlock()
}
S
struct

failingWriter

v1/watchbus/http_test.go:126-128
type failingWriter struct

Methods

Write
Method

Parameters

[]byte

Returns

int
error
func (*failingWriter) Write([]byte) (int, error)
{ return 0, errors.New("write failed") }
WriteHeader
Method

Parameters

int
func (*failingWriter) WriteHeader(int)
{}
Flush
Method
func (*failingWriter) Flush()
{}

Fields

Name Type Description
header http.Header
F
function

newFailingWriter

Returns

v1/watchbus/http_test.go:130-132
func newFailingWriter() *failingWriter

{
	return &failingWriter{header: make(http.Header)}
}
F
function

TestSSEHandlerWriteErrorUnwatches

Parameters

v1/watchbus/http_test.go:139-179
func TestSSEHandlerWriteErrorUnwatches(t *testing.T)

{
	bus := NewInMemory()
	handler := SSEHandler(bus)
	req := httptest.NewRequest(http.MethodGet, "/?key=foo", nil)
	resp := newFailingWriter()

	done := make(chan struct{})
	go func() {
		handler(resp, req)
		close(done)
	}()

	for i := 0; i < 100; i++ {
		bus.mu.Lock()
		if len(bus.subs["foo"]) == 1 {
			bus.mu.Unlock()
			break
		}
		bus.mu.Unlock()
		time.Sleep(10 * time.Millisecond)
	}

	if err := bus.Publish(context.Background(), "foo", []byte("hello")); err != nil {
		t.Fatalf("publish: %v", err)
	}

	select {
	case <-done:
	case <-time.After(time.Second):
		t.Fatal("handler did not exit on write error")
	}

	time.Sleep(50 * time.Millisecond)

	bus.mu.Lock()
	if len(bus.subs["foo"]) != 0 {
		bus.mu.Unlock()
		t.Fatalf("expected watcher removed after write error")
	}
	bus.mu.Unlock()
}
F
function

TestWebSocketHandlerStream

Parameters

v1/watchbus/http_test.go:181-205
func TestWebSocketHandlerStream(t *testing.T)

{
	bus := NewInMemory()
	srv := httptest.NewServer(WebSocketHandler(bus))
	defer srv.Close()

	u := "ws" + strings.TrimPrefix(srv.URL, "http") + "?key=foo"
	conn, _, err := websocket.DefaultDialer.Dial(u, nil)
	if err != nil {
		t.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	if err := bus.Publish(context.Background(), "foo", []byte("hello")); err != nil {
		t.Fatalf("publish: %v", err)
	}

	_ = conn.SetReadDeadline(time.Now().Add(time.Second))
	_, msg, err := conn.ReadMessage()
	if err != nil {
		t.Fatalf("read: %v", err)
	}
	if string(msg) != "hello" {
		t.Fatalf("unexpected %s", msg)
	}
}
F
function

TestWebSocketHandlerMissingKey

Parameters

v1/watchbus/http_test.go:207-220
func TestWebSocketHandlerMissingKey(t *testing.T)

{
	bus := NewInMemory()
	srv := httptest.NewServer(WebSocketHandler(bus))
	defer srv.Close()

	u := "ws" + strings.TrimPrefix(srv.URL, "http")
	_, resp, err := websocket.DefaultDialer.Dial(u, nil)
	if err == nil {
		t.Fatal("expected error")
	}
	if resp == nil || resp.StatusCode != http.StatusBadRequest {
		t.Fatalf("expected 400, got %v", resp)
	}
}
F
function

TestWebSocketHandlerContextCancel

Parameters

v1/watchbus/http_test.go:222-254
func TestWebSocketHandlerContextCancel(t *testing.T)

{
	bus := NewInMemory()
	ctx, cancel := context.WithCancel(context.Background())
	srv := httptest.NewUnstartedServer(WebSocketHandler(bus))
	srv.Config.BaseContext = func(net.Listener) context.Context { return ctx }
	srv.Start()
	defer srv.Close()

	u := "ws" + strings.TrimPrefix(srv.URL, "http") + "?key=foo"
	conn, _, err := websocket.DefaultDialer.Dial(u, nil)
	if err != nil {
		t.Fatalf("dial: %v", err)
	}

	time.Sleep(50 * time.Millisecond)
	bus.mu.Lock()
	if len(bus.subs["foo"]) != 1 {
		bus.mu.Unlock()
		t.Fatalf("expected watcher registered")
	}
	bus.mu.Unlock()

	cancel()
	time.Sleep(50 * time.Millisecond)

	bus.mu.Lock()
	if len(bus.subs["foo"]) != 0 {
		bus.mu.Unlock()
		t.Fatalf("expected watcher removed")
	}
	bus.mu.Unlock()
	conn.Close()
}
F
function

BenchmarkInMemoryPublish

BenchmarkInMemoryPublish measures publish throughput with many concurrent
publishers and watchers.

Parameters

v1/watchbus/inmemory_watchbus_bench_test.go:12-34
func BenchmarkInMemoryPublish(b *testing.B)

{
	bus := NewInMemory()
	ctx := context.Background()

	const watchers = 1000
	for i := 0; i < watchers; i++ {
		key := fmt.Sprintf("key-%d", i)
		ch, _ := bus.Watch(ctx, key)
		go func(c chan []byte) {
			for range c {
			}
		}(ch)
	}

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		r := rand.New(rand.NewSource(0))
		for pb.Next() {
			key := fmt.Sprintf("key-%d", r.Intn(watchers))
			_ = bus.Publish(ctx, key, []byte("data"))
		}
	})
}