前段時間我寫過一篇文章,說是時候使用白名單來翻牆了,不過那個白名單已經過期好久,用起來不是那麼順暢了,後來我就誇下海口說:我要自己實現一個爬蟲,來爬取中國的網站域名,好更新白名單。
好吧,總之這個爬蟲是寫好了然後上線爬取了一萬多的,不過最後我找到了前人做的更好的方案,於是這個爬蟲項目還是廢棄了。總之,白名單更強大了,只是沒有使用這個爬蟲而已。
爬蟲是用 Python 寫的,並沒有使用經典的爬蟲框架——因為我覺得我要寫的爬蟲太簡單了沒有必要去使用那麼大的框架,於是自己實現了一個小輪子,一方面也是為了學習 Python 這門語言。好了,就說這麼多,現在來記錄一下整個的開發過程,供你參考?
設計
首先對於爬蟲,總要能獲取頁面對吧,其實爬蟲這個東西,就是下載網站頁面然後獲取內容,提取鏈接,然後根據鏈接繼續下載頁面。那麼,我們的基本目標就出來了:
下載頁面,獲取頁面裡的所有網頁鏈接,丟棄所有內容,繼續獲取鏈接。同時在這個過程中保存下獲取到的鏈接。
由於我有一個國內的vps,那麼邏輯就簡單了很多——能訪問到的域名一定是沒有被牆的,訪問不到的除了跪了的,那隻能是被牆的了:)
(這個邏輯並不完善,事實上後邊會遇到好多問題;接下來我會一點一點完善,這是一個思考的過程。其實再回憶總是有偏差的,但至少有個參考價值對吧? )
對於獲取到的域名列表怎麼管理呢?考慮到將來可能要刷新它們——畢竟可能將來某個域名可能會被牆掉對吧,所以我考慮還是使用數據庫來管理這些域名,參數簡單,一個數據庫,一個表,裡邊一行一個域名即可。
對了,如果爬蟲停止了,我們還希望它啟動的時候不再重新開始,那麼我們就應該能夠保存它現在的執行狀態,再考慮到獲取鏈接容易,挨個執行去下載鏈接的頁面困難(耗時),我們需要一個緩存機制。
或者說隊列吧,這樣更加確切。我建立了一個先入後出的隊列,這是一個數組,當我獲取到頁面,就用正則表達式把裡邊的鏈接全部獲取出來,放到數組尾端。
而蜘蛛爬取的時候,就從數組的頭部獲取,用一個刪除一個,如果數組數量太大(比如說超過一萬條),那麼就先不再添加。
同時,我們爬取了一個頁面,那說明這個域名是可用的,就加入白名單列表——當然,加入之前先看一下是否已經添加,如果添加過,就在流行度上 +1
你看,數據庫對應的條目上,還可以順便做一個簡單的域名火熱程度排名。這樣考慮到將來白名單可能會有幾萬條,但其實一般使用並不會覆蓋到這麼多,我們可以選擇輸出前一萬條之類的。
那麼這樣基本的邏輯就建立起來了,然後我們來設計一下所需要的類——畢竟,我們要面向對象。
沒有什麼是面向對象解決不了的問題,如果有,就再實例化一個類。
首先我們來一個 Spider 類,這個就是我們可愛的小蜘蛛,它要負責所有爬取的功能,比如獲取下一個需要訪問的頁面,從種子開始爬取,從頁面裡讀取鏈接等等。
其次是專門用來負責與數據庫通信的 IO ,它負責封裝一切與數據庫的通信,這樣就會很方便,我們在處理爬蟲的時候就不用煩心數據庫的問題了。
對了,最後要說一下:我用的是 Python 3
實現
大概的設計如此,現在我們來實現具體的類,先來說一說 IO :
IO
這個類負責數據庫的通信,這裡我使用的包是 pymysql ,另外為了再給數據庫添加一個最後更新的時間標籤,我們再使用一個 約會時間 包來獲取插入數據庫時候的時間。
雖然我沒有學過數據庫原理,但這並不妨礙我部署並使用它。
這裡我們使用 MySQL 作為數據庫,創建一個名為 白名單 的數據庫,並創建一個名為 白名單 的表:
1 2 3 4 5 6 7 |
CREATE TABLE WhiteList ( DomainRank int NOT NULL, Domain varchar(255) NOT NULL, LastUpdate date NOT NULL, PRIMARY KEY (Domain) ) |
接下來我們就是實現具體的方法了:
1 2 3 4 5 6 7 8 9 10 11 12 |
def __updateDomain(self,domain): cur = self.conn.cursor() cur.execute('select * from WhiteList where Domain=%s',domain) data = cur.fetchall() if data: rank = data[0][self.__DomainRank] cur.execute('update WhiteList set DomainRank=%s,LastUpdate=%s where domain=%s',(rank + 1,datetime.datetime.now().strftime("%Y%m%d"),domain)) self.conn.commit() else: cur.execute('insert into WhiteList (Domain,DomainRank,LastUpdate) values (%s,%s,%s)',(domain,1,datetime.datetime.now().strftime("%Y%m%d"))) self.conn.commit() cur.close() |
這裡我只給出部分代碼實現,完整的在文章末尾我會給出 Github 倉庫,你可以自己 clone 來閱讀的。
上文是最更新域名條目的方法,首先獲取游標,然後從游標獲取數據,接著根據現有數據更新條目;如果沒有對應的條目,那就直接插入新的條目。最後記得要 承諾 ,然後關閉游標。這裡我用了幾個名字代替了數字:
1 2 3 |
__DomainRank = 0 __Domain = 1 __LastUpdate = 2 |
這樣使用起來就方便了很多:)如你所見,我在名字前面都添加了兩個下劃線,作用是在 Python 裡實現私有變量和方法。其實我覺得這一點挺坑的,這是我遇到的第一個坑。
程序員數數,怎麼可以不從零開始?第零個坑: 自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自,自 。
1 2 |
def saveDomain(self,domain): self.__updateDomain(domain) |
當然了,為了能讓外部添加域名,我還是寫了一個方法暴露在外邊的:) (意義何在?)
蜘蛛
現在,重頭戲來了,我們的小蜘蛛。由於要使用到正則表達式,我用了 Python 包 回覆 ,由於要使用 UTF-8 編碼解碼頁面,我還使用了 編解碼器 ,當然了,要獲取頁面,所以還有 urllib3 。
1 2 |
def __getSeeds(self): return ['www.hao123.com'] |
先從一個最簡單的方法開始,蜘蛛最開始總要先給它第一個種子的,不然它怎麼開始?像我這種廣度優先的需求,那自然從這種聚合網站域名的頁面開始最好了。如有必要,其實還可以填寫更多,不過這裡其實我就寫了一個。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
def __nextPage(self): if len(self.__domainList) == 0: self.__domainList = self.__getSeeds() url = self.__domainList.pop(0) pageContent = self.__getPage(url) topDomain = self.__topDomainRex.findall(url) self.__io.saveDomain(topDomain[0]) self.__gatherDomainFromPage(pageContent) |
這就是我們的主函數了,運行的話,只要循環調用這個“下一頁”方法,爬蟲就可以一直爬下去。按照我們的邏輯,他會先檢測隊列,如果隊列為空,那麼說明是剛啟動,它就會去從種子啟動了。
然後把訪問成功的頁面加入數據庫,然後收集頁面中的鏈接,其他資源就釋放掉了。所以我們的這個小爬蟲也就不需要在意什麼 機器人.TXT ,因為它並不收集頁面信息。
你看到的代碼縮進有些奇怪,是因為我刪除了部分無意義的內容,那些內容會在後邊講到。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
def __gatherDomainFromPage(self,page): if len(self.__domainList) > 10000: return try: m = self.__domainRex.findall(page) except: #print('Get wrong data! skip it!') return domainList = [] if m: for domain in m: domainList.append(domain[1]) domainList = self.__deDuplicate(domainList) domainList = self.__checkDomainFromList(domainList) self.__domainList += domainList |
從頁面獲取域名,如果隊列已經大於一萬,那就不再添加了,太多了。然後如果小於一萬,我們就用正則表達式從頁面裡獲取鏈接,,這個正則表達式是這樣的: 自.__domainRex = 回覆.編( - [R“HTTP(小號)?://([\W¯¯ - _]+\.[\W¯¯ - _]+)[\/\*]*“) 這裡會有一個問題,.cn域名是不需要判斷的,它是國別域名,肯定能夠訪問的!
1 2 3 4 5 6 7 8 9 10 11 |
def __checkDomainFromList(self,list): domainList = [] for domain in list: m = self.__cnDomainRex.findall(domain) if m: continue else: domainList.append(domain) return domainList |
所以我們使用另外一個正則來額外剔除列表裡的中國域名,正則表達式是這樣的: 自.__cnDomainRex = 回覆.編( - [R' .CN(/)?$“)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
def __getPage(self,url): http = urllib3.PoolManager( cert_reqs='CERT_REQUIRED', # Force certificate check. ca_certs=certifi.where(), # Path to the Certifi bundle. ) data = '' try: data = http.request('GET', url, timeout=10, headers={ 'User-agent' : 'Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.5) Gecko/20091102 Firefox/3.5.5'} ).data codeType = chardet.detect(data) data = data.decode(codeType['encoding']) except: pass return data |
大部頭來了!它就是我們要獲取頁面的方法,這裡我們會遇到一些ssl頁面,也就是Https,而 urllib3 默認並不支持,我們就需要配合另外一個包: CERTIFI ,同時這裡我們偽裝了訪問頭,讓它看起來更像是一個 Windows 用戶在訪問。
後來我遇到了一個問題,比如某些頁面(jd.com)會出錯,排查後發現京東用的是 GBK 編碼,而我使用的一律是默認的 UTF-8 ,所以我又使用了 chardet的 來推斷頁面編碼,雖然還是會有某些編碼不能正確識別,但總體來說,那些錯誤已經小到可以忽略不計了。
1 2 3 4 5 6 7 8 |
def __deDuplicate(self,list): result = [] for item in list: try: result.index(item) except: result.append(item) return result |
前邊說過,我們的蜘蛛首要目標是獲取盡可能多的不同的域名,那麼也就是所謂的廣度優先了,所以使用了這兩個方法來保證這一點:使用去重來去除列表裡的重複鏈接,避免同一頁面多次被訪問(同一網站的多個不同頁面還是要的)
最後,我們再把保存和讀入緩存隊列的方法寫出來:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
def __cache(self): f = codecs.open('./domainlistCache','w','utf-8') for domian in self.__domainList: f.write(domian + '\n') f.close() def __getLastTimeList(self): try: f = codecs.open('./domainlistCache', 'r','utf-8') for line in f.readlines(): line = line.strip('\n') self.__domainList.append(line) f.close() except: pass |
這兩個方法前者會在類銷毀的時候也就是停止蜘蛛的時候執行,後者則會在初始化也就是啟動的時候執行,這樣可以保證爬蟲的爬行狀態。
其實現在我們的爬蟲就已經可以上線了——事實上這也是我的第一版測試版本。
收尾
這時候我們的項目還沒有完成,因為它還是單線程阻塞的,主循環我還沒有寫,一旦執行,就是卡住完全不動了。所以,我們有必要讓它後台執行。
首先我們先來完成主循環:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
def start(self): self.__domainList = self.__getSeeds() # self.__cache() while True: threads = [] self.__cache() for thread in range(20): threads.append(threading.Thread(target=self.__nextPage)) for thread in threads: thread.start() for thread in threads: thread.join() |
一次並發20個線程同時進行爬行。這裡會有個問題,由於 Python 有個全局鎖(具體內容請 Google 下吧,由於過去一段時間,當時的參考頁面已經沒有了),它“保證”了 Python 不可能實現真正的並發,所以,如果你在多核 CPU 上並發 Python,就會這樣:
這就是我遇到的 Python 的第二個坑。
好吧,總之,我們還是需要讓爬蟲變成一個服務,我們現在需要在 主要 裡邊進行操作了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
def daemonize(pidfile, *, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): if os.path.exists(pidfile): raise RuntimeError('Already running') # First fork (detaches from parent) try: if os.fork() > 0: raise SystemExit(0) # Parent exit except OSError as e: raise RuntimeError('fork #1 failed.') # os.chdir('/') os.umask(0) os.setsid() # Second fork (relinquish session leadership) try: if os.fork() > 0: raise SystemExit(0) except OSError as e: raise RuntimeError('fork #2 failed.') # Flush I/O buffers sys.stdout.flush() sys.stderr.flush() # Replace file descriptors for stdin, stdout, and stderr with open(stdin, 'rb', 0) as f: os.dup2(f.fileno(), sys.stdin.fileno()) with open(stdout, 'ab', 0) as f: os.dup2(f.fileno(), sys.stdout.fileno()) with open(stderr, 'ab', 0) as f: os.dup2(f.fileno(), sys.stderr.fileno()) # Write the PID file with open(pidfile, 'w') as f: print(os.getpid(), file=f) # Arrange to have the PID file removed on exit/signal atexit.register(lambda: os.remove(pidfile)) # Signal handler for termination (required) def sigterm_handler(signo, frame): raise SystemExit(1) signal.signal(signal.SIGTERM, sigterm_handler) |
這是一個讓 Python 後台的函數,它的原理是多進程,雖然子進程會在父進程退出後退出,但我們把它的目錄切換到根,然後將它的標準輸出重定向到日誌文件裡,這樣這個進程就會保留下來:)
當然,只有這一個函數還是不夠的:
1 2 3 |
if __name__=='__main__': PIDFILE = '/tmp/GFW-White-Domain-List-daemon.pid' main() |
我們在這裡寫入 pid 的路徑,然後運行 主要 函數:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
def main(): if len(sys.argv) == 1: help() return if sys.argv[1] == 'start': startSpider() elif sys.argv[1] == 'status': status() elif sys.argv[1] == 'stop': stop() elif sys.argv[1] == 'restart': stop() startSpider() elif sys.argv[1] == 'list': outPutList() elif sys.argv[1] == 'help': help() else: print('Unknown command {!r}'.format(sys.argv[1]), file=sys.stderr) |
為了方便操作,我給它加入了若干函數,這樣我們就可以使用比如 ./主要.PY 開始 之類的命令來操作服務了!具體的函數我不再列舉,你可以自行下載完整的代碼去閱讀。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
def startSpider(): print('WhiteList spider started!', file=sys.stderr) try: daemonize(PIDFILE, stdout='/tmp/spider-log.log', stderr='/tmp/spider-err.log') except RuntimeError as e: print(e, file=sys.stderr) raise SystemExit(1) io = IO.IO() spider = Spider.Spider(io) spider.start() |
這裡我給出啟動的函數,很簡單對吧?先調用後台函數,然後正常實例化爬蟲的類即可。這裡我們先實例化IO,然後把IO實例引用傳給Spider,然後調用它的 開始() 方法。
這樣,完整的程序就OK了,當然,接下來我就要說一說後面的修補問題了。
修補
俗話說,在已經發布的生產環境調試bug,就是這樣的:
死循環
首先我遇到的問題就是重複域名被添加太多的情況,有時候會困在一個頁面裡出不來,比如從hao123開始,然後鏈接到某個頁面,結果這個頁面又有一個hao123的友鏈,那麼爬蟲就會爬回去重新再爬一遍hao123!,所以,這裡我們用一個叫做“周知域名”的函數來限制重複的數量,我們說如果一個域名已經重複遇到過超過一萬次(後來改為2000了),那這一定是個大站,就不要繼續訪問了。
1 2 3 4 5 6 |
def __isWellKnown(self,domain): topDomain = self.__topDomainRex.findall(domain) self.__lock.acquire() result = self.__io.getDomainRank(topDomain) self.__lock.release() return result > 2000 |
線程安全
上文我們說了,Python 的並發就是個殘廢,可是它的線程安全問題倒是一點也不殘廢……也就是說,線程安全該做還得做。所以,我們要在每一個 IO 類方法調用的前後都加上線程安全語句,比如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
def __nextPage(self): self.__lock.acquire() if len(self.__domainList) == 0: self.__domainList = self.__getSeeds() # print(self.__domainList) url = self.__domainList.pop(0) self.__lock.release() if self.__pingDomain(url): if self.__isWellKnown(url) : return #skip this domain if wellknowen pageContent = self.__getPage(url) topDomain = self.__topDomainRex.findall(url) self.__lock.acquire() self.__io.saveDomain(topDomain[0]) self.__lock.release() self.__gatherDomainFromPage(pageContent) |
其他類似,這樣保證了數據庫訪問不會衝突。
等待問題
我們說了,如果不能訪問就是被牆的標準,可是顯然我們對“不能訪問”這個短語的定義是模糊的,對於爬蟲來說,加載時間超時才是被牆的典型反映,可是每次都要這麼等下去真的是要天荒地老了。所以,我們調用個 ping 來判斷,如果 ping 都不通,那直接拋棄好了。這裡要用到 子 包了;同時,還需要 dnspython3 包來從域名查詢 ip;以及 shlex 包:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
def __pingDomain(self,domain): a = [] isAvailable = True try: a = self.__resolver.query(domain) except: isAvailable = False if len(a) == 0:return False ip = str(a[0]) if self.__chinaIP.isChinaIP(ip): cmd = "ping -c 1 " + ip args = shlex.split(cmd) try: subprocess.check_call(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) except subprocess.CalledProcessError: isAvailable = False else: isAvailable = False return isAvailable |
這樣就直接過濾掉了一大堆速度慢、超時的網站,大大加快了爬行速度。另外,後來我還想到,索性就對服務器 IP 的地址進行一下判斷好了,如果不是中國區的 IP,直接返回 假 完事兒,畢竟國外的網站還是掛代理來的更快不是嗎?
所以我又在網上找了一個判斷的類加了進去,具體的就不貼了,畢竟是抄來的。
國別域名
由於我是使用了正則表達式來判斷域名的,這裡就會出現一些問題,比如想要判斷國別域名實在是太困難了, .CC 這類還好說,那 .淨.CC 這類我就崩潰了,而且。嚴格來講.la這類的域名雖然是國別域名,但大家都在用也不能一網打盡……
最後我從 ICANN 官網找到了 CCTLD (即 Country Code Top Level Domain)列表,我稍微處理了一下直接用來匹配了,這樣生成的列表裡就不會有net.la這樣的泛域名存在。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
def outPutList(): # count = 10000 cctlds = TLDS.getCCTLDS() tlds = TLDS.getTLDS() io = IO.IO() list = io.getList() f = codecs.open('./whitelist.txt','w','utf-8') print('Output top 10000 domains in whitelist.txt\n', ) i = 0 skip = 0 #its ugly ... but worked! #fk country code top-level domain!!!! for item in list: d = re.findall(r'.\w{2}$',item[1]) if d: if cctlds.__contains__(d[0]): t = re.findall(r'^\w+',item[1]) if tlds.__contains__(t[0]): skip += 1 continue i += 1 f.write(item[1]+'\n') if i == 10000: break print('done! got '+str(i)+' domains.\n and skip '+str(skip)+' error domain.\n') |
結尾
好了,這篇文章終於到了結尾……但我覺得寫的並不是特別的詳細,因為很多當時的具體問題我還是記不清了,很抱歉我一直拖到現在才寫。?
畢竟這是我用 Python 寫的第一個程序,所以代碼實現看起來不那麼優雅,不過還是那句話:
至少跑起來了。
行文我已經盡可能按照當時的開發過程來寫,所以也只是展示了主要的核心代碼,具體的代碼倉庫見 Github上。
另外,我說過,白名單已經不再依靠這個爬蟲更新,轉而使用了felixonmars的的dnsmasq-中國列表 ,不過這也直接導致白名單暴增到兩萬多條,已經不再適合手機端使用。
好吧,這篇文章就到這裡。?
本文由 落格博客 原創撰寫:落格博客 » 用 python 寫一個功能變數名稱白名單爬蟲
轉載請保留出處和原文鏈接:https://www.logcg.com/archives/1697.html
代碼好亂,不過學習了。。
?都是直接貼的,所以真的很亂……總之,github上有完整代碼,然後我也懶得貼更多了……………………嘿嘿。