菜鳥學習并行編程,參考《C#并行編程高級教程.PDF》,如有錯誤,歡迎指正。
C#并行編程-相關概念
C#并行編程-Parallel
C#并行編程-Task
C#并行編程-并發集合
C#并行編程-線程同步原語
C#并行編程-PLINQ:聲明式數據并行
背景
基于任務的程序設計、命令式數據并行和任務并行都要求能夠支持并發更新的數組、列表和集合。
在.NET Framework 4 以前,為了讓共享的數組、列表和集合能夠被多個線程更新,需要添加復雜的代碼來同步這些更新操作。
如您需要編寫一個并行循環,這個循環以無序的方式向一個共享集合中添加元素,那么必須加入一個同步機制來保證這是一個線程安全的集合。
System.Collenctions和System.Collenctions.Generic 名稱空間中所提供的經典列表、集合和數組的線程都不是安全的,不能接受并發請求,因此需要對相應的操作方法執行串行化。
下面看代碼,代碼中并沒有實現線程安全和串行化:
class PRogram { private static object o = new object(); private static List<Product> _Products { get; set; } /* coder:釋迦苦僧 * 代碼中 創建三個并發線程 來操作_Products 集合 * System.Collections.Generic.List 這個列表在多個線程訪問下,不能保證是安全的線程,所以不能接受并發的請求,我們必須對ADD方法的執行進行串行化 */ static void Main(string[] args) { _Products = new List<Product>(); /*創建任務 t1 t1 執行 數據集合添加操作*/ Task t1 = Task.Factory.StartNew(() => { AddProducts(); }); /*創建任務 t2 t2 執行 數據集合添加操作*/ Task t2 = Task.Factory.StartNew(() => { AddProducts(); }); /*創建任務 t3 t3 執行 數據集合添加操作*/ Task t3 = Task.Factory.StartNew(() => { AddProducts(); }); Task.WaitAll(t1, t2, t3); Console.WriteLine(_Products.Count); Console.ReadLine(); } /*執行集合數據添加操作*/ static void AddProducts() { Parallel.For(0, 1000, (i) => { Product product = new Product(); product.Name = "name" + i; product.Category = "Category" + i; product.SellPrice = i; _Products.Add(product); }); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }View Code
代碼中開啟了三個并發操作,每個操作都向集合中添加1000條數據,在沒有保障線程安全和串行化的運行下,實際得到的數據并沒有3000條,結果如下:
為此我們需要采用Lock關鍵字,來確保每次只有一個線程來訪問 _Products.Add(product); 這個方法,代碼如下:
class Program { private static object o = new object(); private static List<Product> _Products { get; set; } /* coder:釋迦苦僧 * 代碼中 創建三個并發線程 來操作_Products 集合 * System.Collections.Generic.List 這個列表在多個線程訪問下,不能保證是安全的線程,所以不能接受并發的請求,我們必須對ADD方法的執行進行串行化 */ static void Main(string[] args) { _Products = new List<Product>(); /*創建任務 t1 t1 執行 數據集合添加操作*/ Task t1 = Task.Factory.StartNew(() => { AddProducts(); }); /*創建任務 t2 t2 執行 數據集合添加操作*/ Task t2 = Task.Factory.StartNew(() => { AddProducts(); }); /*創建任務 t3 t3 執行 數據集合添加操作*/ Task t3 = Task.Factory.StartNew(() => { AddProducts(); }); Task.WaitAll(t1, t2, t3); Console.WriteLine("當前數據量為:" + _Products.Count); Console.ReadLine(); } /*執行集合數據添加操作*/ static void AddProducts() { Parallel.For(0, 1000, (i) => { Product product = new Product(); product.Name = "name" + i; product.Category = "Category" + i; product.SellPrice = i; lock (o) { _Products.Add(product); } }); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }View Code
但是鎖的引入,帶來了一定的開銷和性能的損耗,并降低了程序的擴展性,在并發編程中顯然不適用。
System.Collections.Concurrent
.NET Framework 4提供了新的線程安全和擴展的并發集合,它們能夠解決潛在的死鎖問題和競爭條件問題,因此在很多復雜的情形下它們能夠使得并行代碼更容易編寫,這些集合盡可能減少需要使用鎖的次數,從而使得在大部分情形下能夠優化為最佳性能,不會產生不必要的同步開銷。
需要注意的是:
線程安全并不是沒有代價的,比起System.Collenctions和System.Collenctions.Generic命名空間中的列表、集合和數組來說,并發集合會有更大的開銷。因此,應該只在需要從多個任務中并發訪問集合的時候才使用并發幾個,在串行代碼中使用并發集合是沒有意義的,因為它們會增加無謂的開銷。
為此,在.NET Framework中提供了System.Collections.Concurrent新的命名空間可以訪問用于解決線程安全問題,通過這個命名空間能訪問以下為并發做好了準備的集合。
1.BlockingCollection 與經典的阻塞隊列數據結構類似,能夠適用于多個任務添加和刪除數據,提供阻塞和限界能力。
2.ConcurrentBag 提供對象的線程安全的無序集合
3.ConcurrentDictionary 提供可有多個線程同時訪問的鍵值對的線程安全集合
4.ConcurrentQueue 提供線程安全的先進先出集合
5.ConcurrentStack 提供線程安全的后進先出集合
這些集合通過使用比較并交換和內存屏障等技術,避免使用典型的互斥重量級的鎖,從而保證線程安全和性能。
ConcurrentQueue
ConcurrentQueue是完全無鎖的,能夠支持并發的添加元素,先進先出。下面貼代碼,詳解見注釋:
class Program { private static object o = new object(); /*定義 Queue*/ private static Queue<Product> _Products { get; set; } private static ConcurrentQueue<Product> _ConcurrenProducts { get; set; } /* coder:釋迦苦僧 * 代碼中 創建三個并發線程 來操作_Products 和 _ConcurrenProducts 集合,每次添加 10000 條數據 查看 一般隊列Queue 和 多線程安全下的隊列ConcurrentQueue 執行情況 */ static void Main(string[] args) { Thread.Sleep(1000); _Products = new Queue<Product>(); Stopwatch swTask = new Stopwatch(); swTask.Start(); /*創建任務 t1 t1 執行 數據集合添加操作*/ Task t1 = Task.Factory.StartNew(() => { AddProducts(); }); /*創建任務 t2 t2 執行 數據集合添加操作*/ Task t2 = Task.Factory.StartNew(() => { AddProducts(); }); /*創建任務 t3 t3 執行 數據集合添加操作*/ Task t3 = Task.Factory.StartNew(() => { AddProducts(); }); Task.WaitAll(t1, t2, t3); swTask.Stop(); Console.WriteLine("List<Product> 當前數據量為:" + _Products.Count); Console.WriteLine("List<Product> 執行時間為:" + swTask.ElapsedMilliseconds); Thread.Sleep(1000); _ConcurrenProducts = new ConcurrentQueue<Product>(); Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); /*創建任務 tk1 tk1 執行 數據集合添加操作*/ Task tk1 = Task.Factory.StartNew(() => { AddConcurrenProducts(); }); /*創建任務 tk2 tk2 執行 數據集合添加操作*/ Task tk2 = Task.Factory.StartNew(() => { AddConcurrenProducts(); }); /*創建任務 tk3 tk3 執行 數據集合添加操作*/ Task
新聞熱點
疑難解答