写在前面: 博主是一名大数据的初学者,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,
写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新
。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/ 尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影
。我希望在最美的年华,做最好的自己
!
本文是快速入门Flink系列的第8篇博客,为大家介绍的是流数据常用的Transformation 操作。关于批数据处理的常用16种算子操作,请参考博主的这篇文章?《快速入门Flink (5) ——DataSet必知必会的16种Transformation操作(超详细!建议收藏!)》。
码字不易,先赞后看!!!
1、DataStream的Transformation
1.1 KeyBy
逻辑上将一个流分成不相交的分区,每个分区包含相同键的元素。在内部,这是通过散列分区来实现的。
代码语言:javascript复制import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
/*
* @Author: Alice菌
* @Date: 2020/7/9 11:25
* @Description:
*/
// keyBy 分组操作算子
object StreamKeyBy {
def main(args: Array[String]): Unit = {
// 1、创建执行环境
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 2、构建数据集
import org.apache.flink.api.scala._
val elementSource: DataStream[String] = senv.fromElements("hadoop hadoop scala")
// 3、数据组成元组类型
val wordAndOne: DataStream[(String, Int)] = elementSource.flatMap(x=>x.split(" ")).map((_,1))
// 4、进行分组
val KeyedStream: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
// 5、聚合计算
val result: DataStream[(String, Int)] = KeyedStream.sum(1)
// 6、打印输出
result.print()
// 7、执行程序
senv.execute("StreamKeyBy")
//1> (scala,1)
//11> (hadoop,1)
//11> (hadoop,2)
}
}
1.2 Connect
用来将两个 dataStream 组装成一个 ConnectedStreams。而且这个 connectedStream 的组成结构就是保留原有的 dataStream 的结构体;这样我们就可以把不同的数据组装成同一个结构。
- 代码示例
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
/*
* @Author: Alice菌
* @Date: 2020/8/10 15:48
* @Description:
Connect:
用来将两个 dataStream 组装成一个 ConnectedStreams
而且这个 connectedStream 的组成结构就是保留原有的 dataStream 的结构体;
这样我们 就可以把不同的数据组装成同一个结构
*/
object StreamConnectDemo {
def main(args: Array[String]): Unit = {
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val source1: DataStream[Long] = senv.addSource(new MyNoParallelSource)
val source2: DataStream[Long] = senv.addSource(new MyNoParallelSource)
val connectStreams: ConnectedStreams[Long, Long] = source1.connect(source2)
val result: DataStream[String] = connectStreams.map(
function1 => {
"function1 = " function1
},
function2 => {
"function2 = " function2
}
)
result.print()
senv.execute("StreamConnectDemo")
//8> function2 = 1
//2> function1 = 1
//3> function1 = 2
//9> function2 = 2
//4> function1 = 3
//10> function2 = 3
//5> function1 = 4
//11> function2 = 4
//12> function2 = 5
//6> function1 = 5
}
/**
* 创建自定义并行度为 1 的 source
* 实现从 1 开始产生递增数字
*/
class MyNoParallelSource extends SourceFunction[Long]{
var count:Long = 1L
var isRunning: Boolean = true
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
while (isRunning){
ctx.collect(count)
count = 1
Thread.sleep(1000)
if (count > 5){
cancel()
}
}
}
override def cancel(): Unit = {
isRunning = false
}
}
}
1.3 Split 和 Select
Split 就是将一个 DataStream 分成两个或者多个 DataStream。
Select 就是获取分流后对应的数据。
需求:
代码语言:javascript复制给出数据 1, 2, 3, 4, 5, 6, 7 请使用 split 和 select 把数据中的奇偶数分开,并打印出奇数
import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment}
/*
* @Author: Alice菌
* @Date: 2020/7/9 11:38
* @Description:
*/
object StreamSplit {
def main(args: Array[String]): Unit = {
// 1、创建执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 2、构建数据集
import org.apache.flink.api.scala._
val source: DataStream[Int] = env.fromElements(1,2,3,4,5,6,7,8,9)
val spiltStream: SplitStream[Int] = source.split(x => {
x % 2 match {
case 0 => List("偶数")
case 1 => List("奇数")
}
})
val selectDataStream: DataStream[Int] = spiltStream.select("奇数")
selectDataStream.print()
env.execute("StreamSplit")
//2> 7
//3> 9
//11> 1
//12> 3
//1> 5
}
}
结语
Flink的流处理API有很多与之前,菌哥在介绍常用的16种批处理API的时候谈到的一致,像map,filter,reduce等等…所以下面就不为大家做更多的介绍了,感兴趣的朋友可以去阅读前面的博客,或者有想要补充的内容,可以私信,也可以在评论区留言。
如果以上过程中出现了任何的纰漏错误,烦请大佬们指正?
受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波?
希望我们都能在学习的道路上越走越远?