watchbus
package
API reference for the watchbus package.
Imports
(15)
SSEHandler
SSEHandler streams WatchBus events over Server-Sent Events.
The watched key is taken from the “key” query parameter.
Parameters
Returns
func SSEHandler(bus WatchBus) http.HandlerFunc
{
return func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
if key == "" {
http.Error(w, "missing key", http.StatusBadRequest)
return
}
ctx, cancel := context.WithCancel(r.Context())
ch, err := bus.Watch(ctx, key)
if err != nil {
cancel()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer func() {
cancel()
_ = bus.Unwatch(context.Background(), key, ch)
}()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "stream unsupported", http.StatusInternalServerError)
return
}
for {
select {
case msg, ok := <-ch:
if !ok {
return
}
if _, err := fmt.Fprintf(w, "data: %s\n\n", msg); err != nil {
return
}
flusher.Flush()
case <-ctx.Done():
return
}
}
}
}
Uses
WebSocketHandler
WebSocketHandler streams WatchBus events over WebSocket.
The watched key is taken from the “key” query parameter.
Parameters
Returns
func WebSocketHandler(bus WatchBus) http.HandlerFunc
{
return func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
if key == "" {
http.Error(w, "missing key", http.StatusBadRequest)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
ctx, cancel := context.WithCancel(r.Context())
ch, err := bus.Watch(ctx, key)
if err != nil {
cancel()
return
}
defer func() {
cancel()
_ = bus.Unwatch(context.Background(), key, ch)
}()
for {
select {
case msg, ok := <-ch:
if !ok {
return
}
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
return
}
case <-ctx.Done():
return
}
}
}
}
Uses
prefixNode
prefixNode represents a node in the prefix trie used for prefix
subscriptions and prefix lookups for key watchers. Each node keeps the
watchers of keys that share its prefix as well as subscribers that have
explicitly subscribed to that prefix.
type prefixNode struct
Fields
| Name | Type | Description |
|---|---|---|
| subs | []chan []byte | |
| watchers | map[chan []byte]struct{} | |
| children | map[rune]*prefixNode | |
newPrefixNode
Returns
func newPrefixNode() *prefixNode
{
return &prefixNode{
watchers: make(map[chan []byte]struct{}),
children: make(map[rune]*prefixNode),
}
}
InMemoryWatchBus
InMemoryWatchBus is an in-memory implementation of WatchBus.
type InMemoryWatchBus struct
Methods
Publish sends data to all watchers of key.
Parameters
Returns
func (*InMemoryWatchBus) Publish(ctx context.Context, key string, data []byte) error
{
select {
case <-ctx.Done():
return ctx.Err()
default:
}
targets := make(map[chan []byte]struct{})
b.mu.RLock()
for _, ch := range b.subs[key] {
targets[ch] = struct{}{}
}
node := b.root
for _, r := range key {
next, ok := node.children[r]
if !ok {
break
}
node = next
for _, ch := range node.subs {
targets[ch] = struct{}{}
}
}
b.mu.RUnlock()
for ch := range targets {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
safeSend(ch, data)
}
return nil
}
PublishPrefix sends data to all watchers of keys with the given prefix and to subscribers of that exact prefix.
Parameters
Returns
func (*InMemoryWatchBus) PublishPrefix(ctx context.Context, prefix string, data []byte) error
{
select {
case <-ctx.Done():
return ctx.Err()
default:
}
targets := make(map[chan []byte]struct{})
b.mu.RLock()
node := b.root
for _, r := range prefix {
next, ok := node.children[r]
if !ok {
node = nil
break
}
node = next
}
if node != nil {
for ch := range node.watchers {
targets[ch] = struct{}{}
}
for _, ch := range node.subs {
targets[ch] = struct{}{}
}
}
b.mu.RUnlock()
for ch := range targets {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
safeSend(ch, data)
}
return nil
}
Watch subscribes to key and returns a channel receiving messages.
Parameters
Returns
func (*InMemoryWatchBus) Watch(ctx context.Context, key string) (chan []byte, error)
{
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
ch := make(chan []byte, 1)
b.mu.Lock()
b.subs[key] = append(b.subs[key], ch)
node := b.root
for _, r := range key {
if node.children[r] == nil {
node.children[r] = newPrefixNode()
}
node = node.children[r]
node.watchers[ch] = struct{}{}
}
b.mu.Unlock()
go func() {
<-ctx.Done()
_ = b.Unwatch(context.Background(), key, ch)
}()
return ch, nil
}
SubscribePrefix subscribes to all keys with the given prefix.
Parameters
Returns
func (*InMemoryWatchBus) SubscribePrefix(ctx context.Context, prefix string) (chan []byte, error)
{
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
ch := make(chan []byte, 1)
b.mu.Lock()
node := b.root
for _, r := range prefix {
if node.children[r] == nil {
node.children[r] = newPrefixNode()
}
node = node.children[r]
}
node.subs = append(node.subs, ch)
b.mu.Unlock()
go func() {
<-ctx.Done()
_ = b.Unwatch(context.Background(), prefix, ch)
}()
return ch, nil
}
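Unwatch removes the channel from the watchers of key; if the channel is not a watcher of key, it is removed from the subscribers of the prefix equal to key instead.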
Parameters
Returns
func (*InMemoryWatchBus) Unwatch(ctx context.Context, key string, ch chan []byte) error
{
select {
case <-ctx.Done():
return ctx.Err()
default:
}
b.mu.Lock()
subs := b.subs[key]
removed := false
for i, c := range subs {
if c == ch {
subs[i] = subs[len(subs)-1]
subs = subs[:len(subs)-1]
b.subs[key] = subs
removed = true
break
}
}
if removed {
if len(subs) == 0 {
delete(b.subs, key)
}
node := b.root
for _, r := range key {
next, ok := node.children[r]
if !ok {
break
}
delete(next.watchers, ch)
node = next
}
b.mu.Unlock()
return nil
}
node := b.root
for _, r := range key {
next, ok := node.children[r]
if !ok {
node = nil
break
}
node = next
}
if node != nil {
subs := node.subs
for i, c := range subs {
if c == ch {
subs[i] = subs[len(subs)-1]
subs = subs[:len(subs)-1]
node.subs = subs
break
}
}
}
b.mu.Unlock()
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| mu | sync.RWMutex | |
| subs | map[string][]chan []byte | |
| root | *prefixNode | |
NewInMemory
NewInMemory creates a new InMemoryWatchBus.
Returns
func NewInMemory() *InMemoryWatchBus
{
return &InMemoryWatchBus{
subs: make(map[string][]chan []byte),
root: newPrefixNode(),
}
}
safeSend
Parameters
// safeSend attempts a non-blocking send of data on ch. The message is dropped
// when the channel cannot accept it immediately, and a panic raised by the
// send (for example because ch was closed concurrently) is swallowed.
func safeSend(ch chan []byte, data []byte) {
	// Discard the recovered value: a closed channel simply means the message
	// has nowhere to go.
	defer func() { _ = recover() }()

	select {
	case ch <- data:
	default:
	}
}
RedisWatchBus
RedisWatchBus uses Redis Streams to implement WatchBus.
type RedisWatchBus struct
Methods
Publish adds a new message to the Redis stream identified by key and also publishes it on the Redis Pub/Sub channel of the same name.
Parameters
Returns
func (*RedisWatchBus) Publish(ctx context.Context, key string, data []byte) error
{
if err := b.client.XAdd(ctx, &redis.XAddArgs{Stream: key, Values: map[string]any{"data": data}}).Err(); err != nil {
return err
}
return b.client.Publish(ctx, key, data).Err()
}
PublishPrefix publishes the message to all keys having the given prefix.
Parameters
Returns
func (*RedisWatchBus) PublishPrefix(ctx context.Context, prefix string, data []byte) error
{
var cursor uint64
for {
keys, next, err := b.client.SScan(ctx, "watchbus:index", cursor, prefix+"*", 100).Result()
if err != nil {
return err
}
for _, k := range keys {
if err := b.Publish(ctx, k, data); err != nil {
return err
}
}
cursor = next
if cursor == 0 {
break
}
}
return b.client.Publish(ctx, prefix, data).Err()
}
Watch reads messages from the Redis stream.
Parameters
Returns
func (*RedisWatchBus) Watch(ctx context.Context, key string) (chan []byte, error)
{
ctx, cancel := context.WithCancel(ctx)
ch := make(chan []byte, 1)
b.mu.Lock()
m := b.cancels[key]
if m == nil {
m = make(map[chan []byte]context.CancelFunc)
b.cancels[key] = m
}
m[ch] = cancel
if len(m) == 1 {
_ = b.client.SAdd(context.Background(), "watchbus:index", key).Err()
}
b.mu.Unlock()
go func() {
defer close(ch)
lastID := "$"
for {
res, err := b.client.XRead(ctx, &redis.XReadArgs{
Streams: []string{key, lastID},
Block: 0,
Count: 1,
}).Result()
if err != nil {
if ctx.Err() != nil {
return
}
time.Sleep(time.Second)
continue
}
for _, s := range res {
for _, msg := range s.Messages {
lastID = msg.ID
if v, ok := msg.Values["data"].(string); ok {
select {
case ch <- []byte(v):
case <-ctx.Done():
return
}
}
}
}
}
}()
return ch, nil
}
SubscribePrefix subscribes to all channels matching the given prefix.
Parameters
Returns
func (*RedisWatchBus) SubscribePrefix(ctx context.Context, prefix string) (chan []byte, error)
{
ctx, cancel := context.WithCancel(ctx)
ch := make(chan []byte, 1)
ps := b.client.PSubscribe(ctx, prefix+"*")
b.mu.Lock()
m := b.prefixCancels[prefix]
if m == nil {
m = make(map[chan []byte]context.CancelFunc)
b.prefixCancels[prefix] = m
}
m[ch] = func() {
cancel()
_ = ps.Close()
}
b.mu.Unlock()
go func() {
defer close(ch)
for {
msg, err := ps.ReceiveMessage(ctx)
if err != nil {
return
}
select {
case ch <- []byte(msg.Payload):
case <-ctx.Done():
return
}
}
}()
return ch, nil
}
Unwatch stops watching the given key and channel.
Parameters
Returns
func (*RedisWatchBus) Unwatch(ctx context.Context, key string, ch chan []byte) error
{
b.mu.Lock()
if m, ok := b.cancels[key]; ok {
if cancel, ok := m[ch]; ok {
delete(m, ch)
if len(m) == 0 {
delete(b.cancels, key)
_ = b.client.SRem(context.Background(), "watchbus:index", key).Err()
}
b.mu.Unlock()
cancel()
return nil
}
}
if m, ok := b.prefixCancels[key]; ok {
if cancel, ok := m[ch]; ok {
delete(m, ch)
if len(m) == 0 {
delete(b.prefixCancels, key)
}
b.mu.Unlock()
cancel()
return nil
}
}
b.mu.Unlock()
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| client | *redis.Client | |
| mu | sync.Mutex | |
| cancels | map[string]map[chan []byte]context.CancelFunc | |
| prefixCancels | map[string]map[chan []byte]context.CancelFunc | |
NewRedisWatchBus
NewRedisWatchBus creates a new RedisWatchBus using the provided client.
Parameters
Returns
func NewRedisWatchBus(client *redis.Client) *RedisWatchBus
{
return &RedisWatchBus{
client: client,
cancels: make(map[string]map[chan []byte]context.CancelFunc),
prefixCancels: make(map[string]map[chan []byte]context.CancelFunc),
}
}
TestRedisWatchBus
Parameters
func TestRedisWatchBus(t *testing.T)
{
mr, err := miniredis.Run()
if err != nil {
t.Fatalf("miniredis: %v", err)
}
defer mr.Close()
client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
bus := NewRedisWatchBus(client)
ctx := context.Background()
chKey, err := bus.Watch(ctx, "foo1")
if err != nil {
t.Fatalf("watch: %v", err)
}
chPrefix, err := bus.SubscribePrefix(ctx, "foo")
if err != nil {
t.Fatalf("sub prefix: %v", err)
}
if err := bus.Publish(ctx, "foo1", []byte("a")); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case msg := <-chKey:
if string(msg) != "a" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for key message")
}
select {
case msg := <-chPrefix:
if string(msg) != "a" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for prefix message")
}
member, err := client.SIsMember(ctx, "watchbus:index", "foo1").Result()
if err != nil {
t.Fatalf("sismember: %v", err)
}
if !member {
t.Fatalf("expected key in index")
}
if err := bus.PublishPrefix(ctx, "foo", []byte("b")); err != nil {
t.Fatalf("publish prefix: %v", err)
}
select {
case msg := <-chKey:
if string(msg) != "b" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for key prefix message")
}
// prefix subscriber may receive multiple messages; consume at least one
select {
case msg := <-chPrefix:
if string(msg) != "b" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for prefix message from publish prefix")
}
if err := bus.Unwatch(ctx, "foo1", chKey); err != nil {
t.Fatalf("unwatch: %v", err)
}
if err := bus.Unwatch(ctx, "foo", chPrefix); err != nil {
t.Fatalf("unwatch prefix: %v", err)
}
member, err = client.SIsMember(ctx, "watchbus:index", "foo1").Result()
if err != nil {
t.Fatalf("sismember: %v", err)
}
if member {
t.Fatalf("expected key removed from index")
}
}
WatchBus
WatchBus provides a simple message bus for streaming events.
Clients can publish messages to a key and watch for updates.
type WatchBus interface
Methods
Parameters
Returns
func SubscribePrefix(...)
TestInMemoryWatchBus
Parameters
func TestInMemoryWatchBus(t *testing.T)
{
bus := NewInMemory()
ctx := context.Background()
ch, err := bus.Watch(ctx, "foo")
if err != nil {
t.Fatalf("watch: %v", err)
}
if err := bus.Publish(ctx, "foo", []byte("hello")); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case msg := <-ch:
if string(msg) != "hello" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for message")
}
if err := bus.Unwatch(ctx, "foo", ch); err != nil {
t.Fatalf("unwatch: %v", err)
}
}
TestInMemoryWatchBusPrefix
Parameters
func TestInMemoryWatchBusPrefix(t *testing.T)
{
bus := NewInMemory()
ctx := context.Background()
chKey, err := bus.Watch(ctx, "foo1")
if err != nil {
t.Fatalf("watch: %v", err)
}
chPrefix, err := bus.SubscribePrefix(ctx, "foo")
if err != nil {
t.Fatalf("sub prefix: %v", err)
}
if err := bus.Publish(ctx, "foo1", []byte("a")); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case msg := <-chKey:
if string(msg) != "a" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for key msg")
}
select {
case msg := <-chPrefix:
if string(msg) != "a" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for prefix msg")
}
if err := bus.PublishPrefix(ctx, "foo", []byte("b")); err != nil {
t.Fatalf("publish prefix: %v", err)
}
select {
case msg := <-chKey:
if string(msg) != "b" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for key msg")
}
select {
case msg := <-chPrefix:
if string(msg) != "b" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for prefix msg")
}
_ = bus.Unwatch(ctx, "foo1", chKey)
_ = bus.Unwatch(ctx, "foo", chPrefix)
}
TestInMemoryWatchBusContextCanceled
Parameters
func TestInMemoryWatchBusContextCanceled(t *testing.T)
{
bus := NewInMemory()
ctx, cancel := context.WithCancel(context.Background())
cancel()
if _, err := bus.Watch(ctx, "k"); err == nil {
t.Fatal("expected watch error on canceled context")
}
if err := bus.Publish(ctx, "k", nil); err == nil {
t.Fatal("expected publish error on canceled context")
}
ch := make(chan []byte)
if err := bus.Unwatch(ctx, "k", ch); err == nil {
t.Fatal("expected unwatch error on canceled context")
}
}
TestInMemoryWatchBusUnwatchUnknown
Parameters
func TestInMemoryWatchBusUnwatchUnknown(t *testing.T)
{
bus := NewInMemory()
ctx := context.Background()
ch := make(chan []byte)
// Should not panic or error when channel not registered
if err := bus.Unwatch(ctx, "missing", ch); err != nil {
t.Fatalf("unwatch unexpected error: %v", err)
}
}
TestInMemoryWatchBusConcurrentUnwatch
Parameters
func TestInMemoryWatchBusConcurrentUnwatch(t *testing.T)
{
bus := NewInMemory()
ctx := context.Background()
ch, err := bus.Watch(ctx, "foo")
if err != nil {
t.Fatalf("watch: %v", err)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
if err := bus.Publish(ctx, "foo", []byte("data")); err != nil {
t.Errorf("publish: %v", err)
return
}
}
}()
time.Sleep(10 * time.Millisecond)
if err := bus.Unwatch(ctx, "foo", ch); err != nil {
t.Fatalf("unwatch: %v", err)
}
wg.Wait()
}
TestSSEHandlerStream
Parameters
func TestSSEHandlerStream(t *testing.T)
{
bus := NewInMemory()
srv := httptest.NewServer(SSEHandler(bus))
defer srv.Close()
respCh := make(chan *http.Response, 1)
go func() {
resp, err := http.Get(srv.URL + "?key=foo")
if err != nil {
t.Errorf("get: %v", err)
return
}
respCh <- resp
}()
// wait for watcher registration
for i := 0; i < 100; i++ {
bus.mu.Lock()
if len(bus.subs["foo"]) == 1 {
bus.mu.Unlock()
break
}
bus.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
if err := bus.Publish(context.Background(), "foo", []byte("hello")); err != nil {
t.Fatalf("publish: %v", err)
}
var resp *http.Response
select {
case resp = <-respCh:
case <-time.After(time.Second):
t.Fatal("timeout waiting for response")
}
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)
line, err := reader.ReadString('\n')
if err != nil {
t.Fatalf("read: %v", err)
}
line = strings.TrimSpace(line)
if line != "data: hello" {
t.Fatalf("unexpected line %q", line)
}
}
TestSSEHandlerMissingKey
Parameters
func TestSSEHandlerMissingKey(t *testing.T)
{
bus := NewInMemory()
srv := httptest.NewServer(SSEHandler(bus))
defer srv.Close()
resp, err := http.Get(srv.URL)
if err != nil {
t.Fatalf("get: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusBadRequest {
t.Fatalf("expected 400, got %d", resp.StatusCode)
}
}
TestSSEHandlerContextCancel
Parameters
func TestSSEHandlerContextCancel(t *testing.T)
{
bus := NewInMemory()
srv := httptest.NewServer(SSEHandler(bus))
defer srv.Close()
ctx, cancel := context.WithCancel(context.Background())
req, err := http.NewRequestWithContext(ctx, http.MethodGet, srv.URL+"?key=foo", nil)
if err != nil {
t.Fatalf("request: %v", err)
}
respCh := make(chan struct{})
go func() {
_, _ = http.DefaultClient.Do(req)
close(respCh)
}()
// wait for watcher registration
for i := 0; i < 100; i++ {
bus.mu.Lock()
if len(bus.subs["foo"]) == 1 {
bus.mu.Unlock()
break
}
bus.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
cancel()
select {
case <-respCh:
case <-time.After(time.Second):
t.Fatal("timeout waiting for request to end")
}
time.Sleep(50 * time.Millisecond)
bus.mu.Lock()
if len(bus.subs["foo"]) != 0 {
bus.mu.Unlock()
t.Fatalf("expected watcher removed")
}
bus.mu.Unlock()
}
failingWriter
type failingWriter struct
Methods
Fields
| Name | Type | Description |
|---|---|---|
| header | http.Header | |
newFailingWriter
Returns
func newFailingWriter() *failingWriter
{
return &failingWriter{header: make(http.Header)}
}
TestSSEHandlerWriteErrorUnwatches
Parameters
func TestSSEHandlerWriteErrorUnwatches(t *testing.T)
{
bus := NewInMemory()
handler := SSEHandler(bus)
req := httptest.NewRequest(http.MethodGet, "/?key=foo", nil)
resp := newFailingWriter()
done := make(chan struct{})
go func() {
handler(resp, req)
close(done)
}()
for i := 0; i < 100; i++ {
bus.mu.Lock()
if len(bus.subs["foo"]) == 1 {
bus.mu.Unlock()
break
}
bus.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
if err := bus.Publish(context.Background(), "foo", []byte("hello")); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("handler did not exit on write error")
}
time.Sleep(50 * time.Millisecond)
bus.mu.Lock()
if len(bus.subs["foo"]) != 0 {
bus.mu.Unlock()
t.Fatalf("expected watcher removed after write error")
}
bus.mu.Unlock()
}
TestWebSocketHandlerStream
Parameters
func TestWebSocketHandlerStream(t *testing.T)
{
bus := NewInMemory()
srv := httptest.NewServer(WebSocketHandler(bus))
defer srv.Close()
u := "ws" + strings.TrimPrefix(srv.URL, "http") + "?key=foo"
conn, _, err := websocket.DefaultDialer.Dial(u, nil)
if err != nil {
t.Fatalf("dial: %v", err)
}
defer conn.Close()
if err := bus.Publish(context.Background(), "foo", []byte("hello")); err != nil {
t.Fatalf("publish: %v", err)
}
_ = conn.SetReadDeadline(time.Now().Add(time.Second))
_, msg, err := conn.ReadMessage()
if err != nil {
t.Fatalf("read: %v", err)
}
if string(msg) != "hello" {
t.Fatalf("unexpected %s", msg)
}
}
TestWebSocketHandlerMissingKey
Parameters
func TestWebSocketHandlerMissingKey(t *testing.T)
{
bus := NewInMemory()
srv := httptest.NewServer(WebSocketHandler(bus))
defer srv.Close()
u := "ws" + strings.TrimPrefix(srv.URL, "http")
_, resp, err := websocket.DefaultDialer.Dial(u, nil)
if err == nil {
t.Fatal("expected error")
}
if resp == nil || resp.StatusCode != http.StatusBadRequest {
t.Fatalf("expected 400, got %v", resp)
}
}
TestWebSocketHandlerContextCancel
Parameters
func TestWebSocketHandlerContextCancel(t *testing.T)
{
bus := NewInMemory()
ctx, cancel := context.WithCancel(context.Background())
srv := httptest.NewUnstartedServer(WebSocketHandler(bus))
srv.Config.BaseContext = func(net.Listener) context.Context { return ctx }
srv.Start()
defer srv.Close()
u := "ws" + strings.TrimPrefix(srv.URL, "http") + "?key=foo"
conn, _, err := websocket.DefaultDialer.Dial(u, nil)
if err != nil {
t.Fatalf("dial: %v", err)
}
time.Sleep(50 * time.Millisecond)
bus.mu.Lock()
if len(bus.subs["foo"]) != 1 {
bus.mu.Unlock()
t.Fatalf("expected watcher registered")
}
bus.mu.Unlock()
cancel()
time.Sleep(50 * time.Millisecond)
bus.mu.Lock()
if len(bus.subs["foo"]) != 0 {
bus.mu.Unlock()
t.Fatalf("expected watcher removed")
}
bus.mu.Unlock()
conn.Close()
}
BenchmarkInMemoryPublish
BenchmarkInMemoryPublish measures publish throughput with many concurrent
publishers and watchers.
Parameters
func BenchmarkInMemoryPublish(b *testing.B)
{
bus := NewInMemory()
ctx := context.Background()
const watchers = 1000
for i := 0; i < watchers; i++ {
key := fmt.Sprintf("key-%d", i)
ch, _ := bus.Watch(ctx, key)
go func(c chan []byte) {
for range c {
}
}(ch)
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(0))
for pb.Next() {
key := fmt.Sprintf("key-%d", r.Intn(watchers))
_ = bus.Publish(ctx, key, []byte("data"))
}
})
}