franz-go

source:: https://github.com/twmb/franz-go

// Seed broker addresses handed to kgo.SeedBrokers below.
seeds := []string{"localhost:9092"}

// A single client is used for both producing and consuming. Consumption
// can be direct (no consumer group) or go through a group; this example
// joins the group "my-group-identifier" and consumes topic "foo".
opts := []kgo.Opt{
	kgo.SeedBrokers(seeds...),
	kgo.ConsumerGroup("my-group-identifier"),
	kgo.ConsumeTopics("foo"),
}
cl, err := kgo.NewClient(opts...)
if err != nil {
	// Nothing below can work without a client, so the example just aborts.
	panic(err)
}
// Close the client when the enclosing function returns.
defer cl.Close()
 
ctx := context.Background()

// 1.) Producing a message
// Every record is produced through Produce. Its completion callback is what
// lets the caller choose between asynchronous production (return right
// away) and synchronous production (wait for the callback, as done here).
record := &kgo.Record{Topic: "foo", Value: []byte("bar")}

var wg sync.WaitGroup
wg.Add(1)

// onProduced reports a produce error, if any, and always releases the
// WaitGroup so that wg.Wait below can return.
onProduced := func(_ *kgo.Record, err error) {
	defer wg.Done()
	if err == nil {
		return
	}
	fmt.Printf("record had a produce error: %v\n", err)
}
cl.Produce(ctx, record, onProduced)
wg.Wait()

// ProduceSync is the alternative for producing a batch of records
// synchronously; FirstErr yields the first error among the results.
syncResults := cl.ProduceSync(ctx, record)
if err := syncResults.FirstErr(); err != nil {
	fmt.Printf("record had a produce error while synchronously producing: %v\n", err)
}
 
// 2.) Consuming messages from a topic
// Poll forever; each pass prints the polled records using three different
// traversal styles, one after the other.
for {
	fetches := cl.PollFetches(ctx)

	// Fetch errors are retried internally, so anything surfaced by a poll
	// is non-retriable and is returned so the user can notice and react.
	// This example reacts by panicking.
	if errs := fetches.Errors(); len(errs) != 0 {
		panic(fmt.Sprint(errs))
	}

	// Style one: walk the records with an iterator...
	for iter := fetches.RecordIter(); !iter.Done(); {
		rec := iter.Next()
		fmt.Println(string(rec.Value), "from an iterator!")
	}

	// ...style two: a per-partition callback that ranges over the records.
	fetches.EachPartition(func(part kgo.FetchTopicPartition) {
		for i := range part.Records {
			fmt.Println(string(part.Records[i].Value), "from range inside a callback!")
		}

		// Style three: a nested per-record callback on the same partition.
		part.EachRecord(func(rec *kgo.Record) {
			fmt.Println(string(rec.Value), "from a second callback!")
		})
	})
}