Continuing from yesterday's code, first define the exchange name and the routing key name.
- public interface UserCenterMq {
-     /**
-      * Exchange name for the user system
-      */
-     String MQ_EXCHANGE_USER = "user.topic.exchange";
-     /**
-      * Routing key for sending a red packet
-      */
-     String ROUTING_KEY_POST_REDPACKET = "post.redpacket";
- }
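Because the exchange is a topic exchange, the routing key is compared word by word against each binding pattern, where `*` stands for exactly one word and `#` for zero or more words. The following is a minimal sketch of that matching rule, written only for illustration: `TopicMatchSketch` is a hypothetical name, and this is not RabbitMQ's actual implementation.

```java
// Illustrative sketch of topic-exchange matching; NOT RabbitMQ's real code.
// TopicMatchSketch is a hypothetical helper name used only in this example.
final class TopicMatchSketch {

    // Returns true if routingKey matches the binding pattern.
    // Words are separated by '.', '*' matches exactly one word,
    // '#' matches zero or more words.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more of the remaining words.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }
}
```

With a binding key that contains no wildcards, such as `post.redpacket`, only a routing key that is exactly `post.redpacket` matches, so the topic exchange behaves like a direct exchange for that binding.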
Write the RabbitMQ configuration class.
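- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class RabbitmqConfig {
-     /**
-      * Red packet queue name
-      */
-     public static final String RED_PACKET_QUEUE = "red.packet.queue";
-     /**
-      * Declares the queue that receives user-registration messages
-      */
-     @Bean
-     public Queue redPacketQueue() {
-         return new Queue(RED_PACKET_QUEUE);
-     }
-     /**
-      * Declares the topic exchange of the user system
-      */
-     @Bean
-     public TopicExchange userTopicExchange() {
-         return new TopicExchange(UserCenterMq.MQ_EXCHANGE_USER);
-     }
-     /**
-      * Binds the red packet queue to the user exchange
-      */
-     @Bean
-     public Binding bindingRedPacket() {
-         return BindingBuilder.bind(redPacketQueue())
-                 .to(userTopicExchange())
-                 .with(UserCenterMq.ROUTING_KEY_POST_REDPACKET);
-     }
- }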
Modify the transaction listener code (the message is sent to MQ after the transaction is confirmed complete).
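- import com.alibaba.fastjson.JSONObject;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.PayloadApplicationEvent;
- import org.springframework.stereotype.Component;
- import org.springframework.transaction.event.TransactionPhase;
- import org.springframework.transaction.event.TransactionalEventListener;
- @Component
- public class UserTransactionEventListener {
-     @Autowired
-     private AmqpTemplate rabbitTemplate;
-     @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
-     public void beforeCommit(PayloadApplicationEvent<User> event) {
-         System.out.println("before commit, id:" + event.getPayload().getId());
-     }
-     @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
-     public void afterCommit(PayloadApplicationEvent<User> event) {
-         System.out.println("after commit, id:" + event.getPayload().getId());
-         // Send the message (the user object as JSON) only after a successful commit.
-         // AFTER_COMPLETION also fires after a rollback, so sending there would
-         // publish a message for a user that was never saved.
-         rabbitTemplate.convertAndSend(UserCenterMq.MQ_EXCHANGE_USER,
-                 UserCenterMq.ROUTING_KEY_POST_REDPACKET,
-                 JSONObject.toJSONString(event.getPayload()));
-     }
-     @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
-     public void afterCompletion(PayloadApplicationEvent<User> event) {
-         System.out.println("after completion, id:" + event.getPayload().getId());
-     }
-     @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
-     public void afterRollback(PayloadApplicationEvent<User> event) {
-         System.out.println("after rollback, id:" + event.getPayload().getId());
-     }
- }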
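Which phase the send belongs in matters, because `AFTER_COMPLETION` runs after a rollback as well as after a commit, while `AFTER_COMMIT` runs only when the commit succeeded. The toy model below is a sketch of that rule and nothing more: `TxPhaseSketch` is a hypothetical name, it contains no Spring code, and it models only which callbacks fire, not the relative order Spring uses among the "after" callbacks.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of transaction-phase callbacks. It is NOT Spring code and
// TxPhaseSketch is a hypothetical name used only for this illustration.
final class TxPhaseSketch {

    // Simulates one transaction and records which phase callbacks fire.
    // failBeforeCommit = true simulates business code throwing, which
    // leads to a rollback instead of a commit.
    static List<String> run(boolean failBeforeCommit) {
        List<String> fired = new ArrayList<>();
        try {
            if (failBeforeCommit) {
                throw new IllegalStateException("simulated failure");
            }
            fired.add("BEFORE_COMMIT");
            // ... the commit itself would happen here ...
            fired.add("AFTER_COMMIT");
        } catch (IllegalStateException e) {
            fired.add("AFTER_ROLLBACK");
        } finally {
            // Fires on both paths, committed or rolled back.
            fired.add("AFTER_COMPLETION");
        }
        return fired;
    }
}
```

In this model `run(true)` records `AFTER_ROLLBACK` and `AFTER_COMPLETION` but never `AFTER_COMMIT`, which is why a message that should exist only for saved users is safer to publish from the after-commit callback.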
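The RabbitMQ management UI shows that one message, the user object, has been sent to the queue.
In the red packet module, we listen on this queue to complete the distributed transaction.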
- model
- @Data
- public class RedPacket implements Serializable {
-     private long redPacketId;
-     private long userId;
-     private Double redPacketAmount;
- }
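- dao (there is a red_packet table here with 3 columns, one of them auto-increment)
- @Mapper
- public interface RedPacketDao {
-     // keyProperty names the Java property that receives the generated key;
-     // keyColumn names the auto-increment column in the table.
-     @Options(useGeneratedKeys = true, keyProperty = "redPacketId", keyColumn = "red_packet_id")
-     @Insert("insert into red_packet (user_id,red_packet_amount) values (#{userId},#{redPacketAmount})")
-     void add(RedPacket redPacket);
- }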
- service
- public interface RedPacketService {
-     void add(RedPacket redPacket);
- }
- @Transactional
- @Service
- public class RedPacketServiceImpl implements RedPacketService {
-     @Autowired
-     private RedPacketDao redPacketDao;
-     @Override
-     public void add(RedPacket redPacket) {
-         redPacketDao.add(redPacket);
-     }
- }
RabbitMQ consumer (here, each newly registered user is sent one random red packet of less than ten yuan).
- @Component
- @RabbitListener(queues = RabbitmqConfig.RED_PACKET_QUEUE)
- public class PostRedPacketConsumer {
-     @Autowired
-     private RedPacketService redPacketService;
-     @RabbitHandler
-     public void postRedPacket(String userStr) {
-         // The message body is the JSON string produced by the user module
-         User user = JSONObject.parseObject(userStr, User.class);
-         RedPacket redPacket = new RedPacket();
-         redPacket.setUserId(user.getId());
-         // Random amount in [0, 10)
-         redPacket.setRedPacketAmount(Math.random() * 10);
-         redPacketService.add(redPacket);
-     }
- }
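`Math.random() * 10` yields a value with many decimal places, and a `Double` cannot represent every decimal amount exactly. As a hedged alternative, the sketch below draws the amount in whole cents and returns a `BigDecimal` with two decimal places. `RedPacketAmountSketch` is a hypothetical name, and the lower bound of 0.01 is an assumption of this example (the original code can also produce 0).

```java
import java.math.BigDecimal;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of the original project.
final class RedPacketAmountSketch {

    // Returns a random amount between 0.01 and 9.99 inclusive,
    // always with exactly two decimal places.
    static BigDecimal randomAmount() {
        // nextInt(1, 1000) returns a number of cents in [1, 999]
        int cents = ThreadLocalRandom.current().nextInt(1, 1000);
        // valueOf(unscaledValue, scale): 457 with scale 2 becomes 4.57
        return BigDecimal.valueOf(cents, 2);
    }
}
```

Using this in the consumer would also require changing `redPacketAmount` to `BigDecimal` and storing it in a decimal column, which goes beyond what the original code does.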
After running, the message in the queue is consumed.
A new row is added to the red packet table.
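One thing the consumer above does not guard against: RabbitMQ can redeliver a message that was not acknowledged, so the same user message may reach the handler more than once and produce more than one row. A minimal sketch of a duplicate check follows. `IdempotentGrantSketch` is a hypothetical name, and the in-memory set is only a stand-in; in a real deployment a unique constraint on `user_id` in the `red_packet` table would be the more likely choice, which is an assumption and not something the original project shows.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory duplicate guard, for illustration only.
// It forgets everything on restart and is not shared between instances.
final class IdempotentGrantSketch {

    private final Set<Long> grantedUserIds = ConcurrentHashMap.newKeySet();

    // Returns true only the first time a given user id is seen,
    // so the caller can skip the insert on a redelivered message.
    boolean tryGrant(long userId) {
        return grantedUserIds.add(userId);
    }
}
```

The consumer would call `tryGrant(user.getId())` before `redPacketService.add(redPacket)` and return early when it yields `false`.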
Source: https://yq.aliyun.com/articles/641601