跳至主要內容

原子操作类

Jin大约 18 分钟

原子操作类

1、API

都是java.util.concurrent.atomic包下的

序号描述
1AtomicBooleanopen in new window可以原子方式更新的值 boolean
2AtomicIntegeropen in new window可以原子方式更新的 int值。
3AtomicIntegerArrayopen in new window一个 int数组,其中元素可以原子方式更新。
4AtomicIntegerFieldUpdateropen in new window基于反射的实用程序,可对指定类的指定 volatile int字段进行原子更新。
5AtomicLongopen in new window可以原子方式更新的 long值。
6AtomicLongArrayopen in new window一个 long数组,其中元素可以原子方式更新。
7AtomicLongFieldUpdateropen in new window基于反射的实用程序,可以对指定类的指定 volatile long字段进行原子更新。
8AtomicMarkableReferenceopen in new windowAtomicMarkableReference维护一个对象引用以及一个标记位,可以原子方式更新。
9AtomicReferenceopen in new window可以原子方式更新的对象引用。
10AtomicReferenceArrayopen in new window一组对象引用,其中元素可以原子方式更新。
11AtomicReferenceFieldUpdateropen in new window基于反射的实用程序,可以对指定类的指定 volatile引用字段进行原子更新。
12AtomicStampedReferenceopen in new windowAtomicStampedReference维护一个对象引用以及一个整数“标记”,可以原子方式更新。
13DoubleAccumulatoropen in new window一个或多个变量共同维护使用提供的函数更新的运行 double值。
14DoubleAdderopen in new window一个或多个变量共同维持最初的零和 double总和。
15LongAccumulatoropen in new window一个或多个变量共同维护使用提供的函数更新的运行 long值。
16LongAdderopen in new window一个或多个变量共同维持最初为零的总和为 long

2、分类

2.1、基本类型原子类

  1. AtomicInteger
  2. AtomicBoolean
  3. AtomicLong
  • 常用API简介
public final int get()//获取当前的值
public final int getAndSet(int new Value)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement()//获取当前的值,并自减
public final int getAndAdd(int delta)//获取当前的值,并加上预期的值
public comapreAndSet(int expect,int update)//如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
public final int incrementAndGet() // 以原子方式递增当前值,具有 VarHandle.getAndAdd(java.lang.Object...)指定的记忆效应。
  • 案例
class MyNumber
{
    @Getter
    private AtomicInteger atomicInteger = new AtomicInteger();
    public void addPlusPlus()
    {
        atomicInteger.incrementAndGet();
    }
}

    @Test
    public void demo03() throws InterruptedException {
        MyNumber myNumber = new MyNumber();
        CountDownLatch countDownLatch = new CountDownLatch(SIZE);
        for (int i = 1; i <= SIZE; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 1000; j++) {
                        myNumber.addPlusPlus();
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "\t" + "result: " + myNumber.atomicInteger);
        //main	result: 50000
    }

CountDownLatch使用

class MyNumber {
    AtomicInteger atomicInteger = new AtomicInteger();

    public void addPlusPlus() {
        atomicInteger.getAndIncrement();
    }
}

public class AtomicIntegerDemo {
    public static final int SIZE = 50;

    @Test
    public void demo01() {
        MyNumber myNumber = new MyNumber();
        for (int i = 1; i <= SIZE; i++) {
            new Thread(() -> {
                for (int j = 1; j <= 1000; j++) {
                    myNumber.addPlusPlus();
                }
            }, String.valueOf(i)).start();
        }
        //本来应该是50000
        //1试-main  result: 39000
        //2试-main  result: 40178
        //?是不是我们的程序有问题?
        //因为上面的50*  1000个计算还没结束,他就去get数值了
        System.out.println(Thread.currentThread().getName() + "\t" + "result: " + myNumber.atomicInteger);
    }

    /**
     * 不推荐使用TimeUnit.SECONDS.sleep()
     */
    @Test
    public void demo02() {
        MyNumber myNumber = new MyNumber();
        for (int i = 1; i <= SIZE; i++) {
            new Thread(() -> {
                for (int j = 1; j <= 1000; j++) {
                    myNumber.addPlusPlus();
                }
            }, String.valueOf(i)).start();
        }
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "\t" + "result: " + myNumber.atomicInteger);
    }

    /**
     * 减法计数器CountDownLatch
     * @throws InterruptedException
     */
    @Test
    public void demo03() throws InterruptedException {
        MyNumber myNumber = new MyNumber();
        CountDownLatch countDownLatch = new CountDownLatch(SIZE);
        for (int i = 1; i <= SIZE; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 1000; j++) {
                        myNumber.addPlusPlus();
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "\t" + "result: " + myNumber.atomicInteger);
    }
}

2.2、数组类型原子类

  1. AtomicIntegerArray
  2. AtomicLongArray
  3. AtomicReferenceArray
  • 案例
public class AtomicIntegerArrayDemo {
    public static void main(String[] args) {
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5});

        for (int i = 0; i < atomicIntegerArray.length(); i++) {
            System.out.println(atomicIntegerArray.get(i));
        }
        System.out.println("====================================");
        int tmpInt = 0;

        tmpInt = atomicIntegerArray.getAndSet(0, 1122);
        System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0));
        atomicIntegerArray.getAndIncrement(1);
        atomicIntegerArray.getAndIncrement(1);
        tmpInt = atomicIntegerArray.getAndIncrement(1);
        System.out.println(tmpInt + "\t" + atomicIntegerArray.get(1));
    }
}
//打印结果:
//0
//0
//0
//0
//0
//====================================
//0	1122
//2	3

2.3、引用类型原子类

  1. AtomicReference
  2. AtomicStampedReference
  3. AtomicMarkableReference

AtomicReference

@Getter
@ToString
@AllArgsConstructor
class User {
    String userName;
    int age;
}

public class AtomicReferenceDemo {
    public static void main(String[] args) {
        User z3 = new User("z3", 24);
        User li4 = new User("li4", 26);
        //将类型丢入泛型即可
        AtomicReference<User> atomicReferenceUser = new AtomicReference<>();

        atomicReferenceUser.set(z3);//将这个原子类设置为张三
        //张三换位李四
        System.out.println(atomicReferenceUser.compareAndSet(z3, li4) + "\t" + atomicReferenceUser.get().toString());
        //true   User(userName=li4,age=28)
        System.out.println(atomicReferenceUser.compareAndSet(z3, li4) + "\t" + atomicReferenceUser.get().toString());
        //false   User(userName=li4,age=28)
    }
}
  • 使用AtomicReference自定义自旋锁
// 自旋锁好处:循环比较获取没有类似wait的阻塞。
// 通过CAS操作完成自旋锁,A线程先进来调用myLock方法自己持有锁5秒钟,B随后进来后发现
// 当前有线程持有锁,不是null,所以只能通过自旋等待,直到A释放锁后B随后抢到。
public class SpinLockDemo {
    AtomicReference<Thread> atomicReference = new AtomicReference<>();

    public void Lock() {
        Thread thread = Thread.currentThread();
        System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");
        while (!atomicReference.compareAndSet(null, thread))//用这个循环实现自旋
        {

        }
        //如果是空的,那我们把thread放进去

    }

    public void UnLock() {
        Thread thread = Thread.currentThread();
        atomicReference.compareAndSet(thread, null);//把当前线程踢出去,置为null
        System.out.println(Thread.currentThread().getName() + "\t" + "-------task over,unLock.....");
    }

    public static void main(String[] args) {
        SpinLockDemo spinLockDemo = new SpinLockDemo();
        new Thread(() -> {
            spinLockDemo.Lock();
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            spinLockDemo.UnLock();
        }, "A").start();

        //暂停一会儿线程,保证A线程先于B线程启动并完成
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> {
            spinLockDemo.Lock();//B  -----come in  B只是尝试去抢锁,但是一直在自旋。

            spinLockDemo.UnLock();//A结束后 B立马抢到锁,然后马上结束了
        }, "B").start();

    }
}

AtomicStampedReference

  • 携带版本号的引用类型原子类,可以解决ABA问题
  • 解决修改过几次
  • 状态戳原子引用
public class ABADemo {
    static AtomicInteger atomicInteger = new AtomicInteger(100);
    static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100, 1);

    /**
     * ABA问题复现
     */
    @Test
    public void demo1() {
        new Thread(() -> {
            atomicInteger.compareAndSet(100, 101);
            atomicInteger.compareAndSet(101, 100);//这里 中间就有人动过了,虽然值是不变的,假如不检查版本号,CAS就直接能成功了
        }, "t1").start();

        new Thread(() -> {
            //暂停一会儿线程
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(atomicInteger.compareAndSet(100, 2022) + "\t" + atomicInteger.get());//true	2022
        }, "t2").start();
        //暂停一会儿线程,main彻底等待上面的ABA出现演示完成。
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * ABA问题的解决
     */
    @Test
    public void demo2() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        new Thread(() -> {
            int stamp = atomicStampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + "\t 首次版本号:" + stamp);//t3	 首次版本号:1
            //暂停500毫秒,保证t4线程初始化拿到的版本号和我一样,
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "\t 2次版本号:" + atomicStampedReference.getStamp());//t3	 2次版本号:2
            atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "\t 3次版本号:" + atomicStampedReference.getStamp());//t3	 3次版本号:3
        }, "t3").start();

        new Thread(() -> {
            int stamp = atomicStampedReference.getStamp();//记录一开始的版本号,并且写死
            System.out.println(Thread.currentThread().getName() + "\t 首次版本号:" + stamp);//t4	 首次版本号:1
            //暂停1秒钟线程,等待上面的t3线程,发生了ABA问题
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            boolean result = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp + 1);//这个还是初始的版本号,但是实际上版本号被T3修改了,所以肯定会失败
            System.out.println(Thread.currentThread().getName() + "\t" + result + "\t" + atomicStampedReference.getReference());//t4	false	100
            countDownLatch.countDown();
        }, "t4").start();
        countDownLatch.await();
    }
}

AtomicMarkableReference

  • 携带版本号的引用类型原子类,可以解决ABA问题
  • 解决修改过几次
    • 它的定义就是将状态戳简化为true|false
    • 类似一次性筷子
  • 状态戳(true/false)原子引用
    /**
     * 使用AtomicMarkableReference 解决ABA问题
     * AtomicMarkableReference不关心引用变量更改过几次,只关心是否更改过
     *
     * @throws InterruptedException
     */
    @Test
    public void demo3() throws InterruptedException {
        AtomicMarkableReference<Integer> markableReference = new AtomicMarkableReference<>(100, false);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        new Thread(() -> {
            boolean marked = markableReference.isMarked();
            System.out.println(Thread.currentThread().getName() + "\t 1次版本号" + marked);//t5	 1次版本号false
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            markableReference.compareAndSet(100, 101, marked, !marked);
            System.out.println(Thread.currentThread().getName() + "\t 2次版本号" + markableReference.isMarked());//t5	 2次版本号true
//            markableReference.compareAndSet(101, 100, markableReference.isMarked(), !markableReference.isMarked());
            markableReference.compareAndSet(101, 100, markableReference.isMarked(), !marked);
            System.out.println(Thread.currentThread().getName() + "\t 3次版本号" + markableReference.isMarked());//t5	 3次版本号false
        }, "t5").start();

        new Thread(() -> {
            boolean marked = markableReference.isMarked();
            System.out.println(Thread.currentThread().getName() + "\t 1次版本号" + marked);//t6	 1次版本号false
            //暂停几秒钟线程
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            markableReference.compareAndSet(100, 2020, marked, !marked);
            System.out.println(Thread.currentThread().getName() + "\t" + markableReference.getReference() + "\t" + markableReference.isMarked());//t6	100	true
            countDownLatch.countDown();
        }, "t6").start();
        countDownLatch.await();
    }

2.4、对象的属性修改原子类

  • AtomicIntegerFieldUpdater
    • 原子更新对象中int类型字段的值
  • AtomicLongFieldUpdater
    • 原子更新对象中Long类型字段的值
  • AtomicReferenceFieldUpdater
    • 原子更新引用类型字段的值

使用目的

以一种线程安全的方式操作非线程安全对象内的某些字段

使用要求

更新的对象属性必须使用 public volatile 修饰符。

因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须 使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。

AtomicIntegerFieldUpdater

/**
 * 以一种线程安全的方式操作非线程安全对象的某些字段。
 * 需求:
 * 1000个人同时向一个账号转账一元钱,那么累计应该增加1000元,
 * 除了synchronized和CAS,还可以使用AtomicIntegerFieldUpdater来实现。
 */
public class AtomicIntegerFieldUpdaterDemo {
    public static void main(String[] args) {
        BankAccount bankAccount = new BankAccount();

        for (int i = 1; i <= 1000; i++) {
            int finalI = i;
            new Thread(() -> {
                bankAccount.transferMoney(bankAccount);
            }, String.valueOf(i)).start();
        }

        //暂停毫秒
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(bankAccount.money);//1000
    }
}

class BankAccount {
    private String bankName = "CCB";//银行
    public volatile int money = 0;//钱数
    AtomicIntegerFieldUpdater<BankAccount> accountAtomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(BankAccount.class, "money");

    //不加锁+性能高,局部微创
    public void transferMoney(BankAccount bankAccount) {
        accountAtomicIntegerFieldUpdater.incrementAndGet(bankAccount);
    }
}

AtomicReferenceFieldUpdater

public class AtomicReferenceFieldUpdaterDemo {
    public static void main(String[] args) throws InterruptedException {
        MyVar myVar = new MyVar();

        for (int i = 1; i <= 5; i++) {
            new Thread(() -> {
                myVar.init(myVar);
            }, String.valueOf(i)).start();
        }
    }
}

class MyVar {
    public volatile Boolean isInit = Boolean.FALSE;
    AtomicReferenceFieldUpdater<MyVar, Boolean> atomicReferenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(MyVar.class, Boolean.class, "isInit");


    public void init(MyVar myVar) {
        if (atomicReferenceFieldUpdater.compareAndSet(myVar, Boolean.FALSE, Boolean.TRUE)) {
            System.out.println(Thread.currentThread().getName() + "\t" + "---init.....");
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "\t" + "---init.....over");
        } else {
            System.out.println(Thread.currentThread().getName() + "\t" + "------其它线程正在初始化");
        }
    }
}
//2	------其它线程正在初始化
//4	------其它线程正在初始化
//3	------其它线程正在初始化
//1	---init.....
//5	------其它线程正在初始化
//1	---init.....over

面试题:你在哪里用了volatile

答案:AtomicReferenceFieldUpdater

3、原子操作增强类原理深度解析

  1. DoubleAccumulator
  2. DoubleAdder
  3. LongAccumulator
  4. LongAdder

这几个都是java8开始有的,前面的都是java5就有了

2.1、阿里巴巴开发手册

【参考】volatile解决多线程内存不可见问题对于一写多读,是可以解决变量同步问题,但是如果多写,同样无法解决线程安全问题。

说明:如果是count++操作,使用如下类实现:

AtomicInteger count = new AtomicInteger();

count.addAndGet(1);

如果是JDK8,推荐使用LongAdder对象,比AtomicLong性能更好(减少乐观锁的重试次数)。

2.2、LongAdder

  • java.lang.Object

    • java.lang.Number
      • java.util.concurrent.atomic.LongAdder
    • 实现的所有接口

      Serializable


    public class LongAdder extends Striped64 implements Serializable
    abstract class Striped64 extends Number
    

​ 一个或多个变量,它们共同维持最初的零long总和。 当跨线程争用更新(方法add(long)open in new window )时,变量集可以动态增长以减少争用。 方法sum()open in new window (或等效地, longValue()open in new window )返回保持总和的变量的当前总和。

当多个线程更新用于收集统计信息但不用于细粒度同步控制的目的的公共和时,此类通常优于AtomicLongopen in new window 。 在低更新争用下,这两个类具有相似的特征。 但在高争用的情况下,这一类的预期吞吐量明显更高,但代价是空间消耗更高。

LongAdders可与ConcurrentHashMapopen in new window一起使用,以维护可扩展的频率图(直方图或多重集的形式)。 例如,要将计数添加到ConcurrentHashMap<String,LongAdder> freqs ,如果尚未初始化,则可以使用freqs.computeIfAbsent(key, k -> new LongAdder()).increment();

该类扩展Numberopen in new window ,但定义诸如方法equalshashCodecompareTo ,因为实例预计将发生突变,所以不如收集钥匙有用。

  • 从以下版本开始:

    1.8

2.2.1、方法摘要

变量和类型方法描述
voidadd(long x)添加给定值。
voiddecrement()相当于 add(-1)
doubledoubleValue()在扩展原始转换后,将 sum()open in new window作为 double返回。
floatfloatValue()在扩展基元转换后,将 sum()open in new window作为 float返回。
voidincrement()相当于 add(1)
intintValue()在缩小基元转换后,将 sum()open in new window作为 int返回。
longlongValue()相当于 sum()open in new window
voidreset()重置保持总和为零的变量。
longsum()返回当前总和。
longsumThenReset()相当于 sum()open in new window后跟 reset()open in new window
StringtoString()返回 sum()open in new window的String表示 形式open in new window

2.2.2、基础方法

LongAdder只能用来计算加法 。且从零开始计算

LongAccumulator提供了自定义的函数操作 (利用lambda表达式)

    @Test
    public void demo1() {
        LongAdder longAdder = new LongAdder();

        longAdder.increment();
        longAdder.increment();
        longAdder.increment();

        System.out.println(longAdder.longValue());//3

        LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);//lambda表达式,必须要有初始值
        longAccumulator.accumulate(1);//1
        longAccumulator.accumulate(3);//4
        System.out.println(longAccumulator.get());//4
    }

2.2.3、LongAdder高性能对比

  • 案例:热点商品点赞计算器,点赞数加加统计,不要求实时精确
  • synchronized、AtomicLong、LongAdder、LongAccumulator对比
class ClickNumber {
    int number = 0;

    public synchronized void add1() {
        number++;
    }

    AtomicLong atomicLong = new AtomicLong(0);

    public void add2() {
        atomicLong.incrementAndGet();
    }

    LongAdder longAdder = new LongAdder();

    public void add3() {
        longAdder.increment();
    }

    LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);

    public void add4() {
        longAccumulator.accumulate(1);
    }
}
	/**
     * 耗时:---1958毫秒	synchronized---50000000
     * 耗时:---1091毫秒	atomicLong---50000000
     * 耗时:---146毫秒	LongAdder---50000000
     * 耗时:---115毫秒	LongAccumulator---50000000
     * 印证了阿里卡法手册中说的 【如果是JDK8,推荐使用LongAdder对象,比AtomicLong性能更好(减少乐观锁的重试次数)】
     * @throws InterruptedException
     */ 

	public void demo2() throws InterruptedException {
        ClickNumber clickNumber = new ClickNumber();
        Long startTime;
        Long endTime;
        CountDownLatch countDownLatch1 = new CountDownLatch(50);
        CountDownLatch countDownLatch2 = new CountDownLatch(50);
        CountDownLatch countDownLatch3 = new CountDownLatch(50);
        CountDownLatch countDownLatch4 = new CountDownLatch(50);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= _1W; j++) {
                        clickNumber.add1();
                    }
                } finally {
                    countDownLatch1.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch1.await();
        endTime = System.currentTimeMillis();
        System.out.println("耗时:---" + (endTime - startTime) + "毫秒" + "\t" + "synchronized---" + clickNumber.number);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= _1W; j++) {
                        clickNumber.add2();
                    }
                } finally {
                    countDownLatch2.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch2.await();
        endTime = System.currentTimeMillis();
        System.out.println("耗时:---" + (endTime - startTime) + "毫秒" + "\t" + "atomicLong---" + clickNumber.atomicLong);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= _1W; j++) {
                        clickNumber.add3();
                    }
                } finally {
                    countDownLatch3.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch3.await();
        endTime = System.currentTimeMillis();
        System.out.println("耗时:---" + (endTime - startTime) + "毫秒" + "\t" + "LongAdder---" + clickNumber.longAdder.sum());

        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= _1W; j++) {
                        clickNumber.add4();
                    }
                } finally {
                    countDownLatch4.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch4.await();
        endTime = System.currentTimeMillis();
        System.out.println("耗时:---" + (endTime - startTime) + "毫秒" + "\t" + "LongAccumulator---" + clickNumber.longAccumulator.longValue());
    }

2.3、源码、原理分析

2.3.1、架构

image-20220726220601402
image-20220726220601402

2.3.2、原理(LongAdder为什么这么快)

  1. 官网说明和阿里要求
image-20220726221110406
image-20220726221110406
image-20220726221136463
image-20220726221136463
  1. LongAdder是Striped64的子类
public class LongAdder extends Striped64 implements Serializable 
  1. Striped64
  • 重要的成员函数
    /** Number of CPUS, to place bound on table size */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Table of cells. When non-null, size is a power of 2.
     */
    transient volatile Cell[] cells;

    /**
     * Base value, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     */
    transient volatile long base;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
     */
    transient volatile int cellsBusy;
  • 最重要的两个
    transient volatile Cell[] cells;
    transient volatile long base;
  • Striped64中一些变量或者方法的定义
image-20220726221018630
image-20220726221018630
  1. Cell

    是 java.util.concurrent.atomic 下 Striped64 的一个内部类

image-20220726221325736
image-20220726221325736
  1. LongAdder为什么这么快
  • LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
  • sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去, 从而降级更新热点。
image-20220726221427015
image-20220726221427015
  • 公式
image-20220726221442810
image-20220726221442810

4、LongAdder源码解读深度分析

LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。

image-20220726221757933
image-20220726221757933
image-20220726221802786
image-20220726221802786

4.1、longAdder.increment()

4.1.1、add(1L)

image-20220726221956253
image-20220726221956253

条件递增,逐步解析

image-20220726222014487
image-20220726222014487
  1. 最初无竞争时只更新base;
  2. 如果更新base失败后,首次新建一个Cell[]数组
  3. 当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[]扩容

4.1.2、longAccumulate

  • longAccumulate入参说明
image-20220726222139259
image-20220726222139259
  • Striped64中一些变量或者方法的定义
image-20220726222636513
image-20220726222636513
步骤
线程hash值:probe
image-20220726222213656
image-20220726222213656
image-20220726222218920
image-20220726222218920
image-20220726222230287
image-20220726222230287
image-20220726222235805
image-20220726222235805
总纲
image-20220726222255443
image-20220726222255443

上述代码首先给当前线程分配一个hash值,然后进入一个for(;;)自旋,这个自旋分为三个分支:

  1. CASE1:Cell[]数组已经初始化
  2. CASE2:Cell[]数组未初始化(首次新建)
  3. CASE3:Cell[]数组正在初始化中
计算
  1. 刚刚要初始化Cell[]数组(首次新建)

未初始化过Cell[]数组,尝试占有锁并首次初始化cells数组

image-20220726223058969
image-20220726223058969

​ 如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell[] rs = new Cell[2]表示数组的长度为2,rs[h & 1] = new Cell(x) 表示创建一个新的Cell元素,value是x值,默认为1。h & 1类似于我们之前HashMap常用到的计算散列桶index的算法,通常都是hash & (table.len - 1)。同hashmap一个意思。

  1. 兜底

多个线程尝试CAS修改失败的线程会走到这个分支

image-20220726223152932
image-20220726223152932

该分支实现直接操作base基数,将值累加到base上,也即其它线程正在初始化,多个线程正在更新base的值。

  1. Cell数组不再为空且可能存在Cell数组扩容

    多个线程同时命中一个cell的竞争

image-20220726223227825
image-20220726223227825

image-20220726223233831
image-20220726223233831
  • 上面代码判断当前线程hash后指向的数据位置元素是否为空,
  • 如果为空则将Cell数据放入数组中,跳出循环。
  • 如果不空则继续循环。

image-20220726223240703
image-20220726223240703

image-20220726223245915
image-20220726223245915
  • 说明当前线程对应的数组中有了数据,也重置过hash值,
  • 这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环。

image-20220726223302839
image-20220726223302839

image-20220726223310649
image-20220726223310649

image-20220726223335543
image-20220726223335543

image-20220726223549510
image-20220726223549510

4.1.3、sum

  • sum()会将所有Cell数组中的value和base累加作为返回值。
  • 核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。

为啥在并发情况下sum的值不精确?

  1. sum执行时,并没有限制对base和cells的更新(一句要命的话)。所以LongAdder不是强一致性的,它是最终一致性的。

  2. 首先,最终返回的sum局部变量,初始被复制为base,而最终返回时,很可能base已经被更新了,而此时局部变量sum不会更新,造成不一致。

  3. 其次,这里对cell的读取也无法保证是最后一次写入的值。所以,sum方法在没有并发的情况下,可以获得正确的结果。

image-20220726223832567
image-20220726223832567

5、总结

AtomicLong

  1. 线程安全,可允许一些性能损耗,要求高精度时可使用
  2. 保证精度,性能代价
  3. AtomicLong是多个线程针对单个热点值value进行原子操作

原理

  • CAS+自旋
  • incrementAndGet

场景

  • 低并发下的全局计算
  • AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题。

缺陷

  • 高并发后性能急剧下降
  • AtomicLong的自旋会成为瓶颈
    • N个线程CAS操作修改线程的值,每次只有一个成功过,其它N - 1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。

AtomicLong

  1. 当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用
  2. 保证性能,精度代价
  3. LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作

原理

  • CAS+Base+Cell数组分散
  • 空间换时间并分散了热点数据

场景

  • 高并发下的全局计算

缺陷

  • sum求和后还有计算线程修改结果的话,最后结果不够准确

LongAdder vs AtomicLong Performance

  • 参考地址:http://blog.palominolabs.com/2014/02/10/java-8-performance-improvements-longadder-vs-atomiclong/
贡献者: Jin