mesh API

mesh

package

API reference for the mesh package.

F
function

TestMeshMultiNode

Parameters

v1/syncbus/mesh/mesh_multi_node_test.go:13-129
func TestMeshMultiNode(t *testing.T)

{
	requireMulticast(t)
	const nodeCount = 10
	portBase := 10000 + (int(time.Now().Unix()) % 5000)

	nodes := make([]*MeshBus, nodeCount)
	opts := make([]MeshOptions, nodeCount)

	ifi := findMulticastInterface()
	ifaceName := ""
	if ifi != nil {
		ifaceName = ifi.Name
	}

	// Create seed node (Node 0)
	seedAddr := fmt.Sprintf("127.0.0.1:%d", portBase)
	opts[0] = MeshOptions{
		Port:          portBase,
		Interface:     ifaceName,
		AdvertiseAddr: seedAddr,
		Heartbeat:     50 * time.Millisecond,
	}

	var err error
	nodes[0], err = NewMeshBus(opts[0])
	if err != nil {
		t.Fatalf("Failed to create seed node: %v", err)
	}
	defer nodes[0].Close()

	// Create other nodes, some with seed, some using multicast solely (simulated by same group)
	for i := 1; i < nodeCount; i++ {
		port := portBase + i
		addr := fmt.Sprintf("127.0.0.1:%d", port)

		o := MeshOptions{
			Port:          port,
			Interface:     ifaceName,
			AdvertiseAddr: addr,
			Heartbeat:     50 * time.Millisecond,
		}

		// Every node knows the seed to ensure discovery even if Multicast is restricted
		o.Peers = []string{seedAddr}

		opts[i] = o
		nodes[i], err = NewMeshBus(o)
		if err != nil {
			t.Fatalf("Failed to create node %d: %v", i, err)
		}
		defer nodes[i].Close()
	}

	// Wait for full mesh discovery
	t.Log("Waiting for nodes to discover each other...")
	// Increase timeout for discovery in potentially constrained environments
	discoveryCtx, discoveryCancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer discoveryCancel()

	for i, node := range nodes {
	discoveryLoop:
		for {
			peers := node.Peers()
			// We wait until most nodes are discovered.
			// In loopback/CI, we might not get 100% but we need enough for a valid test.
			if len(peers) >= nodeCount/2 {
				break discoveryLoop
			}
			select {
			case <-discoveryCtx.Done():
				t.Logf("Node %d only discovered %d/%d peers: %v", i, len(peers), nodeCount-1, peers)
				break discoveryLoop
			case <-time.After(200 * time.Millisecond):
			}
		}
	}

	// Verify propagation: Node 0 publishes, everyone else receives
	key := "multi-node-sync"
	verificationCtx, verificationCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer verificationCancel()

	subs := make([]<-chan syncbus.Event, nodeCount)
	for i := 1; i < nodeCount; i++ {
		// Use verificationCtx so channels don't close prematurely if discovery took too long
		ch, _ := nodes[i].Subscribe(verificationCtx, key)
		subs[i] = ch
	}

	time.Sleep(500 * time.Millisecond) // Give time for subscriptions and gossip to settle

	if err := nodes[0].Publish(context.Background(), key); err != nil {
		t.Fatalf("Failed to publish from seed: %v", err)
	}

	var wg sync.WaitGroup
	for i := 1; i < nodeCount; i++ {
		wg.Add(1)
		go func(idx int, ch <-chan syncbus.Event) {
			defer wg.Done()
			select {
			case evt, ok := <-ch:
				if !ok {
					t.Errorf("Node %d: channel closed unexpectedly", idx)
					return
				}
				if evt.Key != key {
					t.Errorf("Node %d received wrong key: '%s' (expected '%s')", idx, evt.Key, key)
				}
			case <-verificationCtx.Done():
				// If we timed out, check if we missed it or if it's just slow
				t.Errorf("Node %d timed out waiting for event", idx)
			}
		}(i, subs[i])
	}
	wg.Wait()
}
F
function

BenchmarkMeshPublish

Parameters

v1/syncbus/mesh/mesh_multi_node_test.go:131-146
func BenchmarkMeshPublish(b *testing.B)

{
	opts := MeshOptions{
		Port:      12000,
		Heartbeat: 1 * time.Hour, // Disable heartbeats for bench
	}
	node, _ := NewMeshBus(opts)
	defer node.Close()

	ctx := context.Background()
	key := "bench-key"

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = node.Publish(ctx, key)
	}
}
F
function

BenchmarkMeshPacketMarshal

Parameters

v1/syncbus/mesh/mesh_multi_node_test.go:148-163
func BenchmarkMeshPacketMarshal(b *testing.B)

{
	p := packet{
		Magic:   magicByte,
		Type:    typeInvalidate,
		NodeID:  [16]byte{1, 2, 3, 4},
		KeyHash: 12345678,
		KeyLen:  10,
		Key:     []byte("test-key-1"),
	}
	buf := make([]byte, 1500)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_, _ = p.marshal(buf)
	}
}
F
function

findMulticastInterface

Returns

v1/syncbus/mesh/mesh_test.go:13-21
func findMulticastInterface() *net.Interface

{
	ifaces, _ := net.Interfaces()
	for _, ifi := range ifaces {
		if ifi.Flags&net.FlagMulticast != 0 && ifi.Flags&net.FlagUp != 0 && ifi.Flags&net.FlagLoopback == 0 {
			return &ifi
		}
	}
	return nil
}
F
function

requireMulticast

requireMulticast skips the test if multicast UDP is not functional in the
current environment (e.g. CI containers or hosts without multicast routing).
It probes using the same ipv4.PacketConn.JoinGroup path used by NewMeshBus.

Parameters

v1/syncbus/mesh/mesh_test.go:26-41
func requireMulticast(t *testing.T)

{
	t.Helper()
	addr, err := net.ResolveUDPAddr("udp4", "239.0.0.1:0")
	if err != nil {
		t.Skipf("requires multicast: cannot resolve 239.0.0.1: %v", err)
	}
	c, err := net.ListenPacket("udp4", "0.0.0.0:0")
	if err != nil {
		t.Skipf("requires multicast: cannot open UDP socket: %v", err)
	}
	defer c.Close()
	pc := ipv4.NewPacketConn(c)
	if err := pc.JoinGroup(nil, addr); err != nil {
		t.Skipf("requires a functional multicast environment: %v", err)
	}
}
F
function

TestMeshIntegration

Parameters

v1/syncbus/mesh/mesh_test.go:43-93
func TestMeshIntegration(t *testing.T)

{
	requireMulticast(t)
	ifi := findMulticastInterface()
	ifaceName := ""
	if ifi != nil {
		ifaceName = ifi.Name
	}

	opts := MeshOptions{
		Port:      8000 + (int(time.Now().Unix()) % 1000), // Randomish port
		Group:     "239.0.0.1",
		Interface: ifaceName,
	}

	nodeA, err := NewMeshBus(opts)
	if err != nil {
		t.Fatalf("Failed to create nodeA: %v", err)
	}
	defer nodeA.Close()

	nodeB, err := NewMeshBus(opts)
	if err != nil {
		t.Fatalf("Failed to create nodeB: %v", err)
	}
	defer nodeB.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	key := "test-key"
	chB, err := nodeB.Subscribe(ctx, key)
	if err != nil {
		t.Fatalf("Failed to subscribe on nodeB: %v", err)
	}

	// Wait a bit for subscription to be ready
	time.Sleep(200 * time.Millisecond)

	if err := nodeA.Publish(ctx, key); err != nil {
		t.Fatalf("Failed to publish from nodeA: %v", err)
	}

	select {
	case evt := <-chB:
		if evt.Key != key {
			t.Errorf("Expected key %s, got %s", key, evt.Key)
		}
	case <-ctx.Done():
		t.Error("Timed out waiting for invalidation on nodeB")
	}
}
F
function

TestMeshLoopback

Parameters

v1/syncbus/mesh/mesh_test.go:95-134
func TestMeshLoopback(t *testing.T)

{
	requireMulticast(t)
	ifi := findMulticastInterface()
	ifaceName := ""
	if ifi != nil {
		ifaceName = ifi.Name
	}

	opts := MeshOptions{
		Port:      8000 + (int(time.Now().Unix()) % 1000) + 1,
		Group:     "239.0.0.1",
		Interface: ifaceName,
	}

	node, err := NewMeshBus(opts)
	if err != nil {
		t.Fatalf("Failed to create node: %v", err)
	}
	defer node.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	key := "loop-key"
	ch, err := node.Subscribe(ctx, key)
	if err != nil {
		t.Fatalf("Failed to subscribe: %v", err)
	}

	if err := node.Publish(ctx, key); err != nil {
		t.Fatalf("Failed to publish: %v", err)
	}

	select {
	case <-ch:
		t.Error("Node received its own message (loopback should be filtered)")
	case <-ctx.Done():
		// Success
	}
}
F
function

TestMeshGossip

Parameters

v1/syncbus/mesh/mesh_test.go:136-185
func TestMeshGossip(t *testing.T)

{
	requireMulticast(t)
	portA := 9000 + (int(time.Now().Unix()) % 100)
	portB := portA + 1

	addrA := fmt.Sprintf("127.0.0.1:%d", portA)

	optsA := MeshOptions{
		Port:          portA,
		AdvertiseAddr: addrA,
		Heartbeat:     100 * time.Millisecond,
	}
	nodeA, err := NewMeshBus(optsA)
	if err != nil {
		t.Fatalf("Failed to create nodeA: %v", err)
	}
	defer nodeA.Close()

	optsB := MeshOptions{
		Port:          portB,
		AdvertiseAddr: fmt.Sprintf("127.0.0.1:%d", portB),
		Peers:         []string{addrA},
		Heartbeat:     100 * time.Millisecond,
	}
	nodeB, err := NewMeshBus(optsB)
	if err != nil {
		t.Fatalf("Failed to create nodeB: %v", err)
	}
	defer nodeB.Close()

	// Wait for gossip discovery
	found := false
	for i := 0; i < 20; i++ {
		peers := nodeA.Peers()
		for _, p := range peers {
			if p == optsB.AdvertiseAddr {
				found = true
				break
			}
		}
		if found {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}

	if !found {
		t.Errorf("NodeA did not discover NodeB via gossip (Peers: %v)", nodeA.Peers())
	}
}
S
struct

packet

v1/syncbus/mesh/packet.go:29-37
type packet struct

Methods

marshal
Method

Parameters

b []byte

Returns

int
error
func (*packet) marshal(b []byte) (int, error)
{
	if len(b) < 18 {
		return 0, errShortBuffer
	}

	b[0] = p.Magic
	b[1] = p.Type
	copy(b[2:18], p.NodeID[:])

	switch p.Type {
	case typeInvalidate, typeHeartbeat:
		if len(b) < 28+len(p.Key) {
			return 0, errShortBuffer
		}
		binary.BigEndian.PutUint64(b[18:26], p.KeyHash)
		binary.BigEndian.PutUint16(b[26:28], p.KeyLen)
		copy(b[28:], p.Key)
		return 28 + int(p.KeyLen), nil

	case typeBatch:
		if len(b) < 20 {
			return 0, errShortBuffer
		}
		binary.BigEndian.PutUint16(b[18:20], uint16(len(p.Keys)))
		curr := 20
		for _, k := range p.Keys {
			kLen := len(k)
			if len(b) < curr+2+kLen {
				return curr, errShortBuffer
			}
			binary.BigEndian.PutUint16(b[curr:curr+2], uint16(kLen))
			copy(b[curr+2:], k)
			curr += 2 + kLen
		}
		return curr, nil
	}

	return 18, nil
}
unmarshal
Method

Parameters

b []byte

Returns

error
func (*packet) unmarshal(b []byte) error
{
	if len(b) < 18 {
		return errShortBuffer
	}

	p.Magic = b[0]
	if p.Magic != magicByte {
		return errInvalidMagic
	}

	p.Type = b[1]
	copy(p.NodeID[:], b[2:18])

	switch p.Type {
	case typeInvalidate, typeHeartbeat:
		if len(b) < 28 {
			return errShortBuffer
		}
		p.KeyHash = binary.BigEndian.Uint64(b[18:26])
		p.KeyLen = binary.BigEndian.Uint16(b[26:28])
		if len(b) < 28+int(p.KeyLen) {
			return errShortBuffer
		}
		if p.KeyLen > 0 {
			p.Key = make([]byte, p.KeyLen)
			copy(p.Key, b[28:28+int(p.KeyLen)])
		}

	case typeBatch:
		if len(b) < 20 {
			return errShortBuffer
		}
		count := int(binary.BigEndian.Uint16(b[18:20]))
		p.Keys = make([]string, 0, count)
		curr := 20
		for range count {
			if len(b) < curr+2 {
				return errShortBuffer
			}
			kLen := int(binary.BigEndian.Uint16(b[curr : curr+2]))
			if len(b) < curr+2+kLen {
				return errShortBuffer
			}
			p.Keys = append(p.Keys, string(b[curr+2:curr+2+kLen]))
			curr += 2 + kLen
		}
	}

	return nil
}

Fields

Name Type Description
Magic byte
Type byte
NodeID [16]byte
KeyHash uint64
KeyLen uint16
Key []byte
Keys []string
S
struct

MeshOptions

MeshOptions configures the Warp Mesh bus.

v1/syncbus/mesh/mesh.go:18-27
type MeshOptions struct

Fields

Name Type Description
Port int
Interface string
Group string
Peers []string
AdvertiseAddr string
Heartbeat time.Duration
BatchInterval time.Duration
BatchSize int
S
struct

MeshBus

MeshBus implements a peer-to-peer synchronization bus using UDP Multicast and Unicast Gossip.

v1/syncbus/mesh/mesh.go:30-52
type MeshBus struct

Methods

Publish
Method

Publish sends an invalidation event to the mesh. It uses internal batching.

Parameters

key string
opts ...syncbus.PublishOption

Returns

error
func (*MeshBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
	b.mu.Lock()
	if _, ok := b.pending[key]; ok {
		b.mu.Unlock()
		return nil // Already pending delivery
	}
	b.pending[key] = struct{}{}
	b.mu.Unlock()

	select {
	case b.publishCh <- key:
		return nil
	case <-ctx.Done():
		b.mu.Lock()
		delete(b.pending, key)
		b.mu.Unlock()
		return ctx.Err()
	}
}
broadcast
Method

broadcast sends the packet payload to multicast group and known unicast peers.

Parameters

payload []byte

Returns

error
func (*MeshBus) broadcast(payload []byte) error
{
	// Multicast is the primary delivery mechanism
	_, err := b.conn.WriteTo(payload, b.groupAddr)
	if err == nil {
		b.published.Add(1)
	}

	// Unicast to known peers (backup/cloud traversal)
	b.peersMu.RLock()
	// Copy pointers to avoid holding the lock during WriteTo
	addrs := make([]*net.UDPAddr, 0, len(b.resolvedAddr)+len(b.opts.Peers))
	for _, addr := range b.resolvedAddr {
		addrs = append(addrs, addr)
	}
	b.peersMu.RUnlock()

	// Send to resolved peers
	for _, addr := range addrs {
		_, _ = b.conn.WriteTo(payload, addr)
	}

	// Send to seed peers too if not yet resolved/known
	for _, peer := range b.opts.Peers {
		b.peersMu.RLock()
		_, known := b.resolvedAddr[peer]
		b.peersMu.RUnlock()
		if known {
			continue
		}

		addr, err := net.ResolveUDPAddr("udp4", peer)
		if err != nil {
			continue
		}
		_, _ = b.conn.WriteTo(payload, addr)
	}

	return err
}

PublishAndAwait is not fully supported by UDP Mesh due to its fire-and-forget nature.

Parameters

key string
replicas int
opts ...syncbus.PublishOption

Returns

error
func (*MeshBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
	if replicas > 0 {
		return syncbus.ErrQuorumUnsupported
	}
	return b.Publish(ctx, key, opts...)
}

PublishAndAwaitTopology is not supported by UDP Mesh.

Parameters

key string
minZones int
opts ...syncbus.PublishOption

Returns

error
func (*MeshBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
	return syncbus.ErrQuorumUnsupported
}
Subscribe
Method

Subscribe registers a channel to receive invalidation events for a key.

Parameters

key string

Returns

<-chan syncbus.Event
error
func (*MeshBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
	ch := make(chan syncbus.Event, 32)
	b.mu.Lock()
	b.subs[key] = append(b.subs[key], ch)
	b.mu.Unlock()

	go func() {
		<-ctx.Done()
		_ = b.Unsubscribe(context.Background(), key, ch)
	}()

	return ch, nil
}
Unsubscribe
Method

Unsubscribe removes a channel from key subscriptions.

Parameters

key string
ch <-chan syncbus.Event

Returns

error
func (*MeshBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
	b.mu.Lock()
	defer b.mu.Unlock()

	subs, ok := b.subs[key]
	if !ok {
		return nil
	}

	for i, c := range subs {
		if c == ch {
			b.subs[key] = append(subs[:i], subs[i+1:]...)
			close(chan syncbus.Event(c))
			break
		}
	}

	if len(b.subs[key]) == 0 {
		delete(b.subs, key)
	}

	return nil
}
IsHealthy
Method

IsHealthy returns true if the underlying UDP connection is active.

Returns

bool
func (*MeshBus) IsHealthy() bool
{
	return b.conn != nil
}
RevokeLease
Method

RevokeLease is a convenience for Publish with a lease prefix.

Parameters

id string

Returns

error
func (*MeshBus) RevokeLease(ctx context.Context, id string) error
{
	return b.Publish(ctx, "lease:"+id)
}

SubscribeLease is a convenience for Subscribe with a lease prefix.

Parameters

id string

Returns

<-chan syncbus.Event
error
func (*MeshBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
	return b.Subscribe(ctx, "lease:"+id)
}

UnsubscribeLease is a convenience for Unsubscribe with a lease prefix.

Parameters

id string
ch <-chan syncbus.Event

Returns

error
func (*MeshBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
	return b.Unsubscribe(ctx, "lease:"+id, ch)
}
listen
Method
func (*MeshBus) listen()
{
	buf := make([]byte, 1500)
	for {
		select {
		case <-b.ctx.Done():
			return
		default:
		}

		n, _, err := b.conn.ReadFrom(buf)
		if err != nil {
			continue
		}

		var p packet
		if err := p.unmarshal(buf[:n]); err != nil {
			continue
		}

		if p.NodeID == b.nodeID {
			continue
		}

		b.received.Add(1)

		if p.Type == typeHeartbeat {
			b.peersMu.Lock()
			addrStr := string(p.Key)
			b.knownPeers[addrStr] = time.Now()
			if _, ok := b.resolvedAddr[addrStr]; !ok {
				if rAddr, err := net.ResolveUDPAddr("udp4", addrStr); err == nil {
					b.resolvedAddr[addrStr] = rAddr
				}
			}
			b.peersMu.Unlock()
			continue
		}

		if p.Type == typeInvalidate {
			b.handleInvalidate(string(p.Key))
		}

		if p.Type == typeBatch {
			for _, key := range p.Keys {
				b.handleInvalidate(key)
			}
		}
	}
}

Parameters

key string
func (*MeshBus) handleInvalidate(key string)
{
	b.mu.RLock()
	chans, ok := b.subs[key]
	if ok {
		evt := syncbus.Event{Key: key}
		for _, ch := range chans {
			select {
			case ch <- evt:
			default:
			}
		}
	}
	b.mu.RUnlock()
}
Close
Method

Close gracefully shuts down the mesh bus.

Returns

error
func (*MeshBus) Close() error
{
	b.cancel()
	if b.conn != nil {
		return b.conn.Close()
	}
	return nil
}
heartbeatLoop
Method
func (*MeshBus) heartbeatLoop()
{
	interval := b.opts.Heartbeat
	if interval == 0 {
		interval = 5 * time.Second
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-b.ctx.Done():
			return
		case <-ticker.C:
			addr := b.opts.AdvertiseAddr
			if addr == "" {
				// Fallback to local address if possible, though AdvertiseAddr is preferred
				addr = b.conn.LocalAddr().String()
			}

			p := packet{
				Magic:  magicByte,
				Type:   typeHeartbeat,
				NodeID: b.nodeID,
				KeyLen: uint16(len(addr)),
				Key:    []byte(addr),
			}

			buf := bufferPool.Get().([]byte)
			n, err := p.marshal(buf)
			if err == nil {
				_ = b.broadcast(buf[:n])
			}
			bufferPool.Put(buf)
		}
	}
}
cleanupPeers
Method
func (*MeshBus) cleanupPeers()
{
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-b.ctx.Done():
			return
		case <-ticker.C:
			b.peersMu.Lock()
			now := time.Now()
			for addr, lastSeen := range b.knownPeers {
				if now.Sub(lastSeen) > 60*time.Second {
					delete(b.knownPeers, addr)
					delete(b.resolvedAddr, addr)
				}
			}
			b.peersMu.Unlock()
		}
	}
}
runBatcher
Method
func (*MeshBus) runBatcher()
{
	ticker := time.NewTicker(b.opts.BatchInterval)
	defer ticker.Stop()

	var batch []string

	flush := func() {
		if len(batch) == 0 {
			return
		}

		p := packet{
			Magic:  magicByte,
			Type:   typeBatch,
			NodeID: b.nodeID,
			Keys:   batch,
		}

		buf := bufferPool.Get().([]byte)
		if n, err := p.marshal(buf); err == nil {
			_ = b.broadcast(buf[:n])
		}
		bufferPool.Put(buf)

		// Cleanup pending map
		b.mu.Lock()
		for _, k := range batch {
			delete(b.pending, k)
		}
		b.mu.Unlock()
		batch = nil
	}

	for {
		select {
		case <-b.ctx.Done():
			return
		case key := <-b.publishCh:
			batch = append(batch, key)
			if len(batch) >= b.opts.BatchSize {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}
Metrics
Method

Metrics returns the published and received counts.

Returns

func (*MeshBus) Metrics() syncbus.Metrics
{
	return syncbus.Metrics{
		Published: b.published.Load(),
		Delivered: b.received.Load(),
	}
}
Peers
Method

Peers returns a list of currently known active peers.

Returns

[]string
func (*MeshBus) Peers() []string
{
	b.peersMu.RLock()
	defer b.peersMu.RUnlock()

	peers := make([]string, 0, len(b.knownPeers))
	for addr := range b.knownPeers {
		peers = append(peers, addr)
	}
	return peers
}

Fields

Name Type Description
opts MeshOptions
nodeID [16]byte
conn net.PacketConn
pconn *ipv4.PacketConn
groupAddr *net.UDPAddr
mu sync.RWMutex
subs map[string][]chan syncbus.Event
peersMu sync.RWMutex
knownPeers map[string]time.Time
resolvedAddr map[string]*net.UDPAddr
publishCh chan string
pending map[string]struct{}
published atomic.Uint64
received atomic.Uint64
ctx context.Context
cancel context.CancelFunc
F
function

NewMeshBus

NewMeshBus creates a new Warp Mesh synchronization bus.

Parameters

Returns

error
v1/syncbus/mesh/mesh.go:55-140
func NewMeshBus(opts MeshOptions) (*MeshBus, error)

{
	if opts.Port == 0 {
		opts.Port = 7946
	}
	if opts.Group == "" {
		opts.Group = "239.0.0.1"
	}

	addr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("%s:%d", opts.Group, opts.Port))
	if err != nil {
		return nil, fmt.Errorf("mesh: failed to resolve multicast address: %w", err)
	}

	// Configure socket to allow multiple listeners on the same port.
	lc := net.ListenConfig{
		Control: func(network, address string, c syscall.RawConn) error {
			var err error
			c.Control(func(fd uintptr) {
				_ = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
				_ = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, 15, 1)
			})
			return err
		},
	}

	c, err := lc.ListenPacket(context.Background(), "udp4", fmt.Sprintf("0.0.0.0:%d", opts.Port))
	if err != nil {
		return nil, fmt.Errorf("mesh: failed to listen on port %d: %w", opts.Port, err)
	}

	pconn := ipv4.NewPacketConn(c)

	var iface *net.Interface
	if opts.Interface != "" {
		iface, err = net.InterfaceByName(opts.Interface)
		if err != nil {
			_ = c.Close()
			return nil, fmt.Errorf("mesh: failed to find interface %s: %w", opts.Interface, err)
		}
	}

	if err := pconn.JoinGroup(iface, addr); err != nil {
		_ = c.Close()
		return nil, fmt.Errorf("mesh: failed to join group %s: %w", opts.Group, err)
	}

	if iface != nil {
		if err := pconn.SetMulticastInterface(iface); err != nil {
			_ = c.Close()
			return nil, fmt.Errorf("mesh: failed to set multicast interface: %w", err)
		}
	}

	// Enable loopback so multiple nodes on same host can hear each other.
	_ = pconn.SetMulticastLoopback(true)

	if opts.BatchInterval == 0 {
		opts.BatchInterval = 100 * time.Millisecond
	}
	if opts.BatchSize == 0 {
		opts.BatchSize = 20
	}

	ctx, cancel := context.WithCancel(context.Background())
	b := &MeshBus{
		opts:         opts,
		nodeID:       uuid.New(),
		conn:         c,
		pconn:        pconn,
		groupAddr:    addr,
		subs:         make(map[string][]chan syncbus.Event),
		knownPeers:   make(map[string]time.Time),
		resolvedAddr: make(map[string]*net.UDPAddr),
		publishCh:    make(chan string, 1000),
		pending:      make(map[string]struct{}),
		ctx:          ctx,
		cancel:       cancel,
	}

	go b.listen()
	go b.heartbeatLoop()
	go b.cleanupPeers()
	go b.runBatcher()

	return b, nil
}