JAVA并发编程真实战! – 作者:卢本伟全体起立

有人要问了 :什么是真实战?

是的,就是真实战!

在看了一系列所谓高并发实战的书籍之后有感而发:这就是API文档啊,介绍api+源码==实战? 或者有些压根就是翻译国外的作品,这也能出书….

实战,就是要考虑实际用的场景和需求,是能直接搬运到生产当中去使用的东西叫实战。

java-concurrent-overview-1.png

1.线程安全的计数

场景:简单的对一个数进行安全的改变

黄金段位:atomicLong atomicInteger atomic…

eg:

AtomicLong atomicLong = new AtomicLong(0);
atomicLong.getAndDecrement();
System.out.println(atomicLong.get());
atomicLong.compareAndSet(-1,2);

利用unsafe的cas 浪费cpu性能差

砖石I段位:LongAddr

eg:

LongAdder longAdder = new LongAdder();
longAdder.add(1);
longAdder.sum();

对atomicxxx进行改进,增加了一个cells(这个数的小弟们)的概念 由基准值base+cells组成,改变只会改变其中一个cell(一个小弟献宝),当算sum时就把所有小弟的宝贝都算进去->base+cells得到最终值

并且还用了@sun.misc.Contended修饰小弟cell,进行字节填充解决伪共享问题。

王者段位:LongBinaryOperator

eg:

//定义一个操作 比如相乘
LongBinaryOperator longBinaryOperator = (left, right) -> left * right;
//定义一个初始值为2
LongAccumulator longAccumulator = new LongAccumulator(longBinaryOperator,2);
longAccumulator.accumulate(5);
long l = longAccumulator.get();
System.out.println("l = " + l);//2*5=10

底层还是LongAddr的那一套,不过可以自定义数的操作,更灵活,还可以设置初始值

2.多个线程之间协作

场景:多线程进行批量执行,需要确定所有线程都执行完成的时候。

白金II:CountDownLatch

eg:

ExecutorService executorService = Executors.newFixedThreadPool(2);
//执行任务1(省略了,自己加业务代码) 然后countDown
executorService.submit(()-> countDownLatch.countDown());
//执行任务2(省略了,自己加业务代码) 然后countDown
executorService.submit(()-> countDownLatch.countDown());
//等待所有线程都执行完
countDownLatch.await();
//优雅的关闭
executorService.shutdown();

利用了AQS的state,await()会使当前线程(这里是main线程)放到阻塞队列去等待state==0才唤醒;countDown()方法使state–,等于0时唤醒阻塞的线程。缺点是这个state没有回头路,用完就不能再用。

砖石III:CycliBarrier

eg:

//接受参数int parties, Runnable barrierAction 前者几个任务,后者执行完的动作
CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()-> System.out.println("到达屏障点后执行的任务"));
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(()->
System.out.println("进入1");
cyclicBarrier.await();
System.out.println("出来2");
});
executorService.submit(()->{
System.out.println("进入1");
cyclicBarrier.await();
System.out.println("出来2");
});
executorService.shutdown();
//打印顺序 进入1 -> 进入2 -> 达到屏障点 -> 出来2 -> 出来1

底层是ReentrantLock+condition实现的,可以复用state,达到屏障点后复位。await()可以理解为跑步,运动员入场的点不一样,但是出发点一样。运动员入场后调用await()到达出发点然后等其他运动员到出发点,都调用了await(),号令炝”砰“(barrierAction)一声,比赛开始,任务执行。

砖石III:Semaphore

eg:

ExecutorService executorService = Executors.newFixedThreadPool(2);
Semaphore semaphore = new Semaphore(0);
executorService.submit(()->semaphore.release());
executorService.submit(()->semaphore.release());
semaphore.acquire(2);
executorService.shutdown();

能达到砖石水平完全是因为他提供公平和非公平策略,也不用关注线程的个数。但是state不能复用

3.线程安全的容器

场景:就是list,set,map对应的线程安全版,比较简单,和普通的没啥区别,这里罗列一下。

  1. ConcurrentHashMap  线程安全的map,用了cas+synchronized实现的,看看源码就大概知道了。
  2. CopyOnWriteArrayList  线程安全的list,用ReentranLock+volatile实现线程安全的,里面有一个volatile Object[] array来复制每次更改都是把这个array 复制一份 然后加数据 再塞到array去。使用场景是:对一致性要求不够强的,大量的读少量写的时候去使用。
  3. ConcurrentSkipListMap  跳表,cas实现 key是索引,在map基础上增加left 和down的指针,遍历就比较快,但是耗费内存也很大。
  4. BlockingQueue  阻塞队列  用ReentranLock+condition实现 

4.线程安全的随机数

场景:随机数生成

青铜:Random

eg:

Random random = new Random();
System.out.println(random.nextInt());

nextInt()是根据老的种子计算新的种子,而种子计算是固定函数,如果老种子一致的话新种子就一致,那么产生的随机数就一样,引发线程安全问题,为解决这个问题用了cas,但是效率低。

砖石:ThreadLocalRandom

eg:

//多线程使用
//ThreadLocalRandom负责初始化调用线程的threadLocalRandomSeed变量,也就是初始化种子
ThreadLocalRandom localRandom = ThreadLocalRandom.current();
//获得当前threadLocalRandomSeed变量 作为老种子算新种子
//由于这个变量是线程级别的变量 所以可以保证线程安全
System.out.println(localRandom.nextInt());

ThreadLocalRandom是基于线程的随机数 所以可以保证安全

5.选择正确的执行器

黄金:阻塞式 ThreadPoolExecutor

eg:

public static String doB() {
System.out.println("---do A---");
return "A is done";
}

private final static int PROCESSORS = Runtime.getRuntime().availableProcessors();
private final static ThreadPoolExecutor executor = new ThreadPoolExecutor(
PROCESSORS,
PROCESSORS * 2,
1,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(5),
new NamedThreadFactory("ASYNC-POOL"),
new ThreadPoolExecutor.CallerRunsPolicy());

public static void main(String[] args) throws ExecutionException, InterruptedException {
Future<String> resultB = executor.submit(AsyncThreadPoolExample::doB);
//这里会等阻塞式的get
System.out.println(resultB.get());
}

不管是无返回的Runnable 还是有返回的Callable,阻塞就很让人头疼

值得一提的是sumbit(runnable,result),效果就等于Callable返回result ,前者需要考虑线程安全性!

白金:CompletionService

场景:先执行完先处理

eg:

public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorCompletionService<AtomicLong> completionService =
new ExecutorCompletionService<>(executorService);

for (int i = 0; i < 5; i++) {
//这里是不是可以优化?参考前面写的
AtomicLong al = new AtomicLong();
completionService.submit(()->{
long random = ThreadLocalRandom.current().nextLong(30);
try {
sleep(random);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task"+random+" completed");
al.set(random);
},al);
}

for (int i = 0; i < 5; i++) {
try {
//得到了结果就处理 不会都一起阻塞
System.out.println(completionService.take().get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
executorService.shutdown();
}

这个就比future好一些,不用都阻塞在哪里,先跑完先处理,但是还要注意的是传入runable的是AtomicLong,不够优雅。

砖石:RecursiveTask RecursiveAction

场景:别人都说剽窃式,我觉得应该叫能者多劳,自己干完了主动揽活,这就很正能量

eg

/**
RecursiveTask有返回值 RecursiveAction无返回值
累计和
*/
public class RecursiveTaskSum extends RecursiveTask<Long> {
private final long[] numbers;
private final int startIndex;
private final int endIndex;

private static final long THRESHOLD = 10_000L;

public RecursiveTaskSum(long[] numbers) {
this(numbers, 0, numbers.length);
}

public RecursiveTaskSum(long[] numbers, int startIndex, int endIndex) {
this.numbers = numbers;
this.startIndex = startIndex;
this.endIndex = endIndex;
}

/**
* 拆分 合并 计算都在compute里面
* @return
*/
@Override
protected Long compute() {
int length = endIndex - startIndex;
if (length <= THRESHOLD) {
long result = 0L;
for (int i = startIndex; i < endIndex; i++) {
result += numbers[i];
}
return result;
}

//拆分任务
int temEndIndex = startIndex + length / 2;
//前半段
RecursiveTaskSum firstTask = new RecursiveTaskSum(numbers, startIndex, temEndIndex);
//这是递归的 又会进入这个compute进行切分 直到length <= THRESHOLD
firstTask.fork();
//后半段
RecursiveTaskSum secondTask = new RecursiveTaskSum(numbers, temEndIndex, endIndex);
secondTask.fork();

Long secondTaskResult = secondTask.join();
Long firstTaskResult = firstTask.join();
return secondTaskResult + firstTaskResult;
}

public static void main(String[] args) {
long[] numbers = LongStream.rangeClosed(1, 9_000_000).toArray();
RecursiveTaskSum forkJoinSum = new RecursiveTaskSum(numbers);
Long sum = ForkJoinPool.commonPool().invoke(forkJoinSum);
System.out.println("sum = " + sum);
long sum1 = LongStream.rangeClosed(1, 9_000_000).sum();
System.out.println("sum1 = " + sum1);
}
}

这个是基于ForkJoinTask实现的,不断的fork拆分成子任务 然后把结果join。但是拆分 合并 计算都在一个compute里面 没有遵循单一职责,管理起来不方便

大师:CompletableFuture

场景:适用那些任务链批量并发执行处理

eg

//参数有Supplier (类似callable) 和Runnable
//supplyAsync -> Supplier
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 353);
System.out.println(future.get());
//runAsync -> runnable
CompletableFuture.runAsync(() -> System.out.println(123));

//还可以提供异步链
//thenApply:以同步的方式继续处理上一个异步任务的结果。
//supplyAsync thenApply是同一个线程!
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("Supplier的线程是" + Thread.currentThread());
return "java";
}
).thenApply(e -> {
System.out.println("thenApply的线程是" + Thread.currentThread() + "上面执行的结果是:"+e);
return e.length();
});
System.out.println(future1.get());

//thenApplyAsync:以异步的方式继续处理上一个异步任务的结果
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("Supplier的线程是" + Thread.currentThread());
return "java";
}
).thenApplyAsync(e -> {
System.out.println("thenApply的线程是" + Thread.currentThread() + "上面执行的结果是:"+e);
return e.length();
});
System.out.println(future2.get());
//其他
//thenAccept:以同步的方式消费上一个异步任务的结果
//thenAcceptAsync:以异步的方式消费上一个异步任务的结果
//thenRun:以同步的方式执行Runnable任务。如果执行的任务既不想对上一个任务的输出做进一步的处理,又不想消费上一个任务的输出结果
//thenRunAsync:以异步的方式执行Runnable任务。

//合并多个future
//thenCompose 用了上一个异步任务结果
CompletableFuture<String> future3 = CompletableFuture
.supplyAsync(() -> "java")
.thenCompose(lastResult -> CompletableFuture.supplyAsync(() -> lastResult + "天下第一"));
future3.thenApply(String::toLowerCase).thenAccept(System.out::println);

//thenCombine 彼此独立的任务
CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> "java")
.thenCombine(CompletableFuture.supplyAsync(() -> "天下第一"), (result1, result2) -> result1 + result2);
future4.thenApply(String::toLowerCase).thenAccept(System.out::println);

//批量运行任务
//CompletableFuture.allOf() 全部 返回void
//CompletableFuture.anyOf() 运行一个 返回void

//错误处理
CompletableFuture<BaseResult> future5 = CompletableFuture.<BaseResult>supplyAsync(() -> {
throw new RuntimeException();
}).handle((result, e) -> {
BaseResult baseResult = new BaseResult();
if (e != null) {

baseResult.setErrorCode("1001");
baseResult.setErrorMsg(e.getMessage());
return baseResult;
}
baseResult.setResult(result);
return result;
});
BaseResult baseResult = future5.get();
System.out.println("baseResult = " + baseResult);

优雅的进行批量异步或同步处理,就很nice,底层不用我们过度关注,只要记得他也是基于ForkJoinPool.commonPool(),默认就是CPU核数大小的线程池。

王者:Stream+CompletableFuture

场景:大量的任务,并且耗时耗力,但是需要拿到返回值的

eg:

public static String rpcCall(String ip, String param) {
System.out.println(ip + " rpcCall:" + param);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return param;
}

public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add("192.168.0." + i);
}
//并发调用
long start = System.currentTimeMillis();
List<CompletableFuture<String>> futureList = list.stream().map(ip -> CompletableFuture.supplyAsync(() -> rpcCall(ip, ip)))
.collect(Collectors.toList());
//等待所有任务执行完毕
System.out.println("---------------------");
List<String> collect = futureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
collect.forEach(System.out::println);
//同步执行10s 异步执行2s左右
System.out.println("花费:" + (System.currentTimeMillis() - start));


}

模拟批量进行RPC调用时候,可以使用Stream将list转成流,然后进行处理时map生成CompletableFuture<String>结果集的流,然后拿到这个future等待处理完成后返回,也可以升级为处理了就消费,看业务需求。这里这是抛砖引玉,组合更多的组合才是王者!

来源:freebuf.com 2021-06-18 18:25:48 by: 卢本伟全体起立

© 版权声明
THE END
喜欢就支持一下吧
点赞0
分享
评论 抢沙发

请登录后发表评论