亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 編程 > C# > 正文

一個進程間通訊同步的C#框架引薦

2020-01-24 01:37:54
字體:
來源:轉載
供稿:網友

 0.背景簡介

微軟在 .NET 框架中提供了多種實用的線程同步手段,其中包括 monitor 類及 reader-writer鎖。但跨進程的同步方法還是非常欠缺。另外,目前也沒有方便的線程間及進程間傳遞消息的方法。例如C/S和SOA,又或者生產者/消費者模式中就常常需要傳遞消息。為此我編寫了一個獨立完整的框架,實現了跨線程和跨進程的同步和通訊。這框架內包含了信號量,信箱,內存映射文件,阻塞通道,及簡單消息流控制器等組件。這篇文章里提到的類同屬于一個開源的庫項目(BSD許可),你可以從這里下載到 www.cdrnet.net/projects/threadmsg/.

這個框架的目的是:

  •     封裝性:通過MSMQ消息隊列發送消息的線程無需關心消息是發送到另一個線程還是另一臺機器。
  •     簡單性:向其他進程發送消息只需調用一個方法。

注意:我刪除了本文中全部代碼的XML注釋以節省空間。如果你想知道這些方法和參數的詳細信息,請參考附件中的代碼。

1.先看一個簡單例子

使用了這個庫后,跨進程的消息傳遞將變得非常簡單。我將用一個小例子來作示范:一個控制臺程序,根據參數可以作為發送方也可以作為接收方運行。在發送程序里,你可以輸入一定的文本并發送到信箱內(返回key),接收程序將顯示所有從信箱內收到的消息。你可以運行無數個發送程序和接收程序,但是每個消息只會被具體的某一個接收程序所收到。
 

[Serializable]struct Message{ public string Text;} class Test{ IMailBox mail;  public Test() {  mail = new ProcessMailBox("TMProcessTest",1024); }  public void RunWriter() {  Console.WriteLine("Writer started");  Message msg;  while(true)  {   msg.Text = Console.ReadLine();   if(msg.Text.Equals("exit"))    break;   mail.Content = msg;  } }  public void RunReader() {  Console.WriteLine("Reader started");  while(true)  {   Message msg = (Message)mail.Content;   Console.WriteLine(msg.Text);  } }  [STAThread] static void Main(string[] args) {  Test test = new Test();  if(args.Length > 0)   test.RunWriter();  else   test.RunReader(); }}

信箱一旦創建之后(這上面代碼里是 ProcessMailBox ),接收消息只需要讀取 Content 屬性,發送消息只需要給這個屬性賦值。當沒有數據時,獲取消息將會阻塞當前線程;發送消息時如果信箱里已經有數據,則會阻塞當前線程。正是有了這個阻塞,整個程序是完全基于中斷的,并且不會過度占用CPU(不需要進行輪詢)。發送和接收的消息可以是任意支持序列化(Serializable)的類型。

然而,實際上暗地里發生的事情有點復雜:消息通過內存映射文件來傳遞,這是目前唯一的跨進程共享內存的方法,這個例子里我們只會在 pagefile 里面產生虛擬文件。對這個虛擬文件的訪問是通過 win32 信號量來確保同步的。消息首先序列化成二進制,然后再寫進該文件,這就是為什么需要聲明Serializable屬性。內存映射文件和 win32 信號量都需要調用 NT內核的方法。多得了 .NET 框架中的 Marshal 類,我們可以避免編寫不安全的代碼。我們將在下面討論更多的細節。

2. .NET里面的跨線程/進程同步

線程/進程間的通訊需要共享內存或者其他內建機制來發送/接收數據。即使是采用共享內存的方式,也還需要一組同步方法來允許并發訪問。

同一個進程內的所有線程都共享公共的邏輯地址空間(堆)。對于不同進程,從 win2000 開始就已經無法共享內存。然而,不同的進程可以讀寫同一個文件。WinAPI提供了多種系統調用方法來映射文件到進程的邏輯空間,及訪問系統內核對象(會話)指向的 pagefile 里面的虛擬文件。無論是共享堆,還是共享文件,并發訪問都有可能導致數據不一致。我們就這個問題簡單討論一下,該怎樣確保線程/進程調用的有序性及數據的一致性。

2.1 線程同步

.NET 框架和 C# 提供了方便直觀的線程同步方法,即 monitor 類和 lock 語句(本文將不會討論 .NET 框架的互斥量)。對于線程同步,雖然本文提供了其他方法,我們還是推薦使用 lock 語句。
 

void Work1(){ NonCriticalSection1(); Monitor.Enter(this); try {  CriticalSection(); } finally {  Monitor.Exit(this); } NonCriticalSection2();} void Work2(){ NonCriticalSection1(); lock(this) {  CriticalSection(); } NonCriticalSection2();}

Work1 和 Work2 是等價的。在C#里面,很多人喜歡第二個方法,因為它更短,且不容易出錯。

2.2 跨線程信號量

信號量是經典的同步基本概念之一(由 Edsger Dijkstra 引入)。信號量是指一個有計數器及兩個操作的對象。它的兩個操作是:獲取(也叫P或者等待),釋放(也叫V或者收到信號)。信號量在獲取操作時如果計數器為0則阻塞,否則將計數器減一;在釋放時將計數器加一,且不會阻塞。雖然信號量的原理很簡單,但是實現起來有點麻煩。好在,內建的 monitor 類有阻塞特性,可以用來實現信號量。

 

public sealed class ThreadSemaphore : ISemaphore{ private int counter; private readonly int max;  public ThreadSemaphore() : this(0, int.Max) {} public ThreadSemaphore(int initial) : this(initial, int.Max) {} public ThreadSemaphore(int initial, int max) {  this.counter = Math.Min(initial,max);  this.max = max; }  public void Acquire() {  lock(this)  {   counter--;   if(counter < 0 && !Monitor.Wait(this))    throw new SemaphoreFailedException();  } }  public void Acquire(TimeSpan timeout) {  lock(this)  {   counter--;   if(counter < 0 && !Monitor.Wait(this,timeout))    throw new SemaphoreFailedException();  } }  public void Release() {  lock(this)  {   if(counter >= max)    throw new SemaphoreFailedException();   if(counter < 0)    Monitor.Pulse(this);   counter++;  } }}

信號量在復雜的阻塞情景下更加有用,例如我們后面將要討論的通道(channel)。你也可以使用信號量來實現臨界區的排他性(如下面的 Work3),但是我還是推薦使用內建的 lock 語句,像上面的 Work2 那樣。

請注意:如果使用不當,信號量也是有潛在危險的。正確的做法是:當獲取信號量失敗時,千萬不要再調用釋放操作;當獲取成功時,無論發生了什么錯誤,都要記得釋放信號量。遵循這樣的原則,你的同步才是正確的。Work3 中的 finally 語句就是為了保證正確釋放信號量。注意:獲取信號量( s.Acquire() )的操作必須放到 try 語句的外面,只有這樣,當獲取失敗時才不會調用釋放操作。
 

ThreadSemaphore s = new ThreadSemaphore(1);void Work3(){ NonCriticalSection1(); s.Acquire(); try {  CriticalSection(); } finally {  s.Release(); } NonCriticalSection2();}

2.3 跨進程信號量

為了協調不同進程訪問同一資源,我們需要用到上面討論過的概念。很不幸,.NET 中的 monitor 類不可以跨進程使用。但是,win32 API提供的內核信號量對象可以用來實現跨進程同步。 Robin Galloway-Lunn 介紹了怎樣將 win32 的信號量映射到 .NET 中(見 Using Win32 Semaphores in C# )。我們的實現也類似:

 

[DllImport("kernel32",EntryPoint="CreateSemaphore",   SetLastError=true,CharSet=CharSet.Unicode)]internal static extern uint CreateSemaphore( SecurityAttributes auth, int initialCount,  int maximumCount, string name); [DllImport("kernel32",EntryPoint="WaitForSingleObject", SetLastError=true,CharSet=CharSet.Unicode)]internal static extern uint WaitForSingleObject( uint hHandle, uint dwMilliseconds); [DllImport("kernel32",EntryPoint="ReleaseSemaphore", SetLastError=true,CharSet=CharSet.Unicode)][return : MarshalAs( UnmanagedType.VariantBool )]internal static extern bool ReleaseSemaphore( uint hHandle, int lReleaseCount, out int lpPreviousCount);   [DllImport("kernel32",EntryPoint="CloseHandle",SetLastError=true, CharSet=CharSet.Unicode)][return : MarshalAs( UnmanagedType.VariantBool )]internal static extern bool CloseHandle(uint hHandle); public class ProcessSemaphore : ISemaphore, IDisposable{ private uint handle; private readonly uint interruptReactionTime;  public ProcessSemaphore(string name) : this(  name,0,int.MaxValue,500) {} public ProcessSemaphore(string name, int initial) : this(  name,initial,int.MaxValue,500) {} public ProcessSemaphore(string name, int initial,  int max, int interruptReactionTime) {     this.interruptReactionTime = (uint)interruptReactionTime;  this.handle = NTKernel.CreateSemaphore(null, initial, max, name);  if(handle == 0)   throw new SemaphoreFailedException(); }  public void Acquire() {  while(true)  { //looped 0.5s timeout to make NT-blocked threads interruptable.   uint res = NTKernel.WaitForSingleObject(handle,    interruptReactionTime);   try {System.Threading.Thread.Sleep(0);}   catch(System.Threading.ThreadInterruptedException e)   {    if(res == 0)    { //Rollback     int previousCount;     NTKernel.ReleaseSemaphore(handle,1,out previousCount);    }    throw e;   }   if(res == 0)    return;   if(res != 258)    throw new SemaphoreFailedException();  } }  public void Acquire(TimeSpan timeout) {  uint milliseconds = (uint)timeout.TotalMilliseconds;  if(NTKernel.WaitForSingleObject(handle, milliseconds) != 0)   throw new SemaphoreFailedException();  }  public void Release() {  int previousCount;  if(!NTKernel.ReleaseSemaphore(handle, 1, out previousCount))   throw new SemaphoreFailedException();  }  #region IDisposable Member public void Dispose() {  if(handle != 0)  {   if(NTKernel.CloseHandle(handle))    handle = 0;  } } #endregion}

有一點很重要:win32中的信號量是可以命名的。這允許其他進程通過名字來創建相應信號量的句柄。為了讓阻塞線程可以中斷,我們使用了一個(不好)的替代方法:使用超時和 Sleep(0)。我們需要中斷來安全關閉線程。更好的做法是:確定沒有線程阻塞之后才釋放信號量,這樣程序才可以完全釋放資源并正確退出。

你可能也注意到了:跨線程和跨進程的信號量都使用了相同的接口。所有相關的類都使用了這種模式,以實現上面背景介紹中提到的封閉性。需要注意:出于性能考慮,你不應該將跨進程的信號量用到跨線程的場景,也不應該將跨線程的實現用到單線程的場景。

3. 跨進程共享內存:內存映射文件

我們已經實現了跨線程和跨進程的共享資源訪問同步。但是傳遞/接收消息還需要共享資源。對于線程來說,只需要聲明一個類成員變量就可以了。但是對于跨進程來說,我們需要使用到 win32 API 提供的內存映射文件(Memory Mapped Files,簡稱MMF)。使用 MMF和使用 win32 信號量差不多。我們需要先調用 CreateFileMapping 方法來創建一個內存映射文件的句柄:
 

[DllImport("Kernel32.dll",EntryPoint="CreateFileMapping",   SetLastError=true,CharSet=CharSet.Unicode)]internal static extern IntPtr CreateFileMapping(uint hFile, SecurityAttributes lpAttributes, uint flProtect, uint dwMaximumSizeHigh, uint dwMaximumSizeLow, string lpName);   [DllImport("Kernel32.dll",EntryPoint="MapViewOfFile", SetLastError=true,CharSet=CharSet.Unicode)]internal static extern IntPtr MapViewOfFile(IntPtr hFileMappingObject, uint dwDesiredAccess, uint dwFileOffsetHigh, uint dwFileOffsetLow, uint dwNumberOfBytesToMap);   [DllImport("Kernel32.dll",EntryPoint="UnmapViewOfFile", SetLastError=true,CharSet=CharSet.Unicode)][return : MarshalAs( UnmanagedType.VariantBool )]internal static extern bool UnmapViewOfFile(IntPtr lpBaseAddress); public static MemoryMappedFile CreateFile(string name,   FileAccess access, int size){ if(size < 0)  throw new ArgumentException("Size must not be negative","size");  IntPtr fileMapping = NTKernel.CreateFileMapping(0xFFFFFFFFu,null,  (uint)access,0,(uint)size,name); if(fileMapping == IntPtr.Zero)  throw new MemoryMappingFailedException();  return new MemoryMappedFile(fileMapping,size,access);}

我們希望直接使用 pagefile 中的虛擬文件,所以我們用 -1(0xFFFFFFFF) 來作為文件句柄來創建我們的內存映射文件句柄。我們也指定了必填的文件大小,以及相應的名稱。這樣其他進程就可以通過這個名稱來同時訪問該映射文件。創建了內存映射文件后,我們就可以映射這個文件不同的部分(通過偏移量和字節大小來指定)到我們的進程地址空間。我們通過 MapViewOfFile 系統方法來指定:
 

public MemoryMappedFileView CreateView(int offset, int size,   MemoryMappedFileView.ViewAccess access){ if(this.access == FileAccess.ReadOnly && access ==  MemoryMappedFileView.ViewAccess.ReadWrite)  throw new ArgumentException(   "Only read access to views allowed on files without write access",   "access"); if(offset < 0)  throw new ArgumentException("Offset must not be negative","size"); if(size < 0)  throw new ArgumentException("Size must not be negative","size"); IntPtr mappedView = NTKernel.MapViewOfFile(fileMapping,  (uint)access,0,(uint)offset,(uint)size); return new MemoryMappedFileView(mappedView,size,access);}

在不安全的代碼中,我們可以將返回的指針強制轉換成我們指定的類型。盡管如此,我們不希望有不安全的代碼存在,所以我們使用 Marshal 類來從中讀寫我們的數據。偏移量參數是用來從哪里開始讀寫數據,相對于指定的映射視圖的地址。
 

public byte ReadByte(int offset){ return Marshal.ReadByte(mappedView,offset);}public void WriteByte(byte data, int offset){ Marshal.WriteByte(mappedView,offset,data);} public int ReadInt32(int offset){ return Marshal.ReadInt32(mappedView,offset);}public void WriteInt32(int data, int offset){ Marshal.WriteInt32(mappedView,offset,data);} public void ReadBytes(byte[] data, int offset){ for(int i=0;i<data.Length;i++)  data[i] = Marshal.ReadByte(mappedView,offset+i);}public void WriteBytes(byte[] data, int offset){ for(int i=0;i<data.Length;i++)  Marshal.WriteByte(mappedView,offset+i,data[i]);}

但是,我們希望讀寫整個對象樹到文件中,所以我們需要支持自動進行序列化和反序列化的方法。
 

public object ReadDeserialize(int offset, int length){ byte[] binaryData = new byte[length]; ReadBytes(binaryData,offset); System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter  = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); System.IO.MemoryStream ms = new System.IO.MemoryStream(  binaryData,0,length,true,true); object data = formatter.Deserialize(ms); ms.Close(); return data;}public void WriteSerialize(object data, int offset, int length){ System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter  = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); byte[] binaryData = new byte[length]; System.IO.MemoryStream ms = new System.IO.MemoryStream(  binaryData,0,length,true,true); formatter.Serialize(ms,data); ms.Flush(); ms.Close(); WriteBytes(binaryData,offset);}

請注意:對象序列化之后的大小不應該超過映射視圖的大小。序列化之后的大小總是比對象本身占用的內存要大的。我沒有試過直接將對象內存流綁定到映射視圖,那樣做應該也可以,甚至可能帶來少量的性能提升。

4. 信箱:在線程/進程間傳遞消息

這里的信箱與 Email 及 NT 中的郵件槽(Mailslots)無關。它是一個只能保留一個對象的安全共享內存結構。信箱的內容通過一個屬性來讀寫。如果信箱內容為空,試圖讀取該信箱的線程將會阻塞,直到另一個線程往其中寫內容。如果信箱已經有了內容,當一個線程試圖往其中寫內容時將被阻塞,直到另一個線程將信箱內容讀取出去。信箱的內容只能被讀取一次,它的引用在讀取后自動被刪除。基于上面的代碼,我們已經可以實現信箱了。
4.1 跨線程的信箱

我們可以使用兩個信號量來實現一個信箱:一個信號量在信箱內容為空時觸發,另一個在信箱有內容時觸發。在讀取內容之前,線程先等待信箱已經填充了內容,讀取之后觸發空信號量。在寫入內容之前,線程先等待信箱內容清空,寫入之后觸發滿信號量。注意:空信號量在一開始時就被觸發了。
 

public sealed class ThreadMailBox : IMailBox{ private object content; private ThreadSemaphore empty, full;  public ThreadMailBox() {  empty = new ThreadSemaphore(1,1);  full = new ThreadSemaphore(0,1); }  public object Content {  get  {   full.Acquire();   object item = content;   empty.Release();   return item;  }  set  {   empty.Acquire();   content = value;   full.Release();  } }}

4.2  跨進程信箱

跨進程信箱與跨線程信箱的實現基本上一樣簡單。不同的是我們使用兩個跨進程的信號量,并且我們使用內存映射文件來代替類成員變量。由于序列化可能會失敗,我們使用了一小段異常處理來回滾信箱的狀態。失敗的原因有很多(無效句柄,拒絕訪問,文件大小問題,Serializable屬性缺失等等)。

 

public sealed class ProcessMailBox : IMailBox, IDisposable{ private MemoryMappedFile file; private MemoryMappedFileView view; private ProcessSemaphore empty, full;  public ProcessMailBox(string name,int size) {  empty = new ProcessSemaphore(name+".EmptySemaphore.MailBox",1,1);  full = new ProcessSemaphore(name+".FullSemaphore.MailBox",0,1);  file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.MailBox",   MemoryMappedFile.FileAccess.ReadWrite,size);  view = file.CreateView(0,size,   MemoryMappedFileView.ViewAccess.ReadWrite); }  public object Content {  get  {   full.Acquire();   object item;   try {item = view.ReadDeserialize();}   catch(Exception e)   { //Rollback    full.Release();    throw e;   }   empty.Release();   return item;  }   set  {   empty.Acquire();   try {view.WriteSerialize(value);}   catch(Exception e)   { //Rollback    empty.Release();    throw e;   }   full.Release();  } }  #region IDisposable Member public void Dispose() {  view.Dispose();  file.Dispose();  empty.Dispose();  full.Dispose(); } #endregion}

到這里我們已經實現了跨進程消息傳遞(IPC)所需要的組件。你可能需要再回頭本文開頭的那個例子,看看 ProcessMailBox 應該如何使用。

5.通道:基于隊列的消息傳遞

信箱最大的限制是它們每次只能保存一個對象。如果一系列線程(使用同一個信箱)中的一個線程需要比較長的時間來處理特定的命令,那么整個系列都會阻塞。通常我們會使用緩沖的消息通道來處理,這樣你可以在方便的時候從中讀取消息,而不會阻塞消息發送者。這種緩沖通過通道來實現,這里的通道比信箱要復雜一些。同樣,我們將分別從線程和進程級別來討論通道的實現。
5.1 可靠性

信箱和通道的另一個重要的不同是:通道擁有可靠性。例如:自動將發送失敗(可能由于線程等待鎖的過程中被中斷)的消息轉存到一個內置的容器中。這意味著處理通道的線程可以安全地停止,同時不會丟失隊列中的消息。這通過兩個抽象類來實現, ThreadReliability 和 ProcessReliability。每個通道的實現類都繼承其中的一個類。
5.2 跨線程的通道

跨線程的通道基于信箱來實現,但是使用一個同步的隊列來作為消息緩沖而不是一個變量。得益于信號量,通道在空隊列時阻塞接收線程,在隊列滿時阻塞發送線程。這樣你就不會碰到由入隊/出隊引發的錯誤。為了實現這個效果,我們用隊列大小來初始化空信號量,用0來初始化滿信號量。如果某個發送線程在等待入隊的時候被中斷,我們將消息復制到內置容器中,并將異常往外面拋。在接收操作中,我們不需要做異常處理,因為即使線程被中斷你也不會丟失任何消息。注意:線程只有在阻塞狀態才能被中斷,就像調用信號量的獲取操作(Aquire)方法時。
 

public sealed class ThreadChannel : ThreadReliability, IChannel{ private Queue queue; private ThreadSemaphore empty, full;  public ThreadChannel(int size) {  queue = Queue.Synchronized(new Queue(size));  empty = new ThreadSemaphore(size,size);  full = new ThreadSemaphore(0,size); }  public void Send(object item) {  try {empty.Acquire();}  catch(System.Threading.ThreadInterruptedException e)  {   DumpItem(item);   throw e;  }  queue.Enqueue(item);  full.Release(); }  public void Send(object item, TimeSpan timeout) {  try {empty.Acquire(timeout);}  ... }  public object Receive() {  full.Acquire();  object item = queue.Dequeue();  empty.Release();  return item; }  public object Receive(TimeSpan timeout) {  full.Acquire(timeout);  ... }   protected override void DumpStructure() {  lock(queue.SyncRoot)  {   foreach(object item in queue)    DumpItem(item);   queue.Clear();  } }}

5.3 跨進程通道

實現跨進程通道有點麻煩,因為你需要首先提供一個跨進程的緩沖區。一個可能的解決方法是使用跨進程信箱并根據需要將接收/發送方法加入隊列。為了避免這種方案的幾個缺點,我們將直接使用內存映射文件來實現一個隊列。MemoryMappedArray 類將內存映射文件分成幾部分,可以直接使用數組索引來訪問。 MemoryMappedQueue 類,為這個數組提供了一個經典的環(更多細節請查看附件中的代碼)。為了支持直接以 byte/integer 類型訪問數據并同時支持二進制序列化,調用方需要先調用入隊(Enqueue)/出隊(Dequeue)操作,然后根據需要使用讀寫方法(隊列會自動將數據放到正確的位置)。這兩個類都不是線程和進程安全的,所以我們需要使用跨進程的信號量來模擬互斥量(也可以使用 win32 互斥量),以此實現相互間的互斥訪問。除了這兩個類,跨進程的通道基本上和跨線程信箱一樣。同樣,我們也需要在 Send() 中處理線程中斷及序列化可能失敗的問題。
 

public sealed class ProcessChannel : ProcessReliability, IChannel, IDisposable{ private MemoryMappedFile file; private MemoryMappedFileView view; private MemoryMappedQueue queue; private ProcessSemaphore empty, full, mutex;  public ProcessChannel( int size, string name, int maxBytesPerEntry) {  int fileSize = 64+size*maxBytesPerEntry;   empty = new ProcessSemaphore(name+".EmptySemaphore.Channel",size,size);  full = new ProcessSemaphore(name+".FullSemaphore.Channel",0,size);  mutex = new ProcessSemaphore(name+".MutexSemaphore.Channel",1,1);  file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.Channel",   MemoryMappedFile.FileAccess.ReadWrite,fileSize);  view = file.CreateView(0,fileSize,   MemoryMappedFileView.ViewAccess.ReadWrite);  queue = new MemoryMappedQueue(view,size,maxBytesPerEntry,true,0);  if(queue.Length < size || queue.BytesPerEntry < maxBytesPerEntry)   throw new MemoryMappedArrayFailedException(); }  public void Send(object item) {  try {empty.Acquire();}  catch(System.Threading.ThreadInterruptedException e)  {   DumpItemSynchronized(item);   throw e;  }  try {mutex.Acquire();}  catch(System.Threading.ThreadInterruptedException e)  {   DumpItemSynchronized(item);   empty.Release();   throw e;  }  queue.Enqueue();  try {queue.WriteSerialize(item,0);}  catch(Exception e)  {   queue.RollbackEnqueue();   mutex.Release();   empty.Release();   throw e;  }  mutex.Release();  full.Release(); }  public void Send(object item, TimeSpan timeout) {  try {empty.Acquire(timeout);}  ... }  public object Receive() {  full.Acquire();  mutex.Acquire();  object item;  queue.Dequeue();  try {item = queue.ReadDeserialize(0);}  catch(Exception e)  {   queue.RollbackDequeue();   mutex.Release();   full.Release();   throw e;  }  mutex.Release();  empty.Release();  return item; }  public object Receive(TimeSpan timeout) {  full.Acquire(timeout);  ... }   protected override void DumpStructure() {  mutex.Acquire();  byte[][] dmp = queue.DumpClearAll();  for(int i=0;i<dmp.Length;i++)   DumpItemSynchronized(dmp[i]);  mutex.Release(); }  #region IDisposable Member public void Dispose() {  view.Dispose();  file.Dispose();  empty.Dispose();  full.Dispose();  mutex.Dispose(); } #endregion}

6. 消息路由

我們目前已經實現了線程和進程同步及消息傳遞機制(使用信箱和通道)。當你使用阻塞隊列的時候,有可能會遇到這樣的問題:你需要在一個線程中同時監聽多個隊列。為了解決這樣的問題,我們提供了一些小型的類:通道轉發器,多用復用器,多路復用解碼器和通道事件網關。你也可以通過簡單的 IRunnable 模式來實現類似的通道處理器。IRunnable模式由兩個抽象類SingleRunnable和 MultiRunnable 來提供(具體細節請參考附件中的代碼)。
6.1 通道轉發器

通道轉發器僅僅監聽一個通道,然后將收到的消息轉發到另一個通道。如果有必要,轉發器可以將每個收到的消息放到一個信封中,并加上一個數字標記,然后再轉發出去(下面的多路利用器使用了這個特性)。
 

public class ChannelForwarder : SingleRunnable{ private IChannel source, target; private readonly int envelope;  public ChannelForwarder(IChannel source,  IChannel target, bool autoStart, bool waitOnStop)  : base(true,autoStart,waitOnStop) {  this.source = source;  this.target = target;  this.envelope = -1; } public ChannelForwarder(IChannel source, IChannel target,  int envelope, bool autoStart, bool waitOnStop)  : base(true,autoStart,waitOnStop) {  this.source = source;  this.target = target;  this.envelope = envelope; }  protected override void Run() { //NOTE: IChannel.Send is interrupt save and   //automatically dumps the argument.  if(envelope == -1)   while(running)    target.Send(source.Receive());  else  {   MessageEnvelope env;   env.ID = envelope;   while(running)   {    env.Message = source.Receive();    target.Send(env);   }  } }}

6.2 通道多路復用器和通道復用解碼器

通道多路復用器監聽多個來源的通道并將接收到的消息(消息使用信封來標記來源消息)轉發到一個公共的輸出通道。這樣就可以一次性地監聽多個通道。復用解碼器則是監聽一個公共的輸出通道,然后根據信封將消息轉發到某個指定的輸出通道。
 

public class ChannelMultiplexer : MultiRunnable{ private ChannelForwarder[] forwarders;  public ChannelMultiplexer(IChannel[] channels, int[] ids,  IChannel output, bool autoStart, bool waitOnStop) {  int count = channels.Length;  if(count != ids.Length)   throw new ArgumentException("Channel and ID count mismatch.","ids");   forwarders = new ChannelForwarder[count];  for(int i=0;i<count;i++)   forwarders[i] = new ChannelForwarder(channels[i],    output,ids[i],autoStart,waitOnStop);   SetRunnables((SingleRunnable[])forwarders); }} public class ChannelDemultiplexer : SingleRunnable{ private HybridDictionary dictionary; private IChannel input;  public ChannelDemultiplexer(IChannel[] channels, int[] ids,  IChannel input, bool autoStart, bool waitOnStop)  : base(true,autoStart,waitOnStop) {  this.input = input;   int count = channels.Length;  if(count != ids.Length)   throw new ArgumentException("Channel and ID count mismatch.","ids");   dictionary = new HybridDictionary(count,true);  for(int i=0;i<count;i++)   dictionary.add(ids[i],channels[i]); }  protected override void Run() { //NOTE: IChannel.Send is interrupt save and   //automatically dumps the argument.  while(running)  {   MessageEnvelope env = (MessageEnvelope)input.Receive();   IChannel channel = (IChannel)dictionary[env.ID];   channel.send(env.Message);  } }}

6.3 通道事件網關

通道事件網關監聽指定的通道,在接收到消息時觸發一個事件。這個類對于基于事件的程序(例如GUI程序)很有用,或者在使用系統線程池(ThreadPool)來初始化輕量的線程。需要注意的是:使用 WinForms 的程序中你不能在事件處理方法中直接訪問UI控件,只能調用Invoke 方法。因為事件處理方法是由事件網關線程調用的,而不是UI線程。
 

public class ChannelEventGateway : SingleRunnable{ private IChannel source; public event MessageReceivedEventHandler MessageReceived;  public ChannelEventGateway(IChannel source, bool autoStart,  bool waitOnStop) : base(true,autoStart,waitOnStop) {  this.source = source; }   protected override void Run() {  while(running)  {   object c = source.Receive();   MessageReceivedEventHandler handler = MessageReceived;   if(handler != null)    handler(this,new MessageReceivedEventArgs(c));  } }}

7. 比薩外賣店的例子

萬事俱備,只欠東風。我們已經討論了這個同步及消息傳遞框架中的大部分重要的結構和技術(本文沒有討論框架中的其他類如Rendezvous及Barrier)。就像開頭一樣,我們用一個例子來結束這篇文章。這次我們用一個小型比薩外賣店來做演示。下圖展示了這個例子:四個并行進程相互之間進行通訊。圖中展示了消息(數據)是如何使用跨進程通道在四個進程中流動的,且在每個進程中使用了性能更佳的跨線程通道和信箱。

201571493254175.gif (595×367)

 一開始,一個顧客點了一個比薩和一些飲料。他調用了顧客(customer)接口的方法,向顧客訂單(CustomerOrders)通道發送了一個下單(Order)消息。接單員,在顧客下單后,發送了兩條配餐指令(分別對應比薩和飲料)到廚師指令(CookInstruction)通道。同時他通過收銀(CashierOrder)通道將訂單轉發給收銀臺。收銀臺從價格中心獲取總價并將票據發給顧客,希望能提高收銀的速度 。與此同時,廚師將根據配餐指令將餐配好之后交給打包員工。打包員工處理好之后,等待顧客付款,然后將外賣遞給顧客。


為了運行這個例子,打開4個終端(cmd.exe),用 "PizzaDemo.exe cook" 啟動多個廚師進程(多少個都可以),用 "PizzaDemo.exe backend" 啟動后端進程,用 "PizzaDemo.exe facade" 啟動顧客接口門面(用你的程序名稱來代替 PizzaDemo )。注意:為了模擬真實情景,某些線程(例如廚師線程)會隨機休眠幾秒。按下回車鍵就會停止和退出進程。如果你在進程正在處理數據的時候退出,你將可以在內存轉存報告的結尾看到幾個未處理的消息。在真實世界的程序里面,消息一般都會被轉存到磁盤中,以便下次可以使用。

這個例子使用了上文中討論過的幾個機制。比如說,收銀臺使用一個通道復用器(ChannelMultiplexer)來監聽顧客的訂單和支付通道,用了兩個信箱來實現價格服務。分發時使用了一個通道事件網關(ChannelEventGateway),顧客在食物打包完成之后馬上會收到通知。你也可以將這些程序注冊成 Windows NT 服務運行,也可以遠程登錄后運行。

8. 總結

本文已經討論了C#中如何基于服務的架構及實現跨進程同步和通訊。然后,這個不是唯一的解決方案。例如:在大項目中使用那么多的線程會引來嚴重的問題。這個框架中缺失的是事務支持及其他的通道/信箱實現(例如命名管道和TCP sockets)。這個框架中可能也有許多不足之處。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
亚洲一区精品电影| 国产成人精品综合久久久| 欧美另类高清videos| 欧美在线欧美在线| 国内精品久久久久久影视8| 日韩国产欧美精品一区二区三区| 国产精品一区二区久久久| 国产视频亚洲视频| 久久久久久久久久国产| 欧美成人精品xxx| 日本成人在线视频网址| 久久久久北条麻妃免费看| 国产99久久精品一区二区 夜夜躁日日躁| 日韩大胆人体377p| 国产精品18久久久久久首页狼| 国产精品久久久久久久久免费| 91精品久久久久久久久久| 2019日本中文字幕| 九九九久久久久久| 91免费人成网站在线观看18| 欧美大片大片在线播放| 亚洲最大成人免费视频| 91老司机在线| 亚洲欧洲在线视频| 欧美电影免费观看大全| 久久影视电视剧免费网站清宫辞电视| 亚洲欧美精品一区二区| 精品女同一区二区三区在线播放| 亚洲欧美日韩一区二区三区在线| 国产成人精品国内自产拍免费看| 日韩免费在线观看视频| 日韩有码视频在线| 色樱桃影院亚洲精品影院| 日韩av在线天堂网| 国产欧美日韩中文字幕在线| 国产成人精彩在线视频九色| 国产极品jizzhd欧美| 亚洲欧美一区二区三区在线| 久久久这里只有精品视频| 亚洲精品之草原avav久久| 国产91在线播放精品91| 91最新在线免费观看| 富二代精品短视频| 国产精品国模在线| 精品av在线播放| 91精品国产九九九久久久亚洲| 久久国产精品偷| 91九色精品视频| 日韩精品有码在线观看| 亚洲精品福利免费在线观看| 日韩av免费网站| 国产成人精彩在线视频九色| 亚洲精品91美女久久久久久久| 国产成人精品久久二区二区| 国产区精品视频| 91精品国产自产91精品| 国产精品久久久久久久久久久久| 欧美一级高清免费| 精品视频久久久久久| 国产精品吊钟奶在线| 久久久国产视频| 亚洲女同精品视频| 国产日本欧美一区二区三区在线| 日本国产高清不卡| 日韩精品极品毛片系列视频| 777午夜精品福利在线观看| 国产精品国产三级国产专播精品人| 欧美大奶子在线| 亚洲欧美日韩爽爽影院| 久久99国产精品久久久久久久久| 超碰97人人做人人爱少妇| 国产亚洲欧美一区| 最新中文字幕亚洲| 在线观看国产精品日韩av| 欧美裸体xxxxx| 欧美中文字幕精品| 不卡毛片在线看| 亚洲国产福利在线| 欧美日韩国产激情| 欧美xxxx做受欧美.88| 日韩av在线免费播放| 55夜色66夜色国产精品视频| 精品毛片网大全| 亚洲一区二区少妇| 久久久亚洲国产天美传媒修理工| 久色乳综合思思在线视频| 亚洲一区制服诱惑| 亚洲欧美日韩天堂一区二区| 久久久久久这里只有精品| 国产精品稀缺呦系列在线| 一区二区三区日韩在线| 欧美另类高清videos| 中文字幕亚洲欧美一区二区三区| 欧美日韩不卡合集视频| 欧美黑人一级爽快片淫片高清| 亚洲在线观看视频| 亚洲天堂视频在线观看| 国产精品一区二区三区在线播放| 国产一区二区丝袜| 久久青草福利网站| 久久这里有精品视频| 狠狠久久五月精品中文字幕| 亚洲最大激情中文字幕| 国产偷亚洲偷欧美偷精品| 欧美老女人性视频| 国产精品青青在线观看爽香蕉| 清纯唯美亚洲综合| 日韩精品在线视频观看| 欧美丝袜一区二区三区| 亚洲人成啪啪网站| 亚洲精品久久久久久久久久久久久| 欧美性猛交xxxx乱大交极品| 国产91色在线|免| 久久国产精品久久久| 国产激情999| 国产精品96久久久久久又黄又硬| 中文字幕亚洲第一| 欧美一区二区三区精品电影| 91高清视频在线免费观看| 国产精品久久91| 久久的精品视频| 精品久久久久人成| 久久久久久999| 亚洲国产精品高清久久久| 久久人人爽人人爽人人片av高清| 51精品国产黑色丝袜高跟鞋| 国产日韩欧美综合| 欧洲美女7788成人免费视频| 久久久亚洲国产天美传媒修理工| 91精品91久久久久久| 日本不卡视频在线播放| 日韩成人高清在线| 精品女同一区二区三区在线播放| 欧美性xxxxx极品| 国产欧美最新羞羞视频在线观看| 亚洲成成品网站| 欧美亚州一区二区三区| 久久精品99久久久香蕉| 亚洲国产精品yw在线观看| 久久久免费观看| 91亚洲精品在线| 欧美在线不卡区| 国产精品久久国产精品99gif| 97欧美精品一区二区三区| 亚洲黄色在线观看| 日韩网站免费观看高清| 国产精品美女午夜av| 成人有码在线视频| 欧美日韩免费观看中文| 日本久久久久久| 日韩在线视频二区| 668精品在线视频| 亚洲理论电影网| 在线中文字幕日韩| 国产精品aaa| 亚洲国产天堂网精品网站| 久久久久免费视频| 欧美—级高清免费播放| 日韩专区中文字幕| 亚洲精品91美女久久久久久久| 欧美电影免费观看大全| 国产精品日日做人人爱| 色狠狠av一区二区三区香蕉蜜桃| 成人av色在线观看|