1, 什么是 RabbitMQ. 详见 http://www.rabbitmq.com/ .
作用就是提高系统的并发性, 将一些不需要及时响应客户端且占用较多资源的操作, 放入队列, 再由另外一个线程, 去异步处理这些队列, 可极大的提高系统的并发能力.
2, 安装
RabbitMQ 服务: http://www.rabbitmq.com/download.html .
(安装完 RabbitMQ 服务后, 会在 Windows 服务中看到. 如果没有 Erlang 运行环境, 在安装过程中会提醒先安装 Erlang 环境. http://www.erlang.org/downloads)
.net 客户端类库: http://www.rabbitmq.com/dotnet.html
3, 插件
RabbitMQ 提供了很多好用的插件, 最常用的就是 web 管理工具, 启动此插件.
CMD 中运行命令: rabbitmq-plugins enable rabbitmq_management
注: rabbitmq-plugins 所在路径为: D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.0\sbin
Web 管理工具的地址是: http://localhost:15672, 初始用户名: guest 初始密码: guest
4, 配置
配置文件地址为: C:\Documents and Settings\Administrator\Application Data\RabbitMQ\rabbitmq.config, 默认没有 rabbit.config 文件, 需要手工新建 (默认会有 rabbitmq.config.example 作为参考). 基于安全, 做了两个配置, 如下:
- [
- {
- rabbit,
- [
- {
- loopback_users, [<<"guest">>]
- },
- {
- tcp_listeners, [{
- "127.0.0.1", 1234
- },
- {
- "10.121.1.48", 8009
- }]
- }
- ]
- }
- ].
loopback_users: 设置只能在与 RabbitMq 服务同一台机器上访问服务的用户.
tcp_listeners: 设置 RabbitMQ 监听的 IP 地址与端口. 只监听局域网内网 iP, 修改默认端口, 防止被入侵攻击.
设置完后, 别忘记了以下操作, 否则配置不起作用.
停止 RabbitMQ 服务;
重新安装服务使配置生效: rabbitmq-service.bat install
此命令要切换到路径: D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.0\sbin
启动 RabbitMQ 服务;
5,Demo 练习.
消息生产者:
- class Program
- {
- static void Main(string[] args)
- {
- try
- {
- ConnectionFactory factory = new ConnectionFactory();
- factory.HostName = Constants.MqHost;
- factory.Port = Constants.MqPort;
- factory.UserName = Constants.MqUserName;
- factory.Password = Constants.MqPwd;
- using (IConnection conn = factory.CreateConnection())
- {
- using (IModel channel = conn.CreateModel())
- {
- // 在 MQ 上定义一个持久化队列, 如果名称相同不会重复创建
- channel.QueueDeclare("MyFirstQueue", true, false, false, null);
- while (true)
- {
- string customStr = Console.ReadLine();
- RequestMsg requestMsg = new RequestMsg();
- requestMsg.Name = string.Format("Name_{0}", customStr);
- requestMsg.Code = string.Format("Code_{0}", customStr);
- string jsonStr = JsonConvert.SerializeObject(requestMsg);
- byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);
- // 设置消息持久化
- IBasicProperties properties = channel.CreateBasicProperties();
- properties.DeliveryMode = 2;
- channel.BasicPublish("","MyFirstQueue", properties, bytes);
- //channel.BasicPublish("","MyFirstQueue", null, bytes);
- Console.WriteLine("消息已发送:" + requestMsg.ToString());
- }
- }
- }
- }
- catch (Exception e1)
- {
- Console.WriteLine(e1.ToString());
- }
- Console.ReadLine();
- }
- }
- class Program
- {
- static void Main(string[] args)
- {
- try
- {
- ConnectionFactory factory = new ConnectionFactory();
- factory.HostName = Constants.MqHost;
- factory.Port = Constants.MqPort;
- factory.UserName = Constants.MqUserName;
- factory.Password = Constants.MqPwd;
- using (IConnection conn = factory.CreateConnection())
- {
- using (IModel channel = conn.CreateModel())
- {
- // 在 MQ 上定义一个持久化队列, 如果名称相同不会重复创建
- channel.QueueDeclare("MyFirstQueue", true, false, false, null);
- // 输入 1, 那如果接收一个消息, 但是没有应答, 则客户端不会收到下一个消息
- channel.BasicQos(0, 1, false);
- Console.WriteLine("Listening...");
- // 在队列上定义一个消费者
- QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
- // 消费队列, 并设置应答模式为程序主动应答
- channel.BasicConsume("MyFirstQueue", false, consumer);
- while (true)
- {
- // 阻塞函数, 获取队列中的消息
- BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
- byte[] bytes = ea.Body;
- string str = Encoding.UTF8.GetString(bytes);
- RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);
- Console.WriteLine("HandleMsg:" + msg.ToString());
- // 回复确认
- channel.BasicAck(ea.DeliveryTag, false);
- }
- }
- }
- }
- catch (Exception e1)
- {
- Console.WriteLine(e1.ToString());
- }
- Console.ReadLine();
- }
- }
来源: http://www.bubuko.com/infodetail-3054585.html