亲爱的女儿6岁了, 6岁的时间里, 我换了三家公司, 仔细算了算工作也就13年, 时间真是一个神奇的东西.
简单总结完了Java里边与并发相关的原语和基础知识, 现在用Java写点小玩意问题不大了, 而且由于面向对象的思路, 传递参数要比C语言底层方便一些.
继续来看看Java的并发工具包java.util.concurrent
中提供的一些工具, 就是专门用于多线程并发的类.
信号量
信号量也是老朋友了, 相比之前的可重入锁和条件变量, 信号量突出的特点就是可以放超过1个的线程进入临界区.
构造函数有两个, 一个参数是信号量的int整数, 表示可以同时放多少个线程进入临界区, 第二个参数可以指定是否公平.
方法有:
void acquire()
void acquireUninterruptibly()
boolean tryAcquire()
boolean tryAcquire(long timeout, TimeUnit unit)
void release()
从方法的名称就可以很明显的看出来作用, 信号量搭配条件变量就可以来实现一个生产者-消费者队列了. 不过这里暂时不写了. 先实验一下信号量:
import java.util.concurrent.Semaphore; public class SemaphoreTest implements Runnable { public static volatile int i = 0; public static Semaphore semaphore = new Semaphore(4); @Override public void run() { semaphore.acquireUninterruptibly(); for (int j = 0; j < 100000; j++) { i = i + 1; } System.out.println(Thread.currentThread().getName() + " 完成工作"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } semaphore.release(); } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 20; i++) { Thread thread = new Thread(new SemaphoreTest()); thread.start(); } Thread.sleep(6000); System.out.println("i=" + i); } }
这里启动了一个同事放4个线程进临界区的信号量, 然后每次先无打断等待信号量, 之后再释放信号量即可.
这里没有用互斥锁保护i, 所以可以发现, 进入了临界区之后, 读写共享变量依然需要互斥锁的保护, 可以写成如下:
synchronized (semaphore) { i = i + 1; }
这样就用同一个互斥锁保护了共享变量i.
读写锁
读写锁也在底层的时候看过了, 读写锁主要用来大批量的读和少部分更新的情况.
由于读和读之间完全不需要加锁, 因为不会修改. 只有读写和写写操作之间需要相互等待和获取锁. 如果读的次数远远大于写的次数, 读写锁就可以获取最大的性能.
其底层实现也很有意思, 就是通过一个互斥变量来统计当前所有的读者, 只把读者放进来, 而写者要进来的时候要等待读者放弃锁, 写者获取共享变量锁写入, 之后再释放.
这里看看作者的例子:
import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockTest { private static Lock lock = new ReentrantLock(); //创建一个读写锁对象, 然后调用其中的方法获取读锁和写锁 private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private static Lock readlock = readWriteLock.readLock(); private static Lock writeLock = readWriteLock.writeLock(); private int value; //模拟读操作, 线程睡眠一秒 然后返回value public int handleRead(Lock lock) throws InterruptedException { try { lock.lock(); Thread.sleep(1000); return value; } finally { lock.unlock(); } } //模拟写操作 用index更新value public void handleWrite(Lock lock, int index) throws InterruptedException { try { lock.lock(); Thread.sleep(1000); value = index; } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { final ReadWriteLockTest demo = new ReadWriteLockTest(); //创建模拟读的线程, 使用读锁 Runnable readRunnable = new Runnable() { @Override public void run() { try { demo.handleRead(readlock); } catch (InterruptedException e) { e.printStackTrace(); } } }; //创建模拟写的线程, 使用写锁写入一个随机整数 Runnable writeRunnable = new Runnable() { @Override public void run() { try { demo.handleWrite(writeLock,new Random().nextInt()); } catch (InterruptedException e) { e.printStackTrace(); } } }; //创建20个读者和2个写者: List<Thread> threads = new ArrayList<>(); for (int i = 0; i < 20; i++) { Thread reader = new Thread(readRunnable); threads.add(reader); reader.start(); } for (int j = 0; j < 2; j++) { Thread writer = new Thread(writeRunnable); threads.add(writer); writer.start(); } //等待所有线程结束 long start = System.currentTimeMillis(); for (Thread thread : threads) { thread.join(); } System.out.println("EndTime is :" + (System.currentTimeMillis() - start)); } }
这段程序的所有读者和所有写者都花费1秒钟进行读或者写, 一共有22个线程, 20个读者, 2个写者.
如果是普通的互斥锁, 主线程等待所有的线程完成工作之后, 大概要等待23秒左右. 但是这里使用了读写锁, 即使一个拿到读锁的读者在休眠, 其他读者仍然可以进入.
所以整体上整个程序的运行时间会大大缩短. 如果将标红的两行中使用的读写锁替换成类中静态的互斥锁lock对象, 可以同样达到保护共享变量value的作用, 但是整个程序的运行时间将大大延长:
这个程序使用读写锁的运行时间基本上在3秒左右, 而使用互斥锁的运行时间在22秒左右.
线程计数器 CountDownLatch
这个倒计时器的意思在于像一道门一样堵住当前线程的执行, 等计数器为0, 再让当前线程开始工作. 构造函数接受一个整数作为参数, 即计数器的计数个数.
看了一下简单用法:
import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountDownLatchTest implements Runnable { static final CountDownLatch counter = new CountDownLatch(10); static final CountDownLatchTest demo = new CountDownLatchTest(); //每个线程执行工作之后, 通知计数器减1 @Override public void run() { try { Thread.sleep(new Random().nextInt(10) * 1000); System.out.println("check complete"); counter.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } //主线程开启一个10个线程的固定线程池, 然后每个线程提交上边的任务 public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { executorService.submit(demo); } //主线程等待计数器到0 counter.await(); System.out.println("所有线程完成了工作"); executorService.shutdown(); } }
可见这个提供了一个比join()更加好的控制线程协作的方法.
循环栅栏 CyclicBarrier
这个和上边的计数器有点类似, 但是可以循环, 于是可以每次等待一批线程完成工作, 再等待下一批. 这个有意思的是先调用一个方法就可以自动完成计数并放行.
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest { public static class Worker implements Runnable { private String name; private final CyclicBarrier barrier; @Override public void run() { try { barrier.await(); work(); barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } public Worker(CyclicBarrier barrier, String name) { this.name = name; this.barrier = barrier; } void work() { System.out.println(name + "开始工作...."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + "工作结束..."); } } public static class Commander implements Runnable { boolean flag; int N; public Commander(boolean flag, int n) { this.flag = flag; N = n; } @Override public void run() { if (flag) { System.out.println("当前一批工人全部干完活了"); } else { System.out.println("当前工人集合完毕, 出发干活"); flag = true; } } public static void main(String[] args) { final int N = 40; Thread[] allSoldier = new Thread[N]; boolean flag = false; CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Commander(flag, N)); System.out.println("集合队伍"); for (int i = 0; i < N; i++) { System.out.println("工人" + i + "报道"); allSoldier[i] = new Thread(new Worker(cyclicBarrier, "士兵" + i)); allSoldier[i].start(); } } } }
这里的关键是两行红色, 第一次调用await()就是进行等待至计数到10, 第二次调用就是开始下一次调用, 各个线程在调用之后就会进行阻塞, 之后完成工作, 然后再调用就会进行阻塞.
而同一个CyclicBarrier对象就会在内部进行控制, 每次计数到10的时候就让一批线程进行工作, 然后再进行计数.
这里的异常还需要注意一下, 除了通用的打断异常之外, 还有一个BrokenBarrierException
, 表示当前的栅栏已经破损. 一般如果一批线程已经集中在一个栅栏里, 然后有一个线程出现了异常, 剩下的线程都会得到一个BrokenBarrierException, 因为这个时候栅栏失效就很可能无法继续了.
阻塞工具 LockSupport
这个阻塞工具就是之前操作系统里提到的, 线程调用过一个停止阻塞的方法之后, 再去阻塞, 也不会阻塞而是直接继续执行. 这样就避免了线程继续执行指令和阻塞指令乱序的问题.
这个阻塞工具类LockSupport只有静态方法可用, 方法如下:
park(Thread thread)
, 阻塞当前线程unpark(Thread thread)
, 让当前线程停止阻塞
这两个方法调用的前后顺序没有关系, 如果park()在先, unpark()会让线程停止阻塞. 如果unpark()在先, 则park()调用的时候不会阻塞, 而会直接往下继续运行.
这其中的机制在操作系统那里看到过, 实际上相当于给每个线程一个信号量一样的东西, park()调用会去修改那个信号量, unpark()也会. 这样无论先后, 两个函数都会知道当前的状态, 因此选择是阻塞还是继续执行.
一个简单的例子如下:
import java.util.concurrent.locks.LockSupport; public class LockSupportTest { public static class MyThread implements Runnable { @Override public void run() { System.out.println("准备一秒钟后调用阻塞方法"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } LockSupport.park(); } } public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(new MyThread()); thread.start(); System.out.println("主线程尝试在启动副线程后就将其unpark()"); LockSupport.unpark(thread); thread.join(); System.out.println("全部线程执行完毕"); } }
这两个的用法其实是针对已经不太使用的thread.suspend()
和thread.resume()
来说的, 这两个方法必须严格的前后调用, 否则会导致线程挂起但没人唤醒.
而LockSupport就无需这种担心, 主线程可以保证一定会让副线程继续运行. 将上边的两行红色部分分别换成:
Thread.currentThread().suspend(); thread.resume();
程序就会卡死, 这是因为主线程先调用了副线程resume, 之后副线程再进入suspend, 就没法被唤醒了. 使用IDEA的话会发现, 这两个方法已经标记为废弃.
今天是7月1日啦, 一年一年时间过的好快, 未来还不知道有什么等着自己, 总之目前就继续做吧.