mesh package
API reference for the mesh package.
Imports
(13)
TestMeshMultiNode
Parameters
func TestMeshMultiNode(t *testing.T)
{
requireMulticast(t)
const nodeCount = 10
portBase := 10000 + (int(time.Now().Unix()) % 5000)
nodes := make([]*MeshBus, nodeCount)
opts := make([]MeshOptions, nodeCount)
ifi := findMulticastInterface()
ifaceName := ""
if ifi != nil {
ifaceName = ifi.Name
}
// Create seed node (Node 0)
seedAddr := fmt.Sprintf("127.0.0.1:%d", portBase)
opts[0] = MeshOptions{
Port: portBase,
Interface: ifaceName,
AdvertiseAddr: seedAddr,
Heartbeat: 50 * time.Millisecond,
}
var err error
nodes[0], err = NewMeshBus(opts[0])
if err != nil {
t.Fatalf("Failed to create seed node: %v", err)
}
defer nodes[0].Close()
// Create other nodes, some with seed, some using multicast solely (simulated by same group)
for i := 1; i < nodeCount; i++ {
port := portBase + i
addr := fmt.Sprintf("127.0.0.1:%d", port)
o := MeshOptions{
Port: port,
Interface: ifaceName,
AdvertiseAddr: addr,
Heartbeat: 50 * time.Millisecond,
}
// Every node knows the seed to ensure discovery even if Multicast is restricted
o.Peers = []string{seedAddr}
opts[i] = o
nodes[i], err = NewMeshBus(o)
if err != nil {
t.Fatalf("Failed to create node %d: %v", i, err)
}
defer nodes[i].Close()
}
// Wait for full mesh discovery
t.Log("Waiting for nodes to discover each other...")
// Increase timeout for discovery in potentially constrained environments
discoveryCtx, discoveryCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer discoveryCancel()
for i, node := range nodes {
discoveryLoop:
for {
peers := node.Peers()
// We wait until most nodes are discovered.
// In loopback/CI, we might not get 100% but we need enough for a valid test.
if len(peers) >= nodeCount/2 {
break discoveryLoop
}
select {
case <-discoveryCtx.Done():
t.Logf("Node %d only discovered %d/%d peers: %v", i, len(peers), nodeCount-1, peers)
break discoveryLoop
case <-time.After(200 * time.Millisecond):
}
}
}
// Verify propagation: Node 0 publishes, everyone else receives
key := "multi-node-sync"
verificationCtx, verificationCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer verificationCancel()
subs := make([]<-chan syncbus.Event, nodeCount)
for i := 1; i < nodeCount; i++ {
// Use verificationCtx so channels don't close prematurely if discovery took too long
ch, _ := nodes[i].Subscribe(verificationCtx, key)
subs[i] = ch
}
time.Sleep(500 * time.Millisecond) // Give time for subscriptions and gossip to settle
if err := nodes[0].Publish(context.Background(), key); err != nil {
t.Fatalf("Failed to publish from seed: %v", err)
}
var wg sync.WaitGroup
for i := 1; i < nodeCount; i++ {
wg.Add(1)
go func(idx int, ch <-chan syncbus.Event) {
defer wg.Done()
select {
case evt, ok := <-ch:
if !ok {
t.Errorf("Node %d: channel closed unexpectedly", idx)
return
}
if evt.Key != key {
t.Errorf("Node %d received wrong key: '%s' (expected '%s')", idx, evt.Key, key)
}
case <-verificationCtx.Done():
// If we timed out, check if we missed it or if it's just slow
t.Errorf("Node %d timed out waiting for event", idx)
}
}(i, subs[i])
}
wg.Wait()
}
BenchmarkMeshPublish
Parameters
func BenchmarkMeshPublish(b *testing.B)
{
opts := MeshOptions{
Port: 12000,
Heartbeat: 1 * time.Hour, // Disable heartbeats for bench
}
node, _ := NewMeshBus(opts)
defer node.Close()
ctx := context.Background()
key := "bench-key"
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = node.Publish(ctx, key)
}
}
BenchmarkMeshPacketMarshal
Parameters
func BenchmarkMeshPacketMarshal(b *testing.B)
{
p := packet{
Magic: magicByte,
Type: typeInvalidate,
NodeID: [16]byte{1, 2, 3, 4},
KeyHash: 12345678,
KeyLen: 10,
Key: []byte("test-key-1"),
}
buf := make([]byte, 1500)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = p.marshal(buf)
}
}
findMulticastInterface
Returns
// findMulticastInterface returns the first network interface that is up,
// supports multicast and is not a loopback device, or nil when no such
// interface exists (including when the interface list cannot be read).
func findMulticastInterface() *net.Interface {
	const required = net.FlagMulticast | net.FlagUp

	candidates, _ := net.Interfaces()
	for _, candidate := range candidates {
		if candidate.Flags&required != required {
			continue
		}
		if candidate.Flags&net.FlagLoopback != 0 {
			continue
		}
		match := candidate
		return &match
	}
	return nil
}
requireMulticast
requireMulticast skips the test if multicast UDP is not functional in the
current environment (e.g. CI containers or hosts without multicast routing).
It probes using the same ipv4.PacketConn.JoinGroup path used by NewMeshBus.
Parameters
func requireMulticast(t *testing.T)
{
t.Helper()
addr, err := net.ResolveUDPAddr("udp4", "239.0.0.1:0")
if err != nil {
t.Skipf("requires multicast: cannot resolve 239.0.0.1: %v", err)
}
c, err := net.ListenPacket("udp4", "0.0.0.0:0")
if err != nil {
t.Skipf("requires multicast: cannot open UDP socket: %v", err)
}
defer c.Close()
pc := ipv4.NewPacketConn(c)
if err := pc.JoinGroup(nil, addr); err != nil {
t.Skipf("requires a functional multicast environment: %v", err)
}
}
TestMeshIntegration
Parameters
func TestMeshIntegration(t *testing.T)
{
requireMulticast(t)
ifi := findMulticastInterface()
ifaceName := ""
if ifi != nil {
ifaceName = ifi.Name
}
opts := MeshOptions{
Port: 8000 + (int(time.Now().Unix()) % 1000), // Randomish port
Group: "239.0.0.1",
Interface: ifaceName,
}
nodeA, err := NewMeshBus(opts)
if err != nil {
t.Fatalf("Failed to create nodeA: %v", err)
}
defer nodeA.Close()
nodeB, err := NewMeshBus(opts)
if err != nil {
t.Fatalf("Failed to create nodeB: %v", err)
}
defer nodeB.Close()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
key := "test-key"
chB, err := nodeB.Subscribe(ctx, key)
if err != nil {
t.Fatalf("Failed to subscribe on nodeB: %v", err)
}
// Wait a bit for subscription to be ready
time.Sleep(200 * time.Millisecond)
if err := nodeA.Publish(ctx, key); err != nil {
t.Fatalf("Failed to publish from nodeA: %v", err)
}
select {
case evt := <-chB:
if evt.Key != key {
t.Errorf("Expected key %s, got %s", key, evt.Key)
}
case <-ctx.Done():
t.Error("Timed out waiting for invalidation on nodeB")
}
}
TestMeshLoopback
Parameters
func TestMeshLoopback(t *testing.T)
{
requireMulticast(t)
ifi := findMulticastInterface()
ifaceName := ""
if ifi != nil {
ifaceName = ifi.Name
}
opts := MeshOptions{
Port: 8000 + (int(time.Now().Unix()) % 1000) + 1,
Group: "239.0.0.1",
Interface: ifaceName,
}
node, err := NewMeshBus(opts)
if err != nil {
t.Fatalf("Failed to create node: %v", err)
}
defer node.Close()
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
key := "loop-key"
ch, err := node.Subscribe(ctx, key)
if err != nil {
t.Fatalf("Failed to subscribe: %v", err)
}
if err := node.Publish(ctx, key); err != nil {
t.Fatalf("Failed to publish: %v", err)
}
select {
case <-ch:
t.Error("Node received its own message (loopback should be filtered)")
case <-ctx.Done():
// Success
}
}
TestMeshGossip
Parameters
func TestMeshGossip(t *testing.T)
{
requireMulticast(t)
portA := 9000 + (int(time.Now().Unix()) % 100)
portB := portA + 1
addrA := fmt.Sprintf("127.0.0.1:%d", portA)
optsA := MeshOptions{
Port: portA,
AdvertiseAddr: addrA,
Heartbeat: 100 * time.Millisecond,
}
nodeA, err := NewMeshBus(optsA)
if err != nil {
t.Fatalf("Failed to create nodeA: %v", err)
}
defer nodeA.Close()
optsB := MeshOptions{
Port: portB,
AdvertiseAddr: fmt.Sprintf("127.0.0.1:%d", portB),
Peers: []string{addrA},
Heartbeat: 100 * time.Millisecond,
}
nodeB, err := NewMeshBus(optsB)
if err != nil {
t.Fatalf("Failed to create nodeB: %v", err)
}
defer nodeB.Close()
// Wait for gossip discovery
found := false
for i := 0; i < 20; i++ {
peers := nodeA.Peers()
for _, p := range peers {
if p == optsB.AdvertiseAddr {
found = true
break
}
}
if found {
break
}
time.Sleep(100 * time.Millisecond)
}
if !found {
t.Errorf("NodeA did not discover NodeB via gossip (Peers: %v)", nodeA.Peers())
}
}
packet
type packet struct
Methods
Parameters
Returns
func (*packet) marshal(b []byte) (int, error)
{
if len(b) < 18 {
return 0, errShortBuffer
}
b[0] = p.Magic
b[1] = p.Type
copy(b[2:18], p.NodeID[:])
switch p.Type {
case typeInvalidate, typeHeartbeat:
if len(b) < 28+len(p.Key) {
return 0, errShortBuffer
}
binary.BigEndian.PutUint64(b[18:26], p.KeyHash)
binary.BigEndian.PutUint16(b[26:28], p.KeyLen)
copy(b[28:], p.Key)
return 28 + int(p.KeyLen), nil
case typeBatch:
if len(b) < 20 {
return 0, errShortBuffer
}
binary.BigEndian.PutUint16(b[18:20], uint16(len(p.Keys)))
curr := 20
for _, k := range p.Keys {
kLen := len(k)
if len(b) < curr+2+kLen {
return curr, errShortBuffer
}
binary.BigEndian.PutUint16(b[curr:curr+2], uint16(kLen))
copy(b[curr+2:], k)
curr += 2 + kLen
}
return curr, nil
}
return 18, nil
}
Parameters
Returns
func (*packet) unmarshal(b []byte) error
{
if len(b) < 18 {
return errShortBuffer
}
p.Magic = b[0]
if p.Magic != magicByte {
return errInvalidMagic
}
p.Type = b[1]
copy(p.NodeID[:], b[2:18])
switch p.Type {
case typeInvalidate, typeHeartbeat:
if len(b) < 28 {
return errShortBuffer
}
p.KeyHash = binary.BigEndian.Uint64(b[18:26])
p.KeyLen = binary.BigEndian.Uint16(b[26:28])
if len(b) < 28+int(p.KeyLen) {
return errShortBuffer
}
if p.KeyLen > 0 {
p.Key = make([]byte, p.KeyLen)
copy(p.Key, b[28:28+int(p.KeyLen)])
}
case typeBatch:
if len(b) < 20 {
return errShortBuffer
}
count := int(binary.BigEndian.Uint16(b[18:20]))
p.Keys = make([]string, 0, count)
curr := 20
for range count {
if len(b) < curr+2 {
return errShortBuffer
}
kLen := int(binary.BigEndian.Uint16(b[curr : curr+2]))
if len(b) < curr+2+kLen {
return errShortBuffer
}
p.Keys = append(p.Keys, string(b[curr+2:curr+2+kLen]))
curr += 2 + kLen
}
}
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| Magic | byte | |
| Type | byte | |
| NodeID | [16]byte | |
| KeyHash | uint64 | |
| KeyLen | uint16 | |
| Key | []byte | |
| Keys | []string | |
MeshOptions
MeshOptions configures the Warp Mesh bus.
type MeshOptions struct
Fields
| Name | Type | Description |
|---|---|---|
| Port | int | |
| Interface | string | |
| Group | string | |
| Peers | []string | |
| AdvertiseAddr | string | |
| Heartbeat | time.Duration | |
| BatchInterval | time.Duration | |
| BatchSize | int | |
MeshBus
MeshBus implements a peer-to-peer synchronization bus using UDP Multicast and Unicast Gossip.
type MeshBus struct
Methods
Publish sends an invalidation event to the mesh. It uses internal batching.
Parameters
Returns
func (*MeshBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
b.mu.Lock()
if _, ok := b.pending[key]; ok {
b.mu.Unlock()
return nil // Already pending delivery
}
b.pending[key] = struct{}{}
b.mu.Unlock()
select {
case b.publishCh <- key:
return nil
case <-ctx.Done():
b.mu.Lock()
delete(b.pending, key)
b.mu.Unlock()
return ctx.Err()
}
}
broadcast sends the packet payload to multicast group and known unicast peers.
Parameters
Returns
func (*MeshBus) broadcast(payload []byte) error
{
// Multicast is the primary delivery mechanism
_, err := b.conn.WriteTo(payload, b.groupAddr)
if err == nil {
b.published.Add(1)
}
// Unicast to known peers (backup/cloud traversal)
b.peersMu.RLock()
// Copy pointers to avoid holding the lock during WriteTo
addrs := make([]*net.UDPAddr, 0, len(b.resolvedAddr)+len(b.opts.Peers))
for _, addr := range b.resolvedAddr {
addrs = append(addrs, addr)
}
b.peersMu.RUnlock()
// Send to resolved peers
for _, addr := range addrs {
_, _ = b.conn.WriteTo(payload, addr)
}
// Send to seed peers too if not yet resolved/known
for _, peer := range b.opts.Peers {
b.peersMu.RLock()
_, known := b.resolvedAddr[peer]
b.peersMu.RUnlock()
if known {
continue
}
addr, err := net.ResolveUDPAddr("udp4", peer)
if err != nil {
continue
}
_, _ = b.conn.WriteTo(payload, addr)
}
return err
}
PublishAndAwait is not fully supported by UDP Mesh due to its fire-and-forget nature.
Parameters
Returns
func (*MeshBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
if replicas > 0 {
return syncbus.ErrQuorumUnsupported
}
return b.Publish(ctx, key, opts...)
}
PublishAndAwaitTopology is not supported by UDP Mesh.
Parameters
Returns
func (*MeshBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
return syncbus.ErrQuorumUnsupported
}
Subscribe registers a channel to receive invalidation events for a key.
Parameters
Returns
func (*MeshBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
ch := make(chan syncbus.Event, 32)
b.mu.Lock()
b.subs[key] = append(b.subs[key], ch)
b.mu.Unlock()
go func() {
<-ctx.Done()
_ = b.Unsubscribe(context.Background(), key, ch)
}()
return ch, nil
}
Unsubscribe removes a channel from key subscriptions.
Parameters
Returns
func (*MeshBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
b.mu.Lock()
defer b.mu.Unlock()
subs, ok := b.subs[key]
if !ok {
return nil
}
for i, c := range subs {
if c == ch {
b.subs[key] = append(subs[:i], subs[i+1:]...)
close(chan syncbus.Event(c))
break
}
}
if len(b.subs[key]) == 0 {
delete(b.subs, key)
}
return nil
}
IsHealthy returns true if the underlying UDP connection is active.
Returns
func (*MeshBus) IsHealthy() bool
{
return b.conn != nil
}
RevokeLease is a convenience for Publish with a lease prefix.
Parameters
Returns
func (*MeshBus) RevokeLease(ctx context.Context, id string) error
{
return b.Publish(ctx, "lease:"+id)
}
SubscribeLease is a convenience for Subscribe with a lease prefix.
Parameters
Returns
func (*MeshBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
return b.Subscribe(ctx, "lease:"+id)
}
UnsubscribeLease is a convenience for Unsubscribe with a lease prefix.
Parameters
Returns
func (*MeshBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
return b.Unsubscribe(ctx, "lease:"+id, ch)
}
func (*MeshBus) listen()
{
buf := make([]byte, 1500)
for {
select {
case <-b.ctx.Done():
return
default:
}
n, _, err := b.conn.ReadFrom(buf)
if err != nil {
continue
}
var p packet
if err := p.unmarshal(buf[:n]); err != nil {
continue
}
if p.NodeID == b.nodeID {
continue
}
b.received.Add(1)
if p.Type == typeHeartbeat {
b.peersMu.Lock()
addrStr := string(p.Key)
b.knownPeers[addrStr] = time.Now()
if _, ok := b.resolvedAddr[addrStr]; !ok {
if rAddr, err := net.ResolveUDPAddr("udp4", addrStr); err == nil {
b.resolvedAddr[addrStr] = rAddr
}
}
b.peersMu.Unlock()
continue
}
if p.Type == typeInvalidate {
b.handleInvalidate(string(p.Key))
}
if p.Type == typeBatch {
for _, key := range p.Keys {
b.handleInvalidate(key)
}
}
}
}
Parameters
func (*MeshBus) handleInvalidate(key string)
{
b.mu.RLock()
chans, ok := b.subs[key]
if ok {
evt := syncbus.Event{Key: key}
for _, ch := range chans {
select {
case ch <- evt:
default:
}
}
}
b.mu.RUnlock()
}
Close gracefully shuts down the mesh bus.
Returns
func (*MeshBus) Close() error
{
b.cancel()
if b.conn != nil {
return b.conn.Close()
}
return nil
}
func (*MeshBus) heartbeatLoop()
{
interval := b.opts.Heartbeat
if interval == 0 {
interval = 5 * time.Second
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-b.ctx.Done():
return
case <-ticker.C:
addr := b.opts.AdvertiseAddr
if addr == "" {
// Fallback to local address if possible, though AdvertiseAddr is preferred
addr = b.conn.LocalAddr().String()
}
p := packet{
Magic: magicByte,
Type: typeHeartbeat,
NodeID: b.nodeID,
KeyLen: uint16(len(addr)),
Key: []byte(addr),
}
buf := bufferPool.Get().([]byte)
n, err := p.marshal(buf)
if err == nil {
_ = b.broadcast(buf[:n])
}
bufferPool.Put(buf)
}
}
}
func (*MeshBus) cleanupPeers()
{
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-b.ctx.Done():
return
case <-ticker.C:
b.peersMu.Lock()
now := time.Now()
for addr, lastSeen := range b.knownPeers {
if now.Sub(lastSeen) > 60*time.Second {
delete(b.knownPeers, addr)
delete(b.resolvedAddr, addr)
}
}
b.peersMu.Unlock()
}
}
}
func (*MeshBus) runBatcher()
{
ticker := time.NewTicker(b.opts.BatchInterval)
defer ticker.Stop()
var batch []string
flush := func() {
if len(batch) == 0 {
return
}
p := packet{
Magic: magicByte,
Type: typeBatch,
NodeID: b.nodeID,
Keys: batch,
}
buf := bufferPool.Get().([]byte)
if n, err := p.marshal(buf); err == nil {
_ = b.broadcast(buf[:n])
}
bufferPool.Put(buf)
// Cleanup pending map
b.mu.Lock()
for _, k := range batch {
delete(b.pending, k)
}
b.mu.Unlock()
batch = nil
}
for {
select {
case <-b.ctx.Done():
return
case key := <-b.publishCh:
batch = append(batch, key)
if len(batch) >= b.opts.BatchSize {
flush()
}
case <-ticker.C:
flush()
}
}
}
Metrics returns the published and received counts.
Returns
func (*MeshBus) Metrics() syncbus.Metrics
{
return syncbus.Metrics{
Published: b.published.Load(),
Delivered: b.received.Load(),
}
}
Peers returns a list of currently known active peers.
Returns
func (*MeshBus) Peers() []string
{
b.peersMu.RLock()
defer b.peersMu.RUnlock()
peers := make([]string, 0, len(b.knownPeers))
for addr := range b.knownPeers {
peers = append(peers, addr)
}
return peers
}
Fields
| Name | Type | Description |
|---|---|---|
| opts | MeshOptions | |
| nodeID | [16]byte | |
| conn | net.PacketConn | |
| pconn | *ipv4.PacketConn | |
| groupAddr | *net.UDPAddr | |
| mu | sync.RWMutex | |
| subs | map[string][]chan syncbus.Event | |
| peersMu | sync.RWMutex | |
| knownPeers | map[string]time.Time | |
| resolvedAddr | map[string]*net.UDPAddr | |
| publishCh | chan string | |
| pending | map[string]struct{} | |
| published | atomic.Uint64 | |
| received | atomic.Uint64 | |
| ctx | context.Context | |
| cancel | context.CancelFunc | |
Uses
NewMeshBus
NewMeshBus creates a new Warp Mesh synchronization bus.
Parameters
Returns
func NewMeshBus(opts MeshOptions) (*MeshBus, error)
{
if opts.Port == 0 {
opts.Port = 7946
}
if opts.Group == "" {
opts.Group = "239.0.0.1"
}
addr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("%s:%d", opts.Group, opts.Port))
if err != nil {
return nil, fmt.Errorf("mesh: failed to resolve multicast address: %w", err)
}
// Configure socket to allow multiple listeners on the same port.
lc := net.ListenConfig{
Control: func(network, address string, c syscall.RawConn) error {
var err error
c.Control(func(fd uintptr) {
_ = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
_ = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, 15, 1)
})
return err
},
}
c, err := lc.ListenPacket(context.Background(), "udp4", fmt.Sprintf("0.0.0.0:%d", opts.Port))
if err != nil {
return nil, fmt.Errorf("mesh: failed to listen on port %d: %w", opts.Port, err)
}
pconn := ipv4.NewPacketConn(c)
var iface *net.Interface
if opts.Interface != "" {
iface, err = net.InterfaceByName(opts.Interface)
if err != nil {
_ = c.Close()
return nil, fmt.Errorf("mesh: failed to find interface %s: %w", opts.Interface, err)
}
}
if err := pconn.JoinGroup(iface, addr); err != nil {
_ = c.Close()
return nil, fmt.Errorf("mesh: failed to join group %s: %w", opts.Group, err)
}
if iface != nil {
if err := pconn.SetMulticastInterface(iface); err != nil {
_ = c.Close()
return nil, fmt.Errorf("mesh: failed to set multicast interface: %w", err)
}
}
// Enable loopback so multiple nodes on same host can hear each other.
_ = pconn.SetMulticastLoopback(true)
if opts.BatchInterval == 0 {
opts.BatchInterval = 100 * time.Millisecond
}
if opts.BatchSize == 0 {
opts.BatchSize = 20
}
ctx, cancel := context.WithCancel(context.Background())
b := &MeshBus{
opts: opts,
nodeID: uuid.New(),
conn: c,
pconn: pconn,
groupAddr: addr,
subs: make(map[string][]chan syncbus.Event),
knownPeers: make(map[string]time.Time),
resolvedAddr: make(map[string]*net.UDPAddr),
publishCh: make(chan string, 1000),
pending: make(map[string]struct{}),
ctx: ctx,
cancel: cancel,
}
go b.listen()
go b.heartbeatLoop()
go b.cleanupPeers()
go b.runBatcher()
return b, nil
}