文章内容
自定义Flink Source,案例分别实现了继承于SourceFunction的四个案例,三个完全自定义的Source, 另外一个Source为常见的MySQL,通过这几个案例,启发我们进行实际案例的Source研发
代码版本
Flink : 1.10.0 Scala : 2.12.6
官网部分说明
这个是关于Interface中Souce中的信息以及链接,关于SourceFunction的说明,基本使用到的是实现了SourceFunction接口的类
Flink1.10:https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/source/SourceFunction.html
ALL Known Implementing Classes 就是SourceFunction以及实现于SourceFunction的各个类
自定义Source中,我们可以使用SourceFunction也可以使用它的实现类,看具体情况
可以通过-非并行Source实现SourceFunction,或者通过实现ParallelSourceFunction接口或为并行源扩展RichParallelSourceFunction来编写自己的自定义源
以下有四个案例,可以根据代码直接进行跑通实现
- 自定义Source,实现自定义&并行度为1的source
- 自定义Source,实现一个支持并行度的source
- 自定义Source,实现一个支持并行度的富类source
- 自定义Source,实现消费MySQL中的数据
1. 自定义Source,实现自定义&并行度为1的source
自定义source,实现SourceFunction接口,实现一个没有并行度的案例
功能:每隔 1s 进行自增加1
实现的方法:run(),作为数据源,所有数据的产生都在 run() 方法中实现
文件名:MyNoParallelFunction.scala
代码语言:javascript复制package com.tech.consumer
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
/**
* 创建自定义并行度为1的source
*/
class MyNoParallelFunction extends SourceFunction[Long]{
var count = 0L
var isRunning = true
override def run(ctx: SourceContext[Long]): Unit = {
while( isRunning ) {
ctx.collect(count)
count = 1
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
isRunning = false
}
}
Flink main函数中使用
文件名:StreamWithMyNoParallelFunction.scala
代码语言:javascript复制package com.tech.consumer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
object StreamWithMyNoParallelFunction {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new MyNoParallelFunction)
val mapData = stream.map(line => {
print("received data: " line)
line
})
mapData.setParallelism(1)
env.execute("StreamWithMyNoParallelFunction")
}
}
执行起来,就可以看到数据的打印,就是我们想要得到的数据源不断的产出:
2. 自定义Source,实现一个支持并行度的source
实现ParallelSourceFunction接口
该接口只是个标记接口,用于标识继承该接口的Source都是并行执行的。其直接实现类是RichParallelSourceFunction,它是一个抽象类并继承自 AbstractRichFunction(从名称可以看出,它应该兼具 rich 和 parallel 两个特性,这里的rich体现在它定义了 open 和 close 这两个方法)。
MyParallelFunction.scala
代码语言:javascript复制package com.tech.consumer
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
/**
* 实现一个可以自定义的source
*/
class MyParallelFunction extends ParallelSourceFunction[Long]{
var count = 0L
var isRunning = true
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
while(isRunning) {
ctx.collect(count)
count = 1
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
isRunning = false
}
}
Flink main函数中使用
文件名:StreamWithMyParallelFunction.scala
代码语言:javascript复制package com.tech.consumer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
object StreamWithMyParallelFunction {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new MyParallelFunction)
stream.print("received data: ")
env.execute("StreamWithMyParallelFunction")
}
}
如果使用该自定义Source,如果代码中没有设置并行度,会根据机器性能自动设置并行度。如机器是8核,则打印出来有8个并行度的数据
根据我找出的cpu记录,就是记录着正在运行的程序,以及下面打印出来的数据
3. 自定义Source,实现一个支持并行度的富类source
RichParallelSourceFunction 中的rich体现在额外提供open和close方法
针对source中如果需要获取其他链接资源,那么可以在open方法中获取资源链接,在close中关闭资源链接
文件名:MyRichParallelSourceFunction.scala
代码语言:javascript复制package com.tech.consumer
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
class MyRichParallelSourceFunction extends RichParallelSourceFunction[Long]{
var count = 0L
var isRunning = true
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
while(isRunning) {
ctx.collect(count)
count = 1
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
isRunning = false
}
override def open(parameters: Configuration): Unit = {
// 如果需要获取其他链接资源,那么可以在open方法中获取资源链接
print("资源链接.. ")
}
override def close(): Unit = {
// 在close中关闭资源链接
print("资源关闭.. ")
}
}
文件名:StreamWithMyRichParallelSourceFunction.scala
代码语言:javascript复制package com.tech.consumer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
object StreamWithMyRichParallelSourceFunction {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new MyRichParallelSourceFunction)
stream.print("received data")
env.execute("StreamWithMyRichParallelSourceFunction")
}
}
从 “资源链接” 可以看到是执行在所有数据流最之前的,可以用来定义一些数据源的连接信息,比如说MySQL的连接信息
4. 自定义Source,实现消费MySQL中的数据
这个更加接近实际的案例
4.1 首先添加pom依赖
代码语言:javascript复制<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
4.2 创建mysql表,作为读取的数据源
代码语言:javascript复制CREATE TABLE `person` (
id int(10) unsigned NOT NULL AUTO_INCREMENT,
name varchar(260) NOT NULL DEFAULT '' COMMENT '姓名',
age int(11) unsigned NOT NULL DEFAULT '0' COMMENT '年龄',
sex tinyint(2) unsigned NOT NULL DEFAULT '2' COMMENT '0:女, 1男',
email text COMMENT '邮箱',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8 COMMENT='人员定义';
随后插入一些数据,作为数据源的内容
代码语言:javascript复制insert into person values
(null, 'Johngo12', 12, 1, 'Source01@flink.com'),
(null, 'Johngo13', 13, 0, 'Source02@flink.com'),
(null, 'Johngo14', 14, 0, 'Source03@flink.com'),
(null, 'Johngo15', 15, 0, 'Source04@flink.com'),
(null, 'Johngo16', 16, 1, 'Source05@flink.com'),
(null, 'Johngo17', 17, 1, 'Source06@flink.com'),
(null, 'Johngo18', 18, 0, 'Source07@flink.com'),
(null, 'Johngo19', 19, 0, 'Source08@flink.com'),
(null, 'Johngo20', 20, 1, 'Source09@flink.com'),
(null, 'Johngo21', 21, 0, 'Source10@flink.com');
4.3 保存实体类Person Bean
代码语言:javascript复制package com.tech.bean
import scala.beans.BeanProperty
class Person(
@BeanProperty var id:Int = 0,
@BeanProperty var name:String = "",
@BeanProperty var age:Int = 0,
@BeanProperty var sex:Int = 2,
@BeanProperty var email:String = ""
) {
}
4.4 创建自定义Source类,继承 RichSourceFunction
文件名:RichSourceFunctionFromMySQL.scala
代码语言:javascript复制package com.tech.consumer
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import com.tech.bean.Person
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
class RichSourceFunctionFromMySQL extends RichSourceFunction[Person]{
var isRUNNING: Boolean = true
var ps: PreparedStatement = null
var conn: Connection = null
// 建立连接
/**
* 与MySQL建立连接信息
* @return
*/
def getConnection():Connection = {
var conn: Connection = null
val DB_URL: String = "jdbc:mysql://localhost:3306/flinkData?useUnicode=true&characterEncoding=UTF-8"
val USER: String = "root"
val PASS: String = "admin123"
try{
Class.forName("com.mysql.cj.jdbc.Driver")
conn = DriverManager.getConnection(DB_URL, USER, PASS)
} catch {
case _: Throwable => println("due to the connect error then exit!")
}
conn
}
/**
* open()方法初始化连接信息
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = this.getConnection()
val sql = "select * from person"
ps = this.conn.prepareStatement(sql)
}
/**
* main方法中调用run方法获取数据
* @param ctx
*/
override def run(ctx: SourceFunction.SourceContext[Person]): Unit = {
val person:Person = new Person()
val resSet:ResultSet = ps.executeQuery()
while(isRUNNING & resSet.next()) {
person.setId(resSet.getInt("id"))
person.setName(resSet.getString("name"))
person.setAge(resSet.getInt("age"))
person.setSex(resSet.getInt("sex"))
person.setEmail(resSet.getString("email"))
ctx.collect(person)
}
}
override def cancel(): Unit = {
isRUNNING = false
}
/**
* 关闭连接信息
*/
override def close(): Unit = {
if(conn != null) {
conn.close()
}
if(ps != null) {
ps.close()
}
}
}
将上述Source作为数据源,进行消费,当前打印到控制台
文件名:StreamRichSourceFunctionFromMySQL.scala
代码语言:javascript复制package com.tech.consumer
import com.google.gson.Gson
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
/**
* 从MySQL中读取数据 & 打印到控制台
*/
object StreamRichSourceFunctionFromMySQL {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new RichSourceFunctionFromMySQL())
val personInfo = stream.map(line => {
new Gson().toJson(line)
})
personInfo.print("Data From MySQL ").setParallelism(1)
env.execute(StreamRichSourceFunctionFromMySQL.getClass.getName)
}
}
强烈建议使用Google的Json包,fastJSON会出现坑
好了,现在就把刚刚存放到MySQL中的数据读取了出来,看图?
作者:Johngo