前言:其实一开始是打算用RabbitMQ的,不过出现点问题(随机性的),由于赶进度没有进一步搞,于是切换到Kafka。之前的那篇文章我隐藏了,有机会再看看。
本文重点不在Kafka等消息队列的对比与介绍,只关心于Go-Micro框架如何集成Kafka;阅读本文前,请确保您的Go-Micro框架可正常使用,其文档可参考:https://micro.mu/docs/install-guide.html
步骤:
一、引用包,你懂得。
import ( ... "github.com/micro/go-micro/broker" "github.com/micro/go-plugins/broker/kafka" .... )
以下以建立Service为例:(Function同理)
二、生产者
1.在main函数中:
service := micro.NewService( micro.Name("服务名"), micro.Version("0.0.2"), micro.Metadata(map[string]string{ "type": "XXXXX", }), micro.Broker(kafka.NewBroker(func(o *broker.Options) { o.Addrs = config.BrokerURLs })), ) if err := broker.Connect(); err != nil { log.Fatal(err.Error()) }
注意:config.BrokerURLs 是[]string 类型的,存放Kafka的IP和端口,例如:
var BrokerURLs = []string{ 0: "192.168.0.33:9092", }
2.发布事件:
broker.Publish("Topic主题", &broker.Message{ Header: map[string]string{ "AAA": "BBBBB", "CCCCC": "DDDDDD", }, Body: []byte("消息内容"), })
三、消费者:
与发布者相似,唯独是订阅事件,如下代码:
broker.Subscribe("Topic主题", func(p broker.Publication) error { brokerHeader := p.Message().Header aaa := brokerHeader["AAA"] bbb := string(p.Message().Body) }) if err != nil { log.Fatal(err.Error()) }
即可。