kafka API

package kafka

API reference for the kafka package.

kafkaSubscription
Struct

v1/syncbus/kafka/kafka.go:15-18
type kafkaSubscription struct

Fields

Name	Type	Description
pc	sarama.PartitionConsumer	partition consumer feeding this subscription
chans	[]chan syncbus.Event	subscriber channels that receive dispatched events
KafkaBus
Struct

KafkaBus implements Bus using a Kafka backend.

v1/syncbus/kafka/kafka.go:21-31
type KafkaBus struct

Methods

Publish
Method

Publish implements Bus.Publish.

Parameters

ctx context.Context
key string
opts ...syncbus.PublishOption

Returns

error
func (*KafkaBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
	b.mu.Lock()
	if b.closed {
		b.mu.Unlock()
		return errors.New("kafka bus closed")
	}
	if _, ok := b.pending[key]; ok {
		b.mu.Unlock()
		return nil // deduplicate
	}
	b.pending[key] = struct{}{}
	b.wg.Add(1)
	b.mu.Unlock()
	defer b.wg.Done()

	// Apply a small random jitter (up to 10ms) before producing, unless the
	// context is cancelled first.
	if j := rand.Int63n(int64(10 * time.Millisecond)); j > 0 {
		select {
		case <-ctx.Done():
			b.mu.Lock()
			delete(b.pending, key)
			b.mu.Unlock()
			return ctx.Err()
		case <-time.After(time.Duration(j)):
		}
	}

	msg := &sarama.ProducerMessage{Topic: key, Value: sarama.StringEncoder("1")}
	if _, _, err := b.producer.SendMessage(msg); err != nil {
		b.mu.Lock()
		delete(b.pending, key)
		b.mu.Unlock()
		return err
	}
	b.published.Add(1)
	b.mu.Lock()
	delete(b.pending, key)
	b.mu.Unlock()
	return nil
}
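
A minimal usage sketch (assuming bus is a *KafkaBus from NewKafkaBus; the key name is illustrative). A second Publish for the same key while one is still in flight is deduplicated and returns nil:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := bus.Publish(ctx, "orders"); err != nil {
	log.Printf("publish: %v", err)
}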

PublishAndAwait
Method

PublishAndAwait implements Bus.PublishAndAwait. The Kafka sync producer does not expose subscriber replication acknowledgements at this level, so quorum is unsupported.

Parameters

ctx context.Context
key string
replicas int
opts ...syncbus.PublishOption

Returns

error
func (*KafkaBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
	if replicas <= 1 {
		return b.Publish(ctx, key, opts...)
	}
	return syncbus.ErrQuorumUnsupported
}
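
Because quorum is unsupported on this backend, a caller that wants quorum semantics on other backends can detect the sentinel and degrade to a plain publish. A sketch (the fallback policy is the caller's choice):

err := bus.PublishAndAwait(ctx, "orders", 3)
if errors.Is(err, syncbus.ErrQuorumUnsupported) {
	// Kafka cannot await replica acknowledgements here; fall back to Publish.
	err = bus.Publish(ctx, "orders")
}
if err != nil {
	log.Printf("publish: %v", err)
}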

PublishAndAwaitTopology
Method

PublishAndAwaitTopology implements Bus.PublishAndAwaitTopology. Kafka does not expose a synchronous topology check at this level, so quorum is unsupported.

Parameters

ctx context.Context
key string
minZones int
opts ...syncbus.PublishOption

Returns

error
func (*KafkaBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
	// Kafka does not easily support this synchronous topology check.
	return syncbus.ErrQuorumUnsupported
}
Subscribe
Method

Subscribe implements Bus.Subscribe.

Parameters

ctx context.Context
key string

Returns

<-chan syncbus.Event
error
func (*KafkaBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
	ch := make(chan syncbus.Event, 1)
	b.mu.Lock()
	sub := b.subs[key]
	if sub == nil {
		pc, err := b.consumer.ConsumePartition(key, 0, sarama.OffsetNewest)
		if err != nil {
			b.mu.Unlock()
			return nil, err
		}
		sub = &kafkaSubscription{pc: pc}
		b.subs[key] = sub
		go b.dispatch(sub, key)
	}
	sub.chans = append(sub.chans, ch)
	b.mu.Unlock()

	go func() {
		<-ctx.Done()
		_ = b.Unsubscribe(context.Background(), key, ch)
	}()
	return ch, nil
}
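
A sketch of a typical subscriber loop (key name is illustrative). Cancelling the subscription context unsubscribes and closes the channel, which ends the range loop:

subCtx, cancel := context.WithCancel(context.Background())
events, err := bus.Subscribe(subCtx, "orders")
if err != nil {
	log.Fatalf("subscribe: %v", err)
}
go func() {
	// Loop ends when the channel is closed on unsubscribe.
	for evt := range events {
		log.Printf("sync event for key %s", evt.Key)
	}
}()
// ... later, stop receiving:
cancel()
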
dispatch
Method

dispatch fans out each message received on the subscription's partition consumer to every subscriber channel registered for key.

Parameters

sub *kafkaSubscription
key string
func (*KafkaBus) dispatch(sub *kafkaSubscription, key string)
{
	for range sub.pc.Messages() {
		// Snapshot the current subscriber list under the lock; the
		// subscription may already have been removed by Unsubscribe.
		b.mu.Lock()
		var chans []chan syncbus.Event
		if s := b.subs[key]; s != nil {
			chans = append(chans, s.chans...)
		}
		b.mu.Unlock()

		evt := syncbus.Event{Key: key}
		for _, ch := range chans {
			select {
			case ch <- evt:
				b.delivered.Add(1)
			default:
				// Subscriber buffer is full; drop the event rather than block.
			}
		}
	}
}
Unsubscribe
Method

Unsubscribe implements Bus.Unsubscribe.

Parameters

ctx context.Context
key string
ch <-chan syncbus.Event

Returns

error
func (*KafkaBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
	b.mu.Lock()
	sub := b.subs[key]
	if sub == nil {
		b.mu.Unlock()
		return nil
	}
	for i, c := range sub.chans {
		if c == ch {
			sub.chans[i] = sub.chans[len(sub.chans)-1]
			sub.chans = sub.chans[:len(sub.chans)-1]
			close(c)
			break
		}
	}
	if len(sub.chans) == 0 {
		delete(b.subs, key)
		b.mu.Unlock()
		return sub.pc.Close()
	}
	b.mu.Unlock()
	return nil
}
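
A sketch of unsubscribing explicitly rather than via context cancellation (the events channel is assumed to come from Subscribe); removing the last channel for a key also closes the underlying partition consumer:

if err := bus.Unsubscribe(context.Background(), "orders", events); err != nil {
	log.Printf("unsubscribe: %v", err)
}
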
RevokeLease
Method

RevokeLease publishes a lease revocation event.

Parameters

ctx context.Context
id string

Returns

error
func (*KafkaBus) RevokeLease(ctx context.Context, id string) error
{
	return b.Publish(ctx, "lease:"+id)
}

SubscribeLease
Method

SubscribeLease subscribes to lease revocation events.

Parameters

ctx context.Context
id string

Returns

<-chan syncbus.Event
error
func (*KafkaBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
	return b.Subscribe(ctx, "lease:"+id)
}

UnsubscribeLease
Method

UnsubscribeLease cancels a lease revocation subscription.

Parameters

ctx context.Context
id string
ch <-chan syncbus.Event

Returns

error
func (*KafkaBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
	return b.Unsubscribe(ctx, "lease:"+id, ch)
}
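
The lease helpers simply prefix the lease id with "lease:" and reuse Publish, Subscribe and Unsubscribe. A sketch of the revocation flow (the lease id is illustrative):

// Watcher: get notified when the lease is revoked.
leaseCh, err := bus.SubscribeLease(ctx, "abc123")
if err != nil {
	log.Fatalf("subscribe lease: %v", err)
}
go func() {
	for range leaseCh {
		log.Println("lease abc123 revoked")
	}
}()

// Owner: broadcast the revocation.
if err := bus.RevokeLease(ctx, "abc123"); err != nil {
	log.Printf("revoke lease: %v", err)
}
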
Metrics
Method

Metrics returns the published and delivered counts.

Returns

syncbus.Metrics
func (*KafkaBus) Metrics() syncbus.Metrics
{
	return syncbus.Metrics{
		Published: b.published.Load(),
		Delivered: b.delivered.Load(),
	}
}
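
The counters are cumulative for the lifetime of the bus. A sketch of exposing them in a log line:

m := bus.Metrics()
log.Printf("syncbus: published=%d delivered=%d", m.Published, m.Delivered)
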
IsHealthy
Method

IsHealthy implements Bus.IsHealthy. It currently always reports the bus as healthy.

Returns

bool
func (*KafkaBus) IsHealthy() bool
{
	return true
}
Peers
Method

Peers implements Bus.Peers. Peer membership is not tracked at this level, so it returns nil.

Returns

[]string
func (*KafkaBus) Peers() []string
{
	return nil
}
Close
Method

Close releases resources used by the KafkaBus. It marks the bus as closed, waits for in-flight publishes to finish, then closes the producer and consumer.

Returns

error
func (*KafkaBus) Close() error
{
	b.mu.Lock()
	if b.closed {
		b.mu.Unlock()
		return nil
	}
	b.closed = true
	b.mu.Unlock()

	b.wg.Wait()
	_ = b.producer.Close()
	_ = b.consumer.Close()
	return nil
}

Fields

Name	Type	Description
producer	sarama.SyncProducer	synchronous producer used by Publish
consumer	sarama.Consumer	consumer used to open partition consumers for subscriptions
mu	sync.Mutex	guards subs, pending and closed
subs	map[string]*kafkaSubscription	active subscriptions keyed by topic
pending	map[string]struct{}	keys with a publish currently in flight, used for deduplication
published	atomic.Uint64	count of successfully published events
delivered	atomic.Uint64	count of events delivered to subscriber channels
closed	bool	set once Close has been called
wg	sync.WaitGroup	tracks in-flight publishes so Close can wait for them
NewKafkaBus
Function

NewKafkaBus creates a new KafkaBus connecting to the given brokers.

Parameters

brokers []string
cfg *sarama.Config

Returns

*KafkaBus
error
v1/syncbus/kafka/kafka.go:34-59
func NewKafkaBus(brokers []string, cfg *sarama.Config) (*KafkaBus, error)

{
	// The Sarama sync producer requires Producer.Return.Successes to be enabled.
	if !cfg.Producer.Return.Successes {
		cfg.Producer.Return.Successes = true
	}
	client, err := sarama.NewClient(brokers, cfg)
	if err != nil {
		return nil, err
	}
	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		_ = client.Close()
		return nil, err
	}
	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		_ = producer.Close()
		_ = client.Close()
		return nil, err
	}
	return &KafkaBus{
		producer: producer,
		consumer: consumer,
		subs:     make(map[string]*kafkaSubscription),
		pending:  make(map[string]struct{}),
	}, nil
}
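
A construction sketch (the broker address is illustrative and the kafka package import path is assumed):

cfg := sarama.NewConfig()
cfg.Producer.Return.Successes = true // also enforced by NewKafkaBus
bus, err := kafka.NewKafkaBus([]string{"localhost:9092"}, cfg)
if err != nil {
	log.Fatalf("NewKafkaBus: %v", err)
}
defer bus.Close()
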
newKafkaBus
Function

Parameters

t *testing.T

Returns

*KafkaBus
context.Context
v1/syncbus/kafka/kafka_test.go:13-36
func newKafkaBus(t *testing.T) (*KafkaBus, context.Context)

{
	t.Helper()
	addr := os.Getenv("WARP_TEST_KAFKA_ADDR")
	if addr == "" {
		t.Skip("WARP_TEST_KAFKA_ADDR not set, skipping Kafka integration tests")
	}
	t.Logf("TestKafkaBus: using real Kafka at %s", addr)

	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	// Consume only messages produced after the subscription starts.
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	bus, err := NewKafkaBus([]string{addr}, config)
	if err != nil {
		t.Fatalf("NewKafkaBus: %v", err)
	}

	ctx := context.Background()
	t.Cleanup(func() {
		bus.Close()
	})
	return bus, ctx
}
TestKafkaBusPublishSubscribeFlowAndMetrics
Function

Parameters

t *testing.T

v1/syncbus/kafka/kafka_test.go:38-67
func TestKafkaBusPublishSubscribeFlowAndMetrics(t *testing.T)

{
	bus, ctx := newKafkaBus(t)
	topic := "test-" + uuid.NewString()

	ch, err := bus.Subscribe(ctx, topic)
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}

	// Wait for consumer to be ready (approx)
	time.Sleep(2 * time.Second)

	if err := bus.Publish(ctx, topic); err != nil {
		t.Fatalf("publish: %v", err)
	}

	select {
	case <-ch:
	case <-time.After(10 * time.Second):
		t.Fatal("timeout waiting for publish")
	}

	metrics := bus.Metrics()
	if metrics.Published != 1 {
		t.Fatalf("expected published 1 got %d", metrics.Published)
	}
	if metrics.Delivered != 1 {
		t.Fatalf("expected delivered 1 got %d", metrics.Delivered)
	}
}
TestKafkaBusContextBasedUnsubscribe
Function

Parameters

t *testing.T

v1/syncbus/kafka/kafka_test.go:69-88
func TestKafkaBusContextBasedUnsubscribe(t *testing.T)

{
	bus, _ := newKafkaBus(t)
	topic := "test-unsub-" + uuid.NewString()

	subCtx, cancel := context.WithCancel(context.Background())
	ch, err := bus.Subscribe(subCtx, topic)
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}

	cancel()
	select {
	case _, ok := <-ch:
		if ok {
			t.Fatal("expected channel closed")
		}
	case <-time.After(2 * time.Second):
		t.Fatal("timeout waiting for unsubscribe")
	}
}
TestKafkaBusDeduplicatePendingKeys
Function

Parameters

t *testing.T

v1/syncbus/kafka/kafka_test.go:90-117
func TestKafkaBusDeduplicatePendingKeys(t *testing.T)

{
	bus, ctx := newKafkaBus(t)
	topic := "test-dedup-" + uuid.NewString()

	ch, err := bus.Subscribe(ctx, topic)
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}

	bus.mu.Lock()
	bus.pending[topic] = struct{}{}
	bus.mu.Unlock()

	if err := bus.Publish(ctx, topic); err != nil {
		t.Fatalf("publish: %v", err)
	}

	select {
	case <-ch:
		t.Fatal("unexpected publish when key pending")
	case <-time.After(500 * time.Millisecond):
	}

	metrics := bus.Metrics()
	if metrics.Published != 0 {
		t.Fatalf("expected published 0 got %d", metrics.Published)
	}
}
TestKafkaBusPublishError
Function

Parameters

t *testing.T

v1/syncbus/kafka/kafka_test.go:119-135
func TestKafkaBusPublishError(t *testing.T)

{
	// Publish on a closed bus must fail fast: Publish checks the closed
	// flag under the lock before touching the Kafka producer.
	bus, ctx := newKafkaBus(t)
	bus.Close()

	if err := bus.Publish(ctx, "any"); err == nil {
		t.Error("expected error when publishing on a closed bus")
	}
}