Java语言虽然内置了多线程支持,启动一个新线程非常方便,但是,创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间。
如果可以复用一组线程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| ┌─────┐ execute ┌──────────────────┐ │Task1│─────────▶│ThreadPool │ ├─────┤ │┌───────┐┌───────┐│ │Task2│ ││Thread1││Thread2││ ├─────┤ │└───────┘└───────┘│ │Task3│ │┌───────┐┌───────┐│ ├─────┤ ││Thread3││Thread4││ │Task4│ │└───────┘└───────┘│ ├─────┤ └──────────────────┘ │Task5│ ├─────┤ │Task6│ └─────┘ ...
|
那么我们就可以把很多小任务让一组线程来执行,而不是一个任务对应一个新线程。这种能接收大量小任务并进行分发处理的就是线程池。
简单地说,线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。
Java标准库提供了ExecutorService
接口表示线程池,它的典型用法如下:
1 2 3 4 5 6 7 8
| ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(task1); executor.submit(task2); executor.submit(task3); executor.submit(task4); executor.submit(task5);
|
因为ExecutorService
只是接口,Java标准库提供的几个常用实现类有:
- FixedThreadPool:线程数固定的线程池;
- CachedThreadPool:线程数根据任务动态调整的线程池;
- SingleThreadExecutor:仅单线程执行的线程池。
创建这些线程池的方法都被封装到Executors
这个类中。我们以FixedThreadPool
为例,看看线程池的执行逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import java.util.concurrent.*;
public class Main { public static void main(String[] args) { ExecutorService es = Executors.newFixedThreadPool(4); for (int i = 0; i < 6; i++) { es.submit(new Task("" + i)); } es.shutdown(); } }
class Task implements Runnable { private final String name;
public Task(String name) { this.name = name; }
@Override public void run() { System.out.println("start task " + name); try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println("end task " + name); } }
|
我们观察执行结果,一次性放入6个任务,由于线程池只有固定的4个线程,因此,前4个任务会同时执行,等到有线程空闲后,才会执行后面的两个任务。
线程池在程序结束的时候要关闭。使用shutdown()
方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()
会立刻停止正在执行的任务,awaitTermination()
则会等待指定的时间让线程池关闭。
如果我们把线程池改为CachedThreadPool
,由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以6个任务可一次性全部同时执行。
如果我们想把线程池的大小限制在4~10个之间动态调整怎么办?我们查看Executors.newCachedThreadPool()
方法的源码:
1 2 3 4 5 6
| public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
|
因此,想创建指定动态范围的线程池,可以这么写:
1 2 3 4 5 6
| int min = 4; int max = 10; ExecutorService es = new ThreadPoolExecutor( min, max, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
|
ScheduledThreadPool
还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool
。放入ScheduledThreadPool
的任务可以定期反复执行。
创建一个ScheduledThreadPool
仍然是通过Executors
类:
1
| ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
|
我们可以提交一次性任务,它会在指定延迟后只执行一次:
1 2
| ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);
|
如果任务以固定的每3秒执行,我们可以这样写:
1 2
| ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);
|
如果任务以固定的3秒为间隔执行,我们可以这样写:
1 2
| ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);
|
注意FixedRate和FixedDelay的区别。FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间:
1 2 3
| │░░░░ │░░░░░░ │░░░ │░░░░░ │░░░ ├───────┼───────┼───────┼───────┼────▶ │◀─────▶│◀─────▶│◀─────▶│◀─────▶│
|
而FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务:
1 2 3
| │░░░│ │░░░░░│ │░░│ │░ └───┼───────┼─────┼───────┼──┼───────┼──▶ │◀─────▶│ │◀─────▶│ │◀─────▶│
|
因此,使用ScheduledThreadPool
时,我们要根据需要选择执行一次、FixedRate执行还是FixedDelay执行。
细心的童鞋还可以思考下面的问题:
- 在FixedRate模式下,假设每秒触发,如果某次任务执行时间超过1秒,后续任务会不会并发执行?
- 如果任务抛出了异常,后续任务是否继续执行?
Java标准库还提供了一个java.util.Timer
类,这个类也可以定期执行任务,但是,一个Timer
会对应一个Thread
,所以,一个Timer
只能定期执行一个任务,多个定时任务必须启动多个Timer
,而一个ScheduledThreadPool
就可以调度多个定时任务,所以,我们完全可以用ScheduledThreadPool
取代旧的Timer
。
练习
使用线程池复用线程。
下载练习
小结
JDK提供了ExecutorService
实现了线程池功能:
- 线程池内部维护一组线程,可以高效执行大量小任务;
Executors
提供了静态方法创建不同类型的ExecutorService
;
- 必须调用
shutdown()
关闭ExecutorService
;
ScheduledThreadPool
可以定期调度多个任务。
使用Future
在执行多个任务的时候,使用Java标准库提供的线程池是非常方便的。我们提交的任务只需要实现Runnable
接口,就可以让线程池去执行:
1 2 3 4 5 6 7
| class Task implements Runnable { public String result;
public void run() { this.result = longTimeCalculation(); } }
|
Runnable
接口有个问题,它的方法没有返回值。如果任务需要一个返回结果,那么只能保存到变量,还要提供额外的方法读取,非常不便。所以,Java标准库还提供了一个Callable
接口,和Runnable
接口比,它多了一个返回值:
1 2 3 4 5
| class Task implements Callable<String> { public String call() throws Exception { return longTimeCalculation(); } }
|
并且Callable
接口是一个泛型接口,可以返回指定类型的结果。
现在的问题是,如何获得异步执行的结果?
如果仔细看ExecutorService.submit()
方法,可以看到,它返回了一个Future
类型,一个Future
类型的实例代表一个未来能获取结果的对象:
1 2 3 4 5 6 7
| ExecutorService executor = Executors.newFixedThreadPool(4);
Callable<String> task = new Task();
Future<String> future = executor.submit(task);
String result = future.get();
|
当我们提交一个Callable
任务后,我们会同时获得一个Future
对象,然后,我们在主线程某个时刻调用Future
对象的get()
方法,就可以获得异步执行的结果。在调用get()
时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()
会阻塞,直到任务完成后才返回结果。
一个Future<V>
接口表示一个未来可能会返回的结果,它定义的方法有:
get()
:获取结果(可能会等待)
get(long timeout, TimeUnit unit)
:获取结果,但只等待指定的时间;
cancel(boolean mayInterruptIfRunning)
:取消当前任务;
isDone()
:判断任务是否已完成。
练习
使用Future获取异步执行结果。
下载练习
小结
对线程池提交一个Callable
任务,可以获得一个Future
对象;
可以用Future
在将来某个时刻获取结果。
使用Future
获得异步执行结果时,要么调用阻塞方法get()
,要么轮询看isDone()
是否为true
,这两种方法都不是很好,因为主线程也会被迫等待。
从Java 8开始引入了CompletableFuture
,它针对Future
做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
我们以获取股票价格为例,看看如何使用CompletableFuture
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import java.util.concurrent.CompletableFuture;
public class Main { public static void main(String[] args) throws Exception { CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice); cf.thenAccept((result) -> { System.out.println("price: " + result); }); cf.exceptionally((e) -> { e.printStackTrace(); return null; }); Thread.sleep(200); }
static Double fetchPrice() { try { Thread.sleep(100); } catch (InterruptedException e) { } if (Math.random() < 0.3) { throw new RuntimeException("fetch price failed!"); } return 5 + Math.random() * 20; } }
|
创建一个CompletableFuture
是通过CompletableFuture.supplyAsync()
实现的,它需要一个实现了Supplier
接口的对象:
1 2 3
| public interface Supplier<T> { T get(); }
|
这里我们用lambda语法简化了一下,直接传入Main::fetchPrice
,因为Main.fetchPrice()
静态方法的签名符合Supplier
接口的定义(除了方法名外)。
紧接着,CompletableFuture
已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture
完成时和异常时需要回调的实例。完成时,CompletableFuture
会调用Consumer
对象:
1 2 3
| public interface Consumer<T> { void accept(T t); }
|
异常时,CompletableFuture
会调用Function
对象:
1 2 3
| public interface Function<T, R> { R apply(T t); }
|
这里我们都用lambda语法简化了代码。
可见CompletableFuture
的优点是:
- 异步任务结束时,会自动回调某个对象的方法;
- 异步任务出错时,会自动回调某个对象的方法;
- 主线程设置好回调后,不再关心异步任务的执行。
如果只是实现了异步回调机制,我们还看不出CompletableFuture
相比Future
的优势。CompletableFuture
更强大的功能是,多个CompletableFuture
可以串行执行,例如,定义两个CompletableFuture
,第一个CompletableFuture
根据证券名称查询证券代码,第二个CompletableFuture
根据证券代码查询证券价格,这两个CompletableFuture
实现串行操作如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import java.util.concurrent.CompletableFuture;
public class Main { public static void main(String[] args) throws Exception { CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> { return queryCode("中国石油"); }); CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> { return fetchPrice(code); }); cfFetch.thenAccept((result) -> { System.out.println("price: " + result); }); Thread.sleep(2000); }
static String queryCode(String name) { try { Thread.sleep(100); } catch (InterruptedException e) { } return "601857"; }
static Double fetchPrice(String code) { try { Thread.sleep(100); } catch (InterruptedException e) { } return 5 + Math.random() * 20; } }
|
除了串行执行外,多个CompletableFuture
还可以并行执行。例如,我们考虑这样的场景:
同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| import java.util.concurrent.CompletableFuture;
public class Main { public static void main(String[] args) throws Exception { CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> { return queryCode("中国石油", "https://finance.sina.com.cn/code/"); }); CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> { return queryCode("中国石油", "https://money.163.com/code/"); });
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> { return fetchPrice((String) code, "https://finance.sina.com.cn/price/"); }); CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> { return fetchPrice((String) code, "https://money.163.com/price/"); });
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
cfFetch.thenAccept((result) -> { System.out.println("price: " + result); }); Thread.sleep(200); }
static String queryCode(String name, String url) { System.out.println("query code from " + url + "..."); try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { } return "601857"; }
static Double fetchPrice(String code, String url) { System.out.println("query price from " + url + "..."); try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { } return 5 + Math.random() * 20; } }
|
上述逻辑实现的异步查询规则实际上是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| ┌─────────────┐ ┌─────────────┐ │ Query Code │ │ Query Code │ │ from sina │ │ from 163 │ └─────────────┘ └─────────────┘ │ │ └───────┬───────┘ ▼ ┌─────────────┐ │ anyOf │ └─────────────┘ │ ┌───────┴────────┐ ▼ ▼ ┌─────────────┐ ┌─────────────┐ │ Query Price │ │ Query Price │ │ from sina │ │ from 163 │ └─────────────┘ └─────────────┘ │ │ └────────┬───────┘ ▼ ┌─────────────┐ │ anyOf │ └─────────────┘ │ ▼ ┌─────────────┐ │Display Price│ └─────────────┘
|
除了anyOf()
可以实现“任意个CompletableFuture
只要一个成功”,allOf()
可以实现“所有CompletableFuture
都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。
最后我们注意CompletableFuture
的命名规则:
xxx()
:表示该方法将继续在已有的线程中执行;
xxxAsync()
:表示将异步在线程池中执行。
练习
使用CompletableFuture。
下载练习
小结
CompletableFuture
可以指定异步处理流程:
thenAccept()
处理正常结果;
exceptional()
处理异常结果;
thenApplyAsync()
用于串行化另一个CompletableFuture
;
anyOf()
和allOf()
用于并行化多个CompletableFuture
。
Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。
我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:
1 2
| ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐ └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
|
还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:
1 2 3 4
| ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐ └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘ ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐ └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
|
如果拆成两部分还是很大,我们还可以继续拆,用4个线程并行执行:
1 2 3 4 5 6 7 8
| ┌─┬─┬─┬─┬─┬─┐ └─┴─┴─┴─┴─┴─┘ ┌─┬─┬─┬─┬─┬─┐ └─┴─┴─┴─┴─┴─┘ ┌─┬─┬─┬─┬─┬─┐ └─┴─┴─┴─┴─┴─┘ ┌─┬─┬─┬─┬─┬─┐ └─┴─┴─┴─┴─┴─┘
|
这就是Fork/Join任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。
我们来看如何使用Fork/Join对大数据进行并行求和:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| import java.util.Random; import java.util.concurrent.*;
public class Main { public static void main(String[] args) throws Exception { long[] array = new long[2000]; long expectedSum = 0; for (int i = 0; i < array.length; i++) { array[i] = random(); expectedSum += array[i]; } System.out.println("Expected sum: " + expectedSum); ForkJoinTask<Long> task = new SumTask(array, 0, array.length); long startTime = System.currentTimeMillis(); Long result = ForkJoinPool.commonPool().invoke(task); long endTime = System.currentTimeMillis(); System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms."); }
static Random random = new Random(0);
static long random() { return random.nextInt(10000); } }
class SumTask extends RecursiveTask<Long> { static final int THRESHOLD = 500; long[] array; int start; int end;
SumTask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; }
@Override protected Long compute() { if (end - start <= THRESHOLD) { long sum = 0; for (int i = start; i < end; i++) { sum += this.array[i]; try { Thread.sleep(1); } catch (InterruptedException e) { } } return sum; } int middle = (end + start) / 2; System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end)); SumTask subtask1 = new SumTask(this.array, start, middle); SumTask subtask2 = new SumTask(this.array, middle, end); invokeAll(subtask1, subtask2); Long subresult1 = subtask1.join(); Long subresult2 = subtask2.join(); Long result = subresult1 + subresult2; System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result); return result; } }
|
观察上述代码的执行过程,一个大的计算任务0~2000首先分裂为两个小任务0~1000和1000~2000,这两个小任务仍然太大,继续分裂为更小的0~500,500~1000,1000~1500,1500~2000,最后,计算结果被依次合并,得到最终结果。
因此,核心代码SumTask
继承自RecursiveTask
,在compute()
方法中,关键是如何“分裂”出子任务并且提交子任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| class SumTask extends RecursiveTask<Long> { protected Long compute() { SumTask subtask1 = new SumTask(...); SumTask subtask2 = new SumTask(...); invokeAll(subtask1, subtask2); Long subresult1 = subtask1.join(); Long subresult2 = subtask2.join(); return subresult1 + subresult2; } }
|
Fork/Join线程池在Java标准库中就有应用。Java标准库提供的java.util.Arrays.parallelSort(array)
可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。
练习
使用Fork/Join。
下载练习
小结
Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。
ForkJoinPool
线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTask
或RecursiveAction
。
使用Fork/Join模式可以进行并行计算以提高效率。
多线程是Java实现多任务的基础,Thread
对象代表一个线程,我们可以在代码中调用Thread.currentThread()
获取当前线程。例如,打印日志时,可以同时打印出当前线程的名字:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class Main { public static void main(String[] args) throws Exception { log("start main..."); new Thread(() -> { log("run task..."); }).start(); new Thread(() -> { log("print..."); }).start(); log("end main."); }
static void log(String s) { System.out.println(Thread.currentThread().getName() + ": " + s); } }
|
对于多任务,Java标准库提供的线程池可以方便地执行这些任务,同时复用线程。Web应用程序就是典型的多任务应用,每个用户请求页面时,我们都会创建一个任务,类似:
1 2 3 4 5 6
| public void process(User user) { checkPermission(); doWork(); saveStatus(); sendResponse(); }
|
然后,通过线程池去执行这些任务。
观察process()
方法,它内部需要调用若干其他方法,同时,我们遇到一个问题:如何在一个线程内传递状态?
process()
方法需要传递的状态就是User
实例。有的童鞋会想,简单地传入User
就可以了:
1 2 3 4 5 6
| public void process(User user) { checkPermission(user); doWork(user); saveStatus(user); sendResponse(user); }
|
但是往往一个方法又会调用其他很多方法,这样会导致User
传递到所有地方:
1 2 3 4 5 6
| void doWork(User user) { queryStatus(user); checkStatus(); setNewStatus(user); log(); }
|
这种在一个线程中,横跨若干方法调用,需要传递的对象,我们通常称之为上下文(Context),它是一种状态,可以是用户身份、任务信息等。
给每个方法增加一个context参数非常麻烦,而且有些时候,如果调用链有无法修改源码的第三方库,User
对象就传不进去了。
Java标准库提供了一个特殊的ThreadLocal
,它可以在一个线程中传递同一个对象。
ThreadLocal
实例通常总是以静态字段初始化如下:
1
| static ThreadLocal<User> threadLocalUser = new ThreadLocal<>();
|
它的典型使用方式如下:
1 2 3 4 5 6 7 8 9 10
| void processUser(user) { try { threadLocalUser.set(user); step1(); step2(); log(); } finally { threadLocalUser.remove(); } }
|
通过设置一个User
实例关联到ThreadLocal
中,在移除之前,所有方法都可以随时获取到该User
实例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| void step1() { User u = threadLocalUser.get(); log(); printUser(); }
void step2() { User u = threadLocalUser.get(); checkUser(u.id); }
void log() { User u = threadLocalUser.get(); println(u.name); }
|
注意到普通的方法调用一定是同一个线程执行的,所以,step1()
、step2()
以及log()
方法内,threadLocalUser.get()
获取的User
对象是同一个实例。
实际上,可以把ThreadLocal
看成一个全局Map<Thread, Object>
:每个线程获取ThreadLocal
变量时,总是使用Thread
自身作为key:
1
| Object threadLocalValue = threadLocalMap.get(Thread.currentThread());
|
因此,ThreadLocal
相当于给每个线程都开辟了一个独立的存储空间,各个线程的ThreadLocal
关联的实例互不干扰。
最后,特别注意ThreadLocal
一定要在finally
中清除:
1 2 3 4 5 6
| try { threadLocalUser.set(user); ... } finally { threadLocalUser.remove(); }
|
这是因为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal
没有被清除,该线程执行其他代码时,会把上一次的状态带进去。
为了保证能释放ThreadLocal
关联的实例,我们可以通过AutoCloseable
接口配合try (resource) {...}
结构,让编译器自动为我们关闭。例如,一个保存了当前用户名的ThreadLocal
可以封装为一个UserContext
对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class UserContext implements AutoCloseable {
static final ThreadLocal<String> ctx = new ThreadLocal<>();
public UserContext(String user) { ctx.set(user); }
public static String currentUser() { return ctx.get(); }
@Override public void close() { ctx.remove(); } }
|
使用的时候,我们借助try (resource) {...}
结构,可以这么写:
1 2 3 4
| try (var ctx = new UserContext("Bob")) { String currentUser = UserContext.currentUser(); }
|
这样就在UserContext
中完全封装了ThreadLocal
,外部代码在try (resource) {...}
内部可以随时调用UserContext.currentUser()
获取当前线程绑定的用户名。
练习
练习使用ThreadLocal。
下载练习
小结
ThreadLocal
表示线程的“局部变量”,它确保每个线程的ThreadLocal
变量都是各自独立的;
ThreadLocal
适合在一个线程的处理流程中保持上下文(避免了同一参数在所有方法中传递);
使用ThreadLocal
要用try ... finally
结构,并在finally
中清除。
虚拟线程(Virtual Thread)是Java 19引入的一种轻量级线程,它在很多其他语言中被称为协程、纤程、绿色线程、用户态线程等。
在理解虚拟线程前,我们先回顾一下线程的特点:
- 线程是由操作系统创建并调度的资源;
- 线程切换会耗费大量CPU时间;
- 一个系统能同时调度的线程数量是有限的,通常在几百至几千级别。
因此,我们说线程是一种重量级资源。在服务器端,对用户请求,通常都实现为一个线程处理一个请求。由于用户的请求数往往远超操作系统能同时调度的线程数量,所以通常使用线程池来尽量减少频繁创建和销毁线程的成本。
对于需要处理大量IO请求的任务来说,使用线程是低效的,因为一旦读写IO,线程就必须进入等待状态,直到IO数据返回。常见的IO操作包括:
- 读写文件;
- 读写网络,例如HTTP请求;
- 读写数据库,本质上是通过JDBC实现网络调用。
我们举个例子,一个处理HTTP请求的线程,它在读写网络、文件的时候就会进入等待状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| Begin ──────── Blocking ──▶ Read HTTP Request Wait... Wait... Wait... ──────── Running ──────── Blocking ──▶ Read Config File Wait... ──────── Running ──────── Blocking ──▶ Read Database Wait... Wait... Wait... ──────── Running ──────── Blocking ──▶ Send HTTP Response Wait... Wait... ──────── End
|
真正由CPU执行的代码消耗的时间非常少,线程的大部分时间都在等待IO。我们把这类任务称为IO密集型任务。
为了能高效执行IO密集型任务,Java从19开始引入了虚拟线程。虚拟线程的接口和普通线程是一样的,但是执行方式不一样。虚拟线程不是由操作系统调度,而是由普通线程调度,即成百上千个虚拟线程可以由一个普通线程调度。任何时刻,只能执行一个虚拟线程,但是,一旦该虚拟线程执行一个IO操作进入等待时,它会被立刻“挂起”,然后执行下一个虚拟线程。什么时候IO数据返回了,这个挂起的虚拟线程才会被再次调度。因此,若干个虚拟线程可以在一个普通线程中交替运行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| Begin ─────────── V1 Runing V1 Blocking ──▶ Read HTTP Request ─────────── V2 Runing V2 Blocking ──▶ Read HTTP Request ─────────── V3 Runing V3 Blocking ──▶ Read HTTP Request ─────────── V1 Runing V1 Blocking ──▶ Read Config File ─────────── V2 Runing V2 Blocking ──▶ Read Database ─────────── V1 Runing V1 Blocking ──▶ Read Database ─────────── V3 Runing V3 Blocking ──▶ Read Database ─────────── V2 Runing V2 Blocking ──▶ Send HTTP Response ─────────── V1 Runing V1 Blocking ──▶ Send HTTP Response ─────────── V3 Runing V3 Blocking ──▶ Send HTTP Response ─────────── End
|
如果我们单独看一个虚拟线程的代码,在一个方法中:
1 2 3 4 5 6 7 8 9 10
| void register() { config = readConfigFile("./config.json"); if (config.useFullName) { name = req.firstName + " " + req.lastName; } insertInto(db, name); if (config.cache) { redis.set(key, name); } }
|
涉及到IO读写的#1、#2、#3处,执行到这些地方的时候(进入相关的JNI方法内部时)会自动挂起,并切换到其他虚拟线程执行。等到数据返回后,当前虚拟线程会再次调度并执行,因此,代码看起来是同步执行,但实际上是异步执行的。
使用虚拟线程
虚拟线程的接口和普通线程一样,唯一区别在于创建虚拟线程只能通过特定方法。
方法一:直接创建虚拟线程并运行:
1 2 3 4 5 6
| Thread vt = Thread.startVirtualThread(() -> { System.out.println("Start virtual thread..."); Thread.sleep(10); System.out.println("End virtual thread."); });
|
方法二:创建虚拟线程但不自动运行,而是手动调用start()
开始运行:
1 2 3 4 5 6 7 8
| Thread.ofVirtual().unstarted(() -> { System.out.println("Start virtual thread..."); Thread.sleep(1000); System.out.println("End virtual thread."); });
vt.start();
|
方法三:通过虚拟线程的ThreadFactory创建虚拟线程,然后手动调用start()
开始运行:
1 2 3 4 5 6 7 8 9 10
| ThreadFactory tf = Thread.ofVirtual().factory();
Thread vt = tf.newThread(() -> { System.out.println("Start virtual thread..."); Thread.sleep(1000); System.out.println("End virtual thread."); });
vt.start();
|
直接调用start()
实际上是由ForkJoinPool
的线程来调度的。我们也可以自己创建调度线程,然后运行虚拟线程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
ThreadFactory tf = Thread.ofVirtual().factory(); for (int i=0; i<100000; i++) { Thread vt = tf.newThread(() -> { ... }); executor.submit(vt); executor.submit(() -> { System.out.println("Start virtual thread..."); Thread.sleep(1000); System.out.println("End virtual thread."); return true; }); }
|
由于虚拟线程属于非常轻量级的资源,因此,用时创建,用完就扔,不要池化虚拟线程。
最后注意,虚拟线程在Java 21正式发布,在Java 19/20是预览功能,默认关闭,需要添加参数--enable-preview
启用:
1
| java --source 19 --enable-preview Main.java
|
使用限制
注意到只有以虚拟线程方式运行的代码,才会在执行IO操作时自动被挂起并切换到其他虚拟线程。普通线程的IO操作仍然会等待,例如,我们在main()
方法中读写文件,是不会有调度和自动挂起的。
可以自动引发调度切换的操作包括:
- 文件IO;
- 网络IO;
- 使用Concurrent库引发等待;
- Thread.sleep()操作。
这是因为JDK为了实现虚拟线程,已经对底层相关操作进行了修改,这样应用层的Java代码无需修改即可使用虚拟线程。无法自动切换的语言需要用户手动调用await
来实现异步操作:
1 2 3 4
| async function doWork() { await readFile(); await sendNetworkData(); }
|
在虚拟线程中,如果绕过JDK的IO接口,直接通过JNI读写文件或网络是无法实现调度的。此外,在synchronized
块内部也无法调度。
练习
使用虚拟线程调度10万个任务并观察耗时:
1 2 3 4 5 6 7 8 9 10 11 12
| public class Main { public static void main(String[] args) { ExecutorService es = Executors.newVirtualThreadPerTaskExecutor(); for (int i=0; i<100000; i++) { es.submit(() -> { Thread.sleep(1000); return 0; }); } es.close(); } }
|
再将ExecutorService
改为线程池模式并对比结果。
下载练习
小结
Java 19引入的虚拟线程是为了解决IO密集型任务的吞吐量,它可以高效通过少数线程去调度大量虚拟线程;
虚拟线程在执行到IO操作或Blocking操作时,会自动切换到其他虚拟线程执行,从而避免当前线程等待,能最大化线程的执行效率;
虚拟线程使用普通线程相同的接口,最大的好处是无需修改任何代码,就可以将现有的IO操作异步化获得更大的吞吐能力。
计算密集型任务不应使用虚拟线程,只能通过增加CPU核心解决,或者利用分布式计算资源。