005. Flink DataSource API

2019-08-09 18:07:57 浏览数 (2)

1. 从本地集合获取数据

代码语言:javascript复制
import org.apache.flink.api.scala._

/**
  * author: YangYunhe
  * date: 2019/8/3 18:59
  * description: 从本地集合中获取数据
  */
object CollectionSource {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val ds1: DataSet[String] = env.fromElements[String]("Tom Jack", "Tony", "Bob Lucy Jed")
    val ds2: DataSet[Int] = env.fromCollection(Seq(1, 2, 3, 3))
    val ds3: DataSet[Int] = env.fromCollection(List(1, 2, 3, 3))
    val ds4: DataSet[Int] = env.fromCollection(Set(1, 2, 3, 3))
    val ds5: DataSet[(String, String)] = env.fromCollection(Map("name" -> "Tom", "sex" -> "male"))
    val ds6: DataSet[Long] = env.generateSequence(1L, 100L)  // [1, 100]的序列


    println("ds1: ")
    ds1.print()
    println()

    println("ds2: ")
    ds2.print()
    println()

    println("ds3: ")
    ds3.print()
    println()

    println("ds4: ")
    ds4.print()
    println()

    println("ds5: ")
    ds5.print()
    println()

  }

}

# 运行结果:

ds1: 
Tom Jack
Tony
Bob Lucy Jed

ds2: 
1
2
3
3

ds3: 
1
2
3
3

ds4: 
1
2
3

ds5: 
(name,Tom)
(sex,male)

2. 读文件获取数据

代码语言:javascript复制
import org.apache.flink.api.scala._
import org.apache.flink.types.StringValue

/**
  * author: YangYunhe
  * date: 2019/8/3 19:09
  * description: 从文件中获取数据
  */
object FileSource {

  case class Student(id: String, name: String, age: Int, sex: String)

  def main(args: Array[String]): Unit = {

    val localPath1 = "D:\space\idea\course\learning-flink\inputPath\words.txt"
    val localPath2 = "D:\space\idea\course\learning-flink\inputPath\nums.txt"
    val hdfsPath = "hdfs://beh07:9000/flink/input/students.txt"

    val env = ExecutionEnvironment.getExecutionEnvironment

    // 1. 按行读取文件并将它们作为字符串返回
    val ds1: DataSet[String] = env.readTextFile(localPath1, "UTF-8")

    // 2. 读取文件中数据的原始类型
    val ds2: DataSet[Int] = env.readFileOfPrimitives[Int](localPath2, ",")

    // 3. 按行读取文件并将它们作为StringValues返回,StringValues是可变字符串
    val ds3: DataSet[StringValue] = env.readTextFileWithValue(localPath1, "UTF-8")

    // 4. 读取有标准结构化的数据(例如CSV文件)
    val ds4: DataSet[Student] = env.readCsvFile(
      filePath = hdfsPath, // 文件路径
      lineDelimiter = "n", // 指定行分隔符,默认'n'
      fieldDelimiter = ",", // 指定列分隔符,默认','
      /*
       * quoteCharacter: Character
       * 设置一个引号字符,启用带引号的字符串解析
       * 如果字段的第一个字符是引号字符,则字符串将被解析为带引号的字符串,引号字符串中的字段分隔符将被忽略
       * 如果带引号的字符串字段的最后一个字符不是引号字符,则引用的字符串解析将会失败
       * 如果启用了带引号的字符串解析并且该字段的第一个字符不是引号字符串,则该字符串将被解析为不带引号的字符串
       * 默认情况下,禁用带引号的字符串解析
       */
      quoteCharacter = null,
      ignoreFirstLine = false, // 是否忽略第一行,默认为false
      ignoreComments = null, // 设置注释的符号,例如设置为"#",那么#开头的数据都不会读取,默认不开启此功能
      lenient = false, // 是否启用宽松解析,即忽略无法正确解析的行,默认为false
      includedFields = Array[Int](0, 1, 2, 3), // Array[Int],定义从输入文件中读取的字段的下标,默认全部读取
      pojoFields = Array[String]("id", "name", "age", "sex") // Array[String],指定映射到CSV字段的POJO的字段,CSV字段的解析器将根据POJO字段的类型和顺序自动初始化
    )

    ds1.print()
    println("----------------")
    ds2.print()
    println("----------------")
    ds4.print()

  }

}

# localPath1文件中的内容为:
Tom Tony Jack Jed Tom
Tony Jed Bob Tony Jed
Tom Harry James Bob Gary
Allen Kobe Tom Kobe Bob
Ben Allen Jed Tom Tom

# localPath2文件中的内容为:
0,1,2,3

# hdfsPath文件中的内容为:
0001,Tom,18,男,23.4
0002,Bob,19,男,21.2
0003,Jack,32,男,78.1
0004,Jed,27,男,99.9

# 运行结果:
Tom Harry James Bob Gary
Tom Tony Jack Jed Tom
Allen Kobe Tom Kobe Bob
Ben Allen Jed Tom Tom
Tony Jed Bob Tony Jed
----------------
1
0
2
3
----------------
Student(0001,Tom,18,男)
Student(0004,Jed,27,男)
Student(0002,Bob,19,男)
Student(0003,Jack,32,男)

注意:本地访问HDFS路径需要添加hadoop-client依赖

3. 监听网络端口

代码语言:javascript复制
import org.apache.flink.streaming.api.scala._

/**
  * author: YangYunhe
  * date: 2019/8/8 20:55
  * description: 监听网络端口,流式数据源
  */
object SocketSource {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 指定监听的主机名、端口、行分隔符以及重试的时间间隔
    val source: DataStream[String] = env.socketTextStream("beh07", 9999, 'n', 1)

    source.print().setParallelism(1)

    env.execute("SocketSource")

  }

}

# 监听端口处输入:
[hadoop@beh07 data]$ nc -lk 9999
hello flink

# 程序输出:
hello flink

4. 自定义数据源

以读取MySQL中的数据为例

  • 首先完成自定义Source类的开发
代码语言:javascript复制
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

/**
  * 与表结构对应的实体类
  */
case class Student(id: String, name: String, age: Int, score: Double)

/**
  * author: YangYunhe
  * date: 2019/7/29 14:26
  * description: 自定义的数据源,从MySQL中读取数据
  */
class JDBCSource extends RichSourceFunction[Student] {

  val DRIVER = "com.mysql.jdbc.Driver"
  val URL = "jdbc:mysql://beh07:3306/test?characterEncoding=UTF-8" // &
  val USER = "root"
  val PASSWORD = "root"

  private var statement: PreparedStatement = _
  private var conn: Connection = _

  /**
    * 在整个Source对象初始化之后执行
    */
  override def open(parameters: Configuration): Unit = {
    Class.forName(DRIVER)
    conn = DriverManager.getConnection(URL, USER, PASSWORD)
    val sql = "SELECT * FROM students"
    statement = conn.prepareStatement(sql)
  }


  /**
    * 不停的执行,发送数据到下游
    */
  override def run(ctx: SourceFunction.SourceContext[Student]): Unit = {

    val resultSet: ResultSet = statement.executeQuery()

    while(resultSet.next()) {

      val id = resultSet.getString("id")
      val name = resultSet.getString("name")
      val age = resultSet.getInt("age")
      val score = resultSet.getDouble("score")

      // 数据封装到Student对象中
      ctx.collect(Student(id, name, age, score))

    }

  }

  /**
    * 取消发送数据
    */
  override def cancel(): Unit = {

  }

  /**
    * 关闭数据源
    */
  override def close(): Unit = {
    statement.close()
    conn.close()
  }

}
  • 然后运行主程序读取数据
代码语言:javascript复制
import org.apache.flink.streaming.api.scala._

/**
  * author: YangYunhe
  * date: 2019/7/29 15:40
  * description: 从MySQL批量读取数据的主程序
  */
object JDBCSourceApp {

  def main(args: Array[String]): Unit = {

    // 注意,即使是批量读取,也需要使用StreamExecutionEnvironment对象
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val source = environment.addSource(new JDBCSource)

    source.print().setParallelism(1)

    environment.execute("JDBCSourceApp")

  }

}

# 运行结果:
Student(001,Tom,23,99.8)
Student(002,Tony,21,98.1)
Student(003,Jed,24,97.1)
Student(004,Bob,21,98.2)

0 人点赞