上次我們使用AutoResetEvent實現了一個生產/消費者隊列。這一次我們要使用Wait和Pulse方法來實現一個更強大的版本,它允許多個消費者,每一個消費者都在自己的線程中運行。
我們使用數組來跟蹤線程。
Thread[] _workers;
通過跟蹤線程可以讓我們在所有的線程都結束后再結束我們的隊列任務。
每一個消費者線程都執行一個叫做Consume的方法,在一個for循環中,我們可以創建和啟動線程。例如:
Public delegate void Action();
為了表示一系列的任務,我們使用Queue<T> 集合,例如:
Queue<Action> _itemQ = new Queue<Action>();
在我們調用生產(EnqueueItem)和消費(Consume)方法前,還是完整的看一看代碼吧:
public void Shutdown(bool waitForWorkers)
{
//為每一個線程插入一個null item,可以是每一個worker 退出
foreach (Thread worker in _workers)
EnqueueItem(null);
//等待所有的線程退出。
if (waitForWorkers)
foreach (Thread worker in _workers)
worker.Join();
}
public void EnqueueItem(Action item)
{
lock (_locker)
{
_itemQ.Enqueue(item);
Monitor.Pulse(_locker); //通知等待隊列中的線程
}
}
void Consume()
{
while (true)
{
Action item;
lock (_locker)
{
while (_itemQ.Count == 0)
{
Monitor.Wait(_locker); //釋放鎖,并阻止當前線程,直到其他線程發送pulse信號。 }
item = _itemQ.Dequeue();
}
if (item == null) return; //退出的信號
item();
}
}
}
下面是Main方法。使用兩個consumer線程,然后讓這兩個consumers執行10個委托。
for (int i = 0; i < 10; i++)
{
int itemNumber = i;
q.EnqueueItem(() =>
{
Thread.Sleep(1000); //模擬耗時的工作
Console.WriteLine(" Task " + itemNumber);
});
}
q.Shutdown(true); //等待關閉
Console.WriteLine();
Console.WriteLine("Workers complete!");
}
下面讓我們細致的看一看EnqueueItem方法:
因為我們插入了一個新的任務,我們必須修改阻塞條件,也就是調用pulse方法,來喚醒調用了wait方法的線程。
出于對效率的考慮,當插入一個Item的時候使用Pulse來代替PulseAll方法,因為大部分時候每一個Item只需要一個consumer來執行。如果你有一個冰淇淋,你不可能叫30個睡眠的孩子都起來吃它,同樣,對于一個item,同時喚醒30個consumers一點好處都沒有。
讓我們再看看Consumer方法。
我們希望當沒什么事情做的時候,線程阻塞就可以了,換句話說,隊列中沒有item的時候,線程就應該阻塞。因此我們的阻塞條件是_itemQ.Count ==0;
if (item == null) return; //退出的信號
item();
Wait Timeouts
在調用Wait方法的時候可以傳遞一個毫秒或Timespan的時間來設置超時。如果Wait超時了,那么Wait方法就會返回false。
帶有超時功能的Wait方法的主要步驟:
釋放鎖。
阻塞 直到 pulsed 或者超時。
重新獲取鎖。
超時就好像CLR 在超時到了的時候自動的調用了 pulse方法一樣。
下面是使用超時的Wait的主要代碼:
lock(_locker)
while(<阻塞條件>)
Monitor.Wait(_locker,<超時時間>);
Monitor.Wait 方法返回一個bool值來代表是調用了pulse還是已經超時了。
如果是true: 代表調用了pulse。
如果是false:代表超時了。
這對記錄日志很有用。
新聞熱點
疑難解答