main API

main

package

API reference for the main package.

F
function

main

cmd/bench/main.go:30-49
func main()

{
	flag.Parse()

	payload := make([]byte, *dataSize)
	for i := range payload {
		payload[i] = 'x'
	}

	targets := strings.Split(*target, ",")
	if *target == "all" {
		targets = []string{"warp-local", "warp-mesh", "ristretto", "warp-redis", "redis", "dragonfly"}
	}

	fmt.Printf("| %-15s | %-10s | %-12s | %-12s |\n", "System", "Ops/sec", "Avg Latency", "P99 Latency")
	fmt.Println("|:---|:---|:---|:---|")

	for _, t := range targets {
		runBenchmark(strings.TrimSpace(t), payload)
	}
}
F
function

runBenchmark

Parameters

name
string
payload
[]byte
cmd/bench/main.go:51-182
func runBenchmark(name string, payload []byte)

{
	var (
		getFn   func(ctx context.Context, key string) error
		setFn   func(ctx context.Context, key string, val []byte) error
		cleanup func()
	)

	ctx := context.Background()
	key := "bench:key"

	switch name {
	case "warp-local":
		w := presets.NewInMemoryStandalone[[]byte]()
		w.Register(key, core.ModeStrongLocal, time.Hour)
		setFn = func(ctx context.Context, k string, v []byte) error { return w.Set(ctx, k, v) }
		getFn = func(ctx context.Context, k string) error { _, err := w.Get(ctx, k); return err }

	case "warp-mesh":
		w := presets.NewMeshEventual[[]byte](mesh.MeshOptions{
			Port:      7946,
			Heartbeat: 1 * time.Hour,
		})
		w.Register(key, core.ModeEventualDistributed, time.Hour)
		setFn = func(ctx context.Context, k string, v []byte) error { return w.Set(ctx, k, v) }
		getFn = func(ctx context.Context, k string) error { _, err := w.Get(ctx, k); return err }

	case "ristretto":
		cache, _ := ristretto.NewCache(&ristretto.Config{
			NumCounters: 1e7,
			MaxCost:     1 << 30,
			BufferItems: 64,
		})
		setFn = func(ctx context.Context, k string, v []byte) error {
			cache.Set(k, v, 1)
			return nil
		}
		getFn = func(ctx context.Context, k string) error {
			_, found := cache.Get(k)
			if !found {
				return fmt.Errorf("not found")
			}
			return nil
		}
		cleanup = func() { cache.Close() }

	case "warp-redis":
		w := presets.NewRedisEventual[[]byte](presets.RedisOptions{Addr: *redisAddr})
		w.Register(key, core.ModeEventualDistributed, time.Hour)
		setFn = func(ctx context.Context, k string, v []byte) error { return w.Set(ctx, k, v) }
		getFn = func(ctx context.Context, k string) error { _, err := w.Get(ctx, k); return err }

	case "redis":
		r := redis.NewClient(&redis.Options{Addr: *redisAddr})
		setFn = func(ctx context.Context, k string, v []byte) error { return r.Set(ctx, k, v, 0).Err() }
		getFn = func(ctx context.Context, k string) error { return r.Get(ctx, k).Err() }
		cleanup = func() { r.Close() }

	case "dragonfly":
		r := redis.NewClient(&redis.Options{Addr: *dfAddr})
		setFn = func(ctx context.Context, k string, v []byte) error { return r.Set(ctx, k, v, 0).Err() }
		getFn = func(ctx context.Context, k string) error { return r.Get(ctx, k).Err() }
		cleanup = func() { r.Close() }

	default:
		log.Printf("Unknown target: %s", name)
		return
	}

	if cleanup != nil {
		defer cleanup()
	}

	// Warmup
	if err := setFn(ctx, key, payload); err != nil {
		// Log but continue, might be connection refused if container is missing
		// fmt.Printf("| %-15s | %-10s | %-12s | %-12s |\n", name, "FAIL", "-", "-")
		// return
	}

	var wg sync.WaitGroup
	var ops int64
	totalReqs := *requests
	latencies := make([]int64, totalReqs)

	start := time.Now()
	chunk := totalReqs / *concurrency

	for i := 0; i < *concurrency; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			offset := idx * chunk
			for j := 0; j < chunk; j++ {
				reqStart := time.Now()
				if err := getFn(ctx, key); err == nil {
					atomic.AddInt64(&ops, 1)
					latencies[offset+j] = time.Since(reqStart).Nanoseconds()
				}
			}
		}(i)
	}

	wg.Wait()
	elapsed := time.Since(start)

	if ops == 0 {
		fmt.Printf("| %-15s | %-10s | %-12s | %-12s |\n", name, "ERROR", "-", "-")
		return
	}

	throughput := float64(ops) / elapsed.Seconds()
	avgLat := float64(elapsed.Nanoseconds()) / float64(ops)

	// Calculate P99
	var p99 string = "-"
	validLats := make([]int64, 0, ops)
	for _, l := range latencies {
		if l > 0 {
			validLats = append(validLats, l)
		}
	}
	if len(validLats) > 0 {
		sort.Slice(validLats, func(i, j int) bool { return validLats[i] < validLats[j] })
		p99Idx := int(float64(len(validLats)) * 0.99)
		if p99Idx >= len(validLats) {
			p99Idx = len(validLats) - 1
		}
		p99 = fmt.Sprintf("%d", validLats[p99Idx])
	}

	fmt.Printf("| %-15s | %-10.0f | %-12.0f | %-12s |\n", name, throughput, avgLat, p99)
}
F
function

main

cmd/mesh-test/main.go:18-109
func main()

{
	id := flag.Int("id", 0, "Node ID")
	port := flag.Int("port", 7946, "Mesh Port")
	advertise := flag.String("adv", "", "Advertise Address")
	peer := flag.String("peer", "", "Seed Peer")
	flag.Parse()

	opts := mesh.MeshOptions{
		Port:          *port,
		AdvertiseAddr: *advertise,
		Heartbeat:     100 * time.Millisecond,
	}
	if *peer != "" {
		opts.Peers = []string{*peer}
	}

	w := presets.NewMeshEventual[string](opts)
	w.Register("test:key", core.ModeEventualDistributed, time.Hour)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	// Node 0 will be the publisher
	if *id == 0 {
		go func() {
			ticker := time.NewTicker(1 * time.Second)
			defer ticker.Stop()
			count := 0
			for {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					val := fmt.Sprintf("val-%d", count)
					log.Printf("[Node %d] Setting test:key to %s", *id, val)
					if err := w.Set(ctx, "test:key", val); err != nil {
						log.Printf("[Node %d] Set error: %v", *id, err)
					}
					count++
				}
			}
		}()
	} else {
		// Other nodes will monitor for changes
		// In go-warp, Warp itself doesn't automatically subscribe to the bus for all keys yet,
		// so we bridge it manually as seen in distributed_test.go
		ch, _ := w.Subscribe(ctx, "test:key")
		go func() {
			for range ch {
				log.Printf("[Node %d] Received Invalidation for test:key", *id)
				_ = w.InvalidateLocal(ctx, "test:key")
			}
		}()

		var lastVal string
		go func() {
			for {
				select {
				case <-ctx.Done():
					return
				default:
					val, _ := w.Get(ctx, "test:key")
					if val != lastVal && val != "" {
						fmt.Printf("[NODE_%d_RECEIVED] %s\n", *id, val)
						lastVal = val
					}
					time.Sleep(100 * time.Millisecond)
				}
			}
		}()
	}

	// Monitor peers
	go func() {
		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				log.Printf("[Node %d] Known peers: %v", *id, w.Peers())
			}
		}
	}()

	<-sigCh
	log.Printf("[Node %d] Shutting down...", *id)
}
F
function

main

cmd/smoke-cluster/main.go:20-79
func main()

{
	port := flag.Int("port", 8080, "HTTP port")
	meshPort := flag.Int("mesh-port", 7946, "Mesh port")
	peers := flag.String("peers", "", "Comma-separated list of peers (e.g. 127.0.0.1:7947)")
	mode := flag.String("mode", "mesh", "mesh or redis")
	redisAddr := flag.String("redis-addr", "localhost:6379", "Redis address")
	flag.Parse()

	var w *core.Warp[string]

	if *mode == "mesh" {
		mOpts := mesh.MeshOptions{
			Port:          *meshPort,
			AdvertiseAddr: fmt.Sprintf("127.0.0.1:%d", *meshPort),
		}
		if *peers != "" {
			mOpts.Peers = strings.Split(*peers, ",")
		}

		bus, _ := mesh.NewMeshBus(mOpts)
		c := cache.NewInMemory[merge.Value[string]]()

		// Use Redis as shared L2 store even for Mesh bus test
		rClient := redis.NewClient(&redis.Options{Addr: *redisAddr})
		store := adapter.NewRedisStore[string](rClient)

		engine := merge.NewEngine[string]()
		w = core.New[string](c, store, bus, engine)
	} else {
		w = presets.NewRedisEventual[string](presets.RedisOptions{
			Addr: *redisAddr,
		})
	}

	// Register a test key
	w.Register("smoke-key", core.ModeEventualDistributed, time.Hour)

	http.HandleFunc("/set", func(wWriter http.ResponseWriter, r *http.Request) {
		key := r.URL.Query().Get("key")
		val := r.URL.Query().Get("val")
		if err := w.Set(r.Context(), key, val); err != nil {
			http.Error(wWriter, err.Error(), 500)
			return
		}
		fmt.Fprintf(wWriter, "OK")
	})

	http.HandleFunc("/get", func(wWriter http.ResponseWriter, r *http.Request) {
		key := r.URL.Query().Get("key")
		val, err := w.Get(r.Context(), key)
		if err != nil {
			http.Error(wWriter, "warp: not found", 404)
			return
		}
		fmt.Fprintf(wWriter, "%s", val)
	})

	log.Printf("Smoke test node listening on :%d (mode: %s)...", *port, *mode)
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil))
}
F
function

main

cmd/warp-bench/main.go:21-76
func main()

{
	flag.Parse()

	log.Printf("Starting benchmark: %d requests, %d concurrency, %d bytes payload", *requests, *concurrency, *dataSize)

	log.Println("Initializing Warp (InMemory Standalone)...")
	w := presets.NewInMemoryStandalone[[]byte]()

	ctx := context.Background()
	key := "bench_key"
	val := make([]byte, *dataSize)
	for i := 0; i < *dataSize; i++ {
		val[i] = 'x'
	}

	w.Register(key, core.ModeStrongLocal, time.Hour)
	if err := w.Set(ctx, key, val); err != nil {
		log.Fatalf("Setup failed: %v", err)
	}

	var wg sync.WaitGroup
	var ops int64
	var errorsCount int64

	start := time.Now()

	totalReqs := *requests
	reqsPerWorker := totalReqs / *concurrency

	for i := 0; i < *concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < reqsPerWorker; j++ {
				_, err := w.Get(ctx, key)
				if err != nil {
					atomic.AddInt64(&errorsCount, 1)
				}
				atomic.AddInt64(&ops, 1)
			}
		}()
	}

	wg.Wait()
	elapsed := time.Since(start)

	throughput := float64(ops) / elapsed.Seconds()
	avgLatency := elapsed.Seconds() / float64(ops) * 1e9 // ns

	log.Printf("Finished in %v", elapsed)
	log.Printf("Throughput: %.2f req/s", throughput)
	log.Printf("Avg Latency: %.2f ns", avgLatency)
	if errorsCount > 0 {
		log.Printf("Errors: %d", errorsCount)
	}
}
S
struct

LargeObject

LargeObject simulates a heavy payload

cmd/warp-bench-stress/main.go:27-29
type LargeObject struct

Fields

Name Type Description
Data [1024]byte
F
function

main

cmd/warp-bench-stress/main.go:31-123
func main()

{
	flag.Parse()

	go func() {
		log.Println("Starting pprof on :6060")
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	targetBytes := uint64(*allocGB) * 1024 * 1024 * 1024
	approxEntrySize := uint64(1200)
	numEntries := int(targetBytes / approxEntrySize)

	log.Printf("Targeting %d GB (~%d entries)", *allocGB, numEntries)

	c := cache.NewInMemory[merge.Value[LargeObject]](
		cache.WithMaxEntries[merge.Value[LargeObject]](numEntries),
	)
	w := core.New[LargeObject](c, nil, nil, nil)

	ctx, cancel := context.WithTimeout(context.Background(), *duration)
	defer cancel()

	log.Println("Phase 1: Filling Cache...")
	fillStart := time.Now()
	var wg sync.WaitGroup
	chunkSize := numEntries / *procs

	for p := 0; p < *procs; p++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			start := id * chunkSize
			end := start + chunkSize
			for i := start; i < end; i++ {
				key := fmt.Sprintf("key-%d", i)
				// Must register key first!
				w.Register(key, core.ModeStrongLocal, 1*time.Hour)
				if err := w.Set(context.Background(), key, LargeObject{}); err != nil {
					log.Printf("Set error: %v", err)
				}
				if i%100000 == 0 {
					runtime.Gosched()
				}
			}
		}(p)
	}
	wg.Wait()
	log.Printf("Filled in %v. Current Memory:", time.Since(fillStart))
	printMemStats()

	log.Printf("Phase 2: Churing (%v)...", *duration)

	for p := 0; p < *procs; p++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			r := rand.New(rand.NewSource(time.Now().UnixNano() + int64(id)))

			for {
				select {
				case <-ctx.Done():
					return
				default:
					k := r.Intn(numEntries)
					key := fmt.Sprintf("key-%d", k)

					if r.Float32() < 0.2 {
						w.Set(context.Background(), key, LargeObject{})
					} else {
						w.Get(context.Background(), key)
					}
				}
			}
		}(p)
	}

	monitorTicker := time.NewTicker(5 * time.Second)
	defer monitorTicker.Stop()

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-monitorTicker.C:
				printMemStats()
			}
		}
	}()

	wg.Wait()
	log.Println("Stress Test Completed.")
}
F
function

printMemStats

cmd/warp-bench-stress/main.go:125-132
func printMemStats()

{
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	fmt.Printf("Alloc = %v MiB", m.Alloc/1024/1024)
	fmt.Printf("\tTotalAlloc = %v MiB", m.TotalAlloc/1024/1024)
	fmt.Printf("\tSys = %v MiB", m.Sys/1024/1024)
	fmt.Printf("\tNumGC = %v\n", m.NumGC)
}
S
struct

server

cmd/warp-proxy/main.go:39-41
type server struct

Methods

handler
Method

Parameters

func (*server) handler(conn redcon.Conn, cmd redcon.Command)
{
	if len(cmd.Args) == 0 {
		return
	}
	opsProcessed.Inc()

	switch strings.ToUpper(string(cmd.Args[0])) {
	case "GET":
		if len(cmd.Args) != 2 {
			conn.WriteError("ERR wrong number of arguments for 'get' command")
			return
		}
		key := string(cmd.Args[1])
		val, err := s.warp.Get(context.Background(), key)
		if err != nil {
			if err == core.ErrNotFound {
				conn.WriteNull()
			} else if err == core.ErrUnregistered {
				// Auto-register on missed access (Sidecar behavior)
				if s.warp.Register(key, core.ModeStrongLocal, 5*time.Minute) {
					// Retry get after registration
					val, err = s.warp.Get(context.Background(), key)
					if err == nil {
						conn.WriteBulk(val)
					} else {
						conn.WriteNull()
					}
				} else {
					conn.WriteError(err.Error())
				}
			} else {
				conn.WriteError(err.Error())
			}
		} else {
			conn.WriteBulk(val)
		}

	case "SET":
		if len(cmd.Args) != 3 {
			conn.WriteError("ERR wrong number of arguments for 'set' command")
			return
		}
		key := string(cmd.Args[1])
		val := cmd.Args[2]

		// Check registration
		_, err := s.warp.Get(context.Background(), key)
		if err == core.ErrUnregistered {
			s.warp.Register(key, core.ModeStrongLocal, 5*time.Minute)
		}

		err = s.warp.Set(context.Background(), key, val)
		if err != nil {
			conn.WriteError(err.Error())
		} else {
			conn.WriteString("OK")
		}

	case "PING":
		if len(cmd.Args) > 1 {
			conn.WriteBulk(cmd.Args[1])
		} else {
			conn.WriteString("PONG")
		}

	case "INFO":
		conn.WriteBulkString("# Server\r\nredis_version:6.0.0\r\nwarp_version:1.0.0\r\n")

	case "COMMAND", "CLIENT", "AUTH", "SELECT":
		// Admin/Connection commands as No-Op to satisfy client libraries
		conn.WriteString("OK")

	case "QUIT":
		conn.WriteString("OK")
		conn.Close()

	default:
		conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'")
	}
}

Fields

Name Type Description
warp *core.Warp[[]byte]
F
function

main

cmd/warp-proxy/main.go:43-81
func main()

{
	flag.Parse()

	c := cache.NewInMemory[merge.Value[[]byte]](
		cache.WithMaxEntries[merge.Value[[]byte]](10000),
	)
	bus := syncbus.NewInMemoryBus()
	w := core.New[[]byte](c, nil, bus, nil)

	srv := &server{warp: w}

	go func() {
		mux := http.NewServeMux()
		mux.Handle("/metrics", promhttp.Handler())
		log.Printf("metrics listening on :%d", *metricsPort)
		if err := http.ListenAndServe(fmt.Sprintf(":%d", *metricsPort), mux); err != nil {
			log.Printf("metrics server error: %v", err)
		}
	}()

	listenAddr := fmt.Sprintf("%s:%d", *addr, *port)
	log.Printf("warp-proxy listening on %s (redcon)", listenAddr)

	err := redcon.ListenAndServe(listenAddr,
		srv.handler,
		func(conn redcon.Conn) bool {
			// Accept
			activeConns.Inc()
			return true
		},
		func(conn redcon.Conn, err error) {
			// Closed
			activeConns.Dec()
		},
	)
	if err != nil {
		log.Fatal(err)
	}
}
F
function

main

examples/v1/main.go:15-39
func main()

{
	ctx := context.Background()
	w := core.New[string](cache.NewInMemory[merge.Value[string]](), adapter.NewInMemoryStore[string](), nil, merge.NewEngine[string]())
	w.Register("greeting", core.ModeStrongLocal, time.Minute)
	if err := w.Set(ctx, "greeting", "Warp example"); err != nil {
		panic(err)
	}
	v, _ := w.Get(ctx, "greeting")
	fmt.Println(v)

	// WatchBus prefix example
	bus := watchbus.NewInMemory()
	ch, err := core.WatchPrefix(ctx, bus, "user:")
	if err != nil {
		panic(err)
	}
	go func() {
		for msg := range ch {
			fmt.Printf("prefix event: %s\n", msg)
		}
	}()
	_ = bus.Publish(ctx, "user:1", []byte("created"))
	_ = bus.PublishPrefix(ctx, "user:", []byte("updated"))
	time.Sleep(100 * time.Millisecond)
}
F
function

main

examples/v1/advanced/main.go:16-65
func main()

{
	ctx := context.Background()
	store := adapter.NewInMemoryStore[int]()
	bus := syncbus.NewInMemoryBus()
	engine := merge.NewEngine[int]()
	engine.Register("counter", func(old, new int) (int, error) {
		return old + new, nil
	})

	w1 := core.New(cache.NewInMemory[merge.Value[int]](), store, bus, engine)
	w2 := core.New(cache.NewInMemory[merge.Value[int]](), store, bus, engine)

	w1.Register("counter", core.ModeEventualDistributed, time.Minute)
	w2.Register("counter", core.ModeEventualDistributed, time.Minute)

	ch, _ := bus.Subscribe(ctx, "counter")
	defer bus.Unsubscribe(ctx, "counter", ch)
	go func() {
		for range ch {
			_ = w2.Invalidate(ctx, "counter")
		}
	}()

	if err := w1.Set(ctx, "counter", 10); err != nil {
		panic(err)
	}
	if err := w2.Set(ctx, "counter", 5); err != nil {
		panic(err)
	}

	time.Sleep(10 * time.Millisecond)
	v1, _ := w1.Get(ctx, "counter")
	v2, _ := w2.Get(ctx, "counter")
	fmt.Println("node1:", v1, "node2:", v2)

	// WatchBus prefix example
	wb := watchbus.NewInMemory()
	pch, err := core.WatchPrefix(ctx, wb, "task:")
	if err != nil {
		panic(err)
	}
	go func() {
		for msg := range pch {
			fmt.Printf("task event: %s\n", msg)
		}
	}()
	_ = wb.Publish(ctx, "task:1", []byte("done"))
	_ = wb.PublishPrefix(ctx, "task:", []byte("all done"))
	time.Sleep(100 * time.Millisecond)
}
F
function

main

examples/v1/leases/main.go:13-32
func main()

{
	ctx := context.Background()
	w := core.New[string](cache.NewInMemory[merge.Value[string]](), nil, nil, merge.NewEngine[string]())

	w.Register("task", core.ModeStrongLocal, time.Minute)
	id, err := w.GrantLease(ctx, time.Second)
	if err != nil {
		panic(err)
	}
	w.AttachKey(id, "task")

	_ = w.Set(ctx, "task", "running")
	v, _ := w.Get(ctx, "task")
	fmt.Println("before revoke:", v)

	w.RevokeLease(ctx, id)
	if _, err := w.Get(ctx, "task"); err != nil {
		fmt.Println("after revoke: not found")
	}
}
F
function

main

examples/v1/lock/main.go:12-35
func main()

{
	ctx := context.Background()
	bus := syncbus.NewInMemoryBus()

	node1 := lock.NewInMemory(bus)
	node2 := lock.NewInMemory(bus)

	go func() {
		if err := node1.Acquire(ctx, "leader", time.Second); err != nil {
			panic(err)
		}
		fmt.Println("node1 elected as leader")
		time.Sleep(500 * time.Millisecond)
		_ = node1.Release(ctx, "leader")
	}()

	time.Sleep(100 * time.Millisecond)

	if err := node2.Acquire(ctx, "leader", time.Second); err != nil {
		panic(err)
	}
	fmt.Println("node2 elected as leader")
	_ = node2.Release(ctx, "leader")
}
F
function

main

examples/v1/lock/nats/main.go:14-44
func main()

{
	ctx := context.Background()

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	bus := busnats.NewNATSBus(nc)

	node1 := lock.NewInMemory(bus)
	node2 := lock.NewInMemory(bus)

	go func() {
		if err := node1.Acquire(ctx, "leader", time.Second); err != nil {
			panic(err)
		}
		fmt.Println("node1 elected as leader")
		time.Sleep(500 * time.Millisecond)
		_ = node1.Release(ctx, "leader")
	}()

	time.Sleep(100 * time.Millisecond)

	if err := node2.Acquire(ctx, "leader", time.Second); err != nil {
		panic(err)
	}
	fmt.Println("node2 elected as leader")
	_ = node2.Release(ctx, "leader")
}
F
function

main

examples/v1/mesh/main.go:13-65
func main()

{
	ctx := context.Background()

	// Node 1 setup
	w1 := presets.NewMeshEventual[string](mesh.MeshOptions{
		Port:          7946,
		AdvertiseAddr: "127.0.0.1:7946",
		Peers:         []string{"127.0.0.1:7947"},
	})

	// Node 2 setup
	w2 := presets.NewMeshEventual[string](mesh.MeshOptions{
		Port:          7947,
		AdvertiseAddr: "127.0.0.1:7947",
		Peers:         []string{"127.0.0.1:7946"},
	})

	// Register key on both nodes.
	// Thanks to the new automatic subscription logic,
	// nodes will now automatically listen for invalidations on the bus.
	w1.Register("shared-key", core.ModeEventualDistributed, time.Minute)
	w2.Register("shared-key", core.ModeEventualDistributed, time.Minute)

	// Node 1 sets a value
	fmt.Println("Node 1: Setting 'shared-key' to 'WarpSpeed'...")
	if err := w1.Set(ctx, "shared-key", "WarpSpeed"); err != nil {
		panic(err)
	}

	// Wait for gossip/propagation
	time.Sleep(500 * time.Millisecond)

	// Node 2 gets the value
	val, err := w2.Get(ctx, "shared-key")
	if err != nil {
		fmt.Printf("Node 2: Error getting key: %v\n", err)
	} else {
		fmt.Printf("Node 2: Successfully retrieved value: %s\n", val)
	}

	// Node 2 updates the value
	fmt.Println("Node 2: Updating 'shared-key' to 'HyperDrive'...")
	if err := w2.Set(ctx, "shared-key", "HyperDrive"); err != nil {
		panic(err)
	}

	// Wait for gossip
	time.Sleep(500 * time.Millisecond)

	// Node 1 gets the updated value
	val1, _ := w1.Get(ctx, "shared-key")
	fmt.Printf("Node 1: Current value: %s\n", val1)
}
F
function

main

examples/v1/redis/main.go:17-54
func main()

{
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer client.Close()

	store := adapter.NewInMemoryStore[int]()
	bus := busredis.NewRedisBus(busredis.RedisBusOptions{Client: client, Region: "us-east-1"})
	engine := merge.NewEngine[int]()
	engine.Register("counter", func(old, new int) (int, error) {
		return old + new, nil
	})

	w1 := core.New(cache.NewInMemory[merge.Value[int]](), store, bus, engine)
	w2 := core.New(cache.NewInMemory[merge.Value[int]](), store, bus, engine)

	w1.Register("counter", core.ModeEventualDistributed, time.Minute)
	w2.Register("counter", core.ModeEventualDistributed, time.Minute)

	ch, _ := bus.Subscribe(ctx, "counter")
	defer bus.Unsubscribe(ctx, "counter", ch)
	go func() {
		for range ch {
			_ = w2.Invalidate(ctx, "counter")
		}
	}()

	if err := w1.Set(ctx, "counter", 10); err != nil {
		panic(err)
	}
	if err := w2.Set(ctx, "counter", 5); err != nil {
		panic(err)
	}

	time.Sleep(10 * time.Millisecond)
	v1, _ := w1.Get(ctx, "counter")
	v2, _ := w2.Get(ctx, "counter")
	fmt.Println("node1:", v1, "node2:", v2)
}
F
function

main

examples/v1/ristretto/main.go:16-29
func main()

{
	ctx := context.Background()

	cfg := &ristretto.Config{NumCounters: 1e5, MaxCost: 1 << 20, BufferItems: 64}
	c := cache.NewRistretto[merge.Value[string]](cache.WithRistretto(cfg))

	w := core.New[string](c, adapter.NewInMemoryStore[string](), nil, merge.NewEngine[string]())
	w.Register("greeting", core.ModeStrongLocal, time.Minute)
	if err := w.Set(ctx, "greeting", "Warp example"); err != nil {
		panic(err)
	}
	v, _ := w.Get(ctx, "greeting")
	fmt.Println(v)
}
S
struct

runStats

examples/v1/super_advanced/main.go:33-38
type runStats struct

Fields

Name Type Description
name string
val int
elapsed time.Duration
throughput float64
S
struct

fileStore

fileStore is a simple disk-backed store introducing real I/O overhead
for each operation to highlight Warp’s caching benefits.

examples/v1/super_advanced/main.go:42-46
type fileStore struct

Methods

path
Method

Parameters

key string

Returns

string
func (*fileStore) path(key string) string
{
	return filepath.Join(s.dir, key+".txt")
}
Get
Method

Parameters

key string

Returns

int
bool
error
func (*fileStore) Get(ctx context.Context, key string) (int, bool, error)
{
	s.mu.Lock()
	defer s.mu.Unlock()
	b, err := os.ReadFile(s.path(key))
	if errors.Is(err, os.ErrNotExist) {
		return 0, false, nil
	}
	if err != nil {
		return 0, false, err
	}
	v, err := strconv.Atoi(string(b))
	if err != nil {
		return 0, false, err
	}
	if s.delay > 0 {
		time.Sleep(s.delay)
	}
	return v, true, nil
}
Set
Method

Parameters

key string
value int

Returns

error
func (*fileStore) Set(ctx context.Context, key string, value int) error
{
	s.mu.Lock()
	defer s.mu.Unlock()
	if err := os.WriteFile(s.path(key), []byte(strconv.Itoa(value)), 0o644); err != nil {
		return err
	}
	if s.delay > 0 {
		time.Sleep(s.delay)
	}
	return nil
}
Keys
Method

Parameters

Returns

[]string
error
func (*fileStore) Keys(ctx context.Context) ([]string, error)
{
	s.mu.Lock()
	defer s.mu.Unlock()
	entries, err := os.ReadDir(s.dir)
	if err != nil {
		return nil, err
	}
	keys := make([]string, 0, len(entries))
	for _, e := range entries {
		if e.Type().IsRegular() {
			keys = append(keys, strings.TrimSuffix(e.Name(), ".txt"))
		}
	}
	return keys, nil
}

Fields

Name Type Description
dir string
delay time.Duration
mu sync.Mutex
F
function

newFileStore

Parameters

dir
string

Returns

examples/v1/super_advanced/main.go:48-51
func newFileStore(dir string, delay time.Duration) *fileStore

{
	os.MkdirAll(dir, 0o755)
	return &fileStore{dir: dir, delay: delay}
}
F
function

runWithoutWarp

Parameters

Returns

examples/v1/super_advanced/main.go:105-153
func runWithoutWarp(ctx context.Context) runStats

{
	fmt.Println("=== Without Warp ===")
	dir, err := os.MkdirTemp("", "warp_store")
	if err != nil {
		log.Printf("MkdirTemp error: %v", err)
		return runStats{name: "without warp"}
	}
	defer os.RemoveAll(dir)
	store := newFileStore(dir, 500*time.Microsecond)
	if err := store.Set(ctx, "counter", 0); err != nil {
		log.Printf("store.Set error: %v", err)
		return runStats{name: "without warp"}
	}

	start := time.Now()
	var mu sync.Mutex
	var wg sync.WaitGroup
	for range workerCount {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range opsPerWorker {
				mu.Lock()
				v, _, err := store.Get(ctx, "counter")
				if err != nil {
					log.Printf("store.Get error: %v", err)
					mu.Unlock()
					return
				}
				if err := store.Set(ctx, "counter", v+1); err != nil {
					log.Printf("store.Set error: %v", err)
					mu.Unlock()
					return
				}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	val, _, err := store.Get(ctx, "counter")
	if err != nil {
		log.Printf("store.Get error: %v", err)
		return runStats{name: "without warp"}
	}
	elapsed := time.Since(start)
	throughput := float64(opsPerWorker*workerCount) / elapsed.Seconds()
	fmt.Printf("value=%d elapsed=%s throughput=%.2f ops/s\n", val, elapsed, throughput)
	return runStats{name: "without warp", val: val, elapsed: elapsed, throughput: throughput}
}
F
function

runWithWarp

Parameters

Returns

examples/v1/super_advanced/main.go:155-258
func runWithWarp(ctx context.Context) runStats

{
	fmt.Println("=== With Warp ===")
	dir, err := os.MkdirTemp("", "warp_store")
	if err != nil {
		log.Printf("MkdirTemp error: %v", err)
		return runStats{name: "with warp"}
	}
	defer os.RemoveAll(dir)
	store := newFileStore(dir, 500*time.Microsecond)
	if err := store.Set(ctx, "counter", 0); err != nil {
		log.Printf("store.Set error: %v", err)
		return runStats{name: "with warp"}
	}

	start := time.Now()
	reg := metrics.NewRegistry()
	metrics.RegisterCoreMetrics(reg)

	bus := syncbus.NewInMemoryBus()

	baseCache := cache.NewInMemory[merge.VersionedValue[int]]()
	vCache := versioned.New(baseCache, workerCount*opsPerWorker, versioned.WithMetrics[int](reg))
	engine := merge.NewEngine[int]()
	w := core.New(vCache, store, bus, engine, core.WithMetrics[int](reg))

	w.Merge("counter", func(old, new int) (int, error) { return old + new, nil })

	w.Register("counter", core.ModeStrongLocal, time.Second, cache.WithSliding())
	strat := adaptive.NewSlidingWindow(50*time.Millisecond, 20, time.Millisecond, 2*time.Second, reg)
	w.RegisterDynamicTTL("hot", core.ModeStrongLocal, strat)

	w.Warmup(ctx)
	if err := w.Set(ctx, "hot", 1); err != nil {
		log.Printf("w.Set hot error: %v", err)
		return runStats{name: "with warp"}
	}
	leaseID, err := w.GrantLease(ctx, time.Second)
	if err != nil {
		log.Printf("GrantLease error: %v", err)
		return runStats{name: "with warp"}
	}
	w.AttachKey(leaseID, "counter")

	var wg sync.WaitGroup
	var mu sync.Mutex
	for range workerCount {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range opsPerWorker {
				txn := w.Txn(ctx)
				txn.Set("counter", 1)
				mu.Lock()
				if err := txn.Commit(); err != nil {
					log.Printf("txn.Commit error: %v", err)
					mu.Unlock()
					return
				}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()

	val, err := w.Get(ctx, "counter")
	if err != nil {
		log.Printf("w.Get error: %v", err)
		return runStats{name: "with warp"}
	}
	past, err := w.GetAt(ctx, "counter", time.Now().Add(-time.Millisecond))
	if err != nil {
		log.Printf("w.GetAt error: %v", err)
		return runStats{name: "with warp"}
	}
	fmt.Println("current:", val, "past:", past)

	if err := w.Invalidate(ctx, "counter"); err != nil {
		log.Printf("w.Invalidate error: %v", err)
	}
	w.RevokeLease(ctx, leaseID)
	w.Unregister("counter")

	mfs, err := reg.Gather()
	if err != nil {
		log.Printf("reg.Gather error: %v", err)
	} else {
		fmt.Println("metrics:")
		for _, mf := range mfs {
			for _, m := range mf.Metric {
				if m.Counter != nil {
					fmt.Printf("  %s %f\n", mf.GetName(), m.GetCounter().GetValue())
				} else if m.Gauge != nil {
					fmt.Printf("  %s %f\n", mf.GetName(), m.GetGauge().GetValue())
				}
			}
		}
	}

	elapsed := time.Since(start)
	throughput := float64(opsPerWorker*workerCount) / elapsed.Seconds()
	fmt.Printf("value=%d elapsed=%s throughput=%.2f ops/s\n", val, elapsed, throughput)

	return runStats{name: "with warp", val: val, elapsed: elapsed, throughput: throughput}
}
F
function

runWithWarpFull

Parameters

Returns

examples/v1/super_advanced/main.go:260-421
func runWithWarpFull(ctx context.Context) runStats

{
	fmt.Println("=== With Warp + Validator & Bus ===")
	dir, err := os.MkdirTemp("", "warp_store")
	if err != nil {
		log.Printf("MkdirTemp error: %v", err)
		return runStats{name: "warp full"}
	}
	defer os.RemoveAll(dir)
	store := newFileStore(dir, 500*time.Microsecond)
	if err := store.Set(ctx, "counter", 0); err != nil {
		log.Printf("store.Set error: %v", err)
		return runStats{name: "warp full"}
	}

	start := time.Now()
	reg := metrics.NewRegistry()
	metrics.RegisterCoreMetrics(reg)

	bus := syncbus.NewInMemoryBus()
	wb := watchbus.NewInMemory()

	baseCache := cache.NewInMemory[merge.VersionedValue[int]]()
	vCache := versioned.New(baseCache, workerCount*opsPerWorker, versioned.WithMetrics[int](reg))
	engine := merge.NewEngine[int]()
	w := core.New(vCache, store, bus, engine, core.WithMetrics[int](reg))

	w.Merge("counter", func(old, new int) (int, error) { return old + new, nil })

	w.Register("counter", core.ModeStrongLocal, time.Second, cache.WithSliding())
	strat := adaptive.NewSlidingWindow(50*time.Millisecond, 20, time.Millisecond, 2*time.Second, reg)
	w.RegisterDynamicTTL("hot", core.ModeStrongLocal, strat)

	w.Warmup(ctx)
	if err := w.Set(ctx, "hot", 1); err != nil {
		log.Printf("w.Set hot error: %v", err)
		return runStats{name: "warp full"}
	}
	leaseID, err := w.GrantLease(ctx, time.Second)
	if err != nil {
		log.Printf("GrantLease error: %v", err)
		return runStats{name: "warp full"}
	}
	w.AttachKey(leaseID, "counter")

	v := w.Validator(validator.ModeAutoHeal, 100*time.Millisecond)

	watchCtx, cancelWatch := context.WithCancel(ctx)
	watchCh, err := core.WatchPrefix(watchCtx, wb, "event")
	if err != nil {
		log.Printf("WatchPrefix error: %v", err)
		cancelWatch()
		return runStats{name: "warp full"}
	}

	var watchWG sync.WaitGroup
	watchWG.Add(1)
	go func() {
		defer watchWG.Done()
		for {
			select {
			case <-watchCtx.Done():
				if err := watchCtx.Err(); err != nil && err != context.Canceled {
					log.Printf("watch error: %v", err)
				}
				return
			case msg, ok := <-watchCh:
				if !ok {
					return
				}
				fmt.Println("watch:", string(msg))
			}
		}
	}()

	defer func() {
		cancelWatch()
		watchWG.Wait()
	}()

	locker := lock.NewInMemory(bus)
	if err := locker.Acquire(ctx, "counter", 0); err != nil {
		log.Printf("locker.Acquire error: %v", err)
		return runStats{name: "warp full"}
	}

	var wg sync.WaitGroup
	var mu sync.Mutex
	for range workerCount {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range opsPerWorker {
				txn := w.Txn(ctx)
				txn.Set("counter", 1)
				mu.Lock()
				if err := txn.Commit(); err != nil {
					log.Printf("txn.Commit error: %v", err)
					mu.Unlock()
					return
				}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()

	valCtx, cancelVal := context.WithCancel(ctx)
	go v.Run(valCtx)
	time.Sleep(150 * time.Millisecond)
	cancelVal()

	if err := locker.Release(ctx, "counter"); err != nil {
		log.Printf("locker.Release error: %v", err)
	}

	// allow validator to flush latest store state
	time.Sleep(200 * time.Millisecond)

	if err := wb.Publish(ctx, "event/warp", []byte("done")); err != nil {
		log.Printf("wb.Publish error: %v", err)
	}

	val, _, err := store.Get(ctx, "counter")
	if err != nil {
		log.Printf("store.Get error: %v", err)
		return runStats{name: "warp full"}
	}
	past, err := w.GetAt(ctx, "counter", time.Now().Add(-time.Millisecond))
	if err != nil {
		log.Printf("w.GetAt error: %v", err)
		return runStats{name: "warp full"}
	}
	fmt.Println("current:", val, "past:", past)

	if err := w.Invalidate(ctx, "counter"); err != nil {
		log.Printf("w.Invalidate error: %v", err)
	}
	w.RevokeLease(ctx, leaseID)
	w.Unregister("counter")

	mfs, err := reg.Gather()
	if err != nil {
		log.Printf("reg.Gather error: %v", err)
	} else {
		fmt.Println("metrics:")
		for _, mf := range mfs {
			for _, m := range mf.Metric {
				if m.Counter != nil {
					fmt.Printf("  %s %f\n", mf.GetName(), m.GetCounter().GetValue())
				} else if m.Gauge != nil {
					fmt.Printf("  %s %f\n", mf.GetName(), m.GetGauge().GetValue())
				}
			}
		}
	}

	elapsed := time.Since(start)
	throughput := float64(opsPerWorker*workerCount) / elapsed.Seconds()
	fmt.Printf("value=%d elapsed=%s throughput=%.2f ops/s\n", val, elapsed, throughput)

	return runStats{name: "warp full", val: val, elapsed: elapsed, throughput: throughput}
}
F
function

main

examples/v1/super_advanced/main.go:423-442
func main()

{
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	without := runWithoutWarp(ctx)
	withOnly := runWithWarp(ctx)
	withFull := runWithWarpFull(ctx)

	fmt.Println("\n=== Summary ===")
	fmt.Printf("without warp: value=%d elapsed=%s throughput=%.2f ops/s\n", without.val, without.elapsed, without.throughput)
	fmt.Printf("with warp: value=%d elapsed=%s throughput=%.2f ops/s\n", withOnly.val, withOnly.elapsed, withOnly.throughput)
	fmt.Printf("with warp + extras: value=%d elapsed=%s throughput=%.2f ops/s\n", withFull.val, withFull.elapsed, withFull.throughput)

	if withOnly.elapsed > 0 {
		fmt.Printf("speedup warp vs without: %.2fx\n", without.elapsed.Seconds()/withOnly.elapsed.Seconds())
	}
	if withFull.elapsed > 0 {
		fmt.Printf("speedup warp+extras vs without: %.2fx\n", without.elapsed.Seconds()/withFull.elapsed.Seconds())
	}
}
F
function

main

examples/v1/telemetry/main.go:19-42
func main()

{
	ctx := context.Background()

	exp, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
	if err != nil {
		log.Fatal(err)
	}
	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp))
	defer func() { _ = tp.Shutdown(ctx) }()
	otel.SetTracerProvider(tp)

	reg := metrics.NewRegistry()
	metrics.RegisterCoreMetrics(reg)

	c := cache.NewInMemory[merge.Value[string]](cache.WithMetrics[merge.Value[string]](reg))
	w := core.New[string](c, nil, nil, nil, core.WithMetrics[string](reg))

	w.Register("greeting", core.ModeStrongLocal, time.Minute)
	_ = w.Set(ctx, "greeting", "telemetry example")
	_, _ = w.Get(ctx, "greeting")

	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":2112", nil))
}
F
function

main

examples/v1/txn_merge/main.go:15-47
func main()

{
	ctx := context.Background()
	w := core.New[any](cache.NewInMemory[merge.Value[any]](), adapter.NewInMemoryStore[any](), nil, merge.NewEngine[any]())
	w.Register("counter", core.ModeStrongLocal, time.Minute)
	w.Register("logs", core.ModeStrongLocal, time.Minute)

	w.Merge("counter", func(old, new any) (any, error) {
		return old.(int) + new.(int), nil
	})
	w.Merge("logs", func(old, new any) (any, error) {
		return append(old.([]string), new.([]string)...), nil
	})

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			txn := w.Txn(ctx)
			txn.Set("counter", 1)
			txn.Set("logs", []string{fmt.Sprintf("event %d", i)})
			if err := txn.Commit(); err != nil {
				panic(err)
			}
		}(i)
	}
	wg.Wait()

	counter, _ := w.Get(ctx, "counter")
	logs, _ := w.Get(ctx, "logs")
	fmt.Printf("counter: %d\n", counter.(int))
	fmt.Printf("logs: %v\n", logs.([]string))
}
F
function

main

examples/v1/validator/main.go:15-34
func main()

{
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	store := adapter.NewInMemoryStore[string]()
	c := cache.NewInMemory[merge.Value[string]]()
	w := core.New[string](c, store, nil, merge.NewEngine[string]())
	w.Register("greeting", core.ModeStrongLocal, time.Minute)

	_ = w.Set(ctx, "greeting", "hello")
	_ = store.Set(ctx, "greeting", "hi")

	v := w.Validator(validator.ModeAutoHeal, 20*time.Millisecond)
	go v.Run(ctx)

	time.Sleep(50 * time.Millisecond)
	val, _ := w.Get(ctx, "greeting")
	fmt.Println("healed value:", val)
	fmt.Println("mismatches:", v.Metrics())
}
F
function

main

examples/v1/versioned/main.go:14-34
func main()

{
	ctx := context.Background()
	base := cache.NewInMemory[merge.VersionedValue[string]]()
	c := versioned.New[string](base, 5)
	w := core.New[string](c, nil, nil, merge.NewEngine[string]())

	w.Register("status", core.ModeStrongLocal, time.Minute)
	_ = w.Set(ctx, "status", "v1")
	t1 := time.Now()
	time.Sleep(10 * time.Millisecond)
	_ = w.Set(ctx, "status", "v2")

	latest, _ := w.Get(ctx, "status")
	fmt.Println("latest:", latest)

	old, err := w.GetAt(ctx, "status", t1)
	if err != nil {
		panic(err)
	}
	fmt.Println("at t1:", old)
}