一文详解Golang连接kafka的基本操作
目录
- 1.kafka的学习
- 1.1 启动kafka与zookeeper
- 1.2 创建topic
- 1.3 生产消息
- 1.4 消费之前的消息
- 1.5 指定偏移量消费
- 1.6 消费最新的信息
- 2 go操作
- 2.1 发送消息
- 2.2 消费消息
- 2.3 列出所有topic
- 2.4 创建topic
1.kafka的学习
1.1 启动kafka与zookphpeeper
kafka与zookeeper是相关联的
bin/zookeeper-server-start.sh config/zookeeper.properties
与
bin/kafka-server-start.sh config/server.properties
1.2 创建topic
bin/kafka-topics.sh --create --topic hello --bootstrap-server 主机名:9092
1.3 生产消息
bin/kafka-console-producer.sh --broker-list 主机名:9092 --topic hello
运行后可以发送多条,ctrl+c退出
1.4 消费之前的消息
bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --from-beginning --topic hello
1.5 指定偏移量消费
bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --partition 0 --offset 1 --topic hello
1.6 消费最新的信息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello
2 go操作
2.1 发送消息
// Kafka 配置 const ( KafkaBroker = "u8sMaster:9092" // 替换为你的 Kafka Broker 地址 KafkaTopic = "k8s-version" // Kafka 主题 ) func main() { sendMesgKafka() } func sendMesgKafka() { w := kafka.NewWriter(kafka.WriterConfig{ Brokers: []string{KafkaBroker}, Topic: KafkaTopic, Balancer: &kafka.LeastBytes{}, }) err := w.WriteMessages(context.Background(), kafka.Message{ Key: []byte("Key-A"), Value: []byte("one!"), }, kafka.Message{ Key: []byte("Key-B"), Value: []byte("two!"), }, kafka.Message{ Key: []byte("Key-C"), Value: []byte("three!"), }, ) if err != nil { log.Fajavascripttal("failed to write messages:", err) } if err := w.Close(); err != nil { log.Fatal("failed to close writer:", err) } fmt.Println("Message sent successfully") }
2.2 消费消息
// to consume messages topic := "test" partition := 0 conn, err := kafka.DialLeader(context.Background(), "tcp", "u8sMaster:9092", topic, partition) if err != nil { log.Fatal("failed to dial leaderjavascript:", err) } conn.SetReadDeadline(time.Now().Add(10*time.Second)) BATch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max b := make([]byte, 10e3) // 10KB max per message for { n, err := batch.Read(b) if err != nil { break } fmt.Println(string(b[:n])) } if err := batch.Cphplose(); err != nil { log.Fatal("failed to close batch:", err) } if err := conn.Close(); err != nil { log.Fatal("failed to close connection:", err) }
2.3 列出所有topic
func main() { conn, err := kafka.Dial("tcp", "u8sMaster:9092") if err != nil { panic(err.Error()) } defer conn.Close() partitions, err := conn.ReadPartitions() if err != nil { panic(err.Error()) } m := map[string]struct{}{} for _, p := range partitions { m[p.Topic] = struct{}{} } for k := range m { fmt.Println(k) } }
2.4 创建topic
func main() { conn, err := kafka.DialLeader(context.Background(), "tcp", "u9sMaster:9js092", "topic2", 0) if err != nil { panic(err.Error()) } }
精准地创建topic
func main() { conn, err := kafka.Dial("tcp", "u8sMaster:9092") if err != nil { panic(err.Error()) } defer conn.Close() controller, err := conn.Controller() if err != nil { panic(err.Error()) } var connLeader *kafka.Conn connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) if err != nil { panic(err.Error()) } defer connLeader.Close() }
这里省略了kafka集群的配置,未来有机会补充
以上就是一文详解golang连接kafka的基本操作的详细内容,更多关于go连接kafka的资料请关注编程客栈(www.devze.com)其它相关文章!
精彩评论