亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 編程 > Python > 正文

Python實現大文件排序的方法

2020-01-04 18:05:50
字體:
來源:轉載
供稿:網友

這篇文章主要介紹了Python大文件排序的方法,涉及Python針對文件、緩存及日期等操作的相關技巧,具有一定參考借鑒價值,需要的朋友可以參考下

本文實例講述了Python實現大文件排序的方法。分享給大家供大家參考。具體實現方法如下:

 

 
  1. import gzip 
  2. import os 
  3. from multiprocessing import Process, Queue, Pipe, current_process, freeze_support 
  4. from datetime import datetime 
  5. def sort_worker(input,output): 
  6. while True: 
  7. lines = input.get().splitlines() 
  8. element_set = {} 
  9. for line in lines: 
  10. if line.strip() == 'STOP'
  11. return 
  12. try
  13. element = line.split(' ')[0] 
  14. if not element_set.get(element): element_set[element] = '' 
  15. except: 
  16. pass 
  17. sorted_element = sorted(element_set) 
  18. #print sorted_element 
  19. output.put('/n'.join(sorted_element)) 
  20. def write_worker(input, pre): 
  21. os.system('mkdir %s'%pre) 
  22. i = 0 
  23. while True: 
  24. content = input.get() 
  25. if content.strip() == 'STOP'
  26. return 
  27. write_sorted_bulk(content, '%s/%s'%(pre, i)) 
  28. i += 1 
  29. def write_sorted_bulk(content, filename): 
  30. f = file(filename, 'w'
  31. f.write(content) 
  32. f.close() 
  33. def split_sort_file(filename, num_sort = 3, buf_size = 65536*64*4): 
  34. t = datetime.now() 
  35. pre, ext = os.path.splitext(filename) 
  36. if ext == '.gz'
  37. file_file = gzip.open(filename, 'rb'
  38. else
  39. file_file = open(filename) 
  40. bulk_queue = Queue(10) 
  41. sorted_queue = Queue(10) 
  42. NUM_SORT = num_sort 
  43. sort_worker_pool = [] 
  44. for i in range(NUM_SORT): 
  45. sort_worker_pool.append( Process(target=sort_worker, args=(bulk_queue, sorted_queue)) ) 
  46. sort_worker_pool[i].start() 
  47. NUM_WRITE = 1 
  48. write_worker_pool = [] 
  49. for i in range(NUM_WRITE): 
  50. write_worker_pool.append( Process(target=write_worker, args=(sorted_queue, pre)) ) 
  51. write_worker_pool[i].start() 
  52. buf = file_file.read(buf_size) 
  53. sorted_count = 0 
  54. while len(buf): 
  55. end_line = buf.rfind('/n'
  56. #print buf[:end_line+1] 
  57. bulk_queue.put(buf[:end_line+1]) 
  58. sorted_count += 1 
  59. if end_line != -1: 
  60. buf = buf[end_line+1:] + file_file.read(buf_size) 
  61. else
  62. buf = file_file.read(buf_size) 
  63. for i in range(NUM_SORT): 
  64. bulk_queue.put('STOP'
  65. for i in range(NUM_SORT): 
  66. sort_worker_pool[i].join() 
  67.  
  68. for i in range(NUM_WRITE): 
  69. sorted_queue.put('STOP'
  70. for i in range(NUM_WRITE): 
  71. write_worker_pool[i].join() 
  72. print 'elasped ', datetime.now() - t 
  73. return sorted_count 
  74. from heapq import heappush, heappop 
  75. from datetime import datetime 
  76. from multiprocessing import Process, Queue, Pipe, current_process, freeze_support 
  77. import os 
  78. class file_heap: 
  79. def __init__(self, dir, idx = 0, count = 1): 
  80. files = os.listdir(dir) 
  81. self.heap = [] 
  82. self.files = {} 
  83. self.bulks = {} 
  84. self.pre_element = None 
  85. for i in range(len(files)): 
  86. file = files[i] 
  87. if hash(file) % count != idx: continue 
  88. input = open(os.path.join(dir, file)) 
  89. self.files[i] = input 
  90. self.bulks[i] = '' 
  91. heappush(self.heap, (self.get_next_element_buffered(i), i)) 
  92. def get_next_element_buffered(self, i): 
  93. if len(self.bulks[i]) < 256: 
  94. if self.files[i] is not None: 
  95. buf = self.files[i].read(65536) 
  96. if buf: 
  97. self.bulks[i] += buf 
  98. else
  99. self.files[i].close() 
  100. self.files[i] = None 
  101. end_line = self.bulks[i].find('/n'
  102. if end_line == -1: 
  103. end_line = len(self.bulks[i]) 
  104. element = self.bulks[i][:end_line] 
  105. self.bulks[i] = self.bulks[i][end_line+1:] 
  106. return element 
  107. def poppush_uniq(self): 
  108. while True: 
  109. element = self.poppush() 
  110. if element is None: 
  111. return None 
  112. if element != self.pre_element: 
  113. self.pre_element = element 
  114. return element 
  115. def poppush(self): 
  116. try
  117. element, index = heappop(self.heap) 
  118. except IndexError: 
  119. return None 
  120. new_element = self.get_next_element_buffered(index) 
  121. if new_element: 
  122. heappush(self.heap, (new_element, index)) 
  123. return element 
  124. def heappoppush(dir, queue, idx = 0, count = 1): 
  125. heap = file_heap(dir, idx, count) 
  126. while True: 
  127. d = heap.poppush_uniq() 
  128. queue.put(d) 
  129. if d is None: return 
  130. def heappoppush2(dir, queue, count = 1): 
  131. heap = [] 
  132. procs = [] 
  133. queues = [] 
  134. pre_element = None 
  135. for i in range(count): 
  136. q = Queue(1024) 
  137. q_buf = queue_buffer(q) 
  138. queues.append(q_buf) 
  139. p = Process(target=heappoppush, args=(dir, q_buf, i, count)) 
  140. procs.append(p) 
  141. p.start() 
  142. queues = tuple(queues) 
  143. for i in range(count): 
  144. heappush(heap, (queues[i].get(), i)) 
  145. while True: 
  146. try
  147. d, i= heappop(heap) 
  148. except IndexError: 
  149. queue.put(None) 
  150. for p in procs: 
  151. p.join() 
  152. return 
  153. else
  154. if d is not None: 
  155. heappush(heap,(queues[i].get(), i)) 
  156. if d != pre_element: 
  157. pre_element = d 
  158. queue.put(d) 
  159. def merge_file(dir): 
  160. heap = file_heap( dir ) 
  161. os.system('rm -f '+dir+'.merge'
  162. fmerge = open(dir+'.merge''a'
  163. element = heap.poppush_uniq() 
  164. fmerge.write(element+'/n'
  165. while element is not None: 
  166. element = heap.poppush_uniq() 
  167. fmerge.write(element+'/n'
  168. class queue_buffer: 
  169. def __init__(self, queue): 
  170. self.q = queue 
  171. self.rbuf = [] 
  172. self.wbuf = [] 
  173. def get(self): 
  174. if len(self.rbuf) == 0: 
  175. self.rbuf = self.q.get() 
  176. r = self.rbuf[0] 
  177. del self.rbuf[0] 
  178. return r 
  179. def put(self, d): 
  180. self.wbuf.append(d) 
  181. if d is None or len(self.wbuf) > 1024: 
  182. self.q.put(self.wbuf) 
  183. self.wbuf = [] 
  184. def diff_file(file_old, file_new, file_diff, buf = 268435456): 
  185. print 'buffer size', buf 
  186. from file_split import split_sort_file 
  187. os.system('rm -rf '+ os.path.splitext(file_old)[0] ) 
  188. os.system('rm -rf '+ os.path.splitext(file_new)[0] ) 
  189. t = datetime.now() 
  190. split_sort_file(file_old,5,buf) 
  191. split_sort_file(file_new,5,buf) 
  192. print 'split elasped ', datetime.now() - t 
  193. os.system('cat %s/* | wc -l'%os.path.splitext(file_old)[0]) 
  194. os.system('cat %s/* | wc -l'%os.path.splitext(file_new)[0]) 
  195. os.system('rm -f '+file_diff) 
  196. t = datetime.now() 
  197. zdiff = open(file_diff, 'a'
  198. old_q = Queue(1024) 
  199. new_q = Queue(1024) 
  200. old_queue = queue_buffer(old_q) 
  201. new_queue = queue_buffer(new_q) 
  202. h1 = Process(target=heappoppush2, args=(os.path.splitext(file_old)[0], old_queue, 3)) 
  203. h2 = Process(target=heappoppush2, args=(os.path.splitext(file_new)[0], new_queue, 3)) 
  204. h1.start(), h2.start() 
  205. old = old_queue.get() 
  206. new = new_queue.get() 
  207. old_count, new_count = 0, 0 
  208. while old is not None or new is not None: 
  209. if old > new or old is None: 
  210. zdiff.write('< '+new+'/n'
  211. new = new_queue.get() 
  212. new_count +=1 
  213. elif old < new or new is None: 
  214. zdiff.write('> '+old+'/n'
  215. old = old_queue.get() 
  216. old_count +=1 
  217. else
  218. old = old_queue.get() 
  219. new = new_queue.get() 
  220. print 'new_count:', new_count 
  221. print 'old_count:', old_count 
  222. print 'diff elasped ', datetime.now() - t 
  223. h1.join(), h2.join() 

希望本文所述對大家的Python程序設計有所幫助。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
91国内精品久久| 国产在线不卡精品| 最近2019年手机中文字幕| 国产精品久久久久久久久久久久久| 久久久中精品2020中文| 国产免费一区视频观看免费| 亚洲区免费影片| 久久亚洲精品一区| 欧美精品18videosex性欧美| 性夜试看影院91社区| 国产午夜精品久久久| 国内精品模特av私拍在线观看| 国产精品久久久久久网站| 欧美另类xxx| 欧美韩日一区二区| 国产精品成人观看视频国产奇米| 亚洲r级在线观看| 欧美成人免费播放| 国产精品久久久久久久久久免费| 亚洲欧美日韩精品久久亚洲区| 欧美大片免费观看在线观看网站推荐| 亚洲一区中文字幕在线观看| 国产深夜精品福利| 在线视频亚洲欧美| 精品丝袜一区二区三区| 亚洲香蕉成视频在线观看| 国产成人精彩在线视频九色| 亚洲男人av电影| 日韩电影在线观看永久视频免费网站| 亚洲老司机av| 国产一区红桃视频| 亚洲最大福利网站| 日韩精品在线播放| 国产成人精品午夜| 亚洲精品福利在线观看| 91在线中文字幕| 久精品免费视频| 久久天堂电影网| 亚洲视频视频在线| 亚洲欧美中文日韩在线| 一区二区福利视频| 亚洲iv一区二区三区| 中文字幕精品—区二区| 欧美激情精品久久久久久大尺度| 日韩成人免费视频| 国产精品视频免费观看www| 欧美夜福利tv在线| 国产精品丝袜白浆摸在线| 中文字幕日韩电影| 欧美激情在线狂野欧美精品| 色中色综合影院手机版在线观看| 色爱精品视频一区| 欧美午夜片在线免费观看| 日韩av日韩在线观看| 欧美激情第6页| 欧美日韩国产页| 亚洲精品久久久久国产| 91免费看片在线| 国产精品国语对白| 538国产精品视频一区二区| 国产精品va在线| 成人午夜激情免费视频| 欧美一区二区三区免费视| 国产视频亚洲视频| 国产精品亚洲综合天堂夜夜| 日本高清久久天堂| 亚洲一区二区三区在线免费观看| 精品国产成人在线| 日韩a**中文字幕| 亚洲无线码在线一区观看| 日韩禁在线播放| 国内精品国产三级国产在线专| 日韩av大片在线| 欧美日韩性生活视频| 91老司机在线| 国产一区二区丝袜高跟鞋图片| 日日狠狠久久偷偷四色综合免费| 久久久久久一区二区三区| 中文字幕国产日韩| 久热在线中文字幕色999舞| 欧美激情第一页xxx| 国产精品91一区| 亚洲一区中文字幕在线观看| 亚洲成人久久网| 性欧美亚洲xxxx乳在线观看| 亚洲一区二区久久久久久| 亚洲天天在线日亚洲洲精| 97久久久免费福利网址| 中文字幕日韩免费视频| 日韩免费高清在线观看| 亚洲一区久久久| 亚洲一区二区三区毛片| 中文.日本.精品| 国产精品视频26uuu| 97精品国产91久久久久久| 久久精品亚洲94久久精品| 日韩日本欧美亚洲| 7777精品视频| 国产91露脸中文字幕在线| 国产精品麻豆va在线播放| 日韩中文字幕不卡视频| 亚洲精品在线不卡| 久久成人18免费网站| 国产欧美精品久久久| 欧美床上激情在线观看| 国产精品久久久| 91精品国产91久久久久久最新| 欧美日韩美女视频| 午夜精品一区二区三区av| 欧美日韩第一视频| 日韩精品免费在线播放| 国产91色在线| 日韩精品亚洲元码| 亚洲精品在线视频| 欧美又大粗又爽又黄大片视频| 国产福利视频一区| 国产精品视频中文字幕91| 亚洲国产精品视频在线观看| 亚洲男人第一av网站| 国产成人涩涩涩视频在线观看| 国产亚洲激情在线| 久久久精品视频在线观看| 国产精品美女无圣光视频| 91青草视频久久| 亚洲摸下面视频| 97视频在线观看免费高清完整版在线观看| 亚洲一区二区三区久久| 国产精品入口福利| 一区二区三区回区在观看免费视频| 亚洲美女性生活视频| 欧美高清自拍一区| 狠狠躁18三区二区一区| 国产精品美腿一区在线看| 日韩av在线一区| 日韩av色综合| 国产免费久久av| 久久久噜久噜久久综合| 热re91久久精品国99热蜜臀| 日韩暖暖在线视频| 精品偷拍一区二区三区在线看| 国产成人福利网站| 亚洲三级免费看| 最近2019年手机中文字幕| 国产精品99蜜臀久久不卡二区| 国产精品白嫩美女在线观看| 97碰在线观看| 欧美成人性色生活仑片| 亚洲成人a级网| 日韩精品免费看| 久久夜色撩人精品| 精品视频中文字幕| 欧美超级免费视 在线| 在线日韩日本国产亚洲| 欧美特黄级在线| 欧美高清videos高潮hd| 欧美日韩裸体免费视频| 久久av红桃一区二区小说| 精品国产成人在线| 日韩欧美精品网址| 欧美激情视频免费观看| 国产精品欧美日韩一区二区| 日本精品va在线观看| 精品国产网站地址| 国产视频999|