kafka-go

Source:: https://github.com/segmentio/kafka-go

Создание топика

Так как топик можно создать лишь при обращении к контроллеру (во всяком случае в kafka-go) сначала необходимо найти контроллер и подключиться к нему:

conn, err := kafka.DialContext(ctx, "tcp", *brokers)
if err != nil {
        log.Fatal(err)
}
defer conn.Close()
 
controller, err := conn.Controller()
if err != nil {
        panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
        panic(err.Error())
}
defer controllerConn.Close()

Далее вызывается метод создания топика в который можно передать все параметры указанные в документации Kafka через поле ConfigEntries (min.insync.replicas, segment.bytes, retention.bytes и дургие)

controllerConn.CreateTopics(kafka.TopicConfig{
        Topic:             *name,
        NumPartitions:     *partitions,
        ReplicationFactor: *replicas,
        ConfigEntries: []kafka.ConfigEntry{
                // {ConfigName: "min.insync.replicas", ConfigValue: "2"},
                {ConfigName: "segment.bytes", ConfigValue: "2097152"},
                // {ConfigName: "retention.bytes", ConfigValue: "3145728"},
        },
})