博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
并发编程(十):AQS
阅读量:4342 次
发布时间:2019-06-07

本文共 9360 字,大约阅读时间需要 31 分钟。

  AQS全称为AbstractQueuedSynchronizer,是并发容器中的同步器,AQS是J.U.C的核心,它是抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类都依赖它,如ReentrantLock、Semaphore、CyclicBarrier、ReentrantLock、Condition、FutureTask等。

 

  AQS的特点:

  a、使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架

  b、利用一个int类型表示状态

  c、使用方法是继承

  d、子类通过继承并通过实现它的方法管理其状态

  e、可以同时实现排它锁和共享锁模式(独占、共享)

 

  AQS实现原理:

  AQS维护了一个volatile int state和一个FIFO线程等待队列

  state的访问方式有三种:getState(),setState(),compareAndSetState()

  AQS定义两种资源共享方式,Exclusive(独占,只有一个线程能执行),Share(共享,多个线程可同时执行)

  自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,具体线程等待队列的维护,如获取资源失败入队,唤醒出队等,AQS在顶层实现好了。自定义同步器实现时主要实现以下几种方法;

  tryAcquire(int):独占方式,尝试获取资源,成功返回true,失败返回false

  tryRelease(int): 独占方法,尝试释放资源,成功则返回true,失败则返回false

  tryAcquireShared(int):共享方式,尝试获取资源,负数表示失败,0表示成功,但没有剩余可用资源,正数表示成功,且有剩余资源

  tryReleaseShared(int):共享方式,尝试释放资源,如果释放后允许唤醒后续等待节点返回true,否则返回false 

 

  acquire(int)

  a、tryAcquire()尝试直接去获取资源,如果成功则直接返回;

  b、addWaiter() 将该线程加入等待队列的尾部,并标记为独占模式

  c、acquireQueueed() 使线程在等待队列中休息,有机会时会去尝试获取资源,获取到资源后才返回,如果在正在等待过程中被中断过,则返回true,否则返回false

  d、如果线程在等待过程中被中断过,它是不响应的,只是获取资源后才进行自我中断selfInterrupt(),将中断补上

 

  release(int)

  此方法是独占模式下线程释放共享资源的顶层入口,它会释放指定量的资源,如果彻底释放了(state=0),它会唤醒等待队列里的其他线程来获取资源

 

  acquireShared(int)

  此方法是共享模式下线程获取共享资源的顶层入口,它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止, 跟独占模式比这里只有线程是head.next时,才会去尝试获取资源,有剩余的话还会唤醒之后的队友,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要两个,老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢还是不让,答案是老二会继续park(),等待其他线程释放资源,也更不会去唤醒老三和老四了,独占模式同一时刻只有一个线程去执行,但共享模式下,多个线程是可以同时执行的,因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了,它跟acqure()流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作

 

  releaseShared()

  此方式是共享模式下线程释放共享资源的顶层入口,它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源,也就是释放掉资源后,唤醒后继

 

  下面我们介绍一下通过AQS实现的类的例子,CountDownLatch、Semaphore、CyclicBarrier、ReentrantLock等都是通过AQS实现的,其中CountDownLatch和Semaphore我们已经在前面的博客中说过了,我们着重来看剩下的两个类

 

  CyclicBarrier

  它允许一组工作线程相互等待,直到到达某个工作屏障点,只有当每个线程都准备就绪后才能继续执行后面的操作,它和CountDownLatch有相似的地方都是通过计数器实现的,但它在释放等待线程后可以重用,是循环屏障,可以一直循环来使用(计数器可重置)。CountDownLatch是一个或多个线程等待一个线程的关系,CyclicBarrier主要是实现了多个线程之间相互等待,直到所有线程都满足条件后才能继续后续的操作,如有五个线程在等待,只有这5个线程都调用了await()方法后才能继续执行

 

 

  CyclicBarrier-demo1

@Slf4jpublic class CyclicBarrierExample1 {    //定义有多少线程同步等待    private static CyclicBarrier barrier = new CyclicBarrier(5);    public static void main(String[] args)throws Exception {        ExecutorService executor = Executors.newCachedThreadPool();        for (int i = 0; i < 10; i++) {            final int threadNum = i;            Thread.sleep(1000);            executor.execute(()->{                try {                    race(threadNum);                } catch (Exception e) {                    log.error("exception",e);                }            });        }        executor.shutdown();    }    private static void race(int threadNum)throws Exception {        Thread.sleep(1000);        log.info("{} is ready",threadNum);        barrier.await();        log.info("{} continue",threadNum);    }}

输出如下:

  使用此类时需先给出需要相互等待的线程数,如demo中我给出的是5个,当调用await()方法的线程数达到5个后才能继续执行,await()还可以设置等待的时候,如果超过等待时间则继续往下执行

 

  CyclicBarrier-demo2

@Slf4jpublic class CyclicBarrierExample3 {//    达到屏障之后,优先执行callback is running    private static CyclicBarrier barrier = new CyclicBarrier(5,()->{        log.info("callback is running");    });    public static void main(String[] args)throws Exception {        ExecutorService executor = Executors.newCachedThreadPool();        for (int i = 0; i < 10; i++) {            final int threadNum = i;            Thread.sleep(1000);            executor.execute(()->{                try {                    race(threadNum);                } catch (Exception e) {                    log.error("exception",e);                }            });        }        executor.shutdown();    }    private static void race(int threadNum)throws Exception {        Thread.sleep(1000);        log.info("{} is ready",threadNum);        barrier.await();        log.info("{} continue",threadNum);    }}

输出为:

  此demo告诉我们当线程到达屏障的时候,可设置优先执行的代码

 

  ReentrantLock

  我们首先说一下ReentrantLock和synchronized的区别

  a、它俩都是可重入锁,都是同一个线程进入一次锁的计数器就自增1,所以等到锁的计数器下降为0时才会释放锁

  b、synchronized是JVM实现的,ReentrantLock是JDK实现的

  c、synchronized引入偏向锁、自旋锁后性能已经和ReentrantLock差不多了,官方推荐使用synchronized

  d、synchronized使用更加简洁,是由编译器保证锁的加锁和释放的,ReentrantLock需要手工加锁和释放锁,但在锁的细粒度和灵活度方面ReentrantLock会优于synchronized

 

  下面是ReentrantLock独有的功能

  a、可指定是公平锁(先等待的线程先获得锁)还是非公平锁

  b、提供了一个Condition类,可以分组唤醒需要唤醒的线程,而不是像synchronized要么随机唤醒一个线程,要么唤醒所有线程

  c、提供能够中断等待锁的机制

 

  ReentrantLock-demo

@Slf4jpublic class LockExample2 {    //请求总数    public static int clientTotal = 5000;    //同时并发执行的线程数    public static int threadTotal = 200;    public static int count = 0;    private final static Lock lock = new ReentrantLock();    private  static void add() {        lock.lock();        try {            count++;        }finally {           lock.unlock();        }    }    public static void main(String[] args)throws Exception {        //定义线程池        ExecutorService executorService = Executors.newCachedThreadPool();        //定义信号量        final Semaphore semaphore = new Semaphore(threadTotal);        //定义计数器闭锁        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);        for (int i = 0; i < clientTotal; i++) {            executorService.execute(()->{                try {                    semaphore.acquire();                    add();                    semaphore.release();                } catch (Exception e) {                    log.error("exception",e);                }                countDownLatch.countDown();            });        }        countDownLatch.await();        executorService.shutdown();        log.info("count:{}",count);    }}

  运行结果为5000,上面就是简单的ReentrantLock的使用

 

  ReentrantReadWriteLock

@Slf4jpublic class LockExample3 {    private final Map
map = new TreeMap<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock readLock = lock.readLock(); private final Lock writeLock = lock.writeLock(); public Data get(String key) { readLock.lock(); try { return map.get(key); }finally { readLock.unlock(); } } public Set
getAllKeys() { readLock.lock(); try { return map.keySet(); }finally { readLock.unlock(); } } public Data put(String key, Data value) { writeLock.lock(); try { return map.put(key,value); }finally { writeLock.unlock(); } } class Data{ }}

  此demo为读写锁demo,对读和写分别进行锁定操作,在获取写入锁的时候,不允许读操作的锁还在保持着,如果运用在读特别多而写特别少的时候,会导致写操作的饥饿,写会一直在等待,不知道什么时候会获取锁

 

  StampedLock

@Slf4j@ThreadSafepublic class LockExample4 {    //请求总数    public static int clientTotal = 5000;    //同时并发执行的线程数    public static int threadTotal = 200;    public static int count = 0;    private final static StampedLock lock = new StampedLock();    private  static void add() {        long stamp = lock.writeLock();        try {            count++;        }finally {           lock.unlock(stamp);        }    }    public static void main(String[] args)throws Exception {        //定义线程池        ExecutorService executorService = Executors.newCachedThreadPool();        //定义信号量        final Semaphore semaphore = new Semaphore(threadTotal);        //定义计数器闭锁        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);        for (int i = 0; i < clientTotal; i++) {            executorService.execute(()->{                try {                    semaphore.acquire();                    add();                    semaphore.release();                } catch (Exception e) {                    log.error("exception",e);                }                countDownLatch.countDown();            });        }        countDownLatch.await();        executorService.shutdown();        log.info("count:{}",count);    }}

  StampedLock控制锁有三种模式,分别是写、读和乐观读,StampedLokc由版本和模式两个部分组成,使用锁时返回一个数字作为票据,用相应的锁状态来表示并控制相关的访问,数字0表示没有写锁被首先访问,在读锁上分为悲观锁和乐观锁,乐观读就是在读的操作很多而写的操作很少的情况下,我们可以乐观的认为写入与读取同时方法的概率很小,因此不悲观的使用完全锁定,程序可以查看读取资料之后是否遭到写入之前的变更再采取后续的措施,这个改进可以大幅度提高程序的吞吐量

 

  Condition

@Slf4jpublic class LockExample5 {    public static void main(String[] args) {        ReentrantLock reentrantLock = new ReentrantLock();        Condition condition = reentrantLock.newCondition();        new Thread(()->{            try {                reentrantLock.lock();                log.info("wait signal");                condition.await();            } catch (InterruptedException e) {                e.printStackTrace();            }            log.info("get signal");            reentrantLock.unlock();        }).start();        new Thread(()->{            reentrantLock.lock();            log.info("get lock");            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                e.printStackTrace();            }            condition.signalAll();            log.info("send signal");            reentrantLock.unlock();        }).start();    }}

  输出结果如下:

  执行过程如下:首先定义ReentrantLock的实例,并从实例中取出Condition,线程一调用reentrantLock.lock()方法,线程加入到aqs的等待队列中去,当调用condition.await()的时候,将线程从aqs队列中移除,对应的操作是锁的释放,并加入到condition的等待队列中去,此线程等待,因为线程一释放锁的关系,线程2被唤醒,线程2获取锁,也加入到aqs的等待队列中,线程2在执行完后执行了condition.sigalAll(),此时线程1被从condition的等待队列中取出放入aqs的等待队列中去,但此时线程一并没有被唤醒,直到线程二执行了reentrantLock.unlock,释放锁,此时aqs的等待队列中只剩下线程一,按照从头到尾的顺序唤醒线程,线程一被唤醒,继续执行,之后线程一释放锁,这个过程完毕

 

  此外最后,向大家推荐一遍关于aqs实现原理的文章,这篇博客讲的非常详细,大家可以仔细研读一下 https://www.cnblogs.com/waterystone/p/4920797.html

转载于:https://www.cnblogs.com/sbrn/p/9021807.html

你可能感兴趣的文章
USACO 4.4.2 追查坏牛奶 oj1341 网络流最小割问题
查看>>
_016_信号
查看>>
[学习笔记] CS131 Computer Vision: Foundations and Applications:Lecture 1 课程介绍
查看>>
解决:对COM组件的调用返回了错误HRESULT E_FAIL
查看>>
DHTMLX 前端框架 建立你的一个应用程序 教程(五)--添加一个表格Grid
查看>>
XML的四种解析器原理及性能比较
查看>>
233 Matrix HDU - 5015 矩阵递推
查看>>
在局域网中发布 https证书问题
查看>>
iframe自适应高度计算,iframe自适应
查看>>
【POJ 3169 Layout】
查看>>
call()和原型继承的方法
查看>>
UVALive - 6893 The Big Painting 字符串哈希
查看>>
总结五个在办公中使用很爽的软件
查看>>
使用对象属性
查看>>
PWM输出,呼吸灯
查看>>
QTcpSocket发送结构体
查看>>
redmine install 菜鸟的艰难历程
查看>>
总结2:指出代码中的错误
查看>>
[唐胡璐]Selenium技巧- 抓图并保存到TestNG报告中
查看>>
P1352 没有上司的舞会
查看>>