Merge Engine
The merge package handles conflict resolution for concurrent updates. It allows you to define custom logic for merging values instead of the default “Last-Write-Wins” strategy.
Overview
In a distributed system, or even on a single node, multiple writers may update the same key concurrently.
- Default Behavior: The last write overwrites the previous value.
- Merge Engine: Allows you to inspect the old value and the new value and produce a merged result.
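The difference between the two behaviors can be seen in a small, self-contained sketch. It does not use the merge package itself: `MergeFn` mirrors the signature documented in the API reference, while `lastWriteWins`, `sum`, and `applyWrites` are hypothetical helpers introduced only for illustration.

```go
package main

import "fmt"

// MergeFn mirrors the merge function signature documented in this page.
type MergeFn[T any] func(old, new T) (T, error)

// lastWriteWins models the default behavior: the incoming value replaces the old one.
func lastWriteWins[T any](old, new T) (T, error) {
	return new, nil
}

// sum is a custom merge function that keeps the contribution of both writes.
func sum(old, new int) (int, error) {
	return old + new, nil
}

// applyWrites (hypothetical helper) folds a sequence of writes to one key
// into a single stored value, merging each write after the first with fn.
func applyWrites(fn MergeFn[int], writes ...int) (int, error) {
	var current int
	for i, w := range writes {
		if i == 0 {
			current = w // first write: nothing to merge with yet
			continue
		}
		merged, err := fn(current, w)
		if err != nil {
			return 0, err
		}
		current = merged
	}
	return current, nil
}

func main() {
	lww, _ := applyWrites(lastWriteWins[int], 1, 1)
	merged, _ := applyWrites(sum, 1, 1)
	fmt.Println(lww, merged) // prints "1 2"
}
```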
API Reference
NewEngine
Creates a new Merge Engine.
engine := merge.NewEngine[int]()
Register
Registers a merge function for a specific key pattern.
// func Register(pattern string, fn MergeFn[T])
engine.Register("counter", func(old, new int) (int, error) {
	return old + new, nil
})
MergeFn
The signature of a merge function.
type MergeFn[T any] func(old, new T) (T, error)
Value
Wraps a value with a timestamp used by merge strategies.
type Value[T any] struct {
	Data      T
	Timestamp time.Time
}
Strategy Interface
Defines how two values should be merged.
type Strategy[T any] interface {
	Merge(old, new Value[T]) Value[T]
}
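As an illustration of what a Strategy implementation might look like, here is a minimal sketch of a timestamp-based "newest wins" strategy. The `Value` and `Strategy` types are re-declared locally from the definitions above so the example runs on its own. `newestWins` and `lateArrival` are hypothetical names, and this is not necessarily how the package's default strategy is implemented.

```go
package main

import (
	"fmt"
	"time"
)

// Value and Strategy are copied from the definitions on this page.
type Value[T any] struct {
	Data      T
	Timestamp time.Time
}

type Strategy[T any] interface {
	Merge(old, new Value[T]) Value[T]
}

// newestWins is a hypothetical Strategy: the value with the later
// timestamp survives; ties go to the incoming value.
type newestWins[T any] struct{}

func (newestWins[T]) Merge(old, new Value[T]) Value[T] {
	if new.Timestamp.Before(old.Timestamp) {
		return old
	}
	return new
}

// lateArrival merges a stale incoming write into a fresher stored value.
func lateArrival() string {
	var s Strategy[string] = newestWins[string]{}
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	stored := Value[string]{Data: "fresh", Timestamp: t0.Add(time.Second)}
	incoming := Value[string]{Data: "stale", Timestamp: t0}
	return s.Merge(stored, incoming).Data
}

func main() {
	fmt.Println(lateArrival()) // prints "fresh"
}
```

Comparing timestamps rather than arrival order means a delayed write cannot clobber a newer value, provided the writers' clocks are reasonably synchronized.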
Engine Methods
func (e *Engine[T]) Merge(key string, old, new Value[T]) (Value[T], error)
Merges two values based on the registered function or the default strategy.
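The dispatch described here can be approximated with a toy engine. This is a sketch under stated assumptions, not the package's implementation: `toyEngine` is hypothetical, keys are matched exactly (the real pattern matching rules may differ), the fallback is assumed to keep the value with the later timestamp, and the merged value is assumed to carry the later of the two timestamps.

```go
package main

import (
	"fmt"
	"time"
)

type Value[T any] struct {
	Data      T
	Timestamp time.Time
}

type MergeFn[T any] func(old, new T) (T, error)

// toyEngine is a stand-in for illustration; it is not the package's Engine.
type toyEngine[T any] struct {
	fns map[string]MergeFn[T]
}

func newToyEngine[T any]() *toyEngine[T] {
	return &toyEngine[T]{fns: make(map[string]MergeFn[T])}
}

// Register stores fn under an exact key (assumption: no wildcard matching).
func (e *toyEngine[T]) Register(key string, fn MergeFn[T]) {
	e.fns[key] = fn
}

// Merge uses the registered function if one exists; otherwise it keeps
// whichever value has the later timestamp (assumed default).
func (e *toyEngine[T]) Merge(key string, old, new Value[T]) (Value[T], error) {
	fn, ok := e.fns[key]
	if !ok {
		if new.Timestamp.Before(old.Timestamp) {
			return old, nil
		}
		return new, nil
	}
	data, err := fn(old.Data, new.Data)
	if err != nil {
		return Value[T]{}, err
	}
	ts := new.Timestamp
	if old.Timestamp.After(ts) {
		ts = old.Timestamp
	}
	return Value[T]{Data: data, Timestamp: ts}, nil
}

// mergeInts (hypothetical helper) merges two ints under key, with a sum
// function registered for "counter" only.
func mergeInts(key string, old, new int) (int, error) {
	e := newToyEngine[int]()
	e.Register("counter", func(old, new int) (int, error) { return old + new, nil })
	t0 := time.Unix(0, 0)
	out, err := e.Merge(key,
		Value[int]{Data: old, Timestamp: t0},
		Value[int]{Data: new, Timestamp: t0.Add(time.Second)})
	return out.Data, err
}

func main() {
	a, _ := mergeInts("counter", 2, 3) // registered: summed
	b, _ := mergeInts("other", 2, 3)   // unregistered: newer value kept
	fmt.Println(a, b)                  // prints "5 3"
}
```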
Examples
1. Counter (Sum)
Useful for distributed counters where you want to capture all increments.
w.Merge("stats:visits", func(old, new int) (int, error) {
	return old + new, nil
})
// Thread 1: Set(1)
// Thread 2: Set(1)
// Result: 2 (instead of 1)
2. Append to List
Useful for logging or activity streams.
w.Merge("user:logs", func(old, new []string) ([]string, error) {
	// Append new logs to old logs
	return append(old, new...), nil
})
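One caveat with append-based merges is a general property of Go slices rather than anything specific to this package: if `old` has spare capacity, `append` writes into its backing array, so two merges against the same `old` can overwrite each other's results. A minimal sketch of a copying variant follows. `appendCopy` and `aliasingDemo` are hypothetical helpers, and whether the cached slice is ever shared this way depends on how the store hands out values.

```go
package main

import "fmt"

// appendCopy merges two log slices into freshly allocated storage, so the
// result never shares a backing array with old.
func appendCopy(old, new []string) ([]string, error) {
	merged := make([]string, 0, len(old)+len(new))
	merged = append(merged, old...)
	merged = append(merged, new...)
	return merged, nil
}

// aliasingDemo returns the second element of a first merge result after a
// second merge has run against the same old slice, for both variants.
func aliasingDemo() (naive, safe string) {
	old := make([]string, 1, 4) // length 1, spare capacity 3
	old[0] = "a"

	n1 := append(old, "b")
	_ = append(old, "c") // overwrites n1[1] through the shared array

	s1, _ := appendCopy(old, []string{"b"})
	_, _ = appendCopy(old, []string{"c"}) // allocates its own storage

	return n1[1], s1[1]
}

func main() {
	naive, safe := aliasingDemo()
	fmt.Println(naive, safe) // prints "c b"
}
```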
3. CRDT-like Maps
Merge a struct or map field by field: non-zero fields in the new value override the corresponding fields in the old value.
type UserProfile struct {
	Name  string
	Email string
	Age   int
}
w.Merge("user:profile", func(old, new UserProfile) (UserProfile, error) {
	merged := old
	if new.Name != "" {
		merged.Name = new.Name
	}
	if new.Email != "" {
		merged.Email = new.Email
	}
	if new.Age != 0 {
		merged.Age = new.Age
	}
	return merged, nil
})
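To make the effect of a partial update concrete, the same merge logic can be run as a standalone function. `mergeProfile` is simply the closure above given a name for this sketch, and the sample data is invented for illustration.

```go
package main

import "fmt"

type UserProfile struct {
	Name  string
	Email string
	Age   int
}

// mergeProfile overlays the non-zero fields of new onto old.
func mergeProfile(old, new UserProfile) (UserProfile, error) {
	merged := old
	if new.Name != "" {
		merged.Name = new.Name
	}
	if new.Email != "" {
		merged.Email = new.Email
	}
	if new.Age != 0 {
		merged.Age = new.Age
	}
	return merged, nil
}

func main() {
	stored := UserProfile{Name: "Ada", Email: "ada@example.com", Age: 36}
	update := UserProfile{Email: "ada@newmail.example"} // only Email is set

	merged, _ := mergeProfile(stored, update)
	fmt.Printf("%+v\n", merged)
	// prints "{Name:Ada Email:ada@newmail.example Age:36}"
}
```

Because zero values are treated as "not provided", this style of merge cannot clear a field: writing an empty `Name` or an `Age` of 0 leaves the old value in place. Pointer fields or an explicit field mask are common ways around that if clearing is needed.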
How It Works with Core
When you call w.Set(key, value):
1. Warp fetches the current value from Cache (or Store).
2. If a value exists, it checks if a Merge Function is registered for key.
3. If yes, it calls fn(current, value).
4. The result is then saved to Cache and Store.
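The four steps above can be sketched with an in-memory stand-in. This is an assumption-laden model, not Warp's code: `toyStore` collapses Cache and Store into one map, its `Merge` and `Set` methods only imitate the calls shown on this page, and there is no locking, so it is not safe for concurrent use.

```go
package main

import "fmt"

type MergeFn[T any] func(old, new T) (T, error)

// toyStore is a hypothetical stand-in for Warp's cache and store.
type toyStore[T any] struct {
	data   map[string]T
	merges map[string]MergeFn[T]
}

func newToyStore[T any]() *toyStore[T] {
	return &toyStore[T]{
		data:   make(map[string]T),
		merges: make(map[string]MergeFn[T]),
	}
}

// Merge registers a merge function for an exact key.
func (s *toyStore[T]) Merge(key string, fn MergeFn[T]) {
	s.merges[key] = fn
}

// Set follows the documented write path.
func (s *toyStore[T]) Set(key string, value T) error {
	current, exists := s.data[key] // 1. fetch the current value
	if exists {
		if fn, ok := s.merges[key]; ok { // 2. is a merge function registered?
			merged, err := fn(current, value) // 3. call fn(current, value)
			if err != nil {
				return err
			}
			value = merged
		}
	}
	s.data[key] = value // 4. save the result
	return nil
}

// visitsAfter (hypothetical helper) applies writes to "stats:visits" with a
// sum merge function registered and returns the stored value.
func visitsAfter(writes ...int) (int, error) {
	s := newToyStore[int]()
	s.Merge("stats:visits", func(old, new int) (int, error) { return old + new, nil })
	for _, w := range writes {
		if err := s.Set("stats:visits", w); err != nil {
			return 0, err
		}
	}
	return s.data["stats:visits"], nil
}

func main() {
	total, _ := visitsAfter(1, 1)
	fmt.Println(total) // prints "2"
}
```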
[!NOTE]
This happens before the value is sent to the Sync Bus (in distributed modes), so the “merge” is local to the node performing the write. For strong consistency with merging, use ModeStrongDistributed, which ensures serialized access via quorum.