前言
本篇随笔将汇总一些我对消息队列 RabbitMQ 的认识, 顺便谈谈其在高并发和秒杀系统中的具体应用
1. 预备示例
想了下, 还是先抛出一个简单示例, 随后再根据其具体应用场景进行扩展, 我觉得这样表述条理更清晰些
- RabbitConfig:
- @Configuration
- public class RabbitConfig {
- @Bean
- public Queue callQueue() {
- return new Queue(MQConstant.CALL);
- }
- }
- Client:
- @Component
- public class Client {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- public void sendCall(String content) {
- for (int i = 0; i < 10000; i++) {
- String message = i + "-" + content;
- System.out.println(String.format("Sender: %s", message));
- rabbitTemplate.convertAndSend(MQConstant.CALL, message);
- }
- }
- }
- Server:
- @Component
- public class Server {
- @RabbitHandler
- @RabbitListener(queues = MQConstant.CALL)
- public void callProcess(String message) throws InterruptedException {
- Thread.sleep(100);
- System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message));
- }
- }
- Result:
- Sender: Hello, are you there!
- Receiver: reply("Hello, are you there!") Yes, I just saw your message!
以上示例会在 rabbitmq 中创建一条队列 CALL, 消息在其中等待消费:
在此基础上的简单扩展我就不再写案例了, 比如领域模块完成了其核心业务规则之后可能需要更新缓存写个邮件记个复杂日志做个统计报表等等, 这些不需要及时反馈或者耗时的附属业务都可以通过异步队列分发, 以此来提升核心业务的响应速度, 同时如此处理能让领域边界更加清晰, 代码的可维护性和持续拓展的能力也会有所提升
2. 削峰
上个示例中我提到的应用场景是解耦和通知, 再接着扩展, 因其具备良好的缓冲性质, 所以还有一个非常适合的应用场景那就是削峰对于突如其来的极高并发请求, 我们可以先瞬速地将其加入队列并回复用户一个友好提示, 然后服务器可在其能承受的范围内慢慢处理, 以此来防止突发的 CPU 和内存 爆表
改造之后对于发送方来说当然是比较爽的, 他只是将请求加入消息队列而已, 处理压力都归到了消费端接着思考, 这样处理有没有副作用? 如果这个请求刚好是线程阻塞的, 那还要加入队列慢慢排队处理, 那不是完蛋了, 用户要猴年马月才能得到反馈? 所以针对此, 我觉得应该将消费端的方法改为异步调用 (即多线程) 以提升吞吐量, 在 Spring Boot 中的写法也非常简单:
- @Component
- public class Server {
- @Async
- @RabbitHandler
- @RabbitListener(queues = MQConstant.CALL)
- public void callProcess(String message) throws InterruptedException {
- Thread.sleep(100);
- System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message));
- }
- }
参照示例一的方法, 我发布了 10000 条消息加入队列, 且消费端的调用每次阻塞一秒, 那可有意思了, 什么时候能处理完? 但如果开几百个线程同时处理的话, 那几十秒就够了, 当然具体多少合适还应根据具体的业务场景和服务器配置酌情考虑另外, 别忘了配线程池:
- @Configuration
- public class AsyncConfig {
- @Bean
- public Executor asyncExecutor(){
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setCorePoolSize(10);
- executor.setMaxPoolSize(500);
- executor.setQueueCapacity(10);
- executor.setThreadNamePrefix("MyExecutor-");
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
- executor.initialize();
- return executor;
- }
- }
- 3. Exchange
RabbitMQ 可能为 N 个应用同时提供服务, 要是你和你的蓝颜知己突然心有灵犀, 在不同的业务上使用了同一个 routingKey, 想想就刺激因此, 队列多了自然要进行分组管理, 限定好 Exchange 的规则, 接下来就可以独自玩耍了
- MQConstant:
- public class MQConstant {
- public static final String EXCHANGE = "YOUCLK-MESSAGE-EXCHANGE";
- public static final String CALL = MQConstant.EXCHANGE + ".CALL";
- public static final String ALL = MQConstant.EXCHANGE + ".#";
- }
- RabbitConfig:
- @Configuration
- public class RabbitConfig {
- @Bean
- public Queue callQueue() {
- return new Queue(MQConstant.CALL);
- }
- @Bean
- TopicExchange exchange() {
- return new TopicExchange(MQConstant.EXCHANGE);
- }
- @Bean
- Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
- return BindingBuilder.bind(queueMessage).to(exchange).with(MQConstant.ALL);
- }
- }
此时我们再去查队列 CALL, 可以看到已经绑定了 Exchange:
当然 Exchange 的作用远不止如此, 以上示例为 Topic 模式, 除此之外还有 DirectHeaders 和 Fanout 模式, 写法都差不多, 感兴趣的童鞋可以去查看 官方文档 进行更深入了解
4. 延时队列
延时任务的场景相信小伙伴们都接触过, 特别是抢购的时候, 在规定时间内未付款订单就被回收了微信支付的 API 里面也有一个支付完成后的延时再确认消息推送, 实现原理应该都差不多
利用 RabbitMQ 实现该功能首先要了解他的两个特性, 分别是 Time-To-Live Extensions 和 Dead Letter Exchanges, 字面意思上就能理解个大概, 一个是生存时间, 一个是死信整个过程也很容易理解, TTL 相当于一个缓冲队列, 等待其过期之后消息会由 DLX 转发到实际消费队列, 如此便实现了他的延时过程
- MQConstant:
- public class MQConstant {
- public static final String PER_DELAY_EXCHANGE = "PER_DELAY_EXCHANGE";
- public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";
- public static final String DELAY_CALL_TTL = "DELAY_CALL_TTL";
- public static final String CALL = "CALL";
- }
- ExpirationMessagePostProcessor:
- public class ExpirationMessagePostProcessor implements MessagePostProcessor {
- private final Long ttl;
- public ExpirationMessagePostProcessor(Long ttl) {
- this.ttl = ttl;
- }
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties()
- .setExpiration(ttl.toString());
- return message;
- }
- }
- Client:
- @Component
- public class Client {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- public void sendCall(String content) {
- for (int i = 1; i <= 3; i++) {
- long expiration = i * 5000;
- String message = i + "-" + content;
- System.out.println(String.format("Sender: %s", message));
- rabbitTemplate.convertAndSend(MQConstant.DELAY_CALL_TTL, (Object) message, new ExpirationMessagePostProcessor(expiration));
- }
- }
- }
- Server:
- @Component
- public class Server {
- @Async
- @RabbitHandler
- @RabbitListener(queues = MQConstant.CALL)
- public void callProcess(String message) throws InterruptedException {
- String date = (new SimpleDateFormat("HH:mm:ss")).format(new Date());
- System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!- %s", message, date));
- }
- }
- Result:
- Sender: 1-Hello, are you there!
- Sender: 2-Hello, are you there!
- Sender: 3-Hello, are you there!
- Receiver: reply("1-Hello, are you there!") Yes, I just saw your message!- 23:04:12
- Receiver: reply("2-Hello, are you there!") Yes, I just saw your message!- 23:04:17
- Receiver: reply("3-Hello, are you there!") Yes, I just saw your message!- 23:04:22
结果一目了然, 分别在队列中延迟了 5 秒, 10 秒, 15 秒, 当然, 以上只是我的简单示例, 童鞋们可翻阅官方文档 ( ttl && dlx ) 进一步深入学习
来源: https://www.cnblogs.com/youclk/p/8650100.html