Spark是一個快速、通用的計算叢集架構,它的核心使用Scala語言編寫,它提供了Scala、Java和Python程式設計語言high-level API,使用這些API能夠非常容易地開發并行處理的應用程式。
下面,我們通過搭建Spark叢集計算環境,并進行簡單地驗證,來體驗一下使用Spark計算的特點。無論從安裝運作環境還是從編寫處理程式(用Scala,Spark預設提供的Shell環境可以直接輸入Scala代碼進行資料處理),我們都會覺得比Hadoop MapReduce計算架構要簡單得多,而且,Spark可以很好地與HDFS進行互動(從HDFS讀取資料,以及寫資料到HDFS中)。
安裝配置
下載下傳安裝配置Scala
<code>wget http:</code><code>//www</code><code>.scala-lang.org</code><code>/files/archive/scala-2</code><code>.10.3.tgz</code>
<code>tar</code> <code>xvzf scala-2.10.3.tgz</code>
在~/.bashrc中增加環境變量SCALA_HOME,并使之生效:
<code>export</code> <code>SCALA_HOME=</code><code>/usr/scala/scala-2</code><code>.10.3</code>
<code>export</code> <code>PATH=$PATH:$SCALA_HOME</code><code>/bin</code>
下載下傳安裝配置Spark
我們首先在主節點m1上配置Spark程式,然後将配置好的程式檔案複制分發到叢集的各個從結點上。下載下傳解壓縮:
<code>wget http:</code><code>//d3kbcqa49mib13</code><code>.cloudfront.net</code><code>/spark-0</code><code>.9.0-incubating-bin-hadoop1.tgz</code>
<code>tar</code> <code>xvzf spark-0.9.0-incubating-bin-hadoop1.tgz</code>
在~/.bashrc中增加環境變量SPARK_HOME,并使之生效:
<code>export</code> <code>SPARK_HOME=</code><code>/home/shirdrn/cloud/programs/spark-0</code><code>.9.0-incubating-bin-hadoop1</code>
<code>export</code> <code>PATH=$PATH:$SPARK_HOME</code><code>/bin</code>
在m1上配置Spark,修改spark-env.sh配置檔案:
<code>cd</code> <code>/home/shirdrn/cloud/programs/spark-0</code><code>.9.0-incubating-bin-hadoop1</code><code>/conf</code>
<code>cp</code> <code>spark-</code><code>env</code><code>.sh.template spark-</code><code>env</code><code>.sh</code>
在該腳本檔案中,同時将SCALA_HOME配置為Unix環境下實際指向路徑,例如:
修改conf/slaves檔案,将計算節點的主機名添加到該檔案,一行一個,例如:
<code>s1</code>
<code>s2</code>
<code>s3</code>
最後,将Spark的程式檔案和配置檔案拷貝分發到從節點機器上:
<code>scp</code> <code>-r ~</code><code>/cloud/programs/spark-0</code><code>.9.0-incubating-bin-hadoop1 shirdrn@s1:~</code><code>/cloud/programs/</code>
<code>scp</code> <code>-r ~</code><code>/cloud/programs/spark-0</code><code>.9.0-incubating-bin-hadoop1 shirdrn@s2:~</code><code>/cloud/programs/</code>
<code>scp</code> <code>-r ~</code><code>/cloud/programs/spark-0</code><code>.9.0-incubating-bin-hadoop1 shirdrn@s3:~</code><code>/cloud/programs/</code>
啟動Spark叢集
我們會使用HDFS叢集上存儲的資料作為計算的輸入,是以首先要把Hadoop叢集安裝配置好,并成功啟動,我這裡使用的是Hadoop 1.2.1版本。啟動Spark計算叢集非常簡單,執行如下指令即可:
<code>cd</code> <code>/home/shirdrn/cloud/programs/spark-0</code><code>.9.0-incubating-bin-hadoop1/</code>
<code>sbin</code><code>/start-all</code><code>.sh</code>
可以看到,在m1上啟動了一個名稱為Master的程序,在s1上啟動了一個名稱為Worker的程序,如下所示,我這裡也啟動了Hadoop叢集:
主節點m1上:
<code>54968 SecondaryNameNode</code>
<code>55651 Master</code>
<code>54814 NameNode</code>
從節點s1上:
<code>33592 Worker</code>
<code>33442 TaskTracker</code>
<code>33336 DataNode</code>
各個程序是否啟動成功,也可以檢視日志來診斷,例如:
主節點上:
<code>tail</code> <code>-100f $SPARK_HOME</code><code>/logs/spark-shirdrn-org</code><code>.apache.spark.deploy.master.Master-1-m1.out</code>
從節點上:
<code>tail</code> <code>-100f $SPARK_HOME</code><code>/logs/spark-shirdrn-org</code><code>.apache.spark.deploy.worker.Worker-1-s1.out</code>
Spark叢集計算驗證
我們使用我的網站的通路日志檔案來示範,示例如下:
<code>27.159.254.192 - - [21</code><code>/Feb/2014</code><code>:11:40:46 +0800] </code><code>"GET /archives/526.html HTTP/1.1"</code> <code>200 12080 </code><code>"http://shiyanjun.cn/archives/526.html"</code> <code>"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"</code>
<code>120.43.4.206 - - [21</code><code>/Feb/2014</code><code>:10:37:37 +0800] </code><code>"GET /archives/417.html HTTP/1.1"</code> <code>200 11464 </code><code>"http://shiyanjun.cn/archives/417.html/"</code> <code>"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"</code>
統計該檔案裡面IP位址出現頻率,來驗證Spark叢集能夠正常計算。另外,我們需要從HDFS中讀取這個日志檔案,然後統計IP位址頻率,最後将結果再儲存到HDFS中的指定目錄。
首先,需要啟動用來送出計算任務的Spark Shell:
<code>bin</code><code>/spark-shell</code>
在Spark Shell上隻能使用Scala語言寫代碼來運作。
然後,執行統計IP位址頻率,在Spark Shell中執行如下代碼來實作:
<code>val </code><code>file</code> <code>= sc.textFile(</code><code>"hdfs://m1:9000/user/shirdrn/wwwlog20140222.log"</code><code>)</code>
<code>val result = </code><code>file</code><code>.flatMap(line => line.</code><code>split</code><code>(</code><code>"\\s+.*"</code><code>)).map(word => (word, 1)).reduceByKey((a, b) => a + b)</code>
上述的檔案hdfs://m1:9000/user/shirdrn/wwwlog20140222.log是輸入日志檔案。處理過程的日志資訊,示例如下所示:
<code> </code>
<code>14</code><code>/03/06</code> <code>21:59:23 INFO Executor: Running task ID 20</code>
<code>36</code>
<code>14</code><code>/03/06</code> <code>21:59:23 INFO BlockManager: Found block broadcast_11 locally</code>
<code>37</code>
<code>14</code><code>/03/06</code> <code>21:59:23 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-zero-bytes blocks out of 1 blocks</code>
<code>38</code>
<code>14</code><code>/03/06</code> <code>21:59:23 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets </code><code>in</code> <code>1 ms</code>
<code>39</code>
<code>14</code><code>/03/06</code> <code>21:59:23 INFO Executor: Serialized size of result </code><code>for</code> <code>20 is 19423</code>
<code>40</code>
<code>14</code><code>/03/06</code> <code>21:59:23 INFO Executor: Sending result </code><code>for</code> <code>20 directly to driver</code>
<code>41</code>
<code>14</code><code>/03/06</code> <code>21:59:23 INFO TaskSetManager: Finished TID 20 </code><code>in</code> <code>17 ms on localhost (progress: 0</code><code>/1</code><code>)</code>
<code>42</code>
<code>14</code><code>/03/06</code> <code>21:59:23 INFO TaskSchedulerImpl: Remove TaskSet 20.0 from pool</code>
<code>43</code>
<code>14</code><code>/03/06</code> <code>21:59:23 INFO DAGScheduler: Completed ResultTask(20, 0)</code>
<code>44</code>
<code>14</code><code>/03/06</code> <code>21:59:23 INFO DAGScheduler: Stage 20 (collect at <console>:13) finished </code><code>in</code> <code>0.016 s</code>
<code>45</code>
<code>14</code><code>/03/06</code> <code>21:59:23 INFO SparkContext: Job finished: collect at <console>:13, took 0.242136929 s</code>
<code>46</code>
<code>14</code><code>/03/06</code> <code>21:59:23 INFO Executor: Finished task ID 20</code>
<code>47</code>
<code>res14: Array[(String, Int)] = Array((27.159.254.192,28), (120.43.9.81,40), (120.43.4.206,16), (120.37.242.176,56), (64.31.25.60,2), (27.153.161.9,32), (202.43.145.163,24), (61.187.102.6,1), (117.26.195.116,12), (27.153.186.194,64), (123.125.71.91,1), (110.85.106.105,64), (110.86.184.182,36), (27.150.247.36,52), (110.86.166.52,60), (175.98.162.2,20), (61.136.166.16,1), (46.105.105.217,1), (27.150.223.49,52), (112.5.252.6,20), (121.205.242.4,76), (183.61.174.211,3), (27.153.230.35,36), (112.111.172.96,40), (112.5.234.157,3), (144.76.95.232,7), (31.204.154.144,28), (123.125.71.22,1), (80.82.64.118,3), (27.153.248.188,160), (112.5.252.187,40), (221.219.105.71,4), (74.82.169.79,19), (117.26.253.195,32), (120.33.244.205,152), (110.86.165.8,84), (117.26.86.172,136), (27.153.233.101,8), (123.12...</code>
可以看到,輸出了經過map和reduce計算後的部分結果。
最後,我們想要将結果儲存到HDFS中,隻要輸入如下代碼:
result.saveAsTextFile("hdfs://m1:9000/user/shirdrn/wwwlog20140222.log.result")
檢視HDFS上的結果資料:
<code>[shirdrn@m1 ~]$ hadoop fs -</code><code>cat</code> <code>/user/shirdrn/wwwlog20140222</code><code>.log.result</code><code>/part-00000</code> <code>| </code><code>head</code> <code>-5</code>
<code>(27.159.254.192,28)</code>
<code>(120.43.9.81,40)</code>
<code>(120.43.4.206,16)</code>
<code>(120.37.242.176,56)</code>
<code>(64.31.25.60,2)</code>
<code></code>
本文轉自crazy_charles 51CTO部落格,原文連結:http://blog.51cto.com/douya/1863288,如需轉載請自行聯系原作者