大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。
Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺
ExecutorService 建立多线程的步骤:
1。定义线程类 | class Handler implements Runnable{ } |
2。建立ExecutorService线程池 | ExecutorService executorService = Executors.newCachedThreadPool();
或者 int cpuNums = Runtime.getRuntime().availableProcessors(); |
3。调用线程池操作 | 循环操作,成为daemon,把新实例放入Executor池中 execute(Runnable对象)方法 |
几种不同的ExecutorService线程池对象
1.newCachedThreadPool() |
-缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse.如果没有,就建一个新的线程加入池中 -缓存型池子通常用于执行一些生存期很短的异步型任务 -能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。 |
2. newFixedThreadPool | -newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程 -其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子 -和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器 –从方法的源代码看,cache池和fixed 池调用的是同一个底层池,只不过参数不同: fixed池线程数固定,并且是0秒IDLE(无IDLE) cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE |
3.ScheduledThreadPool | -调度型线程池 -这个池子里的线程可以按schedule依次delay执行,或周期执行 |
4.SingleThreadExecutor | -单例线程,任意时间池中只能有一个线程 –用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE) |
上面四种线程池,都使用Executor的缺省线程工厂建立线程,也可单独定义自己的线程工厂
下面是缺省线程工厂代码:
|
也可自己定义ThreadFactory,加入建立池的参数中
|
Executor的execute()方法
execute() 方法将Runnable实例加入pool中,并进行一些pool size计算和优先级处理
execute() 方法本身在Executor接口中定义,有多个实现类都定义了不同的execute()方法
如ThreadPoolExecutor类(cache,fiexed,single三种池子都是调用它)的execute方法如下:
|
——————————————————————————————————————————————————————————————————
Executors.newFixedThreadPool和ArrayBlockingQueue一点使用心得
newFixedThreadPool使用范例:
- import java.io.IOException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- public class Test {
- public static void main(String[] args) throws IOException, InterruptedException {
- ExecutorService service = Executors.newFixedThreadPool(2);
- for (int i = 0; i < 6; i++) {
- final int index = i;
- System.out.println(“task: “ + (i+1));
- Runnable run = new Runnable() {
- @Override
- public void run() {
- System.out.println(“thread start” + index);
- try {
- Thread.sleep(Long.MAX_VALUE);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(“thread end” + index);
- }
- };
- service.execute(run);
- }
- }
- }
task: 2
thread start0
task: 3
task: 4
task: 5
task: 6
task: 7
thread start1
task: 8
task: 9
task: 10
task: 11
task: 12
task: 13
task: 14
task: 15
从实例可以看到for循环并没有被固定的线程池阻塞住,也就是说所有的线程task都被提交到了ExecutorService中,查看 Executors.newFixedThreadPool()如下:
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可以看到task被提交都了LinkedBlockingQueue中。这里有个问题,如果任务列表很大,一定会把内存撑爆,如何解决?看下面:
- import java.io.IOException;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- public class Test {
- public static void main(String[] args) throws IOException, InterruptedException {
- BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(3);
- ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 1, TimeUnit.HOURS, queue, new ThreadPoolExecutor.CallerRunsPolicy());
- for (int i = 0; i < 10; i++) {
- final int index = i;
- System.out.println(“task: “ + (index+1));
- Runnable run = new Runnable() {
- @Override
- public void run() {
- System.out.println(“thread start” + (index+1));
- try {
- Thread.sleep(Long.MAX_VALUE);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(“thread end” + (index+1));
- }
- };
- executor.execute(run);
- }
- }
- }
task: 2
thread start1
task: 3
task: 4
task: 5
task: 6
task: 7
thread start2
thread start7
thread start6
线程池最大值为4(??这里我不明白为什么是设置值+1,即3+1,而不是3),准备执行的任务队列为3。可以看到for循环先处理4个task,然后把3个放到队列。这样就实现了自动阻塞队列的效果。记得要使用ArrayBlockingQueue这个队列,然后设置容量就OK了。
——————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————
ExecutorService常用方法和newFixedThreadPool创建固定大小的线程池
是一个接口,继承了Executor:
public interface ExecutorService extends Executor { }
而Executor亦是一个接口,该接口只包含了一个方法:
void execute(Runnable command);
该类是一个辅助类,此包中所定义的 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 类的工厂和实用方法。
此类支持以下各种方法:
• 创建并返回设置有常用配置字符串的 ScheduledExecutorService 的方法。
• 创建并返回“包装的”ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。
• 创建并返回 ThreadFactory 的方法,它可将新创建的线程设置为已知的状态。
• 创建并返回非闭包形式的 Callable 的方法,这样可将其用于需要 Callable 的执行方法中。
newFixedThreadPool()
创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
shutdown
void shutdown()
- 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。如果已经关闭,则调用没有其他作用。
- 抛出:
-
SecurityException
– 如果安全管理器存在并且关闭,此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有保持
RuntimePermission
("modifyThread")),或者安全管理器的
checkAccess 方法拒绝访问。
-
启动一次顺序关闭,执行以前提交的任务,但不接受新任务。如果已经关闭,则调用没有其他作用。
awaitTermination
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
- 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。
- 参数:
-
timeout
– 最长等待时间 -
unit
– timeout 参数的时间单位 -
如果此执行程序终止,则返回
true;如果终止前超时期满,则返回
false -
InterruptedException
– 如果等待时发生中断
返回:
抛出:
-
请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。既是等待所有子线程执行结束。
execute
void execute(Runnable command)
-
在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由
Executor实现决定。 - 参数:
-
command
– 可运行的任务 -
RejectedExecutionException
– 如果不能接受执行此任务。 -
NullPointerException
– 如果命令为 null
抛出:
-
在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由 Executor 实现决定。
submit
Future<?> submit(Runnable task)
-
提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的
get 方法在
成功 完成时将会返回
null。 - 参数:
-
task
– 要提交的任务 - 表示任务等待完成的 Future
-
RejectedExecutionException
– 如果任务无法安排执行 -
NullPointerException
– 如果该任务为 null
返回:
抛出:
-
提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功 完成时将会返回 null。
public class ExecutorServiceTest { public static void main(String[] args) throws IOException, InterruptedException { // 创建一个固定大小的线程池 ExecutorService service = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { System.out.println("创建线程" + i); Runnable run = new Runnable() { @Override public void run() { System.out.println("启动线程"); } }; // 在未来某个时间执行给定的命令 service.execute(run); } // 关闭启动线程 service.shutdown(); // 等待子线程结束,再继续执行下面的代码 service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); System.out.println("all thread complete"); } }
可以发现线程被循环创建,但是启动线程却不是连续的,而是由ExecutorService决定的。
————————————————————————————————————
ExecutorService生命周期
ExecutorService接口继承了Executor接口,定义了一些生命周期的方法
- public interface ExecutorService extends Executor {
- void shutdown();
- List<Runnable> shutdownNow();
- boolean isShutdown();
- boolean isTerminated();
- boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException;
- }
本文,我们逐一分析里面的每个方法。
首先,我们需要创建一个任务代码,这段任务代码主要是随机生成含有10个字符的字符串
- /**
- * 随机生成10个字符的字符串
- * @author dream-victor
- *
- */
- public class Task1 implements Callable<String> {
- @Override
- public String call() throws Exception {
- String base = “abcdefghijklmnopqrstuvwxyz0123456789”;
- Random random = new Random();
- StringBuffer sb = new StringBuffer();
- for (int i = 0; i < 10; i++) {
- int number = random.nextInt(base.length());
- sb.append(base.charAt(number));
- }
- return sb.toString();
- }
- }
然后,我们还需要一个长任务,这里我们默认是沉睡10秒,
- /**
- * 长时间任务
- *
- * @author dream-victor
- *
- */
- public class LongTask implements Callable<String> {
- @Override
- public String call() throws Exception {
- TimeUnit.SECONDS.sleep(10);
- return “success”;
- }
- }
OK,所有前期准备完毕,下面我们就来分析一下ExecutorService接口中和生命周期有关的这些方法:
1、shutdown方法:这个方法会平滑地关闭ExecutorService,当我们调用这个方法时,ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。这里我们先不举例在下面举例。
2、awaitTermination方法:这个方法有两个参数,一个是timeout即超时时间,另一个是unit即时间单位。这个方法会使线程等待timeout时长,当超过timeout时间后,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。例如:
- ExecutorService service = Executors.newFixedThreadPool(4);
- service.submit(new Task1());
- service.submit(new Task1());
- service.submit(new LongTask());
- service.submit(new Task1());
- service.shutdown();
- while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
- System.out.println(“线程池没有关闭”);
- }
- System.out.println(“线程池已经关闭”);
这段代码中,我们在第三次提交了一个长任务,这个任务将执行10秒沉睡,紧跟着执行了一次shutdown()方法,假设:这时ExecutorService被立即关闭,下面调用service.awaitTermination(1, TimeUnit.SECONDS)方法时应该返回true,程序执行结果应该只会打印出:“线程池已经关闭”。但是,真实的运行结果如下:
- 线程池没有关闭
- 线程池没有关闭
- 线程池没有关闭
- 线程池没有关闭
- 线程池没有关闭
- 线程池没有关闭
- 线程池没有关闭
- 线程池没有关闭
- 线程池没有关闭
- 线程池已经关闭
这说明我们假设错误,service.awaitTermination(1, TimeUnit.SECONDS)每隔一秒监测一次ExecutorService的关闭情况,而长任务正好需要执行10秒,因此会在前9秒监测时ExecutorService为未关闭状态,而在第10秒时已经关闭,因此第10秒时输出:线程池已经关闭。这也验证了shutdown方法关闭ExecutorService的条件。
3、shutdownNow方法:这个方法会强制关闭ExecutorService,它将取消所有运行中的任务和在工作队列中等待的任务,这个方法返回一个List列表,列表中返回的是等待在工作队列中的任务。例如:
- ExecutorService service = Executors.newFixedThreadPool(3);
- service.submit(new LongTask());
- service.submit(new LongTask());
- service.submit(new LongTask());
- service.submit(new LongTask());
- service.submit(new LongTask());
- List<Runnable> runnables = service.shutdownNow();
- System.out.println(runnables.size());
- while (!service.awaitTermination(1, TimeUnit.MILLISECONDS)) {
- System.out.println(“线程池没有关闭”);
- }
- System.out.println(“线程池已经关闭”);
这段代码中,我们限制了线程池的长度是3,提交了5个任务,这样将有两个任务在工作队列中等待,当我们执行shutdownNow方法时,ExecutorService被立刻关闭,所以在service.awaitTermination(1, TimeUnit.MILLISECONDS)方法校验时返回的是false,因此没有输出:线程池没有关闭。而在调用shutdownNow方法时,我们接受到了一个List,这里包含的是在工作队列中等待执行的任务,由于线程池长度为3,且执行的都是长任务,所以当提交了三个任务后线程池已经满了,剩下的两次提交只能在工作队列中等待,因此我们看到runnables的大小为2,结果如下:
- 2
- 线程池已经关闭
4、isTerminated方法:这个方法会校验ExecutorService当前的状态是否为“TERMINATED”即关闭状态,当为“TERMINATED”时返回true否则返回false。例如:
- ExecutorService service = Executors.newFixedThreadPool(3);
- service.submit(new Task1());
- service.submit(new Task1());
- service.submit(new LongTask());
- service.shutdown();
- System.out.println(System.currentTimeMillis());
- while (!service.isTerminated()) {
- }
- System.out.println(System.currentTimeMillis());
这段代码我们执行了两个正常的任务和一个长任务,然后调用了shutdown方法,我们知道调用shutdown方法并不会立即关闭ExecutorService,这时我们记录一下监测循环执行前的时间,在没有关闭前我们一直进入一个空循环中,直到 ExecutorService关闭后退出循环,这里我们知道长任务执行时间大约为10秒,我们看一下上述程序运行结果:
- 1303298818621
- 1303298828634
- 相差:10013毫秒,转换一下除以1000,得到相差大约10秒
这10秒正好是长任务执行的时间,因此在 ExecutorService正常关闭后isTerminated方法返回true。
5、isShutdown方法:这个方法在ExecutorService关闭后返回true,否则返回false。方法比较简单不再举例。
以上讨论是基于ThreadPoolExecutor的实现,不同的实现会有所不同需注意。
————————————————————————————————————————————————————————————————————————————————
ExecutorService与Executors例子的简单剖析
对于多线程有了一点了解之后,那么来看看java.lang.concurrent包下面的一些东西。在此之前,我们运行一个线程都是显式调用了Thread的start()方法。我们用concurrent下面的类来实现一下线程的运行,而且这将成为以后常用的方法或者实现思路。
看一个简单的例子:
- public class CacheThreadPool {
- public static void main(String[] args) {
- ExecutorService exec=Executors.newCachedThreadPool();
- for(int i=0;i<5;i++)
- exec.execute(new LiftOff());
- exec.shutdown();//并不是终止线程的运行,而是禁止在这个Executor中添加新的任务
- }
- }
这个例子其实很容易看懂,ExecutorService中有一个execute方法,这个方法的参数是Runnable类型。也就是说,将一个实现了Runnable类型的类的实例作为参数传入execute方法并执行,那么线程就相应的执行了。
一、ExecutorService
先看看ExecutorService,这是一个接口,简单的列一下这个接口:
- public interface ExecutorService extends Executor {
- void shutdown();
- List<Runnable> shutdownNow();
- boolean isShutdown();
- boolean isTerminated();
- boolean awaitTermination(long timeout, TimeUnit unit)
- <T> Future<T> submit(Callable<T> task);
- <T> Future<T> submit(Runnable task, T result);
- Future<?> submit(Runnable task);
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
- <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- }
ExecuteService继承了Executor,Executor也是一个接口,里面只有一个方法:
- void execute(Runnable command)
二、Executors
Executors是一个类,直接援引JDK文档的说明来说一下这个类的作用:
-
- Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods:
- Methods that create and return an ExecutorService set up with commonly useful configuration settings.
- Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.
- Methods that create and return a “wrapped” ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.
- Methods that create and return a ThreadFactory that sets newly created threads to a known state.
- Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.
在上面的例子中,我们用到了newCachedThreadPool()方法。看一下这个方法:
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- }
在源码中我们可以知道两点,1、这个方法返回类型是ExecutorService;2、此方法返回值实际是另一个类的实例。看一下这个类的信息:
- public class ThreadPoolExecutor extends AbstractExecutorService {
- ……….
- private final BlockingQueue<Runnable> workQueue;//这个变量在下面会提到
- ……….
- }
ThreadPoolExecutor继承了AbstractExecutorService,而AbstractExecutorService又实现了ExecutorService接口。所以,根据多态,ThreadPoolExecutor可以看作是ExecutorService类型。
线程执行的最关键的一步是执行了executor方法,根据java的动态绑定,实际执行的是ThreadPoolExecutor所实现的executor方法。看看源码:
- public class ThreadPoolExecutor extends AbstractExecutorService {
- ……….
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
- if (runState == RUNNING && workQueue.offer(command)) {
- if (runState != RUNNING || poolSize == 0)
- ensureQueuedTaskHandled(command);
- }
- else if (!addIfUnderMaximumPoolSize(command))
- reject(command); // is shutdown or saturated
- }
- }
- ……….
- }
根据程序正常执行的路线来看,这个方法中比较重要的两个地方分别是:
1、workQueue.offer(command)
workQueue在上面提到过,是BlockingQueue<Runnable>类型的变量,这条语句就是将Runnable类型的实例加入到队列中。
2、ensureQueuedTaskHandled(command)
这个是线程执行的关键语句。看看它的源码:
- public class ThreadPoolExecutor extends AbstractExecutorService {
- ……….
- private void ensureQueuedTaskHandled(Runnable command) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- boolean reject = false;
- Thread t = null;
- try {
- int state = runState;
- if (state != RUNNING && workQueue.remove(command))
- reject = true;
- else if (state < STOP &&
- poolSize < Math.max(corePoolSize, 1) &&
- !workQueue.isEmpty())
- t = addThread(null);
- } finally {
- mainLock.unlock();
- }
- if (reject)
- reject(command);
- else if (t != null)
- t.start();
- }
- ……….
- }
在这里我们就可以看到最终执行了t.start()方法来运行线程。在这之前的重点是t=addThread(null)方法,看看addThread方法的源码:
- public class ThreadPoolExecutor extends AbstractExecutorService {
- ……….
- private Thread addThread(Runnable firstTask) {
- Worker w = new Worker(firstTask);
- Thread t = threadFactory.newThread(w);
- if (t != null) {
- w.thread = t;
- workers.add(w);
- int nt = ++poolSize;
- if (nt > largestPoolSize)
- largestPoolSize = nt;
- }
- return t;
- }
- ……….
- }
这里两个重点,很明显:
1、Worker w = new Worker(firstTask)
2、Thread t = threadFactory.newThread(w)
先看Worker是个什么结构:
- public class ThreadPoolExecutor extends AbstractExecutorService {
- ……….
- private final class Worker implements Runnable {
- ……….
- Worker(Runnable firstTask) {
- this.firstTask = firstTask;
- }
- private Runnable firstTask;
- ……….
- public void run() {
- try {
- Runnable task = firstTask;
- firstTask = null;
- while (task != null || (task = getTask()) != null) {
- runTask(task);
- task = null;
- }
- } finally {
- workerDone(this);
- }
- }
- }
- Runnable getTask() {
- for (;;) {
- try {
- int state = runState;
- if (state > SHUTDOWN)
- return null;
- Runnable r;
- if (state == SHUTDOWN) // Help drain queue
- r = workQueue.poll();
- else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
- r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
- else
- r = workQueue.take();
- if (r != null)
- return r;
- if (workerCanExit()) {
- if (runState >= SHUTDOWN) // Wake up others
- interruptIdleWorkers();
- return null;
- }
- // Else retry
- } catch (InterruptedException ie) {
- // On interruption, re-check runState
- }
- }
- }
- }
- ……….
- }
Worker是一个内部类。根据之前可以知道,传入addThread的参数是null,也就是说Work中firstTask为null。
在看看newThread是一个什么方法:
- public class Executors {
- ……….
- static class DefaultThreadFactory implements ThreadFactory {
- ……….
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r,
- namePrefix + threadNumber.getAndIncrement(),
- 0);
- if (t.isDaemon())
- t.setDaemon(false);
- if (t.getPriority() != Thread.NORM_PRIORITY)
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
- ……….
- }
- ……….
- }
通过源码可以得知threadFactory的实际类型是DefaultThreadFactory,而DefaultThreadFactory是Executors的一个嵌套内部类。
之前我们提到了t.start()这个方法执行了线程。那么现在从头顺一下,看看到底是执行了谁的run方法。首先知道,t=addThread(null),而addThread内部执行了下面三步,Worker w = new Worker(null);Thread t = threadFactory.newThread(w);return t;这里两个t是一致的。
从这里可以看出,t.start()实际上执行的是Worker内部的run方法。run()内部会在if条件里面使用“短路”:判断firstTask是否为null,若不是null则直接执行firstTask的run方法;如果是null,则调用getTask()方法来获取Runnable类型实例。从哪里获取呢?workQueue!在execute方法中,执行ensureQueuedTaskHandled(command)之前就已经把Runnable类型实例放入到workQueue中了,所以这里可以从workQueue中获取到。
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/195112.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...