Flink操作Kudu
Flink主要应用场景是流式数据处理上,有些公司针对流式数据使用Flink实时分析后将结果存入Kudu,例如快手公司。这里将实时计算的结果存入Kudu需要自定义Flink Kudu Sink。
场景:Flink实时读取Socket数据,将结果存入Kudu表t_flink_result,为了方便操作不再创建Kudu外表,这里在Impala中创建Kudu内表t_flink_result:
代码语言:javascript复制create table t_flink_result
(
id int,
name string,
age int,
primary key (id)
)
partition by hash partitions 3
stored as kudu
tblproperties(
'kudu.master_address' = 'cm1:7150,cm2:7150'
)
在Maven中导入以下Flink 包依赖:
代码语言:javascript复制<!-- Flink 开发Scala需要导入以下依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
Flink 自定义KuduSink 代码如下:
代码语言:javascript复制val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val ds: DataStream[String] = env.socketTextStream("cm3",9999)
//自定义KuduSink
ds.addSink(new RichSinkFunction[String] {
//初始化连接Kudu对象
var kuduClient :KuduClient = _
//Kudu 表对象
var kuduTable :KuduTable = _
//创建KuduSession 客户端会话
var session: KuduSession = _
//初始化时调用一次,这里初始化连接Kudu的对象
override def open(parameters: Configuration): Unit = {
kuduClient = new KuduClientBuilder("cm1:7051,cm2:7051").build()
kuduTable = kuduClient.openTable("impala::default.t_flink_result")
session = kuduClient.newSession()
//设置插入数据策略
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
}
//来一条数据这里调用一次invoke方法
override def invoke(one: String, context: SinkFunction.Context[_]): Unit = {
val arr: Array[String] = one.split(",")
val id: Int = arr(0).toInt
val name: String = arr(1)
val age: Int = arr(2).toInt
//准备插入的数据
val insert: Insert = kuduTable.newInsert()
val row: PartialRow = insert.getRow
row.addInt("id",id)
row.addString("name",name)
row.addInt("age",age)
//插入到Kudu表中
session.apply(insert)
}
//当Flink 关闭时调用一次,回收连接对象
override def close(): Unit ={
session.close()
kuduClient.close()
}
})
env.execute()
启动以上Flink 代码,打开Socket 服务器,输入数据,可以在impala 中查询表t_flink_result数据,数据被写入。