Executor线程池

2018-09-10 01:02:23来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

Executor线程池框架:

  使用线程池的优点:

    1、重用存在的线程

    2、减少对象创建、消亡的开销

    3、性能佳

    4、可有效控制最大并发线程数,提高系统资源的使用率

    5、避免过多资源竞争,避免堵塞 

    6、提供定时执行、定期执行、单线程、并发数控制等功能
    

  介绍:

    1、Executor框架便是Java 5中引入的,其内部使用了线程池机制,它在java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。

    2、在Java 5之后,通过Executor来启动线程比使用Thread的start方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免this逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时可能会访问到初始化了一半的对象用Executor在构造器中。

    3、Executor框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。

    

  介绍排队策略:

    1、直接提交。缓冲队列采用 SynchronousQueue,它将任务直接交给线程处理而不保持它们。如果不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中。直接提交通常要求无界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒绝新提交的任务。newCachedThreadPool采用的便是这种策略。

      2、无界队列。使用无界队列(典型的便是采用预定义容量的 LinkedBlockingQueue,理论上是该缓冲队列可以对无限多的任务排队)将导致在所有 corePoolSize 线程都工作的情况下将新任务加入到缓冲队列中。这样,创建的线程就不会超过 corePoolSize,也因此,maximumPoolSize 的值也就无效了。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列。newFixedThreadPool采用的便是这种策略。

      3、有界队列。当使用有限的 maximumPoolSizes 时,有界队列(一般缓冲队列使用ArrayBlockingQueue,并制定队列的最大长度)有助于防止资源耗尽,但是可能较难调整和控制,队列大小和最大池大小需要相互折衷,需要设定合理的参数。

  

  Executors工厂类介绍:

    Executors提供四种线程池,newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor、newScheduledThreadPool;

 1 //1.创建固定数目线程的线程池。
 2 public static ExecutorService newFixedThreadPool(int nThreads) 
 
 5 //2.创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线 程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
 6 public static ExecutorService newCachedThreadPool() 
 
 8 //3.创建一个单线程化的Executor。
 9 public static ExecutorService newSingleThreadExecutor() 
 
11 //4.创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
12 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 

 

创建线程池的三种方式及静态方法:

  1. Executors.newFixedThreadPool(int); / /创建固定容量大小的缓冲池
  2.  Executors.newCachedThreadPool(); // 创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
  3.  Executors.newSingleThreadExecutor(); // 创建容量为1的缓冲池

 

 1 public static ExecutorService newFixedThreadPool(int nThreads) {
 2     return new ThreadPoolExecutor(nThreads, nThreads,
 3                                   0L, TimeUnit.MILLISECONDS,
 4                                   new LinkedBlockingQueue<Runnable>());
 5 }
 6 public static ExecutorService newSingleThreadExecutor() {
 7     return new FinalizableDelegatedExecutorService
 8         (new ThreadPoolExecutor(1, 1,
 9                                 0L, TimeUnit.MILLISECONDS,
10                                 new LinkedBlockingQueue<Runnable>()));
11 }
12 public static ExecutorService newCachedThreadPool() {
13     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
14                                   60L, TimeUnit.SECONDS,
15                                   new SynchronousQueue<Runnable>());
16 }

从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

  newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的(n,n),它使用的LinkedBlockingQueue;

  newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1(1,1),也使用的LinkedBlockingQueue;

  newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

三个线程池的特点:

  1、newFixedThreadPool创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数corePoolSize,则将提交的任务存入到池队列中。
  2、newCachedThreadPool创建一个可缓存的线程池。这种类型的线程池特点是:
    1).工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
    2).如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
  3、newSingleThreadExecutor创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,如果这个线程异常结束,会有另一个取代它,保证顺序执行(我觉得这点是它的特色)。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的 

 

小案例:

    1、newFixedThreadPool

      --newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程
      --其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子
      --和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器
      --从方法的源代码看,cache池和fixed 池调用的是同一个底层 池,只不过参数不同:
        fixed池线程数固定,并且是0秒IDLE(无IDLE)    
        cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE  

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 
 4 /**
 5  *    newFixedThreadPool(int nThreads):创建固定线程数量的线程池,以共享的无界队列方式来运行这些线程
 6  */
 7 public class Test {
 8 
 9     public static void main(String[] args) {
10         ExecutorService executorService = Executors.newFixedThreadPool(5);
11         for (int i = 0; i < 20; i++) {
12             Runnable syncRunnable = new Runnable() {
13                 @Override
14                 public void run() {
15                     System.out.println(Thread.currentThread().getName());
16                     try {
17                         Thread.sleep(2000);
18                     } catch (InterruptedException e) {
19                         e.printStackTrace();
20                     }
21                 }
22             };
23             executorService.execute(syncRunnable);
24         }
25     }
26 }

运行后:可以看到同时有五个线程在执行任务,当当五个线程都处于活动状态,再次提交的任务都会加入队列等到其他线程运行结束,当线程处于空闲状态时会被复用执行下一个任务。

newFixedThreadPool的execute方法执行过程:

  1,如果当前运行线程数少于corePoolSize,则创建新线程来执行任务(优先满足核心池)

  2,当前运行线程数等于corePoolSize时,将任务加入LinkedBlockingQueue链式阻塞队列(核心池满了在进入队列)

  3,当线程池的任务完成之后,循环反复从LinkedBlockingQueue队列中获取任务来执行

 

  2、newCachedThreadPool

    --缓存型池子,先查看池中有没有以前建立的线程,如果有,就 reuse.如果没有,就建一个新的线程加入池中
    --缓存型池子通常用于执行一些生存期很短的异步型任务,因此在一些面向连接的daemon型SERVER中用得不多。但对于生存期短的异步任务,它是Executor的首选。
    --能reuse的线程,必须是timeout IDLE内的池中线程,缺省     timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。
      注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止。

 

newCachedThreadPool是一个根据需要创建线程的线程池。

 newCachedThreadPool的corePoolSize设置0,即核心池是空,maxmumPoolSize设置为Integer.MAX_VALUE,即maxmumPool是无界的。keepAliveTime设置60L,当空闲线程等待新任务最长时间是60s,超过60s就终止

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 
 4 /**
 5  *    newCachedThreadPool()创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程
 6  */
 7 public class Test {
 8 
 9     public static void main(String[] args) {
10         ExecutorService executorService = Executors.newCachedThreadPool();
11         for (int i = 0; i < 100; i++) {
12             Runnable syncRunnable = new Runnable() {
13                 @Override
14                 public void run() {
15                     System.out.println(Thread.currentThread().getName());
16                     try {
17                         Thread.sleep(2000);//模拟当前执行任务的线程被暂时占用
18                     } catch (InterruptedException e) {
19                         e.printStackTrace();
20                     }
21                 }
22             };
23             executorService.execute(syncRunnable);
24         }
25     }
26 }

运行结果如图:(部分截图)

  

 

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 
 4 /**
 5  *    newCachedThreadPool()创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程
 6  */
 7 public class Test {
 8 
 9     public static void main(String[] args) {
10         ExecutorService executorService = Executors.newCachedThreadPool();
11         for (int i = 0; i < 100; i++) {
12             Runnable syncRunnable = new Runnable() {
13                 @Override
14                 public void run() {
15                     System.out.println(Thread.currentThread().getName());
16                 }
17             };
18             executorService.execute(syncRunnable);
19         }
20     }
21 }

 运行结果:(部分截图)

  

通过两个案例对比,可以看出:

  --缓存线程池大小是不定值,可以根据需要创建不同数量的线程。

  --在使用缓存型池时,先查看池中有没有以前创建的线程,如果有,就复用.如果没有,就新建新的线程加入池中,缓存型池子通常用于执行一些生存期很短的异步型任务

   

  3、newScheduledThreadPool

    --调度型线程池
    --这个池子里的线程可以按schedule依次delay执行,或周期执行

 1 import java.util.concurrent.Executors;
 2 import java.util.concurrent.ScheduledExecutorService;
 3 import java.util.concurrent.TimeUnit;
 4 
 5 /**
 6  *    newScheduledThreadPool(int corePoolSize)创建一个定长线程池,支持定时及周期性任务执行
 7  */
 8 public class Test {
 9     
10     /**
11      *    测试
12      */
13     public static void main(String[] args) {
14         TestMethod1();
15         TestMethod2();
16         TestMethod3();
17     }
18 
19     /**
20      *    方法1:schedule(Runnable command,long delay, TimeUnit unit)创建并执行在给定延迟后启用的一次性操作
21      *        运行结果和newFixedThreadPool类似,不同的是newScheduledThreadPool是延时一定时间之后才执行
22      */
23     public static void TestMethod1() {
24         ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
25         for (int i = 0; i < 20; i++) {
26             Runnable syncRunnable = new Runnable() {
27                 @Override
28                 public void run() {
29                     
30                 }
31             };
32             executorService.schedule(syncRunnable, 3000, TimeUnit.MILLISECONDS);
33         }
34     }
35 
36     /**
37      *     方法2:scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnitunit)
38      *            创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,
39      *            然后在initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推
40      */
41     public static void TestMethod2() {
42         ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
43         Runnable syncRunnable = new Runnable() {
44             @Override
45             public void run() {
46                 System.out.println(Thread.currentThread().getName());
47             }
48         };
49         executorService.scheduleAtFixedRate(syncRunnable, 2000, 10000, TimeUnit.MILLISECONDS);
50     }
51     
52     /**
53      *    方法3:scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)    
54      *            创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟
55      */
56     public static void TestMethod3() {
57         ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
58         Runnable syncRunnable = new Runnable() {
59             @Override
60             public void run() {
61                 System.out.println(Thread.currentThread().getName());
62                 try {
63                     Thread.sleep(1000);
64                 } catch (InterruptedException e) {
65                     e.printStackTrace();
66                 }
67             }
68         };
69         executorService.scheduleWithFixedDelay(syncRunnable, 5000, 3000, TimeUnit.MILLISECONDS);
70     }
71 }

  4、newSingleThreadExecutor

    --单例线程,任意时间池中只能有一个线程
    --用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 
 4 /**
 5  *    newSingleThreadExecutor创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
 6  */
 7 public class Test {
 8     
 9     /**
10      *    测试,运行结果:只会创建一个线程,当上一个执行完之后才会执行第二个
11      */
12     public static void main(String[] args) {
13         ExecutorService executorService = Executors.newSingleThreadExecutor();
14         for (int i = 0; i < 20; i++) {
15             Runnable syncRunnable = new Runnable() {
16                 @Override
17                 public void run() {
18                     System.out.println(Thread.currentThread().getName());
19                 }
20             };
21             executorService.execute(syncRunnable);
22         }
23     }
24 }

newSingleThreadExecutor的execute方法执行过程如下:

  1,当前运行的线程数少于corePoolSize(即当前线程池中午运行的线程),则创建一个新的线程来执行任务

  2,当线程池中有一个运行的线程时,将任务加入阻塞队列

  3,当线程完成任务时,会无限反复从链式阻塞队列中获取任务来执行

 

 

  ExecutorService介绍:

    1、ExecutorService是一个接口,ExecutorService接口继承了Executor接口,定义了一些生命周期的方法;

    2、ExecutorService接口继承自Executor接口,它提供了更丰富的实现多线程的方法,比如,ExecutorService提供了关闭自己的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以调用ExecutorService的shutdown()方法来平滑地关闭 ;ExecutorService,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程。

    ExecutorService的生命周期包括三种状态:运行、关闭、终止。创建后便进入运行状态,当调用了shutdown()方法时,便进入关闭状态,此时意味着ExecutorService不再接受新的任务,但它还在执行已经提交了的任务,当素有已经提交了的任务执行完后,便到达终止状态。

    如果不调用shutdown()方法,ExecutorService会一直处在运行状态,不断接收新的任务,执行新的任务,服务器端一般不需要关闭它,保持一直运行即可。

  

ExecutorService接口源码:

 1 import java.util.Collection;
 2 import java.util.List;
 3 import java.util.concurrent.Callable;
 4 import java.util.concurrent.ExecutionException;
 5 import java.util.concurrent.Executor;
 6 import java.util.concurrent.Future;
 7 import java.util.concurrent.TimeUnit;
 8 import java.util.concurrent.TimeoutException;
 9 
10 public interface ExecutorService extends Executor {
11 
12     void shutdown();//顺次地关闭ExecutorService,停止接收新的任务,等待所有已经提交的任务执行完毕之后,关闭ExecutorService
13 
14     List<Runnable> shutdownNow();//阻止等待任务启动并试图停止当前正在执行的任务,停止接收新的任务,返回处于等待的任务列表
15 
16     boolean isShutdown();//判断线程池是否已经关闭
17 
18     boolean isTerminated();//如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。
19 
20     boolean awaitTermination(long timeout, TimeUnit unit);//等待(阻塞)直到关闭或最长等待时间或发生中断,timeout - 最长等待时间 ,unit - timeout 参数的时间单位  如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false 
21 
22     <T> Future<T> submit(Callable<T> task);//提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。
23 
24     <T> Future<T> submit(Runnable task, T result);//提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。
25 
26     Future<?> submit(Runnable task);//提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功 完成时将会返回 null
27 
28     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)//执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。
29         throws InterruptedException;
30 
31     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
32                                   long timeout, TimeUnit unit)//执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。
33         throws InterruptedException;
34 
35     <T> T invokeAny(Collection<? extends Callable<T>> tasks)//执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。一旦正常或异常返回后,则取消尚未完成的任务。
36         throws InterruptedException, ExecutionException;
37 
38     <T> T invokeAny(Collection<? extends Callable<T>> tasks,
39                     long timeout, TimeUnit unit)
40         throws InterruptedException, ExecutionException, TimeoutException;
41 }

 

  案例1:执行Runnable任务

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 
 4 /**
 5  *     执行Runnable任务
 6  *    一旦Runnable任务传递到execute()方法,该方法便会自动在一个线程上执行
 7  */
 8 public class Test {
 9     public static void main(String[] args){   
10         ExecutorService executorService = Executors.newCachedThreadPool();   
11         for (int i = 0; i < 5; i++){   
12             executorService.execute(new TestRunnable());   
13             System.out.println("************* a" + i + " *************");   
14         }   
15         executorService.shutdown();//顺次地关闭ExecutorService,停止接收新的任务,等待所有已经提交的任务执行完毕之后,关闭ExecutorService   
16     }   
17 }   
18 
19 class TestRunnable implements Runnable{   
20     public void run(){   
21         System.out.println(Thread.currentThread().getName() + "线程被调用了。");   
22     }   
23 }

  

  案例2:执行Callable任务

    在Java 5之后,任务分两类:一类是实现了Runnable接口的类,一类是实现了Callable接口的类。两者都可以被ExecutorService执行,但是Runnable任务没有返回值,而Callable任务有返回值。并且Callable的call()方法只能通过ExecutorService的submit(Callable task) 方法来执行,并且返回一个 Future,是表示任务等待完成的 Future。

 1 import java.util.ArrayList;
 2 import java.util.List;
 3 import java.util.concurrent.Callable;
 4 import java.util.concurrent.ExecutionException;
 5 import java.util.concurrent.ExecutorService;
 6 import java.util.concurrent.Executors;
 7 import java.util.concurrent.Future;
 8 
 9 public class CallableDemo{   
10     /**
11      *    submit也是首先选择空闲线程来执行任务,如果没有,才会创建新的线程来执行任务。
12      *    另外,需要注意:如果Future的返回尚未完成,则get()方法会阻塞等待,直到Future完成返回,可以通过调用isDone()方法判断Future是否完成了返回。
13      */
14     public static void main(String[] args){   
15         ExecutorService executorService = Executors.newCachedThreadPool();   
16         List<Future<String>> resultList = new ArrayList<Future<String>>();   
17 
18         //创建10个任务并执行   
19         for (int i = 0; i < 10; i++){   
20             //使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中   
21             Future<String> future = executorService.submit(new TaskWithResult(i));   
22             //将任务执行结果存储到List中   
23             resultList.add(future);   
24         }   
25 
26         //遍历任务的结果   
27         for (Future<String> fs : resultList){   
28                 try{   
29                     while(!fs.isDone());//Future返回如果没有完成,则一直循环等待,直到Future返回完成  
30                     System.out.println(fs.get());     //打印各个线程(任务)执行的结果   
31                 }catch(InterruptedException e){   
32                     e.printStackTrace();   
33                 }catch(ExecutionException e){   
34                     e.printStackTrace();   
35                 }finally{   
36                     //启动一次顺序关闭,执行以前提交的任务,但不接受新任务  
37                     executorService.shutdown();   
38                 }   
39         }   
40     }   
41 }   
42 
43 
44 class TaskWithResult implements Callable<String>{   
45     private int id;   
46 
47     public TaskWithResult(int id){   
48         this.id = id;   
49     }   
50 
51     /**  
52      *     任务的具体过程,一旦任务传给ExecutorService的submit方法, 
53      *     则该方法自动在一个线程上执行 
54      */   
55     public String call() throws Exception {  
56         System.out.println("call()方法被自动调用!!!    " + Thread.currentThread().getName());   
57         //该返回结果将被Future的get方法得到  
58         return "call()方法被自动调用,任务返回的结果是:" + id + "    " + Thread.currentThread().getName();   
59     }   
60 }  

 

  Future接口:

 1 // Future代表异步任务的执行结果
 2   public interface Future<V> {
 3   
 4       /**
 5        * 尝试取消一个任务,如果这个任务不能被取消(通常是因为已经执行完了),返回false,否则返回true。
 6        */
 7       boolean cancel(boolean mayInterruptIfRunning);
 8   
 9       /**
10       * 返回代表的任务是否在完成之前被取消了
11       */
12      boolean isCancelled();
13  
14      /**
15       * 如果任务已经完成,返回true
16       */
17     boolean isDone();
18  
19      /**
20       * 获取异步任务的执行结果(如果任务没执行完将等待)
21       */
22     V get() throws InterruptedException, ExecutionException;
23  
24      /**
25       * 获取异步任务的执行结果(有最常等待时间的限制)
26       *
27       *  timeout表示等待的时间,unit是它时间单位
28       */
29      V get(long timeout, TimeUnit unit)
30          throws InterruptedException, ExecutionException, TimeoutException;
31  }

 

尽量优先使用Executors提供的静态方法来创建线程池,如果Executors提供的方法无法满足要求,再自己通过ThreadPoolExecutor类来创建线程池   

  自定义线程池:ThreadPoolExecutor

 1 import java.util.concurrent.ArrayBlockingQueue;
 2 import java.util.concurrent.BlockingQueue;
 3 import java.util.concurrent.ThreadPoolExecutor;
 4 import java.util.concurrent.TimeUnit;
 5 
 6 public class ThreadPoolTest{   
 7     public static void main(String[] args){   
 8         //创建等待队列   
 9         BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);   
10         //创建线程池,池中保存的线程数为3,允许的最大线程数为5,
11         //50 表示当线程数大于内核时,多余空闲线程的最大时间,单位为TimeUnit.MILLISECONDS
12         ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);   
13         //创建七个任务   
14         Runnable t1 = new MyThread();   
15         Runnable t2 = new MyThread();   
16         Runnable t3 = new MyThread();   
17         Runnable t4 = new MyThread();   
18         Runnable t5 = new MyThread();   
19         Runnable t6 = new MyThread();   
20         Runnable t7 = new MyThread();   
21         //每个任务会在一个线程上执行  
22         pool.execute(t1);   
23         pool.execute(t2);   
24         pool.execute(t3);   
25         pool.execute(t4);   
26         pool.execute(t5);   
27         pool.execute(t6);   
28         pool.execute(t7);   
29         //关闭线程池   
30         pool.shutdown();   
31     }   
32 }   
33 
34 class MyThread implements Runnable{   
35     @Override   
36     public void run(){   
37         System.out.println(Thread.currentThread().getName() + "正在执行。。。");   
38         try{   
39             Thread.sleep(100);   
40         }catch(InterruptedException e){   
41             e.printStackTrace();   
42         }   
43     }   
44 }  

  

  

ThreadPoolExecutor 构造方法,及参数讲解:

  public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long  keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)

    corePoolSize:线程池中所保存的核心线程数,包括空闲线程。

    maximumPoolSize:池中允许的最大线程数。

    keepAliveTime:线程池中的空闲线程所能持续的最长时间。

    unit:持续时间的单位。

       TimeUnit.DAYS; //天
            TimeUnit.HOURS; //小时
            TimeUnit.MINUTES; //分钟
            TimeUnit.SECONDS; //秒
            TimeUnit.MILLISECONDS; //毫秒
            TimeUnit.MICROSECONDS; //微妙
            TimeUnit.NANOSECONDS; //纳秒

    workQueue:1、任务执行前保存任务的队列,仅保存由execute方法提交的Runnable任务。

          2、(保存任务的阻塞队列,与线程池的大小有关:

              当运行的线程数少于corePoolSize时,在有新任务时直接创建新线程来执行任务而无需再进队列
              当运行的线程数等于或多于corePoolSize,在有新任务添加时则选加入队列,不直接创建线程
              当队列满时,在有新任务时就创建新线程)
          3、用于保存等待执行的任务的阻塞队列。一个阻塞队列,用来存储等待执行的任务:数组,链表,不存元素的阻塞队列

            ArrayBlockingQueue:数组结构的有界阻塞队列,先进先出FIFO

            LinkedBlockingQueue:链表结构的无界阻塞队列。先进先出FIFO排序元素,静态方法Executors.newFixedThreadPool使用这个方法

            SynchronousQueue:不存储元素的阻塞队列,就是每次插入操作必须等到另一个线程调用移除操作,静态方法Executors.newCachedThreadPool使用这个方法

    还有两个可选参数:
       ThreadFactory threadFactory:
          1、使用ThreadFactory创建新线程,默认使用defaultThreadFactory创建线程
          2、用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字
      RejectedExecutionHandler handler:
          1、定义处理被拒绝任务的策略,默认使用ThreadPoolExecutor.AbortPolicy,任务被拒绝时将抛出RejectExecutorException
          2、当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。

                 ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

                 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

                 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

                ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

执行顺序解析:

  1、如果线程池中的线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务;

  2、如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行);

       3、如果线程池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;

       4、如果线程池中的线程数量等于了maximumPoolSize,有4种才处理方式(该构造方法调用了含有5个参数的构造方法,并将最后一个构造方法为RejectedExecutionHandler类型,它在处理线程溢出时有4种方式,这里不再细说,要了解的,自己可以阅读下源码)。

总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于corePoolSize,再看缓冲队列workQueue是否满,最后看线程池中的线程数量是否大于maximumPoolSize。另外,当线程池中的线程数量大于corePoolSize时,如果里面有线程的空闲时间超过了keepAliveTime,就将其移除线程池,这样,可以动态地调整线程池中线程的数量。

 

总结:

  线程池优先要创建出基本线程池大小(corePoolSize)的线程数量,没有达到这个数量时,每次提交新任务都会直接创建一个新线程,当达到了基本线程数量后,又有新任务到达,优先放入等待队列,如果队列满了,才去创建新的线程(不能超过线程池的最大数maxmumPoolSize)

 

向线程池提交任务的两种方式:

1)通过execute()方法:这种方式提交没有返回值,也就不能判断任务是否被线程池执行成功。

 ExecutorService threadpool= Executors.newFixedThreadPool(10);
 threadpool.execute(new Runnable(){...});

2)通过submit()方法

 1 Future<?> future = threadpool.submit(new Runnable(){...});  
 2     try {  
 3             Object res = future.get();//获取任务执行结果  
 4         } catch (InterruptedException e) {  
 5             // 处理中断异常  
 6             e.printStackTrace();  
 7         } catch (ExecutionException e) {  
 8             // 处理无法执行任务异常  
 9             e.printStackTrace();  
10         }finally{  
11             // 关闭线程池  
12             executor.shutdown();  
13         }

使用submit 方法来提交任务,它会返回一个Future对象,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。

 

线程池的关闭:

   shutdown():不会立即终止线程池,而是再也不会接受新的任务,要等所有任务缓存队列中的任务都执行完后才终止
   shutdownNow():立即终止线程池,再也不会接受新的任务,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

线程池本身的状态:

1 volatile int runState;   
2 static final int RUNNING = 0;   //运行状态
3 static final int SHUTDOWN = 1;   //关闭状态
4 static final int STOP = 2;       //停止
5 static final int TERMINATED = 3; //终止,终结

1、当创建线程池后,初始时,线程池处于RUNNING状态;
2、如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕,最后终止;
3、如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务,返回没有执行的任务列表;
4、当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

 

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:设计模式之单例模式

下一篇:TreeMap中文排序,TreeMap倒序输出排列