Java毕竟还是偏工程和设计, 搞完一个基础理论, 接着就得上模式, 这里就跟着书外加自己写写来上点模式看看.

  1. 流水线模式
  2. 并行分工

流水线模式

流水线模式的核心是一个流水线, 与现实生活中一整条流水线, 各道工序同时加工不同, 线程的流水线不能这么长, 否则线程可能要等待很久, 而且也不好同步这么多线程.

书里的流水线模式是每个线程分配一个阻塞队列. 从第一个线程开始, 将业务数据扔进第一个线程的流水线. 第一个线程在启动后就会尝试从阻塞队列中读取数据, 如果读不到, 就会阻塞.

一旦读到, 就会进行处理, 并将处理完的数据扔到下一个线程的阻塞队列中. 下一个线程也一直在等待阻塞队列产生结果.

就这样一个一个线程下去, 直到最后处理完毕.

来尝试自己编写一下吧. 任务很简单, 送一个任意字符串进流水线, 然后在后边添加上”第一道工序”, “第二道工序”之类的, 最后返回拼接后的字符串.

要编写的话, 每个线程类先编写好, 这就统一用内部类的方式写了. 先写一个传递数据的类:

public static class ProductMessage {
    String message;

    public ProductMessage(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}

然后是各个工序的线程, 每个线程中弄一个自己的阻塞队列. 这是第一个线程, 为了方便投入原材料, 就直接继承了Thread类:

public static class First extends Thread {

    private static BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();

    public void offerRawMaterial(String material) throws InterruptedException {
        blockingQueue.put(material);
    }

    @Override
    public void run() {
        while (true) {
            try {
                //使用阻塞方法take
                String message = blockingQueue.take();
                message = message + "第一道工序";
                //使用阻塞方法put
                Second.blockingQueue.put(message);
            } catch (InterruptedException e) {
                System.out.println("被打断, 结束线程");
                break;
            }
        }
    }
}

第二个线程就继承Runnable了, 和第一个线程几乎一样, 处理完以后向第三个线程中投放数据:

public static class Second implements Runnable {

    private static BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();

    @Override
    public void run() {
        while (true) {

            try {
                //使用阻塞方法take
                String message = blockingQueue.take();
                message = message + "第二道工序";
                //使用阻塞方法put
                Third.blockingQueue.put(message);
            } catch (InterruptedException e) {
                System.out.println("被打断, 结束线程");
                break;                }
        }
    }
}

第三个线程处理之后直接打印结果.

public static class Third implements Runnable {

    private static BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();

    @Override
    public void run() {
        while (true) {
            try {
                //使用阻塞方法take
                String message = blockingQueue.take();
                message = message + "第三道工序";
                //使用阻塞方法put
                System.out.println("有产品生产完成: " + message);
            } catch (InterruptedException e) {
                System.out.println("被打断, 结束线程");
                break;                }
        }
    }
}

这四个内部类外边套一个类, 用于启动:

public class AssemblyLine {

    public static void main(String[] args) throws InterruptedException {
        First first = new First();
        Thread second = new Thread(new Second());
        Thread third = new Thread(new Third());
        //创建并启动三个线程, 一开始阻塞队列都是空, 所以三个线程都阻塞
        first.start();
        second.start();
        third.start();

        Scanner scanner = new Scanner(System.in);

        //输入一行, 就向流水线中放入一个字符串
        while (true) {
            System.out.print("请输入字符串, 输入qqq退出: ");
            String msg = scanner.nextLine();
            if (msg.equals("qqq")) {
                break;
            }
            System.out.println();
            first.offerRawMaterial(msg);
        }

        //输入退出指令之后, 打断所有的线程, 根据之前编写的异常处理, 会直接退出
        first.interrupt();
        second.interrupt();
        third.interrupt();

        //等待所有的线程完成工作后退出
        first.join();
        second.join();
        third.join();

        System.out.println("完成工作");
    }
}

实际试验了一下, 确实可以像流水线一样操作, 同时也知道了, 配合while循环和阻塞, 可以让线程被动的等待工作.

流水线模式

所谓并行分工, 就是有点像之前的Fork/Join模式, 将任务分解, 不过与其不同的是, 并行分工除了类似递归的模式, 还能用普通分工的模式, 就是每个线程负责一部分工作, 然后共同看一个结果是不是被找到.

在一些查找或者类似的工作中, 只要一个线程找到, 就可以通知其他线程或者使用一个共享变量, 然后集体结束任务并返回结果. 只要工作的各个部分不互相依赖, 就可以使用并发来操作.

来写一个例子试验一下, 并行从一个数组中进行搜索, 返回找到的值的索引.

先写最外层的类, 包含一个数组域, 以及一个表示是否找到的索引域, 这个索引需要用AtomicInteger:

public class MultiSearch {

    private final List<Thread> threadList = new ArrayList<>();
    //表示找到的索引
    private final AtomicInteger result = new AtomicInteger(-1);
    //线程的数量
    private final int numberOfThreads = 6;

    private final int[] array;
    
    public MultiSearch(int[] array) {
        this.array = array;
    }
}

然后在其中编写内部线程类:

public class SearchThread extends Thread {
    private final int startIndex;
    private final int endIndex;
    private int target;

    //每个线程在指定的startIndex与endIndex-1之间查找target
    public SearchThread(int startIndex, int endIndex, int target) {
        this.startIndex = startIndex;
        this.endIndex = endIndex;
        this.target = target;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "开始运行From" + startIndex + " to " + endIndex);
        boolean found = false;
        for (int i = startIndex; i < endIndex; i++) {
            //找到目标值
            if (array[i] == target) {
                //尝试使用CAS设置值, 只尝试一次, 然后直接结束即可
                //如果成功设置了, 说明是第一个找到的线程
                //如果没有成功, 说明其他线程已经先找到, 所以直接break即可
                System.out.println(Thread.currentThread().getName() + "找到了 " + target + " 索引是: " + i);
                result.compareAndSet(-1, i);
                found = true;
                break;
            }
        }
        // 如果始终没有进过if, 运行到这里就是,一次都没找到, 直接结束了
        if (!found) {
            System.out.println(Thread.currentThread().getName() + "搜索结束, 没有找到");
        }

    }
}

线程类的CAS是核心, 如果找到就尝试CAS更新result, 如果发现已经被其他线程更新, 就直接break并退出, 说明其他线程已经找到了结果.

这里是为了打印过程, 所以加上了一些判断, 实际上found变量根本不需要, 只要找到就进行CAS, 然后直接break即可, 无需关心CAS是否成功.

最后为MultiSearch编写一个方法, 用于调度线程进行查找:

//查找方法
public int search(int target) throws InterruptedException {
    //根据数组长度和线程数量, 数组长度小于1000就只创建一个线程进行查找, 没有必要创建多个线程
    if (array.length <= 1000) {
        Thread searchThread = new SearchThread(0, array.length - 1, target);
        searchThread.start();
        searchThread.join();
        if (result.get() == -1) {
            System.out.println("没有找到");
        } else {
            System.out.println("找到了, 索引是: " + result.get());
        }
    } else {
        //长度超过1000, 分成按照步长的5个加上最后一个到末尾索引, 一共6个线程.
        int step = array.length / numberOfThreads;

        //创建前五个线程
        for (int i = 0; i < numberOfThreads - 1; i++) {
            System.out.println("索引范围是 " + i * step + "-->" + (step * (i + 1)));
            Thread searchThread = new SearchThread(i * step, step * (i + 1), target);
            threadList.add(searchThread);
            searchThread.start();
        }

        //创建最后一个线程
        System.out.println("索引范围是 " + step * (numberOfThreads - 1) + "-->" + array.length);
        Thread searchThread = new SearchThread(step * (numberOfThreads - 1), array.length, target);
        threadList.add(searchThread);
        searchThread.start();

    }

    //等待所有线程完成工作
    for (Thread t : threadList) {
        t.join();
    }

    //返回result, 如果查找到, 这个就被更新为查找后的结果.
    return result.get();

    }

然后就可以实际编写一个测试来跑一下:

public class Test {

    public static void main(String[] args) throws InterruptedException {

        int[] array = new int[10000000];

        for (int i = 0; i < 10000000; i++) {
            array[i] = i;
        }


        MultiSearch searcher = new MultiSearch(array);


        int index = searcher.search(9320420);

        System.out.println("找到的索引是: "+index);

    }
}

创建了一个长度是一千万的数组, 然后可以看到执行结果如下:

索引范围是 0-->1666666
索引范围是 1666666-->3333332
索引范围是 3333332-->4999998
索引范围是 4999998-->6666664
索引范围是 6666664-->8333330
索引范围是 8333330-->10000000
Thread-3开始运行From4999998 to 6666664
Thread-4开始运行From6666664 to 8333330
Thread-0开始运行From0 to 1666666
Thread-2开始运行From3333332 to 4999998
Thread-5开始运行From8333330 to 10000000
Thread-1开始运行From1666666 to 3333332
Thread-3搜索结束, 没有找到
Thread-2搜索结束, 没有找到
Thread-1搜索结束, 没有找到
Thread-4搜索结束, 没有找到
Thread-0搜索结束, 没有找到
Thread-5找到了 9320420 索引是: 9320420
找到的索引是: 9320420

6个线程在不同的区域内进行查找, 然后有一个线程找到了. 如果各个线程的查找任务属于计算密集型的话, 这个程序的理论效率是单线程的6倍.