第一, 生产者消费者的实际的应用
使用的分布式消息队列, 比如 ActiveMQ,RabbitMQ 等等, 消息队列的是有可以使得程序之间解耦, 提升程序响应效率.
如果我们把多线程环境比作是分布式的话, 那么线程与线程之间也可以用这种消息队列的方式进行数据通信和解耦.
第二, 阻塞队列的使用案例
注册成功之后新增积分
假如我们模拟一个场景, 就是用户注册的时候, 在注册的时候成功以后发放积分. 这个场景一般来说, 会这么去实现
但是实际上, 我们需要考虑两个问题
1. 性能, 在注册这个环节里面, 假如添加用户需要花费 1 秒钟, 增加积分需要花费 1 秒钟, 那么整个注册结果的返回就可能需要大于 2 秒, 虽然影响不是很大,
但是在量比较大的时候, 也需要做一些优化
2. 耦合, 添加用户和增加积分, 可以认为是两个领域, 也就是说, 增加积分并不是注册必须要具备的功能, 但是一旦增加积分这个逻辑出现异常, 就会导致注册失败.
这种耦合在程序设计的时候是一定要规避的因此可以通过异步的方式来实现
3. 代码实现
- public class User {
- private String userName;
- public String getUserName() {
- return userName;
- }
- public void setUserName(String userName) {
- this.userName = userName;
- }
- @Override
- public String toString() {
- return "User{" + "userName='" + userName + '\'' + '}';
- }
- }
- public class UserService<T> {
- private final ExecutorService single =
- Executors.newSingleThreadExecutor();
- private volatile boolean isRunning = true;
- BlockingQueue<T> queue = new LinkedBlockingQueue(3);
- {
- init();
- }
- public void init(){
- single.execute(()->{
- while (isRunning){
- try {
- // 阻塞获取
- User user = (User) queue.poll(2, TimeUnit.SECONDS);
- if(user==null){
- isRunning = false;
- break;
- }
- sendPoints(user);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
- public void register(){
- User user = new User();
- user.setUserName("name");
- addUser(user);
- }
- /**
- * 添加用户
- * @param user
- */
- private void addUser(User user) {
- System.out.println("添加用户成功"+user);
- }
- /**
- * 添加积分成功
- * @param user
- */
- private void sendPoints(User user){
- System.out.println("添加积分成功"+user);
- }
- }
- public class Test001 {
- public static void main(String[] args) {
- UserService userService = new UserService();
- userService.register();
- }
- }
第三. 阻塞队列的应用场景
阻塞队列这块的应用场景, 比较多的仍然是对于生产者消费者场景的应用, 但是由于分布式架构的普及, 更多的关注在分布式消息队列上. 所以其实如果把阻塞队
列比作成分布式消息队列的话, 那么所谓的生产者和消费者其实就是基于阻塞队列的解耦. 另外, 阻塞队列是一个 fifo 的队列, 所以对于希望在线程级别需要实现对
目标服务的顺序访问的场景中, 也可以使用
第四. JUC 中的阻塞队列
- public boolean offer(E e) {
- //1, 如果元素为 null, 抛出空指针异常
- if (e == null) throw new NullPointerException();
- //2, 如果当前的队列的满了, 则丢弃要放入的元素, 返回 false
- final AtomicInteger count = this.count;
- if (count.get() == capacity)
- return false;
- int c = -1;
- //3, 构造新节点, 获取 putLock 锁
- Node<E> node = new Node<E>(e);
- final ReentrantLock putLock = this.putLock;
- putLock.lock();
- try {
- // 如果队列没有满, 则进队列, 并递增元素计较
- if (count.get() <capacity) {
- enqueue(node);
- c = count.getAndIncrement();
- if (c + 1 < capacity)
- notFull.signal();
- }
- } finally {
- putLock.unlock();
- }
- if (c == 0)
- signalNotEmpty();
- return c>= 0;
- }
- /**
- * 生产者
- */
- public class ProducerThread implements Runnable {
- private BlockingQueue<String> queue;
- private String data;
- public ProducerThread(BlockingQueue<String> queue,String data) {
- this.queue = queue;
- this.data = data;
- }
- @Override
- public void run() {
- System.out.println("生产者启动 ======");
- boolean offer = queue.offer(data);
- if (offer) {
- System.out.println(Thread.currentThread().getName()
- +"生产队列成功"+data+"成功");
- }else{
- System.out.println(Thread.currentThread().getName()
- +"生产队列成功"+data+"失败");
- }
- System.out.println("生产者关闭");
- }
- }
- /**
- * 消费者
- */
- public class ConsumerThread implements Runnable {
- private BlockingQueue<String> queue;
- private boolean flag = true;
- public ConsumerThread(BlockingQueue<String> queue) {
- this.queue = queue;
- }
- @Override
- public void run() {
- while (flag){
- System.out.println("消费者启动 ======");
- String data = null;
- try {
- data = queue.poll();
- if(data!=null){
- System.out.println("消费者队列成功"+data);
- }else{
- System.out.println("消费队列失败");
- flag = false;
- break;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }finally {
- }
- }
- }
- }
- public class Test001 {
- public static void main(String[] args) {
- BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
- new Thread(new ProducerThread(blockingQueue,"A")).start();
- new Thread(new ConsumerThread(blockingQueue)).start();
- }
- }
来源: http://www.bubuko.com/infodetail-3164013.html