watchbus
package
API reference for the watchbus package.
Imports
(17)context
STD
sync
STD
fmt
STD
math/rand
STD
testing
STD
strings
PKG
github.com/nats-io/nats.go
STD
time
PKG
github.com/redis/go-redis/v9
PKG
github.com/alicebob/miniredis/v2
STD
bufio
STD
errors
STD
net
STD
net/http
STD
net/http/httptest
PKG
github.com/gorilla/websocket
PKG
github.com/nats-io/nats-server/v2/test
prefixNode
prefixNode represents a node in the prefix trie used for prefix
subscriptions and prefix lookups for key watchers. Each node keeps the
watchers of keys that share its prefix as well as subscribers that have
explicitly subscribed to that prefix.
type prefixNode struct
Fields
| Name | Type | Description |
|---|---|---|
| subs | []chan []byte | |
| watchers | map[chan []byte]struct{} | |
| children | map[rune]*prefixNode |
newPrefixNode
Returns
func newPrefixNode() *prefixNode
{
return &prefixNode{
watchers: make(map[chan []byte]struct{}),
children: make(map[rune]*prefixNode),
}
}
InMemoryWatchBus
InMemoryWatchBus is an in-memory implementation of WatchBus.
type InMemoryWatchBus struct
Methods
Publish sends data to all watchers of key.
Parameters
Returns
func (*InMemoryWatchBus) Publish(ctx context.Context, key string, data []byte) error
{
select {
case <-ctx.Done():
return ctx.Err()
default:
}
targets := make(map[chan []byte]struct{})
b.mu.RLock()
for _, ch := range b.subs[key] {
targets[ch] = struct{}{}
}
node := b.root
for _, r := range key {
next, ok := node.children[r]
if !ok {
break
}
node = next
for _, ch := range node.subs {
targets[ch] = struct{}{}
}
}
b.mu.RUnlock()
for ch := range targets {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
safeSend(ch, data)
}
return nil
}
PublishPrefix sends data to all watchers of keys with the given prefix.
Parameters
Returns
func (*InMemoryWatchBus) PublishPrefix(ctx context.Context, prefix string, data []byte) error
{
select {
case <-ctx.Done():
return ctx.Err()
default:
}
targets := make(map[chan []byte]struct{})
b.mu.RLock()
node := b.root
for _, r := range prefix {
next, ok := node.children[r]
if !ok {
node = nil
break
}
node = next
}
if node != nil {
for ch := range node.watchers {
targets[ch] = struct{}{}
}
for _, ch := range node.subs {
targets[ch] = struct{}{}
}
}
b.mu.RUnlock()
for ch := range targets {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
safeSend(ch, data)
}
return nil
}
Watch subscribes to key and returns a channel receiving messages.
Parameters
Returns
func (*InMemoryWatchBus) Watch(ctx context.Context, key string) (chan []byte, error)
{
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
ch := make(chan []byte, 1)
b.mu.Lock()
b.subs[key] = append(b.subs[key], ch)
node := b.root
for _, r := range key {
if node.children[r] == nil {
node.children[r] = newPrefixNode()
}
node = node.children[r]
node.watchers[ch] = struct{}{}
}
b.mu.Unlock()
go func() {
<-ctx.Done()
_ = b.Unwatch(context.Background(), key, ch)
}()
return ch, nil
}
SubscribePrefix subscribes to all keys with the given prefix.
Parameters
Returns
func (*InMemoryWatchBus) SubscribePrefix(ctx context.Context, prefix string) (chan []byte, error)
{
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
ch := make(chan []byte, 1)
b.mu.Lock()
node := b.root
for _, r := range prefix {
if node.children[r] == nil {
node.children[r] = newPrefixNode()
}
node = node.children[r]
}
node.subs = append(node.subs, ch)
b.mu.Unlock()
go func() {
<-ctx.Done()
_ = b.Unwatch(context.Background(), prefix, ch)
}()
return ch, nil
}
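Unwatch removes the channel from the watchers of key; if the channel is not a watcher of key, it is removed from the subscribers of the prefix equal to key.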
Parameters
Returns
func (*InMemoryWatchBus) Unwatch(ctx context.Context, key string, ch chan []byte) error
{
select {
case <-ctx.Done():
return ctx.Err()
default:
}
b.mu.Lock()
subs := b.subs[key]
removed := false
for i, c := range subs {
if c == ch {
subs[i] = subs[len(subs)-1]
subs = subs[:len(subs)-1]
b.subs[key] = subs
removed = true
break
}
}
if removed {
if len(subs) == 0 {
delete(b.subs, key)
}
node := b.root
for _, r := range key {
next, ok := node.children[r]
if !ok {
break
}
delete(next.watchers, ch)
node = next
}
b.mu.Unlock()
return nil
}
node := b.root
for _, r := range key {
next, ok := node.children[r]
if !ok {
node = nil
break
}
node = next
}
if node != nil {
subs := node.subs
for i, c := range subs {
if c == ch {
subs[i] = subs[len(subs)-1]
subs = subs[:len(subs)-1]
node.subs = subs
break
}
}
}
b.mu.Unlock()
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| mu | sync.RWMutex | |
| subs | map[string][]chan []byte | |
| root | *prefixNode |
NewInMemory
NewInMemory creates a new InMemoryWatchBus.
Returns
func NewInMemory() *InMemoryWatchBus
{
return &InMemoryWatchBus{
subs: make(map[string][]chan []byte),
root: newPrefixNode(),
}
}
safeSend
Parameters
// safeSend attempts a non-blocking send of data on ch.
//
// The message is dropped when the channel cannot accept it immediately
// (buffer full, no ready receiver, or nil channel). A send on a channel that
// was closed concurrently would panic; that panic is recovered and the
// message is dropped as well.
func safeSend(ch chan []byte, data []byte) {
	// The only statement below that can panic is the send on a closed channel.
	defer func() { _ = recover() }()

	select {
	case ch <- data:
	default:
	}
}
BenchmarkInMemoryPublish
BenchmarkInMemoryPublish measures publish throughput with many concurrent
publishers and watchers.
Parameters
func BenchmarkInMemoryPublish(b *testing.B)
{
bus := NewInMemory()
ctx := context.Background()
const watchers = 1000
for i := 0; i < watchers; i++ {
key := fmt.Sprintf("key-%d", i)
ch, _ := bus.Watch(ctx, key)
go func(c chan []byte) {
for range c {
}
}(ch)
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(0))
for pb.Next() {
key := fmt.Sprintf("key-%d", r.Intn(watchers))
_ = bus.Publish(ctx, key, []byte("data"))
}
})
}
NATSWatchBus
NATSWatchBus implements WatchBus using NATS Core subjects.
type NATSWatchBus struct
Methods
Publish sends data to all NATS subscribers of key.
Parameters
Returns
func (*NATSWatchBus) Publish(ctx context.Context, key string, data []byte) error
{
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return b.conn.Publish(key, data)
}
PublishPrefix publishes data to every key with the given prefix that is watched in the current process. Cross-process fan-out is not supported: the set of keys is taken from this instance's own watch registry, so keys watched only by remote instances are not published to. Use Publish with explicit keys for cross-node delivery.
Parameters
Returns
func (*NATSWatchBus) PublishPrefix(ctx context.Context, prefix string, data []byte) error
{
select {
case <-ctx.Done():
return ctx.Err()
default:
}
b.mu.Lock()
var keys []string
for k := range b.watches {
if strings.HasPrefix(k, prefix) {
keys = append(keys, k)
}
}
b.mu.Unlock()
for _, k := range keys {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err := b.conn.Publish(k, data); err != nil {
return err
}
}
return nil
}
Watch subscribes to key and returns a channel receiving messages until the context is canceled or Unwatch is called.
Parameters
Returns
func (*NATSWatchBus) Watch(ctx context.Context, key string) (chan []byte, error)
{
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
ch := make(chan []byte, 1)
sub, err := b.conn.Subscribe(key, func(msg *nats.Msg) {
select {
case ch <- msg.Data:
default:
}
})
if err != nil {
return nil, err
}
b.mu.Lock()
m := b.watches[key]
if m == nil {
m = make(map[chan []byte]*nats.Subscription)
b.watches[key] = m
}
m[ch] = sub
b.mu.Unlock()
go func() {
<-ctx.Done()
_ = b.Unwatch(context.Background(), key, ch)
}()
return ch, nil
}
SubscribePrefix subscribes to all keys with the given prefix using the NATS wildcard subject prefix + ".>" (matches any subject one or more tokens deeper).
Parameters
Returns
func (*NATSWatchBus) SubscribePrefix(ctx context.Context, prefix string) (chan []byte, error)
{
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
ch := make(chan []byte, 1)
sub, err := b.conn.Subscribe(prefix+".>", func(msg *nats.Msg) {
select {
case ch <- msg.Data:
default:
}
})
if err != nil {
return nil, err
}
b.mu.Lock()
m := b.prefixWatches[prefix]
if m == nil {
m = make(map[chan []byte]*nats.Subscription)
b.prefixWatches[prefix] = m
}
m[ch] = sub
b.mu.Unlock()
go func() {
<-ctx.Done()
_ = b.Unwatch(context.Background(), prefix, ch)
}()
return ch, nil
}
Unwatch stops delivering messages for key to ch, unsubscribes from NATS, and closes the channel.
Parameters
Returns
func (*NATSWatchBus) Unwatch(ctx context.Context, key string, ch chan []byte) error
{
select {
case <-ctx.Done():
return ctx.Err()
default:
}
b.mu.Lock()
if m, ok := b.watches[key]; ok {
if sub, ok := m[ch]; ok {
delete(m, ch)
if len(m) == 0 {
delete(b.watches, key)
}
b.mu.Unlock()
close(ch)
return sub.Unsubscribe()
}
}
if m, ok := b.prefixWatches[key]; ok {
if sub, ok := m[ch]; ok {
delete(m, ch)
if len(m) == 0 {
delete(b.prefixWatches, key)
}
b.mu.Unlock()
close(ch)
return sub.Unsubscribe()
}
}
b.mu.Unlock()
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| conn | *nats.Conn | |
| mu | sync.Mutex | |
| watches | map[string]map[chan []byte]*nats.Subscription | |
| prefixWatches | map[string]map[chan []byte]*nats.Subscription |
NewNATSWatchBus
NewNATSWatchBus creates a new NATSWatchBus using the provided connection.
Parameters
Returns
func NewNATSWatchBus(conn *nats.Conn) *NATSWatchBus
{
return &NATSWatchBus{
conn: conn,
watches: make(map[string]map[chan []byte]*nats.Subscription),
prefixWatches: make(map[string]map[chan []byte]*nats.Subscription),
}
}
RedisWatchBus
RedisWatchBus uses Redis Streams to implement WatchBus.
type RedisWatchBus struct
Methods
Publish adds a new message to the Redis stream identified by key and then publishes the same payload on the Pub/Sub channel named key.
Parameters
Returns
func (*RedisWatchBus) Publish(ctx context.Context, key string, data []byte) error
{
if err := b.client.XAdd(ctx, &redis.XAddArgs{Stream: key, Values: map[string]any{"data": data}}).Err(); err != nil {
return err
}
return b.client.Publish(ctx, key, data).Err()
}
PublishPrefix publishes the message to all keys having the given prefix.
Parameters
Returns
func (*RedisWatchBus) PublishPrefix(ctx context.Context, prefix string, data []byte) error
{
var cursor uint64
for {
keys, next, err := b.client.SScan(ctx, "watchbus:index", cursor, prefix+"*", 100).Result()
if err != nil {
return err
}
for _, k := range keys {
if err := b.Publish(ctx, k, data); err != nil {
return err
}
}
cursor = next
if cursor == 0 {
break
}
}
return b.client.Publish(ctx, prefix, data).Err()
}
Watch reads messages from the Redis stream.
Parameters
Returns
func (*RedisWatchBus) Watch(ctx context.Context, key string) (chan []byte, error)
{
ctx, cancel := context.WithCancel(ctx)
ch := make(chan []byte, 1)
b.mu.Lock()
m := b.cancels[key]
if m == nil {
m = make(map[chan []byte]context.CancelFunc)
b.cancels[key] = m
}
m[ch] = cancel
if len(m) == 1 {
_ = b.client.SAdd(context.Background(), "watchbus:index", key).Err()
}
b.mu.Unlock()
go func() {
defer close(ch)
lastID := "$"
for {
res, err := b.client.XRead(ctx, &redis.XReadArgs{
Streams: []string{key, lastID},
Block: 0,
Count: 1,
}).Result()
if err != nil {
if ctx.Err() != nil {
return
}
time.Sleep(time.Second)
continue
}
for _, s := range res {
for _, msg := range s.Messages {
lastID = msg.ID
if v, ok := msg.Values["data"].(string); ok {
select {
case ch <- []byte(v):
case <-ctx.Done():
return
}
}
}
}
}
}()
return ch, nil
}
SubscribePrefix subscribes to all channels matching the given prefix.
Parameters
Returns
func (*RedisWatchBus) SubscribePrefix(ctx context.Context, prefix string) (chan []byte, error)
{
ctx, cancel := context.WithCancel(ctx)
ch := make(chan []byte, 1)
ps := b.client.PSubscribe(ctx, prefix+"*")
b.mu.Lock()
m := b.prefixCancels[prefix]
if m == nil {
m = make(map[chan []byte]context.CancelFunc)
b.prefixCancels[prefix] = m
}
m[ch] = func() {
cancel()
_ = ps.Close()
}
b.mu.Unlock()
go func() {
defer close(ch)
for {
msg, err := ps.ReceiveMessage(ctx)
if err != nil {
return
}
select {
case ch <- []byte(msg.Payload):
case <-ctx.Done():
return
}
}
}()
return ch, nil
}
Unwatch stops watching the given key and channel.
Parameters
Returns
func (*RedisWatchBus) Unwatch(ctx context.Context, key string, ch chan []byte) error
{
b.mu.Lock()
if m, ok := b.cancels[key]; ok {
if cancel, ok := m[ch]; ok {
delete(m, ch)
if len(m) == 0 {
delete(b.cancels, key)
_ = b.client.SRem(context.Background(), "watchbus:index", key).Err()
}
b.mu.Unlock()
cancel()
return nil
}
}
if m, ok := b.prefixCancels[key]; ok {
if cancel, ok := m[ch]; ok {
delete(m, ch)
if len(m) == 0 {
delete(b.prefixCancels, key)
}
b.mu.Unlock()
cancel()
return nil
}
}
b.mu.Unlock()
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| client | *redis.Client | |
| mu | sync.Mutex | |
| cancels | map[string]map[chan []byte]context.CancelFunc | |
| prefixCancels | map[string]map[chan []byte]context.CancelFunc |
NewRedisWatchBus
NewRedisWatchBus creates a new RedisWatchBus using the provided client.
Parameters
Returns
func NewRedisWatchBus(client *redis.Client) *RedisWatchBus
{
return &RedisWatchBus{
client: client,
cancels: make(map[string]map[chan []byte]context.CancelFunc),
prefixCancels: make(map[string]map[chan []byte]context.CancelFunc),
}
}
TestRedisWatchBus
Parameters
func TestRedisWatchBus(t *testing.T)
{
mr, err := miniredis.Run()
if err != nil {
t.Fatalf("miniredis: %v", err)
}
defer mr.Close()
client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
bus := NewRedisWatchBus(client)
ctx := context.Background()
chKey, err := bus.Watch(ctx, "foo1")
if err != nil {
t.Fatalf("watch: %v", err)
}
chPrefix, err := bus.SubscribePrefix(ctx, "foo")
if err != nil {
t.Fatalf("sub prefix: %v", err)
}
if err := bus.Publish(ctx, "foo1", []byte("a")); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case msg := <-chKey:
if string(msg) != "a" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for key message")
}
select {
case msg := <-chPrefix:
if string(msg) != "a" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for prefix message")
}
member, err := client.SIsMember(ctx, "watchbus:index", "foo1").Result()
if err != nil {
t.Fatalf("sismember: %v", err)
}
if !member {
t.Fatalf("expected key in index")
}
if err := bus.PublishPrefix(ctx, "foo", []byte("b")); err != nil {
t.Fatalf("publish prefix: %v", err)
}
select {
case msg := <-chKey:
if string(msg) != "b" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for key prefix message")
}
// prefix subscriber may receive multiple messages; consume at least one
select {
case msg := <-chPrefix:
if string(msg) != "b" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for prefix message from publish prefix")
}
if err := bus.Unwatch(ctx, "foo1", chKey); err != nil {
t.Fatalf("unwatch: %v", err)
}
if err := bus.Unwatch(ctx, "foo", chPrefix); err != nil {
t.Fatalf("unwatch prefix: %v", err)
}
member, err = client.SIsMember(ctx, "watchbus:index", "foo1").Result()
if err != nil {
t.Fatalf("sismember: %v", err)
}
if member {
t.Fatalf("expected key removed from index")
}
}
TestInMemoryWatchBus
Parameters
func TestInMemoryWatchBus(t *testing.T)
{
bus := NewInMemory()
ctx := context.Background()
ch, err := bus.Watch(ctx, "foo")
if err != nil {
t.Fatalf("watch: %v", err)
}
if err := bus.Publish(ctx, "foo", []byte("hello")); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case msg := <-ch:
if string(msg) != "hello" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for message")
}
if err := bus.Unwatch(ctx, "foo", ch); err != nil {
t.Fatalf("unwatch: %v", err)
}
}
TestInMemoryWatchBusPrefix
Parameters
func TestInMemoryWatchBusPrefix(t *testing.T)
{
bus := NewInMemory()
ctx := context.Background()
chKey, err := bus.Watch(ctx, "foo1")
if err != nil {
t.Fatalf("watch: %v", err)
}
chPrefix, err := bus.SubscribePrefix(ctx, "foo")
if err != nil {
t.Fatalf("sub prefix: %v", err)
}
if err := bus.Publish(ctx, "foo1", []byte("a")); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case msg := <-chKey:
if string(msg) != "a" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for key msg")
}
select {
case msg := <-chPrefix:
if string(msg) != "a" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for prefix msg")
}
if err := bus.PublishPrefix(ctx, "foo", []byte("b")); err != nil {
t.Fatalf("publish prefix: %v", err)
}
select {
case msg := <-chKey:
if string(msg) != "b" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for key msg")
}
select {
case msg := <-chPrefix:
if string(msg) != "b" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for prefix msg")
}
_ = bus.Unwatch(ctx, "foo1", chKey)
_ = bus.Unwatch(ctx, "foo", chPrefix)
}
TestInMemoryWatchBusContextCanceled
Parameters
func TestInMemoryWatchBusContextCanceled(t *testing.T)
{
bus := NewInMemory()
ctx, cancel := context.WithCancel(context.Background())
cancel()
if _, err := bus.Watch(ctx, "k"); err == nil {
t.Fatal("expected watch error on canceled context")
}
if err := bus.Publish(ctx, "k", nil); err == nil {
t.Fatal("expected publish error on canceled context")
}
ch := make(chan []byte)
if err := bus.Unwatch(ctx, "k", ch); err == nil {
t.Fatal("expected unwatch error on canceled context")
}
}
TestInMemoryWatchBusUnwatchUnknown
Parameters
func TestInMemoryWatchBusUnwatchUnknown(t *testing.T)
{
bus := NewInMemory()
ctx := context.Background()
ch := make(chan []byte)
// Should not panic or error when channel not registered
if err := bus.Unwatch(ctx, "missing", ch); err != nil {
t.Fatalf("unwatch unexpected error: %v", err)
}
}
TestInMemoryWatchBusConcurrentUnwatch
Parameters
func TestInMemoryWatchBusConcurrentUnwatch(t *testing.T)
{
bus := NewInMemory()
ctx := context.Background()
ch, err := bus.Watch(ctx, "foo")
if err != nil {
t.Fatalf("watch: %v", err)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
if err := bus.Publish(ctx, "foo", []byte("data")); err != nil {
t.Errorf("publish: %v", err)
return
}
}
}()
time.Sleep(10 * time.Millisecond)
if err := bus.Unwatch(ctx, "foo", ch); err != nil {
t.Fatalf("unwatch: %v", err)
}
wg.Wait()
}
TestSSEHandlerStream
Parameters
func TestSSEHandlerStream(t *testing.T)
{
bus := NewInMemory()
srv := httptest.NewServer(SSEHandler(bus))
defer srv.Close()
respCh := make(chan *http.Response, 1)
go func() {
resp, err := http.Get(srv.URL + "?key=foo")
if err != nil {
t.Errorf("get: %v", err)
return
}
respCh <- resp
}()
// wait for watcher registration
for i := 0; i < 100; i++ {
bus.mu.Lock()
if len(bus.subs["foo"]) == 1 {
bus.mu.Unlock()
break
}
bus.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
if err := bus.Publish(context.Background(), "foo", []byte("hello")); err != nil {
t.Fatalf("publish: %v", err)
}
var resp *http.Response
select {
case resp = <-respCh:
case <-time.After(time.Second):
t.Fatal("timeout waiting for response")
}
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)
line, err := reader.ReadString('\n')
if err != nil {
t.Fatalf("read: %v", err)
}
line = strings.TrimSpace(line)
if line != "data: hello" {
t.Fatalf("unexpected line %q", line)
}
}
TestSSEHandlerMissingKey
Parameters
func TestSSEHandlerMissingKey(t *testing.T)
{
bus := NewInMemory()
srv := httptest.NewServer(SSEHandler(bus))
defer srv.Close()
resp, err := http.Get(srv.URL)
if err != nil {
t.Fatalf("get: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusBadRequest {
t.Fatalf("expected 400, got %d", resp.StatusCode)
}
}
TestSSEHandlerContextCancel
Parameters
func TestSSEHandlerContextCancel(t *testing.T)
{
bus := NewInMemory()
srv := httptest.NewServer(SSEHandler(bus))
defer srv.Close()
ctx, cancel := context.WithCancel(context.Background())
req, err := http.NewRequestWithContext(ctx, http.MethodGet, srv.URL+"?key=foo", nil)
if err != nil {
t.Fatalf("request: %v", err)
}
respCh := make(chan struct{})
go func() {
_, _ = http.DefaultClient.Do(req)
close(respCh)
}()
// wait for watcher registration
for i := 0; i < 100; i++ {
bus.mu.Lock()
if len(bus.subs["foo"]) == 1 {
bus.mu.Unlock()
break
}
bus.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
cancel()
select {
case <-respCh:
case <-time.After(time.Second):
t.Fatal("timeout waiting for request to end")
}
time.Sleep(50 * time.Millisecond)
bus.mu.Lock()
if len(bus.subs["foo"]) != 0 {
bus.mu.Unlock()
t.Fatalf("expected watcher removed")
}
bus.mu.Unlock()
}
failingWriter
type failingWriter struct
Methods
Fields
| Name | Type | Description |
|---|---|---|
| header | http.Header |
newFailingWriter
Returns
func newFailingWriter() *failingWriter
{
return &failingWriter{header: make(http.Header)}
}
TestSSEHandlerWriteErrorUnwatches
Parameters
func TestSSEHandlerWriteErrorUnwatches(t *testing.T)
{
bus := NewInMemory()
handler := SSEHandler(bus)
req := httptest.NewRequest(http.MethodGet, "/?key=foo", nil)
resp := newFailingWriter()
done := make(chan struct{})
go func() {
handler(resp, req)
close(done)
}()
for i := 0; i < 100; i++ {
bus.mu.Lock()
if len(bus.subs["foo"]) == 1 {
bus.mu.Unlock()
break
}
bus.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
if err := bus.Publish(context.Background(), "foo", []byte("hello")); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("handler did not exit on write error")
}
time.Sleep(50 * time.Millisecond)
bus.mu.Lock()
if len(bus.subs["foo"]) != 0 {
bus.mu.Unlock()
t.Fatalf("expected watcher removed after write error")
}
bus.mu.Unlock()
}
TestWebSocketHandlerStream
Parameters
func TestWebSocketHandlerStream(t *testing.T)
{
bus := NewInMemory()
srv := httptest.NewServer(WebSocketHandler(bus))
defer srv.Close()
u := "ws" + strings.TrimPrefix(srv.URL, "http") + "?key=foo"
conn, _, err := websocket.DefaultDialer.Dial(u, nil)
if err != nil {
t.Fatalf("dial: %v", err)
}
defer conn.Close()
if err := bus.Publish(context.Background(), "foo", []byte("hello")); err != nil {
t.Fatalf("publish: %v", err)
}
_ = conn.SetReadDeadline(time.Now().Add(time.Second))
_, msg, err := conn.ReadMessage()
if err != nil {
t.Fatalf("read: %v", err)
}
if string(msg) != "hello" {
t.Fatalf("unexpected %s", msg)
}
}
TestWebSocketHandlerMissingKey
Parameters
func TestWebSocketHandlerMissingKey(t *testing.T)
{
bus := NewInMemory()
srv := httptest.NewServer(WebSocketHandler(bus))
defer srv.Close()
u := "ws" + strings.TrimPrefix(srv.URL, "http")
_, resp, err := websocket.DefaultDialer.Dial(u, nil)
if err == nil {
t.Fatal("expected error")
}
if resp == nil || resp.StatusCode != http.StatusBadRequest {
t.Fatalf("expected 400, got %v", resp)
}
}
TestWebSocketHandlerContextCancel
Parameters
func TestWebSocketHandlerContextCancel(t *testing.T)
{
bus := NewInMemory()
ctx, cancel := context.WithCancel(context.Background())
srv := httptest.NewUnstartedServer(WebSocketHandler(bus))
srv.Config.BaseContext = func(net.Listener) context.Context { return ctx }
srv.Start()
defer srv.Close()
u := "ws" + strings.TrimPrefix(srv.URL, "http") + "?key=foo"
conn, _, err := websocket.DefaultDialer.Dial(u, nil)
if err != nil {
t.Fatalf("dial: %v", err)
}
time.Sleep(50 * time.Millisecond)
bus.mu.Lock()
if len(bus.subs["foo"]) != 1 {
bus.mu.Unlock()
t.Fatalf("expected watcher registered")
}
bus.mu.Unlock()
cancel()
time.Sleep(50 * time.Millisecond)
bus.mu.Lock()
if len(bus.subs["foo"]) != 0 {
bus.mu.Unlock()
t.Fatalf("expected watcher removed")
}
bus.mu.Unlock()
conn.Close()
}
TestNATSWatchBus
Parameters
func TestNATSWatchBus(t *testing.T)
{
s := natsserver.RunRandClientPortServer()
defer s.Shutdown()
conn, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("connect: %v", err)
}
defer conn.Close()
bus := NewNATSWatchBus(conn)
ctx := context.Background()
// foo.1 uses NATS dot-separated subjects so that the prefix wildcard foo.> matches.
chKey, err := bus.Watch(ctx, "foo.1")
if err != nil {
t.Fatalf("watch: %v", err)
}
chPrefix, err := bus.SubscribePrefix(ctx, "foo")
if err != nil {
t.Fatalf("sub prefix: %v", err)
}
if err := bus.Publish(ctx, "foo.1", []byte("a")); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case msg := <-chKey:
if string(msg) != "a" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for key message")
}
select {
case msg := <-chPrefix:
if string(msg) != "a" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for prefix message")
}
if err := bus.PublishPrefix(ctx, "foo", []byte("b")); err != nil {
t.Fatalf("publish prefix: %v", err)
}
select {
case msg := <-chKey:
if string(msg) != "b" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for key prefix message")
}
// prefix subscriber receives via NATS wildcard routing
select {
case msg := <-chPrefix:
if string(msg) != "b" {
t.Fatalf("unexpected %s", msg)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for prefix message from publish prefix")
}
if err := bus.Unwatch(ctx, "foo.1", chKey); err != nil {
t.Fatalf("unwatch: %v", err)
}
if err := bus.Unwatch(ctx, "foo", chPrefix); err != nil {
t.Fatalf("unwatch prefix: %v", err)
}
bus.mu.Lock()
_, hasKey := bus.watches["foo.1"]
_, hasPrefix := bus.prefixWatches["foo"]
bus.mu.Unlock()
if hasKey {
t.Fatal("expected key removed from watches")
}
if hasPrefix {
t.Fatal("expected prefix removed from prefixWatches")
}
}
TestNATSWatchBusContextCancellation
Parameters
func TestNATSWatchBusContextCancellation(t *testing.T)
{
s := natsserver.RunRandClientPortServer()
defer s.Shutdown()
conn, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("connect: %v", err)
}
defer conn.Close()
bus := NewNATSWatchBus(conn)
watchCtx, cancel := context.WithCancel(context.Background())
ch, err := bus.Watch(watchCtx, "bar.1")
if err != nil {
t.Fatalf("watch: %v", err)
}
cancel()
select {
case _, ok := <-ch:
if ok {
t.Fatal("expected channel closed after context cancel")
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for channel close")
}
bus.mu.Lock()
_, hasKey := bus.watches["bar.1"]
bus.mu.Unlock()
if hasKey {
t.Fatal("expected key removed from watches after context cancel")
}
}
WatchBus
WatchBus provides a simple message bus for streaming events.
Clients can publish messages to a key and watch for updates.
type WatchBus interface
Methods
Parameters
Returns
func SubscribePrefix(...)
SSEHandler
SSEHandler streams WatchBus events over Server-Sent Events.
The watched key is taken from the “key” query parameter.
Parameters
Returns
func SSEHandler(bus WatchBus) http.HandlerFunc
{
return func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
if key == "" {
http.Error(w, "missing key", http.StatusBadRequest)
return
}
ctx, cancel := context.WithCancel(r.Context())
ch, err := bus.Watch(ctx, key)
if err != nil {
cancel()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer func() {
cancel()
_ = bus.Unwatch(context.Background(), key, ch)
}()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "stream unsupported", http.StatusInternalServerError)
return
}
for {
select {
case msg, ok := <-ch:
if !ok {
return
}
if _, err := fmt.Fprintf(w, "data: %s\n\n", msg); err != nil {
return
}
flusher.Flush()
case <-ctx.Done():
return
}
}
}
}
Uses
WebSocketHandler
WebSocketHandler streams WatchBus events over WebSocket.
The watched key is taken from the “key” query parameter.
Parameters
Returns
func WebSocketHandler(bus WatchBus) http.HandlerFunc
{
return func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
if key == "" {
http.Error(w, "missing key", http.StatusBadRequest)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
ctx, cancel := context.WithCancel(r.Context())
ch, err := bus.Watch(ctx, key)
if err != nil {
cancel()
return
}
defer func() {
cancel()
_ = bus.Unwatch(context.Background(), key, ch)
}()
for {
select {
case msg, ok := <-ch:
if !ok {
return
}
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
return
}
case <-ctx.Done():
return
}
}
}
}