kafka package
API reference for the kafka package.
Imports (11)
kafkaSubscription
type kafkaSubscription struct
Fields
| Name | Type | Description |
|---|---|---|
| pc | sarama.PartitionConsumer | |
| chans | []chan syncbus.Event | |
KafkaBus
KafkaBus implements Bus using a Kafka backend.
type KafkaBus struct
Methods
Publish implements Bus.Publish.
Parameters
Returns
func (*KafkaBus) Publish(ctx context.Context, key string, opts ...syncbus.PublishOption) error
{
b.mu.Lock()
if b.closed {
b.mu.Unlock()
return errors.New("kafka bus closed")
}
if _, ok := b.pending[key]; ok {
b.mu.Unlock()
return nil // deduplicate
}
b.pending[key] = struct{}{}
b.wg.Add(1)
b.mu.Unlock()
defer b.wg.Done()
if j := rand.Int63n(int64(10 * time.Millisecond)); j > 0 {
select {
case <-ctx.Done():
b.mu.Lock()
delete(b.pending, key)
b.mu.Unlock()
return ctx.Err()
case <-time.After(time.Duration(j)):
}
}
msg := &sarama.ProducerMessage{Topic: key, Value: sarama.StringEncoder("1")}
if _, _, err := b.producer.SendMessage(msg); err != nil {
b.mu.Lock()
delete(b.pending, key)
b.mu.Unlock()
return err
}
b.published.Add(1)
b.mu.Lock()
delete(b.pending, key)
b.mu.Unlock()
return nil
}
PublishAndAwait implements Bus.PublishAndAwait. Kafka sync producer does not expose subscriber replication acknowledgements at this level, so quorum is unsupported.
Parameters
Returns
func (*KafkaBus) PublishAndAwait(ctx context.Context, key string, replicas int, opts ...syncbus.PublishOption) error
{
if replicas <= 1 {
return b.Publish(ctx, key, opts...)
}
return syncbus.ErrQuorumUnsupported
}
PublishAndAwaitTopology implements Bus.PublishAndAwaitTopology.
Parameters
Returns
func (*KafkaBus) PublishAndAwaitTopology(ctx context.Context, key string, minZones int, opts ...syncbus.PublishOption) error
{
// Kafka does not easily support this synchronous topology check.
return syncbus.ErrQuorumUnsupported
}
Subscribe implements Bus.Subscribe.
Parameters
Returns
func (*KafkaBus) Subscribe(ctx context.Context, key string) (<-chan syncbus.Event, error)
{
ch := make(chan syncbus.Event, 1)
b.mu.Lock()
sub := b.subs[key]
if sub == nil {
pc, err := b.consumer.ConsumePartition(key, 0, sarama.OffsetNewest)
if err != nil {
b.mu.Unlock()
return nil, err
}
sub = &kafkaSubscription{pc: pc}
b.subs[key] = sub
go b.dispatch(sub, key)
}
sub.chans = append(sub.chans, ch)
b.mu.Unlock()
go func() {
<-ctx.Done()
_ = b.Unsubscribe(context.Background(), key, ch)
}()
return ch, nil
}
Parameters
func (*KafkaBus) dispatch(sub *kafkaSubscription, key string)
{
for range sub.pc.Messages() {
b.mu.Lock()
chans := append([]chan syncbus.Event(nil), b.subs[key].chans...)
b.mu.Unlock()
evt := syncbus.Event{Key: key}
for _, ch := range chans {
select {
case ch <- evt:
b.delivered.Add(1)
default:
}
}
}
}
Unsubscribe implements Bus.Unsubscribe.
Parameters
Returns
func (*KafkaBus) Unsubscribe(ctx context.Context, key string, ch <-chan syncbus.Event) error
{
b.mu.Lock()
sub := b.subs[key]
if sub == nil {
b.mu.Unlock()
return nil
}
for i, c := range sub.chans {
if c == ch {
sub.chans[i] = sub.chans[len(sub.chans)-1]
sub.chans = sub.chans[:len(sub.chans)-1]
close(c)
break
}
}
if len(sub.chans) == 0 {
delete(b.subs, key)
b.mu.Unlock()
return sub.pc.Close()
}
b.mu.Unlock()
return nil
}
RevokeLease publishes a lease revocation event.
Parameters
Returns
func (*KafkaBus) RevokeLease(ctx context.Context, id string) error
{
return b.Publish(ctx, "lease:"+id)
}
SubscribeLease subscribes to lease revocation events.
Parameters
Returns
func (*KafkaBus) SubscribeLease(ctx context.Context, id string) (<-chan syncbus.Event, error)
{
return b.Subscribe(ctx, "lease:"+id)
}
UnsubscribeLease cancels a lease revocation subscription.
Parameters
Returns
func (*KafkaBus) UnsubscribeLease(ctx context.Context, id string, ch <-chan syncbus.Event) error
{
return b.Unsubscribe(ctx, "lease:"+id, ch)
}
Metrics returns the published and delivered counts.
Returns
func (*KafkaBus) Metrics() syncbus.Metrics
{
return syncbus.Metrics{
Published: b.published.Load(),
Delivered: b.delivered.Load(),
}
}
IsHealthy implements Bus.IsHealthy.
Returns
func (*KafkaBus) IsHealthy() bool
{
return true
}
Peers implements Bus.Peers.
Returns
func (*KafkaBus) Peers() []string
{
return nil
}
Close releases resources used by the KafkaBus.
Returns
func (*KafkaBus) Close() error
{
b.mu.Lock()
if b.closed {
b.mu.Unlock()
return nil
}
b.closed = true
b.mu.Unlock()
b.wg.Wait()
_ = b.producer.Close()
_ = b.consumer.Close()
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| producer | sarama.SyncProducer | |
| consumer | sarama.Consumer | |
| mu | sync.Mutex | |
| subs | map[string]*kafkaSubscription | |
| pending | map[string]struct{} | |
| published | atomic.Uint64 | |
| delivered | atomic.Uint64 | |
| closed | bool | |
| wg | sync.WaitGroup | |
NewKafkaBus
NewKafkaBus creates a new KafkaBus connecting to the given brokers.
Parameters
Returns
func NewKafkaBus(brokers []string, cfg *sarama.Config) (*KafkaBus, error)
{
if !cfg.Producer.Return.Successes {
cfg.Producer.Return.Successes = true
}
client, err := sarama.NewClient(brokers, cfg)
if err != nil {
return nil, err
}
producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
_ = client.Close()
return nil, err
}
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
_ = producer.Close()
_ = client.Close()
return nil, err
}
return &KafkaBus{
producer: producer,
consumer: consumer,
subs: make(map[string]*kafkaSubscription),
pending: make(map[string]struct{}),
}, nil
}
newKafkaBus
Parameters
Returns
func newKafkaBus(t *testing.T) (*KafkaBus, context.Context)
{
t.Helper()
addr := os.Getenv("WARP_TEST_KAFKA_ADDR")
if addr == "" {
t.Skip("WARP_TEST_KAFKA_ADDR not set, skipping Kafka integration tests")
}
t.Logf("TestKafkaBus: using real Kafka at %s", addr)
config := sarama.NewConfig()
config.Producer.Return.Successes = true
// Speed up tests
config.Consumer.Offsets.Initial = sarama.OffsetNewest
bus, err := NewKafkaBus([]string{addr}, config)
if err != nil {
t.Fatalf("NewKafkaBus: %v", err)
}
ctx := context.Background()
t.Cleanup(func() {
bus.Close()
})
return bus, ctx
}
TestKafkaBusPublishSubscribeFlowAndMetrics
Parameters
func TestKafkaBusPublishSubscribeFlowAndMetrics(t *testing.T)
{
bus, ctx := newKafkaBus(t)
topic := "test-" + uuid.NewString()
ch, err := bus.Subscribe(ctx, topic)
if err != nil {
t.Fatalf("subscribe: %v", err)
}
// Wait for consumer to be ready (approx)
time.Sleep(2 * time.Second)
if err := bus.Publish(ctx, topic); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case <-ch:
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for publish")
}
metrics := bus.Metrics()
if metrics.Published != 1 {
t.Fatalf("expected published 1 got %d", metrics.Published)
}
if metrics.Delivered != 1 {
t.Fatalf("expected delivered 1 got %d", metrics.Delivered)
}
}
TestKafkaBusContextBasedUnsubscribe
Parameters
func TestKafkaBusContextBasedUnsubscribe(t *testing.T)
{
bus, _ := newKafkaBus(t)
topic := "test-unsub-" + uuid.NewString()
subCtx, cancel := context.WithCancel(context.Background())
ch, err := bus.Subscribe(subCtx, topic)
if err != nil {
t.Fatalf("subscribe: %v", err)
}
cancel()
select {
case _, ok := <-ch:
if ok {
t.Fatal("expected channel closed")
}
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for unsubscribe")
}
}
TestKafkaBusDeduplicatePendingKeys
Parameters
func TestKafkaBusDeduplicatePendingKeys(t *testing.T)
{
bus, ctx := newKafkaBus(t)
topic := "test-dedup-" + uuid.NewString()
ch, err := bus.Subscribe(ctx, topic)
if err != nil {
t.Fatalf("subscribe: %v", err)
}
bus.mu.Lock()
bus.pending[topic] = struct{}{}
bus.mu.Unlock()
if err := bus.Publish(ctx, topic); err != nil {
t.Fatalf("publish: %v", err)
}
select {
case <-ch:
t.Fatal("unexpected publish when key pending")
case <-time.After(500 * time.Millisecond):
}
metrics := bus.Metrics()
if metrics.Published != 0 {
t.Fatalf("expected published 0 got %d", metrics.Published)
}
}
TestKafkaBusPublishError
Parameters
func TestKafkaBusPublishError(t *testing.T)
{
// Hard to simulate publish error with a valid client unless we close it or use mock.
// Since we are doing integration test with real client, we skip this or try to close producer.
bus, ctx := newKafkaBus(t)
bus.Close()
// This might not error immediately if async, but NewKafkaBus uses SyncProducer.
// SyncProducer.SendMessage should fail if closed.
// However, bus.Close() closes producer.
// Re-create a bus that we can break
// Actually, just checking if Publish returns error is enough.
if err := bus.Publish(ctx, "any"); err == nil {
// Sarama SyncProducer might return ErrClosed
t.Log("Publish on closed bus did not error, might be async behavior or sarama")
}
}