Code versions
Flink: 1.10.0, Scala: 2.12.6
Side Output (SideOutput)
This article covers side outputs (SideOutput). Most DataStream API operators have a single output: one stream carrying one kind of data, all flowing to the same place.
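When working with several streams, the split operator can divide one stream into multiple streams, but those streams all share the same data type. The side-output feature of ProcessFunction can also produce multiple streams, and their data types may differ. A side output is defined as an OutputTag[X] object, where X is the data type of the output stream. A process function can emit an event to one or more side outputs through its Context object.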
To use a side output, first define an OutputTag that identifies the side-output stream.
In Scala it looks like this:
val outputTag = OutputTag[String]("side-output")
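Note how the OutputTag is typed according to the type of elements the side-output stream contains. Data can be emitted to a side output from the following functions; this article uses ProcessFunction as the example: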
- ProcessFunction
- CoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
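The text above says that, unlike split, side outputs may carry different element types. A minimal sketch, assuming the Flink 1.10 Scala DataStream API and made-up tag ids: one ProcessFunction sends ordinary names to the main output, names containing `_side` to a `String` side output, and the length of every name to an `Int` side output. `MultiTypeSideOutputs`, `Router`, `CollectSink`, and `run` are hypothetical names, and the in-memory sink is only meant for a local, single-JVM run.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object MultiTypeSideOutputs {
  // Hypothetical tag ids; each tag fixes the element type of its side output.
  val sideNames: OutputTag[String] = OutputTag[String]("side-names")
  val nameLengths: OutputTag[Int] = OutputTag[Int]("name-lengths")

  // Shared buffer for the demo sink; only valid for a local, single-JVM run.
  val collected = new ConcurrentLinkedQueue[String]()

  class Router extends ProcessFunction[String, String] {
    override def processElement(
        value: String,
        ctx: ProcessFunction[String, String]#Context,
        out: Collector[String]): Unit = {
      if (value.contains("_side")) ctx.output(sideNames, value) // String side output
      else out.collect(value)                                    // main output
      ctx.output(nameLengths, value.length)                      // Int side output, every element
    }
  }

  // Appends every value it receives to the shared buffer.
  class CollectSink extends SinkFunction[String] {
    override def invoke(value: String): Unit = collected.add(value)
  }

  // Runs the job on a bounded input and returns everything the sinks saw.
  def run(names: Seq[String]): Seq[String] = {
    collected.clear()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val routed = env.fromCollection(names).process(new Router)
    routed.map("main:" + _).addSink(new CollectSink)
    routed.getSideOutput(sideNames).map("side:" + _).addSink(new CollectSink)
    routed.getSideOutput(nameLengths).map("len:" + _).addSink(new CollectSink)
    env.execute("multi-type side outputs")
    collected.asScala.toList
  }
}
```

Note that `getSideOutput(nameLengths)` yields a `DataStream[Int]` while the main output is a `DataStream[String]`: the type comes from the tag, which is what split cannot offer.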
Example
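The example below separates out the records that contain a particular string. Two predefined utility classes write different content into Kafka, and a side output then splits the combined stream so that each kind of record ends up in its own output.
The data looks like this: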
Regular output:
{"id":3,"name":"Johngo3","age":13,"sex":1,"email":"Johngo3@flink.com","time":1590067813271}
Side output:
{"id":3,"name":"Johngo_side3","age":13,"sex":1,"email":"Johngo_side3@flink.com","time":1590067813271}
Clearly, what we want is to pull out the records whose names contain "side".
Let's go through it step by step.
1. Start Kafka
Adapt this step to your own environment; here I start a local Kafka.
Start ZooKeeper and Kafka:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
nohup bin/kafka-server-start.sh config/server.properties &
Create a topic named person_t:
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic person_t
Test consuming the data:
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic person_t --from-beginning
2. Define the bean
First define a POJO class (Person_t.scala):
package com.tech.bean
import scala.beans.BeanProperty
class Person_t() {
@BeanProperty var id:Int = 0
@BeanProperty var name:String = _
@BeanProperty var age:Int = 0
@BeanProperty var sex:Int = 2
@BeanProperty var email:String = _
@BeanProperty var time:Long = 0L
// Override toString(); the trailing + lets the expression span several lines
override def toString: String = {
"id:" + this.id + "," +
"name:" + this.name + "," +
"age:" + this.age + "," +
"sex:" + this.sex + "," +
"email:" + this.email + "," +
"time:" + this.time
}
}
3. Write the utility classes
The utilities write data into Kafka. There are two of them, ProduceToKafkaUtil1 and ProduceToKafkaUtil2: two different data sources writing to the same topic.
ProduceToKafkaUtil1.scala
package com.tech.util
import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import com.google.gson.Gson
import com.tech.bean.Person_t
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
/**
* Create the topic:
* kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic person_t
*
* Consume the data:
* kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic person_t --from-beginning
*
* Adds a time field
*
*/
object ProduceToKafkaUtil1 {
final val broker_list: String = "localhost:9092"
final val topic = "person_t"
def produceMessageToKafka(): Unit = {
// val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val writeProps = new Properties()
writeProps.setProperty("bootstrap.servers", broker_list)
writeProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
writeProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](writeProps)
for (i <- 1 to 10000) {
val curDate = System.currentTimeMillis()
val person: Person_t = new Person_t()
person.setId(i)
person.setName("Johngo" + i)
person.setAge(10 + i)
person.setSex(i%2)
person.setEmail("Johngo" + i + "@flink.com")
person.setTime(curDate.toLong)
val record = new ProducerRecord[String, String](topic, null, null, new Gson().toJson(person))
producer.send(record)
println("SendMessageToKafka: " + new Gson().toJson(person))
Thread.sleep(2000)
}
producer.flush()
}
def main(args: Array[String]): Unit = {
this.produceMessageToKafka()
}
}
ProduceToKafkaUtil2.scala
package com.tech.util
import java.util.Properties
import com.google.gson.Gson
import com.tech.bean.Person_t
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
/**
* Create the topic:
* kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic person_t
*
* Consume the data:
* kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic person_t --from-beginning
*
* Adds a time field
*
*/
object ProduceToKafkaUtil2 {
final val broker_list: String = "localhost:9092"
final val topic = "person_t"
def produceMessageToKafka(): Unit = {
// val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val writeProps = new Properties()
writeProps.setProperty("bootstrap.servers", broker_list)
writeProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
writeProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](writeProps)
for (i <- 1 to 10000) {
val curDate = System.currentTimeMillis()
val person: Person_t = new Person_t()
person.setId(i)
person.setName("Johngo_side" + i)
person.setAge(10 + i)
person.setSex(i%2)
person.setEmail("Johngo_side" + i + "@flink.com")
person.setTime(curDate.toLong)
val record = new ProducerRecord[String, String](topic, null, null, new Gson().toJson(person))
producer.send(record)
println("SendMessageToKafka: " + new Gson().toJson(person))
Thread.sleep(2000)
}
producer.flush()
}
def main(args: Array[String]): Unit = {
this.produceMessageToKafka()
}
}
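ProduceToKafkaUtil1 and ProduceToKafkaUtil2 differ only in the name prefix. As a hedged sketch (not the author's code), the two could be collapsed into one producer parameterized by that prefix. `ProduceToKafka`, `person`, and `produce` are hypothetical names, and `Person` is a case class that merely mirrors Person_t's fields so the sketch compiles on its own.

```scala
import java.util.Properties

import com.google.gson.Gson
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical refactor: one producer parameterized by the name prefix.
object ProduceToKafka {
  // Mirrors the fields of Person_t so that this sketch is self-contained.
  case class Person(id: Int, name: String, age: Int, sex: Int, email: String, time: Long)

  // Builds the i-th record the same way both utilities do, given a prefix.
  def person(prefix: String, i: Int, now: Long): Person =
    Person(i, prefix + i, 10 + i, i % 2, prefix + i + "@flink.com", now)

  def produce(prefix: String,
              count: Int,
              brokers: String = "localhost:9092",
              topic: String = "person_t"): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokers)
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val gson = new Gson()
    try {
      for (i <- 1 to count) {
        val json = gson.toJson(person(prefix, i, System.currentTimeMillis()))
        producer.send(new ProducerRecord[String, String](topic, json))
        println("SendMessageToKafka: " + json)
        Thread.sleep(2000)
      }
    } finally {
      producer.close() // close() also flushes records that are still buffered
    }
  }

  // Run once with "Johngo" and once with "Johngo_side" to get both sources.
  def main(args: Array[String]): Unit =
    produce(if (args.nonEmpty) args(0) else "Johngo", 10000)
}
```

Unlike the originals, this version closes the producer in a `finally` block, so buffered records are flushed even if the loop is interrupted by an exception.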
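With the Kafka writer utilities in place, let's implement the side output.
4. Side-output example
While the job below is running, it can be monitored in the web UI at http://localhost:8081/#/job/running
For the local web UI to work, remember to add the following pom dependency: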
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>1.10.0</version>
<scope>compile</scope>
</dependency>
Define an OutputTag that identifies the side-output stream:
val outputTag = new OutputTag[String]("person_t_side-output")
Implement the main program:
sideOutputPerson_t.scala
package com.tech.sideoutput
import com.alibaba.fastjson.JSON
import com.tech.bean.Person_t
import com.tech.util.KafkaSourceUtil
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object sideOutputPerson_t {
def main(args: Array[String]): Unit = {
// Web UI: http://localhost:8081/#/job/running
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
val ksu = new KafkaSourceUtil("person_t", "test-consumer-group")
val dstream = env.addSource(ksu.getSouceInfo())
// First define an OutputTag that identifies the side-output stream
val outputTag = new OutputTag[String]("person_t_side-output")
val mainDataStream = dstream
.map(line => {
JSON.parseObject(line, classOf[Person_t])
})
val sideOutput = mainDataStream.process(new ProcessFunction[Person_t, String] {
override def processElement(
value: Person_t,
ctx: ProcessFunction[Person_t, String]#Context,
out: Collector[String]): Unit = {
if (!value.getName.contains("_side")) {
out.collect(value.toString)
} else {
// Records that go to the side output
ctx.output(outputTag, "sideOutput-> 带有_side标识的数据名称" + value.getName)
}
}
})
val sideOutputStream: DataStream[String] = sideOutput.getSideOutput(outputTag)
// Handle the side-output stream
sideOutputStream.print("测输出流")
// Handle the regular data
sideOutput.print("常规数据")
env.execute("outSideput")
}
}
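The main program imports com.tech.util.KafkaSourceUtil and calls `getSouceInfo()`, but the class itself is not shown. A hypothetical reconstruction, assuming it simply wraps the universal `FlinkKafkaConsumer` from `flink-connector-kafka_2.12` 1.10.0 and reads each record value as a string; the `brokers` parameter and its default are assumptions.

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Hypothetical reconstruction of com.tech.util.KafkaSourceUtil, which the
// article imports but does not show. Assumes it wraps FlinkKafkaConsumer.
class KafkaSourceUtil(topic: String, groupId: String, brokers: String = "localhost:9092") {
  // Method name kept as spelled at the article's call site.
  def getSouceInfo(): FlinkKafkaConsumer[String] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokers)
    props.setProperty("group.id", groupId)
    // Each Kafka record value is decoded as a UTF-8 string (one JSON line).
    new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), props)
  }
}
```

With the default start position the consumer resumes from the group's committed offsets; calling `setStartFromEarliest()` on the returned consumer would replay the topic from the beginning instead.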
5. Run the programs
Start the two Kafka writer utilities:
ProduceToKafkaUtil1 starts writing (note: no "side" in the names):
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
SendMessageToKafka: {"id":1,"name":"Johngo1","age":11,"sex":1,"email":"Johngo1@flink.com","time":1590068784050}
SendMessageToKafka: {"id":2,"name":"Johngo2","age":12,"sex":0,"email":"Johngo2@flink.com","time":1590068786475}
SendMessageToKafka: {"id":3,"name":"Johngo3","age":13,"sex":1,"email":"Johngo3@flink.com","time":1590068788477}
SendMessageToKafka: {"id":4,"name":"Johngo4","age":14,"sex":0,"email":"Johngo4@flink.com","time":1590068790481}
SendMessageToKafka: {"id":5,"name":"Johngo5","age":15,"sex":1,"email":"Johngo5@flink.com","time":1590068792483}
SendMessageToKafka: {"id":6,"name":"Johngo6","age":16,"sex":0,"email":"Johngo6@flink.com","time":1590068794489}
SendMessageToKafka: {"id":7,"name":"Johngo7","age":17,"sex":1,"email":"Johngo7@flink.com","time":1590068796492}
SendMessageToKafka: {"id":8,"name":"Johngo8","age":18,"sex":0,"email":"Johngo8@flink.com","time":1590068798494}
SendMessageToKafka: {"id":9,"name":"Johngo9","age":19,"sex":1,"email":"Johngo9@flink.com","time":1590068800494}
ProduceToKafkaUtil2 starts writing (note the "side" in the names):
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
SendMessageToKafka: {"id":1,"name":"Johngo_side1","age":11,"sex":1,"email":"Johngo_side1@flink.com","time":1590068787210}
SendMessageToKafka: {"id":2,"name":"Johngo_side2","age":12,"sex":0,"email":"Johngo_side2@flink.com","time":1590068789521}
SendMessageToKafka: {"id":3,"name":"Johngo_side3","age":13,"sex":1,"email":"Johngo_side3@flink.com","time":1590068791526}
SendMessageToKafka: {"id":4,"name":"Johngo_side4","age":14,"sex":0,"email":"Johngo_side4@flink.com","time":1590068793528}
SendMessageToKafka: {"id":5,"name":"Johngo_side5","age":15,"sex":1,"email":"Johngo_side5@flink.com","time":1590068795531}
SendMessageToKafka: {"id":6,"name":"Johngo_side6","age":16,"sex":0,"email":"Johngo_side6@flink.com","time":1590068797535}
SendMessageToKafka: {"id":7,"name":"Johngo_side7","age":17,"sex":1,"email":"Johngo_side7@flink.com","time":1590068799538}
SendMessageToKafka: {"id":8,"name":"Johngo_side8","age":18,"sex":0,"email":"Johngo_side8@flink.com","time":1590068801542}
Finally, start the main program (sideOutputPerson_t.scala) and look at the result:
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
常规数据:7> id:1,name:Johngo1,age:11,sex:1,email:Johngo1@flink.com,time:1590069009644
常规数据:7> id:2,name:Johngo2,age:12,sex:0,email:Johngo2@flink.com,time:1590069012246
测输出流:7> sideOutput-> 带有_side标识的数据名称Johngo_side1
常规数据:7> id:3,name:Johngo3,age:13,sex:1,email:Johngo3@flink.com,time:1590069014250
测输出流:7> sideOutput-> 带有_side标识的数据名称Johngo_side2
常规数据:7> id:4,name:Johngo4,age:14,sex:0,email:Johngo4@flink.com,time:1590069016255
测输出流:7> sideOutput-> 带有_side标识的数据名称Johngo_side3
常规数据:7> id:5,name:Johngo5,age:15,sex:1,email:Johngo5@flink.com,time:1590069018257
测输出流:7> sideOutput-> 带有_side标识的数据名称Johngo_side4
常规数据:7> id:6,name:Johngo6,age:16,sex:0,email:Johngo6@flink.com,time:1590069020263
测输出流:7> sideOutput-> 带有_side标识的数据名称Johngo_side5
常规数据:7> id:7,name:Johngo7,age:17,sex:1,email:Johngo7@flink.com,time:1590069022266
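As expected, records whose names contain "side" are printed by the side-output stream (print label 测输出流), while all other records are printed as regular data (print label 常规数据).
If you apply this to your own business logic, you can send each kind of data to a different sink.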
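As a sketch of that last point, under stated assumptions (a hypothetical second topic `person_t_side`, the `flink-connector-kafka_2.12` 1.10.0 dependency, and plain-string records rather than Person_t), the side output could be written back to Kafka while the main stream is still printed. `SideOutputToKafka`, `SplitBySide`, `wire`, and `demoPlan` are illustrative names, not part of the article; the split rule is the same `_side` check, applied to the raw string instead of `getName`.

```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.util.Collector

object SideOutputToKafka {
  val sideTag: OutputTag[String] = OutputTag[String]("person_t_side-output")

  // Same rule as the article: anything containing "_side" goes to the side output.
  class SplitBySide extends ProcessFunction[String, String] {
    override def processElement(
        value: String,
        ctx: ProcessFunction[String, String]#Context,
        out: Collector[String]): Unit =
      if (value.contains("_side")) ctx.output(sideTag, value) else out.collect(value)
  }

  // Main output -> stdout; side output -> hypothetical topic "person_t_side".
  def wire(source: DataStream[String], brokers: String = "localhost:9092"): Unit = {
    val routed = source.process(new SplitBySide)
    routed.print("main").name("main-to-stdout")
    routed.getSideOutput(sideTag)
      .addSink(new FlinkKafkaProducer[String](brokers, "person_t_side", new SimpleStringSchema()))
      .name("side-to-kafka")
  }

  // Builds (without running) a small job and returns its JSON execution plan.
  def demoPlan(): String = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    wire(env.fromElements("Johngo1", "Johngo_side1"))
    env.getExecutionPlan
  }

  def main(args: Array[String]): Unit = println(demoPlan())
}
```

Inspecting the execution plan is a cheap way to confirm that the main and side streams end in different sinks before running the job against a real broker.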
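6. References:
Official Flink 1.10.0 documentation (Side Outputs):
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/side_output.html
Author: Johngo
Feel free to contact the author with any questions. Thanks, everyone!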