kafka-go
Source:: https://github.com/segmentio/kafka-go
Создание топика
Так как топик можно создать лишь при обращении к контроллеру (во всяком случае в kafka-go) сначала необходимо найти контроллер и подключиться к нему:
conn, err := kafka.DialContext(ctx, "tcp", *brokers)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
Далее вызывается метод создания топика в который можно передать все параметры указанные в документации Kafka через поле ConfigEntries
(min.insync.replicas, segment.bytes, retention.bytes и дургие)
controllerConn.CreateTopics(kafka.TopicConfig{
Topic: *name,
NumPartitions: *partitions,
ReplicationFactor: *replicas,
ConfigEntries: []kafka.ConfigEntry{
// {ConfigName: "min.insync.replicas", ConfigValue: "2"},
{ConfigName: "segment.bytes", ConfigValue: "2097152"},
// {ConfigName: "retention.bytes", ConfigValue: "3145728"},
},
})