TPL Part 4 -- Task的協同
來源:程序員人生 發布時間:2015-06-23 08:11:38 閱讀次數:2446次
簡單的Continuation
Task.ContinueWith(Task): 當指定的Task履行終了時。
void Main()
{
Task rootTask = new Task(()=>{
Console.WriteLine("root task completed");
});
root Task.ContinueWith((Task previousTask)=>{
Console.WriteLine("continute task completed");
});
rootTask.Start();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
Task.ContinueWhenAll(Task[]):當指定的所有Task都履行終了時,示例代碼:
Task continuation = Task.Factory.ContinueWhenAll<int>(tasks, antecedents =>{
foreach(Task<int> t in antecedents) {
// dosomething
}
});
TaskFactory.ContinueWhenAny(Task[]):當指定的所有Task的任意1個履行終了時,代碼與ContinueWhenAll類似(以下代碼中,打印出前1個Task的履行時間):
Task continuation = Task.Factory.ContinueWhenAny<int>(tasks,
(Task<int>antecedent) => {
//write out a message using the antecedent result
Console.WriteLine("The first task slept for {0} milliseconds",
antecedent.Result);
});
Continue 選項
OnlyOnRanToCompletion僅當履行完
NotOnRanToCompletion:沒有履行完(被取消或出現異常)
OnlyOnFaulted:僅當出現異常
NotOnFaulted:沒有出現異常
OnlyOnCancelled:僅當被取消
NotOnCancelled:沒有被取消
處理異常
void Main()
{
Task rootTask = new Task(()=>{
Console.WriteLine("root task completed");
throw new Exception("root throwed exception");
});
rootTask.ContinueWith((Task previousTask)=>{
Console.WriteLine("even root throw exception , I still run");
});
rootTask.Start();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
以上代碼中,第1個task中拋出了異常,Continue的Task依然會繼續履行。可是Task被Finalized時異常就會拋出。
解決方案:
void Main()
{
Task rootTask = new Task(()=>{
Console.WriteLine("root task completed");
throw new Exception("root throwed exception");
});
var t2 = rootTask.ContinueWith((Task previousTask)=>{
//
if(previousTask.Status== TaskStatus.Faulted){
throw previousTask.Exception.InnerException;
}
Console.WriteLine("even root throw exception , I still run");
});
rootTask.Start();
try{
t2.Wait();
}
catch(AggregateException ex){
ex.Handle(inner=>{Console.WriteLine("exception handled in main thread"); return true;});
}
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
在Task中冒泡拋出異常,在主線程中等待最后那個Task的履行并對AggregateException進行處理。
創建子Task
創建子Task并附加在父Task上:
void Main()
{
Task parentTask = new Task(() => {
Console.WriteLine("parent task started");
//create the first child task
Task childTask = new Task(() => {
// writeout a message and wait
Console.WriteLine("Child task running");
Thread.Sleep(1000);
Console.WriteLine("Child task throwed exception");
throw new Exception();
} ,TaskCreationOptions.AttachedToParent);
Console.WriteLine("start child task...");
childTask.Start();
Console.WriteLine("parent task ended");
});
// startthe parent task
parentTask.Start();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
1. 父Task會拋出子Task中的異常
2. 父Task的狀態會遭到所附加的子Task狀態的影響
Barrier的使用
class BankAccount {
public int Balance {
get;
set;
}
} ;
void Main()
{
//create the array of bank accounts
BankAccount[] accounts = new BankAccount[6];
for(int i = 0;i < accounts.Length; i++) {
accounts[i] = new BankAccount();
}
//create the total balance counter
int totalBalance = 0;
//create the barrier
Barrier barrier = new Barrier(3, (myBarrier) => {
// zerothe balance
totalBalance= 0;
// sumthe account totals
foreach(BankAccount account in accounts) {
totalBalance+= account.Balance;
}
// writeout the balance
Console.WriteLine("[From barrier :] Total balance: {0}",totalBalance);
});
//define the tasks array
Task[] tasks = new Task[3];
// loopto create the tasks
for(int i = 0;i < tasks.Length; i++) {
tasks[i]= new Task((stateObj) => {
//create a typed reference to the account
BankAccount account = (BankAccount)stateObj;
// startof phase
Random rnd = new Random();
for(int j = 0;j < 1000; j++) {
account.Balance+= 2;
}
Thread.Sleep(new Random().Next(3000));
Console.WriteLine("Task {0} waiting, phase {1} ",
Task.CurrentId,barrier.CurrentPhaseNumber);
//signal the barrier
barrier.SignalAndWait();
account.Balance-= 1000;
Console.WriteLine("barrier finished .");
// endof phase
Console.WriteLine("Task {0}, phase {1} ended",
Task.CurrentId,barrier.CurrentPhaseNumber);
//signal the barrier
barrier.SignalAndWait();
},
accounts[i]);
}
// startthe task
foreach(Task t in tasks) {
t.Start();
}
// waitfor all of the tasks to complete
Task.WaitAll(tasks);
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
在以上代碼中,打開了3個barrier和3個Task,在Task中為每一個賬戶添加2000,然后給barrier發出同步信號,當barrier收到3個信號時,對賬號進行求和并保存;當barrier完成邏輯后,控制權交給了每一個Task,此時每一個Task對account減1000,再次求和,最后結果為3000。
如果希望通過Cancel來控制barrier的行動,還可以在barrier中傳入tokenSource.Token:barrier.SignalAndWait(tokenSource.Token);并在Task中履行Cancel:tokenSource.Cancel()。
可以通過調用barrier.RemoveParticipant();來減少barrier的count。
CountEventDown
作用和Barrier類似,累計信號數量,當信號量到達指定數量,set event。
void Main()
{
CountdownEvent cdevent = new CountdownEvent(5);
//create a Random that we will use to generate
// sleepintervals
Random rnd = new Random();
//create 5 tasks, each of which will wait for
// arandom period and then signal the event
Task[] tasks = new Task[6];
for(int i = 0;i < tasks.Length; i++) {
//create the new task
tasks[i]= new Task(() => {
// putthe task to sleep for a random period
// up toone second
Thread.Sleep(rnd.Next(500, 1000));
//signal the event
Console.WriteLine("Task {0} signalling event",Task.CurrentId);
cdevent.Signal();
});
};
//create the final task, which will rendezous with the other 5
// usingthe count down event
tasks[5] = new Task(()=> {
// waiton the event
Console.WriteLine("Rendezvous task waiting");
cdevent.Wait();
Console.WriteLine("CountDownEvent has been set");
});
// startthe tasks
foreach(Task t in tasks) {
t.Start();
}
Task.WaitAll(tasks);
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
在以上代碼中,開啟了5個Task和1個count為5的CountDownEvent對象,每一個Task中完成任務后分別對CountDownEvent發信號,當湊齊5個信號后,會打印出CountDownEvent has been set。
ManualResetEvent 和 AutoResetEvent
熟習.net之前版本的應當都對它們很熟習,用于在多線程環境中完成線程同步。區分在于,前者必須調用reset才能恢覆信號;而AutoResetEvent則會自動reset。在此不再贅述。
SemaphoreSlim
void Main()
{
SemaphoreSlim semaphore = new SemaphoreSlim(3);
//create the cancellation token source
CancellationTokenSource tokenSource
= new CancellationTokenSource();
//create and start the task that will wait on the event
for(int i = 0;i < 10; i++) {
Task.Factory.StartNew((obj)=> {
semaphore.Wait(tokenSource.Token);
// printout a message when we are released
Console.WriteLine("Task {0} released", obj);
},i,tokenSource.Token);
}
//create and start the signalling task
Task signallingTask = Task.Factory.StartNew(() => {
// loopwhile the task has not been cancelled
while(!tokenSource.Token.IsCancellationRequested) {
// go tosleep for a random period
tokenSource.Token.WaitHandle.WaitOne(500);
//signal the semaphore
semaphore.Release(3);
Console.WriteLine("Semaphore released");
}
// if wereach this point, we know the task has been cancelled
tokenSource.Token.ThrowIfCancellationRequested();
},tokenSource.Token);
// askthe user to press return before we cancel
// thetoken and bring the tasks to an end
Console.WriteLine("Press enter to cancel tasks");
Console.ReadLine();
//cancel the token source and wait for the tasks
tokenSource.Cancel();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
在以上代碼中,new了1個SemaphoreSlim對象并傳入3,開了10個Task線程,每當有信號從Semaphore傳來時,打印Task[i]被release。同時開1個信號線程,每500毫秒release3個Task。
可見,Semaphore的作用主要是可以選擇1次release多少個Task。
Producer / Consumer(生產者/消費者模式)
以下代碼中,new了1個BlockingCollection,類型為Deposit。開了3個生產者Task,每一個生產者中創建20個Deposit對象并給Amount賦值為100。在主線程中等待生產者Task履行終了,調用blockingCollection.CompleteAdding()方法。以后開1個消費者Task用于操作賬戶對象,循環判斷blockingCollection.IsCompleted屬性(生產者是不是完成工作),從集合拿出存款對象,增加賬戶余額。
示例代碼:
class BankAccount {
public int Balance {
get;
set;
}
}
class Deposit {
public int Amount {
get;
set;
}
}
void Main()
{
BlockingCollection<Deposit> blockingCollection
= new BlockingCollection<Deposit>();
var producers = new List<Task>();
for(int i = 0;i < 3; i++) {
var producer = Task.Factory.StartNew((obj) => {
//create a series of deposits
for(int j = 0;j < 20; j++) {
//create the transfer
var randAmount = new Random().Next(100);
Deposit deposit = new Deposit { Amount = randAmount};
Thread.Sleep(newRandom().Next(200));
// placethe transfer in the collection
blockingCollection.Add(deposit);
Console.WriteLine(string.Format("Amount: {0} deposit Processed, index: {1}",randAmount, int.Parse(obj.ToString()) +j));
}
}, i*20);
producers.Add(producer);
};
//create a many to one continuation that will signal
// theend of production to the consumer
Task.Factory.ContinueWhenAll(producers.ToArray(),antecedents => {
//signal that production has ended
Console.WriteLine("Signalling production end");
blockingCollection.CompleteAdding();
});
//create a bank account
BankAccount account = new BankAccount();
//create the consumer, which will update
// thebalance based on the deposits
Task consumer = Task.Factory.StartNew(() => {
while(!blockingCollection.IsCompleted) {
Deposit deposit;
// tryto take the next item
if(blockingCollection.TryTake(outdeposit)) {
//update the balance with the transfer amount
account.Balance+= deposit.Amount;
}
}
// printout the final balance
Console.WriteLine("Final Balance: {0}", account.Balance);
});
// waitfor the consumer to finish
consumer.Wait();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈