之前的线程都是一个运行到自己结束的任务, 但有些时候, 多线程程序的各个线程是一直在工作的, 需要得到外部的消息来是否继续运行. 同时各个线程也会有共享数据, 比如一个统计多个门进入人数的程序:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class OrnamentalGarden {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executorService.execute(new Entrance(i));
}
//主线程睡三秒之后关闭所有门
TimeUnit.SECONDS.sleep(3);
Entrance.cancel();
executorService.shutdown();
if (!executorService.awaitTermination(250, TimeUnit.MILLISECONDS)) {
System.out.println("Some tasks are not terminated");
}
System.out.println("--------------------------------");
System.out.println("Total: " + Entrance.getTotalCount());
Entrance.showAllNumber();
}
}
// 统计人数的对象
class Count {
private int count = 0;
private Random rand = new Random();
public synchronized int increment() {
int temp = count;
if (rand.nextBoolean()) {
Thread.yield();
}
return (count = ++temp);
}
public synchronized int getCount() {
return count;
}
}
//多个门, 用于统计每个门进出的人数, 用多线程操作
class Entrance implements Runnable {
private static Random random = new Random();
private static Count count = new Count();
//创建一个列表用于存放所有的线程对象, 以控制该线程
//这和CSAPP里的使用线程号本质上一样
private static List<Entrance> entrances = new ArrayList<>();
//每一个门都从0个人开始统计
private int number = 0;
//门的id
private final int id;
//门是不是关闭
private static volatile boolean canceled = false;
//设置门关闭
public static void cancel() {
canceled = true;
}
//初始化的时候, 把新对象加入到列表中
public Entrance(int id) {
this.id = id;
entrances.add(this);
}
public void run() {
//在每个线程中, 定期的进一个人, 然后打印
while (!canceled) {
synchronized (this) {
//自己对象里的统计当前门进了多少人
++number;
}
//打印出目前的总人数
System.out.println(this + "Total: " + count.increment());
try{
TimeUnit.MILLISECONDS.sleep(random.nextInt(10) * random.nextInt(20));
} catch (InterruptedException e) {
System.out.println("Sleep interrupted");
}
}
//运行到这里门关闭了, 就打印
System.out.println("Door " + id + " stopped");
}
public synchronized int getNumber() {
return this.number;
}
@Override
public synchronized String toString() {
return "Entrance{" +
"id=" + id +
", number=" + number +
'}';
}
public static int getTotalCount() {
return count.getCount();
}
public static int sumEntrances() {
int sum = 0;
for (Entrance entrance : entrances) {
sum += entrance.getNumber();
}
return sum;
}
public static void showAllNumber() {
for (Entrance entrance : entrances) {
System.out.println(entrance);
}
}
}
这个程序是一个各个线程保存自己的数据并且协同主线程工作之外, 关键是了解如何来控制线程工作. 所有的线程对象共享一个变量 private static volatile boolean canceled = false; 用于控制门是否关闭.
由于基本类型赋值是原子操作, 也声明了volatile, 所以这里可以不同步, 当然为了不迷惑, 也可以加上同步.
这里的关键是红字的部分, 即利用了线程管理器的监听线程结束的情况. 在设置了门关闭之后, 不应该有继续存活的线程.
executorService.awaitTermination(250, TimeUnit.MILLISECONDS)
这个方法用于监听线程, 参数是时间, 在指定的时间之前, 所有的线程都被关闭, 返回true, 否则返回false.
这里利用的小技巧就是虽然所有的线程都退出了, 看上去已经不存在了, 实际上在类里边维护了一个静态变量, 用于存放所有的线程对象, 所以executorService.awaitTermination(250, TimeUnit.MILLISECONDS)
可以正常工作.
线程状态
一个线程处于四种状态之一
- 新建, 只在新创建的时候, 会短暂的处于这个状态. 线程下一步将会变成可运行或者阻塞
- 就绪, 可以运行, 也可以不运行, 只要分配了时间片就可以运行
- 阻塞, 调度器会忽略这个线程, 直到其从阻塞进入就绪状态
- 死亡, 不可再调度, 也不会再得到CPU时间, 任务结束通常是run()方法返回, 但也可以被其他的方式结束
进入阻塞有如下几种情况:
- 自己调用sleep()
- 调用wait()挂起线程, 线程会等待notify()或者notifyAll()后进入就绪状态
- 在I/O上阻塞
- 等待获取锁
现在我们要来看, 如何让线程在run()返回之前结束其任务, 这样才能让线程协同工作
打断任务 – 这里的打断指的是让任务结束阻塞状态, 不是强行中止任务.
本身Thread类就有.interrupt()方法, 持有线程对象就可以调用这个方法强行中断. 如果不具体操作每个对象, 也可以通过Executor来执行所有操作.
如果调用 Executor.shutdownNow(), 会发送一个interrupt()给它启动的所有线程.
如果只希望终止一个, 可以使用submit(), 会返回一个Future对象, 在Future对象上调用.cancel()就可以中断特定的任务.
import java.io.IOException; import java.io.InputStream; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class Interrupting { private static ExecutorService executorService = Executors.newCachedThreadPool(); static void test(Runnable r) throws InterruptedException { Future<?> f = executorService.submit(r); TimeUnit.MILLISECONDS.sleep(1); System.out.println("Interrupting " + r.getClass().getName()); //传入true表示打断这个线程 f.cancel(true); System.out.println("Interrupting sent to " + r.getClass().getName()); } public static void main(String[] args) throws InterruptedException { test(new SleepBlocked()); test(new IOBlocked(System.in)); executorService.shutdown(); } } class SleepBlocked implements Runnable { @Override public void run() { try { TimeUnit.SECONDS.sleep(100); } catch (InterruptedException e) { System.out.println("Interrupted"); System.out.println("Intterupted 之后执行的代码"); } System.out.println("正常退出run()"); } } class IOBlocked implements Runnable { private InputStream inputStream; public IOBlocked(InputStream inputStream) { this.inputStream = inputStream; } public void run() { try { System.out.println("Waiting for input: "); inputStream.read(); } catch (IOException e) { if (Thread.currentThread().isInterrupted()) { System.out.println("Interrupted from blockedI/O"); } else { throw new RuntimeException(e); } } System.out.println("Exiting IOBlocked.run()"); } } class SynchroizedBlocked implements Runnable { public synchronized void f() { while (true) { Thread.yield(); } } public SynchroizedBlocked() { new Thread() { public void run() { f(); } }.start(); } public void run() { System.out.println("Trying to call f()"); f(); System.out.println("Exiting SynchroizedBlocked.run()"); } }
实际上无法打断阻塞在I/O的线程, 因为即使打断了, 还是在等待I/O.
线程协作
之前访问共享变量, 是让线程可以安全的操作同一个内容, 现在线程还可以互相协作指的是等待, 通知, 唤醒等.
可以把每一个线程当成一个微型的程序, 这些程序之间可以互相进行通信, 叫醒和睡眠等.
本质上就是采取一种信号的方式, 让线程之间可以互相响应来做事情.
先要看的是wait()和notifyAll().
wait()函数在调用完毕之后, 调用这个函数的线程是无法自己再执行任务的, 必须要等待其他线程将其激活. 等待的信号就是notifyAll().
sleep()和yield()都不会释放锁, 只会很大概率让调度器执行其他线程. 而 wait() 会释放锁, 这是一个很关键的特性.
.wait()可以接受毫秒数作为参数, 表示挂起的时间, 作用和sleep()有些类似(但是会释放锁). sleep()需要打断, 而.wait()是等待notifyAll()消息
wait()如果不加参数, 就是无限等待下去, 直到接收到notify()或者notifyAll()消息.
还要注意的是, wait(), notify(), notifyAll()都是Object的方法之一, 而不是Thread的方法. 而且必须在同步方法内部才能调用这三个方法, 也即调用者三个方法的线程已经进入到了同步方法内部, 也即已经获取了锁.
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class WaxOnMatic { public static void main(String[] args) throws InterruptedException { Car car = new Car(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.submit(new WaxOn(car)); executorService.submit(new WaxOff(car)); TimeUnit.SECONDS.sleep(10); //前边学的, 给所有线程发送interrupt信号, 我们靠这个机制结束线程 executorService.shutdownNow(); } } class Car { //有了四个方法之后, 打蜡的任务需要使用两个方法, 即打完腊通知其他线程干活, 以及检查打蜡标记, 如果已经打完腊, 就继续等待 //抛光的任务则需要使用抛完光再通知其他线程干活, 以及检查抛光标记, 已经抛光过就一直等待 //打蜡标记同时也是抛光标记, 为false的是表示抛光完成, 为true的时候表示打完腊 private boolean waxOn = false; public synchronized void waxed() { //干完打蜡就通知所有线程干活 waxOn = true; notifyAll(); } public synchronized void buffed() { //干完抛光就通知其他线程 waxOn = false; notifyAll(); } public synchronized void waitForWaxing() throws InterruptedException { //如果没打蜡就继续等待 while (!waxOn) { wait(); } } public synchronized void waitForBuffed() throws InterruptedException { //如果没抛光就继续等待 while (waxOn) { wait(); } } } class WaxOn implements Runnable { private Car car; public WaxOn(Car car) { this.car = car; } public void run() { try { while (!Thread.interrupted()) { System.out.println("开始打蜡."); TimeUnit.MILLISECONDS.sleep(500); //打蜡 car.waxed(); //等抛光 car.waitForBuffed(); } //用了try-catch, 打断任务实际上变成了终止线程的工具 } catch (InterruptedException e) { System.out.println("打蜡任务被中断 Interrupted"); } System.out.println("结束打蜡任务"); } } class WaxOff implements Runnable { private Car car; WaxOff(Car car) { this.car = car; } public void run() { try { while (!Thread.interrupted()) { //注意这里的顺序, 先等待, 再执行, 多个任务的话, 要有一个先执行并且改变标记, 其他的任务等待标记改变之后再来 car.waitForWaxing(); System.out.println("开始抛光"); TimeUnit.MILLISECONDS.sleep(300); car.buffed(); } } catch (InterruptedException e) { System.out.println("抛光任务被中断 Interrupted"); } System.out.println("结束抛光任务"); } }
这个例子有如果设置多个打蜡和抛光的线程, 也是可以工作的, 每次只有拿到锁的线程进行工作, 然后再通知其他线程来工作. 其实很像CSAPP上的生产者和消费者模型.
等待的线程一般都需要使用检查标记和while循环来不断等待, 这也是多线程的套路了, 反正阻塞了不会占用太多资源.
只是要注意并不像底层的信号量使用那么直接, 而是要在共享变量或者说要操作的对象上设置同步方法,然后在其中设置wait()和notify().
还有要注意的就是, 这里都使用了同一个Car对象, 而Car对象中的 sync 的锁就是car对象本身, notify()和notifyAll()只能唤醒等待同一个锁的对象, 而不是全部的对象. 当然这一点也是不言自明的.
生产者消费者模型
终于到了Java 中的生产者消费者模型, 在CSAPP底层中, 使用了一个数组和三个信号量来判断, 其中一个信号量是用于同步的互斥信号量, 另外两个信号量则用作有多少个数据和空槽可用的标记.
来看看Java 的生产者和消费者模型:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; //餐馆实际上是Meal的容器, 两个线程使用同一个餐馆对象, 其中持有Meal对象, 对于容器访问, 需要加锁才行 public class Restaurant { Meal meal = null; ExecutorService exec = Executors.newCachedThreadPool(); //这个很有意思, 直接以当前对象传给Consumer final Consumer consumer = new Consumer(this); final Chef chef = new Chef(this); public Restaurant() { exec.execute(chef); exec.execute(consumer); } public static void main(String[] args) { new Restaurant(); } } //这个类就是要消费的数据. class Meal { private final int number; public Meal(int number) { this.number = number; } public int getNumber() { return number; } @Override public String toString() { return "Meal{" + "number=" + number + '}'; } } //消费者, 就是把肉拿走的人 class Consumer implements Runnable { private Restaurant restaurant; public Consumer(Restaurant restaurant) { this.restaurant = restaurant; } @Override public void run() { try { while (!Thread.interrupted()) { synchronized (this) { //餐馆里没有肉, 就等待 while (restaurant.meal == null) { wait(); } } System.out.println("Consumer got a meal"); //消费完肉之后, 通知厨子做饭 synchronized (restaurant.chef) { restaurant.meal = null; restaurant.chef.notifyAll(); } } } catch (InterruptedException e) { System.out.println("中断消费者"); } } } class Chef implements Runnable { private Restaurant restaurant; private int count = 0; public Chef(Restaurant restaurant) { this.restaurant = restaurant; } public void run(){ try { while (!Thread.interrupted()) { synchronized (this) { while (restaurant.meal != null) { wait(); } } if (++count == 10) { System.out.println("生产完了10顿饭."); restaurant.exec.shutdownNow(); } System.out.println("开始做饭"); synchronized (restaurant.consumer) { restaurant.meal = new Meal(count); restaurant.consumer.notifyAll(); } TimeUnit.SECONDS.sleep(1); } } catch (InterruptedException e) { System.out.println("中断厨子"); } } }
这个模型是一个线程的消费者和一个线程的生产者, 餐馆是数据对象Meal的持有者.
这里由于是两个单独的线程, 所以在生产和消费数据, 也就是读写meal的时候, 用自己加锁, 而在唤醒另一个生产者/消费者的时候, 用对应的对象加锁.
如果meal是一个容器, 还可以全部都对meal进行加锁, 在其中设置一些变量用来取数据.可以很容易的就把CSAPP的模型移植到Java中来, 只要对象监视器都是同一个就可以了.
然后自己根据练习24的要求, 写了个内部是定长数组的缓冲区队列持有数据, 然后有多个消费者和生产者对象的模型:
import java.util.Arrays; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 解决带有缓冲区的单个生产者和消费者的问题 * 初步想法是弄带有ID标识的数据对象 * 然后弄一个缓冲区对象Buffer持有数据对象 * Buffer中的放入和取出数据需要同步, 然后用内部的指针来保存当前的状态 */ // public class Ex24 { public static void main(String[] args) throws InterruptedException { Buffer buffer = Buffer.getBuffer(20); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 1; i++) { executorService.execute(new Producer(buffer)); } for (int j = 0; j < 3; j++) { executorService.execute(new Consumer(buffer)); } TimeUnit.SECONDS.sleep(3); executorService.shutdownNow(); System.out.println(buffer); } } //这个作为数据对象 class Data { // public static synchronized Data getData() { // return new Data(anInt++); // } // private int number; // // private static volatile int anInt = 0; public Data(int i) { this.number = i; } @Override public String toString() { return "Data{" + "number=" + number + '}'; } } //Buffer类作为数据对象的持有者 //有一个取出数据的方法, 一个放入数据的方法, 用于同步 //还需要有可以取出几个和可以放入几个的标志 class Buffer { //用工厂模式 public static Buffer getBuffer(int i) { if (i <= 0) { throw new RuntimeException("长度必须大于0"); } return new Buffer(i); } private Data[] datas; //用两个索引保存上次放入数据开始的地方, 和放完数据后的位置 private int lastIndex = 0; private int currentIndex = 0; //缓冲区长度 private int length; //一开始有几个空槽可以放入数据, 设置为和长度一样 private int availableToPut; //一开始有几个空槽可以取出数据, 设置为0 private int avaiableToGet = 0; //私有构造器, 用工厂模式创建新的Buffer private Buffer(int i) { this.length = i; this.availableToPut = length; //初始化一个Data对象的数组, 全部初始化为null datas = new Data[i]; for (int j = 0; j < length; j++) { datas[j] = null; } } //放入数据 public synchronized void putData(Data data) throws InterruptedException { //如果可以放入数据的数量为0,就阻塞 while (availableToPut == 0) { wait(); } //如果成功通过阻塞, 说明可以放入数据了, 就放入数据, 同时更新相关指标 //更新currentIndex到最新的可以放入数据的位置 datas[currentIndex] = data; currentIndex++; currentIndex = currentIndex % length; //更新可用槽位和可读槽位 availableToPut--; avaiableToGet++; //当前线程取出数据后唤醒其他放入数据,也就是生产者线程 notifyAll(); // System.out.println("成功放入了数据, 当前状态是: " + this); } public synchronized Data getData() throws InterruptedException { while (avaiableToGet == 0) { wait(); } //从上次放入数据的地方取数据, 然后更新lastindex Data result = datas[lastIndex]; lastIndex++; lastIndex = lastIndex % length; availableToPut++; avaiableToGet--; //当前线程取出数据后唤醒其他写入数据, 也就是消费者线程 notifyAll(); // System.out.println("成功取出了数据, 当前状态是: " + this); return result; } @Override public String toString() { return "Buffer{" + "datas=" + Arrays.toString(datas) + ", lastIndex=" + lastIndex + ", currentIndex=" + currentIndex + ", length=" + length + ", availableToPut=" + availableToPut + ", avaiableToGet=" + avaiableToGet + ", 尚未取出的数据等于=" + (currentIndex >= lastIndex ? currentIndex - lastIndex : currentIndex - lastIndex + length) + '}'; } } class Producer implements Runnable { private static volatile int number = 0; private static Random random = new Random(); private final Buffer buffer; Producer(Buffer buffer) { this.buffer = buffer; } public static synchronized int getNumber() { int temp = number; number++; return temp; } @Override public void run() { try { while (true) { Data newData = new Data(Producer.getNumber()); TimeUnit.MILLISECONDS.sleep(random.nextInt(30) * random.nextInt(20)); System.out.println("生产者尝试放入新数据:" + newData); buffer.putData(newData); } } catch (InterruptedException e) { System.out.println("生产者被中断了"); } } } class Consumer implements Runnable { private static Random random = new Random(); private final Buffer buffer; Consumer(Buffer buffer) { this.buffer = buffer; } @Override public void run() { try { while (true) { TimeUnit.MILLISECONDS.sleep(random.nextInt(30) * random.nextInt(20)); Data data = buffer.getData(); System.out.println("消费者取出了数据: " + data); } } catch (InterruptedException e) { System.out.println("消费者被中断了"); } } }
同步队列
在自己编写的模型中, 实际上同步出现在对缓冲区的读写中, 如果有一个缓冲区已经实现了这种同步方法, 那么所有的线程之间操作缓冲区就可以了, 无需自行编写同步代码.
java.util.BlockingQueue提供了这个接口, 还有很多具体实现, 常用的有LinkedBlockingQueue, ArrayBlockingQueue.
这些同步队列都支持泛型, 平常使用的方法主要就是 .take() 返回放入的泛型类型对象, .put() 放入一个泛型类型的对象
这个其实就是缓冲区底层使用不同的数据结构, 外部封装好同步读写功能即可. 本质上和上边自行编写的Buffer类是一样的.
管道
在之前的IO里留下了Piped类型的输入和输出没有用到, 实际上管道就是一个阻塞队列, PipedWriter允许任务向管道写, PipedReader允许不同任务从同一个管道读取. 可以看成是一个生产者-消费者模型的封装好的解决方案.
一个简单的例子如下, sender 每半秒钟发一个字符到管道里, 另外一端的receiver进行输出, 核心是其中的创建管道的语句:
import java.io.IOException; import java.io.PipedReader; import java.io.PipedWriter; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class PipedIO { public static void main(String[] args) throws IOException, InterruptedException { Sender sender = new Sender(); Receiver receiver = new Receiver(sender); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(sender); executorService.execute(receiver); TimeUnit.SECONDS.sleep(4); executorService.shutdownNow(); } } class Sender implements Runnable { private Random random = new Random(47); //要向管道输出的任务创建一个PipedWriter()对象 private PipedWriter out = new PipedWriter(); public PipedWriter getPipedWriter() { return out; } public void run() { try { while (true) { for (char c = 'A'; c <= 'z'; c++) { out.write(c); TimeUnit.MILLISECONDS.sleep(500); } } } catch (IOException | InterruptedException e) { System.out.println("IO错误或者被打断"); } } } class Receiver implements Runnable { private PipedReader in; public Receiver(Sender sender) throws IOException { //需要从管道中读取数据的任务需要以一个PipedWriter对象作为PipedReader对象的构造器参数 //获取之后相当于写入任务的out是管道的写入端, 这边是读取端 // in = new PipedReader(sender.getPipedWriter()); } public void run() { try { while (true) { //这里直接调用read()方法即可, 如果读完了, 会阻塞. char c = (char) in.read(); System.out.println(c); } } catch (IOException e) { System.out.println("被打断"); } } }
如果要多线程读写管道, 注意一个PipedReader只能针对管道只能创建一次, 但是PipedReader对象可以复用, 如果要用多个线程去读管道, 需要修改成这样:
import java.io.IOException; import java.io.PipedReader; import java.io.PipedWriter; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class PipedIO { public static void main(String[] args) throws IOException, InterruptedException { Sender sender = new Sender(); //创建一个共用的PipedReader对象 PipedReader in = new PipedReader(sender.getPipedWriter()); //让每个线程都使用这个共用对象即可 Receiver receiver0 = new Receiver(sender,0, in); Receiver receiver1 = new Receiver(sender,1, in); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(sender); executorService.execute(receiver0); executorService.execute(receiver1); TimeUnit.SECONDS.sleep(4); executorService.shutdownNow(); } } //Sender代码不变, 省略, 这里要调整一下Receiver的构造器 class Receiver implements Runnable { private PipedReader in; private int number; public Receiver(Sender sender, int i, PipedReader in) throws IOException { this.in = in; this.number = i; //这个是关键, PipedReader以一个PipedWriter对象作为构造器参数, 获取一个Reader对象 // in = new PipedReader(sender.getPipedWriter()); } public void run() { try { while (true) { //这里直接调用read()方法即可, 如果读完了, 会阻塞. char c = (char) in.read(); System.out.println("ID: " + number + " " + c); } } catch (IOException e) { System.out.println("被打断"); } } }
管道是没有sleep()和wait()方法可供调用的, 只需要调用.read(), 如果没数据就会自动阻塞.