CountDownLatch和CyclicBarrier

这两个java并发的工具类还是很常用的,还有一个Semaphore(信号量),看看他们分别是怎样使用的。

我第一次看到这三个工具类也是在Java并发编程实战上看到的,写下笔记增加点印象。

CountDownLatch

首先来看CountDownLatch,有人是这样比喻的:

百米赛跑,十名运动员同时起跑,由于速度的快慢,肯定有先到达和后到达的,而终点有个统计成绩的仪器,当所有选手到达终点时,它会统计所有人的成绩并进行排序,然后把结果发送到汇报成绩的系统。 来源

CountDownLatch来源于jdk@since 1.5,他的作用就像刚才描述的例子那样,把运动员看成是不同的线程,主线程(统计的人)需要等待await()其他线程(运动员)都执行执行到终点countDown(),才进行后续的操作,比如: 成绩排序,发送汇总结果等等。

来看看CountDownLatch的API

  • 构造函数

    public CountDownLatch(int count) {  };  //参数count为计数值
    
  • 方法

    public void await() throws InterruptedException { };   //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
    public void countDown() { };  //将count值减1
    
  • 下面是我参考写出来的例子(使用junit4):

    public class CountDownLatchTest {
    
        private static final Integer THREAD_COUNT = 2;
    
        @Test
        public void main() throws InterruptedException {
    
            CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
    
            for (int i = 1; i <= THREAD_COUNT; i++) {
                new Thread(new MyRunable(i + "", i * 1000L, countDownLatch)).start()   ;
            }
    
            countDownLatch.await();
            System.out.println("All Over");
        }
    
        public class MyRunable implements Runnable {
    
            private String name;
            private Long time;
            private CountDownLatch countDownLatch;
    
            public MyRunable(String name, Long time, CountDownLatch countDownLatch)    {
                this.name = name;
                this.countDownLatch = countDownLatch;
                this.time = time;
            }
    
            @Override
            public void run() {
                System.out.println(name + ": run");
                try {
                    Thread.sleep(time);
                    System.out.println(name + ": over");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }
        }
    }
    

CyclicBarrier

CyclicBarrier,也是在@since 1.5加入的并发控制工具类,它可以让线程进行互相等待await(),当一个线程先执行完某些任务时,他会被阻塞(Barrier),直到所有线程都执行到这个状态时,这些线程才能继续往后执行其他任务。CycliBarrier对象可以重复使用,重用之前应当调用reset()

  • 它有两个构造函数

    // parties表示要多少个线程需要等待
    // barrierAction为当这些线程都达到barrier状态时将会执行
    public CyclicBarrier(int parties, Runnable barrierAction);
    public CyclicBarrier(int parties);
    
  • 主要API

    // 等待其他线程 应该是最常用的方法了
    public int await()
    public int await(long timeout, TimeUnit unit)
    // 重置CyclicBarrier
    public void reset()
    // 获得CyclicBarrier阻塞的线程数量
    public int getNumberWaiting()
    // 用来知道阻塞的线程是否被中断
    public boolean isBroken()
    
  • 下面看一个实例(同样用junit4测试):

    public class CyclicBarrierTest {
    
        private static final Integer THREAD_COUNT = 2;
    
        @Test
        public void main() throws IOException {
    
            CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_COUNT, new    Runnable() {
                @Override
                public void run() {
                    System.out.println("over");
                }
            });
    
            for (int i = 0; i < THREAD_COUNT; i++) {
                new Thread(new WriteRunable(i + "", cyclicBarrier)).start();
            }
    
            System.in.read();
        }
    
        public class WriteRunable implements Runnable {
    
            private String name;
            private CyclicBarrier cyclicBarrier;
    
            public WriteRunable(String name, CyclicBarrier cyclicBarrier) {
                this.name = name;
                this.cyclicBarrier = cyclicBarrier;
            }
    
            @Override
            public void run() {
                try {
                    double v = Math.random() * 10000d;
                    System.out.println(v);
                    Thread.sleep((long) v);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        System.out.println("任务完成: " + name);
                        cyclicBarrier.await(); // 等待其他线程完成后再继续执行
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
    
                System.out.println("继续后续任务");
            }
        }
    }
    

Semaphore

Semaphore翻译成字面意思为 信号量,Semaphore可以控制同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。

  • 构造函数

    // 参数permits表示许可数目,即同时可以允许多少线程进行访问
    public Semaphore(int permits) {  
        sync = new NonfairSync(permits);
    }
    // 这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
    public Semaphore(int permits, boolean fair) {
        sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
    }
    
  • 重要的方法

    // 获取一个许可
    public void acquire()
    // 获取permits个许可
    public void acquire(int permits)
    // 释放一个许可  
    public void release()
    // 释放permits个许可
    public void release(int permits)
    // 尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
    public boolean tryAcquire()
    // 尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
    public boolean tryAcquire(long timeout, TimeUnit unit)
    // 尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
    public boolean tryAcquire(int permits)
    // 尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    // Returns the current number of permits available in this semaphore.
    public int availablePermits()
    

假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现

public class Test {
    public static void main(String[] args) {
        int N = 8;            //工人数
        Semaphore semaphore = new Semaphore(5); //机器数目
        for(int i=0;i<N;i++)
            new Worker(i,semaphore).start();
    }

    static class Worker extends Thread{
        private int num;
        private Semaphore semaphore;
        public Worker(int num,Semaphore semaphore){
            this.num = num;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("工人"+this.num+"占用一个机器在生产...");
                Thread.sleep(2000);
                System.out.println("工人"+this.num+"释放出机器");
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

文中Semaphore用法全来源于引用1.

引用:

  1. Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
  2. Java并发包中CyclicBarrier的工作原理、使用示例
  3. 并发工具类(二)同步屏障CyclicBarrier