我们知道, 单个线程计算是串行的, 只有等上一个任务结束之后, 才能执行下一个任务, 所以执行效率是比较低的.
那么, 如果用多线程执行任务, 就可以在单位时间内执行更多的任务, 而 Master-Worker 就是多线程并行计算的一种实现方式.
它的思想是, 启动两个进程协同工作: Master 和 Worker 进程.
Master 负责任务的接收和分配, Worker 负责具体的子任务执行. 每个 Worker 执行完任务之后把结果返回给 Master, 最后由 Master 汇总结果.(其实也是一种分而治之的思想, 和 forkjoin 计算框架有相似之处, 参看: 并行任务计算框架 forkjoin)
Master-Worker 工作示意图如下:
下面用 Master-Worker 实现计算 1-100 的平方和, 思路如下:
定义一个 Task 类用于存储每个任务的数据.
Master 生产固定个数的 Worker, 把所有 worker 存放在 workers 变量 (map) 中, Master 需要存储所有任务的队列 workqueue(ConcurrentLinkedQueue)和所有子任务返回的结果集 resultMap(ConcurrentHashMap).
每个 Worker 执行自己的子任务, 然后把结果存放在 resultMap 中.
Master 汇总 resultMap 中的数据, 然后返回给 Client 客户端.
为了扩展 Worker 的功能, 用一个 MyWorker 继承 Worker 重写任务处理的具体方法.
Task 类:
- package com.thread.masterworker;
- public class Task {
- private int id;
- private String name;
- private int num;
- public int getId() {
- return id;
- }
- public void setId(int id) {
- this.id = id;
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public int getNum() {
- return num;
- }
- public void setNum(int num) {
- this.num = num;
- }
- }
Master 实现:
- package com.thread.masterworker;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ConcurrentLinkedQueue;
- public class Master {
- // 所有任务的队列
- private ConcurrentLinkedQueue<Task> workerQueue = new ConcurrentLinkedQueue<Task>();
- // 所有 worker
- private HashMap<String,Thread> workers = new HashMap<String,Thread>();
- // 共享变量, worker 返回的结果
- private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String,Object>();
- // 构造方法, 初始化所有 worker
- public Master(Worker worker,int workerCount){
- worker.setWorkerQueue(this.workerQueue);
- worker.setResultMap(this.resultMap);
- for (int i = 0; i <workerCount; i++) {
- Thread t = new Thread(worker);
- this.workers.put("worker-"+i,t);
- }
- }
- // 任务的提交
- public void submit(Task task){
- this.workerQueue.add(task);
- }
- // 执行任务
- public int execute(){
- for (Map.Entry<String, Thread> entry : workers.entrySet()) {
- entry.getValue().start();
- }
- // 一直循环, 直到结果返回
- while (true){
- if(isComplete()){
- return getResult();
- }
- }
- }
- // 判断是否所有线程都已经执行完毕
- public boolean isComplete(){
- for (Map.Entry<String, Thread> entry : workers.entrySet()) {
- // 只要有任意一个线程没有结束, 就返回 false
- if(entry.getValue().getState() != Thread.State.TERMINATED){
- return false;
- }
- }
- return true;
- }
- // 处理结果集返回最终结果
- public int getResult(){
- int res = 0;
- for (Map.Entry<String,Object> entry : resultMap.entrySet()) {
- res += (Integer) entry.getValue();
- }
- return res;
- }
- }
父类 Worker:
- package com.thread.masterworker;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ConcurrentLinkedQueue;
- public class Worker implements Runnable {
- private ConcurrentLinkedQueue<Task> workerQueue;
- private ConcurrentHashMap<String,Object> resultMap;
- public void setWorkerQueue(ConcurrentLinkedQueue<Task> workerQueue) {
- this.workerQueue = workerQueue;
- }
- public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
- this.resultMap = resultMap;
- }
- @Override
- public void run() {
- while(true){
- // 从任务队列中取出一个任务
- Task task = workerQueue.poll();
- if(task == null) break;
- // 处理具体的任务
- Object res = doTask(task);
- // 把每次处理的结果放到结果集里面, 此处直接把 num 值作为结果
- resultMap.put(String.valueOf(task.getId()),res);
- }
- }
- public Object doTask(Task task) {
- return null;
- }
- }
子类 MyWorker 继承父类 Worker, 重写 doTask 方法实现具体的逻辑:
- package com.thread.masterworker;
- public class MyWorker extends Worker {
- @Override
- public Object doTask(Task task) {
- // 暂停 0.5 秒, 模拟任务处理
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // 计算数字的平方
- int num = task.getNum();
- return num * num;
- }
- }
客户端 Client:
- package com.thread.masterworker;
- import java.util.Random;
- public class Client {
- public static void main(String[] args) {
- Master master = new Master(new MyWorker(), 10);
- // 提交 n 个任务到任务队列里
- for (int i = 0; i < 100; i++) {
- Task task = new Task();
- task.setId(i);
- task.setName("任务"+i);
- task.setNum(i+1);
- master.submit(task);
- }
- // 执行任务
- long start = System.currentTimeMillis();
- int res = master.execute();
- long time = System.currentTimeMillis() - start;
- System.out.println("结果:"+res+", 耗时:"+time);
- }
- }
以上, 我们用 10 个线程去执行子任务, 最终由 Master 做计算求和(1-100 的平方和). 每个线程暂停 500ms, 计算数字的平方值.
总共 100 个任务, 分 10 个线程并行计算, 相当于每个线程均分 10 个任务, 一个任务的时间大概为 500ms, 故 10 个任务为 5000ms, 再加上计算平方值的时间, 故稍大于 5000ms. 结果如下,
结果: 338350, 耗时: 5084
来源: https://www.cnblogs.com/starry-skys/p/12341170.html