用一了段时间NSQ还是很稳定的。除了稳定,还有一个特别值的说的就是部署非常简单。总想写点什么推荐给大家使用nsq来做一些东西。但是就是因为他太简单易用,文档也比较简单易懂。一直不知道要写啥!!!!!
nsq官网: http://nsq.io/
为了容灾需要对nsqd多机器部属,有了Docker后,快速扩还是很方便的。
部署完后我会用go和c#写一些代码方便大家学习。
准备工作:
》两台服务器:192.168.0.49; 192.168.0.105.
》需要在两台机器上安装好Docker
》两台机器上镜像的拉取
- docker pull nsqio / nsq
我们在105上启动lookup, nsqd和客户端都需要连接这个lookup。
- docker run--name lookupd - p 4160 : 4160 - p 4161 : 4161 nsqio / nsq / nsqlookupd
在105和49上启动nsqd, lookup的地址要写105
- docker run--name nsqd - p 4150 : 4150 - p 4151 : 4151 nsqio / nsq / nsqd--broadcast - address = 192.168.0.105--lookupd - tcp - address = 192.168.0.105 : 4160
- docker run--name nsqd - p 4150 : 4150 - p 4151 : 4151 nsqio / nsq / nsqd--broadcast - address = 192.168.0.49--lookupd - tcp - address = 192.168.0.105 : 4160
到了这一步就可以写代码发送和接收信息了。但是还有一个管理系统需要启动一下。nsqadmin
- docker run--name nsqadmin - p 4171 : 4171 nsqio / nsq / nsqadmin--lookupd - http - address = 192.168.0.105 : 4161
用浏览器看一下管理端:http://192.168.0.105:4171/nodes。找开Nodes标签里面有两个节点。192.168.0.105 和 192.168.0.49。其他的你可以点开看看。
我用go语言 简单写一个发送信息的例子:
go使用的库是 go-nsq 地址 : github.com/nsqio/go-nsq
- func main() {
- config := nsq.NewConfig()
- // 随便给哪个ip发都可以
- //w1, _ := nsq.NewProducer("192.168.0.105:4150", config)
- w1, _ := nsq.NewProducer("192.168.0.49:4150", config)
- err1 := w1.Ping()
- if err1 != nil {
- log.Fatal("should not be able to ping after Stop()")
- return
- }
- defer w1.Stop()
- topicName := "publishtest"
- msgCount := 2
- for i := 1; i < msgCount; i++ {
- err1 := w1.Publish(topicName, []byte("测试测试publis test case"))
- if err1 != nil {
- log.Fatal("error")
- }
- }
- }
可以尝试给49和105都发送一次试试。再看一下我们的管理页面:
publishtest被ip105和49都发送过。但是还没有channel:
客户端golang代码
- package main
- import (
- "github.com/nsqio/go-nsq"
- "time"
- "log"
- "fmt"
- "strconv"
- "os"
- "os/signal"
- )
- func main () {
- topicName := "publishtest"
- msgCount := 2
- for i := 0; i < msgCount; i++ {
- //time.Sleep(time.Millisecond * 20)
- go readMessage(topicName, i)
- }
- quit := make(chan os.Signal)
- signal.Notify(quit, os.Interrupt)
- fmt.Println("server is running....")
- <-quit
- fmt.Println("Shutdown server....")
- }
- type ConsumerHandle struct {
- q *nsq.Consumer
- msgGood int
- }
- func (h *ConsumerHandle) HandleMessage(message *nsq.Message) error {
- msg := string(message.Body) + " " + strconv.Itoa(h.msgGood)
- fmt.Println(msg)
- return nil
- }
- func readMessage(topicName string, msgCount int) {
- defer func() {
- if err := recover(); err != nil {
- fmt.Println("error: ", err)
- }
- }()
- config := nsq.NewConfig()
- config.MaxInFlight = 1000
- config.MaxBackoffDuration = 500 * time.Second
- //q, _ := nsq.NewConsumer(topicName, "ch" + strconv.Itoa(msgCount), config)
- //q, _ := nsq.NewConsumer(topicName, "ch" + strconv.Itoa(msgCount) + "#ephemeral", config)
- q, _ := nsq.NewConsumer(topicName, "ch" + strconv.Itoa(msgCount), config)
- h := &ConsumerHandle{q: q, msgGood:msgCount}
- q.AddHandler(h)
- err := q.ConnectToNSQLookupd("192.168.0.105:4161")
- //err := q.ConnectToNSQDs([]string{"192.168.0.105:4161"})
- //err := q.ConnectToNSQD("192.168.0.49:4150")
- //err := q.ConnectToNSQD("192.168.0.105:4415")
- if err != nil {
- fmt.Println("conect nsqd error")
- log.Println(err)
- }
- <-q.StopChan
- fmt.Println("end....")
- }
运行一下,会启动两个终端:
用我们的发送代码发送信息,再看我们的客户端
c# 使用的库为NsqSharp.Core 地址为:
https://github.com/tonyredondo/NsqSharp
简单客户端代码为:
- class Program
- {
- static void Main()
- {
- // Create a new Consumer for each topic/channel
- var consumerCount = 2;
- var listC = new List<Consumer>();
- for (var i = 0; i < consumerCount; i++)
- {
- var consumer = new Consumer("publishtest", $"channel{i}" );
- consumer.ChangeMaxInFlight(2500);
- consumer.AddHandler(new MessageHandler());
- consumer.ConnectToNsqLookupd("192.168.0.105:4161");
- listC.Add(consumer);
- }
- var exitEvent = new ManualResetEvent(false);
- Console.CancelKeyPress += (sender, eventArgs) => {
- eventArgs.Cancel = true;
- listC.ForEach(x => x.Stop());
- exitEvent.Set();
- };
- exitEvent.WaitOne();
- }
- }
- public class MessageHandler : IHandler
- {
- /// <summary>Handles a message.</summary>
- public void HandleMessage(IMessage message)
- {
- string msg = Encoding.UTF8.GetString(message.Body);
- Console.WriteLine(msg);
- }
- /// <summary>
- /// Called when a message has exceeded the specified <see cref="Config.MaxAttempts"/>.
- /// </summary>
- /// <param name="message">The failed message.</param>
- public void LogFailedMessage(IMessage message)
- {
- // Log failed messages
- }
- }