天天看點

關于Hadoop中reducer端combiner的一些思考

什麼是Combiner Functions

“Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks. Hadoop allows the user to specify a combiner function to be run on the map output—the combiner function’s output forms the input to the reduce function. Since the combiner function is an optimization, Hadoop does not provide a guarantee of how many times it will call it for a particular map output record, if at all. In other words, calling the combiner function zero, one, or many times should produce the same output from the reducer.” -- 《Hadoop: The Definitive Guide》
      

簡單的說,combiner是一個在mapper之後運作的function,非常類似reducer的功能,是以在《Hadoop In Action》又叫作“local reduce”。它的好處是減少網絡的資料傳輸,進而提高性能。但因為是一個優化功能,是以Hadoop并不保證會運作它。

其實這個的有一個更深入的設計問題,這裡有一個假設就是大家傾向于fat mapper和slim reducer。就是一般情況下,大家會盡可能的在mapper裡實作複雜的邏輯和運算,在reducer隻是做簡單的彙聚。這就是為什麼有mapper端的combiner而沒有reducer端的combiner。

reducer端的combiner

是不是試想這樣的情景,一個項目需要多個mapper和reducer才能完成,而且必須在reducer端實作業務邏輯。舉一個例子,輸入日志包含如下字段:使用者ID,國家,timestamp。需要統計不同國家的使用者的通路時間(定義為最後一個通路的時間戳去第一次通路的時間戳)。日志為~10G/小時,但是分析以一天為機關。

在這裡,同一個使用者會出現在一天的任何小時,是以必須将同一個使用者彙聚到一起來計算通路時間。顯然無法在mapper端實作這樣的功能。相同的使用者以“使用者ID”作為partition key排序後彙聚到reducer端。采用如下的标準模闆(Perl語言為例):

while ( my $line = <STDIN> ) {
	chomp($line);
	( $user_id, $country, $timestamp ) = split( /\t/, $line );

	# set base key
	$key = $base_key;

	if ($cur_key) {
		if ( $key ne $cur_key ) {
			&onEndKey();
			&onBeginKey();
		}
		&onSameKey();
	}
	else {
		&onBeginKey();
		&onSameKey();
	}
}

if ($cur_key) {
	&onEndKey();
}
           

這裡需要另一個mapper/reducer來彙聚不同國家的時間。如果在第一個reducer端能夠有一個combiner,那麼将極大的減少網絡傳輸,甚至避免out of memory問題(會單獨寫一個文章)。因為現有的Hadoop并沒有這個功能,隻能自己來了。最簡單的實作就是提供一個全局的hash來彙聚。因為從本質上來說,combiner或者reducer其實就是一個hash。

具體的實作就略去了,這裡隻是提供一個設計思路,歡迎大家一起讨論。

繼續閱讀