用PHP來實現異步任務一直是個難題,現有的解決方案中:PHP知名的異步框架有 swoole 和 Workerman,但都是無法在 web 環境中直接使用的,即便強行搭建 web 環境,異步調用也是使用多進程模式實現的。但有時真的不需要用啟動服務的方式,讓服務端一直等待客戶端消息,何況中間還不能改動服務端代碼。本文就介紹一下不使用任何框架和第三方庫的情況下,在 CLI 環境中如何實現多進程以及在web環境中的異步調用。
在 web 環境的異步調用
常用的方式有兩種
1. 使用 socket 連接
這種方式就是典型的C/S架構,需要有服務端支持。
// 1. 創建socket套接字$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);// 2. 進行socket連接socket_connect($socket, 127.0.0.1 , 3939 //socket_set_nonblock($socket); // 以非阻塞模式運行,由于在客戶端不實用,所以這里不考慮// 3. 向服務端發送請求socket_write($socket, $request, strlen($request));// 4. 接受服務端的回應消息(忽略非阻塞的情況,如果服務端不是提供異步服務,那這一步可以省略)$recv = socket_read($socket, 2048);// 5. 關閉socket連接socket_close($socket);
2. 使用 popen 打開進程管道
這種方式是使用操作系統命令,由操作系統直接執行。
本文討論的異步調用就是使用這種方式。
$sf = /path/to/cli_async_task.php //要執行的腳本文件$op = call //腳本文件接收的參數1$data = base64_encode(serialize([ TestTask , arg1 , arg2 ])); //腳本文件接收的參數2pclose(popen( php $sf --op $op --data $data , r //打開之后接著就關閉進程管道,讓該進程以守護模式運行echo PHP_EOL. 異步任務已執行。 .PHP_EOL;
這種方式的優點就是:一步解決,當前進程不需要任何開銷。
缺點也很明顯:無法跟蹤任務腳本的運行狀態。
所以重頭戲會是在執行任務的腳本文件上,下面就介紹任務處理和多進程的實現方式。
注意:多進程模式僅支持Linux,不支持Windows?。?/p>
這里會從0開始(未使用任何框架和類庫)介紹每一個步驟,最后會附帶一份完整的代碼。
1. 創建腳本任何腳本不可忽視的地方就是錯誤處理。所以寫一個任務處理腳本首先就是寫錯誤處理方式。
在PHP中就是調用 set_exception_handler set_error_handler register_shutdown_function 這三個函數,然后寫上自定義的處理方法。
接著是定義自動加載函數 spl_autoload_register 免去每使用一個新類都要 require / include 的煩惱。
定義日志操作方法。
定義任務處理方法。
讀取來自命令行的參數,開始執行任務。
2. 多進程處理PHP 創建多進程是使用 pcntl_fork 函數,該函數會 fork 一份當前進程(影分身術),于是就有了兩個進程,當前進程是主進程(本體),fork 出的進程是子進程(影分身)。需要注意的是兩個進程代碼環境是一樣的,兩個進程都是執行到了 pcntl_fork 函數位置。區別就是 getmypid 獲得的進程號不一樣,最重要的區分是當調用 pcntl_fork函數時,子進程獲得的返回值是 0,而主進程獲得的是子進程的進程號 pid。
好了,當我們知道誰是子進程后,就可以讓該子進程執行任務了。
那么主進程是如何得知子進程的狀態呢?
使用 pcntl_wait。該函數有兩個參數 $status 和 $options ,$status 是引用類型,用來存儲子進程的狀態,$options 有兩個可選常量WNOHANG| WUNTRACED,分別表示不等待子進程結束立即返回和等待子進程結束。很明顯使用WUNTRACED會阻塞主進程。(也可以使用 pcntl_waitpid 函數獲取特定 pid 子進程狀態)
在多進程中,主進程要做的就是管理每個子進程的狀態,否則子進程很可能無法退出而變成僵尸進程。
關于多進程間的消息通信
這一塊需要涉及具體的業務邏輯,所以只能簡單的提一下。不考慮使用第三方比如 redis 等服務的情況下,PHP原生可以實現就是管道通信和共享內存等方式。實現起來都比較簡單,缺點就是可使用的數據容量有限,只能用簡單文本協議交換數據。
如何手動結束所有進程任務
如果多進程處理不當,很可能導致進程任務卡死,甚至占用過多系統資源,此時只能手動結束進程。
除了一個個的根據進程號來結束,還有一個快速的方法是首先在任務腳本里自定義進程名稱,就是調用cli_set_process_title函數,然后在命令行輸入:ps aux|grep cli_async_worker |grep -v grep|awk {print $2} |xargs kill -9 (里面的 cli_async_worker 就是自定義的進程名稱),這樣就可以快速結束多進程任務了。
以下是完整的任務執行腳本代碼:
可能無法直接使用,需要修改的地方有:
腳本目錄和日志目錄常量
自動加載任務類的方法(默認是加載腳本目錄中以Task結尾的文件)
其他的如:錯誤和日志處理方式和文本格式就隨意吧...
如果命名管道文件設置有錯誤,可能導致進程假死,你可能需要手動刪除進程管道通信的代碼。
多進程的例子:execAsyncTask( multi , [ test = [ a , b , c ], grab = [[ url = http://www.baidu.com , callback = http://localhost ]] ]);。執行情況可以在日志文件中查看。execAsyncTask函數參考【__使用popen打開進程管道__】。
?phperror_reporting(E_ALL ^ E_NOTICE ^ E_USER_WARNING);@ini_set( display_errors , 0);@ini_set( date.timezone , PRC chdir(__DIR__);/* 任務腳本目錄 */defined( TASK_PATH ) or define( TASK_PATH , realpath(__DIR__ . /tasks /* 任務日志目錄 */defined( TASK_LOGS_PATH ) or define( TASK_LOGS_PATH , __DIR__ . /tasks/logs if (!is_dir(TASK_LOGS_PATH)) @mkdir(TASK_LOGS_PATH, 0777, true);set_exception_handler(function($e) { $time = date( H:i:s , time()); $msg = sprintf( . h3 [%s] %s (%s) /h3 . /n . pre %s /pre , $time, $e- getMessage(), $e- getCode(), $e- getTraceAsString() file_put_contents(TASK_LOGS_PATH . /exception- .date( Ymd ). .log , $msg.PHP_EOL, FILE_APPEND|LOCK_EX);set_error_handler(function($errno, $errmsg, $filename, $line) { if (!(error_reporting() $errno)) return; ob_start(); debug_print_backtrace(); $backtrace = ob_get_contents(); ob_end_clean(); $datetime = date( Y-m-d H:i:s , time()); $msg = EOF[{$errno}]時間:{$datetime}信息:{$errmsg}文件:{$filename}行號:{$line}{$backtrace} file_put_contents(TASK_LOGS_PATH . /error- .date( Ymd ). .log , $msg.PHP_EOL, FILE_APPEND|LOCK_EX);register_shutdown_function(function() { $last_error = error_get_last(); if (in_array($last_error[ type ], array(E_ERROR, E_WARNING, E_USER_ERROR))) { debug_log( End. , true);function debug_log($log, $close=false) { html' target='_blank'>static $fp; if (!$fp) { $fp = fopen(TASK_LOGS_PATH . /debug- .date( Ym ). .log , a+ $log = [ . date( Y-m-d H:i:s ) . ] [Task@ . getmypid() . ] . trim($log) . PHP_EOL; if (flock($fp, LOCK_EX)) { fwrite($fp, $log); fflush($fp); flock($fp, LOCK_UN); } else { if ($close) fclose($fp);function call($job) { if (is_callable($job)) { $ret = call_user_func($job); } elseif (is_array($job) and is_callable(@$job[0])) { $ret = call_user_func_array($job[0], array_slice($job, 1)); } else throw new /Exception( 不是可執行的任務! return $ret;function grab(array $job) { /* 消息數據為json,格式 url : fetch_url , //拉取的鏈接地址 method : request_method , //請求方法 data : post_data , //POST請求數據 args :[], //請求附加參數 headers|user_agent|proxy|timeout callback : callback_url , //回調地址(統一POST帶回應數據) msg_id : message_id //消息ID $url = $job[ url $headers = @$job[ args ][ headers ] ?: []; $_headers = if (is_array($headers)) { foreach ($headers as $_k = $header) { if (!is_numeric($_k)) $header = sprintf( %s: %s , $_k, $header); $_headers .= $header . /r/n $headers = Connection: close/r/n . $_headers; $opts = array( http = array( method = strtoupper(@$job[ method ] ?: get ), content = @$job[ data ] ?: null, header = $headers, user_agent = @$job[ args ][ user_agent ] ?: HTTPGRAB/1.0 (compatible) , proxy = @$job[ args ][ proxy ] ?: null, timeout = intval(@$job[ args ][ timeout ] ?: 120), protocol_version = @$job[ args ][ protocol_version ] ?: 1.1 , max_redirects = 3, ignore_errors = true $ret = @file_get_contents($url, false, stream_context_create($opts)); //debug_log($url. -- .strlen($ret)); if ($ret and isset($job[ callback ])) { $postdata = http_build_query(array( msg_id = @$job[ msg_id ] ?: 0, url = @$job[ url ], result = $ret $opts = array( http = array( method = POST , header = Content-type:application/x-www-form-urlencoded . /r/n , content = $postdata, timeout = 30 file_get_contents($job[ callback ], false, stream_context_create($opts)); //debug_log(json_encode(@$http_response_header)); //debug_log($job[ callback ]. -- .$ret2); return $ret;function clean($tmpdirs, $expires=3600*24*7) { $ret = []; foreach ((array)$tmpdirs as $tmpdir) { $ret[$tmpdir] = 0; foreach (glob($tmpdir.DIRECTORY_SEPARATOR. * ) as $_file) { if (fileatime($_file) (time()-$expires)) { if (@unlink($_file)) $ret[$tmpdir]++; return $ret;function backup($file, $dest) { $zip = new /ZipArchive(); if (!$zip- open($file, /ZipArchive::CREATE)) { return false; _backup_dir($zip, $dest); $zip- close(); return $file;function _backup_dir($zip, $dest, $sub= ) { $dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $dir = opendir($dest); if (!$dir) return false; while (false !== ($file = readdir($dir))) { if (is_file($dest . $file)) { $zip- addFile($dest . $file, $sub . $file); } else { if ($file != . and $file != .. and is_dir($dest . $file)) { //$zip- addEmptyDir($sub . $file . DIRECTORY_SEPARATOR); _backup_dir($zip, $dest . $file, $file); closedir($dir); return true;
if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd; elseif (is_array($cmd)) { if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0]; $ret = call($cmd); break; case grab : //抓取網頁 if (is_string($data)) $data = [ url = $data]; if (is_array($data)) $ret = grab($data); else throw new /Exception( 無效的命令參數! break; case clean : //清理緩存文件夾:dirs 需要清理的文件夾列表,expires 過期時間(秒,默認7天) if (isset($data[ dirs ])) { $ret = clean($data[ dirs ], @$data[ expires } else { $ret = clean($data); break; case backup : //備份文件:zip 備份到哪個zip文件,dest 需要備份的文件夾 if (isset($data[ zip ]) and is_dir($data[ dest ])) $ret = backup($data[ zip ], $data[ dest else throw new /Exception( 沒有指定需要備份的文件! break; case require : //加載腳本文件 if (is_file($data)) $ret = require($data); else throw new /Exception( 不是可請求的文件! break; case test : sleep(rand(1, 5)); $ret = ucfirst(strval($data)). .PID: . getmypid(); break; case multi : //多進程處理模式 $results = $childs = []; $fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . pipe. . posix_getpid(); if (!file_exists($fifo)) { if (!posix_mkfifo($fifo, 0666)) { //開啟進程數據通信管道 throw new Exception( make pipe failed! //$shmid = shmop_open(ftok(__FILE__, h ), c , 0644, 4096); //共享內存 //shmop_write($shmid, serialize([]), 0); //$data = unserialize(shmop_read($shmid, 0, 4096)); //shmop_delete($shmid); //shmop_close($shmid); foreach($data as $_op = $_datas) { $_datas = (array)$_datas; //data 格式為數組表示一個 op 有多個執行數據 foreach($_datas as $_data) { $pid = pcntl_fork(); if ($pid == 0) { //子進程中執行任務 $_ret = execute_task($_op, $_data); $_pid = getmypid(); $pipe = fopen($fifo, w //寫 //stream_set_blocking($pipe, false); $_ret = serialize([ pid = $_pid, op = $_op, args = $_data, result = $_ret]); if (strlen($_ret) 4096) //寫入管道的數據最大4K $_ret = serialize([ pid = $_pid, op = $_op, args = $_data, result = [RESPONSE_TOO_LONG] //debug_log( write pipe: .$_ret); fwrite($pipe, $_ret.PHP_EOL); fflush($pipe); fclose($pipe); exit(0); //退出子進程 } elseif ($pid 0) { //主進程中記錄任務 $childs[] = $pid; $results[$pid] = 0; debug_log( fork by child: .$pid); //pcntl_wait($status, WNOHANG); } elseif ($pid == -1) { throw new Exception( could not fork at . getmygid()); $pipe = fopen($fifo, r+ //讀 stream_set_blocking($pipe, true); //阻塞模式,PID與讀取的管道數據可能會不一致。 $n = 0; while(count($childs) 0) { foreach($childs as $i = $pid) { $res = pcntl_waitpid($pid, $status, WNOHANG); if (-1 == $res || $res 0) { $_ret = @unserialize(fgets($pipe)); //讀取管道數據 $results[$pid] = $_ret; unset($childs[$i]); debug_log( read child: .$pid . - . json_encode($_ret, 64|256)); if ($n 1000) posix_kill($pid, SIGTERM); //超時(10分鐘)結束子進程 usleep(200000); $n++; debug_log( child process completed. @fclose($pipe); @unlink($fifo); $ret = json_encode($results, 64|256); break; default: throw new /Exception( 沒有可執行的任務! break; $t2 = microtime(true); $times = round(($t2 - $t1) * 1000, 2); $log = sprintf( [%s] %s -- (%s) %sms , strtoupper($op), @json_encode($data, 64|256), @strlen($ret) 65?$ret:@strlen($ret), $times); debug_log($log); return $ret;
if (false !== strpos(end($parts), _ )) { array_splice($parts, -1, 1, explode( _ , current($parts))); $filename = implode(DIRECTORY_SEPARATOR, $parts) . .php if ($filename = stream_resolve_include_path($filename)) { include $filename; } else if (preg_match( /.*Task$/ , $classname)) { //查找以Task結尾的任務腳本類 include TASK_PATH . DIRECTORY_SEPARATOR . $classname . .php } else { return false;}
以上就是php如何使用命令行實現異步多進程模式的任務處理(代碼)的詳細內容,PHP教程
鄭重聲明:本文版權歸原作者所有,轉載文章僅為傳播更多信息之目的,如作者信息標記有誤,請第一時間聯系我們修改或刪除,多謝。
新聞熱點
疑難解答