Java毕竟还是偏工程和设计, 搞完一个基础理论, 接着就得上模式, 这里就跟着书外加自己写写来上点模式看看.
流水线模式
流水线模式的核心是一个流水线, 与现实生活中一整条流水线, 各道工序同时加工不同, 线程的流水线不能这么长, 否则线程可能要等待很久, 而且也不好同步这么多线程.
书里的流水线模式是每个线程分配一个阻塞队列. 从第一个线程开始, 将业务数据扔进第一个线程的流水线. 第一个线程在启动后就会尝试从阻塞队列中读取数据, 如果读不到, 就会阻塞.
一旦读到, 就会进行处理, 并将处理完的数据扔到下一个线程的阻塞队列中. 下一个线程也一直在等待阻塞队列产生结果.
就这样一个一个线程下去, 直到最后处理完毕.
来尝试自己编写一下吧. 任务很简单, 送一个任意字符串进流水线, 然后在后边添加上”第一道工序”, “第二道工序”之类的, 最后返回拼接后的字符串.
要编写的话, 每个线程类先编写好, 这就统一用内部类的方式写了. 先写一个传递数据的类:
public static class ProductMessage { String message; public ProductMessage(String message) { this.message = message; } public String getMessage() { return message; } }
然后是各个工序的线程, 每个线程中弄一个自己的阻塞队列. 这是第一个线程, 为了方便投入原材料, 就直接继承了Thread类:
public static class First extends Thread { private static BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(); public void offerRawMaterial(String material) throws InterruptedException { blockingQueue.put(material); } @Override public void run() { while (true) { try { //使用阻塞方法take String message = blockingQueue.take(); message = message + "第一道工序"; //使用阻塞方法put Second.blockingQueue.put(message); } catch (InterruptedException e) { System.out.println("被打断, 结束线程"); break; } } } }
第二个线程就继承Runnable了, 和第一个线程几乎一样, 处理完以后向第三个线程中投放数据:
public static class Second implements Runnable { private static BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(); @Override public void run() { while (true) { try { //使用阻塞方法take String message = blockingQueue.take(); message = message + "第二道工序"; //使用阻塞方法put Third.blockingQueue.put(message); } catch (InterruptedException e) { System.out.println("被打断, 结束线程"); break; } } } }
第三个线程处理之后直接打印结果.
public static class Third implements Runnable { private static BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(); @Override public void run() { while (true) { try { //使用阻塞方法take String message = blockingQueue.take(); message = message + "第三道工序"; //使用阻塞方法put System.out.println("有产品生产完成: " + message); } catch (InterruptedException e) { System.out.println("被打断, 结束线程"); break; } } } }
这四个内部类外边套一个类, 用于启动:
public class AssemblyLine { public static void main(String[] args) throws InterruptedException { First first = new First(); Thread second = new Thread(new Second()); Thread third = new Thread(new Third()); //创建并启动三个线程, 一开始阻塞队列都是空, 所以三个线程都阻塞 first.start(); second.start(); third.start(); Scanner scanner = new Scanner(System.in); //输入一行, 就向流水线中放入一个字符串 while (true) { System.out.print("请输入字符串, 输入qqq退出: "); String msg = scanner.nextLine(); if (msg.equals("qqq")) { break; } System.out.println(); first.offerRawMaterial(msg); } //输入退出指令之后, 打断所有的线程, 根据之前编写的异常处理, 会直接退出 first.interrupt(); second.interrupt(); third.interrupt(); //等待所有的线程完成工作后退出 first.join(); second.join(); third.join(); System.out.println("完成工作"); } }
实际试验了一下, 确实可以像流水线一样操作, 同时也知道了, 配合while循环和阻塞, 可以让线程被动的等待工作.
流水线模式
所谓并行分工, 就是有点像之前的Fork/Join模式, 将任务分解, 不过与其不同的是, 并行分工除了类似递归的模式, 还能用普通分工的模式, 就是每个线程负责一部分工作, 然后共同看一个结果是不是被找到.
在一些查找或者类似的工作中, 只要一个线程找到, 就可以通知其他线程或者使用一个共享变量, 然后集体结束任务并返回结果. 只要工作的各个部分不互相依赖, 就可以使用并发来操作.
来写一个例子试验一下, 并行从一个数组中进行搜索, 返回找到的值的索引.
先写最外层的类, 包含一个数组域, 以及一个表示是否找到的索引域, 这个索引需要用AtomicInteger:
public class MultiSearch { private final List<Thread> threadList = new ArrayList<>(); //表示找到的索引 private final AtomicInteger result = new AtomicInteger(-1); //线程的数量 private final int numberOfThreads = 6; private final int[] array; public MultiSearch(int[] array) { this.array = array; } }
然后在其中编写内部线程类:
public class SearchThread extends Thread { private final int startIndex; private final int endIndex; private int target; //每个线程在指定的startIndex与endIndex-1之间查找target public SearchThread(int startIndex, int endIndex, int target) { this.startIndex = startIndex; this.endIndex = endIndex; this.target = target; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "开始运行From" + startIndex + " to " + endIndex); boolean found = false; for (int i = startIndex; i < endIndex; i++) { //找到目标值 if (array[i] == target) { //尝试使用CAS设置值, 只尝试一次, 然后直接结束即可 //如果成功设置了, 说明是第一个找到的线程 //如果没有成功, 说明其他线程已经先找到, 所以直接break即可 System.out.println(Thread.currentThread().getName() + "找到了 " + target + " 索引是: " + i); result.compareAndSet(-1, i); found = true; break; } } // 如果始终没有进过if, 运行到这里就是,一次都没找到, 直接结束了 if (!found) { System.out.println(Thread.currentThread().getName() + "搜索结束, 没有找到"); } } }
线程类的CAS是核心, 如果找到就尝试CAS更新result, 如果发现已经被其他线程更新, 就直接break并退出, 说明其他线程已经找到了结果.
这里是为了打印过程, 所以加上了一些判断, 实际上found变量根本不需要, 只要找到就进行CAS, 然后直接break即可, 无需关心CAS是否成功.
最后为MultiSearch编写一个方法, 用于调度线程进行查找:
//查找方法 public int search(int target) throws InterruptedException { //根据数组长度和线程数量, 数组长度小于1000就只创建一个线程进行查找, 没有必要创建多个线程 if (array.length <= 1000) { Thread searchThread = new SearchThread(0, array.length - 1, target); searchThread.start(); searchThread.join(); if (result.get() == -1) { System.out.println("没有找到"); } else { System.out.println("找到了, 索引是: " + result.get()); } } else { //长度超过1000, 分成按照步长的5个加上最后一个到末尾索引, 一共6个线程. int step = array.length / numberOfThreads; //创建前五个线程 for (int i = 0; i < numberOfThreads - 1; i++) { System.out.println("索引范围是 " + i * step + "-->" + (step * (i + 1))); Thread searchThread = new SearchThread(i * step, step * (i + 1), target); threadList.add(searchThread); searchThread.start(); } //创建最后一个线程 System.out.println("索引范围是 " + step * (numberOfThreads - 1) + "-->" + array.length); Thread searchThread = new SearchThread(step * (numberOfThreads - 1), array.length, target); threadList.add(searchThread); searchThread.start(); } //等待所有线程完成工作 for (Thread t : threadList) { t.join(); } //返回result, 如果查找到, 这个就被更新为查找后的结果. return result.get(); }
然后就可以实际编写一个测试来跑一下:
public class Test { public static void main(String[] args) throws InterruptedException { int[] array = new int[10000000]; for (int i = 0; i < 10000000; i++) { array[i] = i; } MultiSearch searcher = new MultiSearch(array); int index = searcher.search(9320420); System.out.println("找到的索引是: "+index); } }
创建了一个长度是一千万的数组, 然后可以看到执行结果如下:
索引范围是 0-->1666666 索引范围是 1666666-->3333332 索引范围是 3333332-->4999998 索引范围是 4999998-->6666664 索引范围是 6666664-->8333330 索引范围是 8333330-->10000000 Thread-3开始运行From4999998 to 6666664 Thread-4开始运行From6666664 to 8333330 Thread-0开始运行From0 to 1666666 Thread-2开始运行From3333332 to 4999998 Thread-5开始运行From8333330 to 10000000 Thread-1开始运行From1666666 to 3333332 Thread-3搜索结束, 没有找到 Thread-2搜索结束, 没有找到 Thread-1搜索结束, 没有找到 Thread-4搜索结束, 没有找到 Thread-0搜索结束, 没有找到 Thread-5找到了 9320420 索引是: 9320420 找到的索引是: 9320420
6个线程在不同的区域内进行查找, 然后有一个线程找到了. 如果各个线程的查找任务属于计算密集型的话, 这个程序的理论效率是单线程的6倍.