聊聊高并發(fā)(十一)實(shí)現(xiàn)幾種自旋鎖(五)
來(lái)源:程序員人生 發(fā)布時(shí)間:2014-11-17 08:07:52 閱讀次數(shù):3594次
在聊聊高并發(fā)(9)實(shí)現(xiàn)幾種自旋鎖(4)中實(shí)現(xiàn)的限時(shí)隊(duì)列鎖是1個(gè)基于鏈表的限時(shí)無(wú)界隊(duì)列鎖,它的tryLock方法支持限時(shí)操作和中斷操作,無(wú)饑餓,保證了先來(lái)先服務(wù)的公平性,在多個(gè)同享狀態(tài)上自旋,是低爭(zhēng)用的。但是它的1個(gè)缺點(diǎn)是犧牲了空間,為了讓線程可以屢次使用鎖,每次Lock的時(shí)候都要new
QNode,并設(shè)置給線程,而不能重復(fù)使用原來(lái)的節(jié)點(diǎn)。
這篇說(shuō)說(shuō)限時(shí)有界隊(duì)列鎖,它采取了有界隊(duì)列,并且和ArrayLock不同,它不限制線程的個(gè)數(shù)。它的特點(diǎn)主要有
1. 采取有界隊(duì)列,減小了空間復(fù)雜度,L把鎖的空間復(fù)雜度在最壞的情況下(有界隊(duì)列長(zhǎng)度為1)是O(L)
2. 非公平,不保證先來(lái)先服務(wù),這也是1個(gè)很常見的需求
3. 由于是有界隊(duì)列,所以在高并發(fā)下存在高爭(zhēng)用,需要結(jié)合回退鎖來(lái)下降爭(zhēng)用
它的實(shí)現(xiàn)思路是:
1. 采取了1個(gè)有界的等待隊(duì)列,等待隊(duì)列的每一個(gè)節(jié)點(diǎn)都有多種狀態(tài),每一個(gè)節(jié)點(diǎn)是可復(fù)用的
2. 采取了1個(gè)工作隊(duì)列,Tail指針指向工作隊(duì)列的隊(duì)尾節(jié)點(diǎn)。獲得和是不是鎖的操作是在工作隊(duì)列中的節(jié)點(diǎn)之間進(jìn)行
3. 由因而限時(shí)隊(duì)列,并支持中斷,所以隊(duì)列中的節(jié)點(diǎn)都是可以退出隊(duì)列的
4. 算法分為3步,第1步是線程從有界的等待隊(duì)列中取得1個(gè)節(jié)點(diǎn),并設(shè)置為WAITING,如果沒(méi)有取得,就自旋
第2步是把這個(gè)節(jié)點(diǎn)加入工作隊(duì)列,并取得前1個(gè)節(jié)點(diǎn)的指針
第3步是在前1個(gè)節(jié)點(diǎn)的狀態(tài)上自旋,直到取得鎖,并把前1個(gè)節(jié)點(diǎn)RELEASED狀態(tài)改成FREE
節(jié)點(diǎn)有4種狀態(tài):
1. FREE: 表示節(jié)點(diǎn)可以被取得。當(dāng)前1個(gè)節(jié)點(diǎn)釋放鎖,并設(shè)置狀態(tài)為RELEASED的時(shí)候,后1個(gè)節(jié)點(diǎn)需要把前1個(gè)節(jié)點(diǎn)設(shè)置為FREE。當(dāng)節(jié)點(diǎn)在沒(méi)有進(jìn)入工作隊(duì)列時(shí)超時(shí),也被設(shè)置為FREE.
2. RELEASED:節(jié)點(diǎn)釋放鎖時(shí)設(shè)置為RELEASED,需要后續(xù)節(jié)點(diǎn)把它設(shè)置為FREE。如果是工作隊(duì)列的最后1個(gè)節(jié)點(diǎn),那末RELEASED狀態(tài)的節(jié)點(diǎn)在第1步時(shí)可被取得
3. WAITING:表示取得了鎖或在工作隊(duì)列中等待鎖。是在第1步中被設(shè)置的,第1步的結(jié)果就是取得1個(gè)狀態(tài)為WAITING的節(jié)點(diǎn)
4. ABORTED:工作隊(duì)列中的節(jié)點(diǎn)超時(shí)或中斷的節(jié)點(diǎn)被設(shè)置為ABORTED。 隊(duì)尾的ABORTED節(jié)點(diǎn)可以被第1步取得,隊(duì)中的ABORTED節(jié)點(diǎn)不能被第1步獲得,只能把它的preNode指針指向它的前1個(gè)節(jié)點(diǎn),表示它自己不能被獲得了
理解節(jié)點(diǎn)這4種狀態(tài)的轉(zhuǎn)變是理解這個(gè)設(shè)計(jì)的關(guān)鍵。這個(gè)設(shè)計(jì)比較復(fù)雜,從篇幅斟酌,這篇只介紹Lock和UnLock操作,下1篇說(shuō)tryLock限時(shí)操作
1. 創(chuàng)建枚舉類型State來(lái)表示狀態(tài)
2. 創(chuàng)建QNode表示節(jié)點(diǎn),使用1個(gè)AtomicReference原子變量指向它的State,以便于支持CAS操作。節(jié)點(diǎn)保護(hù)1個(gè)PreNode援用,只有節(jié)點(diǎn)被Aborted的時(shí)候才設(shè)置這個(gè)援用的值,表示跳過(guò)這個(gè)節(jié)點(diǎn)
3. 1個(gè)有界的QNode隊(duì)列,使用數(shù)組表示
4. MIN_BACKOFF和MAX_BACKOFF支持回退操作,單位是毫秒。這兩個(gè)值依賴于硬件性能,需要通過(guò)不斷測(cè)試來(lái)獲得最優(yōu)值
5. 1個(gè)Random隨機(jī)數(shù),來(lái)產(chǎn)生隨即的數(shù)組下標(biāo),非公平性需要
6. 1個(gè)AtomicStampedReference類型的原子變量作為隊(duì)尾指針tail。AtomicStampedReference采取了版本號(hào)來(lái)避免CAS操作的ABA問(wèn)題。這很重要,由于有界等待隊(duì)列的節(jié)點(diǎn)會(huì)屢次進(jìn)出工作隊(duì)列,所以可能產(chǎn)生同1個(gè)節(jié)點(diǎn)被前1個(gè)線程準(zhǔn)備CAS操作時(shí),已被后幾個(gè)線程進(jìn)出了工作隊(duì)列,致使第1個(gè)線程拿到的QNode的狀態(tài)不正確。
7. lock實(shí)現(xiàn)分為3步,上文已說(shuō)過(guò)了
8. unlock操作就是兩步,第1修改狀態(tài)通知其他線程獲得鎖。第2是設(shè)置自己的節(jié)點(diǎn)援用,以便下次可再次取得鎖而不影響其他線程的狀態(tài)。這里是把線程指向的節(jié)點(diǎn)狀態(tài)設(shè)置為RELEASED,同時(shí)設(shè)置線程的節(jié)點(diǎn)援用為空,這樣其他線程可以繼續(xù)使用這個(gè)節(jié)點(diǎn)。
package com.zc.lock;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* 限時(shí)有界隊(duì)列鎖,并且直接不限數(shù)量的線程
* 由因而有界的隊(duì)列,所以爭(zhēng)用劇烈,可以復(fù)合回退鎖的概念,減少高爭(zhēng)用
* 分為3步:
* 第1步是獲得1個(gè)State為FREE的節(jié)點(diǎn),設(shè)置為WAITING
* 第2步是把這個(gè)節(jié)點(diǎn)加入隊(duì)列,獲得前1個(gè)節(jié)點(diǎn)
* 第3步是在前1個(gè)節(jié)點(diǎn)上自旋
*
* 優(yōu)點(diǎn)是L個(gè)鎖的空間復(fù)雜度是O(L),而限時(shí)無(wú)界隊(duì)列鎖的空間復(fù)雜度為O(Ln)
* **/
public class CompositeLock implements TryLock{
enum State {FREE, WAITING, RELEASED, ABORTED}
class QNode{
AtomicReference<State> state = new AtomicReference<CompositeLock.State>(State.FREE);
volatile QNode preNode;
}
private final int SIZE = 10;
private final int MIN_BACKOFF = 1;
private final int MAX_BACKOFF = 10;
private Random random = new Random();
// 有界的QNode數(shù)組,表示隊(duì)列總共可使用的節(jié)點(diǎn)數(shù)
private QNode[] waitings = new QNode[10];
// 指向隊(duì)尾節(jié)點(diǎn),使用AtomicStampedReference帶版本號(hào)的原子援用變量,可以避免ABA問(wèn)題,由于這個(gè)算法實(shí)現(xiàn)需要對(duì)同1個(gè)Node屢次進(jìn)出隊(duì)列
private AtomicStampedReference<QNode> tail = new AtomicStampedReference<CompositeLock.QNode>(null, 0);
// 每一個(gè)線程保護(hù)1個(gè)QNode援用
private ThreadLocal<QNode> myNode = new ThreadLocal<CompositeLock.QNode>(){
public QNode initialValue(){
return null;
}
};
public CompositeLock(){
for(int i = 0; i < SIZE; i ++){
waitings[i] = new QNode();
}
}
@Override
public void lock() {
Backoff backoff = new Backoff(MIN_BACKOFF, MAX_BACKOFF);
QNode node = waitings[random.nextInt(SIZE)];
// 第1步: 先取得數(shù)組里的1個(gè)Node,并把它的狀態(tài)設(shè)置為WAITING,否則就自旋
GETNODE:
while(true){
while(node.state.get() != State.FREE){
// 由于釋放鎖時(shí)只是設(shè)置了State為RELEASED,由后繼的線程來(lái)設(shè)置RELEASED為FREE
// 如果該節(jié)點(diǎn)已是隊(duì)尾節(jié)點(diǎn)了并且是RELEASED,那末可以直接可以被使用
// 獲得當(dāng)前原子援用變量的版本號(hào)
int[] currentStamp = new int[1];
QNode tailNode = tail.get(currentStamp);
if(tailNode == node && tailNode.state.get() == State.RELEASED){
if(tail.compareAndSet(tailNode, null, currentStamp[0], currentStamp[0] + 1)){
node.state.set(State.WAITING);
break GETNODE;
}
}
}
if(node.state.compareAndSet(State.FREE, State.WAITING)){
break;
}
try {
backoff.backoff();
} catch (InterruptedException e) {
throw new RuntimeException("Thread interrupted, stop to get the lock");
}
}
// 第2步加入隊(duì)列
int[] currentStamp = new int[1];
QNode preTailNode = null;
do{
preTailNode = tail.get(currentStamp);
}
// 如果沒(méi)加入隊(duì)列,就1直自旋
while(!tail.compareAndSet(preTailNode, node, currentStamp[0], currentStamp[0] + 1));
// 第3步在前1個(gè)節(jié)點(diǎn)自旋,如果前1個(gè)節(jié)點(diǎn)為null,證明是第1個(gè)加入隊(duì)列的節(jié)點(diǎn)
if(preTailNode != null){
// 在前1個(gè)節(jié)點(diǎn)的狀態(tài)自旋
while(preTailNode.state.get() != State.RELEASED){}
// 設(shè)置前1個(gè)節(jié)點(diǎn)的狀態(tài)為FREE,可以被其他線程使用
preTailNode.state.set(State.FREE);
}
// 將線程的myNode指向取得鎖的node
myNode.set(node);
return;
}
@Override
public void unlock() {
QNode node = myNode.get();
node.state.set(State.RELEASED);
myNode.set(null);
}
@Override
public boolean trylock(long time, TimeUnit unit)
throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
}
采取我們之前的驗(yàn)證鎖正確性的測(cè)試用例來(lái)測(cè)試lock, unlock操作。
package com.zc.lock;
public class Main {
//private static Lock lock = new TimeCost(new ArrayLock(150));
private static Lock lock = new CompositeLock();
//private static TimeCost timeCost = new TimeCost(new TTASLock());
private static volatile int value = 0;
public static void method(){
lock.lock();
System.out.println("Value: " + ++value);
lock.unlock();
}
public static void main(String[] args) {
for(int i = 0; i < 50; i ++){
Thread t = new Thread(new Runnable(){
@Override
public void run() {
method();
}
});
t.start();
}
}
}
結(jié)果是順序打印的,證明鎖是正確的,每次只有1個(gè)線程取得了鎖
Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12
Value: 13
Value: 14
Value: 15
Value: 16
Value: 17
Value: 18
Value: 19
Value: 20
Value: 21
Value: 22
Value: 23
Value: 24
Value: 25
Value: 26
Value: 27
Value: 28
Value: 29
Value: 30
Value: 31
Value: 32
Value: 33
Value: 34
Value: 35
Value: 36
Value: 37
Value: 38
Value: 39
Value: 40
Value: 41
Value: 42
Value: 43
Value: 44
Value: 45
Value: 46
Value: 47
Value: 48
Value: 49
生活不易,碼農(nóng)辛苦
如果您覺(jué)得本網(wǎng)站對(duì)您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈(zèng)