Reading and writing a Kafka message queue in Go: basic usage
This post records a simple way to use Kafka as a subscribe-style message queue. The Kafka environment is two containers started locally with Docker, as follows:
```shell
# Create the network
docker network create app-tier --driver bridge

# Start ZooKeeper
docker run -d --name zookeeper-server \
  --network app-tier \
  -e ALLOW_ANONYMOUS_LOGIN=yes \
  bitnami/zookeeper:latest

# Start the first Kafka instance
docker run -d --name kafka-server1 \
  --network app-tier \
  -e ALLOW_PLAINTEXT_LISTENER=yes \
  -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 \
  --hostname kafka-server1 \
  -p 9092:9092 \
  bitnami/kafka:latest

# Start the second Kafka instance
docker run -d --name kafka-server2 \
  --network app-tier \
  -e ALLOW_PLAINTEXT_LISTENER=yes \
  -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 \
  --hostname kafka-server2 \
  -p 9093:9092 \
  bitnami/kafka:latest
```
The commands above bring up two Kafka instances. The client library used here is github.com/Shopify/sarama; the demo code follows:
```go
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/Shopify/sarama"
)

var (
	topic      = "www.5bug.wang"
	logger     = log.New(os.Stdout, "[OTelInterceptor] ", log.LstdFlags)
	brokerList = []string{"127.0.0.1:9092", "127.0.0.1:9093"}
)

// receiveMessage consumes partition 0 of the topic from a single broker and
// logs every message until a signal arrives.
func receiveMessage(tag int, broker string, signals chan os.Signal) {
	config := sarama.NewConfig()
	consumer, err := sarama.NewConsumer([]string{broker}, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}
	defer partitionConsumer.Close()

	for {
		select {
		case msg := <-partitionConsumer.Messages():
			logger.Printf("-----> [%d/%s]%s/%d/%d\t%s\t%s\n",
				tag, broker, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
		case err := <-partitionConsumer.Errors():
			log.Println(err)
		case <-signals:
			return
		}
	}
}

func main() {
	c := sarama.NewConfig()
	producer, err := sarama.NewAsyncProducer(brokerList, c)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// kill -2, trap SIGINT to trigger a shutdown
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	go receiveMessage(1, "127.0.0.1:9092", signals)
	go receiveMessage(2, "127.0.0.1:9093", signals)

	bulkSize := 1
	duration := 3 * time.Second
	ticker := time.NewTicker(duration)
	logger.Printf("Starting to produce %v messages every %v", bulkSize, duration)

	for {
		select {
		case t := <-ticker.C:
			now := t.Format(time.RFC3339)
			logger.Printf("producing %v messages to topic: %s at %s", bulkSize, topic, now)
			for i := 0; i < bulkSize; i++ {
				producer.Input() <- &sarama.ProducerMessage{
					Topic: topic,
					Key:   sarama.StringEncoder("5bug"),
					Value: sarama.StringEncoder(fmt.Sprintf("test message %v/%v from kafka-client-go-test at %s", i+1, bulkSize, now)),
				}
			}
		case <-signals:
			logger.Println("terminating the program")
			logger.Println("Bye :)")
			return
		}
	}
}
```
Running the code, however, is not so smooth; the following error appears:
dial tcp: lookup kafka-server: no such host
This error occurs locally because the machine cannot resolve the hostname the broker advertises to clients. Since the containers were started with `--hostname kafka-server1` and `--hostname kafka-server2`, map those advertised names in your hosts file, e.g. add `127.0.0.1 kafka-server1` and `127.0.0.1 kafka-server2` (the exact name in the error tells you which entry is missing). Below is the output of a successful run:
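To check whether the hosts-file fix took effect, a quick stdlib sketch can test name resolution directly; the hostnames below match the `--hostname` flags used when starting the containers, and before the hosts entries are added the lookups fail:

```go
package main

import (
	"fmt"
	"net"
)

// resolves reports whether name resolves to at least one address.
func resolves(name string) bool {
	addrs, err := net.LookupHost(name)
	return err == nil && len(addrs) > 0
}

func main() {
	// localhost should always resolve; the broker names only resolve after
	// the hosts-file entries from above are in place.
	for _, name := range []string{"localhost", "kafka-server1", "kafka-server2"} {
		fmt.Printf("%-14s resolves: %v\n", name, resolves(name))
	}
}
```

Once both broker names print `true`, the sarama client's `dial tcp: lookup ...: no such host` error should disappear.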