六、显式锁和AQS

2022-10-14,

显式锁aqs

一、显式锁

​ synchronized 关键字结合对象的监视器,jvm 为我们提供了一种『内置锁』的语义,这种锁很简便,不需要我们关心加锁和释放锁的过程,我们只需要告诉虚拟机哪些代码块需要加锁即可,其他的细节会由编译器和虚拟机自己实现。

​ 可以将我们的『内置锁』理解为是 jvm 的一种内置特性, 所以一个很显著的问题就是,它不支持某些高级功能的定制,比如说,我想要这个锁支持公平竞争,我想要根据不同的条件将线程阻塞在不同的队列上,我想要支持定时竞争锁,超时返回,我还想让被阻塞的线程能够响应中断请求等等。

​ 这些特殊的需求是『内置锁』满足不了的,所以在 jdk 层面又引入了『显式锁』的概念,不再由 jvm 来负责加锁和释放锁,这两个动作释放给我们程序来做,程序层面难免复杂了些,但锁灵活性提高了,可以支持更多定制功能,但要求你对锁具有更深层次的理解。

【1】lock 显式锁

lock 接口位于 java.util.concurrent.locks 包下,基本定义如下:

public interface lock {
    //获取锁,失败则阻塞
    void lock();
    //响应中断式获取锁
    void lockinterruptibly()
    //尝试一次获取锁,成功返回true,失败返回false,不会阻塞
    boolean trylock();
    //定时尝试
    boolean trylock(long time, timeunit unit)
    //释放锁
    void unlock();
    //创建一个条件队列
    condition newcondition();
}

​ 显式锁的实现类主要有三个,reentrantlock 是其最主要的实现类,readlock 和 writelock 是 reentrantreadwritelock 内部定义的两个内部类,他们继承自 lock 并实现了其定义的所有方法,精细化读写分离。而 reentrantreadwritelock 向外提供读锁写锁。

【2】reentrantlock(可重入锁)

​ reentrantlock 作为 lock 显式锁的最基本实现,也是使用最频繁的一个锁实现类。可重入就是可以重新获取这个锁,例如递归操作。

它提供了两个构造函数,用于支持公平竞争锁。

public reentrantlock()
//参数fair为是否支持公平竞争锁
public reentrantlock(boolean fair)

​ 公平锁和非公平锁的区别之处在于,公平锁在选择下一个占有锁的线程时,参考先到先得原则,等待时间越长的线程将具有更高的优先级。而非公平锁则无视这种原则。

​ 那么假设这么一种情况,a 获得锁正在运行,b 尝试获得锁失败被阻塞,此时 c 也尝试获得锁,失败而阻塞,虽然 c 只需要很短运行时间,它依然需要等待 b 执行结束才有机会获得锁来运行。

​ 非公平锁的前提下,a 执行结束,找到队列首部的 b 线程,开始上下文切换,假如此时的 c 过来竞争锁,非公平策略前提下,c 是可以获得锁的,并假设它迅速的执行结束了,当 b 线程被切换回来之后再去获取锁也不会有什么问题,结果是,c 线程在 b 线程的上下文切换过程中执行结束。显然,非公平策略下 cpu 的吞吐量是提高的。

但是,非公平策略的锁可能会造成某些线程饥饿,始终得不到运行,各有利弊,适时取舍。

【3】readwritelock(读写锁)

① readwritelock同lock一样也是一个接口,提供了readlock和writelock两种锁的操作机制,一个是只读的锁,一个是写锁。

​ 读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的(排他的)。 每次只能有一个写线程,但是可以有多个线程并发地读数据。

​ 所有读写锁的实现必须确保写操作对读操作的内存影响。换句话说,一个获得了读锁的线程必须能看到前一个释放的写锁所更新的内容。

​ 理论上,读写锁比互斥锁允许对于共享数据更大程度的并发。与互斥锁相比,读写锁是否能够提高性能取决于读写数据的频率、读取和写入操作的持续时间、以及读线程和写线程之间的竞争。

② 互斥原则:

  • 读-读能共存,
  • 读-写不能共存,
  • 写-写不能共存。

③reentrantreadwritelock

reentrantreadwritelock为readwritelock的实现类

​ 这个锁允许读线程和写线程以reentrantlock的语法重新获取读写锁。在写入线程保持的所有写入锁被释放之前,不允许不可重入的读线程。

​ 另外,写锁(写线程)可以获取读锁,但是不允许读锁(读线程)获取写锁。在其他应用程序中,当对在读锁下执行读取的方法或回调期间保持写锁时,可重入性可能非常有用。

实例代码如下:

public class testreadwritelock {

    public static void main(string[] args){
        final readwritelockdemo rwd = new readwritelockdemo();
        //启动100个读线程
        for (int i = 0; i < 100; i++) {
            new thread(new runnable() {
                @override
                public void run() {
                    rwd.get();
                }
            }).start();
        }
        //写线程
        new thread(new runnable() {
            @override
            public void run() {
                rwd.set((int)(math.random()*101));
            }
        },"write").start();
    }
}

class readwritelockdemo{
    //模拟共享资源--number
    private int number = 0;
    // 实际实现类--reentrantreadwritelock,默认非公平模式
    private readwritelock readwritelock = new reentrantreadwritelock();

    //读
    public void get(){
        //使用读锁
        readwritelock.readlock().lock();
        try {
            system.out.println(thread.currentthread().getname()+" : "+number);
        }finally {
            readwritelock.readlock().unlock();
        }
    }
    //写
    public void set(int number){
        readwritelock.writelock().lock();
        try {
            this.number = number;
            system.out.println(thread.currentthread().getname()+" : "+number);
        }finally {
            readwritelock.writelock().unlock();
        }
    }
}
/**
thread-50 : 0
thread-19 : 0
thread-54 : 0
thread-57 : 0
thread-31 : 0
write : 40
thread-61 : 40
thread-62 : 40
thread-35 : 40
thread-32 : 40
    
*/

首先启动读线程,此时number为0;然后某个时刻写线程修改了共享资源number数据,读线程再次读取最新值

二、aqs深入分析

【1】什么是aqs

​ aqs是abustactqueuedsynchronizer的简称,它是一个java提供的底层同步工具类,用一个int类型的变量表示同步状态(state),并提供了一系列的cas操作来管理这个同步状态。aqs的主要作用是为java中的并发同步组件提供统一的底层支持,例如reentrantlockcountdowlatch就是基于aqs实现的,用法是通过继承aqs实现其模版方法,然后将子类作为同步组件的内部类。

【2】同步队列

​ 同步队列是aqs很重要的组成部分,它是一个双端队列,遵循fifo原则,主要作用是用来存放在锁上阻塞的线程,当一个线程尝试获取锁时,如果已经被占用,那么当前线程就会被构造成一个node节点加入到同步队列的尾部,队列的头节点是成功获取锁的节点,当头节点线程释放锁时,会唤醒后面的节点并释放当前头节点的引用。

【3】state状态

​ abstractqueuedsynchronizer维护了一个volatile int类型的变量,用户表示当前同步状态。volatile虽然不能保证操作的原子性,但是保证了当前变量state的可见性。至于volatile的具体语义,可以参考我的相关文章。state的访问方式有三种:

  • getstate()
  • setstate()
  • compareandsetstate()

这三种叫做均是原子操作,其中compareandsetstate的实现依赖于unsafe的compareandswapint()方法。代码实现如下:

 /**
     * the synchronization state.
     */
    private volatile int state;
  
    /**
     * returns the current value of synchronization state.
     * this operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */
    protected final int getstate() {
        return state;
    }

    /**
     * sets the value of synchronization state.
     * this operation has memory semantics of a {@code volatile} write.
     * @param newstate the new state value
     */
    protected final void setstate(int newstate) {
        state = newstate;
    }

    /**
     * atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * this operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. false return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareandsetstate(int expect, int update) {
        // see below for intrinsics setup to support this
        return unsafe.compareandswapint(this, stateoffset, expect, update);
    }

【4】资源的共享方式

​ aqs定义两种资源共享方式:exclusive(独占,只有一个线程能执行,如reentrantlock)和share(共享,多个线程可同时执行,如semaphore/countdownlatch)。
  不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),aqs已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isheldexclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryacquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryrelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryacquireshared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryreleaseshared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

【5】获取锁和释放锁的流程

下面基于独占锁讲解在aqs获取锁和释放锁的流程:

获取:

  1. 调用自定义同步器的tryacquire()尝试直接去获取资源,如果成功则直接返回;
  2. 没成功,则addwaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquirequeued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfinterrupt(),将中断补上。

源码解析:

acquire是一种以独占方式获取资源,如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。该方法是独占模式下线程获取共享资源的顶层入口。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:

public final void acquire(int arg) {
    if (!tryacquire(arg) &&
        acquirequeued(addwaiter(node.exclusive), arg))
        selfinterrupt();
}

tryacquire尝试以独占的方式获取资源,如果获取成功,则直接返回true,否则直接返回false。该方法可以用于实现lock中的trylock()方法。该方法的默认实现是抛出unsupportedoperationexception,具体实现由自定义的扩展了aqs的同步类来实现。aqs在这里只负责定义了一个公共的方法框架。这里之所以没有定义成abstract,是因为独占模式下只用实现tryacquire-tryrelease,而共享模式下只用实现tryacquireshared-tryreleaseshared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。

protected boolean tryacquire(int arg) {
    throw new unsupportedoperationexception();
}

  addwaiter该方法用于将当前线程根据不同的模式(node.exclusive互斥模式、node.shared共享模式)加入到等待队列的队尾,并返回当前线程所在的结点。如果队列不为空,则以通过compareandsettail方法以cas的方式将当前线程节点加入到等待队列的末尾。否则,通过enq(node)方法初始化一个等待队列,并返回当前节点。源码如下:

private node addwaiter(node mode) {
    node node = new node(thread.currentthread(), mode);
    // try the fast path of enq; backup to full enq on failure
    node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareandsettail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

  acquirequeued用于队列中的线程自旋地以独占且不可中断的方式获取同步状态(acquire),直到拿到锁之后再返回。该方法的实现分成两部分:如果当前节点已经成为头结点,尝试获取锁(tryacquire)成功,然后返回;否则检查当前节点是否应该被park,然后将该线程park并且检查当前线程是否被可以被中断。

final boolean acquirequeued(final node node, int arg) {
    //标记是否成功拿到资源,默认false
    boolean failed = true;
    try {
        boolean interrupted = false;//标记等待过程中是否被中断过
        for (;;) {
            final node p = node.predecessor();
            if (p == head && tryacquire(arg)) {
                sethead(node);
                p.next = null; // help gc
                failed = false;
                return interrupted;
            }
            if (shouldparkafterfailedacquire(p, node) &&
                parkandcheckinterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelacquire(node);
    }
}

释放:

  release方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。下面是release()的源码:

public final boolean release(int arg) {
    if (tryrelease(arg)) {
        node h = head;
        if (h != null && h.waitstatus != 0)
            unparksuccessor(h);
        return true;
    }
    return false;
}

    /**
     * attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * <p>this method is always invoked by the thread performing release.
     *
     * <p>the default implementation throws
     * {@link unsupportedoperationexception}.
     *
     * @param arg the release argument. this value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait.  the value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this object is now in a fully released
     *         state, so that any waiting threads may attempt to acquire;
     *         and {@code false} otherwise.
     * @throws illegalmonitorstateexception if releasing would place this
     *         synchronizer in an illegal state. this exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws unsupportedoperationexception if exclusive mode is not supported
     */
protected boolean tryrelease(int arg) {
    throw new unsupportedoperationexception();
}

/**
     * wakes up node's successor, if one exists.
     *
     * @param node the node
     */
private void unparksuccessor(node node) {
    /*
         * if status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  it is ok if this
         * fails or if status is changed by waiting thread.
         */
    int ws = node.waitstatus;
    if (ws < 0)
        compareandsetwaitstatus(node, ws, 0);

    /*
         * thread to unpark is held in successor, which is normally
         * just the next node.  but if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
    node s = node.next;
    if (s == null || s.waitstatus > 0) {
        s = null;
        for (node t = tail; t != null && t != node; t = t.prev)
            if (t.waitstatus <= 0)
                s = t;
    }
    if (s != null)
        locksupport.unpark(s.thread);
}

​ 与acquire()方法中的tryacquire()类似,tryrelease()方法也是需要独占模式的自定义同步器去实现的。正常来说,tryrelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryrelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。
  unparksuccessor(node)方法用于唤醒等待队列中下一个线程。这里要注意的是,下一个线程并不一定是当前节点的next节点,而是下一个可以用来唤醒的线程,如果这个节点存在,调用unpark()方法唤醒。
  总之,release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

三、自定义独占锁

public class mutex implements lock {
    // 静态内部类,自定义同步器
    private static class sync extends abstractqueuedsynchronizer {
        // 是否处于占用状态
        protected boolean isheldexclusively() {
            return getstate() == 1;
        }

        // 当状态为0的时候获取锁
        public boolean tryacquire(int acquires) {
            if (compareandsetstate(0, 1)) {
                setexclusiveownerthread(thread.currentthread());
                return true;
            }
            return false;
        }

        // 释放锁,将状态设置为0
        protected boolean tryrelease(int releases) {
            if (getstate() == 0) throw new
                    illegalmonitorstateexception();
            setexclusiveownerthread(null);
            setstate(0);
            return true;
        }

        // 返回一个condition,每个condition都包含了一个condition队列
        condition newcondition() {
            return new conditionobject();
        }
    }

    // 仅需要将操作代理到sync上即可
    private final sync sync = new sync();

    public void lock() {
        sync.acquire(1);
    }

    public boolean trylock() {
        return sync.tryacquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public condition newcondition() {
        return sync.newcondition();
    }

    public boolean islocked() {
        return sync.isheldexclusively();
    }

    public boolean hasqueuedthreads() {
        return sync.hasqueuedthreads();
    }

    public void lockinterruptibly() throws interruptedexception {
        sync.acquireinterruptibly(1);
    }

    public boolean trylock(long timeout, timeunit unit) throws interruptedexception {
        return sync.tryacquirenanos(1, unit.tonanos(timeout));
    }
}

《六、显式锁和AQS.doc》

下载本文的Word格式文档,以方便收藏与打印。