main

API reference for the main package.

Imports (44)
Standard library (19): context, errors, flag, fmt, log, math/rand, net/http, net/http/pprof, os, os/signal, path/filepath, runtime, sort, strconv, strings, sync, sync/atomic, syscall, time

External packages (10):
github.com/dgraph-io/ristretto
github.com/nats-io/nats.go
github.com/prometheus/client_golang/prometheus
github.com/prometheus/client_golang/prometheus/promauto
github.com/prometheus/client_golang/prometheus/promhttp
github.com/redis/go-redis/v9
github.com/tidwall/redcon
go.opentelemetry.io/otel
go.opentelemetry.io/otel/exporters/stdout/stdouttrace
go.opentelemetry.io/otel/sdk/trace

Internal packages (15):
github.com/mirkobrombin/go-warp/v1/adapter
github.com/mirkobrombin/go-warp/v1/cache
github.com/mirkobrombin/go-warp/v1/cache/adaptive
github.com/mirkobrombin/go-warp/v1/cache/versioned
github.com/mirkobrombin/go-warp/v1/core
github.com/mirkobrombin/go-warp/v1/lock
github.com/mirkobrombin/go-warp/v1/merge
github.com/mirkobrombin/go-warp/v1/metrics
github.com/mirkobrombin/go-warp/v1/presets
github.com/mirkobrombin/go-warp/v1/syncbus
github.com/mirkobrombin/go-warp/v1/syncbus/mesh
github.com/mirkobrombin/go-warp/v1/syncbus/nats
github.com/mirkobrombin/go-warp/v1/syncbus/redis
github.com/mirkobrombin/go-warp/v1/validator
github.com/mirkobrombin/go-warp/v1/watchbus
function main (cmd/bench/main.go:30-49)

func main() {
flag.Parse()
payload := make([]byte, *dataSize)
for i := range payload {
payload[i] = 'x'
}
targets := strings.Split(*target, ",")
if *target == "all" {
targets = []string{"warp-local", "warp-mesh", "ristretto", "warp-redis", "redis", "dragonfly"}
}
fmt.Printf("| %-15s | %-10s | %-12s | %-12s |\n", "System", "Ops/sec", "Avg Latency", "P99 Latency")
fmt.Println("|:---|:---|:---|:---|")
for _, t := range targets {
runBenchmark(strings.TrimSpace(t), payload)
}
}
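
Both main and runBenchmark read package-level flags defined outside the listed line range. A minimal sketch of those declarations, assuming plausible defaults (only the variable names are taken from the code above):

var (
	target      = flag.String("target", "all", `benchmark target, or "all"`)
	dataSize    = flag.Int("size", 1024, "payload size in bytes")
	requests    = flag.Int("requests", 100000, "total number of requests")
	concurrency = flag.Int("concurrency", 32, "number of concurrent workers")
	redisAddr   = flag.String("redis-addr", "localhost:6379", "Redis address")
	dfAddr      = flag.String("dragonfly-addr", "localhost:6380", "Dragonfly address")
)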
function runBenchmark (cmd/bench/main.go:51-182)
Parameters: name string, payload []byte

func runBenchmark(name string, payload []byte) {
var (
getFn func(ctx context.Context, key string) error
setFn func(ctx context.Context, key string, val []byte) error
cleanup func()
)
ctx := context.Background()
key := "bench:key"
switch name {
case "warp-local":
w := presets.NewInMemoryStandalone[[]byte]()
w.Register(key, core.ModeStrongLocal, time.Hour)
setFn = func(ctx context.Context, k string, v []byte) error { return w.Set(ctx, k, v) }
getFn = func(ctx context.Context, k string) error { _, err := w.Get(ctx, k); return err }
case "warp-mesh":
w := presets.NewMeshEventual[[]byte](mesh.MeshOptions{
Port: 7946,
Heartbeat: 1 * time.Hour,
})
w.Register(key, core.ModeEventualDistributed, time.Hour)
setFn = func(ctx context.Context, k string, v []byte) error { return w.Set(ctx, k, v) }
getFn = func(ctx context.Context, k string) error { _, err := w.Get(ctx, k); return err }
case "ristretto":
cache, _ := ristretto.NewCache(&ristretto.Config{
NumCounters: 1e7,
MaxCost: 1 << 30,
BufferItems: 64,
})
setFn = func(ctx context.Context, k string, v []byte) error {
cache.Set(k, v, 1)
return nil
}
getFn = func(ctx context.Context, k string) error {
_, found := cache.Get(k)
if !found {
return fmt.Errorf("not found")
}
return nil
}
cleanup = func() { cache.Close() }
case "warp-redis":
w := presets.NewRedisEventual[[]byte](presets.RedisOptions{Addr: *redisAddr})
w.Register(key, core.ModeEventualDistributed, time.Hour)
setFn = func(ctx context.Context, k string, v []byte) error { return w.Set(ctx, k, v) }
getFn = func(ctx context.Context, k string) error { _, err := w.Get(ctx, k); return err }
case "redis":
r := redis.NewClient(&redis.Options{Addr: *redisAddr})
setFn = func(ctx context.Context, k string, v []byte) error { return r.Set(ctx, k, v, 0).Err() }
getFn = func(ctx context.Context, k string) error { return r.Get(ctx, k).Err() }
cleanup = func() { r.Close() }
case "dragonfly":
r := redis.NewClient(&redis.Options{Addr: *dfAddr})
setFn = func(ctx context.Context, k string, v []byte) error { return r.Set(ctx, k, v, 0).Err() }
getFn = func(ctx context.Context, k string) error { return r.Get(ctx, k).Err() }
cleanup = func() { r.Close() }
default:
log.Printf("Unknown target: %s", name)
return
}
if cleanup != nil {
defer cleanup()
}
// Warmup; log failures but continue, since an error here is usually a refused
// connection when the target's container is not running.
if err := setFn(ctx, key, payload); err != nil {
log.Printf("warmup for %s failed: %v", name, err)
}
var wg sync.WaitGroup
var ops int64
totalReqs := *requests
latencies := make([]int64, totalReqs)
start := time.Now()
chunk := totalReqs / *concurrency
for i := 0; i < *concurrency; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
offset := idx * chunk
for j := 0; j < chunk; j++ {
reqStart := time.Now()
if err := getFn(ctx, key); err == nil {
atomic.AddInt64(&ops, 1)
latencies[offset+j] = time.Since(reqStart).Nanoseconds()
}
}
}(i)
}
wg.Wait()
elapsed := time.Since(start)
if ops == 0 {
fmt.Printf("| %-15s | %-10s | %-12s | %-12s |\n", name, "ERROR", "-", "-")
return
}
throughput := float64(ops) / elapsed.Seconds()
avgLat := float64(elapsed.Nanoseconds()) / float64(ops)
// Calculate P99
p99 := "-"
validLats := make([]int64, 0, ops)
for _, l := range latencies {
if l > 0 {
validLats = append(validLats, l)
}
}
if len(validLats) > 0 {
sort.Slice(validLats, func(i, j int) bool { return validLats[i] < validLats[j] })
p99Idx := int(float64(len(validLats)) * 0.99)
if p99Idx >= len(validLats) {
p99Idx = len(validLats) - 1
}
p99 = fmt.Sprintf("%d", validLats[p99Idx])
}
fmt.Printf("| %-15s | %-10.0f | %-12.0f | %-12s |\n", name, throughput, avgLat, p99)
}
function main (cmd/mesh-test/main.go:18-109)

func main() {
id := flag.Int("id", 0, "Node ID")
port := flag.Int("port", 7946, "Mesh Port")
advertise := flag.String("adv", "", "Advertise Address")
peer := flag.String("peer", "", "Seed Peer")
flag.Parse()
opts := mesh.MeshOptions{
Port: *port,
AdvertiseAddr: *advertise,
Heartbeat: 100 * time.Millisecond,
}
if *peer != "" {
opts.Peers = []string{*peer}
}
w := presets.NewMeshEventual[string](opts)
w.Register("test:key", core.ModeEventualDistributed, time.Hour)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// Node 0 will be the publisher
if *id == 0 {
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
count := 0
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
val := fmt.Sprintf("val-%d", count)
log.Printf("[Node %d] Setting test:key to %s", *id, val)
if err := w.Set(ctx, "test:key", val); err != nil {
log.Printf("[Node %d] Set error: %v", *id, err)
}
count++
}
}
}()
} else {
// Other nodes will monitor for changes
// In go-warp, Warp itself doesn't automatically subscribe to the bus for all keys yet,
// so we bridge it manually as seen in distributed_test.go
ch, _ := w.Subscribe(ctx, "test:key")
go func() {
for range ch {
log.Printf("[Node %d] Received Invalidation for test:key", *id)
_ = w.InvalidateLocal(ctx, "test:key")
}
}()
var lastVal string
go func() {
for {
select {
case <-ctx.Done():
return
default:
val, _ := w.Get(ctx, "test:key")
if val != lastVal && val != "" {
fmt.Printf("[NODE_%d_RECEIVED] %s\n", *id, val)
lastVal = val
}
time.Sleep(100 * time.Millisecond)
}
}
}()
}
// Monitor peers
go func() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
log.Printf("[Node %d] Known peers: %v", *id, w.Peers())
}
}
}()
<-sigCh
log.Printf("[Node %d] Shutting down...", *id)
}
function main (cmd/smoke-cluster/main.go:20-79)

func main() {
port := flag.Int("port", 8080, "HTTP port")
meshPort := flag.Int("mesh-port", 7946, "Mesh port")
peers := flag.String("peers", "", "Comma-separated list of peers (e.g. 127.0.0.1:7947)")
mode := flag.String("mode", "mesh", "mesh or redis")
redisAddr := flag.String("redis-addr", "localhost:6379", "Redis address")
flag.Parse()
var w *core.Warp[string]
if *mode == "mesh" {
mOpts := mesh.MeshOptions{
Port: *meshPort,
AdvertiseAddr: fmt.Sprintf("127.0.0.1:%d", *meshPort),
}
if *peers != "" {
mOpts.Peers = strings.Split(*peers, ",")
}
bus, _ := mesh.NewMeshBus(mOpts)
c := cache.NewInMemory[merge.Value[string]]()
// Use Redis as shared L2 store even for Mesh bus test
rClient := redis.NewClient(&redis.Options{Addr: *redisAddr})
store := adapter.NewRedisStore[string](rClient)
engine := merge.NewEngine[string]()
w = core.New[string](c, store, bus, engine)
} else {
w = presets.NewRedisEventual[string](presets.RedisOptions{
Addr: *redisAddr,
})
}
// Register a test key
w.Register("smoke-key", core.ModeEventualDistributed, time.Hour)
http.HandleFunc("/set", func(wWriter http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
val := r.URL.Query().Get("val")
if err := w.Set(r.Context(), key, val); err != nil {
http.Error(wWriter, err.Error(), 500)
return
}
fmt.Fprintf(wWriter, "OK")
})
http.HandleFunc("/get", func(wWriter http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
val, err := w.Get(r.Context(), key)
if err != nil {
http.Error(wWriter, "warp: not found", 404)
return
}
fmt.Fprintf(wWriter, "%s", val)
})
log.Printf("Smoke test node listening on :%d (mode: %s)...", *port, *mode)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil))
}
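
A minimal client sketch for the two endpoints above, assuming a node is running locally with the default -port 8080; "smoke-key" is the only key main registers:

// Hypothetical client for the /set and /get handlers above.
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Write through node 1...
	if _, err := http.Get("http://localhost:8080/set?key=smoke-key&val=hello"); err != nil {
		panic(err)
	}
	// ...and read it back (or from a second node's port to verify propagation).
	resp, err := http.Get("http://localhost:8080/get?key=smoke-key")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // expected: hello
}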
function main (cmd/warp-bench/main.go:21-76)

func main() {
flag.Parse()
log.Printf("Starting benchmark: %d requests, %d concurrency, %d bytes payload", *requests, *concurrency, *dataSize)
log.Println("Initializing Warp (InMemory Standalone)...")
w := presets.NewInMemoryStandalone[[]byte]()
ctx := context.Background()
key := "bench_key"
val := make([]byte, *dataSize)
for i := 0; i < *dataSize; i++ {
val[i] = 'x'
}
w.Register(key, core.ModeStrongLocal, time.Hour)
if err := w.Set(ctx, key, val); err != nil {
log.Fatalf("Setup failed: %v", err)
}
var wg sync.WaitGroup
var ops int64
var errorsCount int64
start := time.Now()
totalReqs := *requests
reqsPerWorker := totalReqs / *concurrency
for i := 0; i < *concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < reqsPerWorker; j++ {
_, err := w.Get(ctx, key)
if err != nil {
atomic.AddInt64(&errorsCount, 1)
}
atomic.AddInt64(&ops, 1)
}
}()
}
wg.Wait()
elapsed := time.Since(start)
throughput := float64(ops) / elapsed.Seconds()
avgLatency := elapsed.Seconds() / float64(ops) * 1e9 // ns
log.Printf("Finished in %v", elapsed)
log.Printf("Throughput: %.2f req/s", throughput)
log.Printf("Avg Latency: %.2f ns", avgLatency)
if errorsCount > 0 {
log.Printf("Errors: %d", errorsCount)
}
}
struct LargeObject (cmd/warp-bench-stress/main.go:27-29)
LargeObject simulates a heavy payload.

type LargeObject struct

Fields
| Name | Type | Description |
|---|---|---|
| Data | [1024]byte | |
function main (cmd/warp-bench-stress/main.go:31-123)

func main() {
flag.Parse()
go func() {
log.Println("Starting pprof on :6060")
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
targetBytes := uint64(*allocGB) * 1024 * 1024 * 1024
approxEntrySize := uint64(1200)
numEntries := int(targetBytes / approxEntrySize)
log.Printf("Targeting %d GB (~%d entries)", *allocGB, numEntries)
c := cache.NewInMemory[merge.Value[LargeObject]](
cache.WithMaxEntries[merge.Value[LargeObject]](numEntries),
)
w := core.New[LargeObject](c, nil, nil, nil)
ctx, cancel := context.WithTimeout(context.Background(), *duration)
defer cancel()
log.Println("Phase 1: Filling Cache...")
fillStart := time.Now()
var wg sync.WaitGroup
chunkSize := numEntries / *procs
for p := 0; p < *procs; p++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
start := id * chunkSize
end := start + chunkSize
for i := start; i < end; i++ {
key := fmt.Sprintf("key-%d", i)
// Must register key first!
w.Register(key, core.ModeStrongLocal, 1*time.Hour)
if err := w.Set(context.Background(), key, LargeObject{}); err != nil {
log.Printf("Set error: %v", err)
}
if i%100000 == 0 {
runtime.Gosched()
}
}
}(p)
}
wg.Wait()
log.Printf("Filled in %v. Current Memory:", time.Since(fillStart))
printMemStats()
log.Printf("Phase 2: Churing (%v)...", *duration)
for p := 0; p < *procs; p++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
r := rand.New(rand.NewSource(time.Now().UnixNano() + int64(id)))
for {
select {
case <-ctx.Done():
return
default:
k := r.Intn(numEntries)
key := fmt.Sprintf("key-%d", k)
if r.Float32() < 0.2 {
w.Set(context.Background(), key, LargeObject{})
} else {
w.Get(context.Background(), key)
}
}
}
}(p)
}
monitorTicker := time.NewTicker(5 * time.Second)
defer monitorTicker.Stop()
go func() {
for {
select {
case <-ctx.Done():
return
case <-monitorTicker.C:
printMemStats()
}
}
}()
wg.Wait()
log.Println("Stress Test Completed.")
}
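
The stress test depends on three package-level flags outside this range; a sketch under assumed defaults (names from the code, values guessed):

var (
	allocGB  = flag.Int("alloc-gb", 4, "target cache size in GiB")
	duration = flag.Duration("duration", 2*time.Minute, "length of the churn phase")
	procs    = flag.Int("procs", runtime.NumCPU(), "number of worker goroutines")
)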
function printMemStats (cmd/warp-bench-stress/main.go:125-132)

func printMemStats() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("Alloc = %v MiB", m.Alloc/1024/1024)
fmt.Printf("\tTotalAlloc = %v MiB", m.TotalAlloc/1024/1024)
fmt.Printf("\tSys = %v MiB", m.Sys/1024/1024)
fmt.Printf("\tNumGC = %v\n", m.NumGC)
}
struct server (cmd/warp-proxy/main.go:39-41)

type server struct

Methods

method handler
Parameters: conn redcon.Conn, cmd redcon.Command

func (s *server) handler(conn redcon.Conn, cmd redcon.Command) {
if len(cmd.Args) == 0 {
return
}
opsProcessed.Inc()
switch strings.ToUpper(string(cmd.Args[0])) {
case "GET":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for 'get' command")
return
}
key := string(cmd.Args[1])
val, err := s.warp.Get(context.Background(), key)
if err != nil {
if err == core.ErrNotFound {
conn.WriteNull()
} else if err == core.ErrUnregistered {
// Auto-register on missed access (Sidecar behavior)
if s.warp.Register(key, core.ModeStrongLocal, 5*time.Minute) {
// Retry get after registration
val, err = s.warp.Get(context.Background(), key)
if err == nil {
conn.WriteBulk(val)
} else {
conn.WriteNull()
}
} else {
conn.WriteError(err.Error())
}
} else {
conn.WriteError(err.Error())
}
} else {
conn.WriteBulk(val)
}
case "SET":
if len(cmd.Args) != 3 {
conn.WriteError("ERR wrong number of arguments for 'set' command")
return
}
key := string(cmd.Args[1])
val := cmd.Args[2]
// Check registration
_, err := s.warp.Get(context.Background(), key)
if err == core.ErrUnregistered {
s.warp.Register(key, core.ModeStrongLocal, 5*time.Minute)
}
err = s.warp.Set(context.Background(), key, val)
if err != nil {
conn.WriteError(err.Error())
} else {
conn.WriteString("OK")
}
case "PING":
if len(cmd.Args) > 1 {
conn.WriteBulk(cmd.Args[1])
} else {
conn.WriteString("PONG")
}
case "INFO":
conn.WriteBulkString("# Server\r\nredis_version:6.0.0\r\nwarp_version:1.0.0\r\n")
case "COMMAND", "CLIENT", "AUTH", "SELECT":
// Treat admin/connection commands as no-ops to satisfy client libraries
conn.WriteString("OK")
case "QUIT":
conn.WriteString("OK")
conn.Close()
default:
conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'")
}
}
Fields
| Name | Type | Description |
|---|---|---|
| warp | *core.Warp[[]byte] | |
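
Since handler answers GET, SET, PING, and the usual connection commands, any standard Redis client can talk to the proxy. A sketch using go-redis, assuming the proxy listens on localhost:6380 (the real flag defaults may differ):

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6380"})
	defer rdb.Close()

	// The proxy auto-registers unknown keys on first access, so a plain
	// SET/GET round-trip needs no prior registration.
	if err := rdb.Set(ctx, "greeting", "hello", 0).Err(); err != nil {
		panic(err)
	}
	val, err := rdb.Get(ctx, "greeting").Result()
	if err != nil {
		panic(err)
	}
	fmt.Println(val) // hello
}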
function main (cmd/warp-proxy/main.go:43-81)

func main() {
flag.Parse()
c := cache.NewInMemory[merge.Value[[]byte]](
cache.WithMaxEntries[merge.Value[[]byte]](10000),
)
bus := syncbus.NewInMemoryBus()
w := core.New[[]byte](c, nil, bus, nil)
srv := &server{warp: w}
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
log.Printf("metrics listening on :%d", *metricsPort)
if err := http.ListenAndServe(fmt.Sprintf(":%d", *metricsPort), mux); err != nil {
log.Printf("metrics server error: %v", err)
}
}()
listenAddr := fmt.Sprintf("%s:%d", *addr, *port)
log.Printf("warp-proxy listening on %s (redcon)", listenAddr)
err := redcon.ListenAndServe(listenAddr,
srv.handler,
func(conn redcon.Conn) bool {
// Accept
activeConns.Inc()
return true
},
func(conn redcon.Conn, err error) {
// Closed
activeConns.Dec()
},
)
if err != nil {
log.Fatal(err)
}
}
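
main and handler also rely on package-level flags and promauto metrics that fall outside the listed range. A hedged sketch (identifier names come from the code; defaults, metric names, and help strings are assumptions):

var (
	addr        = flag.String("addr", "127.0.0.1", "listen address")
	port        = flag.Int("port", 6380, "RESP listen port")
	metricsPort = flag.Int("metrics-port", 2112, "Prometheus metrics port")

	opsProcessed = promauto.NewCounter(prometheus.CounterOpts{
		Name: "warp_proxy_ops_total",
		Help: "Total RESP commands processed.",
	})
	activeConns = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "warp_proxy_active_connections",
		Help: "Currently open client connections.",
	})
)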
function main (examples/v1/main.go:15-39)

func main() {
ctx := context.Background()
w := core.New[string](cache.NewInMemory[merge.Value[string]](), adapter.NewInMemoryStore[string](), nil, merge.NewEngine[string]())
w.Register("greeting", core.ModeStrongLocal, time.Minute)
if err := w.Set(ctx, "greeting", "Warp example"); err != nil {
panic(err)
}
v, _ := w.Get(ctx, "greeting")
fmt.Println(v)
// WatchBus prefix example
bus := watchbus.NewInMemory()
ch, err := core.WatchPrefix(ctx, bus, "user:")
if err != nil {
panic(err)
}
go func() {
for msg := range ch {
fmt.Printf("prefix event: %s\n", msg)
}
}()
_ = bus.Publish(ctx, "user:1", []byte("created"))
_ = bus.PublishPrefix(ctx, "user:", []byte("updated"))
time.Sleep(100 * time.Millisecond)
}
function main (examples/v1/advanced/main.go:16-65)

func main() {
ctx := context.Background()
store := adapter.NewInMemoryStore[int]()
bus := syncbus.NewInMemoryBus()
engine := merge.NewEngine[int]()
engine.Register("counter", func(old, new int) (int, error) {
return old + new, nil
})
w1 := core.New(cache.NewInMemory[merge.Value[int]](), store, bus, engine)
w2 := core.New(cache.NewInMemory[merge.Value[int]](), store, bus, engine)
w1.Register("counter", core.ModeEventualDistributed, time.Minute)
w2.Register("counter", core.ModeEventualDistributed, time.Minute)
ch, _ := bus.Subscribe(ctx, "counter")
defer bus.Unsubscribe(ctx, "counter", ch)
go func() {
for range ch {
_ = w2.Invalidate(ctx, "counter")
}
}()
if err := w1.Set(ctx, "counter", 10); err != nil {
panic(err)
}
if err := w2.Set(ctx, "counter", 5); err != nil {
panic(err)
}
time.Sleep(10 * time.Millisecond)
v1, _ := w1.Get(ctx, "counter")
v2, _ := w2.Get(ctx, "counter")
fmt.Println("node1:", v1, "node2:", v2)
// WatchBus prefix example
wb := watchbus.NewInMemory()
pch, err := core.WatchPrefix(ctx, wb, "task:")
if err != nil {
panic(err)
}
go func() {
for msg := range pch {
fmt.Printf("task event: %s\n", msg)
}
}()
_ = wb.Publish(ctx, "task:1", []byte("done"))
_ = wb.PublishPrefix(ctx, "task:", []byte("all done"))
time.Sleep(100 * time.Millisecond)
}
function main (examples/v1/leases/main.go:13-32)

func main() {
ctx := context.Background()
w := core.New[string](cache.NewInMemory[merge.Value[string]](), nil, nil, merge.NewEngine[string]())
w.Register("task", core.ModeStrongLocal, time.Minute)
id, err := w.GrantLease(ctx, time.Second)
if err != nil {
panic(err)
}
w.AttachKey(id, "task")
_ = w.Set(ctx, "task", "running")
v, _ := w.Get(ctx, "task")
fmt.Println("before revoke:", v)
w.RevokeLease(ctx, id)
if _, err := w.Get(ctx, "task"); err != nil {
fmt.Println("after revoke: not found")
}
}
function main (examples/v1/lock/main.go:12-35)

func main() {
ctx := context.Background()
bus := syncbus.NewInMemoryBus()
node1 := lock.NewInMemory(bus)
node2 := lock.NewInMemory(bus)
go func() {
if err := node1.Acquire(ctx, "leader", time.Second); err != nil {
panic(err)
}
fmt.Println("node1 elected as leader")
time.Sleep(500 * time.Millisecond)
_ = node1.Release(ctx, "leader")
}()
time.Sleep(100 * time.Millisecond)
if err := node2.Acquire(ctx, "leader", time.Second); err != nil {
panic(err)
}
fmt.Println("node2 elected as leader")
_ = node2.Release(ctx, "leader")
}
function main (examples/v1/lock/nats/main.go:14-44)

func main() {
ctx := context.Background()
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
panic(err)
}
defer nc.Close()
bus := busnats.NewNATSBus(nc)
node1 := lock.NewInMemory(bus)
node2 := lock.NewInMemory(bus)
go func() {
if err := node1.Acquire(ctx, "leader", time.Second); err != nil {
panic(err)
}
fmt.Println("node1 elected as leader")
time.Sleep(500 * time.Millisecond)
_ = node1.Release(ctx, "leader")
}()
time.Sleep(100 * time.Millisecond)
if err := node2.Acquire(ctx, "leader", time.Second); err != nil {
panic(err)
}
fmt.Println("node2 elected as leader")
_ = node2.Release(ctx, "leader")
}
function main (examples/v1/mesh/main.go:13-65)

func main() {
ctx := context.Background()
// Node 1 setup
w1 := presets.NewMeshEventual[string](mesh.MeshOptions{
Port: 7946,
AdvertiseAddr: "127.0.0.1:7946",
Peers: []string{"127.0.0.1:7947"},
})
// Node 2 setup
w2 := presets.NewMeshEventual[string](mesh.MeshOptions{
Port: 7947,
AdvertiseAddr: "127.0.0.1:7947",
Peers: []string{"127.0.0.1:7946"},
})
// Register key on both nodes.
// Thanks to the new automatic subscription logic,
// nodes will now automatically listen for invalidations on the bus.
w1.Register("shared-key", core.ModeEventualDistributed, time.Minute)
w2.Register("shared-key", core.ModeEventualDistributed, time.Minute)
// Node 1 sets a value
fmt.Println("Node 1: Setting 'shared-key' to 'WarpSpeed'...")
if err := w1.Set(ctx, "shared-key", "WarpSpeed"); err != nil {
panic(err)
}
// Wait for gossip/propagation
time.Sleep(500 * time.Millisecond)
// Node 2 gets the value
val, err := w2.Get(ctx, "shared-key")
if err != nil {
fmt.Printf("Node 2: Error getting key: %v\n", err)
} else {
fmt.Printf("Node 2: Successfully retrieved value: %s\n", val)
}
// Node 2 updates the value
fmt.Println("Node 2: Updating 'shared-key' to 'HyperDrive'...")
if err := w2.Set(ctx, "shared-key", "HyperDrive"); err != nil {
panic(err)
}
// Wait for gossip
time.Sleep(500 * time.Millisecond)
// Node 1 gets the updated value
val1, _ := w1.Get(ctx, "shared-key")
fmt.Printf("Node 1: Current value: %s\n", val1)
}
function main (examples/v1/redis/main.go:17-54)

func main() {
ctx := context.Background()
client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
defer client.Close()
store := adapter.NewInMemoryStore[int]()
bus := busredis.NewRedisBus(busredis.RedisBusOptions{Client: client, Region: "us-east-1"})
engine := merge.NewEngine[int]()
engine.Register("counter", func(old, new int) (int, error) {
return old + new, nil
})
w1 := core.New(cache.NewInMemory[merge.Value[int]](), store, bus, engine)
w2 := core.New(cache.NewInMemory[merge.Value[int]](), store, bus, engine)
w1.Register("counter", core.ModeEventualDistributed, time.Minute)
w2.Register("counter", core.ModeEventualDistributed, time.Minute)
ch, _ := bus.Subscribe(ctx, "counter")
defer bus.Unsubscribe(ctx, "counter", ch)
go func() {
for range ch {
_ = w2.Invalidate(ctx, "counter")
}
}()
if err := w1.Set(ctx, "counter", 10); err != nil {
panic(err)
}
if err := w2.Set(ctx, "counter", 5); err != nil {
panic(err)
}
time.Sleep(10 * time.Millisecond)
v1, _ := w1.Get(ctx, "counter")
v2, _ := w2.Get(ctx, "counter")
fmt.Println("node1:", v1, "node2:", v2)
}
function main (examples/v1/ristretto/main.go:16-29)

func main() {
ctx := context.Background()
cfg := &ristretto.Config{NumCounters: 1e5, MaxCost: 1 << 20, BufferItems: 64}
c := cache.NewRistretto[merge.Value[string]](cache.WithRistretto(cfg))
w := core.New[string](c, adapter.NewInMemoryStore[string](), nil, merge.NewEngine[string]())
w.Register("greeting", core.ModeStrongLocal, time.Minute)
if err := w.Set(ctx, "greeting", "Warp example"); err != nil {
panic(err)
}
v, _ := w.Get(ctx, "greeting")
fmt.Println(v)
}
struct runStats (examples/v1/super_advanced/main.go:33-38)

type runStats struct
Fields
| Name | Type | Description |
|---|---|---|
| name | string | |
| val | int | |
| elapsed | time.Duration | |
| throughput | float64 | |
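
The three runners below share package-level workerCount and opsPerWorker constants that are not part of the listed ranges; a sketch with assumed values:

const (
	workerCount  = 8    // concurrent workers per run (assumed)
	opsPerWorker = 1000 // operations each worker performs (assumed)
)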
struct fileStore (examples/v1/super_advanced/main.go:42-46)
fileStore is a simple disk-backed store introducing real I/O overhead
for each operation to highlight Warp’s caching benefits.

type fileStore struct

Methods

method path
Parameters: key string
Returns: string

func (s *fileStore) path(key string) string {
return filepath.Join(s.dir, key+".txt")
}
method Get
Parameters: ctx context.Context, key string
Returns: int, bool, error

func (s *fileStore) Get(ctx context.Context, key string) (int, bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
b, err := os.ReadFile(s.path(key))
if errors.Is(err, os.ErrNotExist) {
return 0, false, nil
}
if err != nil {
return 0, false, err
}
v, err := strconv.Atoi(string(b))
if err != nil {
return 0, false, err
}
if s.delay > 0 {
time.Sleep(s.delay)
}
return v, true, nil
}
method Set
Parameters: ctx context.Context, key string, value int
Returns: error

func (s *fileStore) Set(ctx context.Context, key string, value int) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := os.WriteFile(s.path(key), []byte(strconv.Itoa(value)), 0o644); err != nil {
return err
}
if s.delay > 0 {
time.Sleep(s.delay)
}
return nil
}
method Keys
Parameters: ctx context.Context
Returns: []string, error

func (s *fileStore) Keys(ctx context.Context) ([]string, error) {
s.mu.Lock()
defer s.mu.Unlock()
entries, err := os.ReadDir(s.dir)
if err != nil {
return nil, err
}
keys := make([]string, 0, len(entries))
for _, e := range entries {
if e.Type().IsRegular() {
keys = append(keys, strings.TrimSuffix(e.Name(), ".txt"))
}
}
return keys, nil
}
Fields
| Name | Type | Description |
|---|---|---|
| dir | string | |
| delay | time.Duration | |
| mu | sync.Mutex | |
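
runWithWarp and runWithWarpFull pass a *fileStore to core.New as the backing store, so the three methods above presumably satisfy go-warp's store contract for int values. A structural compile-time check, written against the method set because the adapter package's interface name is not shown here:

var _ interface {
	Get(ctx context.Context, key string) (int, bool, error)
	Set(ctx context.Context, key string, value int) error
	Keys(ctx context.Context) ([]string, error)
} = (*fileStore)(nil)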
function newFileStore (examples/v1/super_advanced/main.go:48-51)
Parameters: dir string, delay time.Duration
Returns: *fileStore

func newFileStore(dir string, delay time.Duration) *fileStore {
os.MkdirAll(dir, 0o755)
return &fileStore{dir: dir, delay: delay}
}
function runWithoutWarp (examples/v1/super_advanced/main.go:105-153)
Parameters: ctx context.Context
Returns: runStats

func runWithoutWarp(ctx context.Context) runStats {
fmt.Println("=== Without Warp ===")
dir, err := os.MkdirTemp("", "warp_store")
if err != nil {
log.Printf("MkdirTemp error: %v", err)
return runStats{name: "without warp"}
}
defer os.RemoveAll(dir)
store := newFileStore(dir, 500*time.Microsecond)
if err := store.Set(ctx, "counter", 0); err != nil {
log.Printf("store.Set error: %v", err)
return runStats{name: "without warp"}
}
start := time.Now()
var mu sync.Mutex
var wg sync.WaitGroup
for range workerCount {
wg.Add(1)
go func() {
defer wg.Done()
for range opsPerWorker {
mu.Lock()
v, _, err := store.Get(ctx, "counter")
if err != nil {
log.Printf("store.Get error: %v", err)
mu.Unlock()
return
}
if err := store.Set(ctx, "counter", v+1); err != nil {
log.Printf("store.Set error: %v", err)
mu.Unlock()
return
}
mu.Unlock()
}
}()
}
wg.Wait()
val, _, err := store.Get(ctx, "counter")
if err != nil {
log.Printf("store.Get error: %v", err)
return runStats{name: "without warp"}
}
elapsed := time.Since(start)
throughput := float64(opsPerWorker*workerCount) / elapsed.Seconds()
fmt.Printf("value=%d elapsed=%s throughput=%.2f ops/s\n", val, elapsed, throughput)
return runStats{name: "without warp", val: val, elapsed: elapsed, throughput: throughput}
}
function runWithWarp (examples/v1/super_advanced/main.go:155-258)
Parameters: ctx context.Context
Returns: runStats

func runWithWarp(ctx context.Context) runStats {
fmt.Println("=== With Warp ===")
dir, err := os.MkdirTemp("", "warp_store")
if err != nil {
log.Printf("MkdirTemp error: %v", err)
return runStats{name: "with warp"}
}
defer os.RemoveAll(dir)
store := newFileStore(dir, 500*time.Microsecond)
if err := store.Set(ctx, "counter", 0); err != nil {
log.Printf("store.Set error: %v", err)
return runStats{name: "with warp"}
}
start := time.Now()
reg := metrics.NewRegistry()
metrics.RegisterCoreMetrics(reg)
bus := syncbus.NewInMemoryBus()
baseCache := cache.NewInMemory[merge.VersionedValue[int]]()
vCache := versioned.New(baseCache, workerCount*opsPerWorker, versioned.WithMetrics[int](reg))
engine := merge.NewEngine[int]()
w := core.New(vCache, store, bus, engine, core.WithMetrics[int](reg))
w.Merge("counter", func(old, new int) (int, error) { return old + new, nil })
w.Register("counter", core.ModeStrongLocal, time.Second, cache.WithSliding())
strat := adaptive.NewSlidingWindow(50*time.Millisecond, 20, time.Millisecond, 2*time.Second, reg)
w.RegisterDynamicTTL("hot", core.ModeStrongLocal, strat)
w.Warmup(ctx)
if err := w.Set(ctx, "hot", 1); err != nil {
log.Printf("w.Set hot error: %v", err)
return runStats{name: "with warp"}
}
leaseID, err := w.GrantLease(ctx, time.Second)
if err != nil {
log.Printf("GrantLease error: %v", err)
return runStats{name: "with warp"}
}
w.AttachKey(leaseID, "counter")
var wg sync.WaitGroup
var mu sync.Mutex
for range workerCount {
wg.Add(1)
go func() {
defer wg.Done()
for range opsPerWorker {
txn := w.Txn(ctx)
txn.Set("counter", 1)
mu.Lock()
if err := txn.Commit(); err != nil {
log.Printf("txn.Commit error: %v", err)
mu.Unlock()
return
}
mu.Unlock()
}
}()
}
wg.Wait()
val, err := w.Get(ctx, "counter")
if err != nil {
log.Printf("w.Get error: %v", err)
return runStats{name: "with warp"}
}
past, err := w.GetAt(ctx, "counter", time.Now().Add(-time.Millisecond))
if err != nil {
log.Printf("w.GetAt error: %v", err)
return runStats{name: "with warp"}
}
fmt.Println("current:", val, "past:", past)
if err := w.Invalidate(ctx, "counter"); err != nil {
log.Printf("w.Invalidate error: %v", err)
}
w.RevokeLease(ctx, leaseID)
w.Unregister("counter")
mfs, err := reg.Gather()
if err != nil {
log.Printf("reg.Gather error: %v", err)
} else {
fmt.Println("metrics:")
for _, mf := range mfs {
for _, m := range mf.Metric {
if m.Counter != nil {
fmt.Printf(" %s %f\n", mf.GetName(), m.GetCounter().GetValue())
} else if m.Gauge != nil {
fmt.Printf(" %s %f\n", mf.GetName(), m.GetGauge().GetValue())
}
}
}
}
elapsed := time.Since(start)
throughput := float64(opsPerWorker*workerCount) / elapsed.Seconds()
fmt.Printf("value=%d elapsed=%s throughput=%.2f ops/s\n", val, elapsed, throughput)
return runStats{name: "with warp", val: val, elapsed: elapsed, throughput: throughput}
}
function runWithWarpFull (examples/v1/super_advanced/main.go:260-421)
Parameters: ctx context.Context
Returns: runStats

func runWithWarpFull(ctx context.Context) runStats {
fmt.Println("=== With Warp + Validator & Bus ===")
dir, err := os.MkdirTemp("", "warp_store")
if err != nil {
log.Printf("MkdirTemp error: %v", err)
return runStats{name: "warp full"}
}
defer os.RemoveAll(dir)
store := newFileStore(dir, 500*time.Microsecond)
if err := store.Set(ctx, "counter", 0); err != nil {
log.Printf("store.Set error: %v", err)
return runStats{name: "warp full"}
}
start := time.Now()
reg := metrics.NewRegistry()
metrics.RegisterCoreMetrics(reg)
bus := syncbus.NewInMemoryBus()
wb := watchbus.NewInMemory()
baseCache := cache.NewInMemory[merge.VersionedValue[int]]()
vCache := versioned.New(baseCache, workerCount*opsPerWorker, versioned.WithMetrics[int](reg))
engine := merge.NewEngine[int]()
w := core.New(vCache, store, bus, engine, core.WithMetrics[int](reg))
w.Merge("counter", func(old, new int) (int, error) { return old + new, nil })
w.Register("counter", core.ModeStrongLocal, time.Second, cache.WithSliding())
strat := adaptive.NewSlidingWindow(50*time.Millisecond, 20, time.Millisecond, 2*time.Second, reg)
w.RegisterDynamicTTL("hot", core.ModeStrongLocal, strat)
w.Warmup(ctx)
if err := w.Set(ctx, "hot", 1); err != nil {
log.Printf("w.Set hot error: %v", err)
return runStats{name: "warp full"}
}
leaseID, err := w.GrantLease(ctx, time.Second)
if err != nil {
log.Printf("GrantLease error: %v", err)
return runStats{name: "warp full"}
}
w.AttachKey(leaseID, "counter")
v := w.Validator(validator.ModeAutoHeal, 100*time.Millisecond)
watchCtx, cancelWatch := context.WithCancel(ctx)
watchCh, err := core.WatchPrefix(watchCtx, wb, "event")
if err != nil {
log.Printf("WatchPrefix error: %v", err)
cancelWatch()
return runStats{name: "warp full"}
}
var watchWG sync.WaitGroup
watchWG.Add(1)
go func() {
defer watchWG.Done()
for {
select {
case <-watchCtx.Done():
if err := watchCtx.Err(); err != nil && err != context.Canceled {
log.Printf("watch error: %v", err)
}
return
case msg, ok := <-watchCh:
if !ok {
return
}
fmt.Println("watch:", string(msg))
}
}
}()
defer func() {
cancelWatch()
watchWG.Wait()
}()
locker := lock.NewInMemory(bus)
if err := locker.Acquire(ctx, "counter", 0); err != nil {
log.Printf("locker.Acquire error: %v", err)
return runStats{name: "warp full"}
}
var wg sync.WaitGroup
var mu sync.Mutex
for range workerCount {
wg.Add(1)
go func() {
defer wg.Done()
for range opsPerWorker {
txn := w.Txn(ctx)
txn.Set("counter", 1)
mu.Lock()
if err := txn.Commit(); err != nil {
log.Printf("txn.Commit error: %v", err)
mu.Unlock()
return
}
mu.Unlock()
}
}()
}
wg.Wait()
valCtx, cancelVal := context.WithCancel(ctx)
go v.Run(valCtx)
time.Sleep(150 * time.Millisecond)
cancelVal()
if err := locker.Release(ctx, "counter"); err != nil {
log.Printf("locker.Release error: %v", err)
}
// allow validator to flush latest store state
time.Sleep(200 * time.Millisecond)
if err := wb.Publish(ctx, "event/warp", []byte("done")); err != nil {
log.Printf("wb.Publish error: %v", err)
}
val, _, err := store.Get(ctx, "counter")
if err != nil {
log.Printf("store.Get error: %v", err)
return runStats{name: "warp full"}
}
past, err := w.GetAt(ctx, "counter", time.Now().Add(-time.Millisecond))
if err != nil {
log.Printf("w.GetAt error: %v", err)
return runStats{name: "warp full"}
}
fmt.Println("current:", val, "past:", past)
if err := w.Invalidate(ctx, "counter"); err != nil {
log.Printf("w.Invalidate error: %v", err)
}
w.RevokeLease(ctx, leaseID)
w.Unregister("counter")
mfs, err := reg.Gather()
if err != nil {
log.Printf("reg.Gather error: %v", err)
} else {
fmt.Println("metrics:")
for _, mf := range mfs {
for _, m := range mf.Metric {
if m.Counter != nil {
fmt.Printf(" %s %f\n", mf.GetName(), m.GetCounter().GetValue())
} else if m.Gauge != nil {
fmt.Printf(" %s %f\n", mf.GetName(), m.GetGauge().GetValue())
}
}
}
}
elapsed := time.Since(start)
throughput := float64(opsPerWorker*workerCount) / elapsed.Seconds()
fmt.Printf("value=%d elapsed=%s throughput=%.2f ops/s\n", val, elapsed, throughput)
return runStats{name: "warp full", val: val, elapsed: elapsed, throughput: throughput}
}
function main (examples/v1/super_advanced/main.go:423-442)

func main() {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
without := runWithoutWarp(ctx)
withOnly := runWithWarp(ctx)
withFull := runWithWarpFull(ctx)
fmt.Println("\n=== Summary ===")
fmt.Printf("without warp: value=%d elapsed=%s throughput=%.2f ops/s\n", without.val, without.elapsed, without.throughput)
fmt.Printf("with warp: value=%d elapsed=%s throughput=%.2f ops/s\n", withOnly.val, withOnly.elapsed, withOnly.throughput)
fmt.Printf("with warp + extras: value=%d elapsed=%s throughput=%.2f ops/s\n", withFull.val, withFull.elapsed, withFull.throughput)
if withOnly.elapsed > 0 {
fmt.Printf("speedup warp vs without: %.2fx\n", without.elapsed.Seconds()/withOnly.elapsed.Seconds())
}
if withFull.elapsed > 0 {
fmt.Printf("speedup warp+extras vs without: %.2fx\n", without.elapsed.Seconds()/withFull.elapsed.Seconds())
}
}
function main (examples/v1/telemetry/main.go:19-42)

func main() {
ctx := context.Background()
exp, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
if err != nil {
log.Fatal(err)
}
tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp))
defer func() { _ = tp.Shutdown(ctx) }()
otel.SetTracerProvider(tp)
reg := metrics.NewRegistry()
metrics.RegisterCoreMetrics(reg)
c := cache.NewInMemory[merge.Value[string]](cache.WithMetrics[merge.Value[string]](reg))
w := core.New[string](c, nil, nil, nil, core.WithMetrics[string](reg))
w.Register("greeting", core.ModeStrongLocal, time.Minute)
_ = w.Set(ctx, "greeting", "telemetry example")
_, _ = w.Get(ctx, "greeting")
http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
log.Fatal(http.ListenAndServe(":2112", nil))
}
function main (examples/v1/txn_merge/main.go:15-47)

func main() {
ctx := context.Background()
w := core.New[any](cache.NewInMemory[merge.Value[any]](), adapter.NewInMemoryStore[any](), nil, merge.NewEngine[any]())
w.Register("counter", core.ModeStrongLocal, time.Minute)
w.Register("logs", core.ModeStrongLocal, time.Minute)
w.Merge("counter", func(old, new any) (any, error) {
return old.(int) + new.(int), nil
})
w.Merge("logs", func(old, new any) (any, error) {
return append(old.([]string), new.([]string)...), nil
})
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
txn := w.Txn(ctx)
txn.Set("counter", 1)
txn.Set("logs", []string{fmt.Sprintf("event %d", i)})
if err := txn.Commit(); err != nil {
panic(err)
}
}(i)
}
wg.Wait()
counter, _ := w.Get(ctx, "counter")
logs, _ := w.Get(ctx, "logs")
fmt.Printf("counter: %d\n", counter.(int))
fmt.Printf("logs: %v\n", logs.([]string))
}
function main (examples/v1/validator/main.go:15-34)

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := adapter.NewInMemoryStore[string]()
c := cache.NewInMemory[merge.Value[string]]()
w := core.New[string](c, store, nil, merge.NewEngine[string]())
w.Register("greeting", core.ModeStrongLocal, time.Minute)
_ = w.Set(ctx, "greeting", "hello")
_ = store.Set(ctx, "greeting", "hi")
v := w.Validator(validator.ModeAutoHeal, 20*time.Millisecond)
go v.Run(ctx)
time.Sleep(50 * time.Millisecond)
val, _ := w.Get(ctx, "greeting")
fmt.Println("healed value:", val)
fmt.Println("mismatches:", v.Metrics())
}
function main (examples/v1/versioned/main.go:14-34)

func main() {
ctx := context.Background()
base := cache.NewInMemory[merge.VersionedValue[string]]()
c := versioned.New[string](base, 5)
w := core.New[string](c, nil, nil, merge.NewEngine[string]())
w.Register("status", core.ModeStrongLocal, time.Minute)
_ = w.Set(ctx, "status", "v1")
t1 := time.Now()
time.Sleep(10 * time.Millisecond)
_ = w.Set(ctx, "status", "v2")
latest, _ := w.Get(ctx, "status")
fmt.Println("latest:", latest)
old, err := w.GetAt(ctx, "status", t1)
if err != nil {
panic(err)
}
fmt.Println("at t1:", old)
}