使用CDSW和运营数据库构建ML应用1:设置和基础

2021-02-07 14:37:30 浏览数 (1)

介绍

Python在数据工程师和数据科学家中被广泛使用,以解决从ETL / ELT管道到构建机器学习模型的各种问题。Apache HBase是用于许多工作流程的有效数据存储系统,但是专门通过Python访问此数据可能会很困难。对于想要利用存储在HBase中的数据的数据专业人士而言,最新的上游项目“ hbase-connectors”可以与PySpark一起使用以进行基本操作。

在本博客系列中,我们将说明如何为基本的Spark使用以及CDSW中维护的作业一起配置PySpark和HBase 。对于不熟悉CDSW的人来说,这是一个安全的、自助式企业数据科学平台,数据科学家可以管理自己的分析管道,从而加快从勘探到生产的机器学习项目。有关CDSW的更多信息,请访问Cloudera Data Science Workbench产品页面。

在这篇文章中,将解释和演示几种操作以及示例输出。就上下文而言,此特定博客文章中的所有示例操作均与CDSW部署一起运行。

先决条件

  1. 具有带有HBase和Spark的CDP集群
  2. 如果要通过CDSW遵循示例,则需要安装它-安装Cloudera Data Science Workbench
  3. Python 3安装在每个节点的同一路径上

配置

首先,HBase和Spark需要配置到一起用于SparkSQL查询工作正常进行。为此,它包括两个部分:首先,通过Cloudera Manager配置HBase Region Server。其次,确保Spark运行时具有HBase绑定。不过要记住的一点是,Cloudera Manager已经设置了一些配置和环境变量,可以自动为您将Spark指向HBase。尽管如此,在所有CDP集群上的所有部署类型中,配置Spark SQL查询的第一步都是通用的,但第二步因部署类型而略有不同。

配置HBase Region Servers

  1. 转到Cloudera Manager,然后选择HBase服务。
  2. 搜索“regionserver environment”
  1. 使用RegionServer环境高级配置代码段(安全阀)添加新的环境变量:
    • Key:HBASE_CLASSPATH
    • Value:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar:/ opt /cloudera/parcels/CDH/jars/scala-library-2.11.12.jar确保使用适当的版本号。
  2. 重新启动Region Server。

完成上述步骤后,请按照以下步骤,根据需要是否依赖CDSW部署。

在非CDSW部署中将HBase绑定添加到Spark运行时

要部署Shell或正确使用spark-submit,请使用以下命令来确保spark具有正确的HBase绑定。

代码语言:javascript复制
pyspark –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar

在CDSW部署中将HBase绑定添加到Spark运行时

要使用HBase和PySpark配置CDSW,需要执行一些步骤。

1)确保在每个集群节点上都安装了Python 3,并记下了它的路径

2)在CDSW中创建一个新项目并使用PySpark模板

3)打开项目,转到设置->引擎->环境变量。

4)将PYSPARK3_DRIVER_PYTHONPYSPARK3_PYTHON设置为群集节点上安装Python的路径(步骤1中指出的路径)。

以下是其外观的示例。

5)在您的项目中,转到文件-> spark-defaults.conf并在工作台中将其打开

6)复制下面的行并将其粘贴到该文件中,并确保在开始新会话之前已将其保存。

代码语言:javascript复制
spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar

至此,CDSW现在已配置为在HBase上运行PySpark作业!本博客文章的其余部分涉及CDSW部署上的一些示例操作。

示例操作

put操作

有两种向HBase中插入和更新行的方法。第一个也是最推荐的方法是构建目录,该目录是一种Schema,它将在指定表名和名称空间的同时将HBase表的列映射到PySpark的dataframe。构建这种用户定义的JSON格式是最优选的方法,因为它也可以与其他操作一起使用。有关目录的更多信息,请参考此文档http://hbase.apache.org/book.html#_define_catalog。第二种方法是使用一个名为“ hbase.columns.mapping”的特定映射参数,该参数仅接收一串键值对。

  • 使用目录
代码语言:javascript复制
from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession
 .builder
 .appName("SampleApplication")
 .getOrCreate()

tableCatalog = ''.join("""{
              "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
              "rowkey":"key",
              "columns":{
                "key":{"cf":"rowkey", "col":"key", "type":"int"},
                "empId":{"cf":"personal","col":"empId","type":"string"},
                "empName":{"cf":"personal", "col":"empName", "type":"string"},
                "empState":{"cf":"personal", "col":"empWeight", "type":"string"}
              }
            }""".split())

employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3]))
employeeDF = spark.createDataFrame(employeeMap)

employeeDF.write.format("org.apache.hadoop.hbase.spark") 
  .options(catalog=tableCatalog, newTable=5) 
  .option("hbase.spark.use.hbasecontext", False) 
 .save()
# newTable refers to the NumberOfRegions which has to be > 3

只需打开HBase shell并执行以下命令,即可验证是否在HBase中创建了一个名为“ tblEmployee”的新表:

代码语言:javascript复制
scan ‘tblEmployee’, {‘LIMIT’ => 2}

使用目录还可以使您轻松加载HBase表。以后的部分将对此进行讨论。

  • 使用hbase.columns.mapping

在编写PySpark数据框时,可以添加一个名为“ hbase.columns.mapping”的选项,以包含正确映射列的字符串。此选项仅允许您将行插入现有表。

在HBase shell中,我们首先创建一个表,创建'tblEmployee2','personal'

现在在PySpark中,使用“ hbase.columns.mapping”插入2行

代码语言:javascript复制
from pyspark.sql import Row
from pyspark.sql import SparkSession


spark = SparkSession
 .builder
  .appName("SampleApplication")
  .getOrCreate()

employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3])))
employeeDF = spark.createDataFrame(employeeMap)

employeeDF.write.format("org.apache.hadoop.hbase.spark") 
       .option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") 
       .option("hbase.table", "tblEmployee2") 
       .option("hbase.spark.use.hbasecontext", False) 
       .save()

同样,只需验证名为“ tblEmployee2”的新表具有这些新行。

代码语言:javascript复制
scan ‘tblEmployee2’, {‘LIMIT’ => 2}

这就完成了我们有关如何通过PySpark将行插入到HBase表中的示例。在下一部分中,我将讨论“获取和扫描操作”,PySpark SQL和一些故障排除。在此之前,您应该获得一个CDP集群并按照这些示例进行操作。

原文作者:Manas Chakka

原文链接:https://blog.cloudera.com/building-a-machine-learning-application-with-cloudera-data-science-workbench-and-operational-database-part-1-the-set-up-basics/

0 人点赞