executorservice线程池建立_线程池 threadlocal

executorservice线程池建立_线程池 threadlocalExecutorService建立多线程的步骤:1。定义线程类classHandlerimplementsRunnable{}2。建立ExecutorService线程池ExecutorServiceexecutorService=Executors.newCachedThreadPool();或者intc

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺
ExecutorService 建立多线程的步骤:



1。定义线程类 class Handler implements Runnable{

}
2。建立ExecutorService线程池 ExecutorService executorService = Executors.newCachedThreadPool();

或者

int cpuNums = Runtime.getRuntime().availableProcessors();
                //获取当前系统的CPU 数目
ExecutorService executorService =Executors.newFixedThreadPool(cpuNums * POOL_SIZE);
                //ExecutorService通常根据系统资源情况灵活定义线程池大小

3。调用线程池操作 循环操作,成为daemon,把新实例放入Executor池中
      while(true){

        executorService.execute(new Handler(socket)); 
           // class Handler implements Runnable{

        或者
        executorService.execute(createTask(i));
            //private static Runnable createTask(final int taskID)
      }

execute(Runnable对象)方法
其实就是对Runnable对象调用start()方法
(当然还有一些其他后台动作,比如队列,优先级,IDLE timeout,active激活等)





几种不同的ExecutorService线程池对象

1.newCachedThreadPool()  -缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse.如果没有,就建一个新的线程加入池中
-缓存型池子通常用于执行一些生存期很短的异步型任务
 因此在一些面向连接的daemon型SERVER中用得不多。
-能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。
  注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止。
2. newFixedThreadPool -newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程
-其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待直到当前的线程中某个线程终止直接被移出池子
-和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器
从方法的源代码看,cache池和fixed 池调用的是同一个底层池,只不过参数不同:
fixed池线程数固定,并且是0秒IDLE(无IDLE)
cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE  
3.ScheduledThreadPool -调度型线程池
-这个池子里的线程可以按schedule依次delay执行,或周期执行
4.SingleThreadExecutor -单例线程,任意时间池中只能有一个线程
用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)






上面四种线程池,都使用Executor的缺省线程工厂建立线程,也可单独定义自己的线程工厂


下面是缺省线程工厂代码:

    static class DefaultThreadFactory implements ThreadFactory {

        static final AtomicInteger poolNumber = new AtomicInteger(1);
        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;

        DefaultThreadFactory() {

            SecurityManager s = System.getSecurityManager();
            group = (s != null)? s.getThreadGroup() :Thread.currentThread().getThreadGroup();
          
            namePrefix = “pool-” + poolNumber.getAndIncrement() + “-thread-“;
        }

        public Thread newThread(Runnable r) {

            Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

也可自己定义ThreadFactory,加入建立池的参数中

 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {





Executor的execute()方法


execute() 方法将Runnable实例加入pool中,并进行一些pool size计算和优先级处理


execute() 方法本身在Executor接口中定义,有多个实现类都定义了不同的execute()方法


如ThreadPoolExecutor类(cache,fiexed,single三种池子都是调用它)的execute方法如下:

    public void execute(Runnable command) {

        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

            if (runState == RUNNING && workQueue.offer(command)) {

                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }



 ——————————————————————————————————————————————————————————————————


Executors.newFixedThreadPool和ArrayBlockingQueue一点使用心得

newFixedThreadPool使用范例:

 

 

 

Java代码  
收藏代码

  1. import java.io.IOException;  
  2. import java.util.concurrent.ExecutorService;  
  3. import java.util.concurrent.Executors;  
  4.   
  5. public class Test {  
  6.   
  7.     public static void main(String[] args) throws IOException, InterruptedException {  
  8.         ExecutorService service = Executors.newFixedThreadPool(2);  
  9.         for (int i = 0; i < 6; i++) {  
  10.             final int index = i;  
  11.             System.out.println(“task: “ + (i+1));  
  12.             Runnable run = new Runnable() {  
  13.                 @Override  
  14.                 public void run() {  
  15.                     System.out.println(“thread start” + index);  
  16.                     try {  
  17.                         Thread.sleep(Long.MAX_VALUE);  
  18.                     } catch (InterruptedException e) {  
  19.                         e.printStackTrace();  
  20.                     }  
  21.                     System.out.println(“thread end” + index);  
  22.                 }  
  23.             };  
  24.             service.execute(run);  
  25.         }  
  26.     }  
  27. }  

 

 输出:
task: 1

task: 2

thread start0

task: 3

task: 4

task: 5

task: 6

task: 7

thread start1

task: 8

task: 9

task: 10

task: 11

task: 12

task: 13

task: 14

task: 15

 

    从实例可以看到for循环并没有被固定的线程池阻塞住,也就是说所有的线程task都被提交到了ExecutorService中,查看 Executors.newFixedThreadPool()如下:

 

 

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>());

}

    可以看到task被提交都了LinkedBlockingQueue中。这里有个问题,如果任务列表很大,一定会把内存撑爆,如何解决?看下面:

 

 

Java代码  
收藏代码

  1. import java.io.IOException;  
  2. import java.util.concurrent.ArrayBlockingQueue;  
  3. import java.util.concurrent.BlockingQueue;  
  4. import java.util.concurrent.ThreadPoolExecutor;  
  5. import java.util.concurrent.TimeUnit;  
  6.   
  7. public class Test {  
  8.   
  9.     public static void main(String[] args) throws IOException, InterruptedException {  
  10.           
  11.         BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(3);  
  12.           
  13.         ThreadPoolExecutor executor = new ThreadPoolExecutor(331, TimeUnit.HOURS, queue, new ThreadPoolExecutor.CallerRunsPolicy());  
  14.           
  15.         for (int i = 0; i < 10; i++) {  
  16.             final int index = i;  
  17.             System.out.println(“task: “ + (index+1));  
  18.             Runnable run = new Runnable() {  
  19.                 @Override  
  20.                 public void run() {  
  21.                     System.out.println(“thread start” + (index+1));  
  22.                     try {  
  23.                         Thread.sleep(Long.MAX_VALUE);  
  24.                     } catch (InterruptedException e) {  
  25.                         e.printStackTrace();  
  26.                     }  
  27.                     System.out.println(“thread end” + (index+1));  
  28.                 }  
  29.             };  
  30.             executor.execute(run);  
  31.         }  
  32.     }  
  33. }  

 

 输出:
task: 1

task: 2

thread start1

task: 3

task: 4

task: 5

task: 6

task: 7

thread start2

thread start7

thread start6

 

    线程池最大值为4(??这里我不明白为什么是设置值+1,即3+1,而不是3),准备执行的任务队列为3。可以看到for循环先处理4个task,然后把3个放到队列。这样就实现了自动阻塞队列的效果。记得要使用ArrayBlockingQueue这个队列,然后设置容量就OK了。

——————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————
ExecutorService常用方法和newFixedThreadPool创建固定大小的线程池

1、ExecutorService:

是一个接口,继承了Executor:

public interface ExecutorService extends Executor {
}

2、Executor:

而Executor亦是一个接口,该接口只包含了一个方法:

void execute(Runnable command);

3、Executors:

该类是一个辅助类,此包中所定义的 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 类的工厂和实用方法。

此类支持以下各种方法:

• 创建并返回设置有常用配置字符串的 ExecutorService 的方法。

• 创建并返回设置有常用配置字符串的 ScheduledExecutorService 的方法。

• 创建并返回“包装的”ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。

• 创建并返回 ThreadFactory 的方法,它可将新创建的线程设置为已知的状态。

• 创建并返回非闭包形式的 Callable 的方法,这样可将其用于需要 Callable 的执行方法中。
4、创建ExecutorService的方法:
newFixedThreadPool()

创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。

5、ExecutorService的方法:

shutdown

void shutdown()
启动一次顺序关闭,执行以前提交的任务,但不接受新任务。如果已经关闭,则调用没有其他作用。 

抛出:

SecurityException – 如果安全管理器存在并且关闭,此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有保持 
RuntimePermission
("modifyThread")),或者安全管理器的 
checkAccess 方法拒绝访问。

启动一次顺序关闭,执行以前提交的任务,但不接受新任务。如果已经关闭,则调用没有其他作用。

awaitTermination

boolean awaitTermination(long timeout,
                         TimeUnit unit)
                         throws InterruptedException
请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。 

参数:

timeout – 最长等待时间
unit – timeout 参数的时间单位

返回:

如果此执行程序终止,则返回 
true;如果终止前超时期满,则返回 
false

抛出:

InterruptedException – 如果等待时发生中断

请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。既是等待所有子线程执行结束。

execute

void execute(Runnable command)
在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由 
Executor实现决定。 

参数:

command – 可运行的任务

抛出:

RejectedExecutionException – 如果不能接受执行此任务。
NullPointerException – 如果命令为 null

在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由 Executor 实现决定。

submit

Future<?> submit(Runnable task)
提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 
get 方法在
成功 完成时将会返回
null。 

参数:

task – 要提交的任务

返回:

表示任务等待完成的 Future

抛出:

RejectedExecutionException – 如果任务无法安排执行
NullPointerException – 如果该任务为 null

提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功 完成时将会返回 null。

6、下面是相关的使用例子:
public class ExecutorServiceTest {

    public static void main(String[] args) throws IOException, InterruptedException {
        // 创建一个固定大小的线程池
        ExecutorService service = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            System.out.println("创建线程" + i);
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    System.out.println("启动线程");
                }
            };
            // 在未来某个时间执行给定的命令
            service.execute(run);
        }
        // 关闭启动线程
        service.shutdown();
        // 等待子线程结束,再继续执行下面的代码
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("all thread complete");
    }
}

可以发现线程被循环创建,但是启动线程却不是连续的,而是由ExecutorService决定的。

————————————————————————————————————

ExecutorService生命周期

ExecutorService接口继承了Executor接口,定义了一些生命周期的方法

Java代码  
收藏代码

  1. public interface ExecutorService extends Executor {  
  2. void shutdown();  
  3. List<Runnable> shutdownNow();  
  4. boolean isShutdown();  
  5. boolean isTerminated();  
  6. boolean awaitTermination(long timeout, TimeUnit unit)  
  7.         throws InterruptedException;  
  8. }  

本文,我们逐一分析里面的每个方法。

 

首先,我们需要创建一个任务代码,这段任务代码主要是随机生成含有10个字符的字符串

Java代码  
收藏代码

  1. /** 
  2.  * 随机生成10个字符的字符串 
  3.  * @author dream-victor 
  4.  * 
  5.  */  
  6. public class Task1 implements Callable<String> {  
  7.   
  8.     @Override  
  9.     public String call() throws Exception {  
  10.         String base = “abcdefghijklmnopqrstuvwxyz0123456789”;  
  11.         Random random = new Random();  
  12.         StringBuffer sb = new StringBuffer();  
  13.         for (int i = 0; i < 10; i++) {  
  14.             int number = random.nextInt(base.length());  
  15.             sb.append(base.charAt(number));  
  16.         }  
  17.         return sb.toString();  
  18.     }  
  19.   
  20.  

 然后,我们还需要一个长任务,这里我们默认是沉睡10秒,

Java代码  
收藏代码

  1. /** 
  2.  * 长时间任务 
  3.  *  
  4.  * @author dream-victor 
  5.  *  
  6.  */  
  7. public class LongTask implements Callable<String> {  
  8.   
  9.     @Override  
  10.     public String call() throws Exception {  
  11.         TimeUnit.SECONDS.sleep(10);  
  12.         return “success”;  
  13.     }  
  14.   
  15. }  

OK,所有前期准备完毕,下面我们就来分析一下ExecutorService接口中和生命周期有关的这些方法:

 

1、shutdown方法:这个方法会平滑地关闭ExecutorService,当我们调用这个方法时,ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。这里我们先不举例在下面举例。

 

2、awaitTermination方法:这个方法有两个参数,一个是timeout即超时时间,另一个是unit即时间单位。这个方法会使线程等待timeout时长,当超过timeout时间后,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。例如:


  1. ExecutorService service = Executors.newFixedThreadPool(4);  
  2. service.submit(new Task1());  
  3. service.submit(new Task1());  
  4. service.submit(new LongTask());  
  5. service.submit(new Task1());  
  6.   
  7. service.shutdown();  
  8.   
  9. while (!service.awaitTermination(1, TimeUnit.SECONDS)) {  
  10.     System.out.println(“线程池没有关闭”);  
  11. }  
  12. System.out.println(“线程池已经关闭”);  

 这段代码中,我们在第三次提交了一个长任务,这个任务将执行10秒沉睡,紧跟着执行了一次shutdown()方法,假设:这时ExecutorService被立即关闭,下面调用service.awaitTermination(1, TimeUnit.SECONDS)方法时应该返回true,程序执行结果应该只会打印出:“线程池已经关闭”。但是,真实的运行结果如下:


  1. 线程池没有关闭  
  2. 线程池没有关闭  
  3. 线程池没有关闭  
  4. 线程池没有关闭  
  5. 线程池没有关闭  
  6. 线程池没有关闭  
  7. 线程池没有关闭  
  8. 线程池没有关闭  
  9. 线程池没有关闭  
  10. 线程池已经关闭  

 这说明我们假设错误,service.awaitTermination(1, TimeUnit.SECONDS)每隔一秒监测一次ExecutorService的关闭情况,而长任务正好需要执行10秒,因此会在前9秒监测时ExecutorService为未关闭状态,而在第10秒时已经关闭,因此第10秒时输出:线程池已经关闭。这也验证了shutdown方法关闭ExecutorService的条件。

 

3、shutdownNow方法:这个方法会强制关闭ExecutorService,它将取消所有运行中的任务和在工作队列中等待的任务,这个方法返回一个List列表,列表中返回的是等待在工作队列中的任务。例如:


  1. ExecutorService service = Executors.newFixedThreadPool(3);  
  2. service.submit(new LongTask());  
  3. service.submit(new LongTask());  
  4. service.submit(new LongTask());  
  5. service.submit(new LongTask());  
  6. service.submit(new LongTask());  
  7.   
  8. List<Runnable> runnables = service.shutdownNow();  
  9. System.out.println(runnables.size());  
  10.   
  11. while (!service.awaitTermination(1, TimeUnit.MILLISECONDS)) {  
  12.     System.out.println(“线程池没有关闭”);  
  13. }  
  14. System.out.println(“线程池已经关闭”);  

 这段代码中,我们限制了线程池的长度是3,提交了5个任务,这样将有两个任务在工作队列中等待,当我们执行shutdownNow方法时,ExecutorService被立刻关闭,所以在service.awaitTermination(1, TimeUnit.MILLISECONDS)方法校验时返回的是false,因此没有输出:线程池没有关闭。而在调用shutdownNow方法时,我们接受到了一个List,这里包含的是在工作队列中等待执行的任务,由于线程池长度为3,且执行的都是长任务,所以当提交了三个任务后线程池已经满了,剩下的两次提交只能在工作队列中等待,因此我们看到runnables的大小为2,结果如下:


  1. 2  
  2. 线程池已经关闭  

 

4、isTerminated方法:这个方法会校验ExecutorService当前的状态是否为“TERMINATED”即关闭状态,当为“TERMINATED”时返回true否则返回false。例如:


  1. ExecutorService service = Executors.newFixedThreadPool(3);  
  2. service.submit(new Task1());  
  3. service.submit(new Task1());  
  4. service.submit(new LongTask());  
  5.   
  6. service.shutdown();  
  7. System.out.println(System.currentTimeMillis());  
  8. while (!service.isTerminated()) {  
  9. }  
  10. System.out.println(System.currentTimeMillis());  

这段代码我们执行了两个正常的任务和一个长任务,然后调用了shutdown方法,我们知道调用shutdown方法并不会立即关闭ExecutorService,这时我们记录一下监测循环执行前的时间,在没有关闭前我们一直进入一个空循环中,直到 ExecutorService关闭后退出循环,这里我们知道长任务执行时间大约为10秒,我们看一下上述程序运行结果:


  1. 1303298818621  
  2. 1303298828634  
  3. 相差:10013毫秒,转换一下除以1000,得到相差大约10秒  

这10秒正好是长任务执行的时间,因此在 ExecutorService正常关闭后isTerminated方法返回true。

 

5、isShutdown方法:这个方法在ExecutorService关闭后返回true,否则返回false。方法比较简单不再举例。

以上讨论是基于ThreadPoolExecutor的实现,不同的实现会有所不同需注意。

————————————————————————————————————————————————————————————————————————————————

ExecutorService与Executors例子的简单剖析

 对于多线程有了一点了解之后,那么来看看java.lang.concurrent包下面的一些东西。在此之前,我们运行一个线程都是显式调用了Thread的start()方法。我们用concurrent下面的类来实现一下线程的运行,而且这将成为以后常用的方法或者实现思路。 

        看一个简单的例子: 

Java代码  
收藏代码

  1. public class CacheThreadPool {  
  2.     public static void main(String[] args) {  
  3.         ExecutorService exec=Executors.newCachedThreadPool();  
  4.         for(int i=0;i<5;i++)  
  5.             exec.execute(new LiftOff());  
  6.         exec.shutdown();//并不是终止线程的运行,而是禁止在这个Executor中添加新的任务  
  7.     }  
  8. }  

        这个例子其实很容易看懂,ExecutorService中有一个execute方法,这个方法的参数是Runnable类型。也就是说,将一个实现了Runnable类型的类的实例作为参数传入execute方法并执行,那么线程就相应的执行了。 

        一、ExecutorService 
        先看看ExecutorService,这是一个接口,简单的列一下这个接口: 

Java代码  
收藏代码

  1. public interface ExecutorService extends Executor {  
  2.   
  3.     void shutdown();  
  4.   
  5.     List<Runnable> shutdownNow();  
  6.   
  7.     boolean isShutdown();  
  8.   
  9.     boolean isTerminated();  
  10.   
  11.     boolean awaitTermination(long timeout, TimeUnit unit)  
  12.   
  13.     <T> Future<T> submit(Callable<T> task);  
  14.   
  15.     <T> Future<T> submit(Runnable task, T result);  
  16.   
  17.     Future<?> submit(Runnable task);  
  18.   
  19.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)  
  20.   
  21.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)  
  22.   
  23.     <T> T invokeAny(Collection<? extends Callable<T>> tasks)  
  24.   
  25.     <T> T invokeAny(Collection<? extends Callable<T>> tasks,  
  26.                     long timeout, TimeUnit unit)  
  27. }  

        ExecuteService继承了Executor,Executor也是一个接口,里面只有一个方法: 

Java代码  
收藏代码

  1. void execute(Runnable command)  

        二、Executors 
        Executors是一个类,直接援引JDK文档的说明来说一下这个类的作用: 
       

    •         Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods: 

 

  •        
  • Methods that create and return an ExecutorService set up with commonly useful configuration settings.       
  • Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.       
  • Methods that create and return a “wrapped” ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.       
  • Methods that create and return a ThreadFactory that sets newly created threads to a known state.       
  • Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.       

        在上面的例子中,我们用到了newCachedThreadPool()方法。看一下这个方法: 

Java代码  
收藏代码

  1. public static ExecutorService newCachedThreadPool() {  
  2.         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
  3.                                       60L, TimeUnit.SECONDS,  
  4.                                       new SynchronousQueue<Runnable>());  
  5.     }  

        在源码中我们可以知道两点,1、这个方法返回类型是ExecutorService;2、此方法返回值实际是另一个类的实例。看一下这个类的信息: 

Java代码  
收藏代码

  1. public class ThreadPoolExecutor extends AbstractExecutorService {  
  2.     ……….  
  3.     private final BlockingQueue<Runnable> workQueue;//这个变量在下面会提到  
  4.     ……….  
  5. }  

        ThreadPoolExecutor继承了AbstractExecutorService,而AbstractExecutorService又实现了ExecutorService接口。所以,根据多态,ThreadPoolExecutor可以看作是ExecutorService类型。 

        线程执行的最关键的一步是执行了executor方法,根据java的动态绑定,实际执行的是ThreadPoolExecutor所实现的executor方法。看看源码: 

Java代码  
收藏代码

  1. public class ThreadPoolExecutor extends AbstractExecutorService {  
  2.     ……….  
  3.     public void execute(Runnable command) {  
  4.         if (command == null)  
  5.             throw new NullPointerException();  
  6.         if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  
  7.             if (runState == RUNNING && workQueue.offer(command)) {  
  8.                 if (runState != RUNNING || poolSize == 0)  
  9.                     ensureQueuedTaskHandled(command);  
  10.             }  
  11.             else if (!addIfUnderMaximumPoolSize(command))  
  12.                 reject(command); // is shutdown or saturated  
  13.         }  
  14.     }  
  15.     ……….  
  16. }  

        根据程序正常执行的路线来看,这个方法中比较重要的两个地方分别是: 
        1、workQueue.offer(command) 
        workQueue在上面提到过,是BlockingQueue<Runnable>类型的变量,这条语句就是将Runnable类型的实例加入到队列中。 
        2、ensureQueuedTaskHandled(command) 
        这个是线程执行的关键语句。看看它的源码: 

Java代码  
收藏代码

  1. public class ThreadPoolExecutor extends AbstractExecutorService {  
  2.     ……….  
  3.     private void ensureQueuedTaskHandled(Runnable command) {  
  4.         final ReentrantLock mainLock = this.mainLock;  
  5.         mainLock.lock();  
  6.         boolean reject = false;  
  7.         Thread t = null;  
  8.         try {  
  9.             int state = runState;  
  10.             if (state != RUNNING && workQueue.remove(command))  
  11.                 reject = true;  
  12.             else if (state < STOP &&  
  13.                      poolSize < Math.max(corePoolSize, 1) &&  
  14.                      !workQueue.isEmpty())  
  15.                 t = addThread(null);  
  16.         } finally {  
  17.             mainLock.unlock();  
  18.         }  
  19.         if (reject)  
  20.             reject(command);  
  21.         else if (t != null)  
  22.             t.start();  
  23.     }  
  24.     ……….  
  25. }  

        在这里我们就可以看到最终执行了t.start()方法来运行线程。在这之前的重点是t=addThread(null)方法,看看addThread方法的源码: 

Java代码  
收藏代码

  1. public class ThreadPoolExecutor extends AbstractExecutorService {  
  2.     ……….  
  3.     private Thread addThread(Runnable firstTask) {  
  4.         Worker w = new Worker(firstTask);  
  5.         Thread t = threadFactory.newThread(w);  
  6.         if (t != null) {  
  7.             w.thread = t;  
  8.             workers.add(w);  
  9.             int nt = ++poolSize;  
  10.             if (nt > largestPoolSize)  
  11.                 largestPoolSize = nt;  
  12.         }  
  13.         return t;  
  14.     }  
  15.     ……….  
  16. }  

        这里两个重点,很明显: 
        1、Worker w = new Worker(firstTask) 
        2、Thread t = threadFactory.newThread(w) 
        先看Worker是个什么结构: 

Java代码  
收藏代码

  1. public class ThreadPoolExecutor extends AbstractExecutorService {  
  2.     ……….  
  3.     private final class Worker implements Runnable {  
  4.         ……….  
  5.         Worker(Runnable firstTask) {  
  6.             this.firstTask = firstTask;  
  7.         }  
  8.   
  9.         private Runnable firstTask;  
  10.         ……….  
  11.   
  12.         public void run() {  
  13.             try {  
  14.                 Runnable task = firstTask;  
  15.                 firstTask = null;  
  16.                 while (task != null || (task = getTask()) != null) {  
  17.                     runTask(task);  
  18.                     task = null;  
  19.                 }  
  20.             } finally {  
  21.                 workerDone(this);  
  22.             }  
  23.         }  
  24.     }  
  25.   
  26.     Runnable getTask() {  
  27.         for (;;) {  
  28.             try {  
  29.                 int state = runState;  
  30.                 if (state > SHUTDOWN)  
  31.                     return null;  
  32.                 Runnable r;  
  33.                 if (state == SHUTDOWN)  // Help drain queue  
  34.                     r = workQueue.poll();  
  35.                 else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  
  36.                     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);  
  37.                 else  
  38.                     r = workQueue.take();  
  39.                 if (r != null)  
  40.                     return r;  
  41.                 if (workerCanExit()) {  
  42.                     if (runState >= SHUTDOWN) // Wake up others  
  43.                         interruptIdleWorkers();  
  44.                     return null;  
  45.                 }  
  46.                 // Else retry  
  47.             } catch (InterruptedException ie) {  
  48.                 // On interruption, re-check runState  
  49.             }  
  50.         }  
  51.     }  
  52.     }  
  53.     ……….  
  54. }  

        Worker是一个内部类。根据之前可以知道,传入addThread的参数是null,也就是说Work中firstTask为null。
        在看看newThread是一个什么方法: 

Java代码  
收藏代码

  1. public class Executors {  
  2.     ……….  
  3.     static class DefaultThreadFactory implements ThreadFactory {  
  4.         ……….  
  5.         public Thread newThread(Runnable r) {  
  6.             Thread t = new Thread(group, r,  
  7.                                   namePrefix + threadNumber.getAndIncrement(),  
  8.                                   0);  
  9.             if (t.isDaemon())  
  10.                 t.setDaemon(false);  
  11.             if (t.getPriority() != Thread.NORM_PRIORITY)  
  12.                 t.setPriority(Thread.NORM_PRIORITY);  
  13.             return t;  
  14.         }  
  15.         ……….  
  16.     }  
  17.     ……….  
  18. }  

        通过源码可以得知threadFactory的实际类型是DefaultThreadFactory,而DefaultThreadFactory是Executors的一个嵌套内部类。 

        之前我们提到了t.start()这个方法执行了线程。那么现在从头顺一下,看看到底是执行了谁的run方法。首先知道,t=addThread(null),而addThread内部执行了下面三步,Worker w = new Worker(null);Thread t = threadFactory.newThread(w);return t;这里两个t是一致的。 
        从这里可以看出,t.start()实际上执行的是Worker内部的run方法。run()内部会在if条件里面使用“短路”:判断firstTask是否为null,若不是null则直接执行firstTask的run方法;如果是null,则调用getTask()方法来获取Runnable类型实例。从哪里获取呢?workQueue!在execute方法中,执行ensureQueuedTaskHandled(command)之前就已经把Runnable类型实例放入到workQueue中了,所以这里可以从workQueue中获取到。






版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/195112.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • TCP三次握手详解-深入浅出(有图实例演示)[通俗易懂]

    TCP三次握手详解-深入浅出(有图实例演示)[通俗易懂]TCP是属于网络分层中的传输层,因为OSI分为层,感觉太麻烦了,所以分为四层就好了,简单。分层以及每层的协议,如下两张图:TCP三次握手TCP三次握手简单如下图:TCP三次握手的过程描述:1.客户主动(activeopen)去connect服务器,并且发送SYN假设序列号为J,服务器是被动打开(passiveopen)2.服务器在收到SYN后,它…

  • ScheduledExecutorService 接口[通俗易懂]

    ScheduledExecutorService 接口[通俗易懂]newScheduledThreadPool()或者newSingleThreadScheduled-Executor()方法:延迟执行、周期性执行的执行器如果想在某一段时间之后执行线程操作,或者周期性地重复执行线程操作,则可以使用工厂类Executors的newScheduledThreadPool()方法或者newSingleThreadSche…

  • 线程池参数配置详解[通俗易懂]

    线程池参数配置详解[通俗易懂]/***Createsanew{@codeThreadPoolExecutor}withthegiveninitial*parameters.**@paramcorePoolSizethenumberofthreadstokeepinthepool,even*iftheyareidle,unless{@codeallowCoreThreadTimeOut}isset.

  • DOS 下如何COPY部分内容[通俗易懂]

    DOS 下如何COPY部分内容[通俗易懂]左键--〉标记(选中)—-〉右键

  • setContentView流程

    setContentView流程1、activity、window、DecorView、ViewRoot之间的预备知识activityactivity是Android的四大组件之一,负责控制activity的生命周期和处理事件,负责视图的添加与显示,以及通过一些回调方法与window和View进行交互。一个activity包含一个window,window才是真正的窗口WindowWindow是一个抽象类,它真正的实现类是PhoneWindow。Window通过WindowManager加载一个DecorView到Window中,

  • 刚开的博客测试下[通俗易懂]

    刚开的博客测试下[通俗易懂];;;;;;;;;——————-iK7VUYG0yF6lS3QNNmW4Gw==tRymiHsi9AbKpr3tTFXxup1GFhuX0czs73gSv/E7b5c=uk29oXxJxAg+D0WGWLg/LaJ5+a4y4SSHbrMB4JywbGg=eIWSkIow/vo+D0WGWLg/LaJ5+a4y4SSHbrMB4JywbGg=pcL609

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号