mesh package
API reference for the mesh package.
Imports (13)
MeshOptions
MeshOptions configures the Warp Mesh bus.
type MeshOptions struct
Fields
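| Name | Type | Description |
|---|---|---|
| Port | int | UDP port that is bound and used as the multicast destination port. Defaults to 7946 when zero. |
| Interface | string | Name of the network interface used to join the multicast group. When empty, the group is joined with a nil (system-chosen) interface. |
| Group | string | IPv4 multicast group address. Defaults to 239.0.0.1 when empty. |
| Peers | []string | Seed peer addresses (`host:port`, resolved as UDPv4) that also receive a unicast copy of every broadcast. |
| AdvertiseAddr | string | Address announced in heartbeats. When empty, the socket's local address is announced instead. |
| Heartbeat | time.Duration | Interval between heartbeats. Defaults to 5s when zero. |
| BatchInterval | time.Duration | Maximum time a published key waits before its batch is flushed. Defaults to 100ms when zero. |
| BatchSize | int | Number of queued keys that triggers an immediate flush. Defaults to 20 when zero. |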
MeshBus
MeshBus implements a peer-to-peer synchronization bus using UDP Multicast and Unicast Gossip.
type MeshBus struct
Methods
Publish sends an invalidation event to the mesh. It uses internal batching.
Parameters
Returns
func (*MeshBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
b.mu.Lock()
if _, ok := b.pending[key]; ok {
b.mu.Unlock()
return nil // Already pending delivery
}
b.pending[key] = struct{}{}
b.mu.Unlock()
select {
case b.publishCh <- key:
return nil
case <-ctx.Done():
b.mu.Lock()
delete(b.pending, key)
b.mu.Unlock()
return ctx.Err()
}
}
broadcast sends the packet payload to multicast group and known unicast peers.
Parameters
Returns
func (*MeshBus) broadcast(payload []byte) error
{
// Multicast is the primary delivery mechanism
_, err := b.conn.WriteTo(payload, b.groupAddr)
if err == nil {
b.published.Add(1)
}
// Unicast to known peers (backup/cloud traversal)
b.peersMu.RLock()
// Copy pointers to avoid holding the lock during WriteTo
addrs := make([]*net.UDPAddr, 0, len(b.resolvedAddr)+len(b.opts.Peers))
for _, addr := range b.resolvedAddr {
addrs = append(addrs, addr)
}
b.peersMu.RUnlock()
// Send to resolved peers
for _, addr := range addrs {
_, _ = b.conn.WriteTo(payload, addr)
}
// Send to seed peers too if not yet resolved/known
for _, peer := range b.opts.Peers {
b.peersMu.RLock()
_, known := b.resolvedAddr[peer]
b.peersMu.RUnlock()
if known {
continue
}
addr, err := net.ResolveUDPAddr("udp4", peer)
if err != nil {
continue
}
_, _ = b.conn.WriteTo(payload, addr)
}
return err
}
PublishAndAwait cannot wait for acknowledgements because UDP Mesh is fire-and-forget. It returns syncbus.ErrQuorumUnsupported when replicas > 0 and otherwise behaves like Publish.
Parameters
Returns
func (*MeshBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
if replicas > 0 {
return syncbus.ErrQuorumUnsupported
}
return b.Publish(ctx, key, opts...)
}
PublishAndAwaitTopology is not supported by UDP Mesh.
Parameters
Returns
func (*MeshBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
return syncbus.ErrQuorumUnsupported
}
Subscribe registers a channel to receive invalidation events for a key.
Parameters
Returns
func (*MeshBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
ch := make(chan syncbus.Event, 32)
b.mu.Lock()
b.subs[key] = append(b.subs[key], ch)
b.mu.Unlock()
go func() {
<-ctx.Done()
_ = b.Unsubscribe(context.Background(), key, ch)
}()
return ch, nil
}
Unsubscribe removes a channel from key subscriptions.
Parameters
Returns
func (*MeshBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
b.mu.Lock()
defer b.mu.Unlock()
subs, ok := b.subs[key]
if !ok {
return nil
}
for i, c := range subs {
if c == ch {
b.subs[key] = append(subs[:i], subs[i+1:]...)
close(chan syncbus.Event(c))
break
}
}
if len(b.subs[key]) == 0 {
delete(b.subs, key)
}
return nil
}
IsHealthy returns true if the underlying UDP connection is active.
Returns
func (*MeshBus) IsHealthy() bool
{
return b.conn != nil
}
RevokeLease is a convenience for Publish with a lease prefix.
Parameters
Returns
func (*MeshBus) RevokeLease(ctx context.Context, id string) error
{
return b.Publish(ctx, "lease:"+id)
}
SubscribeLease is a convenience for Subscribe with a lease prefix.
Parameters
Returns
func (*MeshBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
return b.Subscribe(ctx, "lease:"+id)
}
UnsubscribeLease is a convenience for Unsubscribe with a lease prefix.
Parameters
Returns
func (*MeshBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
return b.Unsubscribe(ctx, "lease:"+id, ch)
}
func (*MeshBus) listen()
{
buf := make([]byte, 1500)
for {
select {
case <-b.ctx.Done():
return
default:
}
n, _, err := b.conn.ReadFrom(buf)
if err != nil {
continue
}
var p packet
if err := p.unmarshal(buf[:n]); err != nil {
continue
}
if p.NodeID == b.nodeID {
continue
}
b.received.Add(1)
if p.Type == typeHeartbeat {
b.peersMu.Lock()
addrStr := string(p.Key)
b.knownPeers[addrStr] = time.Now()
if _, ok := b.resolvedAddr[addrStr]; !ok {
if rAddr, err := net.ResolveUDPAddr("udp4", addrStr); err == nil {
b.resolvedAddr[addrStr] = rAddr
}
}
b.peersMu.Unlock()
continue
}
if p.Type == typeInvalidate {
b.handleInvalidate(string(p.Key))
}
if p.Type == typeBatch {
for _, key := range p.Keys {
b.handleInvalidate(key)
}
}
}
}
Parameters
func (*MeshBus) handleInvalidate(key string)
{
b.mu.RLock()
chans, ok := b.subs[key]
if ok {
evt := syncbus.Event{Key: key}
for _, ch := range chans {
select {
case ch <- evt:
default:
}
}
}
b.mu.RUnlock()
}
Close gracefully shuts down the mesh bus.
Returns
func (*MeshBus) Close() error
{
b.cancel()
if b.conn != nil {
return b.conn.Close()
}
return nil
}
func (*MeshBus) heartbeatLoop()
{
interval := b.opts.Heartbeat
if interval == 0 {
interval = 5 * time.Second
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-b.ctx.Done():
return
case <-ticker.C:
addr := b.opts.AdvertiseAddr
if addr == "" {
// Fallback to local address if possible, though AdvertiseAddr is preferred
addr = b.conn.LocalAddr().String()
}
p := packet{
Magic: magicByte,
Type: typeHeartbeat,
NodeID: b.nodeID,
KeyLen: uint16(len(addr)),
Key: []byte(addr),
}
buf := bufferPool.Get().([]byte)
n, err := p.marshal(buf)
if err == nil {
_ = b.broadcast(buf[:n])
}
bufferPool.Put(buf)
}
}
}
func (*MeshBus) cleanupPeers()
{
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-b.ctx.Done():
return
case <-ticker.C:
b.peersMu.Lock()
now := time.Now()
for addr, lastSeen := range b.knownPeers {
if now.Sub(lastSeen) > 60*time.Second {
delete(b.knownPeers, addr)
delete(b.resolvedAddr, addr)
}
}
b.peersMu.Unlock()
}
}
}
func (*MeshBus) runBatcher()
{
ticker := time.NewTicker(b.opts.BatchInterval)
defer ticker.Stop()
var batch []string
flush := func() {
if len(batch) == 0 {
return
}
p := packet{
Magic: magicByte,
Type: typeBatch,
NodeID: b.nodeID,
Keys: batch,
}
buf := bufferPool.Get().([]byte)
if n, err := p.marshal(buf); err == nil {
_ = b.broadcast(buf[:n])
}
bufferPool.Put(buf)
// Cleanup pending map
b.mu.Lock()
for _, k := range batch {
delete(b.pending, k)
}
b.mu.Unlock()
batch = nil
}
for {
select {
case <-b.ctx.Done():
return
case key := <-b.publishCh:
batch = append(batch, key)
if len(batch) >= b.opts.BatchSize {
flush()
}
case <-ticker.C:
flush()
}
}
}
Metrics returns the published and received counts.
Returns
func (*MeshBus) Metrics() syncbus.Metrics
{
return syncbus.Metrics{
Published: b.published.Load(),
Delivered: b.received.Load(),
}
}
Peers returns a list of currently known active peers.
Returns
func (*MeshBus) Peers() []string
{
b.peersMu.RLock()
defer b.peersMu.RUnlock()
peers := make([]string, 0, len(b.knownPeers))
for addr := range b.knownPeers {
peers = append(peers, addr)
}
return peers
}
Fields
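| Name | Type | Description |
|---|---|---|
| opts | MeshOptions | Configuration, with defaults applied by NewMeshBus. |
| nodeID | [16]byte | Random node identifier (a UUID) stamped on outgoing packets. Incoming packets carrying it are ignored. |
| conn | net.PacketConn | UDP socket bound to 0.0.0.0:Port, used for all sends and receives. |
| pconn | *ipv4.PacketConn | IPv4 wrapper around conn, used for multicast group membership, interface and loopback settings. |
| groupAddr | *net.UDPAddr | Multicast destination (Group:Port). |
| mu | sync.RWMutex | Guards subs and pending. |
| subs | map[string][]chan syncbus.Event | Subscriber channels per key. |
| peersMu | sync.RWMutex | Guards knownPeers and resolvedAddr. |
| knownPeers | map[string]time.Time | Time of the last heartbeat per advertised peer address. Entries older than 60s are removed. |
| resolvedAddr | map[string]*net.UDPAddr | Resolved unicast address per advertised peer address. |
| publishCh | chan string | Queue (capacity 1000) from Publish to the batcher. |
| pending | map[string]struct{} | Keys queued but not yet flushed, used to deduplicate Publish calls. |
| published | atomic.Uint64 | Number of successful multicast writes. |
| received | atomic.Uint64 | Number of packets decoded from other nodes. |
| ctx | context.Context | Lifetime of the background goroutines, cancelled by Close. |
| cancel | context.CancelFunc | Cancels ctx. |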
Uses
NewMeshBus
NewMeshBus creates a new Warp Mesh synchronization bus.
Parameters
Returns
func NewMeshBus(opts MeshOptions) (*MeshBus, error)
{
if opts.Port == 0 {
opts.Port = 7946
}
if opts.Group == "" {
opts.Group = "239.0.0.1"
}
addr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("%s:%d", opts.Group, opts.Port))
if err != nil {
return nil, fmt.Errorf("mesh: failed to resolve multicast address: %w", err)
}
// Configure socket to allow multiple listeners on the same port.
lc := net.ListenConfig{
Control: func(network, address string, c syscall.RawConn) error {
var err error
c.Control(func(fd uintptr) {
_ = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
_ = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, 15, 1)
})
return err
},
}
c, err := lc.ListenPacket(context.Background(), "udp4", fmt.Sprintf("0.0.0.0:%d", opts.Port))
if err != nil {
return nil, fmt.Errorf("mesh: failed to listen on port %d: %w", opts.Port, err)
}
pconn := ipv4.NewPacketConn(c)
var iface *net.Interface
if opts.Interface != "" {
iface, err = net.InterfaceByName(opts.Interface)
if err != nil {
_ = c.Close()
return nil, fmt.Errorf("mesh: failed to find interface %s: %w", opts.Interface, err)
}
}
if err := pconn.JoinGroup(iface, addr); err != nil {
_ = c.Close()
return nil, fmt.Errorf("mesh: failed to join group %s: %w", opts.Group, err)
}
if iface != nil {
if err := pconn.SetMulticastInterface(iface); err != nil {
_ = c.Close()
return nil, fmt.Errorf("mesh: failed to set multicast interface: %w", err)
}
}
// Enable loopback so multiple nodes on same host can hear each other.
_ = pconn.SetMulticastLoopback(true)
if opts.BatchInterval == 0 {
opts.BatchInterval = 100 * time.Millisecond
}
if opts.BatchSize == 0 {
opts.BatchSize = 20
}
ctx, cancel := context.WithCancel(context.Background())
b := &MeshBus{
opts: opts,
nodeID: uuid.New(),
conn: c,
pconn: pconn,
groupAddr: addr,
subs: make(map[string][]chan syncbus.Event),
knownPeers: make(map[string]time.Time),
resolvedAddr: make(map[string]*net.UDPAddr),
publishCh: make(chan string, 1000),
pending: make(map[string]struct{}),
ctx: ctx,
cancel: cancel,
}
go b.listen()
go b.heartbeatLoop()
go b.cleanupPeers()
go b.runBatcher()
return b, nil
}
Uses
TestMeshMultiNode
Parameters
func TestMeshMultiNode(t *testing.T)
{
const nodeCount = 10
portBase := 10000 + (int(time.Now().Unix()) % 5000)
nodes := make([]*MeshBus, nodeCount)
opts := make([]MeshOptions, nodeCount)
ifi := findMulticastInterface()
ifaceName := ""
if ifi != nil {
ifaceName = ifi.Name
}
// Create seed node (Node 0)
seedAddr := fmt.Sprintf("127.0.0.1:%d", portBase)
opts[0] = MeshOptions{
Port: portBase,
Interface: ifaceName,
AdvertiseAddr: seedAddr,
Heartbeat: 50 * time.Millisecond,
}
var err error
nodes[0], err = NewMeshBus(opts[0])
if err != nil {
t.Fatalf("Failed to create seed node: %v", err)
}
defer nodes[0].Close()
// Create other nodes, some with seed, some using multicast solely (simulated by same group)
for i := 1; i < nodeCount; i++ {
port := portBase + i
addr := fmt.Sprintf("127.0.0.1:%d", port)
o := MeshOptions{
Port: port,
Interface: ifaceName,
AdvertiseAddr: addr,
Heartbeat: 50 * time.Millisecond,
}
// Every node knows the seed to ensure discovery even if Multicast is restricted
o.Peers = []string{seedAddr}
opts[i] = o
nodes[i], err = NewMeshBus(o)
if err != nil {
t.Fatalf("Failed to create node %d: %v", i, err)
}
defer nodes[i].Close()
}
// Wait for full mesh discovery
t.Log("Waiting for nodes to discover each other...")
// Increase timeout for discovery in potentially constrained environments
discoveryCtx, discoveryCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer discoveryCancel()
for i, node := range nodes {
discoveryLoop:
for {
peers := node.Peers()
// We wait until most nodes are discovered.
// In loopback/CI, we might not get 100% but we need enough for a valid test.
if len(peers) >= nodeCount/2 {
break discoveryLoop
}
select {
case <-discoveryCtx.Done():
t.Logf("Node %d only discovered %d/%d peers: %v", i, len(peers), nodeCount-1, peers)
break discoveryLoop
case <-time.After(200 * time.Millisecond):
}
}
}
// Verify propagation: Node 0 publishes, everyone else receives
key := "multi-node-sync"
verificationCtx, verificationCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer verificationCancel()
subs := make([]<-chan syncbus.Event, nodeCount)
for i := 1; i < nodeCount; i++ {
// Use verificationCtx so channels don't close prematurely if discovery took too long
ch, _ := nodes[i].Subscribe(verificationCtx, key)
subs[i] = ch
}
time.Sleep(500 * time.Millisecond) // Give time for subscriptions and gossip to settle
if err := nodes[0].Publish(context.Background(), key); err != nil {
t.Fatalf("Failed to publish from seed: %v", err)
}
var wg sync.WaitGroup
for i := 1; i < nodeCount; i++ {
wg.Add(1)
go func(idx int, ch <-chan syncbus.Event) {
defer wg.Done()
select {
case evt, ok := <-ch:
if !ok {
t.Errorf("Node %d: channel closed unexpectedly", idx)
return
}
if evt.Key != key {
t.Errorf("Node %d received wrong key: '%s' (expected '%s')", idx, evt.Key, key)
}
case <-verificationCtx.Done():
// If we timed out, check if we missed it or if it's just slow
t.Errorf("Node %d timed out waiting for event", idx)
}
}(i, subs[i])
}
wg.Wait()
}
BenchmarkMeshPublish
Parameters
func BenchmarkMeshPublish(b *testing.B)
{
opts := MeshOptions{
Port: 12000,
Heartbeat: 1 * time.Hour, // Disable heartbeats for bench
}
node, _ := NewMeshBus(opts)
defer node.Close()
ctx := context.Background()
key := "bench-key"
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = node.Publish(ctx, key)
}
}
BenchmarkMeshPacketMarshal
Parameters
func BenchmarkMeshPacketMarshal(b *testing.B)
{
p := packet{
Magic: magicByte,
Type: typeInvalidate,
NodeID: [16]byte{1, 2, 3, 4},
KeyHash: 12345678,
KeyLen: 10,
Key: []byte("test-key-1"),
}
buf := make([]byte, 1500)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = p.marshal(buf)
}
}
findMulticastInterface
Returns
// findMulticastInterface returns the first network interface that is up,
// supports multicast and is not a loopback device. It returns nil if no
// interface qualifies or the interface list cannot be read (that error is
// deliberately ignored in this test helper).
func findMulticastInterface() *net.Interface {
	ifaces, _ := net.Interfaces()
	for i := range ifaces {
		flags := ifaces[i].Flags
		usable := flags&net.FlagUp != 0 &&
			flags&net.FlagMulticast != 0 &&
			flags&net.FlagLoopback == 0
		if usable {
			return &ifaces[i]
		}
	}
	return nil
}
TestMeshIntegration
Parameters
func TestMeshIntegration(t *testing.T)
{
ifi := findMulticastInterface()
ifaceName := ""
if ifi != nil {
ifaceName = ifi.Name
}
opts := MeshOptions{
Port: 8000 + (int(time.Now().Unix()) % 1000), // Randomish port
Group: "239.0.0.1",
Interface: ifaceName,
}
nodeA, err := NewMeshBus(opts)
if err != nil {
t.Fatalf("Failed to create nodeA: %v", err)
}
defer nodeA.Close()
nodeB, err := NewMeshBus(opts)
if err != nil {
t.Fatalf("Failed to create nodeB: %v", err)
}
defer nodeB.Close()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
key := "test-key"
chB, err := nodeB.Subscribe(ctx, key)
if err != nil {
t.Fatalf("Failed to subscribe on nodeB: %v", err)
}
// Wait a bit for subscription to be ready
time.Sleep(200 * time.Millisecond)
if err := nodeA.Publish(ctx, key); err != nil {
t.Fatalf("Failed to publish from nodeA: %v", err)
}
select {
case evt := <-chB:
if evt.Key != key {
t.Errorf("Expected key %s, got %s", key, evt.Key)
}
case <-ctx.Done():
t.Error("Timed out waiting for invalidation on nodeB")
}
}
TestMeshLoopback
Parameters
func TestMeshLoopback(t *testing.T)
{
ifi := findMulticastInterface()
ifaceName := ""
if ifi != nil {
ifaceName = ifi.Name
}
opts := MeshOptions{
Port: 8000 + (int(time.Now().Unix()) % 1000) + 1,
Group: "239.0.0.1",
Interface: ifaceName,
}
node, err := NewMeshBus(opts)
if err != nil {
t.Fatalf("Failed to create node: %v", err)
}
defer node.Close()
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
key := "loop-key"
ch, err := node.Subscribe(ctx, key)
if err != nil {
t.Fatalf("Failed to subscribe: %v", err)
}
if err := node.Publish(ctx, key); err != nil {
t.Fatalf("Failed to publish: %v", err)
}
select {
case <-ch:
t.Error("Node received its own message (loopback should be filtered)")
case <-ctx.Done():
// Success
}
}
TestMeshGossip
Parameters
func TestMeshGossip(t *testing.T)
{
portA := 9000 + (int(time.Now().Unix()) % 100)
portB := portA + 1
addrA := fmt.Sprintf("127.0.0.1:%d", portA)
optsA := MeshOptions{
Port: portA,
AdvertiseAddr: addrA,
Heartbeat: 100 * time.Millisecond,
}
nodeA, err := NewMeshBus(optsA)
if err != nil {
t.Fatalf("Failed to create nodeA: %v", err)
}
defer nodeA.Close()
optsB := MeshOptions{
Port: portB,
AdvertiseAddr: fmt.Sprintf("127.0.0.1:%d", portB),
Peers: []string{addrA},
Heartbeat: 100 * time.Millisecond,
}
nodeB, err := NewMeshBus(optsB)
if err != nil {
t.Fatalf("Failed to create nodeB: %v", err)
}
defer nodeB.Close()
// Wait for gossip discovery
found := false
for i := 0; i < 20; i++ {
peers := nodeA.Peers()
for _, p := range peers {
if p == optsB.AdvertiseAddr {
found = true
break
}
}
if found {
break
}
time.Sleep(100 * time.Millisecond)
}
if !found {
t.Errorf("NodeA did not discover NodeB via gossip (Peers: %v)", nodeA.Peers())
}
}
packet
type packet struct
Methods
Parameters
Returns
func (*packet) marshal(b []byte) (int, error)
{
if len(b) < 18 {
return 0, errShortBuffer
}
b[0] = p.Magic
b[1] = p.Type
copy(b[2:18], p.NodeID[:])
switch p.Type {
case typeInvalidate, typeHeartbeat:
if len(b) < 28+len(p.Key) {
return 0, errShortBuffer
}
binary.BigEndian.PutUint64(b[18:26], p.KeyHash)
binary.BigEndian.PutUint16(b[26:28], p.KeyLen)
copy(b[28:], p.Key)
return 28 + int(p.KeyLen), nil
case typeBatch:
if len(b) < 20 {
return 0, errShortBuffer
}
binary.BigEndian.PutUint16(b[18:20], uint16(len(p.Keys)))
curr := 20
for _, k := range p.Keys {
kLen := len(k)
if len(b) < curr+2+kLen {
return curr, errShortBuffer
}
binary.BigEndian.PutUint16(b[curr:curr+2], uint16(kLen))
copy(b[curr+2:], k)
curr += 2 + kLen
}
return curr, nil
}
return 18, nil
}
Parameters
Returns
func (*packet) unmarshal(b []byte) error
{
if len(b) < 18 {
return errShortBuffer
}
p.Magic = b[0]
if p.Magic != magicByte {
return errInvalidMagic
}
p.Type = b[1]
copy(p.NodeID[:], b[2:18])
switch p.Type {
case typeInvalidate, typeHeartbeat:
if len(b) < 28 {
return errShortBuffer
}
p.KeyHash = binary.BigEndian.Uint64(b[18:26])
p.KeyLen = binary.BigEndian.Uint16(b[26:28])
if len(b) < 28+int(p.KeyLen) {
return errShortBuffer
}
if p.KeyLen > 0 {
p.Key = make([]byte, p.KeyLen)
copy(p.Key, b[28:28+int(p.KeyLen)])
}
case typeBatch:
if len(b) < 20 {
return errShortBuffer
}
count := int(binary.BigEndian.Uint16(b[18:20]))
p.Keys = make([]string, 0, count)
curr := 20
for range count {
if len(b) < curr+2 {
return errShortBuffer
}
kLen := int(binary.BigEndian.Uint16(b[curr : curr+2]))
if len(b) < curr+2+kLen {
return errShortBuffer
}
p.Keys = append(p.Keys, string(b[curr+2:curr+2+kLen]))
curr += 2 + kLen
}
}
return nil
}
Fields
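| Name | Type | Description |
|---|---|---|
| Magic | byte | Must equal magicByte. unmarshal rejects other values. |
| Type | byte | Packet kind: typeInvalidate, typeHeartbeat or typeBatch. |
| NodeID | [16]byte | Identifier of the sending node. |
| KeyHash | uint64 | Hash field carried by invalidate and heartbeat packets. |
| KeyLen | uint16 | Length of Key in bytes. |
| Key | []byte | Key of an invalidate packet, or the sender's advertised address in a heartbeat. |
| Keys | []string | Keys carried by a batch packet. |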