谷粒商城-高级-54 -异步-异步与线程池及异步编排

一、异步

在业务开发中,有很多异步场景,为了节约时间或或者提高系统的吞吐量,要做一些异步任务,在Java中要实现异步通常都是Thread,开启一个线程Thread,开启线程有四种方式。

1、初始化线程池的4中方式

1)、继承Thread
2)、实现Runnable接口
3)、实现Callable接口+FutureTask(可以拿到返回结果,可以处理异常)
4)、线程池

方式1和方式2:主进程无法获取线程的运算结果。不适合我们当前的场景。
方式3:主进程可以获取线程的运算结果,但是不利于控制服务器中的线程资源。
方式4:通过如下两种方式初始化线程池:

Excutors.newFiexedThreadPool(3);

// 或者
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,TimeUnit unit, workQuene,threadFactory,handler);

通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果

2、开启线程测试

package com.atguigu.gulimall.search.thread;

/**
 * @author: kaiyi
 * @create: 2020-09-04 11:19
 */
public class ThreadTest {

public static void main(String[] args) {
   System.out.println("main...start...");
    /**
     * 1)、继承Thread
     *    Thread01 thread = new Thread01();
     *     thread.start(); // 启动线程
     *
     * 2)、实现Runnable接口
     *     Runable01 runable01 = new Runable01()
     *     new Thread(runable01).start();
     *
     * 3)、实现Callable接口+FutureTask(可以拿到返回结果,可以处理异常)
     *     FutureTask<Integer> FutureTask = new FutureTask<>(new Callable01());
     *     new Thread(futureTask).start();
     *     // 阻塞等待整个线程执行完成,获取返回结果
     *     Integer integer = futureTask.get();
     *
     * 4)、线程池[ExecutorService]
     *   给线程池直接提交任务。
     *   service.execute(new Runable01());
     *   创建:
     *      ①、Excutors
     *      ②、new ThreadPoolExecutor
     *
     *    Future:可以获取到异步结果
     */
    Thread01 thread01 = new Thread01();
    thread01.start(); // 启动线程

    System.out.println("main...end...");
  }

  public static class Thread01 extends  Thread{

    @Override
    public void run(){
      System.out.println("当前线程:" + Thread.currentThread().getId());

      int i = 10 / 2;
      System.out.println("运行结果:" + i);
    }
  }

}

执行结果打印:

main...start...
main...end...
当前线程:10
运行结果:5

Process finished with exit code 0

可以看到开启线程,进入了异步,主程序已经结束,线程才打印。

二、线程池

开发中为什么使用线程池?

降低资源的消耗

通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗

提高响应速度

因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行。

提高线程的可管理性

线程池会根据当前系统特点对池内的线程进行优化处理,减少线程创建和销毁带来的系统开销。

创建线程池:

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,TimeUnit unit, workQuene,threadFactory,handler);

源码:

   /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(
                                 int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

创建线程池示例:

private static void threadPool(){
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        5,
        200,
        10L,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<Runnable>(10000),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy()
    );

    // 定时任务的线程池
    ExecutorService service = Executors.newScheduledThreadPool(2);
  }

线程池七大参数:

  • corePoolSize:[5] 核心线程数[一直存在除非(allowCoreThreadTimeOut)];线程池创建好以后就准备就绪。5个 Thread thread = new Thread(); thread.start();
  • maximumPoolSize:[200] 最大线程数量;控制资源。
  • keepAliveTime:存活时间。如果当前的线程数量大于 core数量。释放空闲的线程(maximumPoolSize - corePoolSize)。只要线程空闲的时间大于指定的keepAliveTime。
  • unit:时间单位
  • BlockingQueue workQueue:阻塞队列。如果任务有很多,就会将目前多的任务放到队列里面。只要有线程空闲,就会去队列里面取出新的任务继续执行。
  • threadFactory:线程的创建工厂。
  • RejectedExecutionHandler handler:如果队列满了,按照我们指定的拒绝策略拒绝执行任务。

工作顺序:
1)、线程池创建,准备好core数量的核心线程,准备接受任务。
1.1、core满了,就将再进来的任务放入阻塞队列中。空闲的core就会自己去阻塞队列获取任务。
2、阻塞队列满了,就直接开新线程执行,最大只能开到max指定的数量
3、max满了就用 RejectedExecutionHandler handler 拒绝任务
max都执行完成,有很多空闲,在指定的时间 keepAliveTime 以后,释放 max - core 这些线程。
new LinkedBlockingDeque<>():默认是 Integer 的最大值。会导致内存占满,需要根据自己的业务定。

面试:
一个线程池 core 7, max 20, queue:50, 100并发进来怎么分配?
答:先有 7 个能直接得到执行,接下来 50 个进入阻塞队列,再多开10个线程继续执行。现在 70 个被安排上了。剩下 30 个默认拒绝策略。

注意:当 core满了 不会立即创建新的线程,而是将进来的任务放入到阻塞队列中,当阻塞队列满了之后,才会直接新开线程执行,最大只能开到 Max指定的数量。

三、CompletableFuture异步编排

业务场景:
查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。

file

假如商品详情页的每个查询,用户需要6.5s 后才能看到商品详情页的内容,这是显然不能接受的。
如果多个线程同时完成这个 6 步操作,也许只需要 1.5s 即可响应完成。

1、创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) ;
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

说明:没有指定Executor的方法会使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。

  • ①、runXxxx都是没有返回结果的,supplyXxx都是可以获取返回结果的
  • ②、可以传入自定义的线程池,否则就用默认的线程池。

示例:

/**
 * CompletableFuture 提供了四个静态方法来创建一个异步操作。
 *
 */
public class CompletableFutureDemo1 {
    public static void main(String[] args) {

        /** 第一种 runAsync 不支持返回值的 */
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
            System.out.println("runAsync 不支持返回值");
        }).whenComplete((t,u)->{
            System.out.println("runAsync 完成");
        });

        /** 第二种 supplyAsync 支持返回值 */
        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
            int i = 10;
            return i;
        }).whenComplete((t, u) -> {
            System.out.println("supplyAsync 完成,返回值是" + t);
        });

    }
}

运行结果:
runAsync 不支持返回值
runAsync 完成
supplyAsync 完成,返回值是10

2、计算完成时回调方法

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
返回相同的结果或例外,这一阶段的新completionstage,这个阶段完成时,执行特定动作的结果(或 null如果没有)和异常(或 null如果没有)这个阶段。 

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
返回相同的结果或例外,这一阶段的新completionstage,这个阶段完成时,执行特定动作执行给定的操作这一阶段的默认的异步执行设施,其结果(或 null如果没有)和异常(或 null如果没有)这个阶段作为参数。

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);
返回相同的结果或例外,这一阶段的新completionstage,这个阶段完成时,执行使用所提供的遗嘱执行人,给出的行动与结果(或 null如果没有)和异常(或 null如果没有)这个阶段作为参数。 

public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);
返回一个新的completablefuture已经完成与给定值。 

3、线程串行化方法

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
当一个线程依赖另一个线程时,获取上一个任务返回的结果,**并返回当前任务的返回值**。(有返回值)

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
消费处理结果。接收任务的处理结果,并消费处理,**无返回结果。

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
只要上面的任务执行完成,就开始执行thenRun,**只是处理完任务后,执行 thenRun的后续操作

每一个方法都对应了三种操作。带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。带有参数Executor executor的则用指定的线程池方案,不指定的话则用默认的ForkJoinPool.commonPool()。

说明:(Function<? super T,? extends U> fn)

  • T : 上一个任务返回结果的类型
    ​ U:当前任务的返回值的类型

4、两任务组合 - 都要完成

两个任务必须都完成,触发该任务。

thenCombine:组合两个future,获取两个future任务的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。
runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务

public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn, Executor executor);
 组合两个future,获取两个future任务的返回结果,并返回当前任务的返回值   

public <U> CompletableFuture<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action);

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action);

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action, Executor executor);
组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。

public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                            Runnable action);

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action);

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action,
                                                 Executor executor);
组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务

5、多任务组合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

说明:
allOf:等待所有任务完成
anyOf:只要有一个任务完成

四、测试代码

package com.atguigu.gulimall.search.thread;

import java.util.concurrent.*;

/**
 * @author: kaiyi
 * @create: 2020-09-04 11:19
 */
public class ThreadTest {

    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // System.out.println("main......start.....");
        // Thread thread = new Thread01();
        // thread.start();
        // System.out.println("main......end.....");

        // Runable01 runable01 = new Runable01();
        // new Thread(runable01).start();

        // FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
        // new Thread(futureTask).start();
        // System.out.println(futureTask.get());

        // service.execute(new Runable01());
        // Future<Integer> submit = service.submit(new Callable01());
        // submit.get();

        System.out.println("main......start.....");
        // CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        //     System.out.println("当前线程:" + Thread.currentThread().getId());
        //     int i = 10 / 2;
        //     System.out.println("运行结果:" + i);
        // }, executor);

        /**
         * 方法完成后的处理
         */
        // CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        //     System.out.println("当前线程:" + Thread.currentThread().getId());
        //     int i = 10 / 0;
        //     System.out.println("运行结果:" + i);
        //     return i;
        // }, executor).whenComplete((res,exception) -> {
        //     //虽然能得到异常信息,但是没法修改返回数据
        //     System.out.println("异步任务成功完成了...结果是:" + res + "异常是:" + exception);
        // }).exceptionally(throwable -> {
        //     //可以感知异常,同时返回默认值
        //     return 10;
        // });

        /**
         * 方法执行完后端处理
         */
        // CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        //     System.out.println("当前线程:" + Thread.currentThread().getId());
        //     int i = 10 / 2;
        //     System.out.println("运行结果:" + i);
        //     return i;
        // }, executor).handle((result,thr) -> {
        //     if (result != null) {
        //         return result * 2;
        //     }
        //     if (thr != null) {
        //         System.out.println("异步任务成功完成了...结果是:" + result + "异常是:" + thr);
        //         return 0;
        //     }
        //     return 0;
        // });

        /**
         * 线程串行化
         * 1、thenRunL:不能获取上一步的执行结果
         * 2、thenAcceptAsync:能接受上一步结果,但是无返回值
         * 3、thenApplyAsync:能接受上一步结果,有返回值
         *
         */
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).thenApplyAsync(res -> {
            System.out.println("任务2启动了..." + res);
            return "Hello" + res;
        }, executor);
        System.out.println("main......end....." + future.get());

    }

    private static void threadPool() {

        ExecutorService threadPool = new ThreadPoolExecutor(
                200,
                10,
                10L,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(10000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

        //定时任务的线程池
        ExecutorService service = Executors.newScheduledThreadPool(2);
    }

    public static class Thread01 extends Thread {
        @Override
        public void run() {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        }
    }

    public static class Runable01 implements Runnable {
        @Override
        public void run() {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        }
    }

    public static class Callable01 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }
    }

}

相关文章:
异步编程CompletableFuture

为者常成,行者常至