Nutch 1.3 學習筆記 5-1 FetchThread
-----------------------------------
上一節看了Fetcher中主要幾個類的實作,這一節會來分析一下其中用到的消費者FetcherThread,來看看它是幹嘛的。
1. Fetcher的Mapp模型
Fetcher.java代碼中可以看到,Fetcher繼承自MapRunable,它是Mapper的抽象接口,實作這個接口的子類能夠更好的對Map的流程進行控制,包括多線程與異步Maper。
1.1 Fetcher的入口函數fetch(Path segment,int threads, boolean parsing)
下面是它的源代碼,來分析一下:
// 對配置進行檢測,看一些必要的配置是否已經配置了,如http.agent.name等參數
checkConfiguration();
// 記錄fetch的開始時間
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
if (LOG.isInfoEnabled()) {
LOG.info("Fetcher: starting at " + sdf.format(start));
LOG.info("Fetcher: segment: " + segment);
}
// 這裡對抓取的時候進行限制,在FetchItemQueue中會用到這個參數
// set the actual time for the timelimit relative
// to the beginning of the whole job and not of a specific task
// otherwise it keeps trying again if a task fails
long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
if (timelimit != -1) {
timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
LOG.info("Fetcher Timelimit set for : " + timelimit);
getConf().setLong("fetcher.timelimit", timelimit);
}
// 生成一個Nutch的Map-Reduce配置
JobConf job = new NutchJob(getConf());
job.setJobName("fetch " + segment);
// 配置抓取線程數,
job.setInt("fetcher.threads.fetch", threads);
job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
// 配置是否對抓取的内容進行解析
job.setBoolean("fetcher.parse", parsing);
// for politeness, don't permit parallel execution of a single task
job.setSpeculativeExecution(false);
// 配置輸出的路徑名
FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
// 配置輸入的檔案格式,這裡類繼承自SequenceFileInputFormat
// 它主要是覆寫了其getSplits方法,其作用是不對檔案進行切分,以檔案數量作為splits的依據
// 就是有幾個檔案,就有幾個Map操作
job.setInputFormat(InputFormat.class);
// 配置Map操作的類
job.setMapRunnerClass(Fetcher.class);
// 配置輸出路徑
FileOutputFormat.setOutputPath(job, segment);
// 這裡配置輸出檔案方法,這個類在前面已經分析過
job.setOutputFormat(FetcherOutputFormat.class);
// 配置輸出<key,value>類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NutchWritable.class);
JobClient.runJob(job);
1.2 Fetcher的run方法分析
這個是Map類的入口,用于啟動抓取的生産者與消費者,下面是部分源代碼:
// 生成生産者,用于讀取Generate出來的CrawlDatum,把它們放到共享隊列中
feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
//feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
// the value of the time limit is either -1 or the time where it should finish
long timelimit = getConf().getLong("fetcher.timelimit", -1);
if (timelimit != -1) feeder.setTimeLimit(timelimit);
feeder.start();
// set non-blocking & no-robots mode for HTTP protocol plugins.
getConf().setBoolean(Protocol.CHECK_BLOCKING, false);
getConf().setBoolean(Protocol.CHECK_ROBOTS, false);
// 啟動消費者線程
for (int i = 0; i < threadCount; i++) { // spawn threads
new FetcherThread(getConf()).start();
}
// select a timeout that avoids a task timeout
long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;
// 這裡用一個循環來等待線程結束
do { // wait for threads to exit
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
// 這個函數是得到相前線程的抓取狀态,如抓取了多少網頁,多少網頁抓取失敗,抓取速度是多少
reportStatus();
LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
+ ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
// 輸出抓取隊列中的資訊
if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
fetchQueues.dump();
}
// 檢視timelimit的值,這裡隻要傳回的hitByTimeLimit不為0,checkTimelimit方法會清空抓取隊列中的所有資料
// check timelimit
if (!feeder.isAlive()) {
int hitByTimeLimit = fetchQueues.checkTimelimit();
if (hitByTimeLimit != 0) reporter.incrCounter("FetcherStatus",
"hitByTimeLimit", hitByTimeLimit);
}
// 檢視抓取抓取線程是否逾時,如果逾時,就退出等待
// some requests seem to hang, despite all intentions
if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
if (LOG.isWarnEnabled()) {
LOG.warn("Aborting with "+activeThreads+" hung threads.");
}
return;
}
} while (activeThreads.get() > 0);
LOG.info("-activeThreads=" + activeThreads);
2. Fetcher.FetcherThread
2.1 這個類主要是用來從隊列中得到FetchItem,下面來看一下其run方法,其大概做了幾件事:
- 從抓取隊列中得到一個FetchItem,如果傳回為null,判斷生産者是否還活着或者隊列中是否還有資料, 如果隊列中還有資料,那就等待,如果上面條件沒有滿足,就認為所有FetchItem都已經處理完了,退出目前抓取線程
- 得到FetchItem, 抽取其url,從這個url中分析出所使用的協定,調用相應的plugin來解析這個協定
- 得到相當url的robotRules,看是否符合抓取規則,如果不符合或者其delayTime大于我們配置的maxDelayTime,那就不抓取這個網頁
- 對網頁進行抓取,得到其抓取的Content和抓取狀态,調用FetchItemQueues的finishFetchItem方法,表明目前url已經抓取完成
- 根據抓取協定的狀态來進行下一步操作
-
- 如果狀态為WOULDBLOCK,那就進行retry,把目前url放加FetchItemQueues中,進行重試
- 如果是MOVED或者TEMP_MOVED,這時這個網頁可以被重定向了,對其重定向的内容進行解析,得到重定向的網址,這時要生成一個新的FetchItem,根據其QueueID放到相應的隊列的inProgress集合中,然後再對這個重定向的網頁進行抓取
- 如果狀态是EXCEPTION,對目前url所屬的FetchItemQueue進行檢測,看其異常的網頁數有沒有超過最大異常網頁數,如果大于,那就清空這個隊列,認為這個隊列中的所有網頁都有問題。
- 如果狀态是RETRY或者是BLOCKED,那就輸出CrawlDatum,将其狀态設定成STATUS_FETCH_RETRY,在下一輪進行重新抓取
- 如果狀态是GONE,NOTFOUND,ACCESS_DENIED,ROBOTS_DENIED,那就輸出CrawlDatum,設定其狀态為STATUS_FETCH_GONE,可能在下一輪中就不進行抓取了,
- 如果狀态是NOTMODIFIED,那就認為這個網頁沒有改變過,那就輸出其CrawlDatum,将其狀态設成成STATUS_FETCH_NOTMODIFIED.
- 如果所有狀态都沒有找到,那預設輸出其CrawlDatum,将其狀态設定成STATUS_FETCH_RETRY,在下一輪抓取中再重試
- 判斷網頁重定向的次數,如果超過最大重定向次數,就輸出其CrawlDatum,将其狀态設定成STATUS_FETCH_GONE
這裡有一些細節沒有說明,如網頁被重定向以後如果操作,相應的協定是如果産生的,這個是通過插件産生的,具體插件是怎麼調用的,這裡就不說了,以後有機會會再分析一下。
2.2 下面分析FetcherThread中的另外一個比較重要的方法,就是output
具體這個output大概做了如下幾件事:
- 判斷抓取的content是否為空,如果不為空,那調用相應的解析插件來對其内容進行解析,然後就是設定目前url所對應的CrawlDatum的一些參數,如目前内容的MD5碼,分數等資訊
- 然後就是使用FetchOutputFormat輸出目前url的CrawlDatum,Content和解析的結果ParseResult
下面分析一下FetcherOutputFormat中所使用到的ParseOutputFormat.RecordWriter
在生成相應的ParseOutputFormat的RecordWriter過程中,這個RecordWriter會再生成三個RecordWriter來寫出parse_text(MapFile),parse_data(MapFile)和crawl_parse(SequenceFile),我們在segments下具體的segment中看到的三個這樣的目錄就是這個對象生成的,分别輸出了網頁的源代碼;網頁的解析資料,如網頁title、外連結、中繼資料、狀态等資訊,這裡會對外連結進行過濾、規格化,并且用插件計算每一個外連結的初始分數;另一個是網頁解析後的CrawlDatum對象,這裡會分析目前CrawlDatum中的metadata,從中生成兩種新的CrawlDatum,還有就是它會對外連結生成相應的CrawlDatum,放入crawl_parse目錄中,這裡我還沒有看明白。
3. 總結
有點暈了,這裡的代碼有點複雜,我們來整理一下思路。
3.1 從目錄生成的角度
- 從Generate後會在segments目錄下生成一些要抓取的具體的segment,這裡每一個segment下會有一個叫crawl_generate的目錄,其中放着要抓取CrawlDatum資訊
- 在Fetch的時候,會輸出另外五個目錄
-
- content: 這個目錄隻有在配置了要輸出抓取内容時才會輸出
- crawl_fetch: 這個目錄是輸出抓取成功後的CrawlDatum資訊,這裡是對原來crawl_generate目錄中的資訊進行了一些修改,下面三個目錄隻有配置了解析參數後才會輸出,如果後面調用bin/nutch parse指令
- parse_text: 這個目錄存放了抓取的網頁内容,以提後面建立索引用
- parse_data: 這裡存入了網頁解析後的一些資料,如網頁title,外連結資訊等
- crawl_parse: 這裡存儲了一些新生成的CrawlDatum資訊,如外連結等,以供下一次疊代抓取使用
3.2 從資料流的角度
- Generate生成的CrawlDatum資料首先經過QueueFeeder生産者,放入共享隊列
- 多個消費者(FetcherThread)從共享隊列中取得要抓取的FetchItem資料
- 對FetchItem所對應的url進行抓取,得到相應的抓取内容,對抓取的狀态進行判斷,回調相應的操作
- 對抓取的内容進行解析,産生網頁的外連結,生成新的CrawlDatum抓取資料,産生解析後的資料
- 調用FetcherOutputFormat.Writer對象,把CrawlDatum,Content,ParseResult分别寫入crawl_fetch,content,(parse_text,parse_data,crawl_parse)目錄中
好了,Fetcher的分析也差不多了,可能有一些細節還沒有分析到,下面有機會再補上吧。