kafka-go

Source:: https://github.com/segmentio/kafka-go

Π‘ΠΎΠ·Π΄Π°Π½ΠΈΠ΅ Ρ‚ΠΎΠΏΠΈΠΊΠ°

Π’Π°ΠΊ ΠΊΠ°ΠΊ Ρ‚ΠΎΠΏΠΈΠΊ ΠΌΠΎΠΆΠ½ΠΎ ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ лишь ΠΏΡ€ΠΈ ΠΎΠ±Ρ€Π°Ρ‰Π΅Π½ΠΈΠΈ ΠΊ ΠΊΠΎΠ½Ρ‚Ρ€ΠΎΠ»Π»Π΅Ρ€Ρƒ (Π²ΠΎ всяком случаС Π² kafka-go) сначала Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ Π½Π°ΠΉΡ‚ΠΈ ΠΊΠΎΠ½Ρ‚Ρ€ΠΎΠ»Π»Π΅Ρ€ ΠΈ ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡ΠΈΡ‚ΡŒΡΡ ΠΊ Π½Π΅ΠΌΡƒ:

conn, err := kafka.DialContext(ctx, "tcp", *brokers)
if err != nil {
        log.Fatal(err)
}
defer conn.Close()
 
controller, err := conn.Controller()
if err != nil {
        panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
        panic(err.Error())
}
defer controllerConn.Close()

Π”Π°Π»Π΅Π΅ вызываСтся ΠΌΠ΅Ρ‚ΠΎΠ΄ создания Ρ‚ΠΎΠΏΠΈΠΊΠ° Π² ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ ΠΌΠΎΠΆΠ½ΠΎ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‚ΡŒ всС ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€Ρ‹ ΡƒΠΊΠ°Π·Π°Π½Π½Ρ‹Π΅ Π² Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚Π°Ρ†ΠΈΠΈ Kafka Ρ‡Π΅Ρ€Π΅Π· ΠΏΠΎΠ»Π΅ ConfigEntries (min.insync.replicas, segment.bytes, retention.bytes ΠΈ Π΄ΡƒΡ€Π³ΠΈΠ΅)

controllerConn.CreateTopics(kafka.TopicConfig{
        Topic:             *name,
        NumPartitions:     *partitions,
        ReplicationFactor: *replicas,
        ConfigEntries: []kafka.ConfigEntry{
                // {ConfigName: "min.insync.replicas", ConfigValue: "2"},
                {ConfigName: "segment.bytes", ConfigValue: "2097152"},
                // {ConfigName: "retention.bytes", ConfigValue: "3145728"},
        },
})