关于线程池创建、执行、销毁的原理及分析
目录
- 线程池的执行原理
- 线程执行
- 总结
线程池的执行原理
假设最大核心数是2,非核心线程数为1,队列长度是3
来第一个任务的时候,没有工作线程在工作,需要创建一个
来第二个任务的时候,发现当前核心线程数小于最大核心线程数,所以继续创建线程来处理任务
当来第三个任务的时候,发现当前核心线程数已经等于最大核心线程数了,所以把新来的任务放到taskQueue中
后面来第四个、第五个任务也会放在taskQueue中
当来第六个任务的时候,发现taskQueue已经满了,所以会创建一个非核心线程来处理任务
当来第七个任务的时候,因为线程数量到最大限度了,taskQueue也满了,所以就会走拒绝策略,把其中一个任务给抛弃掉,具体抛弃哪个需要根据选择的拒绝策略来定。
创建线程这里需要考虑并发的问题,即多个任务同时过来了,需要串行创建线程,否则,可能会导致超卖的情况(即创建的线程超过了最大线程数),具体是通过CAS乐观锁实现,代码解释如下:
private boolean addworker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get();//获取当前线程池的控制状态。 int rs = runStateOf(c);//获取当前线程池的运行状态。 // Check if queue empty only if necessary. //下面这个条件判断用于判断线程池是否处于关闭状态,并且任务队列不为空。如果线程池的状态大于等于SHUTDOWN,并且不满足线程池状态为SHUTDOWN、首个任务为null且任务队列为空的条件,则返回false。这个判断是为了确保在线程池关闭时,不再添加新的工作线程。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c);//获取当前线程池中的工作线程数量。 //这个条件判断用于判断工作线程数量是否达到上限。如果工作线程数量大于等于CAPACITY(工作线程数量的上限)或者大于等于核心线程数(如果core为true)或最大线程数(如果core为false),则返回false。这个判断是为了确保工作线程数量不超过线程池的限制。 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //尝试通过CAS(比较并交换)操作增加工作线程数量。如果成功增加工作线程数量,则跳出循环,继续执行后续逻辑。 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // 重新读取当前线程池的控制状态。 //再次判断线程池的运行状态是否发生了变化。如果运行状态发生了变化,则继续重试内部循环。这个判断是为了处理在CAS操作过程中,线程池的状态发生了变化的情况。 if (rphpunStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } ...创建线程的逻辑忽略... }
线程执行
花开两朵,各表一枝。
上面说了任务来了之后,是怎么创建线程的,又是怎么暂存任务的。这一节介绍一下线程是怎么执行任务的,以及在不用的时候,线程怎么被销毁或者保活。
线程池创建线程并调了thread的start方法之后,该线程会走到下面的runWorker方法。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeEx编程ecute(wt, task); Throwable thrown = null; try { javascript task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { js thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbrpythonuptly); } }
当一个线程执行完就会自动退出,但是我们知道线程池中的核心线程会一直存活着,想要一直存活,不退出程序就可以了,即循环,从上面的代码也可以看出来确实是这样的。但是还有一个疑问,核心线程是一直存活的,但是非核心线程在一定情况是会销毁的,他们用的是一套代码逻辑,该怎么实现呢?关键点就在getTask这个方法中。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //销毁线程需要满足这两个条件:1. (允许核心线程销毁 || 线程数大于核心线程数)&& 达到了销毁时间;2. 任务队列中没有任务了 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) // 这里返回了null,外层方法就跳出了while循环,从而结束该线程 return null; continue; } try { // 超时时间就是在这里设置的,如果允许超时销毁,那么就用poll进行拉取任务,超过了keepAliveTime就返回null。take是阻塞性等待 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
从上面的解析可以看出,如果队列中没有任务时,小于核心数的线程(核心线程数不销毁的情况下)会一直阻塞在获取任务的方法,直到返回任务。(判断阻塞时并没有核心线程和非核心线程的概念,只要保证创建出来的线程销毁到符合预期数量就ok)。而且执行完后 会继续循环执行getTask的逻辑,不断的处理任务。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。
精彩评论