ThreadPoolExecutor UML 图:
image
image
8.1 在任务和执行策略之间隐形耦合
避免 Thread starvation deadlock
8.2 设置线程池大小
8.3 配置 ThreadPoolExecutor
image
构造函数如下:
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) { ... }
核心和最大池大小: 如果运行的线程少于 corePoolSize, 则创建新线程来处理请求 (即一个 Runnable 实例), 即使其它线程是空闲的如果运行的线程多于 corePoolSize 而少于 maximumPoolSize, 则仅当队列满时才创建新线程
保持活动时间: 如果池中当前有多于 corePoolSize 的线程, 则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止
排队: 如果运行的线程等于或多于 corePoolSize, 则 Executor 始终首选将请求加入队列 BlockingQueue, 而不添加新的线程
被拒绝的任务: 当 Executor 已经关闭, 或者队列已满且线程数量达到 maximumPoolSize 时 (即线程池饱和了), 请求将被拒绝这些拒绝的策略叫做 Saturation Policy, 即饱和策略包括 AbortPolicy, CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy.
另外注意:
如果运行的线程少于 corePoolSize,ThreadPoolExecutor 会始终首选创建新的线程来处理请求; 注意, 这时即使有空闲线程也不会重复使用 (这和数据库连接池有很大差别)
如果运行的线程等于或多于 corePoolSize, 则 ThreadPoolExecutor 会将请求加入队列 BlockingQueue, 而不添加新的线程 (这和数据库连接池也不一样)
如果无法将请求加入队列 (比如队列已满), 则创建新的线程来处理请求; 但是如果创建的线程数超出 maximumPoolSize, 在这种情况下, 请求将被拒绝
newCachedThreadPool 使用了 SynchronousQueue, 并且是无界的
线程工厂 ThreadFactory
8.4 扩展 ThreadPoolExecutor
重写 beforeExecute 和 afterExecute 方法
8.5 递归算法的并行化
实际就是类似 Number of Islands 或者 N-Queens 等 DFS 问题的一种并行处理
串行版本如下:
- public class SequentialPuzzleSolver < P,
- M > {
- private final Puzzle < P,
- M > puzzle;
- private final Set < P > seen = new HashSet < P > ();
- public SequentialPuzzleSolver(Puzzle < P, M > puzzle) {
- this.puzzle = puzzle;
- }
- public List < M > solve() {
- P pos = puzzle.initialPosition();
- return search(new PuzzleNode < P, M > (pos, null, null));
- }
- private List < M > search(PuzzleNode < P, M > node) {
- if (!seen.contains(node.pos)) {
- seen.add(node.pos);
- if (puzzle.isGoal(node.pos)) return node.asMoveList();
- for (M move: puzzle.legalMoves(node.pos)) {
- P pos = puzzle.move(node.pos, move);
- PuzzleNode < P,
- M > child = new PuzzleNode < P,
- M > (pos, move, node);
- List < M > result = search(child);
- if (result != null) return result;
- }
- }
- return null;
- }
- }
并行版本如下:
- public class ConcurrentPuzzleSolver < P,
- M > {
- private final Puzzle < P,
- M > puzzle;
- private final ExecutorService exec;
- private final ConcurrentMap < P,
- Boolean > seen;
- protected final ValueLatch < PuzzleNode < P,
- M >> solution = new ValueLatch < PuzzleNode < P,
- M >> ();
- public ConcurrentPuzzleSolver(Puzzle < P, M > puzzle) {
- this.puzzle = puzzle;
- this.exec = initThreadPool();
- this.seen = new ConcurrentHashMap < P,
- Boolean > ();
- if (exec instanceof ThreadPoolExecutor) {
- ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
- tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
- }
- }
- private ExecutorService initThreadPool() {
- return Executors.newCachedThreadPool();
- }
- public List < M > solve() throws InterruptedException {
- try {
- P p = puzzle.initialPosition();
- exec.execute(newTask(p, null, null));
- // block until solution found
- PuzzleNode < P,
- M > solnPuzzleNode = solution.getValue();
- return (solnPuzzleNode == null) ? null: solnPuzzleNode.asMoveList();
- } finally {
- exec.shutdown();
- }
- }
- protected Runnable newTask(P p, M m, PuzzleNode < P, M > n) {
- return new SolverTask(p, m, n);
- }
- protected class SolverTask extends PuzzleNode < P,
- M > implements Runnable {
- SolverTask(P pos, M move, PuzzleNode < P, M > prev) {
- super(pos, move, prev);
- }
- public void run() {
- if (solution.isSet() || seen.putIfAbsent(pos, true) != null) return; // already solved or seen this position
- if (puzzle.isGoal(pos)) solution.setValue(this);
- else for (M m: puzzle.legalMoves(pos)) exec.execute(newTask(puzzle.move(pos, m), m, this));
- }
- }
- }
来源: http://www.jianshu.com/p/89a30cfe09a2