【原】关于Go-Mirco如何使用Kafka作为消息队列

前言:其实一开始是打算用RabbitMQ的,不过出现点问题(随机性的),由于赶进度没有进一步搞,于是切换到Kafka。之前的那篇文章我隐藏了,有机会再看看。

本文重点不在Kafka等消息队列的对比与介绍,只关心于Go-Micro框架如何集成Kafka;阅读本文前,请确保您的Go-Micro框架可正常使用,其文档可参考:https://micro.mu/docs/install-guide.html


步骤:

一、引用包,你懂得。

import (

    ...

    "github.com/micro/go-micro/broker"
    "github.com/micro/go-plugins/broker/kafka"

    ....

)

以下以建立Service为例:(Function同理)

二、生产者

1.在main函数中:

service := micro.NewService(
    micro.Name("服务名"),
    micro.Version("0.0.2"),
    micro.Metadata(map[string]string{
        "type": "XXXXX",
    }),
    micro.Broker(kafka.NewBroker(func(o *broker.Options) {
        o.Addrs = config.BrokerURLs
    })),
)

if err := broker.Connect(); err != nil {
    log.Fatal(err.Error())
}

注意:config.BrokerURLs 是[]string 类型的,存放Kafka的IP和端口,例如:

var BrokerURLs = []string{
    0: "192.168.0.33:9092",
}

2.发布事件:

broker.Publish("Topic主题", &broker.Message{
    Header: map[string]string{
        "AAA": "BBBBB",
        "CCCCC": "DDDDDD",
    },
    Body: []byte("消息内容"),
})

三、消费者:

与发布者相似,唯独是订阅事件,如下代码:

broker.Subscribe("Topic主题", func(p broker.Publication) error {
    brokerHeader := p.Message().Header
    aaa := brokerHeader["AAA"]
    bbb := string(p.Message().Body)
})
if err != nil {
    log.Fatal(err.Error())
}

即可。

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注