CountDownLatch和CyclicBarrier
这两个java并发的工具类还是很常用的,还有一个Semaphore(信号量),看看他们分别是怎样使用的。
我第一次看到这三个工具类也是在Java并发编程实战上看到的,写下笔记增加点印象。
CountDownLatch
首先来看CountDownLatch,有人是这样比喻的:
百米赛跑,十名运动员同时起跑,由于速度的快慢,肯定有先到达和后到达的,而终点有个统计成绩的仪器,当所有选手到达终点时,它会统计所有人的成绩并进行排序,然后把结果发送到汇报成绩的系统。 来源
CountDownLatch来源于jdk@since 1.5
,他的作用就像刚才描述的例子那样,把运动员看成是不同的线程,主线程(统计的人)需要等待await()
其他线程(运动员)都执行执行到终点countDown()
,才进行后续的操作,比如: 成绩排序,发送汇总结果等等。
来看看CountDownLatch的API
构造函数
public CountDownLatch(int count) { }; //参数count为计数值
方法
public void await() throws InterruptedException { }; //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行 public void countDown() { }; //将count值减1
下面是我参考写出来的例子(使用junit4):
public class CountDownLatchTest { private static final Integer THREAD_COUNT = 2; @Test public void main() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT); for (int i = 1; i <= THREAD_COUNT; i++) { new Thread(new MyRunable(i + "", i * 1000L, countDownLatch)).start() ; } countDownLatch.await(); System.out.println("All Over"); } public class MyRunable implements Runnable { private String name; private Long time; private CountDownLatch countDownLatch; public MyRunable(String name, Long time, CountDownLatch countDownLatch) { this.name = name; this.countDownLatch = countDownLatch; this.time = time; } @Override public void run() { System.out.println(name + ": run"); try { Thread.sleep(time); System.out.println(name + ": over"); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } } } }
CyclicBarrier
CyclicBarrier,也是在@since 1.5
加入的并发控制工具类,它可以让线程进行互相等待await()
,当一个线程先执行完某些任务时,他会被阻塞(Barrier),直到所有线程都执行到这个状态时,这些线程才能继续往后执行其他任务。CycliBarrier对象可以重复使用,重用之前应当调用reset()
。
它有两个构造函数
// parties表示要多少个线程需要等待 // barrierAction为当这些线程都达到barrier状态时将会执行 public CyclicBarrier(int parties, Runnable barrierAction); public CyclicBarrier(int parties);
主要API
// 等待其他线程 应该是最常用的方法了 public int await() public int await(long timeout, TimeUnit unit) // 重置CyclicBarrier public void reset() // 获得CyclicBarrier阻塞的线程数量 public int getNumberWaiting() // 用来知道阻塞的线程是否被中断 public boolean isBroken()
下面看一个实例(同样用junit4测试):
public class CyclicBarrierTest { private static final Integer THREAD_COUNT = 2; @Test public void main() throws IOException { CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_COUNT, new Runnable() { @Override public void run() { System.out.println("over"); } }); for (int i = 0; i < THREAD_COUNT; i++) { new Thread(new WriteRunable(i + "", cyclicBarrier)).start(); } System.in.read(); } public class WriteRunable implements Runnable { private String name; private CyclicBarrier cyclicBarrier; public WriteRunable(String name, CyclicBarrier cyclicBarrier) { this.name = name; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { double v = Math.random() * 10000d; System.out.println(v); Thread.sleep((long) v); } catch (InterruptedException e) { e.printStackTrace(); } finally { try { System.out.println("任务完成: " + name); cyclicBarrier.await(); // 等待其他线程完成后再继续执行 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } System.out.println("继续后续任务"); } } }
Semaphore
Semaphore翻译成字面意思为 信号量,Semaphore可以控制同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。
构造函数
// 参数permits表示许可数目,即同时可以允许多少线程进行访问 public Semaphore(int permits) { sync = new NonfairSync(permits); } // 这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可 public Semaphore(int permits, boolean fair) { sync = (fair)? new FairSync(permits) : new NonfairSync(permits); }
重要的方法
// 获取一个许可 public void acquire() // 获取permits个许可 public void acquire(int permits) // 释放一个许可 public void release() // 释放permits个许可 public void release(int permits) // 尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false public boolean tryAcquire() // 尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false public boolean tryAcquire(long timeout, TimeUnit unit) // 尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false public boolean tryAcquire(int permits) // 尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false public boolean tryAcquire(int permits, long timeout, TimeUnit unit) // Returns the current number of permits available in this semaphore. public int availablePermits()
假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现
public class Test {
public static void main(String[] args) {
int N = 8; //工人数
Semaphore semaphore = new Semaphore(5); //机器数目
for(int i=0;i<N;i++)
new Worker(i,semaphore).start();
}
static class Worker extends Thread{
private int num;
private Semaphore semaphore;
public Worker(int num,Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+this.num+"占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人"+this.num+"释放出机器");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
文中Semaphore用法全来源于引用1.
引用: