模型结构
JMS 编程模型由以下几个组成:
ConnectionFactory: 连接工厂 (创建连接)
Connection: 连接 (创建会话)
Session: 会话 (创建目的地, 生产者, 消费者, 消息)
Destination: 目的地 (消息发送目标)
MessageProducer: 消息生产者 (发送消息)
MessageConsumer: 消息消费者 (消费消息)
Message: 消息 (内容主体)
下面用一张图片展示几个组成部分是如何联系在一起的
下面将逐个了解每个部分, 并且以 activeMQ 的实现作为代码片段部分示例.
ConnectionFactory
顾名思义, 一个 ConnectionFactory 是客户端用来创建 Connection 的接口. 基于工厂模式, 它简化了 Connection 的创建. 除了 ConnectionFactory 接口, 常见的还有 QueueConnectionFactory 和 TopicConnectionFactory, 它们都继承自 ConnectionFactory.
创建一个 ConnectionFactory 的代码片段如下:
- 1 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
- Connection
有了 ConnectionFactory 我们就可以创建 Connection 了, Connection 表示的是一个虚拟的连接, 也就是代表着打开了一个由客户端到消息代理端的 socket 连接. Connection 可以用来创建 Session.
下面我们看看 ConnectionFactory 来创建 Connection 的示例:
1 Connection connection = connectionFactory.createConnection();
在使用 Connection 之前, 你必须先调用 start 方法开启连接
1 connection.start();
在使用完了之后, 你必须调用 close 方法关闭资源. 注意, close 方法会关闭 Connection 创建的 Session,MessageProducer,MessageConsumer. 另外, 如果 close 方法调用失败, 那么将会导致资源未被释放的问题.
但是, 如果你只是想暂时停止一下消息的传送, 那么可以调用 stop 方法, 而不是将 Connection 进行 close.
Session
session 是一个 Message 的生产和消费的上下文, 我们称作会话, 由 Connection 创建. session 可以创建 MessageProducer,MessageConsumer,Message,Destination.
我们创建一个 session
1 Session session = connection.createSession(false, Session.AUTO_ACKNOWEDGE);
第一个入参传入了 false, 表示不需要事务. 第二个入参表示消息被接收以后 session 会自动做 ack 确认操作. 如果要创建一个有事务的 session 呢?
1 Session session = connection.createSession(true, 0);
第一个参数 true 表示开启事务, 第二个参数表示不指定 ack 确认机制. 在业务代码完成以后, 需要显示提交事务
- 1 session.commit();
- Destination
一个 destination 表示的是生产者的消息发送目的地, 以及消费者消费消息的源头. 在点对点模式中, destination 又被称作 queue(队列). 在发布订阅模式中, destination 被称作 topic(话题).
Destination 由 session 创建, 创建一个 queue
1 Destination destination = session.createQueue("queue1");
创建一个 topic
- 1 Destination destination = session.createTopic("topic1");
- MessageProducer
MessageProducer 是由 session 创建的, 用于发送 Message 到 destination. 我们使用 session 创建一个 MessageProducer, 如下
1 MessageProducer producer = session.createProducer(destination);
如果你创建了一个 Message 对象, 你可以使用 MessageProducer 发送消息
- 1 producer.send(message);
- MessageConsumer
MessageConsumer 是由 session 创建的, 将会作为一个消费者消费 destination 中的 Message. 创建一个 MessageConsumer
1 MessageConsumer consumer = session.createConsumer(destination);
创建了消费者, 就可以消费消息了
1 Message message = consumer.receive();
receive 方法是同步消费消息的方法, 有时候我们不想等待那么久, 所以采用异步监听的方式, 如
- Listener listener = new Listener();
- consumer.setMessageListener(listener);
这里假设 Listener 是实现了 MessageListener 接口的监听器, 当消息到达的时候 onMessage 方法将被触发.
Message
Message 表示具体的消息, JMS 定义了五种消息格式, 如:
TextMessage: 文本
MapMessage: 键值对
BytesMessage: 字节码
StreamMessage: 流
ObjectMessage: 对象
以 TextMessage 为例, 创建一个消息
- TextMessage message = session.createTextMessage();
- message.setText("text content");
- producer.send(message);
如果是 MessageConsumerreceive 消息
- Message message = consumer.receive();
- if (message instanceof TextMessage) {
- TextMessage textMessage = (TextMessage)message;
- System.out.println("receive message" + message.getText());
- }
完整代码
生产
- // Create a ConnectionFactory
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
- // Create a Connection
- Connection connection = connectionFactory.createConnection();
- connection.start();
- // Create a Session
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- // Create the destination (Topic or Queue)
- Destination destination = session.createQueue("TEST.FOO");
- // Create a MessageProducer from the Session to the Topic or Queue
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- // Create a messages
- String text = "Hello world! From:" + Thread.currentThread().getName() + ":" + this.hashCode();
- TextMessage message = session.createTextMessage(text);
- // Tell the producer to send the message
- System.out.println("Sent message:"+ message.hashCode() + ":" + Thread.currentThread().getName());
- producer.send(message);
- // Clean up
- session.close();
- connection.close();
消费
- // Create a ConnectionFactory
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
- // Create a Connection
- Connection connection = connectionFactory.createConnection();
- connection.start();
- connection.setExceptionListener(this);
- // Create a Session
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- // Create the destination (Topic or Queue)
- Destination destination = session.createQueue("TEST.FOO");
- // Create a MessageConsumer from the Session to the Topic or Queue
- MessageConsumer consumer = session.createConsumer(destination);
- // Wait for a message
- Message message = consumer.receive(1000);
- if (message instanceof TextMessage) {
- TextMessage textMessage = (TextMessage) message;
- String text = textMessage.getText();
- System.out.println("Received:" + text);
- } else {
- System.out.println("Received:" + message);
- }
- consumer.close();
- session.close();
- connection.close();
原文
JavaDoc
来源: http://www.bubuko.com/infodetail-3102996.html