亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 學院 > 開發設計 > 正文

Mproxy項目實錄第3天

2019-11-06 06:04:18
字體:
來源:轉載
供稿:網友

關于這個系列

這個項目實錄系列是記錄MPRoxy項目的整個開發流程。項目最終的目標是開發一套代理服務器的API。這個系列中會記錄項目的需求、設計、驗證、實現、升級等等,包括設計決策的依據,開發過程中的各種坑。希望和大家共同交流,一起進步。

項目的源碼我會同步更新到GitHub,項目地址:https://github.com/mrbcy/Mproxy。

系列地址:

Mproxy項目實錄第1天

Mproxy項目實錄第2天

今日計劃

到目前為止,我們已經有了一個可以爬取快代理網站中代理服務器地址的爬蟲,并且這個爬蟲可以將爬取過程記錄到日志文件中,將爬取到的代理服務器提交到Kafka集群,同時還可以保證運行過程中不崩潰。但是還有一部分可以加強。

首先就是上一篇提到的將爬取過的代理服務器地址存入MongoDB,3天之內已經爬取過的ip就不再提交到Kafka集群了。另一個就是把類似Kafka集群的地址、MongoDB數據庫地址、日志文件的名稱這些內容放到配置文件中來方便今后的修改。

在完成了這些之后,今天我們還將完成驗證器的開發工作。

將爬取記錄提交到MongoDB

MongoDB是一個跨平臺的NoSQL,基于Key-Value形式保存數據。其存儲格式非常類似于Python的字典,因此用Python操作MongoDB會非常的容易。

MongoDB的安裝可以參考http://blog.csdn.net/chenpy/article/details/50324989

然后進行了一下技術驗證,可以參考http://blog.csdn.net/mrbcy/article/details/60141158

寫了一個工具類用于更新MongoDB中的代理服務器數據和查詢是否有重復代理服務器。

#-*- coding: utf-8 -*-import loggingimport datetimefrom pymongo import MongoClientclass KuaidailiProxyRecorder: def __init__(self,mongodb_host='localhost',port=27017,record_days=3): self.client = MongoClient(mongodb_host, port) self.db = self.client.mproxy self.collection = self.db.kuaidaili_proxy_records self.record_days = record_days def save_proxy(self,proxy_item): try: record = {} record['ip'] = proxy_item['ip'] record['update_time'] = datetime.datetime.now() self.collection.save(record) except Exception as e: logging.exception("An Error Happens") def find_repeat_proxy(self,ip): try: d = datetime.datetime.now() d = d - datetime.timedelta(days=self.record_days) return self.collection.find_one({'ip':ip,'update_time':{"$gt": d}}) except Exception as e: logging.exception("An Error Happens")

代碼很簡單,這里就不再解釋了??匆幌聀ipelines里面是怎么用的。

if self.proxy_recorder.find_repeat_proxy(item['ip']) is None: logging.debug(item['ip'] + ' is not repeat') self.proxy_recorder.save_proxy(item) self.producer.send('unchecked-servers', item.__dict__) # Makes the item could be JSON serializableelse: logging.debug(item['ip'] + ' is repeat, not submit to Kafka cluster')

如果在MongoDB中找到了重復的代理服務器,那么久不提交到Kafka集群,否則提交,并且更新代理服務器的信息到MongoDB。

讀取配置文件

為了后面維護的方便,將MongoDB數據庫地址、Kafka集群地址、日志文件名字這幾個值保存到配置文件中。

因為之前沒有使用過Python的配置文件,同樣進行了技術探索,可以參考http://blog.csdn.net/mrbcy/article/details/60143067

然后寫了一個配置讀取工具。

#-*- coding: utf-8 -*-import ConfigParserclass ConfigLoader: def __init__(self): self.cp = ConfigParser.SafeConfigParser() self.cp.read('kuaidaili_spider.cfg') def get_mongodb_host(self): return self.cp.get('mongodb','host') def get_kafka_bootstrap_servers(self): text = self.cp.get('kafka','bootstrap_servers') return text.split(',') def get_log_file_name(self): return self.cp.get('log','log_file_name')

然后把硬編碼的字符串換成了函數調用。

好了,到這里快代理的爬蟲開發就基本告一段落了。接下來我們進入驗證器的開發。

驗證器開發

目標

驗證器的作用是驗證爬蟲爬取到的代理服務器是否可用。由于各地網絡環境不同,最好是在不同地理位置的服務器上運行,測試同樣的代理服務器。只有一個代理服務器在所有的驗證器上都可用時,才能認為這個代理服務器可用。

與爬蟲不同,驗證器只需要寫一次代碼,然后在不同的機器上運行即可。但是由于后續的收集器需要,必須配置一個驗證器名稱。這個驗證器名稱準備存入配置文件中。

驗證代理服務器

使用下面的代碼就可以驗證代理服務器是否可用。

#-*- coding: utf-8 -*-import reimport requestsdef valid_proxy(): headers = { "Host": "www.baidu.com", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3", "Accept-Encoding": "gzip, deflate", "Cookie": "CXID=AFB58656EB6137C12D0E4FF12BC6DFFE; SUV=1484628390086037; m=FAB6EC92D3062F7D84CC06636E62F609; ABTEST=0|1486986265|v17; ad=oe45yZllll2Y$gmTlllllVAIWEtlllllJa0oJyllll9lllll9Zlll5@@@@@@@@@@; SUID=B96B30B65412940A00000000586E6482; ld=okllllllll2Y7@v2lllllVA8dw1lllllH0xrAlllll9lllllpZlll5@@@@@@@@@@; YYID=FAB6EC92D3062F7D84CC06636E62F609; SNUID=441BB6B17B7E35AEB86CFBF37CECC35E; usid=Ibgtjb1FmwpmVEd9; IPLOC=CN1101; browerV=8; osV=1", "Connection": "keep-alive", "Upgrade - Insecure - Requests": "1" } res = requests.get('http://www.sogou.com/',proxies = {'http':'120.77.156.50:80'}) regex = """050897""" pattern = re.compile(regex) if re.search(pattern=pattern,string=res.text) is not None: print "proxy is available" else: print "proxy is unavailable"if __name__ == '__main__': valid_proxy()

原理是訪問sogou.com,然后在返回的頁面代碼中查找搜狗的備案號。

輸出結果為:

proxy is available

采用多線程方式進行驗證

因為我們要驗證的代理服務器有很多,驗證一個代理服務器的時間可能很長。因此必須使用多線程來并發的驗證代理服務器。

首先需要設計一個工具類用于保存待驗證的代理服務器列表。這個工具類必須保證一個線程在操作列表時不會受到其他線程的干擾。因此我們需要用到Python的鎖機制。實驗代碼如下所示:

#-*- coding: utf-8 -*-import threadingclass ProxyQueue: def __init__(self): self.lock = threading.Lock() self.proxy_list = [] def add_proxy(self,proxy_item): self.lock.acquire() self.proxy_list.append(proxy_item)

test.py

#-*- coding: utf-8 -*-import threadingfrom util.proxyqueue import ProxyQueueclass T1(threading.Thread): def __init__(self,queue): threading.Thread.__init__(self) self.queue = queue def run(self): print "T1 add proxy" self.queue.add_proxy(1) print "T1 add proxy end"class T2(threading.Thread): def __init__(self,queue): threading.Thread.__init__(self) self.queue = queue def run(self): print "T2 add proxy" self.queue.add_proxy(1) print "T2 add proxy end"if __name__ == '__main__': queue = ProxyQueue() t1 = T1(queue) t2 = T2(queue) t1.start() t2.start()

輸出結果為:

T1 add proxyT1 add proxy endT2 add proxy

且程序處于等待狀態。這說明我們的鎖機制起到了應有的作用。完整的工具類代碼如下:

#-*- coding: utf-8 -*-import threadingclass ProxyQueue: def __init__(self): self.lock = threading.Lock() self.proxy_list = [] def add_proxy(self,proxy_item): self.lock.acquire() self.proxy_list.append(proxy_item) self.lock.release() def get_proxy(self): proxy_item = None self.lock.acquire() proxy_count = len(self.proxy_list) if proxy_count > 0: proxy_item = self.proxy_list[0] self.lock.release() return proxy_item def pop_proxy(self): proxy_item = None self.lock.acquire() proxy_count = len(self.proxy_list) if proxy_count > 0: proxy_item = self.proxy_list.pop() self.lock.release() return proxy_item def get_proxy_count(self): proxy_count = 0 self.lock.acquire() proxy_count = len(self.proxy_list) self.lock.release() return proxy_count

接下來我們寫多線程的驗證程序。代碼如下:

#-*- coding: utf-8 -*-import timefrom util.proxyqueue import ProxyQueuefrom validator import ProxyValidatorif __name__ == '__main__': validator_num = 10 validators = [] queue = ProxyQueue() queue.add_proxy({'ip':'182.254.129.123','port':'80'}) queue.add_proxy({'ip':'101.53.101.172','port':'9999'}) queue.add_proxy({'ip':'106.46.136.204','port':'808'}) queue.add_proxy({'ip':'117.90.1.34','port':'9000'}) queue.add_proxy({'ip':'117.90.6.134','port':'9000'}) queue.add_proxy({'ip':'125.123.76.134','port':'8998'}) queue.add_proxy({'ip':'125.67.75.53','port':'9000'}) queue.add_proxy({'ip':'115.28.169.160','port':'8118'}) queue.add_proxy({'ip':'117.90.1.35','port':'9000'}) queue.add_proxy({'ip':'111.72.126.161','port':'808'}) queue.add_proxy({'ip':'121.232.148.94','port':'9000'}) queue.add_proxy({'ip':'117.90.7.106','port':'9000'}) available_proxies = ProxyQueue() for i in xrange(validator_num): validators.append(ProxyValidator(queue=queue, available_proxies=available_proxies)) validators[i].start() while True: is_finish = True for i in xrange(validator_num): if validators[i].is_finish == False: is_finish = False break if queue.get_proxy_count() == 0 and is_finish == True: break for i in xrange(validator_num): if validators[i].is_finish == True and queue.get_proxy_count() > 0: validators[i] = ProxyValidator(queue=queue,available_proxies = available_proxies) validators[i].start() print "分配一個新的驗證器開始工作" print "當前任務列表長度:" + str(queue.get_proxy_count()) time.sleep(1) print "代理服務器驗證完畢,可用代理服務器數量:" + str(available_proxies.get_proxy_count())

引入Kafka

#-*- coding: utf-8 -*-import ctypesimport inspectimport jsonimport threadingfrom kafka import KafkaConsumerdef _async_raise(tid, exctype): '''Raises an exception in the threads with id tid''' if not inspect.isclass(exctype): raise TypeError("Only types can be raised (not instances)") res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) if res == 0: raise ValueError("invalid thread id") elif res != 1: # "if it returns a number greater than one, you're in trouble, # and you should call it again with exc=NULL to revert the effect" ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0) raise SystemError("PyThreadState_SetAsyncExc failed")class KafkaProxyListener(threading.Thread): def __init__(self,queue): threading.Thread.__init__(self) self.queue = queue def _get_my_tid(self): """determines this (self's) thread id CAREFUL : this function is executed in the context of the caller thread, to get the identity of the thread represented by this instance. """ if not self.isAlive(): raise threading.ThreadError("the thread is not active") # do we have it cached? if hasattr(self, "_thread_id"): return self._thread_id # no, look for it in the _active dict for tid, tobj in threading._active.items(): if tobj is self: self._thread_id = tid return tid # TODO: in python 2.6, there's a simpler way to do : self.ident raise AssertionError("could not determine the thread's id") def raiseExc(self, exctype): """Raises the given exception type in the context of this thread. If the thread is busy in a system call (time.sleep(), socket.accept(), ...), the exception is simply ignored. If you are sure that your exception should terminate the thread, one way to ensure that it works is: t = ThreadWithExc( ... ) ... t.raiseExc( SomeException ) while t.isAlive(): time.sleep( 0.1 ) t.raiseExc( SomeException ) If the exception is to be caught by the thread, you need a way to check that your thread has caught it. CAREFUL : this function is executed in the context of the caller thread, to raise an excpetion in the context of the thread represented by this instance. """ _async_raise(self._get_my_tid(), exctype) def run(self): consumer = KafkaConsumer('unchecked-servers', group_id='test-grou1p', bootstrap_servers=['amaster:9092','anode1:9092','anode2:9092'], auto_offset_reset='earliest', enable_auto_commit=False, value_deserializer=lambda m: json.loads(m.decode('utf-8'))) for message in consumer: v = message.value['_values'] self.queue.add_proxy(v)

即監聽Kafka的unchecked-servers,把spider提交上來的代理服務器加入到待驗證列表中,然后調度器就會逐個的進行驗證。

然后加入配置文件以及驗證器名稱。這部分用到的技術在昨天的博客中已經介紹過了,在此不再贅述。詳細的情況可以參看GitHub上的源碼。


上一篇:指針的使用

下一篇:細菌增殖

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
国产在线拍偷自揄拍精品| 欧美日韩高清区| 国产精品视频公开费视频| 色香阁99久久精品久久久| 日韩最新在线视频| 国产精品www色诱视频| 久久精品2019中文字幕| 91精品国产综合久久久久久蜜臀| 国产免费观看久久黄| 深夜福利亚洲导航| 欧美丰满少妇xxxxx| 色妞在线综合亚洲欧美| 欧美色播在线播放| 亚洲精品欧美一区二区三区| 日本电影亚洲天堂| 国产一区视频在线| 久久久免费高清电视剧观看| 欧美激情亚洲激情| 国产美女主播一区| 欧美洲成人男女午夜视频| 日韩欧美成人网| 国产这里只有精品| 日韩av最新在线观看| 精品露脸国产偷人在视频| 91成人国产在线观看| 国产免费一区视频观看免费| 69**夜色精品国产69乱| 久久中文精品视频| 日韩精品在线看| 日韩在线视频线视频免费网站| 国产成人精品视| 国产精品日韩专区| 日韩欧美大尺度| 日本一区二区三区在线播放| 亚洲第一中文字幕在线观看| 2019日本中文字幕| 亚洲伦理中文字幕| 91av视频在线观看| 亚洲黄色成人网| 亚洲国产另类 国产精品国产免费| 亚洲在线一区二区| 欧美资源在线观看| 91av视频在线| 懂色av影视一区二区三区| 欧美性高跟鞋xxxxhd| 福利视频导航一区| 久久亚洲一区二区三区四区五区高| 亚洲大尺度美女在线| y97精品国产97久久久久久| 欧美极度另类性三渗透| 欧美激情国产精品| 亚洲成人免费在线视频| 日韩成人在线电影网| 欧美另类交人妖| 国产999在线观看| 欧美精品免费播放| 国产精品一区二区av影院萌芽| 国产精品九九久久久久久久| 久久在线免费视频| 久久久久久久国产精品视频| 成人xxxx视频| 九九久久国产精品| 成人免费xxxxx在线观看| 欧美视频裸体精品| 日韩欧美在线视频| 久久777国产线看观看精品| 国产久一一精品| 亚洲性av在线| 国产精品视频不卡| 在线视频亚洲欧美| 91中文字幕在线| 亚洲精品久久久久久久久久久| 97视频色精品| 亚洲综合成人婷婷小说| 91精品视频在线| 在线视频欧美性高潮| 欧美色另类天堂2015| 久久精品人人爽| 久久91精品国产| 久久久精品电影| 日韩视频永久免费观看| 久精品免费视频| 久久久久久久国产精品视频| 精品国产一区二区三区在线观看| 精品日本美女福利在线观看| 久久久久久国产三级电影| 国产视频在线一区二区| 欧美性猛交xxxx免费看| 欧美精品一本久久男人的天堂| 欧美wwwwww| 国产日韩欧美影视| 久久综合五月天| 伊人成人开心激情综合网| 色综合久综合久久综合久鬼88| 久久99精品久久久久久琪琪| 国产精品久久一区主播| 欧美精品一区在线播放| 97香蕉超级碰碰久久免费软件| 97在线视频一区| 亚洲香蕉成人av网站在线观看| 久久69精品久久久久久久电影好| 亚洲a在线播放| 中文精品99久久国产香蕉| 91在线直播亚洲| 456国产精品| 亚洲国产小视频在线观看| 欧美日韩一二三四五区| 亚洲天堂av电影| 国产精品日韩精品| 2019国产精品自在线拍国产不卡| 青青精品视频播放| 亚洲欧美国产制服动漫| 久久久久久九九九| 中文字幕日韩精品在线| 88国产精品欧美一区二区三区| 亚洲伊人久久大香线蕉av| www.亚洲天堂| 亚洲视频在线视频| 亚洲一区二区免费| 欧美精品在线极品| 欧美国产一区二区三区| 欧美精品videossex性护士| 91亚洲国产精品| 国产精品久久99久久| 欧美激情欧美狂野欧美精品| 成人午夜一级二级三级| 日本欧美在线视频| 国产精品第10页| 成人免费淫片视频软件| 久久亚洲春色中文字幕| 亚洲国产福利在线| 欧美激情第1页| 亚洲精品自拍第一页| 日韩欧美在线看| 欧美性猛交xxxx久久久| 色噜噜久久综合伊人一本| 成人免费大片黄在线播放| 国产成人avxxxxx在线看| 亚洲综合社区网| 成人动漫网站在线观看| 亚洲美女视频网| 亚洲欧美在线免费观看| 欧美久久精品午夜青青大伊人| 性欧美办公室18xxxxhd| 久久躁日日躁aaaaxxxx| 亚洲天堂一区二区三区| 亚洲自拍偷拍视频| 精品国产户外野外| 亚洲免费电影在线观看| 欧美激情精品久久久久久黑人| 日韩精品久久久久久福利| 92福利视频午夜1000合集在线观看| 亚洲女成人图区| 福利一区视频在线观看| 亚洲欧美激情精品一区二区| 欧美午夜视频一区二区| 成人免费网站在线看| 国产精品久久久久久影视| 欧美综合第一页| 国产成人精品a视频一区www| 国产精品日韩欧美大师| 91精品国产91久久久久久| 国产一区二区三区精品久久久| 国产视频精品免费播放|