大数据开发之Flink Table操作

2022-11-22 16:23:59 浏览数 (1)

前言

本文使用环境版本

  • Hive:2.3.9
  • Flink:flink-1.12.7-bin-scala_2.12

依赖

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.psvmc</groupId>
  <artifactId>WordCount</artifactId>
  <version>1.0</version>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.12.7</flink.version>
    <scala.version>2.12.15</scala.version>
    <hadoop.version>2.7.7</hadoop.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>

  <repositories>
    <repository>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>https://maven.aliyun.com/repository/public</url>
    </repository>
  </repositories>
  <dependencies>
    <!-- flink核心API -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <artifactId>slf4j-api</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>


    <!-- rocksdb-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>

    <!-- Hive Connector的支持,仅在编译时生效-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.12</artifactId>
      <version>1.12.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>2.1.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.calcite</groupId>
          <artifactId>calcite-core</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.calcite</groupId>
          <artifactId>calcite-avatica</artifactId>
        </exclusion>
        <exclusion>
          <artifactId>hadoop-hdfs</artifactId>
          <groupId>org.apache.hadoop</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <!--读取hadoop文件-->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hadoop-compatibility_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

先看一个简单的例子

代码语言:javascript复制
import org.apache.flink.table.api.{$, EnvironmentSettings, FieldExpression, SqlDialect, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

object WordCount {
  case class Student(id: String, name: String, sex: String, age: Int, department: String)

  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build()
    val tableEnv: TableEnvironment = TableEnvironment.create(settings)

    val name: String = "hive"
    val defaultDataBase: String = "default"
    val hiveConfDir: String = "/data/tools/bigdata/apache-hive-2.3.9-bin/conf"

    val hive = new HiveCatalog(name, defaultDataBase, hiveConfDir)
    tableEnv.registerCatalog("myHive", hive) // 注册Catalog
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useCatalog("myHive") // 使用注册的Catalog ,不使用的话查不到数据
    tableEnv.useDatabase("default") // 设置要查询的数据库
    tableEnv.executeSql("show tables").print()

    val selectTables_sql = "select id,name,password from t_user"
    val result = tableEnv.sqlQuery(selectTables_sql)
    result.execute().print()

    val mTable = tableEnv.from("t_user").select($"id",$"name",$"password")
    mTable.execute().print()
  }
}

如上我们可以看到

  • Table 可以调用计算处理相关方法 Table调用execute返回TableResult
  • TableResult 可以用来打印
代码语言:javascript复制
//返回Table 
tableEnv.sqlQuery(sqlstr)
//返回TableResult 
tableEnv.executeSql(sqlstr)

0 人点赞