最近寫了個小程序用到了C#4.0中的線程安全集合。想起很久以前用C#2.0開發的時候寫后臺windows服務,為了利用多線程實現生產者和消費者模型,經常要封裝一些線程安全的容器,比如泛型隊列和字典等等。下面就結合部分MS的源碼和自己的開發經驗淺顯地分析一下如何實現線程安全容器以及實現線程安全容器容易產生的問題。
在C#早期版本中已經實現了線程安全的ArrayList,可以通過下面的方式構造線程安全的數組列表:
var array = ArrayList.Synchronized(new ArrayList());
我們從Synchronized方法入手,分析它的源代碼看是如何實現線程安全的:
Synchronized /// <summary>Returns an <see cref="T:System.Collections.ArrayList" /> wrapper that is synchronized (thread safe).</summary>
/// <returns>An <see cref="T:System.Collections.ArrayList" /> wrapper that is synchronized (thread safe).</returns>
/// <param name="list">The <see cref="T:System.Collections.ArrayList" /> to synchronize. </param>
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="list" /> is null. </exception>
/// <filterpriority>2</filterpriority>
[HostProtection(SecurityAction.LinkDemand, Synchronization = true)]
public static ArrayList Synchronized(ArrayList list)
{
if (list == null)
{
throw new ArgumentNullException("list");
}
return new ArrayList.SyncArrayList(list);
}
繼續跟進去,發現SyncArrayList是一個繼承自ArrayList的私有類,內部線程安全方法的實現經過分析,很多都是像下面這樣lock(注意是lock_root對象而不是數組列表實例對象)一下完事:
lock (this._root)
有心的你可以查看SyncArrayList的源碼:
SyncArrayList [Serializable]
private class SyncArrayList : ArrayList
{
private ArrayList _list;
private object _root;
public override int Capacity
{
get
{
int capacity;
lock (this._root)
{
capacity = this._list.Capacity;
}
return capacity;
}
set
{
lock (this._root)
{
this._list.Capacity = value;
}
}
}
public override int Count
{
get
{
int count;
lock (this._root)
{
count = this._list.Count;
}
return count;
}
}
public override bool IsReadOnly
{
get
{
return this._list.IsReadOnly;
}
}
public override bool IsFixedSize
{
get
{
return this._list.IsFixedSize;
}
}
public override bool IsSynchronized
{
get
{
return true;
}
}
public override object this[int index]
{
get
{
object result;
lock (this._root)
{
result = this._list[index];
}
return result;
}
set
{
lock (this._root)
{
this._list[index] = value;
}
}
}
public override object SyncRoot
{
get
{
return this._root;
}
}
internal SyncArrayList(ArrayList list)
: base(false)
{
this._list = list;
this._root = list.SyncRoot;
}
public override int Add(object value)
{
int result;
lock (this._root)
{
result = this._list.Add(value);
}
return result;
}
public override void AddRange(ICollection c)
{
lock (this._root)
{
this._list.AddRange(c);
}
}
public override int BinarySearch(object value)
{
int result;
lock (this._root)
{
result = this._list.BinarySearch(value);
}
return result;
}
public override int BinarySearch(object value, IComparer comparer)
{
int result;
lock (this._root)
{
result = this._list.BinarySearch(value, comparer);
}
return result;
}
public override int BinarySearch(int index, int count, object value, IComparer comparer)
{
int result;
lock (this._root)
{
result = this._list.BinarySearch(index, count, value, comparer);
}
return result;
}
public override void Clear()
{
lock (this._root)
{
this._list.Clear();
}
}
public override object Clone()
{
object result;
lock (this._root)
{
result = new ArrayList.SyncArrayList((ArrayList)this._list.Clone());
}
return result;
}
public override bool Contains(object item)
{
bool result;
lock (this._root)
{
result = this._list.Contains(item);
}
return result;
}
public override void CopyTo(Array array)
{
lock (this._root)
{
this._list.CopyTo(array);
}
}
public override void CopyTo(Array array, int index)
{
lock (this._root)
{
this._list.CopyTo(array, index);
}
}
public override void CopyTo(int index, Array array, int arrayIndex, int count)
{
lock (this._root)
{
this._list.CopyTo(index, array, arrayIndex, count);
}
}
public override IEnumerator GetEnumerator()
{
IEnumerator enumerator;
lock (this._root)
{
enumerator = this._list.GetEnumerator();
}
return enumerator;
}
public override IEnumerator GetEnumerator(int index, int count)
{
IEnumerator enumerator;
lock (this._root)
{
enumerator = this._list.GetEnumerator(index, count);
}
return enumerator;
}
public override int IndexOf(object value)
{
int result;
lock (this._root)
{
result = this._list.IndexOf(value);
}
return result;
}
public override int IndexOf(object value, int startIndex)
{
int result;
lock (this._root)
{
result = this._list.IndexOf(value, startIndex);
}
return result;
}
public override int IndexOf(object value, int startIndex, int count)
{
int result;
lock (this._root)
{
result = this._list.IndexOf(value, startIndex, count);
}
return result;
}
public override void Insert(int index, object value)
{
lock (this._root)
{
this._list.Insert(index, value);
}
}
public override void InsertRange(int index, ICollection c)
{
lock (this._root)
{
this._list.InsertRange(index, c);
}
}
public override int LastIndexOf(object value)
{
int result;
lock (this._root)
{
result = this._list.LastIndexOf(value);
}
return result;
}
public override int LastIndexOf(object value, int startIndex)
{
int result;
lock (this._root)
{
result = this._list.LastIndexOf(value, startIndex);
}
return result;
}
public override int LastIndexOf(object value, int startIndex, int count)
{
int result;
lock (this._root)
{
result = this._list.LastIndexOf(value, startIndex, count);
}
return result;
}
public override void Remove(object value)
{
lock (this._root)
{
this._list.Remove(value);
}
}
public override void RemoveAt(int index)
{
lock (this._root)
{
this._list.RemoveAt(index);
}
}
public override void RemoveRange(int index, int count)
{
lock (this._root)
{
this._list.RemoveRange(index, count);
}
}
public override void Reverse(int index, int count)
{
lock (this._root)
{
this._list.Reverse(index, count);
}
}
public override void SetRange(int index, ICollection c)
{
lock (this._root)
{
this._list.SetRange(index, c);
}
}
public override ArrayList GetRange(int index, int count)
{
ArrayList range;
lock (this._root)
{
range = this._list.GetRange(index, count);
}
return range;
}
public override void Sort()
{
lock (this._root)
{
this._list.Sort();
}
}
public override void Sort(IComparer comparer)
{
lock (this._root)
{
this._list.Sort(comparer);
}
}
public override void Sort(int index, int count, IComparer comparer)
{
lock (this._root)
{
this._list.Sort(index, count, comparer);
}
}
public override object[] ToArray()
{
object[] result;
lock (this._root)
{
result = this._list.ToArray();
}
return result;
}
public override Array ToArray(Type type)
{
Array result;
lock (this._root)
{
result = this._list.ToArray(type);
}
return result;
}
public override void TrimToSize()
{
lock (this._root)
{
this._list.TrimToSize();
}
}
}
同樣,在C#早期版本中實現了線程安全的Hashtable,它也是早期開發中經常用到的緩存容器,可以通過下面的方式構造線程安全的哈希表:
var ht = Hashtable.Synchronized(new Hashtable());
同樣地,我們從Synchronized方法入手,分析它的源代碼看是如何實現線程安全的:
lock (this._table.SyncRoot)
貼一下SyncHashtable的源碼:
從上面的實現分析來說,封裝一個線程安全的容器看起來并不是什么難事,除了對線程安全容器的異常處理心有余悸,其他的似乎按步就班就可以了,不是嗎?也許還有更高明的實現吧?
在4.0中,多了一個System.Collections.Concurrent命名空間,懷著忐忑的心情查看C#4.0其中的一個線程安全集合ConcurrentQueue的源碼,發現它繼承自IProducerConsumerCollection<T>, IEnumerable<T>, ICollection, IEnumerable接口,內部實現線程安全的時候,通過SpinWait和通過互鎖構造(Interlocked)及SpinWait封裝的Segment,間接實現了線程安全。Segment的實現比較復雜,和線程安全密切相關的方法就是TryXXX那幾個方法,源碼如下:
和ArrayList以及Hashtable線程安全的“曲折”實現有點不同,ConcurrentQueue<T>一開始就是朝著線程安全方向實現去的。它沒有使用lock,因為大家知道使用lock性能略差,對于讀和寫操作,應該分開,不能一概而論。ConcurrentQueue<T>具體實現在性能和異常處理上應該已經考慮的更全面周到一點。
在我看來,ConcurrentQueue<T>線程安全的具體實現有多吸引人在其次,IProducerConsumerCollection<T>接口的抽象和提取非常值得稱道,查看源碼發現ConcurrentStack<T>和ConcurrentBag<T>也繼承自該接口。<<CLR via C#>>一書中在談到接口和抽象類的時候特別舉了集合和流(Stream)的例子,微軟為什么如此設計,想起來果然很有深意。
對于線程安全的泛型字典ConcurrentDictionary<TKey, TValue>,我們也可以查看它的源碼看它的具體實現方式??丛创a有1200多行,實現稍微復雜一些。我們僅從最簡單的TryAdd方法分析:
四、如法炮制
如果讓我來構造實現線程安全容器,最簡單直接快速高效的方式就是參考ArrayList和 Hashtable,我們完全可以模仿它們的處理方式,通過繼承一個容器,然后內部通過lock一個SyncRoot對象,中規中矩地實現framework中其他容器的線程安全。比如要實現線程安全的泛型隊列Queue<T>,貼一下大致的偽代碼:
private Queue<T> queue = null;
private object syncRoot = null;
internal object SyncRoot
{
get
{
return syncRoot;
}
}
#endregion
#region constructors
public SyncQueue()
{
syncRoot = new object();
queue = new Queue<T>();
}
public SyncQueue(IEnumerable<T> collection)
{
syncRoot = new object();
queue = new Queue<T>(collection);
}
public SyncQueue(int capacity)
{
syncRoot = new object();
queue = new Queue<T>(capacity);
}
#endregion
#region methods
public new void Enqueue(T item)
{
lock (SyncRoot)
{
this.Enqueue(item);
}
}
public new T Dequeue()
{
T result = default(T);
lock (SyncRoot)
{
result = this.queue.Dequeue();
}
return result;
}
public new void Clear()
{
lock (SyncRoot)
{
this.queue.Clear();
}
}
public new bool Contains(T item)
{
var exists = false;
lock (SyncRoot)
{
exists = this.queue.Contains(item);
}
return exists;
}
#endregion
}
你可能覺得上面這樣不動腦的方式似乎很傻很天真,但這絕對是一種正常人都能想到的思路,誰讓MS的數組列表和哈希表就是這么實現的呢?
當然,我們還能想到的一種常見實現方式就是通過組合而不是類繼承,實現的偽代碼類似下面這樣:
private Queue<T> queue = null;
private object syncRoot = null;
internal object SyncRoot
{
get
{
return syncRoot;
}
}
#endregion
#region constructors
public SyncQueue()
{
syncRoot = new object();
queue = new Queue<T>();
}
public SyncQueue(IEnumerable<T> collection)
{
syncRoot = new object();
queue = new Queue<T>(collection);
}
public SyncQueue(int capacity)
{
syncRoot = new object();
queue = new Queue<T>(capacity);
}
#endregion
#region methods
public void Enqueue(T item)
{
lock (SyncRoot)
{
this.Enqueue(item);
}
}
public T Dequeue()
{
T result = default(T);
lock (SyncRoot)
{
result = this.queue.Dequeue();
}
return result;
}
public void Clear()
{
lock (SyncRoot)
{
this.queue.Clear();
}
}
public bool Contains(T item)
{
var exists = false;
lock (SyncRoot)
{
exists = this.queue.Contains(item);
}
return exists;
}
#endregion
}
到這里,我們至少可以分析得出,實現一般的線程安全容器的思路至少有兩種:類繼承(內部實現偏向使用組合)和(或)組合,線程安全的地方只要通過framework的同步構造如lock、Interlocked等實現即可。
思考:如果讓您實現線程安全容器,您優先會怎么實現呢?
CacheUtil緩存實現的偽代碼如下:
public static bool TryAdd(object key, object value)
{
ht[key] = value; //set方法是線程安全的
return true;
}
public static bool TryGet(object key, out object result)
{
result = null;
lock (ht.SyncRoot)
{
if (ht.ContainsKey(key))
{
result = ht[key];
}
}
return true;
}
}
從代碼中可以看出來,哈希表中的Value存放的是IList類型,那么值所保存的應該是一個引用(也就是指針)。
(1)、當線程1通過索引器得到這個IList時,這個TryGet讀取操作是線程安全的。接著線程1進行的操作是列表遍歷。在foreach進行遍歷不為空的List的時候,遍歷的其實是存放在IList指針指向的引用。
(2)、在foreach遍歷集合的時候,這時候線程2如果正好對哈希表的key所對應的Value進行修改,IList的指針所指向的引用改變了,所以線程1的遍歷操作就會拋出異常。
這是一個簡單而又經典的陷阱,在哈希表的MSDN線程安全塊有一段說明:
Enumerating through a collection is intrinsically not a thread safe procedure. Even when a collection is synchronized, other threads can still modify the collection, which causes the enumerator to throw an exception. To guarantee thread safety during enumeration, you can either lock the collection during the entire enumeration or catch the exceptions resulting from changes made by other threads.
列表通過索引取值,一個簡單的示例代碼如下:
按照類似于1中的分析,GetFirstOrDefault應該可以分為下面兩步:
(1)線程1取數據,判斷list.Count的時候發現列表內有1個元素,這一步線程安全,沒有任何問題,然后準備返回索引為0的元素;
(2)線程2在線程1將要取索引為0的元素之前移除了列表中的唯一元素或者直接將list指向null,這樣線程1通過索引取元素就拋出異常了。
從上面的兩個示例,我們得知通常所看到的線程安全實際上并不一定安全。不安全的主要原因就是容器內的數據很容易被其他線程改變,或者可以簡要概括為:一段時間差引發的血案。實際上,我們平時所做的業務系統,歸根結底很多bug或者隱藏的缺陷都是由不起眼的一小段時間差引起的。
保證容器內的數據和操作都安全,一種簡單而有效的方法就是將你所要進行的操作進行“事務”處理。比如示例1中哈希表的Value的遍歷操作,通常情況下,我們分作兩步:
(1)、(安全地)讀取數據
(2)、(不安全地)遍歷;
為了達到遍歷操作不拋出異常,我們可以把兩步合并為一步,抽象出一個線程安全的新方法TryGetAndEnumerate,這樣可以保證線程安全地取數據和遍歷,具體實現無非是lock一下SyncRoot類似的這種思路。但是這種線程安全的遍歷可能代價很高,而且極其不通用。
線程安全集合容易產生的問題和解決方法,請參考JaredPar MSFT的Why are thread safe collections so hard?,這篇文章對設計一個線程安全的容器的指導原則是:
1、Don't add an decision procedures(procedures like Count as decision procedures). They lead users down the path to bad code.
2、Methods which query the object can always fail and the API should reflect this.
實際上大家都知道利用事務處理思想多用TryXXX方法一般是沒錯的。
新聞熱點
疑難解答