2021-09-18 15:09:24
浏览数 (1)
Flink读parquet
代码语言:scala
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.ParquetRowInputFormat
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.log4j.{Level, Logger}
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.{MessageType, PrimitiveType}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
object ReadFromParquet {

  /**
   * Reads Parquet data from HDFS into Flink `DataStream[Row]`s in two ways:
   *  1. with a Parquet `MessageType` schema built by hand;
   *  2. with the schema read from the footer of the first data file found in a directory.
   *
   * One field of each stream is printed, then the job is executed.
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.flink").setLevel(Level.ERROR)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val hdfs_parquet_file_path_t1 = "hdfs://ns1/user/hive/warehouse/test.db/t1"
    val hdfs_parquet_file_path = "hdfs://ns1/user/hhy/parquet/2019-11-18--10"

    // --- 1. Manually specified Parquet schema ---
    val id = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT64, "id")
    val username = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "username")
    val password = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "password")
    val t1_schema = new MessageType("t1", id, username, password)
    println(s"t1_schema : ${t1_schema}")
    val t1 = env.readFile(
      new ParquetRowInputFormat(new Path(hdfs_parquet_file_path_t1), t1_schema),
      hdfs_parquet_file_path_t1)
    // Row fields are 0-based: index 2 is the third field of t1_schema, `password`.
    t1.map(_.getField(2)).print().setParallelism(1)

    // --- 2. Schema taken from an existing data file's footer ---
    val configuration = new Configuration(true)
    val dataFile = firstDataFile(configuration, new org.apache.hadoop.fs.Path(hdfs_parquet_file_path))
    println(s"first_file_name : ${dataFile.getName}")
    // NOTE(review): readFooter is deprecated in newer parquet-mr releases; kept for compatibility.
    val footer = ParquetFileReader.readFooter(configuration, dataFile)
    val schema: MessageType = footer.getFileMetaData.getSchema
    println(s"readed schema:${schema}")
    val t1_one: DataStream[Row] = env.readFile(
      new ParquetRowInputFormat(new Path(hdfs_parquet_file_path), schema),
      hdfs_parquet_file_path)
    // Index 1 is the second field of the file's schema.
    t1_one.map(_.getField(1)).print().setParallelism(1)

    // Execute the job.
    env.execute("ReadFromParquet")
  }

  /** True for names that are neither the `_SUCCESS` marker nor hidden (`.`-prefixed) files. */
  private def isDataFileName(name: String): Boolean =
    !name.equalsIgnoreCase("_SUCCESS") && !name.startsWith(".")

  /**
   * Returns the first data file directly under `dir` (non-recursive listing).
   *
   * The FileSystem is resolved from `dir` itself rather than the configuration's default FS,
   * so a fully qualified URI such as `hdfs://ns1/...` works even when it is not the default.
   *
   * @throws IllegalStateException if `dir` contains no data file
   */
  private def firstDataFile(conf: Configuration, dir: org.apache.hadoop.fs.Path): org.apache.hadoop.fs.Path = {
    val fs: FileSystem = dir.getFileSystem(conf)
    val remote = fs.listFiles(dir, false)
    // Adapt Hadoop's RemoteIterator to a Scala Iterator so `find` can stop at the first match.
    val paths = new Iterator[org.apache.hadoop.fs.Path] {
      override def hasNext: Boolean = remote.hasNext
      override def next(): org.apache.hadoop.fs.Path = remote.next().getPath
    }
    paths
      .find(p => isDataFileName(p.getName))
      .getOrElse(throw new IllegalStateException(s"No Parquet data file found under $dir"))
  }
}
Flink写parquet
代码语言:scala
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.log4j.{Level, Logger}
object SocketSourceAvroparquetSink {
  import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows

  /**
   * Reads text lines from a socket on localhost:9999 and counts words in 3-second tumbling
   * processing-time windows. The counts are written to HDFS as Parquet files using Avro
   * reflection on [[WORD]].
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.flink").setLevel(Level.ERROR)
    System.setProperty("HADOOP_USER_NAME", "root")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val hdfs_parquet_file_save_path = "hdfs://ns1/user/hhy/parquet"
    // Bulk-encoded (Parquet) part files are only finalized on successful checkpoints,
    // so checkpointing must be enabled for any output to become visible.
    env.enableCheckpointing(1000)
    val port = 9999
    val source = env.socketTextStream("localhost", port)

    val wc: DataStream[WORD] = source
      // "\\s+" is the regex for a run of whitespace (a bare "\s" is not a valid Scala escape).
      .flatMap(_.split("\\s+"))
      // split never yields null; only empty tokens (e.g. from leading whitespace) need dropping.
      .filter(_.nonEmpty)
      .map(WORD(_, 1))
      .keyBy(_.word)
      // Processing time is stated explicitly: socket records carry no event-time watermarks,
      // so an event-time window (the default time characteristic on newer Flink) would never fire.
      .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
      .sum("count")

    /**
     * Files written via ParquetAvroWriters can be read directly with spark.read.parquet,
     * and are written completely to the HDFS files.
     */
    val sink_parquet: StreamingFileSink[WORD] = StreamingFileSink
      .forBulkFormat(new Path(hdfs_parquet_file_save_path), ParquetAvroWriters.forReflectRecord(classOf[WORD]))
      .withBucketAssigner(new DateTimeBucketAssigner[WORD]())
      .build()
    wc.addSink(sink_parquet).setParallelism(1)
    env.execute("SocketSourceAvroparquetSink")
  }

  /** One word and its count; field names are used by `sum("count")` and by the Avro reflect schema. */
  case class WORD(word: String, count: Int)
}