线程池

2018-08-05 07:45:51来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

一、简介
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)

corePoolSize:核心线程池的大小,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,直到任务数大于等于corePoolSize。

或者是使用prestartAllCoreThreads()或者prestartCoreThread()方法初始化线程,线程池会提前创建并启动所有基本线程。

 maximumPoolSize:线程池最大线程数,它表示在线程池中最多能创建多少个线程。

 keepAliveTime:线程池维护线程所允许的空闲时间,默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

unit:参数keepAliveTime的时间单位

TimeUnit.DAYS;               //
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒

workQueue:线程池所使用的阻塞队列

ArrayBlockingQueue://是一个基于数组结构的有界阻塞队列,创建时必须指定大小,此队列按 FIFO(先进先出)原则对元素进行排序
LinkedBlockingQueue://一个基于链表结构的无界阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue,默认大小为Integer_Max_Value,静态工厂方法Executors.newFixedThreadPool()使用了这个队列
SynchronousQueue;//一个不存储元素的阻塞队列。直接新建线程来执行任务,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列
PriorityBlockingQueue://一个具有优先级的无界阻塞队列。

handler: 线程池对拒绝任务的处理策略

ThreadPoolExecutor.AbortPolicy()
//丢弃任务并抛出java.util.concurrent.RejectedExecutionException异常
ThreadPoolExecutor.CallerRunsPolicy()
//重试添加当前的任务,在 execute 方法的调用线程中运行被拒绝的任务
ThreadPoolExecutor.DiscardOldestPolicy()
//抛弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.DiscardPolicy()
//抛弃任务且不抛异常

二、线程池的工作方式

当一个任务通过execute(Runnable)方法欲添加到线程池时:

public void execute(Runnable command) {
    if (command ==null)
            thrownew NullPointerException();
            intc = ctl.get();

      //1 当前运行的线程数量小于核心线程数量,直接将任务加入worker启动运行。
        if (workerCountOf(c) <corePoolSize) {
            if (addWorker(command,true))
                return;
            c =ctl.get();
        } 
        
        //2 运行线程数量大于核心线程数量时,上面的if分支针对大于corePoolSize,并且缓存队列加入任务操作成功的情况。
          运行中并且将任务加入缓冲队列成功,正常来说这样已经完成了处理逻辑。
          但是为了保险起见,增加了状态出现异常的确认判断,如果状态出现异常会继续remove操作,如果执行true,则按照拒绝处理策略驳回任务;
        if (isRunning(c) &&workQueue.offer(command)) {
            intrecheck = ctl.get();
            if (!isRunning(recheck) && remove(command))
                reject(command);
            elseif (workerCountOf(recheck) == 0)
                addWorker(null,false);
        }

         //3 这里针对运行线程数量超过了corePoolSize,并且缓存队列也已经放满的情况。
           注意第二个参数是false,可以在下面addWorker方法看到,就是针对线程池最大线程数量maximumPoolSize的判断。
           elseif (!addWorker(command,false))
            reject(command);
        }

如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。

如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

从execute源码中可以知道主要的是addWorker方法,addWorker方法主要做的工作就是新建一个Woker线程,加入到woker集合中,然后启动该线程,那么接下来的重点就是Woker类的run方法了。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
//以上是线程数量的校验与更新逻辑
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

woker线程的执行流程就是首先执行初始化时分配给的任务,执行完成以后会尝试从阻塞队列中获取可执行的任务,如果指定时间内仍然没有任务可以执行,则进入销毁逻辑。(只销毁非核心线程)

public void run() {
            runWorker(this);
        }
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //task就是Woker构造函数入参指定的任务,即用户提交的任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); 
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
              processWorkerExit(w, completedAbruptly);
        }
    }

 

三、Executors包含的常用线程池

1.newFixedThreadPool:固定大小线程池
public static ExecutorService newFixedThreadPool(int nThreads) {  
        return new ThreadPoolExecutor(nThreads, nThreads,  
                                      0L, TimeUnit.MILLISECONDS, 
                                      new LinkedBlockingQueue<Runnable>());  
    }  

FixedThreadPool是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。

  1. 可控制线程最大并发数(同时执行的线程数)
  2. 超出的线程会在队列中等待

但是,在核心线程池空闲时,即核心线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。

(2) newCachedThreadPool:无界线程池

public static ExecutorService newCachedThreadPool() {  
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                      60L, TimeUnit.SECONDS,  
                                      new SynchronousQueue<Runnable>());  
    }  
  • 工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
  • 如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
  • 在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。

(3)singleThreadPoll   单线程线程池

大小为1的固定线程池,这个其实就是newFixedThreadPool(1)

它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。

单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。

(4)ScheduledThreadPoll  

它的核心线程池固定,非核心线程的数量没有限制,但是闲置时会立即会被回收。

支持定时及周期性任务执行

四、线程池的关闭:

  ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

  • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
  • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

 

 

 

 

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:spring cloud eureka 服务端开启密码认证后,客户端无法接入问题

下一篇:69道Spring面试题和答案