kafka-go
Source:: https://github.com/segmentio/kafka-go
Π‘ΠΎΠ·Π΄Π°Π½ΠΈΠ΅ ΡΠΎΠΏΠΈΠΊΠ°
Π’Π°ΠΊ ΠΊΠ°ΠΊ ΡΠΎΠΏΠΈΠΊ ΠΌΠΎΠΆΠ½ΠΎ ΡΠΎΠ·Π΄Π°ΡΡ Π»ΠΈΡΡ ΠΏΡΠΈ ΠΎΠ±ΡΠ°ΡΠ΅Π½ΠΈΠΈ ΠΊ ΠΊΠΎΠ½ΡΡΠΎΠ»Π»Π΅ΡΡ (Π²ΠΎ Π²ΡΡΠΊΠΎΠΌ ΡΠ»ΡΡΠ°Π΅ Π² kafka-go) ΡΠ½Π°ΡΠ°Π»Π° Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΠΎ Π½Π°ΠΉΡΠΈ ΠΊΠΎΠ½ΡΡΠΎΠ»Π»Π΅Ρ ΠΈ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠΈΡΡΡΡ ΠΊ Π½Π΅ΠΌΡ:
conn, err := kafka.DialContext(ctx, "tcp", *brokers)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
ΠΠ°Π»Π΅Π΅ Π²ΡΠ·ΡΠ²Π°Π΅ΡΡΡ ΠΌΠ΅ΡΠΎΠ΄ ΡΠΎΠ·Π΄Π°Π½ΠΈΡ ΡΠΎΠΏΠΈΠΊΠ° Π² ΠΊΠΎΡΠΎΡΡΠΉ ΠΌΠΎΠΆΠ½ΠΎ ΠΏΠ΅ΡΠ΅Π΄Π°ΡΡ Π²ΡΠ΅ ΠΏΠ°ΡΠ°ΠΌΠ΅ΡΡΡ ΡΠΊΠ°Π·Π°Π½Π½ΡΠ΅ Π² Π΄ΠΎΠΊΡΠΌΠ΅Π½ΡΠ°ΡΠΈΠΈ Kafka ΡΠ΅ΡΠ΅Π· ΠΏΠΎΠ»Π΅ ConfigEntries
(min.insync.replicas, segment.bytes, retention.bytes ΠΈ Π΄ΡΡΠ³ΠΈΠ΅)
controllerConn.CreateTopics(kafka.TopicConfig{
Topic: *name,
NumPartitions: *partitions,
ReplicationFactor: *replicas,
ConfigEntries: []kafka.ConfigEntry{
// {ConfigName: "min.insync.replicas", ConfigValue: "2"},
{ConfigName: "segment.bytes", ConfigValue: "2097152"},
// {ConfigName: "retention.bytes", ConfigValue: "3145728"},
},
})