亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 編程 > Python > 正文

Python 多進程

2019-11-06 07:49:46
字體:
來源:轉載
供稿:網友
multiprocessing

python中的多線程其實并不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiPRocessing,只需要定義一個函數,Python會完成其他所有事情。借助這個包,可以輕松完成從單進程到并發執行的轉換。multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

1. Process

創建進程的類:Process([group [, target [, name [, args [, kwargs]]]]]),target表示調用對象,args表示調用對象的位置參數元組。kwargs表示調用對象的字典。name為別名。group實質上不使用。方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()啟動某個進程。

屬性:authkey、daemon(要通過start()設置)、exitcode(進程在運行時為None、如果為–N,表示被信號N結束)、name、pid。其中daemon是父進程終止后自動終止,且自己不能產生新進程,必須在start()之前設置。

import multiprocessingimport timedef worker(interval):    n = 5    while n > 0:        print("The time is {0}".format(time.ctime()))        time.sleep(interval)        n -= 1if __name__ == "__main__":    p = multiprocessing.Process(target = worker, args = (3,))    p.start()    print "p.pid:", p.pid    print "p.name:", p.name    print "p.is_alive:", p.is_alive()
p.pid: 4976p.name: Process-1p.is_alive: TrueThe time is Sat Mar 04 09:40:02 2017The time is Sat Mar 04 09:40:05 2017The time is Sat Mar 04 09:40:08 2017The time is Sat Mar 04 09:40:11 2017The time is Sat Mar 04 09:40:14 2017

創建函數并將其作為多個進程

import multiprocessingimport timedef worker_1(interval):    print "worker_1"    time.sleep(interval)    print "end worker_1"def worker_2(interval):    print "worker_2"    time.sleep(interval)    print "end worker_2"def worker_3(interval):    print "worker_3"    time.sleep(interval)    print "end worker_3"if __name__ == "__main__":    p1 = multiprocessing.Process(target = worker_1, args = (2,))    p2 = multiprocessing.Process(target = worker_2, args = (3,))    p3 = multiprocessing.Process(target = worker_3, args = (4,))    p1.start()    p2.start()    p3.start()    print("The number of CPU is:" + str(multiprocessing.cpu_count()))    for p in multiprocessing.active_children():        print("child   p.name:" + p.name + "/tp.id" + str(p.pid))    print "END!!!!!!!!!!!!!!!!!"
The number of CPU is:4child   p.name:Process-3        p.id10676child   p.name:Process-1        p.id4628child   p.name:Process-2        p.id8028END!!!!!!!!!!!!!!!!!worker_1worker_2worker_3end worker_1end worker_2end worker_3

將進程定義為類

復制代碼
import multiprocessingimport timeclass ClockProcess(multiprocessing.Process):    def __init__(self, interval):        multiprocessing.Process.__init__(self)        self.interval = interval    def run(self):        n = 5        while n > 0:            print("the time is {0}".format(time.ctime()))            time.sleep(self.interval)            n -= 1if __name__ == '__main__':    p = ClockProcess(3)    p.start()      復制代碼

注:進程p調用start()時,自動調用run()

the time is Sat Mar 04 09:44:17 2017the time is Sat Mar 04 09:44:20 2017the time is Sat Mar 04 09:44:23 2017the time is Sat Mar 04 09:44:26 2017the time is Sat Mar 04 09:44:29 2017daemon程序對比結果

不加daemon屬性

import multiprocessingimport timedef worker(interval):    print("work start:{0}".format(time.ctime()));    time.sleep(interval)    print("work end:{0}".format(time.ctime()));if __name__ == "__main__":    p = multiprocessing.Process(target = worker, args = (3,))    p.start()    p注:因子進程設置了daemon屬性,主進程結束,它們就隨著結束了。rint "end!"
end!work start:Sat Mar 04 09:46:20 2017work end:Sat Mar 04 09:46:23 2017

加上daemon屬性

import multiprocessingimport timedef worker(interval):    print("work start:{0}".format(time.ctime()));    time.sleep(interval)    print("work end:{0}".format(time.ctime()));if __name__ == "__main__":    p = multiprocessing.Process(target = worker, args = (3,))    p.daemon = True    p.start()    print "end!"
end!注:因子進程設置了daemon屬性,主進程結束,它們就隨著結束了。

設置daemon執行完結束的方法

import multiprocessingimport timedef worker(interval):    print("work start:{0}".format(time.ctime()));    time.sleep(interval)    print("work end:{0}".format(time.ctime()));if __name__ == "__main__":    p = multiprocessing.Process(target = worker, args = (3,))    p.daemon = True    p.start()    p.join()    print "end!"
work start:Sat Mar 04 09:48:20 2017work end:Sat Mar 04 09:48:23 2017end!

2. Lock

當多個進程需要訪問共享資源的時候,Lock可以用來避免訪問的沖突。

import multiprocessingimport sysdef worker_with(lock, f):    with lock:        fs = open(f, 'a+')        n = 10        while n > 1:            fs.write("Lockd acquired via with/n")            n -= 1        fs.close()        def worker_no_with(lock, f):    lock.acquire()    try:        fs = open(f, 'a+')        n = 10        while n > 1:            fs.write("Lock acquired directly/n")            n -= 1        fs.close()    finally:        lock.release()    if __name__ == "__main__":    lock = multiprocessing.Lock()    f = "file.txt"    w = multiprocessing.Process(target = worker_with, args=(lock, f))    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))    w.start()    nw.start()    print "end"
Lockd acquired via withLockd acquired via withLockd acquired via withLockd acquired via withLockd acquired via withLockd acquired via withLockd acquired via withLockd acquired via withLockd acquired via withLock acquired directlyLock acquired directlyLock acquired directlyLock acquired directlyLock acquired directlyLock acquired directlyLock acquired directlyLock acquired directlyLock acquired directly

3. Semaphore

Semaphore用來控制對共享資源的訪問數量,例如池的最大連接數。

import multiprocessingimport timedef worker(s, i):    s.acquire()    print(multiprocessing.current_process().name + "acquire");    time.sleep(i)    print(multiprocessing.current_process().name + "release/n");    s.release()if __name__ == "__main__":    s = multiprocessing.Semaphore(2)    for i in range(5):        p = multiprocessing.Process(target = worker, args=(s, i*2))        p.start()

Process-1acquirePProcess-1releaserocess-3acquireProcess-4acquireProcess-3releaseProcess-5acquireProcess-4releaseProcess-2acquireProcess-2releaseProcess-5release

4. Event

Event用來實現進程間同步通信。

import multiprocessingimport timedef wait_for_event(e):    print("wait_for_event: starting")    e.wait()    print("wairt_for_event: e.is_set()->" + str(e.is_set()))def wait_for_event_timeout(e, t):    print("wait_for_event_timeout:starting")    e.wait(t)    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))if __name__ == "__main__":    e = multiprocessing.Event()    w1 = multiprocessing.Process(name = "block",            target = wait_for_event,            args = (e,))    w2 = multiprocessing.Process(name = "non-block",            target = wait_for_event_timeout,            args = (e, 2))    w1.start()    w2.start()    time.sleep(3)    e.set()    print("main: event is set")
wait_for_event: startingwait_for_event_timeout:startingwait_for_event_timeout:e.is_set->Falsewmairt_for_event: e.is_set()->Trueain: event is set

5. Queue

Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。 get方法可以從隊列讀取并且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常。Queue的一段示例代碼:

import multiprocessingdef writer_proc(q):          try:                 q.put(1, block = False)     except:                 pass   def reader_proc(q):          try:                 print q.get(block = False)     except:                 passif __name__ == "__main__":    q = multiprocessing.Queue()    writer = multiprocessing.Process(target=writer_proc, args=(q,))      writer.start()       reader = multiprocessing.Process(target=reader_proc, args=(q,))      reader.start()      reader.join()      writer.join()

6. Pipe

Pipe方法返回(conn1, conn2)代表一個管道的兩個端。Pipe方法有duplex參數,如果duplex參數為True(默認值),那么這個管道是全雙工模式,也就是說conn1和conn2均可收發。duplex為False,conn1只負責接受消息,conn2只負責發送消息。 send和recv方法分別是發送和接受消息的方法。例如,在全雙工模式下,可以調用conn1.send發送消息,conn1.recv接收消息。如果沒有消息可接收,recv方法會一直阻塞。如果管道已經被關閉,那么recv方法會拋出EOFError。

import multiprocessingimport timedef proc1(pipe):    while True:        for i in xrange(10000):            print "send: %s" %(i)            pipe.send(i)            time.sleep(1)def proc2(pipe):    while True:        print "proc2 rev:", pipe.recv()        time.sleep(1)def proc3(pipe):    while True:        print "PROC3 rev:", pipe.recv()        time.sleep(1)if __name__ == "__main__":    pipe = multiprocessing.Pipe()    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))    #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))    p1.start()    p2.start()    #p3.start()    p1.join()    p2.join()    #p3.join()

7. Pool

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,并行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。

例7.1:使用進程池(非阻塞)

#coding: utf-8import multiprocessingimport timedef func(msg):    print "msg:", msg    time.sleep(3)    print "end"if __name__ == "__main__":    pool = multiprocessing.Pool(processes = 3)    for i in xrange(4):        msg = "hello %d" %(i)        pool.apply_async(func, (msg, ))   #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"    pool.close()    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束    print "Sub-process(es) done."

一次執行結果

12345678910mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0 msg: hello 1msg: hello 2endmsg: hello 3endendendSub-process(es) done.

函數解釋:

apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解區別,看例1例2結果區別)close()    關閉pool,使其不在接受新的任務。terminate()    結束工作進程,不在處理未完成的任務。join()    主進程阻塞,等待子進程的退出, join方法要在close或terminate之后使用。

執行說明:創建一個進程池pool,并設定進程的數量為3,xrange(4)會相繼產生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數為3,所以0、1、2會直接送到進程中執行,當其中一個執行完事后才空出一個進程處理對象3,所以會出現輸出“msg: hello 3”出現在"end"后。因為為非阻塞,主函數會自己執行自個的,不搭理進程的執行,所以運行完for循環后直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()處等待各個進程的結束。

例7.2:使用進程池(阻塞)

#coding: utf-8import multiprocessingimport timedef func(msg):    print "msg:", msg    time.sleep(3)    print "end"if __name__ == "__main__":    pool = multiprocessing.Pool(processes = 3)    for i in xrange(4):        msg = "hello %d" %(i)        pool.apply(func, (msg, ))   #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"    pool.close()    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束    print "Sub-process(es) done."

一次執行的結果

12345678910msg: hello 0endmsg: hello 1endmsg: hello 2endmsg: hello 3endMark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~Sub-process(es) done.

  

例7.3:使用進程池,并關注結果

復制代碼
import multiprocessingimport timedef func(msg):    print "msg:", msg    time.sleep(3)    print "end"    return "done" + msgif __name__ == "__main__":    pool = multiprocessing.Pool(processes=4)    result = []    for i in xrange(3):        msg = "hello %d" %(i)        result.append(pool.apply_async(func, (msg, )))    pool.close()    pool.join()    for res in result:        print ":::", res.get()    print "Sub-process(es) done."復制代碼

一次執行結果

12345678910msg: hello 0msg: hello 1msg: hello 2endendend::: donehello 0::: donehello 1::: donehello 2Sub-process(es) done.

 

例7.4:使用多個進程池

復制代碼
#coding: utf-8import multiprocessingimport os, time, randomdef Lee():    print "/nRun task Lee-%s" %(os.getpid()) #os.getpid()獲取當前的進程的ID    start = time.time()    time.sleep(random.random() * 10) #random.random()隨機生成0-1之間的小數    end = time.time()    print 'Task Lee, runs %0.2f seconds.' %(end - start)def Marlon():    print "/nRun task Marlon-%s" %(os.getpid())    start = time.time()    time.sleep(random.random() * 40)    end=time.time()    print 'Task Marlon runs %0.2f seconds.' %(end - start)def Allen():    print "/nRun task Allen-%s" %(os.getpid())    start = time.time()    time.sleep(random.random() * 30)    end = time.time()    print 'Task Allen runs %0.2f seconds.' %(end - start)def Frank():    print "/nRun task Frank-%s" %(os.getpid())    start = time.time()    time.sleep(random.random() * 20)    end = time.time()    print 'Task Frank runs %0.2f seconds.' %(end - start)        if __name__=='__main__':    function_list=  [Lee, Marlon, Allen, Frank]     print "parent process %s" %(os.getpid())    pool=multiprocessing.Pool(4)    for func in function_list:        pool.apply_async(func)     #Pool執行函數,apply執行函數,當有一個進程執行完畢后,會添加一個新的進程到pool中    print 'Waiting for all subprocesses done...'    pool.close()    pool.join()    #調用join之前,一定要先調用close() 函數,否則會出錯, close()執行后不會有新的進程加入到pool,join函數等待素有子進程結束    print 'All subprocesses done.'復制代碼

一次執行結果

123456789101112131415parent process 7704 Waiting for all subprocesses done...Run task Lee-6948 Run task Marlon-2896 Run task Allen-7304 Run task Frank-3052Task Lee, runs 1.59 seconds.Task Marlon runs 8.48 seconds.Task Frank runs 15.68 seconds.Task Allen runs 18.08 seconds.All subprocesses done.


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
亚洲欧美制服另类日韩| 91大神福利视频在线| 色一区av在线| 亚洲缚视频在线观看| 欧美猛交免费看| 秋霞av国产精品一区| 亚洲免费视频一区二区| 欧美激情精品久久久久| 爽爽爽爽爽爽爽成人免费观看| 91经典在线视频| 国产精品九九九| 欧美日韩中文在线观看| 欧美成人中文字幕在线| 久久激情视频免费观看| 国产成人精品av在线| 中文字幕日本欧美| 欧美专区中文字幕| 国产精品老牛影院在线观看| 欧美电影免费观看高清完整| 免费99精品国产自在在线| 国产精品久久久久久久久久久不卡| 亚洲成人999| 欧美亚洲第一区| 精品性高朝久久久久久久| 91在线无精精品一区二区| 亚洲久久久久久久久久久| 亚洲人永久免费| 中文字幕亚洲一区| 欧美激情一区二区三级高清视频| 深夜成人在线观看| 国产午夜一区二区| 国产视频自拍一区| 久久99视频精品| 国产亚洲精品久久| 丁香五六月婷婷久久激情| 欧美日韩在线视频一区二区| 国产精品视频一区二区三区四| 国产激情久久久久| 亚洲黄页网在线观看| 欧美国产乱视频| 国产做受69高潮| 中文字幕不卡av| 日韩成人免费视频| 欧美日韩免费看| 亚洲欧美国产一本综合首页| 亚洲精品自拍偷拍| 亚洲最新av在线网站| 欧美人交a欧美精品| 国产精品一区专区欧美日韩| 日韩欧美在线播放| 国内精品美女av在线播放| 日韩av电影中文字幕| 日韩成人网免费视频| 中文字幕日韩av电影| 国产一区二区丝袜高跟鞋图片| 亚洲成人网久久久| 日韩最新免费不卡| 亚洲精品第一国产综合精品| 国产91ⅴ在线精品免费观看| 亚洲精品中文字幕av| 这里精品视频免费| 亚洲精品久久久久久久久| 欧美一区二区三区……| 国产精品wwwwww| 国产精品欧美日韩| 欧美激情第6页| 亚洲欧美一区二区三区四区| 亚洲国产成人久久综合一区| 日韩欧亚中文在线| 欧美精品久久久久久久免费观看| 日本一区二三区好的精华液| 在线观看欧美www| 国产情人节一区| 亚洲一区二区三区成人在线视频精品| 欧美日韩免费观看中文| 日韩欧美精品在线观看| 亚洲精品在线不卡| 欧美成年人在线观看| 日韩中文在线中文网三级| 91系列在线播放| 欧美视频国产精品| 日韩精品在线看| 亚洲成年人在线播放| 亚洲综合中文字幕在线| 欧美乱大交xxxxx| 在线观看精品自拍私拍| 性色av一区二区三区红粉影视| 亚洲老头老太hd| 欧美华人在线视频| 国产精品视频男人的天堂| 国产在线日韩在线| 亚洲精品国产免费| 久久精品视频导航| 亚洲天堂男人天堂| 国产va免费精品高清在线| 欧美视频裸体精品| 国产日韩欧美中文| 国产一级揄自揄精品视频| 亚洲女人被黑人巨大进入al| 欧美性色19p| 91在线精品视频| 国产日韩在线观看av| xvideos亚洲| 国产精自产拍久久久久久| 久久久久久中文字幕| 日韩亚洲欧美中文在线| 97在线精品国自产拍中文| 国产精品久久久久久亚洲影视| 一区二区三区国产在线观看| 精品国产区一区二区三区在线观看| 色综合天天狠天天透天天伊人| 日本中文字幕不卡免费| 亚洲欧洲在线免费| 国产美女精彩久久| 欧美一级免费视频| 91精品久久久久久久久| 少妇高潮 亚洲精品| 成人疯狂猛交xxx| 久久精品国产清自在天天线| 久久久久久久久中文字幕| 国产精品稀缺呦系列在线| 91中文字幕在线观看| 久久久噜噜噜久久中文字免| 久久人人爽人人爽人人片av高清| 欧美激情伊人电影| 亚洲国产精久久久久久| 亚洲精品网址在线观看| 久久精品国产96久久久香蕉| 亚洲国产精品小视频| 国产精品69精品一区二区三区| 亚洲国产日韩一区| 久久青草福利网站| 亚洲电影免费在线观看| 欧美又大又粗又长| 免费av在线一区| 日本高清视频一区| 国产精品视频最多的网站| 亚洲的天堂在线中文字幕| 91影视免费在线观看| 色综合天天狠天天透天天伊人| 欧美专区中文字幕| 国产99久久精品一区二区| 久久97精品久久久久久久不卡| 国产日韩换脸av一区在线观看| 久久精品国产91精品亚洲| 国产精品成人一区二区三区吃奶| 欧洲日韩成人av| 国产成人97精品免费看片| 国产香蕉97碰碰久久人人| 亚洲国产精品久久久久| 日韩高清欧美高清| 国产精品一区二区女厕厕| 国产精品久久久久aaaa九色| 国产一区二区三区精品久久久| 最近2019中文字幕在线高清| 精品久久久免费| 久久国产精品视频| 久久精品久久精品亚洲人| 富二代精品短视频| 国产精品综合不卡av| 欧美激情在线有限公司| 国内外成人免费激情在线视频网站| 国产裸体写真av一区二区| 77777少妇光屁股久久一区|