天天看點

Nutch 1.3 學習筆記 5-1 FetchThread

Nutch 1.3 學習筆記 5-1 FetchThread

-----------------------------------

上一節看了Fetcher中主要幾個類的實作,這一節會來分析一下其中用到的消費者FetcherThread,來看看它是幹嘛的。

1. Fetcher的Mapp模型

Fetcher.java代碼中可以看到,Fetcher繼承自MapRunable,它是Mapper的抽象接口,實作這個接口的子類能夠更好的對Map的流程進行控制,包括多線程與異步Maper。

1.1 Fetcher的入口函數fetch(Path segment,int threads, boolean parsing)

下面是它的源代碼,來分析一下:

// 對配置進行檢測,看一些必要的配置是否已經配置了,如http.agent.name等參數
    	checkConfiguration();


		// 記錄fetch的開始時間
    	SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    	long start = System.currentTimeMillis();
    	if (LOG.isInfoEnabled()) {
    		LOG.info("Fetcher: starting at " + sdf.format(start));
    	  LOG.info("Fetcher: segment: " + segment);
    	}


		// 這裡對抓取的時候進行限制,在FetchItemQueue中會用到這個參數
    	// set the actual time for the timelimit relative
    	// to the beginning of the whole job and not of a specific task
    	// otherwise it keeps trying again if a task fails
    	long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
    	if (timelimit != -1) {
    	  timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
    	  LOG.info("Fetcher Timelimit set for : " + timelimit);
    	  getConf().setLong("fetcher.timelimit", timelimit);
    	}
        
		// 生成一個Nutch的Map-Reduce配置
    	JobConf job = new NutchJob(getConf());
    	job.setJobName("fetch " + segment);
	
		// 配置抓取線程數,
    	job.setInt("fetcher.threads.fetch", threads);
    	job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
		// 配置是否對抓取的内容進行解析
    	job.setBoolean("fetcher.parse", parsing);
	
    	// for politeness, don't permit parallel execution of a single task
    	job.setSpeculativeExecution(false);
	
		// 配置輸出的路徑名
    	FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
		// 配置輸入的檔案格式,這裡類繼承自SequenceFileInputFormat
		// 它主要是覆寫了其getSplits方法,其作用是不對檔案進行切分,以檔案數量作為splits的依據
		// 就是有幾個檔案,就有幾個Map操作
    	job.setInputFormat(InputFormat.class);
	
		// 配置Map操作的類
    	job.setMapRunnerClass(Fetcher.class);
	
		// 配置輸出路徑
    	FileOutputFormat.setOutputPath(job, segment);
		// 這裡配置輸出檔案方法,這個類在前面已經分析過
    	job.setOutputFormat(FetcherOutputFormat.class);
		// 配置輸出<key,value>類型
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(NutchWritable.class);
	
	    JobClient.runJob(job);
           

1.2 Fetcher的run方法分析

  這個是Map類的入口,用于啟動抓取的生産者與消費者,下面是部分源代碼:

// 生成生産者,用于讀取Generate出來的CrawlDatum,把它們放到共享隊列中
    	feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
    	//feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
    
    	// the value of the time limit is either -1 or the time where it should finish
    	long timelimit = getConf().getLong("fetcher.timelimit", -1);
    	if (timelimit != -1) feeder.setTimeLimit(timelimit);
    	feeder.start();


    	// set non-blocking & no-robots mode for HTTP protocol plugins.
    	getConf().setBoolean(Protocol.CHECK_BLOCKING, false);
    	getConf().setBoolean(Protocol.CHECK_ROBOTS, false);
    
		// 啟動消費者線程
    	for (int i = 0; i < threadCount; i++) {       // spawn threads
    	  new FetcherThread(getConf()).start();
    	}


    	// select a timeout that avoids a task timeout
    	long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;


		// 這裡用一個循環來等待線程結束
    	do {                                          // wait for threads to exit
    	  try {
    	    Thread.sleep(1000);
    	  } catch (InterruptedException e) {}


			// 這個函數是得到相前線程的抓取狀态,如抓取了多少網頁,多少網頁抓取失敗,抓取速度是多少
    	  reportStatus();
      	LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
      	    + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());


		// 輸出抓取隊列中的資訊
     	 if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
     	   fetchQueues.dump();
     	 }
      
	  	// 檢視timelimit的值,這裡隻要傳回的hitByTimeLimit不為0,checkTimelimit方法會清空抓取隊列中的所有資料
     	 // check timelimit
     	 if (!feeder.isAlive()) {
        	int hitByTimeLimit = fetchQueues.checkTimelimit();
       	 if (hitByTimeLimit != 0) reporter.incrCounter("FetcherStatus",
        	    "hitByTimeLimit", hitByTimeLimit);
     	 }
      
	  	// 檢視抓取抓取線程是否逾時,如果逾時,就退出等待
      	// some requests seem to hang, despite all intentions
      	if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
        	if (LOG.isWarnEnabled()) {
        	  LOG.warn("Aborting with "+activeThreads+" hung threads.");
        	}
        	return;
      	}


    	} while (activeThreads.get() > 0);
    	LOG.info("-activeThreads=" + activeThreads);
           

2. Fetcher.FetcherThread

2.1 這個類主要是用來從隊列中得到FetchItem,下面來看一下其run方法,其大概做了幾件事:

  • 從抓取隊列中得到一個FetchItem,如果傳回為null,判斷生産者是否還活着或者隊列中是否還有資料,  如果隊列中還有資料,那就等待,如果上面條件沒有滿足,就認為所有FetchItem都已經處理完了,退出目前抓取線程
  • 得到FetchItem, 抽取其url,從這個url中分析出所使用的協定,調用相應的plugin來解析這個協定
  • 得到相當url的robotRules,看是否符合抓取規則,如果不符合或者其delayTime大于我們配置的maxDelayTime,那就不抓取這個網頁
  • 對網頁進行抓取,得到其抓取的Content和抓取狀态,調用FetchItemQueues的finishFetchItem方法,表明目前url已經抓取完成
  • 根據抓取協定的狀态來進行下一步操作
    1. 如果狀态為WOULDBLOCK,那就進行retry,把目前url放加FetchItemQueues中,進行重試
    2. 如果是MOVED或者TEMP_MOVED,這時這個網頁可以被重定向了,對其重定向的内容進行解析,得到重定向的網址,這時要生成一個新的FetchItem,根據其QueueID放到相應的隊列的inProgress集合中,然後再對這個重定向的網頁進行抓取
    3. 如果狀态是EXCEPTION,對目前url所屬的FetchItemQueue進行檢測,看其異常的網頁數有沒有超過最大異常網頁數,如果大于,那就清空這個隊列,認為這個隊列中的所有網頁都有問題。
    4. 如果狀态是RETRY或者是BLOCKED,那就輸出CrawlDatum,将其狀态設定成STATUS_FETCH_RETRY,在下一輪進行重新抓取
    5. 如果狀态是GONE,NOTFOUND,ACCESS_DENIED,ROBOTS_DENIED,那就輸出CrawlDatum,設定其狀态為STATUS_FETCH_GONE,可能在下一輪中就不進行抓取了,
    6. 如果狀态是NOTMODIFIED,那就認為這個網頁沒有改變過,那就輸出其CrawlDatum,将其狀态設成成STATUS_FETCH_NOTMODIFIED.
    7. 如果所有狀态都沒有找到,那預設輸出其CrawlDatum,将其狀态設定成STATUS_FETCH_RETRY,在下一輪抓取中再重試
  • 判斷網頁重定向的次數,如果超過最大重定向次數,就輸出其CrawlDatum,将其狀态設定成STATUS_FETCH_GONE

這裡有一些細節沒有說明,如網頁被重定向以後如果操作,相應的協定是如果産生的,這個是通過插件産生的,具體插件是怎麼調用的,這裡就不說了,以後有機會會再分析一下。

2.2 下面分析FetcherThread中的另外一個比較重要的方法,就是output

具體這個output大概做了如下幾件事:

  • 判斷抓取的content是否為空,如果不為空,那調用相應的解析插件來對其内容進行解析,然後就是設定目前url所對應的CrawlDatum的一些參數,如目前内容的MD5碼,分數等資訊
  • 然後就是使用FetchOutputFormat輸出目前url的CrawlDatum,Content和解析的結果ParseResult

下面分析一下FetcherOutputFormat中所使用到的ParseOutputFormat.RecordWriter

在生成相應的ParseOutputFormat的RecordWriter過程中,這個RecordWriter會再生成三個RecordWriter來寫出parse_text(MapFile),parse_data(MapFile)和crawl_parse(SequenceFile),我們在segments下具體的segment中看到的三個這樣的目錄就是這個對象生成的,分别輸出了網頁的源代碼;網頁的解析資料,如網頁title、外連結、中繼資料、狀态等資訊,這裡會對外連結進行過濾、規格化,并且用插件計算每一個外連結的初始分數;另一個是網頁解析後的CrawlDatum對象,這裡會分析目前CrawlDatum中的metadata,從中生成兩種新的CrawlDatum,還有就是它會對外連結生成相應的CrawlDatum,放入crawl_parse目錄中,這裡我還沒有看明白。

3. 總結

有點暈了,這裡的代碼有點複雜,我們來整理一下思路。

3.1 從目錄生成的角度 

  • 從Generate後會在segments目錄下生成一些要抓取的具體的segment,這裡每一個segment下會有一個叫crawl_generate的目錄,其中放着要抓取CrawlDatum資訊
  • 在Fetch的時候,會輸出另外五個目錄
    1. content: 這個目錄隻有在配置了要輸出抓取内容時才會輸出
    2. crawl_fetch: 這個目錄是輸出抓取成功後的CrawlDatum資訊,這裡是對原來crawl_generate目錄中的資訊進行了一些修改,下面三個目錄隻有配置了解析參數後才會輸出,如果後面調用bin/nutch parse指令
    3. parse_text: 這個目錄存放了抓取的網頁内容,以提後面建立索引用
    4. parse_data: 這裡存入了網頁解析後的一些資料,如網頁title,外連結資訊等
    5. crawl_parse: 這裡存儲了一些新生成的CrawlDatum資訊,如外連結等,以供下一次疊代抓取使用

3.2 從資料流的角度

  • Generate生成的CrawlDatum資料首先經過QueueFeeder生産者,放入共享隊列
  • 多個消費者(FetcherThread)從共享隊列中取得要抓取的FetchItem資料
  • 對FetchItem所對應的url進行抓取,得到相應的抓取内容,對抓取的狀态進行判斷,回調相應的操作
  • 對抓取的内容進行解析,産生網頁的外連結,生成新的CrawlDatum抓取資料,産生解析後的資料
  • 調用FetcherOutputFormat.Writer對象,把CrawlDatum,Content,ParseResult分别寫入crawl_fetch,content,(parse_text,parse_data,crawl_parse)目錄中

好了,Fetcher的分析也差不多了,可能有一些細節還沒有分析到,下面有機會再補上吧。