分布式消息队列 https://nsq.io/ , 简单易用, 去中心化的设计使 nsq 更健壮, nsq 充分利用了 go 语言的 goroutine 和 channel 来实现的消息处理, 代码量也不大, 读不了多久就没了. 后期的文章我会把 nsq 的源码分析给大家看.
主要的分析路线如下
分析 nsq 的整体框架结构, 分析如何做到的无中心化分布式拓扑结构, 如何处理的单点故障.
分析 nsq 是如何保证消息的可靠性, 如何保证消息的处理, 对于消息的持久化是如何处理和扩展的.
分析 nsq 是如何做的消息的负载处理, 即如何把合理的, 不超过客户端消费能力的情况下, 把消息分发到不同的客户端.
分析 nsq 提供的一些辅助组件.
这篇帖子, 介绍 nsq 的主体结构, 以及他是如何做到去中心化的分布式拓扑结构, 如何处理的单点故障.
几个组件是需要先大概说一下
nsqd 消息队列的主体, 对消息的接收, 处理和把消息分发到客户端.
nsqlookupd nsq 拓扑结构信息的管理者, 有了他才能组成一个简单易用的无中心化的分布式拓扑网络结构.
go-nsq nsq 官方的 go 语言客户端, 基本上市面上的主流编程语言都有相应的客户端在这里 https://nsq.io/clients/client_libraries.html
还有可视化的组件 nsqadmin 和一些工具像 nsq_to_file,nsq_stat, 等等, 这些在后期的帖子里会介绍
使用方式
两种方式一种是直接连接另一种是通过 nsqlookupd 进行连接
直连方式
nsqd 是独立运行的, 我们可以直接使用部署几个 nsqd 然后使用客户端直连的方式使用
例子
目前资源有限, 我就都在一台机器上模拟了
启动两个 nsqd
- ./nsqd -tcp-address ":8000" -http-address ":8001" -data-path=./a
- ./nsqd -tcp-address ":7000" -http-address ":7001" -data-path=./b
正常启动会有类似下面的输出
- [nsqd] 2019/08/29 18:42:56.928345 INFO: nsqd v1.1.1-alpha (built w/go1.12.7)
- [nsqd] 2019/08/29 18:42:56.928512 INFO: ID: 538
- [nsqd] 2019/08/29 18:42:56.928856 INFO: NSQ: persisting topic/channel metadata to b/nsqd.dat
- [nsqd] 2019/08/29 18:42:56.935797 INFO: TCP: listening on [::]:7000
- [nsqd] 2019/08/29 18:42:56.935891 INFO: HTTP: listening on [::]:7001
简单使用
- func main() {
- adds := []string{"127.0.0.1:7000", "127.0.0.1:8000"}
- config := nsq.NewConfig()
- topicName := "testTopic1"
- c, _ := nsq.NewConsumer(topicName, "ch1", config)
- testHandler := &MyTestHandler{consumer: c}
- c.AddHandler(testHandler)
- if err := c.ConnectToNSQDs(adds); err != nil {
- panic(err)
- }
- stats := c.Stats()
- if stats.Connections == 0 {
- panic("stats report 0 connections (should be> 0)")
- }
- stop := make(chan os.Signal)
- signal.Notify(stop, os.Interrupt)
- fmt.Println("server is running....")
- <-stop
- }
- type MyTestHandler struct {
- consumer *nsq.Consumer
- }
- func (m MyTestHandler) HandleMessage(message *nsq.Message) error {
- fmt.Println(string(message.Body))
- return nil
- }
方法 c.ConnectToNSQDs(adds), 连接多个 nsqd 服务
然后运行多个客户端实现
这时, 我们发送一个消息,
curl -d 'hello world 2' 'http://127.0.0.1:7001/pub?topic=testTopic1'
nsqd 会根据他的算法, 把消息分配到一个客户端
客户端的输入如下
- 2019/08/30 12:05:32 INF 1 [testTopic1/ch1] (127.0.0.1:7000) connecting to nsqd
- 2019/08/30 12:05:32 INF 1 [testTopic1/ch1] (127.0.0.1:8000) connecting to nsqd
- server is running....
- hello world 2
但是这种做的话, 需要客户端做一些额外的工作, 需要频繁的去检查所有 nsqd 的状态, 如果发现出现问题需要客户端主动去处理这些问题.
总结
我使用的客户端库是官方库 go-nsq, 使用直接连 nsqd 的方式,
如果有 nsqd 出现问题, 现在的处理方式, 他会每隔一段时间执行一次重连操作. 想去掉这个连接信息就要额外做一些处理了.
如果对 nsqd 进行横向扩充, 只能是自己民额外的写一些代码调用 ConnectToNSQDs 或者 ConnectToNSQD 方法
去中心化连接方式 nsqlookupd
官方推荐使用连接 nsqlookupd 的方式, nsqlookupd 用于做服务的注册和发现, 这样可以做到去中心化.
图中我们运行着多个 nsqd 和多个 nsqlookupd 的实例, 客户端去连接 nsqlookupd 来操作 nsqd
例子
我们要先启动 nsqlookupd, 为了演示方便, 我启动两个 nsqlookupd 实例, 三个 nsqd 实例
- ./nsqlookupd -tcp-address ":8200" -http-address ":8201"
- ./nsqlookupd -tcp-address ":7200" -http-address ":7201"
为了演示横向扩充, 先启动两个, 客户端连接后, 再启动第三个.
- ./nsqd -tcp-address ":8000" -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a
- ./nsqd -tcp-address ":7000" -http-address ":7001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./b
--lookupd-tcp-address 用于指定 lookup 的连接地址
客户端简单代码
- package main
- import (
- "fmt"
- "os"
- "os/signal"
- "time"
- "github.com/nsqio/go-nsq"
- )
- func main() {
- adds := []string{"127.0.0.1:7201", "127.0.0.1:8201"}
- config := nsq.NewConfig()
- config.MaxInFlight = 1000
- config.MaxBackoffDuration = 5 * time.Second
- config.DialTimeout = 10 * time.Second
- topicName := "testTopic1"
- c, _ := nsq.NewConsumer(topicName, "ch1", config)
- testHandler := &MyTestHandler{consumer: c}
- c.AddHandler(testHandler)
- if err := c.ConnectToNSQLookupds(adds); err != nil {
- panic(err)
- }
- stats := c.Stats()
- if stats.Connections == 0 {
- panic("stats report 0 connections (should be> 0)")
- }
- stop := make(chan os.Signal)
- signal.Notify(stop, os.Interrupt)
- fmt.Println("server is running....")
- <-stop
- }
- type MyTestHandler struct {
- consumer *nsq.Consumer
- }
- func (m MyTestHandler) HandleMessage(message *nsq.Message) error {
- fmt.Println(string(message.Body))
- return nil
- }
方法 ConnectToNSQLookupds 就是用于连接 nsqlookupd 的, 但是需要注意的是, 连接的是 http 端口 7201 和 8201, 库 go-nsq 是通过请求其中一个 nsqlookupd 的 http 方法 http://127.0.0.1:7201/lookup?topic=testTopic1 来得到所有提供 topic=testTopic1 的 nsqd 列表信息, 然后对所有的 nsqd 进行连接,
- 2019/08/30 13:47:26 INF 1 [testTopic1/ch1] querying nsqlookupd http://127.0.0.1:7201/lookup?topic=testTopic1
- 2019/08/30 13:47:26 INF 1 [testTopic1/ch1] (li-peng-mc-MacBook.local:7000) connecting to nsqd
- 2019/08/30 13:47:26 INF 1 [testTopic1/ch1] (li-peng-mc-MacBook.local:8000) connecting to nsqd
目前我们已经连接了两个.
我们演示一下橫向扩充, 启动第三个 nsqd
./nsqd -tcp-address ":6000" -http-address ":6001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./c
这里会有一个问题, 当我启动了一个亲的 nsqd 但是他的 topic 是空的, 我们需指定这新的 nsqd 处理哪些 topic.
我们可以用 nsqadmin 查看所有的 topic
./nsqadmin --lookupd-http-address=127.0.0.1:8201 --lookupd-http-address=127.0.0.1:7201
然后去你的 nsqd 上去建 topic
curl -X POST 'http://127.0.0.1:6001/topic/create?topic=testTopic1'
当然也可以自己写一些自动化的角本
查看客户端的日志输出
- 2019/08/30 14:56:01 INF 1 [testTopic1/ch1] querying nsqlookupd http://127.0.0.1:7201/lookup?topic=testTopic1
- 2019/08/30 14:56:01 INF 1 [testTopic1/ch1] (li-peng-mc-MacBook.local:6000) connecting to nsqd
已经连上我们的新 nsqd 了
我手动关闭一个 nsqd 实例
客户端的日志输出已经断开了连接
- 2019/08/30 15:04:20 ERR 1 [testTopic1/ch1] (li-peng-mc-MacBook.local:8000) IO error - EOF
- 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-MacBook.local:8000) beginning close
- 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-MacBook.local:8000) readLoop exiting
- 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-MacBook.local:8000) breaking out of writeLoop
- 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-MacBook.local:8000) writeLoop exiting
- 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-MacBook.local:8000) finished draining, cleanup exiting
- 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-MacBook.local:8000) clean close complete
- 2019/08/30 15:04:20 WRN 1 [testTopic1/ch1] there are 2 connections left alive
并且 nsqd 和 nsqlookupd 也断开了连接, 客户端再次从 nsqlookupd 取所有的 nsqd 的地址时得到的总是可用的地址.
去中心化实现原理
nsqlookupd 用于管理整个网络拓扑结构, nsqd 用他实现服务的注册, 客户端使用他得到所有的 nsqd 服务节点信息, 然后所有的 consumer 端连接
实现原理如下,
nsqd 把自己的服务信息广播给一个或者多个 nsqlookupd
客户端连接一个或者多个 nsqlookupd, 通过 nsqlookupd 得到所有的 nsqd 的连接信息, 进行连接消费,
如果某个 nsqd 出现问题, down 机了, 会和 nsqlookupd 断开, 这样客户端从 nsqlookupd 得到的 nsqd 的列表永远是可用的. 客户端连接的是所有的 nsqd, 一个出问题了就用其他的连接, 所以也不会受影响.
来源: https://www.cnblogs.com/li-peng/p/11435083.html