ActiveMQ 官网下载地址:
ActiveMQ 提供了 Windows 和 Linux、Unix 等几个版本,楼主这里选择了 Linux 版本下进行开发。
下载完安装包,解压之后的目录:
从它的目录来说,还是很简单的:
进入到 ActiveMQ 安装目录的 Bin 目录,linux 下输入 ./activemq start 启动 activeMQ 服务。
输入命令之后,会提示我们创建了一个进程 IP 号,这时候说明服务已经成功启动了。
ActiveMQ 默认启动时,启动了内置的 jetty 服务器,提供一个用于监控 ActiveMQ 的 admin 应用。
admin:
我们在浏览器打开链接之后输入账号密码(这里和 tomcat 服务器类似)
默认账号:admin
密码:admin
到这里为止,ActiveMQ 服务端就启动完毕了。
ActiveMQ 在 linux 下的终止命令是 ./activemq stop
项目目录结构:
上述在官网下载 ActiveMq 的时候,我们可以在目录下看到一个 jar 包:
这个 jar 包就是我们需要在项目中进行开发中使用到的相关依赖。
- public class Producter {
- //ActiveMq 的默认用户名
- private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
- //ActiveMq 的默认登录密码
- private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
- //ActiveMQ 的链接地址
- private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
- AtomicInteger count = new AtomicInteger(0);
- //链接工厂
- ConnectionFactory connectionFactory;
- //链接对象
- Connection connection;
- //事务管理
- Session session;
- ThreadLocal threadLocal = new ThreadLocal<>();
- public void init(){
- try {
- //创建一个链接工厂
- connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
- //从工厂中创建一个链接
- connection = connectionFactory.createConnection();
- //开启链接
- connection.start();
- //创建一个事务(这里通过参数可以设置事务的级别)
- session = connection.createSession(true,Session.SESSION_TRANSACTED);
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- public void sendMessage(String disname){
- try {
- //创建一个消息队列
- Queue queue = session.createQueue(disname);
- //消息生产者
- MessageProducer messageProducer = null;
- if(threadLocal.get()!=null){
- messageProducer = threadLocal.get();
- }else{
- messageProducer = session.createProducer(queue);
- threadLocal.set(messageProducer);
- }
- while(true){
- Thread.sleep(1000);
- int num = count.getAndIncrement();
- //创建一条消息
- TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
- "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
- System.out.println(Thread.currentThread().getName()+
- "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
- //发送消息
- messageProducer.send(msg);
- //提交事务
- session.commit();
- }
- } catch (JMSException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- public class Comsumer {
- private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
- private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
- private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
- ConnectionFactory connectionFactory;
- Connection connection;
- Session session;
- ThreadLocal threadLocal = new ThreadLocal<>();
- AtomicInteger count = new AtomicInteger();
- public void init(){
- try {
- connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
- connection = connectionFactory.createConnection();
- connection.start();
- session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- public void getMessage(String disname){
- try {
- Queue queue = session.createQueue(disname);
- MessageConsumer consumer = null;
- if(threadLocal.get()!=null){
- consumer = threadLocal.get();
- }else{
- consumer = session.createConsumer(queue);
- threadLocal.set(consumer);
- }
- while(true){
- Thread.sleep(1000);
- TextMessage msg = (TextMessage) consumer.receive();
- if(msg!=null) {
- msg.acknowledge();
- System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
- }else {
- break;
- }
- }
- } catch (JMSException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- public class TestMq {
- public static void main(String[] args){
- Producter producter = new Producter();
- producter.init();
- TestMq testMq = new TestMq();
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- //Thread 1
- new Thread(testMq.new ProductorMq(producter)).start();
- //Thread 2
- new Thread(testMq.new ProductorMq(producter)).start();
- //Thread 3
- new Thread(testMq.new ProductorMq(producter)).start();
- //Thread 4
- new Thread(testMq.new ProductorMq(producter)).start();
- //Thread 5
- new Thread(testMq.new ProductorMq(producter)).start();
- }
- private class ProductorMq implements Runnable{
- Producter producter;
- public ProductorMq(Producter producter){
- this.producter = producter;
- }
- @Override
- public void run() {
- while(true){
- try {
- producter.sendMessage("Jaycekon-MQ");
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
运行结果:
- INFO | Successfully connected to tcp: //localhost:61616
- Thread - 6productor: 我是大帅哥,我现在正在生产东西!,
- count: 0 Thread - 4productor: 我是大帅哥,我现在正在生产东西!,
- count: 1 Thread - 2productor: 我是大帅哥,我现在正在生产东西!,
- count: 3 Thread - 5productor: 我是大帅哥,我现在正在生产东西!,
- count: 2 Thread - 3productor: 我是大帅哥,我现在正在生产东西!,
- count: 4 Thread - 6productor: 我是大帅哥,我现在正在生产东西!,
- count: 5 Thread - 3productor: 我是大帅哥,我现在正在生产东西!,
- count: 6 Thread - 5productor: 我是大帅哥,我现在正在生产东西!,
- count: 7 Thread - 2productor: 我是大帅哥,我现在正在生产东西!,
- count: 8 Thread - 4productor: 我是大帅哥,我现在正在生产东西!,
- count: 9 Thread - 6productor: 我是大帅哥,我现在正在生产东西!,
- count: 10 Thread - 3productor: 我是大帅哥,我现在正在生产东西!,
- count: 11 Thread - 5productor: 我是大帅哥,我现在正在生产东西!,
- count: 12 Thread - 2productor: 我是大帅哥,我现在正在生产东西!,
- count: 13 Thread - 4productor: 我是大帅哥,我现在正在生产东西!,
- count: 14 Thread - 6productor: 我是大帅哥,我现在正在生产东西!,
- count: 15 Thread - 3productor: 我是大帅哥,我现在正在生产东西!,
- count: 16 Thread - 5productor: 我是大帅哥,我现在正在生产东西!,
- count: 17 Thread - 2productor: 我是大帅哥,我现在正在生产东西!,
- count: 18 Thread - 4productor: 我是大帅哥,我现在正在生产东西!,
- count: 19
- public class TestConsumer {
- public static void main(String[] args){
- Comsumer comsumer = new Comsumer();
- comsumer.init();
- TestConsumer testConsumer = new TestConsumer();
- new Thread(testConsumer.new ConsumerMq(comsumer)).start();
- new Thread(testConsumer.new ConsumerMq(comsumer)).start();
- new Thread(testConsumer.new ConsumerMq(comsumer)).start();
- new Thread(testConsumer.new ConsumerMq(comsumer)).start();
- }
- private class ConsumerMq implements Runnable{
- Comsumer comsumer;
- public ConsumerMq(Comsumer comsumer){
- this.comsumer = comsumer;
- }
- @Override
- public void run() {
- while(true){
- try {
- comsumer.getMessage("Jaycekon-MQ");
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
运行结果:
- INFO | Successfully connected to tcp://localhost:61616
- Thread-2: Consumer:我是消费者,我正在消费MsgThread-5productor:我是大帅哥,我现在正在生产东西!,count:4--->0
- Thread-3: Consumer:我是消费者,我正在消费MsgThread-4productor:我是大帅哥,我现在正在生产东西!,count:36--->1
- Thread-4: Consumer:我是消费者,我正在消费MsgThread-3productor:我是大帅哥,我现在正在生产东西!,count:38--->2
- Thread-5: Consumer:我是消费者,我正在消费MsgThread-6productor:我是大帅哥,我现在正在生产东西!,count:37--->3
- Thread-2: Consumer:我是消费者,我正在消费MsgThread-6productor:我是大帅哥,我现在正在生产东西!,count:2--->4
- Thread-3: Consumer:我是消费者,我正在消费MsgThread-5productor:我是大帅哥,我现在正在生产东西!,count:40--->5
- Thread-4: Consumer:我是消费者,我正在消费MsgThread-6productor:我是大帅哥,我现在正在生产东西!,count:42--->6
- Thread-5: Consumer:我是消费者,我正在消费MsgThread-4productor:我是大帅哥,我现在正在生产东西!,count:41--->7
- Thread-2: Consumer:我是消费者,我正在消费MsgThread-3productor:我是大帅哥,我现在正在生产东西!,count:1--->8
- Thread-3: Consumer:我是消费者,我正在消费MsgThread-2productor:我是大帅哥,我现在正在生产东西!,count:44--->9
- Thread-4: Consumer:我是消费者,我正在消费MsgThread-4productor:我是大帅哥,我现在正在生产东西!,count:46--->10
- Thread-5: Consumer:我是消费者,我正在消费MsgThread-5productor:我是大帅哥,我现在正在生产东西!,count:45--->11
- Thread-2: Consumer:我是消费者,我正在消费MsgThread-2productor:我是大帅哥,我现在正在生产东西!,count:3--->12
- Thread-3: Consumer:我是消费者,我正在消费MsgThread-3productor:我是大帅哥,我现在正在生产东西!,count:48--->13
- Thread-4: Consumer:我是消费者,我正在消费MsgThread-5productor:我是大帅哥,我现在正在生产东西!,count:50--->14
- Thread-5: Consumer:我是消费者,我正在消费MsgThread-2productor:我是大帅哥,我现在正在生产东西!,count:49--->15
- Thread-4: Consumer:我是消费者,我正在消费MsgThread-2productor:我是大帅哥,我现在正在生产东西!,count:54--->16
- Thread-2: Consumer:我是消费者,我正在消费MsgThread-5productor:我是大帅哥,我现在正在生产东西!,count:6--->17
- Thread-3: Consumer:我是消费者,我正在消费MsgThread-6productor:我是大帅哥,我现在正在生产东西!,count:52--->18
- Thread-5: Consumer:我是消费者,我正在消费MsgThread-3productor:我是大帅哥,我现在正在生产东西!,count:53--->19
- Thread-4: Consumer:我是消费者,我正在消费MsgThread-3productor:我是大帅哥,我现在正在生产东西!,count:58--->20
查看运行结果,我们可以做 ActiveMQ 服务端: 里面的 Queues 中查看我们生产的消息。
关于 JMS(Java 消息服务) 的一些概述可以参考我的上一篇博客:
来源: