消息的生产和消费
一, 队列 / 主题 对比
比较项目 | Topic 模式队列 | Queue 模式队列 |
---|---|---|
工作模式 | 订阅发布模式,如果有没有订阅者,消息将会被丢弃,如果有多个,消息将会全部都能收到 | “负载均衡模式”,如果没有消费者,消息不会被丢弃,如果有多个消费者,那么,一条消息也会发送给其中一个消费者,并且要求消费者 ack(签收)消息 |
有无状态 | 无状态 | Queue 数据默认在 mq 服务器上以文件的形式保存,比如 Active MQ 一般保存在 SAMQ_HOME\data\kr-store\data 下面,也可配置成 DB 储存 |
传递完整性 | 如果没有订阅者消息会被丢弃 | 消息不会丢其 |
处理效率 | 由于消息按照订阅者的数量进行复制,所以处理性能会随着订阅者的数量增加而明显降低,并且还要结合不同的消息协议自身的性能差异 | 由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显下降,当然不同的消息协议的具体性能也有差异 |
二, jms 是什么
JavaEE 是一套使用 Java 进行企业级应用开发的大家遵守的 13 个核心工业标准. JavaEE 平台提供的一组基于组件的方法来加快设计, 开发, 装配以及企业部署应用程序
JDBC(Java Database)数据库连接
JDNI(Java Naming and Directory)的名命和目录接口
EJB(Enterprise JavaBean)
RMI(Remote Method Invoke)远程方法调用
Java IDL(Interface Description Language)/CORBA(Common Object Broker Arcgitecture)接口定义语言 / 公用对象请求代理程序体系结构
- JSP(Java Server Page)
- Servlet
xml(Extensible Markup Language)可扩展标记语言
JMS(Java Transaction Service)Java 消息服务
JTA(Java Transaction API)Java 事务 API
JTS(Java Transaction Service)Java 事务服务
- JavaMail
- JSAF(JavaBean Activation Framework)
什么是 Java 消息服务
Java 消息服务是指两个应用程序之间进行异步通讯的 API, 它为标准消息协议和消息服务提供了一组通用接口, 包括创建, 发送, 读取消息, 用于 JAVA 的应用程序开发, 在 JavaEE 中, 当连个程序使用 JMS 进行通讯是, 他们之间并不是直接相连的, 二十通过消息的手法组件关联起来以达到解耦异步削峰的效果
jms 落地产品
- Active MQ
- Rabbit MQ(Erlang)
- Rocket MQ(阿里)
- Kafka(大数据)
jms 组成元素
Provider : MQ 服务器
Producer : 消息的生产者
Message : 消息
Consumer : 消费者
Message 格式
消息头
- TextMessage textMessage = session.createTextMessage("msg" + i);
- ?
- // 目的地队列或者主题
- textMessage.setJMSDestination();
- ?
- // 消息持久和非持久, 持久消息之发送一次, 宕机之后消息也不会丢失
- textMessage.setJMSDeliveryMode();
- ?
- // 过期时间 0 永不过期
- textMessage.setJMSExpiration();
- ?
- // 消息的优先级 0~9,0~4 是普通默认 4, 消息的加急
- textMessage.setJMSPriority();
- ?
- //id 唯一识别有中间件产生
- textMessage.setJMSMessageID();
- ?
- //send 也可以统一的批处理
- textMessage.send();
消息体
封装具体的消息数据
- // 普通字符串
- session.createTextMessage();
- // 二进制数组消息包含一个 byte[]
- session.createBytesMessage();
- //java 数据流消息, 标准流炒作来顺序的填充和读取
- session.createStreamMessage();
- // 一个 map 类型的下拍戏, key 为 string, 值为 java 的基本类型
- session.createMapMessage();
- // 对象消息包含一个可序列化的 java 对象
- session.createObjectMessage();
消息属性
消息头的扩展
- // 消息属性
- textMessage.setStringProperty();
- textMessage.setBooleanProperty();
可靠性之非持久化
持久
事务
事务偏生产者, 签收偏消费者
签收
整合 spring/springboot
activemq 的传输协议
nio
tcp
持久化机制
kahadb:5.4'以后的默认基于日志文件的消息存储(类似于 Redis 持久化的 aof)
消息存储使用一个事务日志, 和仅仅用一个索引文件来储存他所有的地址
- <persistenceAdapter>
- <kahaDB direcory="${activemq.data}/kahadb"></kahaDB>
- </persistenceAdapter>
- jdbc
环境搭建
添加 MySQL 驱动包 (包括连接池 druid) 到 mq 的 lib 文件夹
jdbc 持久化配置
- <persistenceAdapter>
- <!-- 默认的配置
- <kahaDB directory="${activemq.data}/kahadb"/>
- -->
- <!-- 配置 jdbc 持久化的方法 -->
- <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>
- </persistenceAdapter>
创建数据源在 active.xml,broker 标签之后. import 标签之前
- <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
- <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
- <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
- <property name="username" value="root"/>
- <property name="password" value="root"/>
- <property name="poolPreparedStatements" value="true"/>
- </bean>
创建数据库 activemq
启动 MQ 然后查看数据库
来源: http://www.bubuko.com/infodetail-3340301.html