上一篇文章中我们将 ThreadPoolExecutor 进行了深入的学习和介绍,实际上我们在项目中应用的时候非常少有直接应用 ThreadPoolExecutor 来创建线程池的。在 jdk 的 api 中有这么一句话 " 可是,强烈建议程序猿使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,能够进行自己主动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景提前定义了设置。
" 所以这篇文章我们继续学习其他几种线程池。
创建一个可缓存的线程池。即这个线程池是无界线程池。无界指工作线程的创建数量差点儿没有限制 (事实上也有限制的, 数目为 Interger.MAX_VALUE), 这样能够灵活的往线程池中加入数据;能够进行自己主动线程回收指的是假设长时间没有往线程池中提交任务。即假设工作线程空暇了指定的时间,则该工作线程将自己主动终止。终止后。假设你又提交了新的任务。则线程池又一次创建一个工作线程。
我们一般使用例如以下代码进行创建:
- ExecutorServiceservice = Executors.newCachedThreadPool();
我们点击代码进入源代码:
- /**
- * Creates a thread pool that creates newthreads as needed, but
- * will reuse previously constructedthreads when they are
- * available. These pools will typically improve theperformance
- * of programs that execute manyshort-lived asynchronous tasks.
- * Calls to[email protected]execute} will reusepreviously constructed
- * threads if available. If no existingthread is available, a new
- * thread will be created and added to thepool. Threads that have
- * not been used for sixty seconds areterminated and removed from
- * the cache. Thus, a pool that remainsidle for long enough will
- * not consume any resources. Note thatpools with similar
- * properties but different details (forexample, timeout parameters)
- * may be created using[email protected]} constructors.
- *
- * @return the newly created thread pool
- */
- public static ExecutorServicenewCachedThreadPool() {
- return new ThreadPoolExecutor(0,Integer.MAX_VALUE,
- 60L,TimeUnit.SECONDS,
- newSynchronousQueue());
- }
看到代码有没有非常熟悉。调用的上我们上一篇文章中的 ThreadPoolExecutor 类的构造方法,仅仅只是核心线程数为 0。同一时候指定一个最大线程数。
固定大小线程池这个非常好理解。就是创建一个指定工作线程数量的线程池,假设线程达到设置的最大数。就将提交的任务放到线程池的队列中。一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的长处。
但在线程池空暇时,即线程池中没有可执行任务时,它不会释放工作线程,还会占用一定的系统资源。
一般创建:
- ExecutorServicenewFixedThreadPool=Executors.newFixedThreadPool(5);
点击进入源代码:
- /**
- * Creates a thread pool that reuses afixed number of threads
- * operating off a shared unboundedqueue. At any point, at most
- *[email protected]nThreads} threads will be activeprocessing tasks.
- * If additional tasks are submitted whenall threads are active,
- * they will wait in the queue until athread is available.
- * If any thread terminates due to afailure during execution
- * prior to shutdown, a new one will takeits place if needed to
- * execute subsequent tasks. The threads in the pool will exist
- * until it is explicitly[email protected]#shutdown shutdown}.
- *
- * @param nThreads the number of threads inthe pool
- * @return the newly created thread pool
- * @throws IllegalArgumentException[email protected]nThreads <= 0}
- */
- public static ExecutorServicenewFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads,nThreads,
- 0L,TimeUnit.MILLISECONDS,
- newLinkedBlockingQueue());
- }
调用的依旧是我们上一篇文章中的 ThreadPoolExecutor 类的构造方法,仅仅只是核心线程数为和最大线程数一样都是我们人为指定的。
单线程线程池,仅仅创建唯一的线程来运行任务。假设这个线程异常结束。会有还有一个代替它,保证顺序运行。
一般创建方法:
点击进入源代码:
- ExecutorServicenewSingleThreadExecutor = Executors.newSingleThreadExecutor();
调用的依旧是我们上一篇文章中的 ThreadPoolExecutor 类的构造方法,仅仅只是核心线程数为和最大线程数一样都是 1。
- /**
- * Creates an Executor that uses a singleworker thread operating
- * off an unbounded queue. (Note howeverthat if this single
- * thread terminates due to a failureduring execution prior to
- * shutdown, a new one will take its placeif needed to execute
- * subsequent tasks.) Tasks are guaranteed to execute
- * sequentially, and no more than one taskwill be active at any
- * given time. Unlike the otherwiseequivalent
- *[email protected]newFixedThreadPool(1)} thereturned executor is
- * guaranteed not to be reconfigurable touse additional threads.
- *
- * @return the newly createdsingle-threaded Executor
- */
- public static ExecutorServicenewSingleThreadExecutor() {
- return newFinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L,TimeUnit.MILLISECONDS,
- newLinkedBlockingQueue()));
- }
介绍到这里我们发现这三个线程池调用的都是 ThreadPoolExecutor 的构造函数。这三个线程的差别除了核心线程数和最大线程数參数不一样外,最重要的是传入的最后一个參数即 workQueue 是不一样的。
newCachedThreadPool 的參数为 SynchronousQueue,newFixedThreadPool 和 newSingleThreadExecutor 的參数都为 LinkedBlockingQueue。事实上这一种排队策略也叫堵塞队列。那接下来我们就来介绍一下常见的堵塞队列。
堵塞队列顾名思义首先它是一个队列,常见的队列有 "后进先出" 的栈和 "先进先出" 的队列。多线程环境中,通过队列能够非常 easy 实现数据共享,最经典的就是 "生产者" 和 "消费者" 模型。这就是一个典型的堵塞队列,比方生产者生产到一定程度必须停一下,让生产者线程挂起,这就是堵塞。
在多线程领域:所谓堵塞,在某些情况下会挂起线程(即堵塞)。一旦条件满足。被挂起的线程又会自己主动被唤醒)
java.util.concurrent 包中的 BlockingQueue 就是堵塞队列的接口,作为 BlockingQueue 的使用者,我们再也不须要关心什么时候须要堵塞线程。什么时候须要唤醒线程,由于这一切 BlockingQueue 都给你一手包办了,而且它还是线程安全的。那我们如今来看下 BlockingQueue 接口的源代码:
- public interfaceBlockingQueue extends Queue {
- boolean add(E e);
- boolean offer(E e);
- void put(E e) throws InterruptedException;
- boolean offer(E e, long timeout, TimeUnitunit)
- throws InterruptedException;
- E take() throws InterruptedException;
- E poll(long timeout, TimeUnit unit)
- throws InterruptedException;
- int remainingCapacity();
- boolean remove(Object o);
- public boolean contains(Object o);
- int drainTo(Collection
- super E> c);
- int drainTo(Collection<
- super E> c,int maxElements);
- }
上面就是接口的全部方法,如今我们就介绍下这个接口中的核心方法:
- boolean add(E e);
这种方法将将泛型对象加到 BlockingQueue 里, 即假设 BlockingQueue 能够容纳, 则返回 true, 否则返回 false.(本方法不堵塞当前运行方法的线程)
- boolean offer(E e);
这种方法将将泛型对象加到 BlockingQueue 里, 即假设 BlockingQueue 能够容纳, 则返回 true, 否则返回 false.(本方法不堵塞当前运行方法的线程)
- boolean offer(E e, long timeout, TimeUnitunit)throws InterruptedException;
这种方法能够设定等待的时间,假设在指定的时间内,还不能往队列中增加 BlockingQueue,则返回失败。(本方法不堵塞当前运行方法的线程)
- void put(E e) throws InterruptedException;
这种方法把泛型对象放到 BlockingQueue 里, 假设 BlockQueue 没有空间, 则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续.(本方法有堵塞的功能)
- boolean remove(Object o);
这种方法从 BlockingQueue 取出一个队首的对象,假设在指定时间内,队列一旦有数据可取,则马上返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。(本方法不堵塞当前运行方法的线程)
- E poll(long timeout, TimeUnit unit) throws InterruptedException;
这种方法从 BlockingQueue 取出一个队首的对象,假设在指定时间内。队列一旦有数据可取,则马上返回队列中的数据。否则知道时间超时还没有数据可取。返回失败。(本方法不堵塞当前运行方法的线程)
- E take() throws InterruptedException;
这种方法是取走 BlockingQueue 里排在首位的对象, 若 BlockingQueue 为空, 阻断进入等待状态直到 BlockingQueue 有新的数据被增加;(本方法有堵塞的功能)
- int drainTo(Collection<? super E> c);
这种方法是取走 BlockingQueue 里排在首位的对象, 取不到时返回 null;(本方法不堵塞当前运行方法的线程)
- int drainTo(Collection
- super E> c,int maxElements);
这种方法是取走 BlockingQueue 里排在首位的对象, 若不能马上取出, 则能够等 time 參数规定的时间, 取不到时返回 null;(本方法不堵塞当前运行方法的线程)
总结一下 BlockingQueue 接口中的方法,这些方法以四种形式出现,对于不能马上满足但可能在将来某一时刻能够满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,另外一种是返回一个特殊值(null 或 false,详细取决于操作),第三种是在操作能够成功前,无限期地堵塞当前线程,第四种是在放弃前仅仅在给定的最大时间限制内堵塞。
抛出异常 |
特殊值 |
堵塞 |
超时 |
|
插入 |
add(e) |
offer(e) |
put(e) |
offer(e,time,unit) |
移除 |
remove() |
poll() |
take() |
poll(time,unit) |
检查 |
element() |
peek() |
不可用 |
不可用 |
1)ArrayBlockingQueue: 基于数组实现的一个堵塞队列。在创建 ArrayBlockingQueue 对象时必须制定容量大小。以便缓存队列中数据对象。而且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先可以訪问队列。
其所含的对象是以 FIFO(先入先出) 顺序排序的.
2)LinkedBlockingQueue: 基于链表实现的一个堵塞队列。在创建 LinkedBlockingQueue 对象时假设不指定容量大小,则默认大小为 Integer.MAX_VALUE。其所含的对象是以 FIFO(先入先出) 顺序排序的
3)PriorityBlockingQueue: 类似于 LinkedBlockQueue, 但其所含对象的排序不是 FIFO, 它会依照元素的优先级对元素进行排序,依照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此堵塞队列为无界堵塞队列,即容量没有上限(通过源代码就能够知道,它没有容器满的信号标志),前面 2 种都是有界队列。
4)DelayQueue:基于 PriorityQueue,一种延时堵塞队列,DelayQueue 中的元素仅仅有当其指定的延迟时间到了。才可以从队列中获取到该元素。DelayQueue 也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被堵塞,而仅仅有获取数据的操作(消费者)才会被堵塞。
5)SynchronousQueue: 一种无缓冲的等待队列,类似于无中介的直接交易。
当中 LinkedBlockingQueue 和 ArrayBlockingQueue 比較起来, 它们背后所用的数据结构不一样, 导致 LinkedBlockingQueue 的数据吞吐量要大于 ArrayBlockingQueue, 但在线程数量非常大时其性能的可预见性低于 ArrayBlockingQueue.
我们这篇文章延续了上一篇文章中关于 ThreadPoolExecutor 线程池的一些内容,各自是 newCachedThreadPool、newFixedThreadPool、newSingleThreadExecutor,同一时候依据这些线程池与 ThreadPoolExecutor 的关系。进而引出了堵塞队列 BlockingQueue。于是我们具体介绍了接口 BlockingQueue 和接口中的方法,最后又介绍了接口 BlockingQueue 的实现类。
来源: http://www.bubuko.com/infodetail-2096284.html