mesh API

mesh

package

API reference for the mesh package.

S
struct

MeshOptions

MeshOptions configures the Warp Mesh bus.

v1/syncbus/mesh/mesh.go:18-27
type MeshOptions struct

Fields

Name Type Description
Port int
Interface string
Group string
Peers []string
AdvertiseAddr string
Heartbeat time.Duration
BatchInterval time.Duration
BatchSize int
S
struct

MeshBus

MeshBus implements a peer-to-peer synchronization bus using UDP Multicast and Unicast Gossip.

v1/syncbus/mesh/mesh.go:30-52
type MeshBus struct

Methods

Publish
Method

Publish sends an invalidation event to the mesh. It uses internal batching.

Parameters

key string
opts ...syncbus.PublishOption

Returns

error
func (*MeshBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
	b.mu.Lock()
	if _, ok := b.pending[key]; ok {
		b.mu.Unlock()
		return nil // Already pending delivery
	}
	b.pending[key] = struct{}{}
	b.mu.Unlock()

	select {
	case b.publishCh <- key:
		return nil
	case <-ctx.Done():
		b.mu.Lock()
		delete(b.pending, key)
		b.mu.Unlock()
		return ctx.Err()
	}
}
broadcast
Method

broadcast sends the packet payload to multicast group and known unicast peers.

Parameters

payload []byte

Returns

error
func (*MeshBus) broadcast(payload []byte) error
{
	// Multicast is the primary delivery mechanism
	_, err := b.conn.WriteTo(payload, b.groupAddr)
	if err == nil {
		b.published.Add(1)
	}

	// Unicast to known peers (backup/cloud traversal)
	b.peersMu.RLock()
	// Copy pointers to avoid holding the lock during WriteTo
	addrs := make([]*net.UDPAddr, 0, len(b.resolvedAddr)+len(b.opts.Peers))
	for _, addr := range b.resolvedAddr {
		addrs = append(addrs, addr)
	}
	b.peersMu.RUnlock()

	// Send to resolved peers
	for _, addr := range addrs {
		_, _ = b.conn.WriteTo(payload, addr)
	}

	// Send to seed peers too if not yet resolved/known
	for _, peer := range b.opts.Peers {
		b.peersMu.RLock()
		_, known := b.resolvedAddr[peer]
		b.peersMu.RUnlock()
		if known {
			continue
		}

		addr, err := net.ResolveUDPAddr("udp4", peer)
		if err != nil {
			continue
		}
		_, _ = b.conn.WriteTo(payload, addr)
	}

	return err
}

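PublishAndAwait is not fully supported by UDP Mesh due to its fire-and-forget nature: it returns syncbus.ErrQuorumUnsupported when replicas is greater than zero and otherwise behaves exactly like Publish.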

Parameters

key string
replicas int
opts ...syncbus.PublishOption

Returns

error
func (*MeshBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
	if replicas > 0 {
		return syncbus.ErrQuorumUnsupported
	}
	return b.Publish(ctx, key, opts...)
}

PublishAndAwaitTopology is not supported by UDP Mesh.

Parameters

key string
minZones int
opts ...syncbus.PublishOption

Returns

error
func (*MeshBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
	// Always fails: every argument is ignored and nothing is published,
	// whatever the value of minZones.
	return syncbus.ErrQuorumUnsupported
}
Subscribe
Method

Subscribe registers a channel to receive invalidation events for a key.

Parameters

key string

Returns

<-chan syncbus.Event
error
func (*MeshBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
	ch := make(chan syncbus.Event, 32)
	b.mu.Lock()
	b.subs[key] = append(b.subs[key], ch)
	b.mu.Unlock()

	go func() {
		<-ctx.Done()
		_ = b.Unsubscribe(context.Background(), key, ch)
	}()

	return ch, nil
}
Unsubscribe
Method

Unsubscribe removes a channel from key subscriptions.

Parameters

key string
ch <-chan syncbus.Event

Returns

error
func (*MeshBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
	b.mu.Lock()
	defer b.mu.Unlock()

	subs, ok := b.subs[key]
	if !ok {
		return nil
	}

	for i, c := range subs {
		if c == ch {
			b.subs[key] = append(subs[:i], subs[i+1:]...)
			close(chan syncbus.Event(c))
			break
		}
	}

	if len(b.subs[key]) == 0 {
		delete(b.subs, key)
	}

	return nil
}
IsHealthy
Method

IsHealthy returns true if the underlying UDP connection is active.

Returns

bool
func (*MeshBus) IsHealthy() bool
{
	return b.conn != nil
}
RevokeLease
Method

RevokeLease is a convenience for Publish with a lease prefix.

Parameters

id string

Returns

error
func (*MeshBus) RevokeLease(ctx context.Context, id string) error
{
	return b.Publish(ctx, "lease:"+id)
}

SubscribeLease is a convenience for Subscribe with a lease prefix.

Parameters

id string

Returns

<-chan syncbus.Event
error
func (*MeshBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
	return b.Subscribe(ctx, "lease:"+id)
}

UnsubscribeLease is a convenience for Unsubscribe with a lease prefix.

Parameters

id string
ch <-chan syncbus.Event

Returns

error
func (*MeshBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
	return b.Unsubscribe(ctx, "lease:"+id, ch)
}
listen
Method
func (*MeshBus) listen()
{
	buf := make([]byte, 1500)
	for {
		select {
		case <-b.ctx.Done():
			return
		default:
		}

		n, _, err := b.conn.ReadFrom(buf)
		if err != nil {
			continue
		}

		var p packet
		if err := p.unmarshal(buf[:n]); err != nil {
			continue
		}

		if p.NodeID == b.nodeID {
			continue
		}

		b.received.Add(1)

		if p.Type == typeHeartbeat {
			b.peersMu.Lock()
			addrStr := string(p.Key)
			b.knownPeers[addrStr] = time.Now()
			if _, ok := b.resolvedAddr[addrStr]; !ok {
				if rAddr, err := net.ResolveUDPAddr("udp4", addrStr); err == nil {
					b.resolvedAddr[addrStr] = rAddr
				}
			}
			b.peersMu.Unlock()
			continue
		}

		if p.Type == typeInvalidate {
			b.handleInvalidate(string(p.Key))
		}

		if p.Type == typeBatch {
			for _, key := range p.Keys {
				b.handleInvalidate(key)
			}
		}
	}
}

Parameters

key string
func (*MeshBus) handleInvalidate(key string)
{
	b.mu.RLock()
	chans, ok := b.subs[key]
	if ok {
		evt := syncbus.Event{Key: key}
		for _, ch := range chans {
			select {
			case ch <- evt:
			default:
			}
		}
	}
	b.mu.RUnlock()
}
Close
Method

Close gracefully shuts down the mesh bus.

Returns

error
func (*MeshBus) Close() error
{
	b.cancel()
	if b.conn != nil {
		return b.conn.Close()
	}
	return nil
}
heartbeatLoop
Method
func (*MeshBus) heartbeatLoop()
{
	interval := b.opts.Heartbeat
	if interval == 0 {
		interval = 5 * time.Second
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-b.ctx.Done():
			return
		case <-ticker.C:
			addr := b.opts.AdvertiseAddr
			if addr == "" {
				// Fallback to local address if possible, though AdvertiseAddr is preferred
				addr = b.conn.LocalAddr().String()
			}

			p := packet{
				Magic:  magicByte,
				Type:   typeHeartbeat,
				NodeID: b.nodeID,
				KeyLen: uint16(len(addr)),
				Key:    []byte(addr),
			}

			buf := bufferPool.Get().([]byte)
			n, err := p.marshal(buf)
			if err == nil {
				_ = b.broadcast(buf[:n])
			}
			bufferPool.Put(buf)
		}
	}
}
cleanupPeers
Method
func (*MeshBus) cleanupPeers()
{
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-b.ctx.Done():
			return
		case <-ticker.C:
			b.peersMu.Lock()
			now := time.Now()
			for addr, lastSeen := range b.knownPeers {
				if now.Sub(lastSeen) > 60*time.Second {
					delete(b.knownPeers, addr)
					delete(b.resolvedAddr, addr)
				}
			}
			b.peersMu.Unlock()
		}
	}
}
runBatcher
Method
func (*MeshBus) runBatcher()
{
	ticker := time.NewTicker(b.opts.BatchInterval)
	defer ticker.Stop()

	var batch []string

	flush := func() {
		if len(batch) == 0 {
			return
		}

		p := packet{
			Magic:  magicByte,
			Type:   typeBatch,
			NodeID: b.nodeID,
			Keys:   batch,
		}

		buf := bufferPool.Get().([]byte)
		if n, err := p.marshal(buf); err == nil {
			_ = b.broadcast(buf[:n])
		}
		bufferPool.Put(buf)

		// Cleanup pending map
		b.mu.Lock()
		for _, k := range batch {
			delete(b.pending, k)
		}
		b.mu.Unlock()
		batch = nil
	}

	for {
		select {
		case <-b.ctx.Done():
			return
		case key := <-b.publishCh:
			batch = append(batch, key)
			if len(batch) >= b.opts.BatchSize {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}
Metrics
Method

Metrics returns the published and received counts.

Returns

syncbus.Metrics
func (*MeshBus) Metrics() syncbus.Metrics
{
	return syncbus.Metrics{
		Published: b.published.Load(),
		Delivered: b.received.Load(),
	}
}
Peers
Method

Peers returns a list of currently known active peers.

Returns

[]string
func (*MeshBus) Peers() []string
{
	b.peersMu.RLock()
	defer b.peersMu.RUnlock()

	peers := make([]string, 0, len(b.knownPeers))
	for addr := range b.knownPeers {
		peers = append(peers, addr)
	}
	return peers
}

Fields

Name Type Description
opts MeshOptions
nodeID [16]byte
conn net.PacketConn
pconn *ipv4.PacketConn
groupAddr *net.UDPAddr
mu sync.RWMutex
subs map[string][]chan syncbus.Event
peersMu sync.RWMutex
knownPeers map[string]time.Time
resolvedAddr map[string]*net.UDPAddr
publishCh chan string
pending map[string]struct{}
published atomic.Uint64
received atomic.Uint64
ctx context.Context
cancel context.CancelFunc
F
function

NewMeshBus

NewMeshBus creates a new Warp Mesh synchronization bus.

Parameters

opts MeshOptions

Returns

*MeshBus
error
v1/syncbus/mesh/mesh.go:55-140
func NewMeshBus(opts MeshOptions) (*MeshBus, error)

{
	if opts.Port == 0 {
		opts.Port = 7946
	}
	if opts.Group == "" {
		opts.Group = "239.0.0.1"
	}

	addr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("%s:%d", opts.Group, opts.Port))
	if err != nil {
		return nil, fmt.Errorf("mesh: failed to resolve multicast address: %w", err)
	}

	// Configure socket to allow multiple listeners on the same port.
	lc := net.ListenConfig{
		Control: func(network, address string, c syscall.RawConn) error {
			var err error
			c.Control(func(fd uintptr) {
				_ = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
				_ = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, 15, 1)
			})
			return err
		},
	}

	c, err := lc.ListenPacket(context.Background(), "udp4", fmt.Sprintf("0.0.0.0:%d", opts.Port))
	if err != nil {
		return nil, fmt.Errorf("mesh: failed to listen on port %d: %w", opts.Port, err)
	}

	pconn := ipv4.NewPacketConn(c)

	var iface *net.Interface
	if opts.Interface != "" {
		iface, err = net.InterfaceByName(opts.Interface)
		if err != nil {
			_ = c.Close()
			return nil, fmt.Errorf("mesh: failed to find interface %s: %w", opts.Interface, err)
		}
	}

	if err := pconn.JoinGroup(iface, addr); err != nil {
		_ = c.Close()
		return nil, fmt.Errorf("mesh: failed to join group %s: %w", opts.Group, err)
	}

	if iface != nil {
		if err := pconn.SetMulticastInterface(iface); err != nil {
			_ = c.Close()
			return nil, fmt.Errorf("mesh: failed to set multicast interface: %w", err)
		}
	}

	// Enable loopback so multiple nodes on same host can hear each other.
	_ = pconn.SetMulticastLoopback(true)

	if opts.BatchInterval == 0 {
		opts.BatchInterval = 100 * time.Millisecond
	}
	if opts.BatchSize == 0 {
		opts.BatchSize = 20
	}

	ctx, cancel := context.WithCancel(context.Background())
	b := &MeshBus{
		opts:         opts,
		nodeID:       uuid.New(),
		conn:         c,
		pconn:        pconn,
		groupAddr:    addr,
		subs:         make(map[string][]chan syncbus.Event),
		knownPeers:   make(map[string]time.Time),
		resolvedAddr: make(map[string]*net.UDPAddr),
		publishCh:    make(chan string, 1000),
		pending:      make(map[string]struct{}),
		ctx:          ctx,
		cancel:       cancel,
	}

	go b.listen()
	go b.heartbeatLoop()
	go b.cleanupPeers()
	go b.runBatcher()

	return b, nil
}
F
function

TestMeshMultiNode

Parameters

v1/syncbus/mesh/mesh_multi_node_test.go:13-128
func TestMeshMultiNode(t *testing.T)

{
	// End-to-end check with ten buses on consecutive local ports: every
	// non-seed node lists node 0 as a seed peer, the test waits for peer
	// discovery, then node 0 publishes one key and every other node must
	// receive it.
	const nodeCount = 10
	// Derive the base port from the clock to reduce collisions between runs.
	portBase := 10000 + (int(time.Now().Unix()) % 5000)

	nodes := make([]*MeshBus, nodeCount)
	opts := make([]MeshOptions, nodeCount)

	// An empty interface name is used when no suitable interface is found.
	ifi := findMulticastInterface()
	ifaceName := ""
	if ifi != nil {
		ifaceName = ifi.Name
	}

	// Create seed node (Node 0)
	seedAddr := fmt.Sprintf("127.0.0.1:%d", portBase)
	opts[0] = MeshOptions{
		Port:          portBase,
		Interface:     ifaceName,
		AdvertiseAddr: seedAddr,
		Heartbeat:     50 * time.Millisecond,
	}

	var err error
	nodes[0], err = NewMeshBus(opts[0])
	if err != nil {
		t.Fatalf("Failed to create seed node: %v", err)
	}
	defer nodes[0].Close()

	// Create other nodes, some with seed, some using multicast solely (simulated by same group)
	for i := 1; i < nodeCount; i++ {
		port := portBase + i
		addr := fmt.Sprintf("127.0.0.1:%d", port)

		o := MeshOptions{
			Port:          port,
			Interface:     ifaceName,
			AdvertiseAddr: addr,
			Heartbeat:     50 * time.Millisecond,
		}

		// Every node knows the seed to ensure discovery even if Multicast is restricted
		o.Peers = []string{seedAddr}

		opts[i] = o
		nodes[i], err = NewMeshBus(o)
		if err != nil {
			t.Fatalf("Failed to create node %d: %v", i, err)
		}
		// Deferred inside the loop on purpose: all nodes must stay open until
		// the test function returns.
		defer nodes[i].Close()
	}

	// Wait for full mesh discovery
	t.Log("Waiting for nodes to discover each other...")
	// Increase timeout for discovery in potentially constrained environments
	discoveryCtx, discoveryCancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer discoveryCancel()

	// The 10s budget is shared by all nodes. A node that misses the threshold
	// is only logged, not failed; the propagation check below is the real
	// assertion.
	for i, node := range nodes {
	discoveryLoop:
		for {
			peers := node.Peers()
			// We wait until most nodes are discovered.
			// In loopback/CI, we might not get 100% but we need enough for a valid test.
			if len(peers) >= nodeCount/2 {
				break discoveryLoop
			}
			select {
			case <-discoveryCtx.Done():
				t.Logf("Node %d only discovered %d/%d peers: %v", i, len(peers), nodeCount-1, peers)
				break discoveryLoop
			case <-time.After(200 * time.Millisecond):
			}
		}
	}

	// Verify propagation: Node 0 publishes, everyone else receives
	key := "multi-node-sync"
	verificationCtx, verificationCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer verificationCancel()

	// Index 0 stays nil: the publisher does not subscribe.
	subs := make([]<-chan syncbus.Event, nodeCount)
	for i := 1; i < nodeCount; i++ {
		// Use verificationCtx so channels don't close prematurely if discovery took too long
		ch, _ := nodes[i].Subscribe(verificationCtx, key)
		subs[i] = ch
	}

	time.Sleep(500 * time.Millisecond) // Give time for subscriptions and gossip to settle

	if err := nodes[0].Publish(context.Background(), key); err != nil {
		t.Fatalf("Failed to publish from seed: %v", err)
	}

	// One waiter per subscriber; each reports its own failure with t.Errorf,
	// which is safe to call from goroutines other than the test's own.
	var wg sync.WaitGroup
	for i := 1; i < nodeCount; i++ {
		wg.Add(1)
		go func(idx int, ch <-chan syncbus.Event) {
			defer wg.Done()
			select {
			case evt, ok := <-ch:
				if !ok {
					t.Errorf("Node %d: channel closed unexpectedly", idx)
					return
				}
				if evt.Key != key {
					t.Errorf("Node %d received wrong key: '%s' (expected '%s')", idx, evt.Key, key)
				}
			case <-verificationCtx.Done():
				// If we timed out, check if we missed it or if it's just slow
				t.Errorf("Node %d timed out waiting for event", idx)
			}
		}(i, subs[i])
	}
	wg.Wait()
}
F
function

BenchmarkMeshPublish

Parameters

v1/syncbus/mesh/mesh_multi_node_test.go:130-145
func BenchmarkMeshPublish(b *testing.B)

{
	opts := MeshOptions{
		Port:      12000,
		Heartbeat: 1 * time.Hour, // Disable heartbeats for bench
	}
	node, _ := NewMeshBus(opts)
	defer node.Close()

	ctx := context.Background()
	key := "bench-key"

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = node.Publish(ctx, key)
	}
}
F
function

BenchmarkMeshPacketMarshal

Parameters

v1/syncbus/mesh/mesh_multi_node_test.go:147-162
func BenchmarkMeshPacketMarshal(b *testing.B)

{
	p := packet{
		Magic:   magicByte,
		Type:    typeInvalidate,
		NodeID:  [16]byte{1, 2, 3, 4},
		KeyHash: 12345678,
		KeyLen:  10,
		Key:     []byte("test-key-1"),
	}
	buf := make([]byte, 1500)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_, _ = p.marshal(buf)
	}
}
F
function

findMulticastInterface

Returns

*net.Interface
v1/syncbus/mesh/mesh_test.go:11-19
func findMulticastInterface() *net.Interface

{
	ifaces, _ := net.Interfaces()
	for _, ifi := range ifaces {
		if ifi.Flags&net.FlagMulticast != 0 && ifi.Flags&net.FlagUp != 0 && ifi.Flags&net.FlagLoopback == 0 {
			return &ifi
		}
	}
	return nil
}
F
function

TestMeshIntegration

Parameters

v1/syncbus/mesh/mesh_test.go:21-70
func TestMeshIntegration(t *testing.T)

{
	// Two buses created from the same options (same port and group): a key
	// published on nodeA must arrive on nodeB's subscription within 2s.
	ifi := findMulticastInterface()
	ifaceName := ""
	if ifi != nil {
		ifaceName = ifi.Name
	}

	opts := MeshOptions{
		Port:      8000 + (int(time.Now().Unix()) % 1000), // Randomish port
		Group:     "239.0.0.1",
		Interface: ifaceName,
	}

	nodeA, err := NewMeshBus(opts)
	if err != nil {
		t.Fatalf("Failed to create nodeA: %v", err)
	}
	defer nodeA.Close()

	// nodeB binds the same port as nodeA; this relies on the socket options
	// NewMeshBus sets to allow multiple listeners on one port.
	nodeB, err := NewMeshBus(opts)
	if err != nil {
		t.Fatalf("Failed to create nodeB: %v", err)
	}
	defer nodeB.Close()

	// One context bounds both the subscription lifetime and the wait below.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	key := "test-key"
	chB, err := nodeB.Subscribe(ctx, key)
	if err != nil {
		t.Fatalf("Failed to subscribe on nodeB: %v", err)
	}

	// Wait a bit for subscription to be ready
	time.Sleep(200 * time.Millisecond)

	if err := nodeA.Publish(ctx, key); err != nil {
		t.Fatalf("Failed to publish from nodeA: %v", err)
	}

	select {
	case evt := <-chB:
		if evt.Key != key {
			t.Errorf("Expected key %s, got %s", key, evt.Key)
		}
	case <-ctx.Done():
		t.Error("Timed out waiting for invalidation on nodeB")
	}
}
F
function

TestMeshLoopback

Parameters

v1/syncbus/mesh/mesh_test.go:72-110
func TestMeshLoopback(t *testing.T)

{
	// A single node subscribes to a key and publishes it; because the
	// receive loop discards packets carrying the node's own ID, nothing may
	// arrive on the subscription before the context times out.
	ifi := findMulticastInterface()
	ifaceName := ""
	if ifi != nil {
		ifaceName = ifi.Name
	}

	// Offset by one from the port formula used in TestMeshIntegration.
	opts := MeshOptions{
		Port:      8000 + (int(time.Now().Unix()) % 1000) + 1,
		Group:     "239.0.0.1",
		Interface: ifaceName,
	}

	node, err := NewMeshBus(opts)
	if err != nil {
		t.Fatalf("Failed to create node: %v", err)
	}
	defer node.Close()

	// 500ms spans several default 100ms batch intervals, so the published
	// key is expected to have been sent before the timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	key := "loop-key"
	ch, err := node.Subscribe(ctx, key)
	if err != nil {
		t.Fatalf("Failed to subscribe: %v", err)
	}

	if err := node.Publish(ctx, key); err != nil {
		t.Fatalf("Failed to publish: %v", err)
	}

	// Both cases can become ready together at the deadline, because the
	// subscription channel is closed when ctx ends.
	// NOTE(review): select picks randomly among ready cases, so this may
	// occasionally report a false failure — confirm.
	select {
	case <-ch:
		t.Error("Node received its own message (loopback should be filtered)")
	case <-ctx.Done():
		// Success
	}
}
F
function

TestMeshGossip

Parameters

v1/syncbus/mesh/mesh_test.go:112-160
func TestMeshGossip(t *testing.T)

{
	// nodeB lists nodeA as a seed peer, so nodeB's heartbeats are also sent
	// to nodeA by unicast; nodeA must then report nodeB's advertised address
	// among its peers within about two seconds.
	portA := 9000 + (int(time.Now().Unix()) % 100)
	portB := portA + 1

	addrA := fmt.Sprintf("127.0.0.1:%d", portA)

	optsA := MeshOptions{
		Port:          portA,
		AdvertiseAddr: addrA,
		Heartbeat:     100 * time.Millisecond,
	}
	nodeA, err := NewMeshBus(optsA)
	if err != nil {
		t.Fatalf("Failed to create nodeA: %v", err)
	}
	defer nodeA.Close()

	optsB := MeshOptions{
		Port:          portB,
		AdvertiseAddr: fmt.Sprintf("127.0.0.1:%d", portB),
		Peers:         []string{addrA},
		Heartbeat:     100 * time.Millisecond,
	}
	nodeB, err := NewMeshBus(optsB)
	if err != nil {
		t.Fatalf("Failed to create nodeB: %v", err)
	}
	defer nodeB.Close()

	// Wait for gossip discovery
	// Poll up to 20 times, 100ms apart.
	found := false
	for i := 0; i < 20; i++ {
		peers := nodeA.Peers()
		for _, p := range peers {
			if p == optsB.AdvertiseAddr {
				found = true
				break
			}
		}
		if found {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}

	if !found {
		t.Errorf("NodeA did not discover NodeB via gossip (Peers: %v)", nodeA.Peers())
	}
}
S
struct

packet

v1/syncbus/mesh/packet.go:29-37
type packet struct

Methods

marshal
Method

Parameters

b []byte

Returns

int
error
func (*packet) marshal(b []byte) (int, error)
{
	if len(b) < 18 {
		return 0, errShortBuffer
	}

	b[0] = p.Magic
	b[1] = p.Type
	copy(b[2:18], p.NodeID[:])

	switch p.Type {
	case typeInvalidate, typeHeartbeat:
		if len(b) < 28+len(p.Key) {
			return 0, errShortBuffer
		}
		binary.BigEndian.PutUint64(b[18:26], p.KeyHash)
		binary.BigEndian.PutUint16(b[26:28], p.KeyLen)
		copy(b[28:], p.Key)
		return 28 + int(p.KeyLen), nil

	case typeBatch:
		if len(b) < 20 {
			return 0, errShortBuffer
		}
		binary.BigEndian.PutUint16(b[18:20], uint16(len(p.Keys)))
		curr := 20
		for _, k := range p.Keys {
			kLen := len(k)
			if len(b) < curr+2+kLen {
				return curr, errShortBuffer
			}
			binary.BigEndian.PutUint16(b[curr:curr+2], uint16(kLen))
			copy(b[curr+2:], k)
			curr += 2 + kLen
		}
		return curr, nil
	}

	return 18, nil
}
unmarshal
Method

Parameters

b []byte

Returns

error
func (*packet) unmarshal(b []byte) error
{
	if len(b) < 18 {
		return errShortBuffer
	}

	p.Magic = b[0]
	if p.Magic != magicByte {
		return errInvalidMagic
	}

	p.Type = b[1]
	copy(p.NodeID[:], b[2:18])

	switch p.Type {
	case typeInvalidate, typeHeartbeat:
		if len(b) < 28 {
			return errShortBuffer
		}
		p.KeyHash = binary.BigEndian.Uint64(b[18:26])
		p.KeyLen = binary.BigEndian.Uint16(b[26:28])
		if len(b) < 28+int(p.KeyLen) {
			return errShortBuffer
		}
		if p.KeyLen > 0 {
			p.Key = make([]byte, p.KeyLen)
			copy(p.Key, b[28:28+int(p.KeyLen)])
		}

	case typeBatch:
		if len(b) < 20 {
			return errShortBuffer
		}
		count := int(binary.BigEndian.Uint16(b[18:20]))
		p.Keys = make([]string, 0, count)
		curr := 20
		for range count {
			if len(b) < curr+2 {
				return errShortBuffer
			}
			kLen := int(binary.BigEndian.Uint16(b[curr : curr+2]))
			if len(b) < curr+2+kLen {
				return errShortBuffer
			}
			p.Keys = append(p.Keys, string(b[curr+2:curr+2+kLen]))
			curr += 2 + kLen
		}
	}

	return nil
}

Fields

Name Type Description
Magic byte
Type byte
NodeID [16]byte
KeyHash uint64
KeyLen uint16
Key []byte
Keys []string