快速入门Flink (8) —— DataStream 的 Transformation常用操作

2021-01-27 16:32:29 浏览数 (1)

写在前面: 博主是一名大数据的初学者,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/ 尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影。我希望在最美的年华,做最好的自己

本文是快速入门Flink系列的第8篇博客,为大家介绍的是流数据常用的Transformation 操作。关于批数据处理的常用16种算子操作,请参考博主的这篇文章?《快速入门Flink (5) ——DataSet必知必会的16种Transformation操作(超详细!建议收藏!)》。

码字不易,先赞后看!!!


1、DataStream的Transformation

1.1 KeyBy

逻辑上将一个流分成不相交的分区,每个分区包含相同键的元素。在内部,这是通过散列分区来实现的。

代码语言:javascript复制
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}

/*
 * @Author: Alice菌
 * @Date: 2020/7/9 11:25
 * @Description: 
    
 */
// keyBy   分组操作算子
object StreamKeyBy {
  def main(args: Array[String]): Unit = {

    // 1、创建执行环境
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2、构建数据集
    import org.apache.flink.api.scala._
    val elementSource: DataStream[String] = senv.fromElements("hadoop hadoop scala")
    // 3、数据组成元组类型
    val wordAndOne: DataStream[(String, Int)] = elementSource.flatMap(x=>x.split(" ")).map((_,1))
    // 4、进行分组
    val KeyedStream: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // 5、聚合计算
    val result: DataStream[(String, Int)] = KeyedStream.sum(1)
    // 6、打印输出
    result.print()
    // 7、执行程序
    senv.execute("StreamKeyBy")
    //1> (scala,1)
    //11> (hadoop,1)
    //11> (hadoop,2)


  }
}

1.2 Connect

用来将两个 dataStream 组装成一个 ConnectedStreams。而且这个 connectedStream 的组成结构就是保留原有的 dataStream 的结构体;这样我们就可以把不同的数据组装成同一个结构。

  • 代码示例
代码语言:javascript复制
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
/*
 * @Author: Alice菌
 * @Date: 2020/8/10 15:48
 * @Description: 
    Connect:
    用来将两个 dataStream 组装成一个 ConnectedStreams
    而且这个 connectedStream 的组成结构就是保留原有的 dataStream 的结构体;
    这样我们 就可以把不同的数据组装成同一个结构
 */
object StreamConnectDemo {
  def main(args: Array[String]): Unit = {

    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val source1: DataStream[Long] = senv.addSource(new MyNoParallelSource)
    val source2: DataStream[Long] = senv.addSource(new MyNoParallelSource)

    val connectStreams: ConnectedStreams[Long, Long] = source1.connect(source2)

    val result: DataStream[String] = connectStreams.map(
      function1 => {
        "function1 = "   function1
      },
      function2 => {
        "function2 = "   function2
      }
    )

    result.print()

    senv.execute("StreamConnectDemo")

    //8> function2 = 1
    //2> function1 = 1
    //3> function1 = 2
    //9> function2 = 2
    //4> function1 = 3
    //10> function2 = 3
    //5> function1 = 4
    //11> function2 = 4
    //12> function2 = 5
    //6> function1 = 5

  }

  /**
    * 创建自定义并行度为 1 的 source
    * 实现从 1 开始产生递增数字
    */
  class MyNoParallelSource extends SourceFunction[Long]{

    var count:Long = 1L
    var isRunning: Boolean = true

    override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {

      while (isRunning){
        ctx.collect(count)
        count  = 1
        Thread.sleep(1000)
        if (count > 5){
          cancel()
        }
      }
    }

    override def cancel(): Unit = {
      isRunning = false
    }
  }
}

1.3 Split 和 Select

Split 就是将一个 DataStream 分成两个或者多个 DataStream。

Select 就是获取分流后对应的数据。

需求:

给出数据 1, 2, 3, 4, 5, 6, 7 请使用 split 和 select 把数据中的奇偶数分开,并打印出奇数

代码语言:javascript复制
import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment}

/*
 * @Author: Alice菌
 * @Date: 2020/7/9 11:38
 * @Description: 
    
 */
object StreamSplit {
  def main(args: Array[String]): Unit = {

    // 1、创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2、构建数据集
    import org.apache.flink.api.scala._
    val source: DataStream[Int] = env.fromElements(1,2,3,4,5,6,7,8,9)

    val spiltStream: SplitStream[Int] = source.split(x => {
      x % 2 match {
        case 0 => List("偶数")
        case 1 => List("奇数")
      }
    })

    val selectDataStream: DataStream[Int] = spiltStream.select("奇数")

    selectDataStream.print()

    env.execute("StreamSplit")

    //2> 7
    //3> 9
    //11> 1
    //12> 3
    //1> 5

  }
}

结语

Flink的流处理API有很多与之前,菌哥在介绍常用的16种批处理API的时候谈到的一致,像map,filter,reduce等等…所以下面就不为大家做更多的介绍了,感兴趣的朋友可以去阅读前面的博客,或者有想要补充的内容,可以私信,也可以在评论区留言。

如果以上过程中出现了任何的纰漏错误,烦请大佬们指正?

受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波?

希望我们都能在学习的道路上越走越远?

0 人点赞