介绍
Python在数据工程师和数据科学家中被广泛使用,以解决从ETL / ELT管道到构建机器学习模型的各种问题。Apache HBase是用于许多工作流程的有效数据存储系统,但是专门通过Python访问此数据可能会很困难。对于想要利用存储在HBase中的数据的数据专业人士而言,最新的上游项目“ hbase-connectors”可以与PySpark一起使用以进行基本操作。
在本博客系列中,我们将说明如何为基本的Spark使用以及CDSW中维护的作业一起配置PySpark和HBase 。对于不熟悉CDSW的人来说,这是一个安全的、自助式企业数据科学平台,数据科学家可以管理自己的分析管道,从而加快从勘探到生产的机器学习项目。有关CDSW的更多信息,请访问Cloudera Data Science Workbench产品页面。
在这篇文章中,将解释和演示几种操作以及示例输出。就上下文而言,此特定博客文章中的所有示例操作均与CDSW部署一起运行。
先决条件
- 具有带有HBase和Spark的CDP集群
- 如果要通过CDSW遵循示例,则需要安装它-安装Cloudera Data Science Workbench
- Python 3安装在每个节点的同一路径上
配置
首先,HBase和Spark需要配置到一起用于SparkSQL查询工作正常进行。为此,它包括两个部分:首先,通过Cloudera Manager配置HBase Region Server。其次,确保Spark运行时具有HBase绑定。不过要记住的一点是,Cloudera Manager已经设置了一些配置和环境变量,可以自动为您将Spark指向HBase。尽管如此,在所有CDP集群上的所有部署类型中,配置Spark SQL查询的第一步都是通用的,但第二步因部署类型而略有不同。
配置HBase Region Servers
- 转到Cloudera Manager,然后选择HBase服务。
- 搜索“regionserver environment”
- 使用RegionServer环境高级配置代码段(安全阀)添加新的环境变量:
- Key:HBASE_CLASSPATH
- Value:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar:/ opt /cloudera/parcels/CDH/jars/scala-library-2.11.12.jar确保使用适当的版本号。
- 重新启动Region Server。
完成上述步骤后,请按照以下步骤,根据需要是否依赖CDSW部署。
在非CDSW部署中将HBase绑定添加到Spark运行时
要部署Shell或正确使用spark-submit,请使用以下命令来确保spark具有正确的HBase绑定。
代码语言:javascript复制pyspark –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
在CDSW部署中将HBase绑定添加到Spark运行时
要使用HBase和PySpark配置CDSW,需要执行一些步骤。
1)确保在每个集群节点上都安装了Python 3,并记下了它的路径
2)在CDSW中创建一个新项目并使用PySpark模板
3)打开项目,转到设置->引擎->环境变量。
4)将PYSPARK3_DRIVER_PYTHON和PYSPARK3_PYTHON设置为群集节点上安装Python的路径(步骤1中指出的路径)。
以下是其外观的示例。
5)在您的项目中,转到文件-> spark-defaults.conf并在工作台中将其打开
6)复制下面的行并将其粘贴到该文件中,并确保在开始新会话之前已将其保存。
代码语言:javascript复制spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
至此,CDSW现在已配置为在HBase上运行PySpark作业!本博客文章的其余部分涉及CDSW部署上的一些示例操作。
示例操作
put操作
有两种向HBase中插入和更新行的方法。第一个也是最推荐的方法是构建目录,该目录是一种Schema,它将在指定表名和名称空间的同时将HBase表的列映射到PySpark的dataframe。构建这种用户定义的JSON格式是最优选的方法,因为它也可以与其他操作一起使用。有关目录的更多信息,请参考此文档http://hbase.apache.org/book.html#_define_catalog。第二种方法是使用一个名为“ hbase.columns.mapping”的特定映射参数,该参数仅接收一串键值对。
- 使用目录
from pyspark.sql import Row
from pyspark.sql import SparkSession
spark = SparkSession
.builder
.appName("SampleApplication")
.getOrCreate()
tableCatalog = ''.join("""{
"table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
"rowkey":"key",
"columns":{
"key":{"cf":"rowkey", "col":"key", "type":"int"},
"empId":{"cf":"personal","col":"empId","type":"string"},
"empName":{"cf":"personal", "col":"empName", "type":"string"},
"empState":{"cf":"personal", "col":"empWeight", "type":"string"}
}
}""".split())
employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3]))
employeeDF = spark.createDataFrame(employeeMap)
employeeDF.write.format("org.apache.hadoop.hbase.spark")
.options(catalog=tableCatalog, newTable=5)
.option("hbase.spark.use.hbasecontext", False)
.save()
# newTable refers to the NumberOfRegions which has to be > 3
只需打开HBase shell并执行以下命令,即可验证是否在HBase中创建了一个名为“ tblEmployee”的新表:
代码语言:javascript复制scan ‘tblEmployee’, {‘LIMIT’ => 2}
使用目录还可以使您轻松加载HBase表。以后的部分将对此进行讨论。
- 使用hbase.columns.mapping
在编写PySpark数据框时,可以添加一个名为“ hbase.columns.mapping”的选项,以包含正确映射列的字符串。此选项仅允许您将行插入现有表。
在HBase shell中,我们首先创建一个表,创建'tblEmployee2','personal'
现在在PySpark中,使用“ hbase.columns.mapping”插入2行
代码语言:javascript复制from pyspark.sql import Row
from pyspark.sql import SparkSession
spark = SparkSession
.builder
.appName("SampleApplication")
.getOrCreate()
employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3])))
employeeDF = spark.createDataFrame(employeeMap)
employeeDF.write.format("org.apache.hadoop.hbase.spark")
.option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight")
.option("hbase.table", "tblEmployee2")
.option("hbase.spark.use.hbasecontext", False)
.save()
同样,只需验证名为“ tblEmployee2”的新表具有这些新行。
代码语言:javascript复制scan ‘tblEmployee2’, {‘LIMIT’ => 2}
这就完成了我们有关如何通过PySpark将行插入到HBase表中的示例。在下一部分中,我将讨论“获取和扫描操作”,PySpark SQL和一些故障排除。在此之前,您应该获得一个CDP集群并按照这些示例进行操作。
原文作者:Manas Chakka
原文链接:https://blog.cloudera.com/building-a-machine-learning-application-with-cloudera-data-science-workbench-and-operational-database-part-1-the-set-up-basics/