最近發現一個很好玩的Python庫,可以友善的使用在Python下編寫MapReduce任務,直接使用Hadoop Streaming在Hadoop上跑。對于一般的Hadoop而言,如果任務需要大量的IO相關操作(如資料庫查詢、檔案讀寫等),使用Python還是Java、C++,性能差别不大,而如果需要大量的資料運算,那可能Python會慢很多(語言級别上的慢),參考這裡。
最常見的如日志分析、Query統計等,都可以直接用Python快速完成。
Python作為一種快速開發語言,優美、簡潔的文法征服了很多人,現在很多的機器學習程式最初都是跑在Python上的(如知乎的推薦引擎),隻有當規模大到一定程度才會轉移到C或Java上。
本文會通過一個簡單的電影推薦系統來介紹如何使用MrJOB。
首先,可能很多人對性能格外在意,可以先看這篇文章:
MrJOB的精簡介紹
這裡重點在于實作電影推薦的系統,是以對于MrJob本身的介紹會比較簡略,夠用即可,詳細說明可以看官方文檔。
首先,在Python中安裝mrjob後,最基本的MapReduce任務很簡單:
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
frommrjob.jobimportMRJob
importre
WORD_RE=re.compile(r"[\w']+")
classMRWordFreqCount(MRJob):
defmapper(self,_,line):
forwordinWORD_RE.findall(line):
yieldword.lower(),1
defcombiner(self,word,counts):
yieldword,sum(counts)
defreducer(self,word,counts):
yieldword,sum(counts)
if__name__=='__main__':
MRWordFreqCount.run()
上面的代碼中,有三個函數,mapper、combiner、reducer,作用和普通的Java版本相同:
mapper用來接收每一行的資料輸入,對其進行處理傳回一個key-value對;
combiner接收mapper輸出的key-value對進行整合,把相同key的value作為數組輸入處理後輸出;
reducer和combiner的作用完全相同,不同之處在于combiner是對于單個mapper進行處理,而reducer是對整個任務(可能有很多mapper在執行)的key-value進行處理。它以各個combiner的輸出作為輸入。
更為詳細的介紹,如分步任務、資料初始化等可以參考其這份官方文檔。
電影推薦系統
假設我們現在有一個影視網站,每一個使用者可以給電影評1到5分,現在我們需要計算每兩個電影之間的相似度,其過程是:
對于任一電影A和B,我們能找出所有同時為A和B評分過的人;
根據這些人的評分,建構一個基于電影A的向量和一個基于電影B的向量;
根據這兩個向量計算他們之間的相似度;
當有使用者看過一部電影之後,我們給他推薦與之相似度最高的另一部電影;
你可以從這裡下載下傳一些開源的電影評分資料,我們使用的是1000個使用者對1700部電影進行的100000萬個評分資料,下載下傳後的資料檔案夾包含一個README,裡面有對各個檔案的詳細介紹,鑒于我們隻需要(user|movie|rating)資料,是以我們用Python把這些資料進行一些處理:
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/python/env python
if__name__=='__main__':
user_items=[]
items=[]
withopen('u.data')asf:
forlineinf:
user_items.append(line.split('\t'))
withopen('u.item')asf:
forlineinf:
items.append(line.split('|'))
print'user_items[0] = ',user_items[0]
print'items[0] = ',items[0]
items_hash={}
foriinitems:
items_hash[i[0]]=i[1]
print'items_hash[1] = ',items_hash['1']
foruiinuser_items:
ui[1]=items_hash[ui[1]]
print'user_items[0] = ',user_items[0]
withopen('ratings.csv','w')asf:
foruiinuser_items:
f.write(ui[0]+'|'+ui[1]+'|'+ui[2]+'\n')
處理後的資料類大約似于這樣:
YAML
1
2
3
4
5
6
7
8
9
196|Kolya(1996)|3
186|L.A.Confidential(1997)|3
22|Heavyweights(1994)|1
244|LegendsoftheFall(1994)|2
166|JackieBrown(1997)|1
298|Dr.Strangeloveor: How I Learned to Stop Worrying and Love the Bomb (1963)|4
115|HuntforRedOctober,The(1990)|2
253|JungleBook,The(1994)|5
305|Grease(1978)|3
皮爾遜相關系數
判斷兩個向量的相似度的方式有很多種,比如測量其歐氏距離、海明距離等,這裡我們用皮爾遜相關系數來電腦相關性,該系數可以了解為兩個向量之間夾角的餘弦值,介于-1到1之間,絕對值越大相關性越強,公式為:
第一步,我們首先對把每個使用者的所有評分聚合到一起,代碼如下:
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#!/usr/bin/env python
# coding=utf-8
frommrjob.jobimportMRJob
classStep1(MRJob):
"""
第一步是聚合單個使用者的下的所有評分資料
格式為:user_id, (item_count, rating_sum, [(item_id,rating)...])
"""
defgroup_by_user_rating(self,key,line):
"""
該mapper輸出為:
17 70,3
35 21,1
49 19,2
49 21,1
"""
user_id,item_id,rating=line.split('|')
yielduser_id,(item_id,float(rating))
defcount_ratings_users_freq(self,user_id,values):
"""
該reducer輸出為:
49 (3,7,[19,2 21,1 70,4])
"""
item_count=0
item_sum=0
final=[]
foritem_id,ratinginvalues:
item_count+=1
item_sum+=rating
final.append((item_id,rating))
yielduser_id,(item_count,item_sum,final)
defsteps(self):
return[self.mr(mapper=self.group_by_user_rating,
reducer=self.count_ratings_users_freq),]
if__name__=='__main__':
Step1.run()
使用指令 $python step1.py ratings.csv > result1.csv獲得第一步的結果。
第二步,根據第一步聚合起來的使用者評分,按照皮爾遜系數算法獲得任一兩個電影之間的相關性,代碼及注釋如下:
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#!/usr/bin/env python
#! coding=utf-8
frommrjob.jobimportMRJob
fromitertoolsimportcombinations
frommathimportsqrt
classStep2(MRJob):
defpairwise_items(self,user_id,values):
'''
本mapper使用step1的輸出作為輸入,把user_id丢棄掉不再使用
輸出結果為 (item_1,item2),(rating_1,rating_2)
這裡combinations(iterable,number)的作用是求某個集合的組合,
如combinations([1,2,3,4],2)就是在集合種找出任兩個數的組合。
這個mapper是整個任務的性能瓶頸,這是因為combinations函數生成的資料
比較多,這麼多的零散資料依次寫回磁盤,IO操作過于頻繁,可以用寫一個
Combiner來緊接着mapper做一些聚合操作(和Reducer相同),由Combiner
把資料寫回磁盤,該Combiner也可以用C庫來實作,由Python調用。
'''
# 這裡由于step1是分開的,把資料dump到檔案result1.csv中,是以讀取的時候
# 需要按照字元串處理,如果step1和step2在同一個job内完成,則直接可以去掉
# 這一行代碼,在同一個job内完成參見steps函數的使用說明。
values=eval(values.split('\t')[1])
item_count,item_sum,ratings=values
foritem1,item2incombinations(ratings,2):
yield(item1[0],item2[0]),(item1[1],item2[1])
defcalculate_similarity(self,pair_key,lines):
'''
(Movie A,Movie B)作為Key,(A rating,B rating)作為該reducer的輸入,
每一次輸入屬于同一個使用者,所有當兩個key相同時,代表他們兩個都看了A和B,是以
按照這些所有都看了A、B的人的評分作為向量,計算A、B的皮爾遜系數。
'''
sum_xx,sum_xy,sum_yy,sum_x,sum_y,n=(0.0,0.0,0.0,0.0,0.0,0)
item_pair,co_ratings=pair_key,lines
item_xname,item_yname=item_pair
foritem_x,item_yinco_ratings:
sum_xx+=item_x*item_x
sum_yy+=item_y*item_y
sum_xy+=item_x*item_y
sum_y+=item_y
sum_x+=item_x
n+=1
similarity=self.normalized_correlation(n,sum_xy,sum_x,sum_y,sum_xx,sum_yy)
yield(item_xname,item_yname),(similarity,n)
defsteps(self):
return[self.mr(mapper=self.pairwise_items,
reducer=self.calculate_similarity),]
defnormalized_correlation(self,n,sum_xy,sum_x,sum_y,sum_xx,sum_yy):
numerator=(n*sum_xy-sum_x*sum_y)
denominator=sqrt(n*sum_xx-sum_x*sum_x)*sqrt(n*sum_yy-sum_y*sum_y)
similarity=numerator/denominator
returnsimilarity
if__name__=='__main__':
Step2.run()
使用指令 $python step2.py result1.csv > result2.csv獲得第二步的結果。
獲得結果集示例:
[Movie A, Movie B] [similarity, rating count]
Python
1
2
3
4
5
6
7
8
9
10
11
["Star Trek VI: The Undiscovered Country (1991)","Star Trek: Generations (1994)"][0.31762191045234545,93]
["Star Trek VI: The Undiscovered Country (1991)","Star Trek: The Motion Picture (1979)"][0.4632318663542742,96]
["Star Trek VI: The Undiscovered Country (1991)","Star Trek: The Wrath of Khan (1982)"][0.44969297939248015,148]
["Star Trek VI: The Undiscovered Country (1991)","Star Wars (1977)"][0.08625580124837125,151]
["Star Trek VI: The Undiscovered Country (1991)","Stargate (1994)"][0.30431878197511564,94]
["Star Trek VI: The Undiscovered Country (1991)","Stars Fell on Henrietta, The (1995)"][1.0,2]
["Star Trek VI: The Undiscovered Country (1991)","Starship Troopers (1997)"][0.14969005091372395,59]
["Star Trek VI: The Undiscovered Country (1991)","Steal Big, Steal Little (1995)"][0.74535599249993,5]
["Star Trek VI: The Undiscovered Country (1991)","Stealing Beauty (1996)"][-0.4879500364742666,10]
["Star Trek VI: The Undiscovered Country (1991)","Steel (1997)"][1.0,2]
["Star Trek VI: The Undiscovered Country (1991)","Stephen King's The Langoliers (1995)"][-0.11470786693528087,16]
可以看到結果還是具有一定的實際價值的,需要注意的是,Stars Fell on Henrietta, The (1995) 這部電影是1.0,也就是完全相關,但是由于隻有兩個人同時對他們進行了評價,是以結果并非全都很正确,這裡還要考慮多少人進行了評價。
結語
本文的内容來自于參考資料中的部落格,部落客僅做了整理工作,有任何問題可以和我交流。需要指出的是,類似于本文中的電影推薦僅僅是衆多推薦算法中一種,可以說是對物品進行相似度判斷,實際上也可以根據使用者進行使用者相似度判斷,相似的使用者總是喜歡相同的電影,這在實踐中效果更好一點,也更容易根據社交關系進一步挖掘。
參考資料:http://aimotion.blogspot.com.br/2012/08/introduction-to-recommendations-with.html