介紹
Python在資料工程師和資料科學家中被廣泛使用,以解決從ETL / ELT管道到建構機器學習模型的各種問題。Apache HBase是用于許多工作流程的有效資料存儲系統,但是專門通過Python通路此資料可能會很困難。對于想要利用存儲在HBase中的資料的資料專業人士而言,最新的上遊項目“ hbase-connectors”可以與PySpark一起使用以進行基本操作。
在本部落格系列中,我們将說明如何為基本的Spark使用以及CDSW中維護的作業一起配置PySpark和HBase 。對于不熟悉CDSW的人來說,這是一個安全的、自助式企業資料科學平台,資料科學家可以管理自己的分析管道,進而加快從勘探到生産的機器學習項目。有關CDSW的更多資訊,請通路Cloudera Data Science Workbench産品頁面。
在這篇文章中,将解釋和示範幾種操作以及示例輸出。就上下文而言,此特定部落格文章中的所有示例操作均與CDSW部署一起運作。
先決條件
- 具有帶有HBase和Spark的CDP叢集
- 如果要通過CDSW遵循示例,則需要安裝它-安裝Cloudera Data Science Workbench
- Python 3安裝在每個節點的同一路徑上
配置
首先,HBase和Spark需要配置到一起用于SparkSQL查詢工作正常進行。為此,它包括兩個部分:首先,通過Cloudera Manager配置HBase Region Server。其次,確定Spark運作時具有HBase綁定。不過要記住的一點是,Cloudera Manager已經設定了一些配置和環境變量,可以自動為您将Spark指向HBase。盡管如此,在所有CDP叢集上的所有部署類型中,配置Spark SQL查詢的第一步都是通用的,但第二步因部署類型而略有不同。
配置HBase Region Servers
- 轉到Cloudera Manager,然後選擇HBase服務。
- 搜尋“regionserver environment”
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICMyYTMvw1dvwlMvwlM3VWaWV2Zh1Wa-cmbw5SMsdnY29GNrVjZvwlNxgjM3UTMtUGall3LcVmdhNXLwRHdo9CXt92YucWbpRWdvx2Yx5yazF2Lc9CX6MHc0RHaiojIsJye.png)
- 使用RegionServer環境進階配置代碼段(安全閥)添加新的環境變量:
- Key:HBASE_CLASSPATH
- Value:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar:/ opt /cloudera/parcels/CDH/jars/scala-library-2.11.12.jar確定使用适當的版本号。
- 重新啟動Region Server。
完成上述步驟後,請按照以下步驟,根據需要是否依賴CDSW部署。
在非CDSW部署中将HBase綁定添加到Spark運作時
要部署Shell或正确使用spark-submit,請使用以下指令來確定spark具有正确的HBase綁定。
pyspark –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
複制
在CDSW部署中将HBase綁定添加到Spark運作時
要使用HBase和PySpark配置CDSW,需要執行一些步驟。
1)確定在每個叢集節點上都安裝了Python 3,并記下了它的路徑
2)在CDSW中建立一個新項目并使用PySpark模闆
3)打開項目,轉到設定->引擎->環境變量。
4)将PYSPARK3_DRIVER_PYTHON和PYSPARK3_PYTHON設定為群集節點上安裝Python的路徑(步驟1中指出的路徑)。
以下是其外觀的示例。
5)在您的項目中,轉到檔案-> spark-defaults.conf并在工作台中将其打開
6)複制下面的行并将其粘貼到該檔案中,并確定在開始新會話之前已将其儲存。
spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
複制
至此,CDSW現在已配置為在HBase上運作PySpark作業!本部落格文章的其餘部分涉及CDSW部署上的一些示例操作。
示例操作
put操作
有兩種向HBase中插入和更新行的方法。第一個也是最推薦的方法是建構目錄,該目錄是一種Schema,它将在指定表名和名稱空間的同時将HBase表的列映射到PySpark的dataframe。建構這種使用者定義的JSON格式是最優選的方法,因為它也可以與其他操作一起使用。有關目錄的更多資訊,請參考此文檔http://hbase.apache.org/book.html#_define_catalog。第二種方法是使用一個名為“ hbase.columns.mapping”的特定映射參數,該參數僅接收一串鍵值對。
- 使用目錄
from pyspark.sql import Row
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("SampleApplication")\
.getOrCreate()
tableCatalog = ''.join("""{
"table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
"rowkey":"key",
"columns":{
"key":{"cf":"rowkey", "col":"key", "type":"int"},
"empId":{"cf":"personal","col":"empId","type":"string"},
"empName":{"cf":"personal", "col":"empName", "type":"string"},
"empState":{"cf":"personal", "col":"empWeight", "type":"string"}
}
}""".split())
employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3]))
employeeDF = spark.createDataFrame(employeeMap)
employeeDF.write.format("org.apache.hadoop.hbase.spark") \
.options(catalog=tableCatalog, newTable=5) \
.option("hbase.spark.use.hbasecontext", False) \
.save()
# newTable refers to the NumberOfRegions which has to be > 3
複制
隻需打開HBase shell并執行以下指令,即可驗證是否在HBase中建立了一個名為“ tblEmployee”的新表:
scan ‘tblEmployee’, {‘LIMIT’ => 2}
複制
使用目錄還可以使您輕松加載HBase表。以後的部分将對此進行讨論。
- 使用hbase.columns.mapping
在編寫PySpark資料框時,可以添加一個名為“ hbase.columns.mapping”的選項,以包含正确映射列的字元串。此選項僅允許您将行插入現有表。
在HBase shell中,我們首先建立一個表,建立'tblEmployee2','personal'
現在在PySpark中,使用“ hbase.columns.mapping”插入2行
from pyspark.sql import Row
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("SampleApplication")\
.getOrCreate()
employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3])))
employeeDF = spark.createDataFrame(employeeMap)
employeeDF.write.format("org.apache.hadoop.hbase.spark") \
.option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \
.option("hbase.table", "tblEmployee2") \
.option("hbase.spark.use.hbasecontext", False) \
.save()
複制
同樣,隻需驗證名為“ tblEmployee2”的新表具有這些新行。
scan ‘tblEmployee2’, {‘LIMIT’ => 2}
複制
這就完成了我們有關如何通過PySpark将行插入到HBase表中的示例。在下一部分中,我将讨論“擷取和掃描操作”,PySpark SQL和一些故障排除。在此之前,您應該獲得一個CDP叢集并按照這些示例進行操作。
原文作者:Manas Chakka
原文連結:https://blog.cloudera.com/building-a-machine-learning-application-with-cloudera-data-science-workbench-and-operational-database-part-1-the-set-up-basics/