When a parcel arrives at a depot, the job is not finished until it has been delivered to its individual destinations. Mapping this onto a distributed crawler, the Master node handles the early work of hauling and sorting the parcels, and, as you have probably guessed, the spider_Worker node takes on the final delivery. Put another way, a Worker is simply a porter. Today we look at how the spider_Worker node is used in a Python distributed crawler.
To turn the multithreaded version into a distributed crawler, we mainly use the cross-platform BaseManager class from multiprocessing.managers. Its job here is to register the task_queue and result_queue as functions exposed over the network: the Master node listens on a port, Worker child nodes connect to it, and different hosts can then share and synchronize resources through the registered functions. The Master hands out tasks and collects results; each Worker pulls tasks from the task queue, runs them, and sends the results back, which the Master then stores in the database.
The spider_Worker node mainly calls its spider() function to process each task, much as in the multithreaded version; a child node sends every link it harvests straight back to the Master. Note that only one Master instance may run, but any number of Worker nodes can run at the same time, draining the task queue in parallel, as the sketch after the listings shows.
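The two listings below target Python 2 (print statements, the Queue and MySQLdb modules). For readers on Python 3, here is a minimal hedged sketch of the same BaseManager handshake; the port 50000, the b'sir' authkey and the example task URL are placeholders, and the queue getters are module-level functions because Python 3's spawn start method cannot pickle lambdas:

# Minimal Python 3 sketch of the BaseManager pattern used in the listings
# below. Placeholder values: port 50000, authkey b'sir', the example URL.
from multiprocessing.managers import BaseManager
from queue import Queue

task_queue = Queue()
result_queue = Queue()

# Module-level getters: unlike the Python 2 lambdas below, these survive
# pickling when multiprocessing uses the spawn start method.
def get_task_queue():
    return task_queue

def get_result_queue():
    return result_queue

if __name__ == '__main__':
    # Master side: expose both queues; a worker registers the same two
    # names without `callable` and calls connect() instead of start().
    BaseManager.register('get_task_queue', callable=get_task_queue)
    BaseManager.register('get_result_queue', callable=get_result_queue)
    manager = BaseManager(address=('127.0.0.1', 50000), authkey=b'sir')
    manager.start()
    task = manager.get_task_queue()   # always go through the manager's proxies
    task.put('https://example.com')   # hand a task to whichever worker connects
    manager.shutdown()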
spider_Master.py
#coding:utf-8
from multiprocessing.managers import BaseManager
from Queue import Queue, Empty
import time
import MySQLdb

page = 2
word = 'inurl:login.action'
output = 'test.txt'
page = (page + 1) * 10
host = '127.0.0.1'
port = 500
urls = []

class Master():
    def __init__(self):
        # The server must create the two shared queues itself;
        # the worker side does not.
        self.task_queue = Queue()
        self.result_queue = Queue()

    def start(self):
        # Register get_task_queue/get_result_queue on the network, i.e. expose
        # the two queues; the worker side registers the same names but without
        # the callable argument.
        BaseManager.register('get_task_queue', callable=lambda: self.task_queue)
        BaseManager.register('get_result_queue', callable=lambda: self.result_queue)
        manager = BaseManager(address=(host, port), authkey='sir')
        manager.start()  # the master starts listening; a worker calls connect()
        # Both master and worker must fetch the queues through the manager's
        # network proxies; never use the two locally created queues directly.
        task = manager.get_task_queue()
        result = manager.get_result_queue()
        print 'put task'
        for i in range(0, page, 10):
            target = 'https://www.baidu.com/s?wd=%s&pn=%s' % (word, i)
            print 'put task %s' % target
            task.put(target)
        print 'try get result'
        while True:
            try:
                # Use a longer timeout when collecting results.
                url = result.get(True, 5)
                print url
                urls.append(url)
            except Empty:
                break
        manager.shutdown()

if __name__ == '__main__':
    start = time.time()
    server = Master()
    server.start()
    print 'Crawled %s results in total' % len(urls)
    print time.time() - start
    with open(output, 'a') as f:
        for url in urls:
            f.write(url[1] + '\n')
    conn = MySQLdb.connect('localhost', 'root', 'root', 'Struct', charset='utf8')
    cursor = conn.cursor()
    for record in urls:
        sql = "insert into s045 values('%s','%s','%s')" % (record[0], record[1], str(record[2]))
        cursor.execute(sql)
    conn.commit()
    conn.close()
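One caveat on the storage step: building the INSERT with % string formatting breaks as soon as a title contains a quote. A hedged alternative, assuming the same three-column s045 table as above, is to let MySQLdb bind the parameters:

# Sketch: the same insert with MySQLdb parameter binding instead of string
# formatting, so quotes in page titles cannot break (or inject into) the SQL.
# Assumes the three-column s045 table from the listing above.
import MySQLdb

conn = MySQLdb.connect('localhost', 'root', 'root', 'Struct', charset='utf8')
cursor = conn.cursor()
cursor.executemany(
    "insert into s045 values (%s, %s, %s)",
    [(record[0], record[1], str(record[2])) for record in urls],
)
conn.commit()
conn.close()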
spider_Worker.py
#coding:utf-8
import re
import Queue
import time
import requests
from multiprocessing.managers import BaseManager
from bs4 import BeautifulSoup as bs

host = '127.0.0.1'
port = 500

class Worker():
    def __init__(self):
        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64; rv:50.0) Gecko/20100101 Firefox/50.0'}

    def spider(self, target, result):
        # Derive the result-page number from the pn= parameter of the search URL.
        pn = int(target.split('=')[-1]) / 10 + 1
        html = requests.get(target, headers=self.headers)
        soup = bs(html.text, "lxml")
        res = soup.find_all(name="a", attrs={'class': 'c-showurl'})
        for r in res:
            try:
                h = requests.get(r['href'], headers=self.headers, timeout=3)
                if h.status_code == 200:
                    url = h.url
                    time.sleep(1)
                    title = re.findall(r'<title>(.*?)</title>', h.content)[0]
                    title = title.decode('utf-8')
                    print 'send spider url:', url
                    # Push each result straight back to the master.
                    result.put((pn, url, title))
                else:
                    continue
            except Exception:
                # Skip links that time out or fail to parse.
                continue

    def start(self):
        # Register the same names as the master, but without the callable
        # argument: the worker only looks them up over the network.
        BaseManager.register('get_task_queue')
        BaseManager.register('get_result_queue')
        print 'Connect to server %s' % host
        m = BaseManager(address=(host, port), authkey='sir')
        m.connect()
        task = m.get_task_queue()
        result = m.get_result_queue()
        print 'try get queue'
        while True:
            try:
                target = task.get(True, 1)
                print 'run pages %s' % target
                self.spider(target, result)
            except Queue.Empty:
                # No more tasks arrived within the timeout: exit.
                break

if __name__ == '__main__':
    w = Worker()
    w.start()
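Since only the Master must be unique, one way to see the parallelism is to launch several Worker clients on a single machine. A small sketch, assuming the listing above is saved as spider_Worker.py so the Worker class is importable:

# Sketch: run three Worker clients in parallel against one Master.
# Assumes the worker listing above is saved as spider_Worker.py.
from multiprocessing import Process
from spider_Worker import Worker

def run_worker():
    Worker().start()

if __name__ == '__main__':
    procs = [Process(target=run_worker) for _ in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()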
That concludes our look at spider_Worker, the crawler's hard-working porter. It carries out the final delivery leg of the whole crawling job, and that last leg is what produces the results everyone is waiting for.