ThreadPoolExecutor.execute()源碼提供了大量注釋來解釋該方法的設計考慮。下面的源碼來自jdk1.6.0_37
使用這么多if-else就是為了性能考慮,減小鎖的使用范圍,避免execute方法整個執行過程中都持有mainLock鎖。可以看到只有調用addIfUnderCorePoolSize、ensureQueuedTaskHandled、addIfUnderMaximumPoolSize這3個方法才需要持有鎖。如果新提交的任務,不會進入這3個方法,那么就不需要持有鎖。我們來看下,execute方法的設計是否能夠有效地減少進入這3個方法的次數,實現快進快出。
第4行代碼poolSize >= corePoolSize,為什么要加這個判斷呢?
如果通過prestartAllCoreThreads事先啟動了所有核心線程,或者是提交的任務已經讓當前大小poolSize達到了核心大小 corePoolSize,并且核心線程不會死亡(allowCoreThreadTimeOut=false不允許核心線程超時退出,并且任務執行過程沒有拋出異常導致線程退出),那么線程池中的線程數一定不會比corePoolSize小,此后如果再提交新任務,那么就不會進入addIfUnderCorePoolSize方法。poolSize >= corePoolSize就是為了減少進入addIfUnderCorePoolSize函數的次數,減少鎖的獲取和釋放次數。如果poolSize<corePoolSize,那么就會進入addIfUnderCorePoolSize,該方法會在加鎖情況下,判斷線程池的狀態和當前大小,然后決定是否需要新加線程來處理任務。由于addIfUnderCorePoolSize會持有mainLock鎖,所以可以防止其他線程對線程池的并發修改。也就說可以保證:在不需要添加新線程的時候,就不會添加。The call to addIfUnderCorePoolSize rechecks runState and pool size under lock (they change only under lock) so prevents false alarms that would add threads when it shouldn't。源碼中這段注釋,說的就是這個效果。
第5行代碼if (runState == RUNNING && workQueue.offer(command)),如果程序能走到這行代碼,從任務提交者的角度來看,此時線程池大小已經達到了核心大小(雖然事實情況不一定如此,因為沒有加鎖,存在并發修改的可能;而且線程池中的線程也可能會死亡。如果滿足條件:
runState == RUNNING && workQueue.offer(command) 這意味著:線程池仍然處于運行狀態,并且任務排隊已經成功。為什么程序執行到這里依然不能結束,還是需要走之后的代碼呢?因為沒有加鎖,判斷條件不一定可靠。考慮下面2個場景:如果任務剛開始調用offer(還沒有成功地插入到阻塞隊列中),線程池中的線程全部死亡,那么就沒有線程來處理當前提交的這個任務了;如果任務剛剛排隊成功,別的線程調用了shutdownNow()關閉了線程池,那么按照shutdownNow函數的語義,這個任務不應該被處理。由于存在這2種特殊情況,所以必須進行后續的處理。but
may also fail to add them when they should. This is compensated within the following steps.這就是說:addIfUnderCorePoolSize能夠保證不需要新線程的時候就不添加,但是不能保證需要添加新線程的時候就添加。所以讓任務排隊成功的時候,需要在鎖的保護下,判斷是否需要刪除這個任務,或者是否需要新增線程來處理任務。
第6行代碼if (runState != RUNNING || poolSize == 0),如果任務成功排隊(workQueue.offer()返回true)后,線程池被關閉或者沒有存活的線程,那么就需要執行ensureQueuedTaskHandled(command),也就是說這種情況下,任務可能沒有得到合適的處置。如果任務成功排隊后,線程池仍然處在運行狀態,而且有存活的線程,那么就能夠確保該新提交的任務一定會被處理。為什么會這樣呢?我們知道如果想關閉ThreadPoolExecutor,只有3種途徑:調用shutdown方法,調用shutdownNow方法,線程池最后一個工作者退出(對應workerDone方法)。查看源碼我們知道,這3個可能導致線程池關閉的方法,最終都會調用tryTerminate()方法。也就是說如果線程池想要終止,就必須通過該方法。
當線程池的線程池數是0的時候,如果線程池是running或者shutdown狀態,并且任務隊列不為空,那么就會新增1個線程來處理任務;如果線程池是狀態,或者處于shutdown狀態并且任務隊列為空,那么線程池就會退出。也就是說,如果任務排隊成功后,線程池還沒有終止,那么該任務一定會得到合理的處置。
如果在任務插入之前或者插入的過程中,線程池不在運行狀態runState != RUNNING 或者沒有存活的線程poolSize == 0,那么就需要自己考慮下:如何處理該提交的任務,這通過ensureQueuedTaskHandled來完成。如果if (runState == RUNNING && workQueue.offer(command))條件是真,那么隨后if (runState != RUNNING || poolSize == 0)基本上很少出現,大部分場景下都不需要執行ensureQueuedTaskHandled方法,就不需要獲取和釋放鎖。下面我們通過源碼看下,ensureQueuedTaskHandled方法是如何處理異常場景的。
該方法持有mainLock鎖,所以可以防止線程池的并發修改。如果線程池不在running狀態(state != RUNNING ),并且新提交的任務還駐留在任務隊列中(workQueue.remove(command)返回true),那么當前提交的任務會被拒絕執行(調用reject(comma)方法)。也就是說,只要線程池不是running狀態,那么就一定會拒絕執行當前提交的任務,除非該任務已經被線程池中的線程處理了(workQueue.remove返回false)。這里不區分到底是shutdown()關閉的線程池,還是通過shutdownNow關閉的線程池。如果新提交一個任務,并且執行流程進入了ensureQueuedTaskHandled()函數,那么該任務可能會被拒絕執行,也可能會被正常執行。如果線程池是running或者shutdown狀態(state < STOP),并且線程池中已經沒有存活的線程,并且任務隊列非空,那么就需要新加1個線程,來處理等待執行的任務。
可以看到:通過多次無鎖的條件判斷,能夠有效地減少提交任務時對mainLock鎖的競爭;也能夠確在并發執行的情況下,當前新提交的任務和線程池中等待執行的任務,都能得到合適的處理。不知道大師Doug Lea是如何想到這么巧妙的設計和實現execute()方法的!雖然execute方法能夠提高性能,但是犧牲了代碼的可讀性和簡易性。對于并發程序,還是那句經典的老話make it right before you make it faster。