- importorg.apache.activemq.ActiveMQConnectionFactory;
- importjavax.jms.*;
- importjava.io.IOException;
- public classJmsConsumer {
- public static finalString ACTIVEMQ_URL= "tcp://192.168.225.132:61616";
- public static voidmain(String[] args) throwsJMSException, IOException {
- //1 按照给定的 url 地址, 采用默认的用户名密码创建连接工厂 (有三个参数的构造方法, 其中两个参数是用户名密码)ActiveMQConnectionFactory activeMQConnectionFactory = newActiveMQConnectionFactory(ACTIVEMQ_URL);
- //2 通过连接工厂获得连接 Connection connection = activeMQConnectionFactory.createConnection();
- //3 启动访问, 真正建立连接, 需要确保防火墙, 服务端配置等都正确, 能正常访问 connection.start();
- //4 创建会话 session,false 代表不开启事物, Session.AUTO_ACKNOWLEDGE 代表自动签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- //5 创建目的地 (具体是队列 queue 还是主题 topic), 参数是队列的名称 // 返回类型可以用 Queue 接收, 也可以用 Destination 接收, 因为 Queue 继承 DestinationQueue queue = session.createQueue("queue1");
- MessageConsumer consumer = session.createConsumer(queue);
- //6 消费者消费消息 /* while(true){
- TextMessage message = (TextMessage)consumer.receive();
- //TextMessage message = (TextMessage)consumer.receive(5000L);
- if(null == message){
- break;
- }
- String s = message.getText();
- System.out.println(s);
- }*///6 以监听器的形式监听并消费消息 consumer.setMessageListener(newMessageListener() {
- @Override
- public voidonMessage(Message message) {
- if(message != null&& message instanceofTextMessage){
- TextMessage textMessage = (TextMessage)message;
- try{
- System.out.println(textMessage.getText());
- } catch(JMSException e) {
- e.printStackTrace();
- }
- }
- }
- });
- //7 如果以监听器的形式监听消息, 则需要此行代码, 代表不关闭消费者, 否则有可能上边还没有进行消费, 下边就把资源都关闭了 // 会抛出 IOExceptionSystem.in.read();
- //8 关闭资源 consumer.close();
- session.close();
- connection.close();
- System.out.println("消费者从 activemq 消费消息完成!");
- }
- }
来源: http://www.bubuko.com/infodetail-3345015.html