亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 編程 > PHP > 正文

在PHP中使用協程(yield )實現多任務調度

2019-11-08 01:48:31
字體:
來源:轉載
供稿:網友

php5.5一個比較好的新功能是加入了對迭代生成器和協程的支持。對于生成器,PHP的文檔和各種其他的博客文章已經有了非常詳細的講解。協程相對受到的關注就少了,因為協程雖然有很強大的功能但相對比較復雜, 也比較難被理解,解釋起來也比較困難。 這篇文章將嘗試通過介紹如何使用協程來實施任務調度, 來解釋在PHP中的協程。

迭代生成器 生成器也是一個函數,不同的是這個函數的返回值是依次輸出,而不是只返回一個單獨的值?;蛘?,換句話說,生成器使你更方便的實現了迭代器接口。下面通過實現一個xrange函數來簡單說明:

<?phpfunction xrange($start, $end, $step = 1) { for ($i = $start; $i <= $end; $i += $step) { yield $i; }}foreach (xrange(1, 1000000) as $num) { echo $num, "/n";}

上面這個xrange()函數提供了和PHP的內建函數range()一樣的功能。但是不同的是range()函數返回的是一個包含屬組值從1到100萬的數組(注:請查看手冊)。而xrange()函數返回的是依次輸出這些值的一個迭代器,而且并不會真正以數組形式返回. 這種方法的優點是顯而易見的。它可以讓你在處理大數據集合的時候不用一次性的加載到內存中。甚至你可以處理無限大的數據流。當然,也可以不同通過生成器來實現這個功能,而是可以通過繼承Iterator接口實現。但通過使用生成器實現起來會更方便,不用再去實現iterator接口中的5個方法了。生成器為可中斷的函數 要從生成器認識協程,理解它們內部是如何工作是非常重要的: 生成器是一種可中斷的函數,在它里面,yield構成了中斷點。 緊接著上面的例子,如果你調用xrange(1,1000000)的話,xrange()函數里代碼其實并沒有真正地運行。相反,PHP只是返回了一個實現了迭代器接口的生成器類實例:

<?php $range = xrange(1, 1000000); var_dump($range); // object(Generator)#1 var_dump($range instanceof Iterator); // bool(true)

你對對象調用迭代器方法一次,其中的代碼運行一次。例如,如果你調用$range->rewind(), 那么xrange()里的代碼就會運行到控制流第一次出現yield的地方。而函數內傳遞給yield語句的返回值可以通過$range->current()獲取。 為了繼續執行生成器中的代碼,你必須調用$range->next()方法。這將再次啟動生成器,直到下一次yield語句出現。因此,連續調用next()和current()方法 你將能從生成器里獲得所有的值,直到再沒有yield語句出現。對xrange()來說,這種情形出現在$i超過$end時。在這中情況下, 控制流將到達函數的終點,因此將不執行任何代碼。一旦這種情況發生,vaild()方法將返回假,這時迭代結束。 協程 協程給上面功能添加的主要功能就是回送數據給生成器的能力(調用者發送數據給被調用的生成器函數)。這就把生成器到調用者的單向通信轉變為兩者之間的雙向通信。 你可以調用生成器的send()方法傳遞數據給協程。下面的logger()協程是這種通信如何運行的例子:

<?phpfunction logger($fileName) { $fileHandle = fopen($fileName, 'a'); while (true) { fwrite($fileHandle, yield . "/n"); }}$logger = logger(__DIR__ . '/log');$logger->send('Foo');$logger->send('Bar')

正如你能看到,這兒yield沒有作為一個語句來使用,而是用作一個表達式, 即它能被演變成一個值。這個值就是調用者傳遞給send()方法的值。 在這個例子里,yield表達式將首先被”Foo”替代寫入Log, 然后被”Bar”替代寫入Log。 上面的例子里yield僅作為接收者。但它其實既可接收也可發送。接收和發送通信如何進行的例子如下:

<?phpfunction gen() { $ret = (yield 'yield1'); var_dump($ret); $ret = (yield 'yield2'); var_dump($ret);}$gen = gen();var_dump($gen->current()); // string(6) "yield1"var_dump($gen->send('ret1')); // string(4) "ret1" (the first var_dump in gen) // string(6) "yield2" (the var_dump of the ->send() return value)var_dump($gen->send('ret2')); // string(4) "ret2" (again from within gen) // NULL (the return value of ->send())

要馬上理解輸出的精確順序有點困難,因此確定你知道為什按照這種方式輸出。我要特別指出的有兩點: 第一點,yield表達式兩邊的括號在PHP7以前不是可選的, 也就是說在PHP5.5和PHP5.6中圓括號是必須的。 第二點,你可能已經注意到調用current()之前沒有調用rewind()。這是因為生成迭代對象的時候已經隱含地執行了rewind操作。 多任務協作 如果閱讀了上面的logger()例子,你也許會疑惑“為了雙向通信我為什么要使用協程呢?我完全可以使用過程的方法實現同樣的功能啊?”, 是的, 你是對的, 但上面的例子只是為了演示了基本用法,這個例子其實并沒有真正的展示出使用協程的優點。 正如上面介紹里提到的,協程是非常強大的概念,不過卻應用的很稀少而且常常十分復雜。要給出一些簡單而真實的例子很難。 在這篇文章里,我決定去做的是使用協程實現多任務協作。我們要解決的問題是你想并發地運行多任務(或者“程序”)。不過我們都知道CPU在一個時刻只能運行一個任務(不考慮多核的情況)。因此處理器需要在不同的任務之間進行切換,而且總是讓每個任務運行 “一小會兒”。 多任務協作這個術語中的“協作”說明了如何進行這種切換的:它要求當前正在運行的任務自動把控制傳回給調度器,這樣就可以運行其他任務了。這與“搶占”多任務相反,搶占多任務是這樣的:調度器可以中斷運行了一段時間的任務,不管它喜歡還是不喜歡。協作多任務在Windows的早期版本(windows95)和Mac OS中有使用,不過它們后來都切換到使用搶先多任務了。理由相當明確:如果你依靠程序自動交出控制的話,那么一些設計有問題的軟件將很容易為自身占用整個CPU,不與其他任務共享。 現在你應當明白協程和任務調度之間的聯系:yield指令提供了任務中斷自身的一種方法,然后把控制交回給任務調度器。因此協程可以運行多個其他任務。更進一步來說,yield可以用來在任務和調度器之間進行通信。 我們的目的是 對 “任務”用更輕量級的包裝的協程函數:

<?phpclass Task { PRotected $taskId; protected $coroutine; protected $sendValue = null; protected $beforeFirstYield = true; public function __construct($taskId, Generator $coroutine) { $this->taskId = $taskId; $this->coroutine = $coroutine; } public function getTaskId() { return $this->taskId; } public function setSendValue($sendValue) { $this->sendValue = $sendValue; } public function run() { if ($this->beforeFirstYield) { $this->beforeFirstYield = false; return $this->coroutine->current(); } else { $retval = $this->coroutine->send($this->sendValue); $this->sendValue = null; return $retval; } } public function isFinished() { return !$this->coroutine->valid(); }}

一個任務是用任務ID標記的一個協程。使用setSendValue()方法,你可以指定哪些值將被發送到下次的恢復(在之后你會了解到我們需要這個)。 run()函數確實沒有做什么,除了調用send()方法的協同程序。要理解為什么添加beforeFirstYieldflag,需要考慮下面的代碼片段:

<?phpfunction gen() { yield 'foo'; yield 'bar';}$gen = gen();var_dump($gen->send('something'));// As the send() happens before the first yield there is an implicit rewind() call,// so what really happens is this:$gen->rewind();var_dump($gen->send('something'));// The rewind() will advance to the first yield (and ignore its value), the send() will// advance to the second yield (and dump its value). Thus we loose the first yielded value!

通過添加 beforeFirstYieldcondition 我們可以確定 first yield 的值 被返回。 調度器現在不得不比多任務循環要做稍微多點了,然后才運行多任務:

<?phpclass Scheduler { protected $maxTaskId = 0; protected $taskMap = []; // taskId => task protected $taskQueue; public function __construct() { $this->taskQueue = new SplQueue(); } public function newTask(Generator $coroutine) { $tid = ++$this->maxTaskId; $task = new Task($tid, $coroutine); $this->taskMap[$tid] = $task; $this->schedule($task); return $tid; } public function schedule(Task $task) { $this->taskQueue->enqueue($task); } public function run() { while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $task->run(); if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } } }}?>

newTask()方法(使用下一個空閑的任務id)創建一個新任務,然后把這個任務放入任務map數組里。接著它通過把任務放入任務隊列里來實現對任務的調度。接著run()方法掃描任務隊列,運行任務。如果一個任務結束了,那么它將從隊列里刪除,否則它將在隊列的末尾再次被調度。 讓我們看看下面具有兩個簡單(并且沒有什么意義)任務的調度器:

<?phpfunction task1() { for ($i = 1; $i <= 10; ++$i) { echo "This is task 1 iteration $i./n"; yield; }}function task2() { for ($i = 1; $i <= 5; ++$i) { echo "This is task 2 iteration $i./n"; yield; }}$scheduler = new Scheduler;$scheduler->newTask(task1());$scheduler->newTask(task2());$scheduler->run();

兩個任務都僅僅回顯一條信息,然后使用yield把控制回傳給調度器。輸出結果如下: This is task 1 iteration 1. This is task 2 iteration 1. This is task 1 iteration 2. This is task 2 iteration 2. This is task 1 iteration 3. This is task 2 iteration 3. This is task 1 iteration 4. This is task 2 iteration 4. This is task 1 iteration 5. This is task 2 iteration 5. This is task 1 iteration 6. This is task 1 iteration 7. This is task 1 iteration 8. This is task 1 iteration 9. This is task 1 iteration 10. 輸出確實如我們所期望的:對前五個迭代來說,兩個任務是交替運行的,接著第二個任務結束后,只有第一個任務繼續運行。 與調度器之間通信 既然調度器已經運行了,那么我們來看下一項:任務和調度器之間的通信。 我們將使用進程用來和操作系統會話的同樣的方式來通信:系統調用。我們需要系統調用的理由是操作系統與進程相比它處在不同的權限級別上。因此為了執行特權級別的操作(如殺死另一個進程),就不得不以某種方式把控制傳回給內核,這樣內核就可以執行所說的操作了。再說一遍,這種行為在內部是通過使用中斷指令來實現的。過去使用的是通用的int指令,如今使用的是更特殊并且更快速的syscall/sysenter指令。 我們的任務調度系統將反映這種設計:不是簡單地把調度器傳遞給任務(這樣就允許它做它想做的任何事),我們將通過給yield表達式傳遞信息來與系統調用通信。這兒yield即是中斷,也是傳遞信息給調度器(和從調度器傳遞出信息)的方法。 為了說明系統調用,我將對可調用的系統調用做一個小小的封裝:

<?phpclass SystemCall { protected $callback; public function __construct(callable $callback) { $this->callback = $callback; } public function __invoke(Task $task, Scheduler $scheduler) { $callback = $this->callback; // Can't call it directly in PHP :/ return $callback($task, $scheduler); }}

它將像其他任何可調用那樣(使用_invoke)運行,不過它要求調度器把正在調用的任務和自身傳遞給這個函數。為了解決這個問題我們不得不微微的修改調度器的run方法:

<?phppublic function run() { while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $retval = $task->run(); if ($retval instanceof SystemCall) { $retval($task, $this); continue; } if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } }}

第一個系統調用除了返回任務ID外什么都沒有做:

<?phpfunction getTaskId() { return new SystemCall(function(Task $task, Scheduler $scheduler) { $task->setSendValue($task->getTaskId()); $scheduler->schedule($task); });}

這個函數設置任務id為下一次發送的值,并再次調度了這個任務。由于使用了系統調用,所以調度器不能自動調用任務,我們需要手工調度任務(稍后你將明白為什么這么做)。要使用這個新的系統調用的話,我們要重新編寫以前的例子:

<?phpfunction task($max) { $tid = (yield getTaskId()); // <-- here's the syscall! for ($i = 1; $i <= $max; ++$i) { echo "This is task $tid iteration $i./n"; yield; }}$scheduler = new Scheduler;$scheduler->newTask(task(10));$scheduler->newTask(task(5));$scheduler->run();

這段代碼將給出與前一個例子相同的輸出。注意系統調用同其他任何調用一樣正常地運行,不過預先增加了yield。要創建新的任務,然后再殺死它們的話,需要兩個以上的系統調用:

<?phpfunction newTask(Generator $coroutine) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($coroutine) { $task->setSendValue($scheduler->newTask($coroutine)); $scheduler->schedule($task); } );}function killTask($tid) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($tid) { $task->setSendValue($scheduler->killTask($tid)); $scheduler->schedule($task); } );}

killTask函數需要在調度器里增加一個方法:

<?phppublic function killTask($tid) { if (!isset($this->taskMap[$tid])) { return false; } unset($this->taskMap[$tid]); // This is a bit ugly and could be optimized so it does not have to walk the queue, // but assuming that killing tasks is rather rare I won't bother with it now foreach ($this->taskQueue as $i => $task) { if ($task->getTaskId() === $tid) { unset($this->taskQueue[$i]); break; } } return true;}

用來測試新功能的微腳本:

<?phpfunction childTask() { $tid = (yield getTaskId()); while (true) { echo "Child task $tid still alive!/n"; yield; }}function task() { $tid = (yield getTaskId()); $childTid = (yield newTask(childTask())); for ($i = 1; $i <= 6; ++$i) { echo "Parent task $tid iteration $i./n"; yield; if ($i == 3) yield killTask($childTid); }}$scheduler = new Scheduler;$scheduler->newTask(task());$scheduler->run();?>

這段代碼將打印以下信息: Parent task 1 iteration 1. Child task 2 still alive! Parent task 1 iteration 2. Child task 2 still alive! Parent task 1 iteration 3. Child task 2 still alive! Parent task 1 iteration 4. Parent task 1 iteration 5. Parent task 1 iteration 6. 經過三次迭代以后子任務將被殺死,因此這就是”Child is still alive”消息結束的時候??赡軕斨赋龅氖沁@不是真正的父子關系。 因為甚至在父任務結束后子任務仍然可以運行。或者子任務可以殺死父任務??梢孕薷恼{度器使它具有更層級化的任務結構,不過 在這篇文章里我沒有這么做。 你可以實現許多進程管理調用。例如 wait(它一直等待到任務結束運行時),exec(它替代當前任務)和fork(它創建一個 當前任務的克?。ork非??幔夷憧梢允褂肞HP的協程真正地實現它,因為它們都支持克隆。 然而讓我們把這些留給有興趣的讀者吧,我們來看下一個議題。 非阻塞IO 很明顯,我們的任務管理系統的真正很酷的應用是web服務器。它有一個任務是在套接字上偵聽是否有新連接,當有新連接要建立的時候 ,它創建一個新任務來處理新連接。 web服務器最難的部分通常是像讀數據這樣的套接字操作是阻塞的。例如PHP將等待到客戶端完成發送為止。對一個WEB服務器來說,這 根本不行;這就意味著服務器在一個時間點上只能處理一個連接。 解決方案是確保在真正對套接字讀寫之前該套接字已經“準備就緒”。為了查找哪個套接字已經準備好讀或者寫了,可以使用 流選擇函數。 首先,讓我們添加兩個新的 syscall,它們將等待直到指定 socket 準備好:

<?phpfunction waitForRead($socket) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($socket) { $scheduler->waitForRead($socket, $task); } );}function waitForWrite($socket) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($socket) { $scheduler->waitForWrite($socket, $task); } );}

這些 syscall 只是在調度器中代理其各自的方法:

<?php// resourceID => [socket, tasks]protected $waitingForRead = [];protected $waitingForWrite = [];public function waitForRead($socket, Task $task) { if (isset($this->waitingForRead[(int) $socket])) { $this->waitingForRead[(int) $socket][1][] = $task; } else { $this->waitingForRead[(int) $socket] = [$socket, [$task]]; }}public function waitForWrite($socket, Task $task) { if (isset($this->waitingForWrite[(int) $socket])) { $this->waitingForWrite[(int) $socket][1][] = $task; } else { $this->waitingForWrite[(int) $socket] = [$socket, [$task]]; }}

waitingForRead 及 waitingForWrite 屬性是兩個承載等待的socket 及等待它們的任務的數組。有趣的部分在于下面的方法,它將檢查 socket 是否可用,并重新安排各自任務:

<?phpprotected function ioPoll($timeout) { $rSocks = []; foreach ($this->waitingForRead as list($socket)) { $rSocks[] = $socket; } $wSocks = []; foreach ($this->waitingForWrite as list($socket)) { $wSocks[] = $socket; } $eSocks = []; // dummy if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) { return; } foreach ($rSocks as $socket) { list(, $tasks) = $this->waitingForRead[(int) $socket]; unset($this->waitingForRead[(int) $socket]); foreach ($tasks as $task) { $this->schedule($task); } } foreach ($wSocks as $socket) { list(, $tasks) = $this->waitingForWrite[(int) $socket]; unset($this->waitingForWrite[(int) $socket]); foreach ($tasks as $task) { $this->schedule($task); } }}

stream_select 函數接受承載讀取、寫入以及待檢查的socket的數組(我們無需考慮最后一類)。數組將按引用傳遞,函數只會保留那些狀態改變了的數組元素。我們可以遍歷這些數組,并重新安排與之相關的任務。 為了正常地執行上面的輪詢動作,我們將在調度器里增加一個特殊的任務:

<?phpprotected function ioPollTask() { while (true) { if ($this->taskQueue->isEmpty()) { $this->ioPoll(null); } else { $this->ioPoll(0); } yield; }}

需要在某個地方注冊這個任務,例如,你可以在run()方法的開始增加this?>newTask(this->ioPollTask())。然后就像其他 任務一樣每執行完整任務循環一次就執行輪詢操作一次(這么做一定不是最好的方法)。ioPollTask將使用0秒的超時來調用ioPoll, 這意味著stream_select將立即返回(而不是等待)。 只有任務隊列為空時,我們才使用null超時,這意味著它一直等到某個套接口準備就緒。如果我們沒有這么做,那么輪詢任務將一而再, 再而三的循環運行,直到有新的連接建立。這將導致100%的CPU利用率。相反,讓操作系統做這種等待會更有效。 現在編寫服務器就相對容易了:

<?phpfunction server($port) { echo "Starting server at port $port.../n"; $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr); if (!$socket) throw new Exception($errStr, $errNo); stream_set_blocking($socket, 0); while (true) { yield waitForRead($socket); $clientSocket = stream_socket_accept($socket, 0); yield newTask(handleClient($clientSocket)); }}function handleClient($socket) { yield waitForRead($socket); $data = fread($socket, 8192); $msg = "Received following request:/n/n$data"; $msgLength = strlen($msg); $response = <<<RESHTTP/1.1 200 OK/rContent-Type: text/plain/rContent-Length: $msgLength/rConnection: close/r/r$msgRES; yield waitForWrite($socket); fwrite($socket, $response); fclose($socket);}$scheduler = new Scheduler;$scheduler->newTask(server(8000));$scheduler->run()

;

這段代碼將接收到localhost:8000上的連接,然后僅僅返回發送來的內容作為HTTP響應。要做“實際”的事情的話就愛哪個非常復雜(處理 HTTP請求可能已經超出了這篇文章的范圍)。上面的代碼片段只是演示了一般性的概念。 你可以使用類似于ab -n 10000 -c 100 localhost:8000/這樣命令來測試服務器。這條命令將向服務器發送10000個請求,并且其中100個請求將同時到達。使用這樣的數目,我得到了處于中間的10毫秒的響應時間。不過還有一個問題:有少數幾個請求真正處理的很慢(如5秒), 這就是為什么總吞吐量只有2000請求/秒(如果是10毫秒的響應時間的話,總的吞吐量應該更像是10000請求/秒 協程堆棧 如果你試圖用我們的調度系統建立更大的系統的話,你將很快遇到問題:我們習慣了把代碼分解為更小的函數,然后調用它們。然而, 如果使用了協程的話,就不能這么做了。例如,看下面代碼:

<?phpfunction echoTimes($msg, $max) { for ($i = 1; $i <= $max; ++$i) { echo "$msg iteration $i/n"; yield; }}function task() { echoTimes('foo', 10); // print foo ten times echo "---/n"; echoTimes('bar', 5); // print bar five times yield; // force it to be a coroutine}$scheduler = new Scheduler;$scheduler->newTask(task());$scheduler->run();

這段代碼試圖把重復循環“輸出n次“的代碼嵌入到一個獨立的協程里,然后從主任務里調用它。然而它無法運行。正如在這篇文章的開始 所提到的,調用生成器(或者協程)將沒有真正地做任何事情,它僅僅返回一個對象。這也出現在上面的例子里。echoTimes調用除了放回一個(無用的)協程對象外不做任何事情。 為了仍然允許這么做,我們需要在這個裸協程上寫一個小小的封裝。我們將調用它:“協程堆棧”。因為它將管理嵌套的協程調用堆棧。 這將是通過生成協程來調用子協程成為可能: retval=(yieldsomeCoroutine(foo, $bar)); 使用yield,子協程也能再次返回值: yield retval(“I’m a return value!”); retval函數除了返回一個值的封裝外沒有做任何其他事情。這個封裝將表示它是一個返回值。

<?phpclass CoroutineReturnValue { protected $value; public function __construct($value) { $this->value = $value; } public function getValue() { return $this->value; }}function retval($value) { return new CoroutineReturnValue($value);}

為了把協程轉變為協程堆棧(它支持子調用),我們將不得不編寫另外一個函數(很明顯,它是另一個協程):

<?phpfunction stackedCoroutine(Generator $gen) { $stack = new SplStack; for (;;) { $value = $gen->current(); if ($value instanceof Generator) { $stack->push($gen); $gen = $value; continue; } $isReturnValue = $value instanceof CoroutineReturnValue; if (!$gen->valid() || $isReturnValue) { if ($stack->isEmpty()) { return; } $gen = $stack->pop(); $gen->send($isReturnValue ? $value->getValue() : NULL); continue; } $gen->send(yield $gen->key() => $value); }}

這個函數在調用者和當前正在運行的子協程之間扮演著簡單代理的角色。在gen?>send(yieldgen->key()=>value);這行完成了代理功能。另外它檢查返回值是否是生成器,萬一是生成器的話,它將開始運行這個生成器,并把前一個協程壓入堆棧里。一旦它獲得了CoroutineReturnValue的話,它將再次請求堆棧彈出,然后繼續執行前一個協程。為了使協程堆棧在任務里可用,任務構造器里的this-coroutine =coroutine;這行需要替代為this->coroutine = StackedCoroutine($coroutine);。 現在我們可以稍微改進上面web服務器例子:把wait+read(和wait+write和warit+accept)這樣的動作分組為函數。為了分組相關的 功能,我將使用下面類:

<?phpclass CoSocket { protected $socket; public function __construct($socket) { $this->socket = $socket; } public function accept() { yield waitForRead($this->socket); yield retval(new CoSocket(stream_socket_accept($this->socket, 0))); } public function read($size) { yield waitForRead($this->socket); yield retval(fread($this->socket, $size)); } public function write($string) { yield waitForWrite($this->socket); fwrite($this->socket, $string); } public function close() { @fclose($this->socket); }}

現在服務器可以編寫的稍微簡潔點了:

<?phpfunction server($port) { echo "Starting server at port $port.../n"; $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr); if (!$socket) throw new Exception($errStr, $errNo); stream_set_blocking($socket, 0); $socket = new CoSocket($socket); while (true) { yield newTask( handleClient(yield $socket->accept()) ); }}function handleClient($socket) { $data = (yield $socket->read(8192)); $msg = "Received following request:/n/n$data"; $msgLength = strlen($msg); $response = <<<RESHTTP/1.1 200 OK/rContent-Type: text/plain/rContent-Length: $msgLength/rConnection: close/r/r$msgRES; yield $socket->write($response); yield $socket->close();}

錯誤處理 作為一個優秀的程序員,相信你已經察覺到上面的例子缺少錯誤處理。幾乎所有的 socket 都是易出錯的。我這樣做的原因一方面固然是因為錯誤處理的乏味(特別是 socket?。?,另一方面也在于它很容易使代碼體積膨脹。 不過,我仍然了一講一下常見的協程錯誤處理:協程允許使用 throw() 方法在其內部拋出一個錯誤。盡管此方法還未在 PHP 中實現,但我很快就會提交它,就在今天。 throw() 方法接受一個 Exception,并將其拋出到協程的當前懸掛點,看看下面代碼:

<?phpfunction gen() { echo "Foo/n"; try { yield; } catch (Exception $e) { echo "Exception: {$e->getMessage()}/n"; } echo "Bar/n";}$gen = gen();$gen->rewind(); // echos "Foo"$gen->throw(new Exception('Test')); // echos "Exception: Test" // and "Bar"

這非常棒,因為我們可以使用系統調用以及子協程調用異常拋出。對與系統調用,Scheduler::run() 方法需要一些小調整:

<?phpif ($retval instanceof SystemCall) { try { $retval($task, $this); } catch (Exception $e) { $task->setException($e); $this->schedule($task); } continue;}

Task 類也許要添加 throw 調用處理:

<?phpclass Task { // ... protected $exception = null; public function setException($exception) { $this->exception = $exception; } public function run() { if ($this->beforeFirstYield) { $this->beforeFirstYield = false; return $this->coroutine->current(); } elseif ($this->exception) { $retval = $this->coroutine->throw($this->exception); $this->exception = null; return $retval; } else { $retval = $this->coroutine->send($this->sendValue); $this->sendValue = null; return $retval; } } // ...}

現在,我們已經可以在系統調用中使用異常拋出了!例如,要調用 killTask,讓我們在傳遞 ID 不可用時拋出一個異常:

<?phpfunction killTask($tid) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($tid) { if ($scheduler->killTask($tid)) { $scheduler->schedule($task); } else { throw new InvalidArgumentException('Invalid task ID!'); } } );}

試試看:

<?phpfunction task() { try { yield killTask(500); } catch (Exception $e) { echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "/n"; }}

這些代碼現在尚不能正常運作,因為 stackedCoroutine 函數無法正確處理異常。要修復需要做些調整:

<?phpfunction stackedCoroutine(Generator $gen) { $stack = new SplStack; $exception = null; for (;;) { try { if ($exception) { $gen->throw($exception); $exception = null; continue; } $value = $gen->current(); if ($value instanceof Generator) { $stack->push($gen); $gen = $value; continue; } $isReturnValue = $value instanceof CoroutineReturnValue; if (!$gen->valid() || $isReturnValue) { if ($stack->isEmpty()) { return; } $gen = $stack->pop(); $gen->send($isReturnValue ? $value->getValue() : NULL); continue; } try { $sendValue = (yield $gen->key() => $value); } catch (Exception $e) { $gen->throw($e); continue; } $gen->send($sendValue); } catch (Exception $e) { if ($stack->isEmpty()) { throw $e; } $gen = $stack->pop(); $exception = $e; } }}

結束語 在這篇文章里,我使用多任務協作構建了一個任務調度器,其中包括執行“系統調用”,做非阻塞操作和處理錯誤。所有這些里真正很酷的事情是任務的結果代碼看起來完全同步,甚至任務正在執行大量的異步操作的時候也是這樣。如果你打算從套接口讀取數據的話,你將不需要傳遞某個回調函數或者注冊一個事件偵聽器。相反,你只要書寫yield $socket->read()。這兒大部分都是你常常也要編寫的,只在它的前面增加yield。 當我第一次聽到所有這一切的時候,我發現這個概念完全令人折服,而且正是這個激勵我在PHP中實現了它。同時我發現協程真正令人心慌。在令人敬畏的代碼和很大一堆代碼之間只有單薄的一行,我認為協程正好處在這一行上。講講使用上面所述的方法書寫異步代碼是否真的有益對我來說很難。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
成人免费视频在线观看超级碰| 欧美日韩爱爱视频| 亚洲欧美在线一区| 久久精品国产免费观看| 欧美激情在线播放| 日韩欧美在线视频日韩欧美在线视频| 欧美在线视频一区| 成人黄色大片在线免费观看| 久久久女人电视剧免费播放下载| 国产一区二区动漫| 亚洲香蕉在线观看| 免费av一区二区| 中国人与牲禽动交精品| 国产精品麻豆va在线播放| 久久精品国产亚洲精品2020| 欧美午夜影院在线视频| 91日本在线视频| 欧美色欧美亚洲高清在线视频| 97热在线精品视频在线观看| 国产精品∨欧美精品v日韩精品| 亚洲久久久久久久久久| 亚洲国产成人精品久久| 丝袜美腿精品国产二区| 亚洲影院色无极综合| 国产精品久久久久久久一区探花| 国产午夜一区二区| 欧美性猛交xxxx黑人| 亚洲国产毛片完整版| 欧美日韩国产页| 欧美激情第三页| 欧美日本高清视频| 久久久久北条麻妃免费看| 国产精品日日做人人爱| 国产精品尤物福利片在线观看| 日韩视频中文字幕| 九九九热精品免费视频观看网站| 久久97久久97精品免视看| 亚洲午夜未删减在线观看| 久久久久久91香蕉国产| 日本一区二三区好的精华液| 久久免费视频这里只有精品| www.精品av.com| 另类少妇人与禽zozz0性伦| 成人午夜两性视频| 国产精品主播视频| 欧洲中文字幕国产精品| 久久亚洲综合国产精品99麻豆精品福利| 精品日韩中文字幕| 亚洲精品乱码久久久久久金桔影视| 亚洲精品av在线| 亚洲第一男人天堂| 性欧美激情精品| 欧美另类老肥妇| 欧美视频免费在线| 日本在线精品视频| 亚洲国产精品久久久久秋霞不卡| 日韩福利在线播放| 国产精品久久久久久久久久新婚| 亚洲性日韩精品一区二区| 中文字幕在线观看亚洲| 亚洲免费一级电影| 亚洲精品中文字幕av| 亚洲电影免费观看高清完整版在线| 亚洲成人教育av| 97视频在线观看亚洲| 国产丝袜高跟一区| 奇米四色中文综合久久| 欧美自拍大量在线观看| 国产成人亚洲综合青青| 欧美午夜精品久久久久久久| 美女扒开尿口让男人操亚洲视频网站| 91精品视频免费看| 美女视频黄免费的亚洲男人天堂| 97色伦亚洲国产| 久久视频免费在线播放| 少妇久久久久久| 久久韩剧网电视剧| 久久伊人91精品综合网站| 国产日产欧美a一级在线| 91精品国产高清自在线| 欧美大尺度在线观看| 久久人91精品久久久久久不卡| 91在线观看免费观看| 亚洲国产高潮在线观看| 欧美日本高清视频| 亚洲第一在线视频| 欧美富婆性猛交| 欧美最猛性xxxx| 久久久久国产一区二区三区| 成人黄色免费片| 国产精品久久久亚洲| 亚洲free性xxxx护士hd| 国产视频综合在线| 日韩美女免费线视频| 国产不卡一区二区在线播放| 亚洲最大成人在线| 亚洲丝袜在线视频| 亚洲男人天堂2019| 国产99视频在线观看| 国产欧亚日韩视频| 亚洲xxxx视频| 亚洲女人天堂视频| 国产午夜精品美女视频明星a级| 亚洲精品av在线播放| 欧美国产一区二区三区| 精品在线小视频| 91高潮在线观看| 精品欧美aⅴ在线网站| 亚洲国产精品久久91精品| 波霸ol色综合久久| 国产亚洲欧美日韩一区二区| 日韩精品中文字幕在线播放| 欧美日韩国产页| 日韩亚洲第一页| 久久视频在线播放| 欧美日韩国产精品一区二区不卡中文| 91在线|亚洲| 国产精品丝袜视频| 国产亚洲精品美女| 尤物九九久久国产精品的特点| 亚洲精品电影在线观看| 91精品免费视频| 欧美亚洲免费电影| 国产精品小说在线| 国产91露脸中文字幕在线| 揄拍成人国产精品视频| 久久久久久久久久久久久久久久久久av| 日韩av在线免费| 亚洲视频在线看| 欧洲日韩成人av| 久热精品视频在线观看一区| 欧美一级淫片videoshd| 91在线观看免费| 欧美亚洲成人精品| 永久555www成人免费| 欧美国产一区二区三区| 蜜臀久久99精品久久久久久宅男| 日本高清视频一区| 欧美xxxx做受欧美| 久久精品国亚洲| 久久久久久国产三级电影| 欧美在线一区二区三区四| 色综合视频网站| 精品亚洲aⅴ在线观看| 久久精品视频在线播放| 国产欧美日韩专区发布| 日韩中文在线不卡| 国产成人午夜视频网址| 538国产精品视频一区二区| 性色av一区二区三区红粉影视| 久久综合电影一区| 亚洲精品成人av| 日本在线观看天堂男亚洲| 国产精品免费小视频| 国产亚洲精品成人av久久ww| 色悠悠久久久久| 国产精品视频免费在线观看| 97久久久免费福利网址| 国产精品高潮粉嫩av| 欧美激情三级免费| 亚洲免费av网址| 免费成人高清视频| 国产ts人妖一区二区三区| 欧美激情一二三|