这里有新鲜出炉的 Java 并发编程示例,程序狗速度看过来!
java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的 Java 程序设计语言和 Java 平台(即 JavaEE(j2ee), JavaME(j2me), JavaSE(j2se))的总称。
这篇文章主要介绍了详解 Java 阻塞队列 (BlockingQueue) 的实现原理,阻塞队列是 Java util.concurrent 包下重要的数据结构,有兴趣的可以了解一下
阻塞队列 (BlockingQueue) 是 Java util.concurrent 包下重要的数据结构,BlockingQueue 提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于 BlockingQueue 实现的。
BlockingQueue 的操作方法BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
四组不同的行为方式解释:无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。
可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。但是这么干效率并不高 (译者注:基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此你尽量不要用这一类的方法,除非你确实不得不那么做。
BlockingQueue 的实现类BlockingQueue 是个接口,你需要使用它的实现之一来使用 BlockingQueue,Java.util.concurrent 包下具有以下 BlockingQueue 接口的实现类:
使用例子:
阻塞队列的最长使用的例子就是生产者消费者模式, 也是各种实现生产者消费者模式方式中首选的方式。使用者不用关心什么阻塞生产,什么时候阻塞消费,使用非常方便,代码如下:
阻塞队列原理:
- package MyThread;
- import java.util.Random;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.TimeUnit;
- public class BlockingQueueTest {
- //生产者
- public static class Producer implements Runnable {
- private final BlockingQueue < Integer > blockingQueue;
- private volatile boolean flag;
- private Random random;
- public Producer(BlockingQueue < Integer > blockingQueue) {
- this.blockingQueue = blockingQueue;
- flag = false;
- random = new Random();
- }
- public void run() {
- while (!flag) {
- int info = random.nextInt(100);
- try {
- blockingQueue.put(info);
- System.out.println(Thread.currentThread().getName() + " produce " + info);
- Thread.sleep(50);
- } catch(InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
- public void shutDown() {
- flag = true;
- }
- }
- //消费者
- public static class Consumer implements Runnable {
- private final BlockingQueue < Integer > blockingQueue;
- private volatile boolean flag;
- public Consumer(BlockingQueue < Integer > blockingQueue) {
- this.blockingQueue = blockingQueue;
- }
- public void run() {
- while (!flag) {
- int info;
- try {
- info = blockingQueue.take();
- System.out.println(Thread.currentThread().getName() + " consumer " + info);
- Thread.sleep(50);
- } catch(InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
- public void shutDown() {
- flag = true;
- }
- }
- public static void main(String[] args) {
- BlockingQueue < Integer > blockingQueue = new LinkedBlockingQueue < Integer > (10);
- Producer producer = new Producer(blockingQueue);
- Consumer consumer = new Consumer(blockingQueue);
- //创建5个生产者,5个消费者
- for (int i = 0; i < 10; i++) {
- if (i < 5) {
- new Thread(producer, "producer" + i).start();
- } else {
- new Thread(consumer, "consumer" + (i - 5)).start();
- }
- }
- try {
- Thread.sleep(1000);
- } catch(InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- producer.shutDown();
- consumer.shutDown();
- }
- }
其实阻塞队列实现阻塞同步的方式很简单,使用的就是是 lock 锁的多条件(condition)阻塞控制。使用 BlockingQueue 封装了根据条件阻塞线程的过程,而我们就不用关心繁琐的 await/signal 操作了。
下面是 Jdk 1.7 中 ArrayBlockingQueue 部分代码:
双端阻塞队列(BlockingDeque)
- public ArrayBlockingQueue(int capacity, boolean fair) {
- if (capacity <= 0)
- throw new IllegalArgumentException();
- //创建数组
- this.items = new Object[capacity];
- //创建锁和阻塞条件
- lock = new ReentrantLock(fair);
- notEmpty = lock.newCondition();
- notFull = lock.newCondition();
- }
- //添加元素的方法
- public void put(E e) throws InterruptedException {
- checkNotNull(e);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == items.length)
- notFull.await();
- //如果队列不满就入队
- enqueue(e);
- } finally {
- lock.unlock();
- }
- }
- //入队的方法
- private void enqueue(E x) {
- final Object[] items = this.items;
- items[putIndex] = x;
- if (++putIndex == items.length)
- putIndex = 0;
- count++;
- notEmpty.signal();
- }
- //移除元素的方法
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == 0)
- notEmpty.await();
- return dequeue();
- } finally {
- lock.unlock();
- }
- }
- //出队的方法
- private E dequeue() {
- final Object[] items = this.items;
- @SuppressWarnings("unchecked")
- E x = (E) items[takeIndex];
- items[takeIndex] = null;
- if (++takeIndex == items.length)
- takeIndex = 0;
- count--;
- if (itrs != null)
- itrs.elementDequeued();
- notFull.signal();
- return x;
concurrent 包下还提供双端阻塞队列(BlockingDeque),和 BlockingQueue 是类似的,只不过 BlockingDeque 提供从任意一端插入或者抽取元素的队列。
来源: http://www.phperz.com/article/17/1231/356725.html