這個項目實錄系列是記錄MPRoxy項目的整個開發流程。項目最終的目標是開發一套代理服務器的API。這個系列中會記錄項目的需求、設計、驗證、實現、升級等等,包括設計決策的依據,開發過程中的各種坑。希望和大家共同交流,一起進步。
項目的源碼我會同步更新到GitHub,項目地址:https://github.com/mrbcy/Mproxy。
系列地址:
Mproxy項目實錄第1天
Mproxy項目實錄第2天
到目前為止,我們已經有了一個可以爬取快代理網站中代理服務器地址的爬蟲,并且這個爬蟲可以將爬取過程記錄到日志文件中,將爬取到的代理服務器提交到Kafka集群,同時還可以保證運行過程中不崩潰。但是還有一部分可以加強。
首先就是上一篇提到的將爬取過的代理服務器地址存入MongoDB,3天之內已經爬取過的ip就不再提交到Kafka集群了。另一個就是把類似Kafka集群的地址、MongoDB數據庫地址、日志文件的名稱這些內容放到配置文件中來方便今后的修改。
在完成了這些之后,今天我們還將完成驗證器的開發工作。
MongoDB是一個跨平臺的NoSQL,基于Key-Value形式保存數據。其存儲格式非常類似于Python的字典,因此用Python操作MongoDB會非常的容易。
MongoDB的安裝可以參考http://blog.csdn.net/chenpy/article/details/50324989
然后進行了一下技術驗證,可以參考http://blog.csdn.net/mrbcy/article/details/60141158
寫了一個工具類用于更新MongoDB中的代理服務器數據和查詢是否有重復代理服務器。
#-*- coding: utf-8 -*-import loggingimport datetimefrom pymongo import MongoClientclass KuaidailiProxyRecorder: def __init__(self,mongodb_host='localhost',port=27017,record_days=3): self.client = MongoClient(mongodb_host, port) self.db = self.client.mproxy self.collection = self.db.kuaidaili_proxy_records self.record_days = record_days def save_proxy(self,proxy_item): try: record = {} record['ip'] = proxy_item['ip'] record['update_time'] = datetime.datetime.now() self.collection.save(record) except Exception as e: logging.exception("An Error Happens") def find_repeat_proxy(self,ip): try: d = datetime.datetime.now() d = d - datetime.timedelta(days=self.record_days) return self.collection.find_one({'ip':ip,'update_time':{"$gt": d}}) except Exception as e: logging.exception("An Error Happens")代碼很簡單,這里就不再解釋了??匆幌聀ipelines里面是怎么用的。
if self.proxy_recorder.find_repeat_proxy(item['ip']) is None: logging.debug(item['ip'] + ' is not repeat') self.proxy_recorder.save_proxy(item) self.producer.send('unchecked-servers', item.__dict__) # Makes the item could be JSON serializableelse: logging.debug(item['ip'] + ' is repeat, not submit to Kafka cluster')如果在MongoDB中找到了重復的代理服務器,那么久不提交到Kafka集群,否則提交,并且更新代理服務器的信息到MongoDB。
為了后面維護的方便,將MongoDB數據庫地址、Kafka集群地址、日志文件名字這幾個值保存到配置文件中。
因為之前沒有使用過Python的配置文件,同樣進行了技術探索,可以參考http://blog.csdn.net/mrbcy/article/details/60143067
然后寫了一個配置讀取工具。
#-*- coding: utf-8 -*-import ConfigParserclass ConfigLoader: def __init__(self): self.cp = ConfigParser.SafeConfigParser() self.cp.read('kuaidaili_spider.cfg') def get_mongodb_host(self): return self.cp.get('mongodb','host') def get_kafka_bootstrap_servers(self): text = self.cp.get('kafka','bootstrap_servers') return text.split(',') def get_log_file_name(self): return self.cp.get('log','log_file_name')然后把硬編碼的字符串換成了函數調用。
好了,到這里快代理的爬蟲開發就基本告一段落了。接下來我們進入驗證器的開發。
驗證器的作用是驗證爬蟲爬取到的代理服務器是否可用。由于各地網絡環境不同,最好是在不同地理位置的服務器上運行,測試同樣的代理服務器。只有一個代理服務器在所有的驗證器上都可用時,才能認為這個代理服務器可用。
與爬蟲不同,驗證器只需要寫一次代碼,然后在不同的機器上運行即可。但是由于后續的收集器需要,必須配置一個驗證器名稱。這個驗證器名稱準備存入配置文件中。
使用下面的代碼就可以驗證代理服務器是否可用。
#-*- coding: utf-8 -*-import reimport requestsdef valid_proxy(): headers = { "Host": "www.baidu.com", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3", "Accept-Encoding": "gzip, deflate", "Cookie": "CXID=AFB58656EB6137C12D0E4FF12BC6DFFE; SUV=1484628390086037; m=FAB6EC92D3062F7D84CC06636E62F609; ABTEST=0|1486986265|v17; ad=oe45yZllll2Y$gmTlllllVAIWEtlllllJa0oJyllll9lllll9Zlll5@@@@@@@@@@; SUID=B96B30B65412940A00000000586E6482; ld=okllllllll2Y7@v2lllllVA8dw1lllllH0xrAlllll9lllllpZlll5@@@@@@@@@@; YYID=FAB6EC92D3062F7D84CC06636E62F609; SNUID=441BB6B17B7E35AEB86CFBF37CECC35E; usid=Ibgtjb1FmwpmVEd9; IPLOC=CN1101; browerV=8; osV=1", "Connection": "keep-alive", "Upgrade - Insecure - Requests": "1" } res = requests.get('http://www.sogou.com/',proxies = {'http':'120.77.156.50:80'}) regex = """050897""" pattern = re.compile(regex) if re.search(pattern=pattern,string=res.text) is not None: print "proxy is available" else: print "proxy is unavailable"if __name__ == '__main__': valid_proxy()原理是訪問sogou.com,然后在返回的頁面代碼中查找搜狗的備案號。
輸出結果為:
proxy is available因為我們要驗證的代理服務器有很多,驗證一個代理服務器的時間可能很長。因此必須使用多線程來并發的驗證代理服務器。
首先需要設計一個工具類用于保存待驗證的代理服務器列表。這個工具類必須保證一個線程在操作列表時不會受到其他線程的干擾。因此我們需要用到Python的鎖機制。實驗代碼如下所示:
#-*- coding: utf-8 -*-import threadingclass ProxyQueue: def __init__(self): self.lock = threading.Lock() self.proxy_list = [] def add_proxy(self,proxy_item): self.lock.acquire() self.proxy_list.append(proxy_item)test.py
#-*- coding: utf-8 -*-import threadingfrom util.proxyqueue import ProxyQueueclass T1(threading.Thread): def __init__(self,queue): threading.Thread.__init__(self) self.queue = queue def run(self): print "T1 add proxy" self.queue.add_proxy(1) print "T1 add proxy end"class T2(threading.Thread): def __init__(self,queue): threading.Thread.__init__(self) self.queue = queue def run(self): print "T2 add proxy" self.queue.add_proxy(1) print "T2 add proxy end"if __name__ == '__main__': queue = ProxyQueue() t1 = T1(queue) t2 = T2(queue) t1.start() t2.start()輸出結果為:
T1 add proxyT1 add proxy endT2 add proxy且程序處于等待狀態。這說明我們的鎖機制起到了應有的作用。完整的工具類代碼如下:
#-*- coding: utf-8 -*-import threadingclass ProxyQueue: def __init__(self): self.lock = threading.Lock() self.proxy_list = [] def add_proxy(self,proxy_item): self.lock.acquire() self.proxy_list.append(proxy_item) self.lock.release() def get_proxy(self): proxy_item = None self.lock.acquire() proxy_count = len(self.proxy_list) if proxy_count > 0: proxy_item = self.proxy_list[0] self.lock.release() return proxy_item def pop_proxy(self): proxy_item = None self.lock.acquire() proxy_count = len(self.proxy_list) if proxy_count > 0: proxy_item = self.proxy_list.pop() self.lock.release() return proxy_item def get_proxy_count(self): proxy_count = 0 self.lock.acquire() proxy_count = len(self.proxy_list) self.lock.release() return proxy_count接下來我們寫多線程的驗證程序。代碼如下:
#-*- coding: utf-8 -*-import timefrom util.proxyqueue import ProxyQueuefrom validator import ProxyValidatorif __name__ == '__main__': validator_num = 10 validators = [] queue = ProxyQueue() queue.add_proxy({'ip':'182.254.129.123','port':'80'}) queue.add_proxy({'ip':'101.53.101.172','port':'9999'}) queue.add_proxy({'ip':'106.46.136.204','port':'808'}) queue.add_proxy({'ip':'117.90.1.34','port':'9000'}) queue.add_proxy({'ip':'117.90.6.134','port':'9000'}) queue.add_proxy({'ip':'125.123.76.134','port':'8998'}) queue.add_proxy({'ip':'125.67.75.53','port':'9000'}) queue.add_proxy({'ip':'115.28.169.160','port':'8118'}) queue.add_proxy({'ip':'117.90.1.35','port':'9000'}) queue.add_proxy({'ip':'111.72.126.161','port':'808'}) queue.add_proxy({'ip':'121.232.148.94','port':'9000'}) queue.add_proxy({'ip':'117.90.7.106','port':'9000'}) available_proxies = ProxyQueue() for i in xrange(validator_num): validators.append(ProxyValidator(queue=queue, available_proxies=available_proxies)) validators[i].start() while True: is_finish = True for i in xrange(validator_num): if validators[i].is_finish == False: is_finish = False break if queue.get_proxy_count() == 0 and is_finish == True: break for i in xrange(validator_num): if validators[i].is_finish == True and queue.get_proxy_count() > 0: validators[i] = ProxyValidator(queue=queue,available_proxies = available_proxies) validators[i].start() print "分配一個新的驗證器開始工作" print "當前任務列表長度:" + str(queue.get_proxy_count()) time.sleep(1) print "代理服務器驗證完畢,可用代理服務器數量:" + str(available_proxies.get_proxy_count())即監聽Kafka的unchecked-servers,把spider提交上來的代理服務器加入到待驗證列表中,然后調度器就會逐個的進行驗證。
然后加入配置文件以及驗證器名稱。這部分用到的技術在昨天的博客中已經介紹過了,在此不再贅述。詳細的情況可以參看GitHub上的源碼。
新聞熱點
疑難解答