天天看點

使用CDSW和營運資料庫建構ML應用1:設定和基礎

介紹

Python在資料工程師和資料科學家中被廣泛使用,以解決從ETL / ELT管道到建構機器學習模型的各種問題。Apache HBase是用于許多工作流程的有效資料存儲系統,但是專門通過Python通路此資料可能會很困難。對于想要利用存儲在HBase中的資料的資料專業人士而言,最新的上遊項目“ hbase-connectors”可以與PySpark一起使用以進行基本操作。

在本部落格系列中,我們将說明如何為基本的Spark使用以及CDSW中維護的作業一起配置PySpark和HBase 。對于不熟悉CDSW的人來說,這是一個安全的、自助式企業資料科學平台,資料科學家可以管理自己的分析管道,進而加快從勘探到生産的機器學習項目。有關CDSW的更多資訊,請通路Cloudera Data Science Workbench産品頁面。

在這篇文章中,将解釋和示範幾種操作以及示例輸出。就上下文而言,此特定部落格文章中的所有示例操作均與CDSW部署一起運作。

先決條件

  1. 具有帶有HBase和Spark的CDP叢集
  2. 如果要通過CDSW遵循示例,則需要安裝它-安裝Cloudera Data Science Workbench
  3. Python 3安裝在每個節點的同一路徑上

配置

首先,HBase和Spark需要配置到一起用于SparkSQL查詢工作正常進行。為此,它包括兩個部分:首先,通過Cloudera Manager配置HBase Region Server。其次,確定Spark運作時具有HBase綁定。不過要記住的一點是,Cloudera Manager已經設定了一些配置和環境變量,可以自動為您将Spark指向HBase。盡管如此,在所有CDP叢集上的所有部署類型中,配置Spark SQL查詢的第一步都是通用的,但第二步因部署類型而略有不同。

配置HBase Region Servers

  1. 轉到Cloudera Manager,然後選擇HBase服務。
  2. 搜尋“regionserver environment”
使用CDSW和營運資料庫建構ML應用1:設定和基礎
  1. 使用RegionServer環境進階配置代碼段(安全閥)添加新的環境變量:
    • Key:HBASE_CLASSPATH
    • Value:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar:/ opt /cloudera/parcels/CDH/jars/scala-library-2.11.12.jar確定使用适當的版本号。
  2. 重新啟動Region Server。

完成上述步驟後,請按照以下步驟,根據需要是否依賴CDSW部署。

在非CDSW部署中将HBase綁定添加到Spark運作時

要部署Shell或正确使用spark-submit,請使用以下指令來確定spark具有正确的HBase綁定。

pyspark –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar           

複制

在CDSW部署中将HBase綁定添加到Spark運作時

要使用HBase和PySpark配置CDSW,需要執行一些步驟。

1)確定在每個叢集節點上都安裝了Python 3,并記下了它的路徑

2)在CDSW中建立一個新項目并使用PySpark模闆

3)打開項目,轉到設定->引擎->環境變量。

4)将PYSPARK3_DRIVER_PYTHON和PYSPARK3_PYTHON設定為群集節點上安裝Python的路徑(步驟1中指出的路徑)。

以下是其外觀的示例。

使用CDSW和營運資料庫建構ML應用1:設定和基礎

5)在您的項目中,轉到檔案-> spark-defaults.conf并在工作台中将其打開

6)複制下面的行并将其粘貼到該檔案中,并確定在開始新會話之前已将其儲存。

spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar           

複制

使用CDSW和營運資料庫建構ML應用1:設定和基礎

至此,CDSW現在已配置為在HBase上運作PySpark作業!本部落格文章的其餘部分涉及CDSW部署上的一些示例操作。

示例操作

put操作

有兩種向HBase中插入和更新行的方法。第一個也是最推薦的方法是建構目錄,該目錄是一種Schema,它将在指定表名和名稱空間的同時将HBase表的列映射到PySpark的dataframe。建構這種使用者定義的JSON格式是最優選的方法,因為它也可以與其他操作一起使用。有關目錄的更多資訊,請參考此文檔http://hbase.apache.org/book.html#_define_catalog。第二種方法是使用一個名為“ hbase.columns.mapping”的特定映射參數,該參數僅接收一串鍵值對。

  • 使用目錄
from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession\
 .builder\
 .appName("SampleApplication")\
 .getOrCreate()

tableCatalog = ''.join("""{
              "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
              "rowkey":"key",
              "columns":{
                "key":{"cf":"rowkey", "col":"key", "type":"int"},
                "empId":{"cf":"personal","col":"empId","type":"string"},
                "empName":{"cf":"personal", "col":"empName", "type":"string"},
                "empState":{"cf":"personal", "col":"empWeight", "type":"string"}
              }
            }""".split())

employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3]))
employeeDF = spark.createDataFrame(employeeMap)

employeeDF.write.format("org.apache.hadoop.hbase.spark") \
  .options(catalog=tableCatalog, newTable=5) \
  .option("hbase.spark.use.hbasecontext", False) \
 .save()
# newTable refers to the NumberOfRegions which has to be > 3           

複制

隻需打開HBase shell并執行以下指令,即可驗證是否在HBase中建立了一個名為“ tblEmployee”的新表:

scan ‘tblEmployee’, {‘LIMIT’ => 2}           

複制

使用CDSW和營運資料庫建構ML應用1:設定和基礎

使用目錄還可以使您輕松加載HBase表。以後的部分将對此進行讨論。

  • 使用hbase.columns.mapping

在編寫PySpark資料框時,可以添加一個名為“ hbase.columns.mapping”的選項,以包含正确映射列的字元串。此選項僅允許您将行插入現有表。

在HBase shell中,我們首先建立一個表,建立'tblEmployee2','personal'

使用CDSW和營運資料庫建構ML應用1:設定和基礎

現在在PySpark中,使用“ hbase.columns.mapping”插入2行

from pyspark.sql import Row
from pyspark.sql import SparkSession


spark = SparkSession\
 .builder\
  .appName("SampleApplication")\
  .getOrCreate()

employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3])))
employeeDF = spark.createDataFrame(employeeMap)

employeeDF.write.format("org.apache.hadoop.hbase.spark") \
       .option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \
       .option("hbase.table", "tblEmployee2") \
       .option("hbase.spark.use.hbasecontext", False) \
       .save()           

複制

同樣,隻需驗證名為“ tblEmployee2”的新表具有這些新行。

scan ‘tblEmployee2’, {‘LIMIT’ => 2}           

複制

使用CDSW和營運資料庫建構ML應用1:設定和基礎

這就完成了我們有關如何通過PySpark将行插入到HBase表中的示例。在下一部分中,我将讨論“擷取和掃描操作”,PySpark SQL和一些故障排除。在此之前,您應該獲得一個CDP叢集并按照這些示例進行操作。

原文作者:Manas Chakka

原文連結:https://blog.cloudera.com/building-a-machine-learning-application-with-cloudera-data-science-workbench-and-operational-database-part-1-the-set-up-basics/