1、競爭條件
1.甚么是競爭條件?
兩個或多個進程讀寫某些同享數據,而最后的結果取決于進程運行的精確時序,稱為競爭條件。
2.怎樣避免競爭條件?
要避免這類毛病,關鍵是找出某種途徑來禁止多個進程同時讀寫同享的數據。換言之,我們需要互斥!即以某種手段確保當1個進程在使用1個同享變量或文件時,其他進程不能做一樣的操作。
3.甚么叫臨界區(qū)?
我們把對同享內存進行訪問的程序片斷稱作臨界區(qū)域或臨界區(qū)。使得兩個進程不可能同時處于臨界區(qū)中,就可以夠避免競爭條件。
4.1個好的避免出現競爭條件的解決方案,需要滿足以下4個條件:
a).任何兩個進程不能同時處于臨界區(qū)。
b).不應當對CPU的速度和數量做任何假定。
c).臨界區(qū)外運行的進程不得阻塞其他進程。
d).不得使進程無窮期等待進入臨界區(qū)。
2、互斥方式
2.1.忙等待的互斥
1).屏蔽中斷
在單處理器系統(tǒng)中,最簡單的辦法是使每一個進程在剛剛進入臨界區(qū)后立即屏蔽所有中斷,并在將要離開之前再打開中斷。屏蔽中斷后,時鐘中斷也被屏蔽。CPU只有在產生時鐘中斷或其他中斷時才會進行進程切換。所以,在屏蔽中斷后CPU將不會被切換到其他進程。因而,1旦某個進程屏蔽中斷后,它就能夠檢查和修改內存,而沒必要擔心其他進程參與。
優(yōu)點:足夠簡單、明了。
缺點:把屏蔽中斷的權利交給用戶進程是不明智的。假想1下,如果1個用戶進程屏蔽中斷后不再打開,其結果將會如何?
只能適用于單CPU系統(tǒng)。如果系統(tǒng)是多處理器,則屏蔽中斷僅僅對履行disable指令的那個CPU有效,其他CPU仍將繼續(xù)運行并切換進程,而且可以訪問同享內存。
2).鎖變量
這是1種軟件解決方案。假想有1個同享(鎖)變量,其初始值為0,當1個進程想進入其臨界區(qū)時,它首先測試這把鎖。如果該鎖的值為0,則該進程將其設置為1并進入臨界區(qū)。若這把鎖的值已為1,則該進程將等待直到其值變成0。
疏漏:假定1個進程讀出鎖變量的值并發(fā)現它為0,而恰好在它將其值設置為1之前,另外一個進程被調度運行,將該鎖變量設置為1。當第1個進程再次運行時,它一樣也將該鎖設置為1,并進入臨界區(qū)。此時同時有兩個進程進入臨界區(qū)。所以這類方案不可行!
3).嚴格輪換法
設置1個整型變量turn,初始值為0,用于記錄輪到哪一個進程進入臨界區(qū),并檢查或更新同享內存。開始時,進程0檢查turn,發(fā)現其值為0,因而進入臨界區(qū)。進程1也發(fā)現其值為0,所以在1個等待循環(huán)中不停地測試turn,看其值什么時候變成1,等到其值變成1,則進入臨界區(qū)。
這類方式可用下面的兩段代碼描寫:
進程0:
while(TRUE){
while (turn != 0);
critical_region();
turn = 1;
noncritical_region();
}
進程1:
while(TRUE){
while (turn != 1);
critical_region();
turn = 1;
noncritical_region();
}
可以看到,進程0在離開臨界區(qū)前,它將turn的值設置為1,以便允許進程1進入臨界區(qū)。
優(yōu)點:簡單、有效;
缺點:性能較低,CPU的利用率不高,由于觸及到忙等待的問題。連續(xù)測試1個變量直到某個值出現為止,稱為忙等待。只有在有理由認為等待時間是非常短的情況下,才能使用忙等待。用于忙等待的鎖,稱為自旋鎖。
試著斟酌這樣1種情況:假定turn的值為0,此時進程0先進入臨界區(qū),現在進程0很快就履行完其全部循環(huán),它退出臨界區(qū),并將trun的值設置為1。此時,turn的值為1,兩個進程都在其臨界區(qū)外履行。突然,進程0結束了非臨界區(qū)的操作,并且嘗試進入臨界區(qū)。但是,此時它沒法進入臨界區(qū),由于turn的值為1,而此時進程1還在忙于履行非臨界區(qū)的操作,進程0只有繼續(xù)while循環(huán),直到進程1把turn的值改成0。這說明在1個進程比另外1個進程慢了很多的情況下,輪番進入臨界區(qū)其實不是1個好辦法。這類情況違背了前面敘述的條件3:進程0被1個臨界區(qū)以外的進程阻塞。
4).Peterson算法
這類方法是由兩個ANSI C編寫的進程:
#define FALSE 0
#define TRUE 1
#define N 2
int turn;
int interested[N];
void enter_region(int process)
{
int other;
other = 1 - process;
interested[process] = TRUE;
turn = process;
while(turn == process && interested[other] == TRUE);
}
void leave_region(int process)
{
interested[process] = FALSE;
}
看看它是如何工作的:1開始,沒有任何進程處于臨界區(qū)中,現在進程0調用enter_region(),并傳入自己的進程ID,它通過設置其數組元素和將turn設置為0來標識希望進入臨界區(qū)。由于進程1其實不想進入臨界區(qū),所以enter_region()很快便返回。如果進程1現在調用enter_region(),進程1將在此處掛起直到interested[0]變成FALSE,該事件只有在進程0調用leave_region()退出臨界區(qū)時才會產生。
再斟酌兩個進程幾近同時調用enter_region()的情況,他們都將自己的進程ID寫入到了turn變量,但只有后被保存進入的進程ID才有效,假定進程1是后存入ID的,則turn的值為1,。當兩個進程都運行到while循環(huán)的時候,進程0將循環(huán)0次并進入臨界區(qū),因此時turn為1,process為0,雖然interested[1]為TRUE;而進程1將不停地循環(huán)且不能進入臨界區(qū),因turn的值為1,process也為1,切interested[1]為TRUE。
缺點:while循環(huán)會致使進程處于忙等待的狀態(tài),直到條件滿足可以進入臨界區(qū)為止。
5).TSL指令
這是1種需要硬件支持的方案。1般存在這樣1條指令:TSL RX,LOCK 稱為測試并加鎖(Test and Set Lock)指令,它將1個內存字lock讀取到寄存器RX中,然后再該內存地址上存儲1個非零值。讀字和寫字操作保證是不可分割的原子操作。履行TSL指令的CPU將鎖住內存總線,以制止其他CPU在本指令結束之前訪問內存。當LOCK的值為0時,任何進程都可使用TSL指令將其值設置為1,并讀寫同享內存。當操作結束,進程用1條普通的move指令將LOCK的值重新設置為0。
enter_region:
TSL RX,LOCK ;復制鎖變量到寄存器RX,并將鎖變量設置為1
CMP RX,#0 ;鎖變量是0嗎?
JNE enter_region ;若不是零,說明已被設置,所以跳轉到enter_region開始循環(huán)履行
RET ;若是零,返回調用者,可以進入臨界區(qū)
leave_region:
MOVE LOCK,#0 ;將0存入鎖變量
RET ;返回調用者,離開臨界區(qū)
缺點:進程在進入臨界區(qū)前,先調用enter_region,這將致使忙等待,直到鎖空閑為止。與基于臨界區(qū)問題的所有解法1樣,進程必須在正確的時間調用enter_region和leave_region,解法才能見效。如果1個進程有訛詐行動,則互斥將會失敗。
6).XCHG指令
該指令是1個可替換TSL的指令。它原子性的交換了兩個位置的內容。用它實現互斥的方法以下:
enter_region:
MOVE RX,#1 ;在寄存器中放入1個1
XCHG RX,LOCK ;將LOCK和RX的值交換
CMP RX,#0 ;判斷RX的值是不是為0?
JNE enter_region ;如果不為0,說明已被設置,跳轉到enter_region循環(huán)履行
RET ;如果為0,則返回調用者,可進入臨界區(qū)
leave_region:
MOVE LOCK,#0 ;將LOCK設置為0
RET ;返回調用者,離開臨界區(qū)
缺點:和TSL指令1樣,存在忙等待的問題。
7).總結
Peterson解法、TSL和XCHG解法都是正確的,但它們都有忙等待的缺點。這些解法在本質上是這樣的:當1個進程想進入臨界區(qū)時,先檢查是不是允許進入,若不允許,則進程將原地等待,直到允許為止。
這類方法不但浪費了CPU時間,而且還可能引發(fā)料想不到的結果。斟酌1臺計算機具有兩個進程,H優(yōu)先級較高,L優(yōu)先級較低。調度規(guī)則規(guī)定,只要H處于就緒態(tài)它就能夠運行。在某1時刻,L處于臨界區(qū)中,此時H變到就緒態(tài),準備運行?,F在H開始忙等待,由于L此時處于臨界區(qū)中。但由于H就緒時L不會被調度,也就沒法離開臨界區(qū),所以H將永久忙等待下去。這類情況有時候被稱為優(yōu)先級反轉問題。
2.2.尋覓更加高效的解決方案
從上文的總結可以看出,我們得尋覓1種更加高效的解決方案:當1個進程沒法進入臨界區(qū)時,將其阻塞,而不是忙等待,也許是個不錯的方案。最簡單的實現方案就是sleep和wakeup系統(tǒng)調用。sleep是1個將引發(fā)調用進程阻塞的系統(tǒng)調用,即被掛起,直到另外一個進程將其喚醒。wakeup調用有1個參數,即要被喚醒的進程。
作為使用這個方案的1個例子,我們斟酌生產者和消費者問題:兩個進程同享1個公共的固定大小的緩沖區(qū),其中1個是生產者,將信息放入緩沖區(qū);另外一個是消費者,從緩沖區(qū)中取出信息。當緩沖區(qū)為空時,消費者應當阻塞,直到生產者生產了1份數據并喚醒它;當緩沖區(qū)滿了時,生產者應當阻塞,直到消費者消費了1份數據并重新喚醒它。
這個例子可以大致表述為以下的代碼:
#define MAXN 100 //緩沖區(qū)的容量
int count = 0; //緩沖區(qū)中的數據項數量
void producer(void)
{
int item;
while(TRUE)
{
item = produce_item(); //產生1項新數據
if(count == N) sleep(); //緩沖區(qū)已滿了,生產者阻塞
insert_item(item);
count = count + 1;
if(count == 1) wakeup(consumer); //緩沖區(qū)空嗎?如果不空,則喚醒消費者
}
}
void consumer(void)
{
int item;
while(TRUE)
{
if(count == 0) sleep(); //緩沖區(qū)空嗎?如果為空,消費者阻塞
item = remove_item();
count = count - 1;
if(count == N - 1) wakeup(producer);//緩沖區(qū)還滿嗎?如果不滿了,喚醒生產者
consume_item(item);
}
}
上述的代碼是存在競爭條件的,其緣由是對count的訪問未加任何限制。有可能出現以下情況:緩沖區(qū)為空,消費者進程剛剛讀取count的值發(fā)現它為0。此時調度程序決定暫停消費者并啟動運行生產者進程。生產者向緩沖區(qū)中加入1個數據項,count加1,?,F在count的值為1,它推斷由于剛才count的值為0,所以消費者此時1定在睡眠,因而生產者調用wakeup來喚醒消費者。但是,消費者此時在邏輯上并未睡眠,只不過是被調度程序轉移到了就緒隊列中,以等待下1次運行,所以wakeup信號丟失。當消費者下次運行時,它將測試先前讀到的count值,發(fā)現它為0,因而睡眠。生產者早晚會填滿全部緩沖區(qū),然后睡眠,這樣1來,兩個進程都將永久睡眠下去。
問題的實質在于發(fā)給1個(尚)未睡眠的進程的wakeup信號丟失了。如果它沒有丟失,則1切都很正常。解決它的方法就是使用信號量。
2.3.信號量
信號量,它使用1個整型變量來累計喚醒次數,供以后使用。它可以為0,表示沒有保存下來的喚醒操作,也能夠為正值,表示1個或多個喚醒操作。
對信號量有兩種操作:down和up,它們分別對應1般化后的sleep和wakeup操作。
對1個信號量履行down操作,則是檢測其值是不是大于0,若大于0,則將其值減1(即用掉1個保存的喚醒信號)并繼續(xù);若其值為0,則進程將睡眠,而此時down操作并未結束。1個信號量的down和up操作都是原子操作。所謂原子操作,是指1組相干聯(lián)的操作要末都不中斷的履行,要末都不履行。
up操作對信號量的值增加1.如果1個或多個進程在該信號量上睡眠,沒法完成1個先前的down操作,則由系統(tǒng)選擇其中1個(如隨機挑選)并允許該進程完成它的down操作。因而,對1個有進程在其上睡眠的信號量履行1次up操作后,該信號量的值仍舊是0,但在其上睡眠的進程卻少了1個。
如果使用多個CPU,則每一個信號量應當由1個鎖變量進行保護,以確保同1時刻只有1個CPU在對信號量進行操作。
試著用信號量來解決生產者和消費者問題:
#define N 100 //緩沖區(qū)最大容量
typedef int Semaphore;
Semaphore mutex = 1; //提供互斥,控制對臨界區(qū)的訪問(2元信號量實現互斥)
Semaphore empty = N;
Semaphore full = 0;
void producer(void)
{
int item;
while(TRUE)
{
item = produce_item();
down(&empty);
down(&mutex);
insert_item(item);
up(&mutex);
up(&full);
}
}
void consumer(void)
{
int item;
while(TRUE)
{
down(&full);
down(&mutex);
item = remove_item();
up(&mutex);
up(&empty);
consume_item(item);
}
}
在上述的示例中,我們用信號量實現了兩種操作:互斥和同步。mutex就是1個互斥信號量,它相當于1個鎖變量,每一個進程在進入臨界區(qū)前履行1個down操作,比在剛剛退出時履行1個up操作,就可以實現互斥了。信號量full和empty用來保證某種事件的順序產生和不產生,例如當緩沖區(qū)滿時生產者停止,和緩沖區(qū)空時消費者停止。
2.4.互斥量
如果不需要信號量的計數能力,有時可使用信號量的1個簡化版本,稱為互斥量(mutex)。它僅僅適用于管理同享資源或1小段代碼。它實現簡單并有效,因此在實現用戶空間線程包時非常有用。
互斥量可以處于兩種狀態(tài):解鎖和加鎖。它有兩個進程,當1個進程(線程)需要訪問臨界區(qū)時,它調用mutex_lock,如果該互斥量當前是解鎖的,此調用成功,調用線程可以自由進入該臨界區(qū)。如果該互斥量已加鎖,調用線程被阻塞,直到在臨界區(qū)中的線程完成工作并調用mutext_unlock。此時如果有多個線程阻塞在該互斥量上,將隨機選擇1個線程并允許它取得鎖。
如果有可用的TSL或XCHG指令,要在用戶空間實現互斥量就很簡單了。
mutex_lock:
TSL RX,MUTEX
CMP RX,$0 //互斥量是0嗎?
JE ok //如果是0,則跳轉到ok處,此時加鎖成功,可以進入臨界區(qū)
CALL thread_yield //如果不是0,將自己阻塞掛起,并讓出CPU以調度另外一個線程
JMP mutex_lock //稍后再試
ok:
RET //返回調用者,進入臨界區(qū)
mutex_unlock:
MOVE MUTEX,#0
RET
mutex_lock與enter_region的代碼很類似,但有1個關鍵的區(qū)分:當enter_region進入臨界區(qū)失敗時,它始終重復測試鎖(忙等待)。實際上,如果是在進程中,由于時鐘超時作用,會調度其他進程運行,這樣早晚具有鎖的進程會進入臨界區(qū)運行并釋放鎖,而忙等待的進程會因此而取得鎖。而在用戶線程中,情況有所不同,由于沒有時鐘停止運行時間太長的線程,會致使通過忙等待的方式來試圖取得鎖的線程將永久循環(huán)下去,決不會得到鎖,由于這個試圖取得鎖的線程不會讓其他線程運行從而釋放鎖。
2.5.Pthread中的互斥
Pthread提供許多可以用來同步線程的函數。其基本機制是使用1個可以被鎖定和解鎖的互斥量來保護每一個臨界區(qū)。它提供了創(chuàng)建和撤消互斥量的方法pthread_mutex_init和pthread_mutex_destroy,也能夠通過pthread_mutex_lock給互斥量加鎖,如果該互斥量已被加鎖時,則會阻塞調用者。還有1個調用可以用來嘗試鎖住1個互斥量,當互斥量已被加鎖時會返回毛病代碼而不是阻塞調用者,這個調用是pthread_mutex_trylock。最后,pthread_mutex_unlock用來給1個互斥量解鎖,并在1個或多個線程等待它的情況下正確的釋放1個線程?;コ饬恳材軌蛴袑傩?,但是這些屬性只有在某些特殊的場合下使用。
除互斥量以外,pthread還提供了另外一種同步機制:條件變量。互斥量在允許或阻塞對臨界區(qū)的訪問上是很有用的,條件變量則允許線程由于1些未到達的條件而阻塞。絕大部份情況下這兩種方法是1起使用的。這里的條件變量實際上類似于信號量。它也有專門的調用來創(chuàng)建和撤消,也能夠有屬性。最重要的兩個操作是pthread_cond_wait和pthread_cond_signal。前者阻塞調用線程直到另外一其他線程向它發(fā)送信號。被阻塞的線程常常是在等待發(fā)信號的線程去做某些工作、釋放某些資源或是進行其他的1些操作,只有完成后被阻塞的線程才可以繼續(xù)運行。當多個線程被阻塞并等待同1個信號時,可使用pthread_cond_broadcast調用去喚醒它們所有。
條件變量與互斥量常常1起使用。這類模式用于讓1個線程鎖住1個互斥量,然后當它不能取得它期待的結果時等待1個條件變量。最后另外一個線程會向它發(fā)送信號,使它可以繼續(xù)運行。pthread_cond_wait方法原子性的調用并解鎖它持有的互斥量,由于這個緣由,互斥量是它的參數之1。
值得1說的是,條件變量不同于信號量,雖然它們很類似。條件變量(不像信號量)不會存在于內存中。如果將1個信號發(fā)送給1個沒有線程在等待的條件變量,那末這個信號就會丟失。即是說條件變量不是計數器,也不能像信號量那樣累計信號以便以后使用。所以,如果向1個條件變量發(fā)送信號,但是在該條件變量上并沒有等待的進程,則該信號會永久丟失。
斟酌實現1個生產者和消費者問題:
#include <stdio.h>
#include <pthread.h>
#define MAX 10 //生產的數據數量
pthread_mutex_t the_mutex; //互斥鎖
pthread_cond_t condC, condP; //消費者和生產者的條件變量
int buff = 0;
void *producer(void *ptr)
{
for (int i = 1; i <= MAX; ++i)
{
printf("生產者線程第【%d】次運行.\n",i);
pthread_mutex_lock(&the_mutex);
while(buff != 0)
{
printf("生產者線程睡眠.\n");
pthread_cond_wait(&condP, &the_mutex); //如果緩沖區(qū)中還有數據,則睡眠并等待消費者的喚醒信號
}
buff = i;
pthread_cond_signal(&condC); //通知消費者消費產品
pthread_mutex_unlock(&the_mutex);
}
printf("生產者線程運行終了!\n");
pthread_exit(NULL);
return NULL;
}
void *consumer(void *ptr)
{
for (int i = 1; i <= MAX; ++i)
{
printf("消費者線程第【%d】次運行.\n",i);
pthread_mutex_lock(&the_mutex);
while(buff == 0)
{
printf("消費者線程睡眠.\n");
pthread_cond_wait(&condC, &the_mutex); //如果沒有可消費的產品,則睡眠并等待生產者的喚醒信號
}
printf("%d\n", buff);
buff = 0;
pthread_cond_signal(&condP); //通知生產者生產產品
pthread_mutex_unlock(&the_mutex);
}
printf("消費者線程運行終了!\n");
pthread_exit(NULL);
return NULL;
}
int main(int argc, char const *argv[])
{
pthread_t pro, con;
pthread_mutex_init(&the_mutex, NULL);
pthread_cond_init(&condC, NULL);
pthread_cond_init(&condC, NULL);
pthread_create(&con, NULL, consumer, NULL);
pthread_create(&pro, NULL, producer, NULL);
pthread_join(con, NULL);
pthread_join(pro, NULL);
pthread_cond_destroy(&condC);
pthread_cond_destroy(&condP);
pthread_mutex_destroy(&the_mutex);
return 0;
}