1.文档编写目的
Kudu从 1.0.0 版本开始通过Data Source API与Spark 集成。kudu-spark使用--packages选项包含依赖项。如果将Spark与Scala 2.10 一起使用,需要使用 kudu-spark_2.10 。从 Kudu1.6.0开始不再支持Spark 1,如果要使用Spark1与Kudu集成,最高只能到Kudu1.5.0。
如果将 Spark 1 与 Scala 2.10 一起使用,请使用 kudu-spark_2.10:1.5.0 依赖包。
代码语言:javascript复制spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.5.0-cdh5.13.91 --repositories https://repository.cloudera.com/artifactory/cloudera-repos/
如果将 Spark 2 与 Scala 2.11 一起使用,请使用 kudu-spark2_2.11 依赖包(当前CDP版本中可用)。
代码语言:javascript复制spark2-shell --packages org.apache.kudu:kudu-spark2_2.11:1.9.0-cdh6.2.0 --repositories https://repository.cloudera.com/artifactory/cloudera-repos/
本文主要讲述在CDP7.1.4中如何通过spark-shell对kudu表的进行操作。
- 测试环境
1.CDP7.1.4 、启用Kerberos、Kudu 1.13.0、Spark 2.4.0
2.操作步骤
2.1 建kudu表
代码语言:javascript复制CREATE EXTERNAL TABLE test002 ( name STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, age INT, PRIMARY KEY (name) ) STORED AS KUDU TBLPROPERTIES ('external.table.purge'='TRUE', 'kudu.master_addresses'='cdp03:7051', 'numFiles'='0', 'numFilesErasureCoded'='0', 'totalSize'='0');
2.2 添加依赖jar包
通过本地的方式添加依赖,首先到下面地址中
代码语言:javascript复制https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/kudu/kudu-spark2_2.11/1.9.0-cdh6.2.0/
下载jar包,然后放在集群的一个节点上的/var/www/html/下面,通过本地的http方式加载
进行验证
2.3 进入spark-shell操作kudu
作为 CML 中现有引擎的替代品,ML Runtimes 比当前的单体引
代码语言:javascript复制spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.9.0-cdh6.2.0 --repositories https://repository.cloudera.com/artifactory/cloudera-repos/
2.3.1 单行写
在spark-shell中执行如下代码
代码语言:javascript复制import org.apache.hadoop.security.UserGroupInformation
import java.security.PrivilegedExceptionAction
import org.apache.kudu.client.KuduClient
val kuduClient = UserGroupInformation.getLoginUser.doAs(new PrivilegedExceptionAction[KuduClient]() {
@throws[Exception]
override def run: KuduClient = new KuduClient.KuduClientBuilder("cdp01:7051,cdp02:7051,cdp03:7051").build;
})
val tableList=kuduClient.getTablesList()
val kuduTable = kuduClient.openTable("impala::default.test002")
val session = kuduClient.newSession()
session.setTimeoutMillis(60000)
val insert = kuduTable.newInsert()
val row = insert.getRow()
row.addString(0,"zhangsan")
row.addInt(1,30)
print(row)
print(kuduTable.getName())
session.apply(insert)
在impala-shell中去查询test001
可见插入单条数据插入成功
2.3.2 单行读
在spark-shell中执行如下代码
代码语言:javascript复制import org.apache.hadoop.security.UserGroupInformation
import java.security.PrivilegedExceptionAction
import org.apache.kudu.client.KuduClient
val kuduClient = UserGroupInformation.getLoginUser.doAs(new PrivilegedExceptionAction[KuduClient]() {
@throws[Exception]
override def run: KuduClient = new KuduClient.KuduClientBuilder("cdp01:7051,cdp02:7051,cdp03:7051").build;
})
val kuduTable = kuduClient.openTable("impala::default.test002")
val scanner = kuduClient.newScannerBuilder(kuduTable).build()
scanner.hasMoreRows
val rows = scanner.nextRows
rows.hasNext
val row = rows.next
println(row.getString(0))
println(row.getInt(1))
可看到读出一条数据
2.3.3 批量操作
先插入一条数据
代码语言:javascript复制val kuduTable = kuduClient.openTable("impala::default.test002")
val session = kuduClient.newSession()
session.setTimeoutMillis(60000)
val insert = kuduTable.newInsert()
val row = insert.getRow()
row.addString(0,"lisi")
row.addInt(1,18)
session.apply(insert)
2.3.3.1 批量读
在spark-shell下执行下面代码
代码语言:javascript复制import org.apache.kudu.spark.kudu._
val df = spark.read.format("kudu").options(Map("kudu.master" -> "cdp03:7051", "kudu.table" -> "impala::default.test002")).load
df.createOrReplaceTempView("tmp_table")
spark.sql("select * from tmp_table ").show()
可看到整张表查询成功
2.3.3.2 批量写
根据一个DataFrameschema创建一个kudu表,并查看是否存在
代码语言:javascript复制import org.apache.kudu.client._
import org.apache.kudu.spark.kudu.KuduContext
import collection.JavaConverters._
val df = spark.read.format("kudu").options(Map("kudu.master" -> "cdp03:7051", "kudu.table" -> "impala::default.test002")).load
val kuduContext = new KuduContext("cdp03:7051", spark.sparkContext)
kuduContext.createTable(
"like_test002", df.schema, Seq("name"),
new CreateTableOptions()
.setNumReplicas(1)
.addHashPartitions(List("name").asJava, 3))
kuduContext.tableExists("like_test002")
根据一个DataFrame schema创建一个kudu表,并查看是否存在可以看到创建成功
可以先查询一下这张表的数据
代码语言:javascript复制val dftmp = spark.read.format("kudu").options(Map("kudu.master" -> "cdp03:7051", "kudu.table" -> "like_test002")).load
dftmp.createOrReplaceTempView("tmp_table")
spark.sql("select * from tmp_table").show()
可看到没有任何数据
然后将从test002表生成的df插入到表like_test002,执行下面代码
代码语言:javascript复制kuduContext.insertRows(df, "like_test002")
并且再次查询发现数据已经插入成功
2.3.3.3 批量更改数据
代码语言:javascript复制val updateDF = df.select($"name", ($"age" 100).as("age"))
kuduContext.updateRows(updateDF, "like_test002")
发现数据全部被更改
2.3.3.4 批量修改和新增数据
1.再test002插入一条数据
代码语言:javascript复制val kuduTable = kuduClient.openTable("impala::default.test002")
val session = kuduClient.newSession()
session.setTimeoutMillis(60000)
val insert = kuduTable.newInsert()
val row = insert.getRow()
row.addString(0,"wangwu")
row.addInt(1,40)
session.apply(insert)
2.查看数据
代码语言:javascript复制val df = spark.read.format("kudu").options(Map("kudu.master" -> "cdp03:7051", "kudu.table" -> "impala::default.test002")).load
df.createOrReplaceTempView("tmp_table")
spark.sql("select * from tmp_table ").show()
也可以看到新增的一条数据插入成功
3.在spark-shell中执行下面代码
代码语言:javascript复制kuduContext.upsertRows(df, "like_test002")
可以看到新增了一条,也更改了数据
2.3.3.5 追加数据
1.先增加一条数据
代码语言:javascript复制val kuduTable = kuduClient.openTable("impala::default.test002")
val session = kuduClient.newSession()
session.setTimeoutMillis(60000)
val insert = kuduTable.newInsert()
val row = insert.getRow()
row.addString(0,"xiaoming")
row.addInt(1,50)
session.apply(insert)
2.在spark-shell中执行下面代码
代码语言:javascript复制df.write.options(Map("kudu.master"-> "cdp03:7051", "kudu.table"-> "like_test002")).mode("append").format("kudu").save
查询like_test002多了一条数据
2.3.3.6 删除全表数据
代码语言:javascript复制val df = spark.read.format("kudu").options(Map("kudu.master" -> "cdp03:7051", "kudu.table" -> "impala::default.test002")).load
df.createOrReplaceTempView("tmp_table")
val nameDF = spark.sql("select name from tmp_table ")
kuduContext.deleteRows(nameDF, "like_test002")
可以看到数据全部被删除
注意:直接用kuduContext.deleteRows(df, "like_test002") 会报下面的错
2.3.3.7 删表
代码语言:javascript复制kuduContext.deleteTable("like_test002")
kuduContext.tableExists("like_test002")
表已经不存在了
3.常见问题和优化
使用Spark程序访问Kudu 时应考虑如下问题:
- 尽管 Kudu Spark 2.x 集成与 Java 7 兼容,但 Spark 2.2(及更高版本)在运行时需要 Java 8。Spark 2.2 是 Kudu 1.5.0 的默认依赖版本。
- 名称包含大写或非 ASCII 字符的 Kudu 表在注册为临时表时必须指定一个备用名称。
- 列名包含大写或非 ASCII 字符的 Kudu 表不得与 SparkSQL 一起使用。可以在 Kudu 中重命名列以解决此问题。
- 部分查询语法支持问题,如 <>符号和OR谓词不会推送到 Kudu,而是由Spark任务评估,只有LIKE 带有后缀通配符的谓词才会被推送到 Kudu。例如 LIKE "FOO%"可以查询,但LIKE "FOO�R" 则不能。
- Kudu 并不支持 Spark SQL 支持的所有类型。例如,不支持Date类型。
- Kudu 表只能在 SparkSQL 中注册为临时表。
- 无法使用HiveContext查询Kudu表。
- 常见的Kudu-Spark 程序错误是实例化多余的KuduClient对象,在Kudu-Spark程序中, KuduClient归KuduContext所有。Spark应用程序代码不应创建另一个KuduClient连接到同一集群。应用程序代码应用KuduContext来访问 KuduClient 来使用KuduContext#syncClient。
- 通常,Spark作业用最少的调整和配置运行。可以使用Spark 的配置选项调整执行程序和资源的数量,以提高并行度和性能。如果表非常宽并且默认内存分配相当低,可能导致作业失败。要解决此问题,需要增加Spark程序内存。通常的做法是每50列1GiB。如果Spark资源远超过 Kudu 集群,在kudu 集群进行数据恢复时需要限制并发发任务数,避免Kudu 集群压力过大。
参考文档:
代码语言:javascript复制https://docs.cloudera.com/cdp-private-cloud-base/7.1.6/kudu-development/topics/kudu-integration-with-spark.html