Why Program Control is not fetching consumer message from claim?
I am trying to implement a Kafka Consumer Group using shopify sarama client. I am kind of confused, how should a ConsumerGroupHandler be used. Since, I don't know session and claim object. How can I use ConsumeClaim.
Is there some code, I am missing?
package kf
import (
"context"
"fmt"
"github.com/Shopify/sarama"
)
type Consumer struct {
flowEventReader sarama.ConsumerGroup
topic string
brokerUrls []string
}
func InitConsumer(brokers []string, topic string) *Consumer {
c := &Consumer{}
c.topic = topic
c.brokerUrls = brokers
var (
err error
)
conf := createSaramaKafkaConf()
c.flowEventReader, err = sarama.NewConsumerGroup(c.brokerUrls, "flowExecutor", conf)
if err != nil {
panic("failed to create consumer group on kafka cluster")
}
return c
}
func (c *Consumer) HandleMessages() {
// Consume from kafka and process
for {
err := c.flowEventReader.Consume(context.Background(), []string{c.topic}, exampleConsumerGroupHandler{})
if err != nil {
fmt.Errorf("FAILED")
}
}
}
type exampleConsumerGroupHandler struct{}
func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGro开发者_运维技巧upSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
sess.MarkMessage(msg, "")
}
return nil
}
func createSaramaKafkaConf() *sarama.Config {
conf := sarama.NewConfig()
version := "2.6.2"
kafkaVer, err := sarama.ParseKafkaVersion(version)
if err != nil {
panic("failed to parse kafka version, executor will not run")
}
conf.Version = kafkaVer
conf.Consumer.Offsets.Initial = sarama.OffsetOldest
conf.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky}
return conf
}
Please help me out.
精彩评论