線程池(一)
來源:程序員人生 發(fā)布時(shí)間:2016-07-14 14:42:23 閱讀次數(shù):5910次
甚么是線程池?為何要使用線程池?
其實(shí)線程池也是類似的概念,線程池中總有那末幾個(gè)活躍的線程,當(dāng)你需要的時(shí)候可以從線程池里隨意拿來1個(gè)空閑線程,當(dāng)完成工作時(shí)其實(shí)不著急關(guān)閉線程,而是返回給線程池,方便其他人使用。
這里我們肯定還有疑問為何我們傳統(tǒng)的創(chuàng)建自定義線程有甚么問題?問題就是雖然線程是1種輕量級(jí)的工具,但它的創(chuàng)建和關(guān)閉都需要花費(fèi)時(shí)間,如我們?cè)?span style="color:#ff0000">程序中隨便的創(chuàng)建線程而不加控制其數(shù)量,反而會(huì)耗盡cpu和內(nèi)存資源。即使沒有outofmemory異常,大量的回收線程也會(huì)致使GC停頓的時(shí)間延長(zhǎng)。所以我們實(shí)際中可以優(yōu)先斟酌使用線程池對(duì)線程進(jìn)行控制和管理,更加有效的公道的使用線程進(jìn)行提高程序的性能。
jdk對(duì)線程池的支持
Jdk提供了1套Executor框架,核心成員以下圖:
其中ThreadPoolExecutor是1個(gè)線程池,Executors扮演著1個(gè)線程工廠的角色,通過Executors類可以獲得1個(gè)具有特定功能的線程池。通過UML圖我們可以看到ThreadPoolExecutor實(shí)現(xiàn)Executor接口通過這個(gè)接口,任何Runable對(duì)象都可以被ThreadPoolExecutor線程池調(diào)度。
Executors主要提供以下工廠方法:
static ExecutorService newCachedThreadPool()
static ExecutorService newFixedThreadPool(int nThreads)
static ExecutorService newSingleThreadExecutor()
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
static ScheduledExecutorService newSingleThreadScheduledExecutor()
newCachedThreadPool()方法:該方法返回1個(gè)會(huì)根據(jù)實(shí)際情況進(jìn)行調(diào)劑的線程池,線程數(shù)量不肯定。但如果有空閑的線程可復(fù)用,則優(yōu)先選擇可復(fù)用的線程。若當(dāng)先線程都在工作,同時(shí)又有新的任務(wù)提交,則會(huì)創(chuàng)建新的線程來處理。所有線程在當(dāng)前任務(wù)完成的時(shí)候,將返回線程池進(jìn)行復(fù)用。
newFixedThreadPool(int nThreads)方法:該方法返回1個(gè)具有固定數(shù)量的線程的線程池。該線程池中的線程數(shù)量始終不變。當(dāng)1個(gè)任務(wù)提交時(shí),若有空閑線程則履行,若沒有,會(huì)被交給1個(gè)任務(wù)隊(duì)列,當(dāng)有空閑線程的時(shí)候,就會(huì)處理任務(wù)隊(duì)里的任務(wù)。
newScheduleThreadPool(int corePoolSize)方法:該方法會(huì)返回1個(gè)ScheduledExecutorService對(duì)象。ScheduledExecutorService接口在ExecutorService接口的基礎(chǔ)上擴(kuò)大了在給定時(shí)間履行某任務(wù)的功能,如某個(gè)固定時(shí)間后開始履行,或周期性的履行某個(gè)任務(wù)。
newSingleThreadScheduleExecutor();也返回1個(gè)ScheduledExecutorService對(duì)象,不過線程池只有1個(gè)線程。
下面簡(jiǎn)單演示下newFixedThreadPool(int nThreads)的簡(jiǎn)單使用:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 簡(jiǎn)單展現(xiàn)newFixedThreadPool
* @author zdm
*
*/
public class ThreadPoolDemo {
public static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ":ThreadId:"
+ Thread.currentThread().getId());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(5);
for(int i = 0; i < 10; i++) {
es.execute(new MyTask());
}
}
}
運(yùn)行結(jié)果:
有運(yùn)行結(jié)果可以知道具有5個(gè)線程的線程池的把10個(gè)任務(wù)分兩批完成,前5個(gè)和后5個(gè)任務(wù)恰好相差了1秒,因而可知上面程序符合newFixedThreadPool產(chǎn)的線程池的行動(dòng)。
有時(shí)間計(jì)劃的任務(wù):
newScheduleThreadPool(int corePoolSize)返回1個(gè)ScheduleExecutorService對(duì)象,可以根據(jù)時(shí)間需要對(duì)線程進(jìn)行調(diào)度,它的主要方法以下:
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
//創(chuàng)建并履行在給定延遲后啟用的1次性操作。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
//創(chuàng)建并履行1個(gè)在給定初始延遲后首次啟用的定期操作,后續(xù)操作具有給定的周期;也就是將在 initialDelay 后開始履行,然后在 initialDelay+period 后履行,接著在 initialDelay + 2 * period 后履行,依此類推。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
//創(chuàng)建并履行1個(gè)在給定初始延遲后首次啟用的定期操作,隨后,在每次履行終止和下1次履行開始之間都存在給定的延遲。
與其他線程池不同,ScheduledExecutorService其實(shí)不1定會(huì)立即安排履行任務(wù),它會(huì)在指定的時(shí)間,對(duì)任務(wù)進(jìn)行調(diào)度,起到的計(jì)劃任務(wù)的作用。
這里需要注意的就是scheduleAtFixedRate和scheduleWithFixedDelay這兩個(gè)方法都是對(duì)任務(wù)進(jìn)行周期性的調(diào)度,但是又有1點(diǎn)不同。
對(duì)FixedRate的方式來講,任務(wù)調(diào)度的頻率是1定的,它是以上1個(gè)任務(wù)開始履行的時(shí)間為出發(fā)點(diǎn),以后的period時(shí)間,調(diào)度下1次任務(wù)。而FixedDealy則是上1個(gè)任務(wù)結(jié)束后,在經(jīng)過delay對(duì)下1次任務(wù)進(jìn)行調(diào)度。
如果還有疑問,我們可以官方的文檔對(duì)它兩的解釋:
scheduleAtFixRate(Runnable command, long initialDealy, long period, TimeUnit unit):
Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay then initialDelay+period, then initialDelay + 2 * period, and
so on.
翻譯:創(chuàng)建1個(gè)周期性的任務(wù),任務(wù)開始于指定的初始延遲,后續(xù)的任務(wù)依照給定的周期履行:后續(xù)第1個(gè)任務(wù)將會(huì)在initialDelay+period,下1個(gè)任務(wù)將在initialDelay+2period履行。
scheduleWithFixedDelay(Runnable command, long initialDealy, long delay, TimeUnit unit):
Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next. If any execution of the task encounters an
exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor.
翻譯:創(chuàng)建1個(gè)周期性的任務(wù),任務(wù)開始于指定的初始延遲,后續(xù)的任務(wù)依照給定的延遲履行,即上1個(gè)任務(wù)結(jié)束的時(shí)間到下1個(gè)任務(wù)開始時(shí)間的時(shí)間差。
下面可以簡(jiǎn)單演示下ScheduledExecutorService的scheduleAtFixedRate的方法:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* ScheduledExecutorService的簡(jiǎn)單示例
* @author zdm
*
*/
public class ScheduleExecutorServiceDemo {
public static void main(String[] args) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
ses.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(System.currentTimeMillis()/1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 0, 2, TimeUnit.SECONDS);
}
}
運(yùn)行結(jié)果:
這個(gè)任務(wù)履行使用1秒,周期為2秒,也就是2秒履行1次任務(wù),恰好運(yùn)行結(jié)果符合我們的預(yù)期目標(biāo)。
但是這里有1個(gè)問題就是如果任務(wù)的履行時(shí)間大于調(diào)度周期的時(shí)間會(huì)產(chǎn)生怎樣辦?這里我們將TimeUnit.SECONDS.sleep(5)嘗試1下
會(huì)發(fā)現(xiàn)任務(wù)的周期調(diào)度變成了5秒~~~
如果采取scheduledWithFixedDelay()調(diào)用會(huì)依照修改任務(wù)履行需要5秒,延遲為2秒,那末任務(wù)的實(shí)際間隔為7秒。
這里還需要注意1個(gè)問題:那就是調(diào)度程序其實(shí)不會(huì)無窮期的延續(xù)等待。如果任務(wù)本身產(chǎn)生了異常,那末后續(xù)的子任務(wù)都會(huì)停止調(diào)用。所以需要對(duì)異常進(jìn)行及時(shí)的處理,以保證周期性任務(wù)的穩(wěn)定性。
線程池的內(nèi)部實(shí)現(xiàn)
對(duì)核心的那幾個(gè)線程池,雖然看上去功能各不相同其實(shí)內(nèi)部都是使用了ThreadPoolExecutor實(shí)現(xiàn):
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
從上面我們可以發(fā)現(xiàn)它們都是ThreadPoolExecutor的封裝,為何功能如此強(qiáng)大?我們可以看1下ThreadPoolExecutor的構(gòu)造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
方法的參數(shù)含義以下:
corePoolSize:表示線程中的線程數(shù)量。
maximuumPoolSize:表示線程池中最大的線程數(shù)量。
keepAliveTime:當(dāng)線程數(shù)量超過corePoolSize時(shí),過剩的空閑線程的存活時(shí)間
unit:表示存活時(shí)間單位
workQueue:任務(wù)隊(duì)列,被提交但卻沒有沒履行的任務(wù)。
threadFactory:線程工廠,用于創(chuàng)建線程,1般用默許的便可。
handler:謝絕策略,當(dāng)任務(wù)太多來不及處理時(shí),如何謝絕。
其中的參數(shù)workQueue是1個(gè)BlockingQueue<Runnable>接口,它有幾種不同功能的子類隊(duì)列:

先認(rèn)識(shí)1下Blocking:
阻塞隊(duì)列,顧名思義,首先它是1個(gè)隊(duì)列,而1個(gè)隊(duì)列在數(shù)據(jù)結(jié)構(gòu)中所起的作用大致以下圖所示:
從上圖我們可以很清楚看到,通過1個(gè)同享的隊(duì)列,可使得數(shù)據(jù)由隊(duì)列的1端輸入,從另外1端輸出;
經(jīng)常使用的隊(duì)列主要有以下兩種:(固然通過不同的實(shí)現(xiàn)方式,還可以延伸出很多不同類型的隊(duì)列,DelayQueue就是其中的1種)
先進(jìn)先出(FIFO):先插入的隊(duì)列的元素也最早出隊(duì)列,類似于排隊(duì)的功能。從某種程度上來講這類隊(duì)列也體現(xiàn)了1種公平性。
落后先出(LIFO):后插入隊(duì)列的元素最早出隊(duì)列,這類隊(duì)列優(yōu)先處理最近產(chǎn)生的事件。
多線程環(huán)境中,通過隊(duì)列可以很容易實(shí)現(xiàn)數(shù)據(jù)同享,比如經(jīng)典的“生產(chǎn)者”和“消費(fèi)者”模型中,通過隊(duì)列可以很便利地實(shí)現(xiàn)二者之間的數(shù)據(jù)同享。假定我們有若干生產(chǎn)者線程,另外又有若干個(gè)消費(fèi)者線程。如果生產(chǎn)者線程需要把準(zhǔn)備好的數(shù)據(jù)同享給消費(fèi)者線程,利用隊(duì)列的方式來傳遞數(shù)據(jù),就能夠很方便地解決他們之間的數(shù)據(jù)同享問題。但如果生產(chǎn)者和消費(fèi)者在某個(gè)時(shí)間段內(nèi),萬1產(chǎn)生數(shù)據(jù)處理速度不匹配的情況呢?理想情況下,如果生產(chǎn)者產(chǎn)出數(shù)據(jù)的速度大于消費(fèi)者消費(fèi)的速度,并且當(dāng)生產(chǎn)出來的數(shù)據(jù)積累到1定程度的時(shí)候,那末生產(chǎn)者必須暫停等待1下(阻塞生產(chǎn)者線程),以便等待消費(fèi)者線程把積累的數(shù)據(jù)處理終了,反之亦然。但是,在concurrent包發(fā)布之前,在多線程環(huán)境下,我們每一個(gè)程序員都必須去自己控制這些細(xì)節(jié),特別還要統(tǒng)籌效力和線程安全,而這會(huì)給我們的程序帶來不小的復(fù)雜度。好在此時(shí),強(qiáng)大的concurrent包橫空出世了,而他也給我們帶來了強(qiáng)大的BlockingQueue。(在多線程領(lǐng)域:所謂阻塞,在某些情況下會(huì)掛起線程(即阻塞),1旦條件滿足,被掛起的線程又會(huì)自動(dòng)被喚醒)
下面兩幅圖演示了BlockingQueue的兩個(gè)常見阻塞場(chǎng)景:

如上圖所示:當(dāng)隊(duì)列中沒有數(shù)據(jù)的情況下,消費(fèi)者真?zhèn)€所有線程都會(huì)被自動(dòng)阻塞(掛起),直到有數(shù)據(jù)放入隊(duì)列。

如上圖所示:當(dāng)隊(duì)列中填滿數(shù)據(jù)的情況下,生產(chǎn)者真?zhèn)€所有線程都會(huì)被自動(dòng)阻塞(掛起),直到隊(duì)列中有空的位置,線程被自動(dòng)喚醒。
這也是我們?cè)诙嗑€程環(huán)境下,為何需要BlockingQueue的緣由。作為BlockingQueue的使用者,我們不再需要關(guān)心甚么時(shí)候需要阻塞線程,甚么時(shí)候需要喚醒線程,由于這1切BlockingQueue都給你1手包辦了。既然BlockingQueue如此神通廣大,讓我們1起來見識(shí)下它的經(jīng)常使用方法:
BlockingQueue的核心方法:
放入數(shù)據(jù):
offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當(dāng)前履行方法的線程)
offer(E o, long timeout, TimeUnit unit),可以設(shè)定等待的時(shí)間,如果在指定的時(shí)間內(nèi),還不能往隊(duì)列中加入BlockingQueue,則返回失敗。
put(anObject):把a(bǔ)nObject加到BlockingQueue里,如果BlockQueue沒有空間,則調(diào)用此方法的線程被阻塞直到BlockingQueue里面有空間再繼續(xù).
獲得數(shù)據(jù):
poll(time):取走BlockingQueue里排在首位的對(duì)象,若不能立即取出,則可以等time參數(shù)規(guī)定的時(shí)間,取不到時(shí)返回null;
poll(long timeout, TimeUnit unit):從BlockingQueue取出1個(gè)隊(duì)首的對(duì)象,如果在指定時(shí)間內(nèi),隊(duì)列1旦有數(shù)據(jù)可取,則立即返回隊(duì)列中的數(shù)據(jù)。否則知道時(shí)間超時(shí)還沒有數(shù)據(jù)可取,返回失敗。
take():取走BlockingQueue里排在首位的對(duì)象,若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到BlockingQueue有新的數(shù)據(jù)被加入;
drainTo():1次性從BlockingQueue獲得所有可用的數(shù)據(jù)對(duì)象(還可以指定獲得數(shù)據(jù)的個(gè)數(shù)),通過該方法,可以提升獲得數(shù)據(jù)效力;不需要屢次分批加鎖或釋放鎖。
BlockingQueue成員詳細(xì)介紹
1. ArrayBlockingQueue
基于數(shù)組的阻塞隊(duì)列實(shí)現(xiàn),在ArrayBlockingQueue內(nèi)部,保護(hù)了1個(gè)定長(zhǎng)數(shù)組,以便緩存隊(duì)列中的數(shù)據(jù)對(duì)象,這是1個(gè)經(jīng)常使用的阻塞隊(duì)列,除1個(gè)定長(zhǎng)數(shù)組外,ArrayBlockingQueue內(nèi)部還保存著兩個(gè)整形變量,分別標(biāo)識(shí)著隊(duì)列的頭部和尾部在數(shù)組中的位置。
ArrayBlockingQueue在生產(chǎn)者放入數(shù)據(jù)和消費(fèi)者獲得數(shù)據(jù),都是共用同1個(gè)鎖對(duì)象,由此也意味著二者沒法真正并行運(yùn)行,這點(diǎn)特別不同于LinkedBlockingQueue;依照實(shí)現(xiàn)原理來分析,ArrayBlockingQueue完全可以采取分離鎖,從而實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者操作的完全并行運(yùn)行。Doug
Lea之所以沒這樣去做,或許是由于ArrayBlockingQueue的數(shù)據(jù)寫入和獲得操作已足夠輕巧,以致于引入獨(dú)立的鎖機(jī)制,除給代碼帶來額外的復(fù)雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有1個(gè)明顯的不同的地方在于,前者在插入或刪除元素時(shí)不會(huì)產(chǎn)生或燒毀任何額外的對(duì)象實(shí)例,而后者則會(huì)生成1個(gè)額外的Node對(duì)象。這在長(zhǎng)時(shí)間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的系統(tǒng)中,其對(duì)GC的影響還是存在1定的區(qū)分。而在創(chuàng)建ArrayBlockingQueue時(shí),我們還可以控制對(duì)象的內(nèi)部鎖是不是采取公平鎖,默許采取非公平鎖。
2.
LinkedBlockingQueue
基于鏈表的阻塞隊(duì)列,同ArrayListBlockingQueue類似,其內(nèi)部也保持著1個(gè)數(shù)據(jù)緩沖隊(duì)列(該隊(duì)列由1個(gè)鏈表構(gòu)成),當(dāng)生產(chǎn)者往隊(duì)列中放入1個(gè)數(shù)據(jù)時(shí),隊(duì)列會(huì)從生產(chǎn)者手中獲得數(shù)據(jù),并緩存在隊(duì)列內(nèi)部,而生產(chǎn)者立即返回;只有當(dāng)隊(duì)列緩沖區(qū)到達(dá)最大值緩存容量時(shí)(LinkedBlockingQueue可以通過構(gòu)造函數(shù)指定該值),才會(huì)阻塞生產(chǎn)者隊(duì)列,直到消費(fèi)者從隊(duì)列中消費(fèi)掉1份數(shù)據(jù),生產(chǎn)者線程會(huì)被喚醒,反之對(duì)消費(fèi)者這真?zhèn)€處理也基于一樣的原理。而LinkedBlockingQueue之所以能夠高效的處理并發(fā)數(shù)據(jù),還由于其對(duì)生產(chǎn)者端和消費(fèi)者端分別采取了獨(dú)立的鎖來控制數(shù)據(jù)同步,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù),以此來提高全部隊(duì)列的并發(fā)性能。
作為開發(fā)者,我們需要注意的是,如果構(gòu)造1個(gè)LinkedBlockingQueue對(duì)象,而沒有指定其容量大小,LinkedBlockingQueue會(huì)默許1個(gè)類似無窮大小的容量(Integer.MAX_VALUE),這樣的話,如果生產(chǎn)者的速度1旦大于消費(fèi)者的速度,或許還沒有等到隊(duì)列滿阻塞產(chǎn)生,系統(tǒng)內(nèi)存就有可能已被消耗殆盡了。
ArrayBlockingQueue和LinkedBlockingQueue是兩個(gè)最普通也是最經(jīng)常使用的阻塞隊(duì)列,1般情況下,在處理多線程間的生產(chǎn)者消費(fèi)者問題,使用這兩個(gè)類足以。
3.
DelayQueue
DelayQueue中的元素只有當(dāng)其指定的延遲時(shí)間到了,才能夠從隊(duì)列中獲得到該元素。DelayQueue是1個(gè)沒有大小限制的隊(duì)列,因此往隊(duì)列中插入數(shù)據(jù)的操作(生產(chǎn)者)永久不會(huì)被阻塞,而只有獲得數(shù)據(jù)的操作(消費(fèi)者)才會(huì)被阻塞。
使用處景:
DelayQueue使用處景較少,但都相當(dāng)奇妙,常見的例子比如使用1個(gè)DelayQueue來管理1個(gè)超時(shí)未響應(yīng)的連接隊(duì)列。
4.
PriorityBlockingQueue
基于優(yōu)先級(jí)的阻塞隊(duì)列(優(yōu)先級(jí)的判斷通過構(gòu)造函數(shù)傳入的Compator對(duì)象來決定),但需要注意的是PriorityBlockingQueue其實(shí)不會(huì)阻塞數(shù)據(jù)生產(chǎn)者,而只會(huì)在沒有可消費(fèi)的數(shù)據(jù)時(shí),阻塞數(shù)據(jù)的消費(fèi)者。因此使用的時(shí)候要特別注意,生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度絕對(duì)不能快于消費(fèi)者消費(fèi)數(shù)據(jù)的速度,否則時(shí)間1長(zhǎng),會(huì)終究耗盡所有的可用堆內(nèi)存空間。在實(shí)現(xiàn)PriorityBlockingQueue時(shí),內(nèi)部控制線程同步的鎖采取的是公平鎖。
5. SynchronousQueue
1種無緩沖的等待隊(duì)列,類似于無中介的直接交易,有點(diǎn)像原始社會(huì)中的生產(chǎn)者和消費(fèi)者,生產(chǎn)者拿著產(chǎn)品去集市銷售給產(chǎn)品的終究消費(fèi)者,而消費(fèi)者必須親身去集市找到所要商品的直接生產(chǎn)者,如果1方?jīng)]有找到適合的目標(biāo),那末對(duì)不起,大家都在集市等待。相對(duì)有緩沖的BlockingQueue來講,少了1個(gè)中間經(jīng)銷商的環(huán)節(jié)(緩沖區(qū)),如果有經(jīng)銷商,生產(chǎn)者直接把產(chǎn)品批發(fā)給經(jīng)銷商,而無需在乎經(jīng)銷商終究會(huì)將這些產(chǎn)品賣給那些消費(fèi)者,由于經(jīng)銷商可以庫(kù)存1部份商品,因此相對(duì)直接交易模式,整體來講采取中間經(jīng)銷商的模式會(huì)吞吐量高1些(可以批量買賣);但另外一方面,又由于經(jīng)銷商的引入,使得產(chǎn)品從生產(chǎn)者到消費(fèi)者中間增加了額外的交易環(huán)節(jié),單個(gè)產(chǎn)品的及時(shí)響應(yīng)性能可能會(huì)下降。
聲明1個(gè)SynchronousQueue有兩種不同的方式,它們之間有著不太1樣的行動(dòng)。公平模式和非公平模式的區(qū)分:
如果采取公平模式:SynchronousQueue會(huì)采取公平鎖,并配合1個(gè)FIFO隊(duì)列來阻塞過剩的生產(chǎn)者和消費(fèi)者,從而體系整體的公平策略;
但如果是非公平模式(SynchronousQueue默許):SynchronousQueue采取非公平鎖,同時(shí)配合1個(gè)LIFO隊(duì)列來管理過剩的生產(chǎn)者和消費(fèi)者,而后1種模式,如果生產(chǎn)者和消費(fèi)者的處理速度有差距,則很容易出現(xiàn)饑渴的情況,便可能有某些生產(chǎn)者或是消費(fèi)者的數(shù)據(jù)永久都得不到處理。所以在使用SynchronousQueue時(shí)會(huì)設(shè)置很大的maximumPoolSize,而否則會(huì)很容易履行謝絕策略。
- 小結(jié)
BlockingQueue不光實(shí)現(xiàn)了1個(gè)完全隊(duì)列所具有的基本功能,同時(shí)在多線程環(huán)境下,他還自動(dòng)管理了多線間的自動(dòng)等待于喚醒功能,從而使得程序員可以疏忽這些細(xì)節(jié),關(guān)注更高級(jí)的功能。
由此可以知道,在使用newCachedThreadPool時(shí),當(dāng)提交的任務(wù)過量時(shí),沒有空閑的線程,使用SynchronousQueue,它是直接提交任務(wù)的隊(duì)列,從而迫使線程池創(chuàng)建新的線程來處理任務(wù)。當(dāng)任務(wù)履行完成,它會(huì)被指定的時(shí)間內(nèi)被回收。由于maximumPoolSize=0。所以當(dāng)有大量任務(wù)提交,而任務(wù)的處理又不是很快的情況下,會(huì)致使系統(tǒng)資源的耗盡。
使用newFixedThreadPool和newSingleThreadExecutor時(shí)應(yīng)當(dāng)注意無界的LinkedBlockingQueue的增長(zhǎng)。
下面給出線程池的核心調(diào)度任務(wù)的代碼:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
代碼第5行的workerCountOf()方法獲得當(dāng)前線程池中的線程總數(shù),當(dāng)線程總數(shù)小于corePoolSize時(shí),會(huì)將任務(wù)通過方法addWorker()直接調(diào)度。否則workQueue.offer()進(jìn)入任務(wù)隊(duì)列,如果進(jìn)入任務(wù)隊(duì)列失敗(有界隊(duì)列到達(dá)上限或使用SynchronousQueue),則會(huì)履行17行,將任務(wù)提交到線程池,當(dāng)前線程數(shù)到達(dá)maximumPoolSize,則提交失敗,使用謝絕策略,未到達(dá),則分配線程履行。
生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對(duì)您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈(zèng)