亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 編程 > PHP > 正文

php如何使用命令行實現異步多進程模式的任務處理(代碼)

2020-03-22 18:18:18
字體:
來源:轉載
供稿:網友
本篇文章給大家帶來的內容是關于php如何使用命令行實現異步多進程模式的任務處理(代碼),有一定的參考價值,有需要的朋友可以參考一下,希望對你有所幫助。

用PHP來實現異步任務一直是個難題,現有的解決方案中:PHP知名的異步框架有 swoole 和 Workerman,但都是無法在 web 環境中直接使用的,即便強行搭建 web 環境,異步調用也是使用多進程模式實現的。但有時真的不需要用啟動服務的方式,讓服務端一直等待客戶端消息,何況中間還不能改動服務端代碼。本文就介紹一下不使用任何框架和第三方庫的情況下,在 CLI 環境中如何實現多進程以及在web環境中的異步調用。

在 web 環境的異步調用

常用的方式有兩種

1. 使用 socket 連接

這種方式就是典型的C/S架構,需要有服務端支持。

// 1. 創建socket套接字$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);// 2. 進行socket連接socket_connect($socket, 127.0.0.1 , 3939 //socket_set_nonblock($socket); // 以非阻塞模式運行,由于在客戶端不實用,所以這里不考慮// 3. 向服務端發送請求socket_write($socket, $request, strlen($request));// 4. 接受服務端的回應消息(忽略非阻塞的情況,如果服務端不是提供異步服務,那這一步可以省略)$recv = socket_read($socket, 2048);// 5. 關閉socket連接socket_close($socket);

2. 使用 popen 打開進程管道

這種方式是使用操作系統命令,由操作系統直接執行。

本文討論的異步調用就是使用這種方式。

$sf = /path/to/cli_async_task.php //要執行的腳本文件$op = call //腳本文件接收的參數1$data = base64_encode(serialize([ TestTask , arg1 , arg2 ])); //腳本文件接收的參數2pclose(popen( php $sf --op $op --data $data , r //打開之后接著就關閉進程管道,讓該進程以守護模式運行echo PHP_EOL. 異步任務已執行。 .PHP_EOL;

這種方式的優點就是:一步解決,當前進程不需要任何開銷。
缺點也很明顯:無法跟蹤任務腳本的運行狀態。
所以重頭戲會是在執行任務的腳本文件上,下面就介紹任務處理和多進程的實現方式。

在 CLI 環境的多進程任務處理

注意:多進程模式僅支持Linux,不支持Windows?。?/p>

這里會從0開始(未使用任何框架和類庫)介紹每一個步驟,最后會附帶一份完整的代碼。

1. 創建腳本

任何腳本不可忽視的地方就是錯誤處理。所以寫一個任務處理腳本首先就是寫錯誤處理方式。

在PHP中就是調用 set_exception_handler set_error_handler register_shutdown_function 這三個函數,然后寫上自定義的處理方法。

接著是定義自動加載函數 spl_autoload_register 免去每使用一個新類都要 require / include 的煩惱。

定義日志操作方法。

定義任務處理方法。

讀取來自命令行的參數,開始執行任務。

2. 多進程處理

PHP 創建多進程是使用 pcntl_fork 函數,該函數會 fork 一份當前進程(影分身術),于是就有了兩個進程,當前進程是主進程(本體),fork 出的進程是子進程(影分身)。需要注意的是兩個進程代碼環境是一樣的,兩個進程都是執行到了 pcntl_fork 函數位置。區別就是 getmypid 獲得的進程號不一樣,最重要的區分是當調用 pcntl_fork函數時,子進程獲得的返回值是 0,而主進程獲得的是子進程的進程號 pid。

好了,當我們知道誰是子進程后,就可以讓該子進程執行任務了。

那么主進程是如何得知子進程的狀態呢?
使用 pcntl_wait。該函數有兩個參數 $status 和 $options ,$status 是引用類型,用來存儲子進程的狀態,$options 有兩個可選常量WNOHANG| WUNTRACED,分別表示不等待子進程結束立即返回和等待子進程結束。很明顯使用WUNTRACED會阻塞主進程。(也可以使用 pcntl_waitpid 函數獲取特定 pid 子進程狀態)

在多進程中,主進程要做的就是管理每個子進程的狀態,否則子進程很可能無法退出而變成僵尸進程。

關于多進程間的消息通信
這一塊需要涉及具體的業務邏輯,所以只能簡單的提一下。不考慮使用第三方比如 redis 等服務的情況下,PHP原生可以實現就是管道通信和共享內存等方式。實現起來都比較簡單,缺點就是可使用的數據容量有限,只能用簡單文本協議交換數據。

如何手動結束所有進程任務

如果多進程處理不當,很可能導致進程任務卡死,甚至占用過多系統資源,此時只能手動結束進程。
除了一個個的根據進程號來結束,還有一個快速的方法是首先在任務腳本里自定義進程名稱,就是調用cli_set_process_title函數,然后在命令行輸入:ps aux|grep cli_async_worker |grep -v grep|awk {print $2} |xargs kill -9 (里面的 cli_async_worker 就是自定義的進程名稱),這樣就可以快速結束多進程任務了。

以下是完整的任務執行腳本代碼:

可能無法直接使用,需要修改的地方有:

腳本目錄和日志目錄常量

自動加載任務類的方法(默認是加載腳本目錄中以Task結尾的文件)

其他的如:錯誤和日志處理方式和文本格式就隨意吧...

如果命名管道文件設置有錯誤,可能導致進程假死,你可能需要手動刪除進程管道通信的代碼。

多進程的例子:execAsyncTask( multi , [ test = [ a , b , c ], grab = [[ url = http://www.baidu.com , callback = http://localhost ]] ]);。執行情況可以在日志文件中查看。execAsyncTask函數參考【__使用popen打開進程管道__】。

 ?phperror_reporting(E_ALL ^ E_NOTICE ^ E_USER_WARNING);@ini_set( display_errors , 0);@ini_set( date.timezone , PRC chdir(__DIR__);/* 任務腳本目錄 */defined( TASK_PATH ) or define( TASK_PATH , realpath(__DIR__ . /tasks /* 任務日志目錄 */defined( TASK_LOGS_PATH ) or define( TASK_LOGS_PATH , __DIR__ . /tasks/logs if (!is_dir(TASK_LOGS_PATH)) @mkdir(TASK_LOGS_PATH, 0777, true);set_exception_handler(function($e) { $time = date( H:i:s , time()); $msg = sprintf( . h3 [%s] %s (%s) /h3 . /n . pre %s /pre , $time, $e- getMessage(), $e- getCode(), $e- getTraceAsString() file_put_contents(TASK_LOGS_PATH . /exception- .date( Ymd ). .log , $msg.PHP_EOL, FILE_APPEND|LOCK_EX);set_error_handler(function($errno, $errmsg, $filename, $line) { if (!(error_reporting() $errno)) return; ob_start(); debug_print_backtrace(); $backtrace = ob_get_contents(); ob_end_clean(); $datetime = date( Y-m-d H:i:s , time()); $msg = EOF[{$errno}]時間:{$datetime}信息:{$errmsg}文件:{$filename}行號:{$line}{$backtrace} file_put_contents(TASK_LOGS_PATH . /error- .date( Ymd ). .log , $msg.PHP_EOL, FILE_APPEND|LOCK_EX);register_shutdown_function(function() { $last_error = error_get_last(); if (in_array($last_error[ type ], array(E_ERROR, E_WARNING, E_USER_ERROR))) { debug_log( End. , true);function debug_log($log, $close=false) { html' target='_blank'>static $fp; if (!$fp) { $fp = fopen(TASK_LOGS_PATH . /debug- .date( Ym ). .log , a+  $log = [ . date( Y-m-d H:i:s ) . ] [Task@ . getmypid() . ] . trim($log) . PHP_EOL; if (flock($fp, LOCK_EX)) { fwrite($fp, $log); fflush($fp); flock($fp, LOCK_UN); } else { if ($close) fclose($fp);function call($job) { if (is_callable($job)) { $ret = call_user_func($job); } elseif (is_array($job) and is_callable(@$job[0])) { $ret = call_user_func_array($job[0], array_slice($job, 1)); } else throw new /Exception( 不是可執行的任務!  return $ret;function grab(array $job) { /* 消息數據為json,格式 url : fetch_url , //拉取的鏈接地址 method : request_method , //請求方法 data : post_data , //POST請求數據 args :[], //請求附加參數 headers|user_agent|proxy|timeout callback : callback_url , //回調地址(統一POST帶回應數據) msg_id : message_id //消息ID $url = $job[ url  $headers = @$job[ args ][ headers ] ?: []; $_headers =  if (is_array($headers)) { foreach ($headers as $_k = $header) { if (!is_numeric($_k))  $header = sprintf( %s: %s , $_k, $header); $_headers .= $header . /r/n  $headers = Connection: close/r/n . $_headers; $opts = array( http = array( method = strtoupper(@$job[ method ] ?: get ), content = @$job[ data ] ?: null, header = $headers, user_agent = @$job[ args ][ user_agent ] ?: HTTPGRAB/1.0 (compatible) , proxy = @$job[ args ][ proxy ] ?: null, timeout = intval(@$job[ args ][ timeout ] ?: 120), protocol_version = @$job[ args ][ protocol_version ] ?: 1.1 , max_redirects = 3, ignore_errors = true $ret = @file_get_contents($url, false, stream_context_create($opts)); //debug_log($url. -- .strlen($ret)); if ($ret and isset($job[ callback ])) { $postdata = http_build_query(array( msg_id = @$job[ msg_id ] ?: 0, url = @$job[ url ], result = $ret $opts = array( http = array( method = POST , header = Content-type:application/x-www-form-urlencoded . /r/n , content = $postdata, timeout = 30 file_get_contents($job[ callback ], false, stream_context_create($opts)); //debug_log(json_encode(@$http_response_header)); //debug_log($job[ callback ]. -- .$ret2); return $ret;function clean($tmpdirs, $expires=3600*24*7) { $ret = []; foreach ((array)$tmpdirs as $tmpdir) { $ret[$tmpdir] = 0; foreach (glob($tmpdir.DIRECTORY_SEPARATOR. * ) as $_file) { if (fileatime($_file) (time()-$expires)) { if (@unlink($_file)) $ret[$tmpdir]++; return $ret;function backup($file, $dest) { $zip = new /ZipArchive(); if (!$zip- open($file, /ZipArchive::CREATE)) { return false; _backup_dir($zip, $dest); $zip- close(); return $file;function _backup_dir($zip, $dest, $sub= ) { $dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $dir = opendir($dest); if (!$dir) return false; while (false !== ($file = readdir($dir))) { if (is_file($dest . $file)) { $zip- addFile($dest . $file, $sub . $file); } else { if ($file != . and $file != .. and is_dir($dest . $file)) { //$zip- addEmptyDir($sub . $file . DIRECTORY_SEPARATOR); _backup_dir($zip, $dest . $file, $file); closedir($dir); return true;
if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd; elseif (is_array($cmd)) { if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0]; $ret = call($cmd); break; case grab : //抓取網頁 if (is_string($data)) $data = [ url = $data]; if (is_array($data)) $ret = grab($data); else throw new /Exception( 無效的命令參數! break; case clean : //清理緩存文件夾:dirs 需要清理的文件夾列表,expires 過期時間(秒,默認7天) if (isset($data[ dirs ])) { $ret = clean($data[ dirs ], @$data[ expires } else { $ret = clean($data); break; case backup : //備份文件:zip 備份到哪個zip文件,dest 需要備份的文件夾 if (isset($data[ zip ]) and is_dir($data[ dest ])) $ret = backup($data[ zip ], $data[ dest else throw new /Exception( 沒有指定需要備份的文件! break; case require : //加載腳本文件 if (is_file($data)) $ret = require($data); else throw new /Exception( 不是可請求的文件! break; case test : sleep(rand(1, 5)); $ret = ucfirst(strval($data)). .PID: . getmypid(); break; case multi : //多進程處理模式 $results = $childs = []; $fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . pipe. . posix_getpid(); if (!file_exists($fifo)) { if (!posix_mkfifo($fifo, 0666)) { //開啟進程數據通信管道 throw new Exception( make pipe failed! //$shmid = shmop_open(ftok(__FILE__, h ), c , 0644, 4096); //共享內存 //shmop_write($shmid, serialize([]), 0); //$data = unserialize(shmop_read($shmid, 0, 4096)); //shmop_delete($shmid); //shmop_close($shmid); foreach($data as $_op = $_datas) { $_datas = (array)$_datas; //data 格式為數組表示一個 op 有多個執行數據 foreach($_datas as $_data) { $pid = pcntl_fork(); if ($pid == 0) { //子進程中執行任務 $_ret = execute_task($_op, $_data); $_pid = getmypid(); $pipe = fopen($fifo, w //寫 //stream_set_blocking($pipe, false); $_ret = serialize([ pid = $_pid, op = $_op, args = $_data, result = $_ret]); if (strlen($_ret) 4096) //寫入管道的數據最大4K $_ret = serialize([ pid = $_pid, op = $_op, args = $_data, result = [RESPONSE_TOO_LONG] //debug_log( write pipe: .$_ret); fwrite($pipe, $_ret.PHP_EOL); fflush($pipe); fclose($pipe); exit(0); //退出子進程 } elseif ($pid 0) { //主進程中記錄任務 $childs[] = $pid; $results[$pid] = 0; debug_log( fork by child: .$pid); //pcntl_wait($status, WNOHANG); } elseif ($pid == -1) { throw new Exception( could not fork at . getmygid()); $pipe = fopen($fifo, r+ //讀 stream_set_blocking($pipe, true); //阻塞模式,PID與讀取的管道數據可能會不一致。 $n = 0; while(count($childs) 0) { foreach($childs as $i = $pid) { $res = pcntl_waitpid($pid, $status, WNOHANG); if (-1 == $res || $res 0) { $_ret = @unserialize(fgets($pipe)); //讀取管道數據 $results[$pid] = $_ret; unset($childs[$i]); debug_log( read child: .$pid . - . json_encode($_ret, 64|256)); if ($n 1000) posix_kill($pid, SIGTERM); //超時(10分鐘)結束子進程 usleep(200000); $n++; debug_log( child process completed. @fclose($pipe); @unlink($fifo); $ret = json_encode($results, 64|256); break; default: throw new /Exception( 沒有可執行的任務! break; $t2 = microtime(true); $times = round(($t2 - $t1) * 1000, 2); $log = sprintf( [%s] %s -- (%s) %sms , strtoupper($op), @json_encode($data, 64|256), @strlen($ret) 65?$ret:@strlen($ret), $times); debug_log($log); return $ret;
if (false !== strpos(end($parts), _ )) { array_splice($parts, -1, 1, explode( _ , current($parts))); $filename = implode(DIRECTORY_SEPARATOR, $parts) . .php if ($filename = stream_resolve_include_path($filename)) { include $filename; } else if (preg_match( /.*Task$/ , $classname)) { //查找以Task結尾的任務腳本類 include TASK_PATH . DIRECTORY_SEPARATOR . $classname . .php } else { return false;}


以上就是php如何使用命令行實現異步多進程模式的任務處理(代碼)的詳細內容,PHP教程

鄭重聲明:本文版權歸原作者所有,轉載文章僅為傳播更多信息之目的,如作者信息標記有誤,請第一時間聯系我們修改或刪除,多謝。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
久久亚洲一区二区三区四区五区高| 欧美高清视频免费观看| 亚洲精品美女久久久久| 国产成人97精品免费看片| 国产精品99久久99久久久二8| 中国china体内裑精亚洲片| 在线观看国产精品淫| 欧美激情在线狂野欧美精品| 久久69精品久久久久久久电影好| 久久伊人精品天天| 久久精品久久久久久国产 免费| 在线一区二区日韩| 欧美亚洲另类视频| 亚洲国产免费av| 国产精品久久久久久久9999| 国产精品扒开腿爽爽爽视频| 精品香蕉在线观看视频一| 欧洲成人午夜免费大片| 亚洲美女又黄又爽在线观看| 午夜精品久久久久久99热软件| 国产精品流白浆视频| 九色91av视频| 91国产美女在线观看| 欧美午夜丰满在线18影院| 日韩大片在线观看视频| 岛国av一区二区| 亚洲精品v天堂中文字幕| 亚洲iv一区二区三区| 国产一区红桃视频| 国产精品久久色| 国产精品久久久久国产a级| 日韩亚洲精品视频| 欧美日韩免费网站| 亚洲精品视频在线观看视频| 国产ts一区二区| 91视频免费网站| 岛国精品视频在线播放| 日日噜噜噜夜夜爽亚洲精品| 精品久久久久久亚洲国产300| 亚洲成人激情视频| 在线观看国产精品淫| 亚洲三级黄色在线观看| 亚洲色图日韩av| 国产成人在线精品| 欧美亚洲第一区| 欧美精品一区在线播放| 亚洲在线视频观看| 成人免费视频xnxx.com| 亚洲精品国精品久久99热| 亚洲va男人天堂| 精品久久久久久| 欧美综合激情网| 亚洲一区二区日本| 91情侣偷在线精品国产| 91亚洲精华国产精华| 国产成人小视频在线观看| 高清欧美一区二区三区| 精品视频久久久| 国产精品美女午夜av| 羞羞色国产精品| 成人黄色av网站| 精品国偷自产在线视频| 自拍偷拍免费精品| 国产成人免费av电影| 成人精品福利视频| 欧美日韩在线免费| 亚洲视频自拍偷拍| 96sao精品视频在线观看| 亚洲免费视频一区二区| 欧美精品成人91久久久久久久| 91精品视频一区| 久久中文字幕在线| 最近的2019中文字幕免费一页| 国产精品美女主播| 国产一区玩具在线观看| 亚洲天堂免费视频| 欧美一级电影在线| 91久久国产精品| 国产精品jizz在线观看麻豆| 久久91精品国产| 国产成人精品久久二区二区| 中文在线不卡视频| 日韩在线精品一区| 国产啪精品视频网站| 欧美丰满少妇xxxxx| 日韩亚洲欧美中文在线| 777国产偷窥盗摄精品视频| 欧洲精品毛片网站| 欧美孕妇毛茸茸xxxx| 中文字幕日韩有码| 国产精品久久久久77777| 国产精品一二区| 国产精品亚发布| 久久精品一区中文字幕| 欧美激情精品久久久| 亚洲国产精品热久久| 国产精品色视频| 日韩专区中文字幕| 久久久久久久久久久人体| 欧美丰满老妇厨房牲生活| 精品福利一区二区| 亚洲欧美日韩中文在线制服| 国产成人精品综合| 欧美日韩国产色视频| 欧美美女操人视频| 国产精品第一第二| 国产亚洲视频中文字幕视频| 成人中文字幕+乱码+中文字幕| 日本精品久久中文字幕佐佐木| 蜜月aⅴ免费一区二区三区| 中文字幕久热精品在线视频| 91在线无精精品一区二区| 日韩美女av在线| 亚洲免费视频一区二区| 成人有码在线播放| 日韩欧美一区二区在线| 亚洲成人久久一区| 欧美精品做受xxx性少妇| 亚洲一区二区三区在线视频| 久久97久久97精品免视看| 亚洲免费一级电影| 日本一区二三区好的精华液| 久久久久久成人精品| 91久久久国产精品| 国产99久久精品一区二区 夜夜躁日日躁| 久久久久久com| 国产精品久久久久久一区二区| 国产精品69久久| 久久久久久国产免费| 亚洲成av人片在线观看香蕉| 亚洲精品成人久久| 91在线视频一区| 亚洲a级在线播放观看| 久久久久免费视频| 黄色一区二区三区| 久久久欧美一区二区| 亚洲国产精品免费| 欧美在线欧美在线| 色系列之999| 国产精品久久久久久久久免费| 亚洲va欧美va国产综合剧情| 黄网站色欧美视频| 精品亚洲va在线va天堂资源站| 免费不卡欧美自拍视频| 欧美成人一区二区三区电影| 色老头一区二区三区| 欧美成人中文字幕| 国自产精品手机在线观看视频| 国产精品久久久久久网站| 91国产精品电影| 亚洲欧美在线一区| 国产主播欧美精品| 国产精品丝袜久久久久久不卡| 亚洲精品99久久久久| 欧美丝袜一区二区| 亚洲精品久久久久中文字幕二区| 亚洲日本欧美中文幕| 亚洲码在线观看| 91免费看片网站| 国产欧美在线看| 亚洲精品在线视频| 欧美—级高清免费播放| 亚洲影视中文字幕| 亚洲国产成人精品久久久国产成人一区|