Flink源码分析之深度解读流式数据写入hive

2020-09-15 14:23:13 浏览数 (1)

  • 前言
  • 数据流处理
    • hive基本信息获取
    • 流、批判断
    • 写入格式判断
    • 构造分区提交算子
  • 详解StreamingFileWriter
  • 简述StreamingFileSink
  • 分区信息提交
    • 提交分区算子
    • 分区提交触发器
    • 分区提交策略
  • 总结

前言

前段时间我们讲解了flink1.11中如何将流式数据写入文件系统和hive [flink 1.11 使用sql将流式数据写入hive],今天我们来从源码的角度深入分析一下。以便朋友们对flink流式数据写入hive有一个深入的了解,以及在出现问题的时候知道该怎么调试。

其实我们可以想一下这个工作大概是什么流程,首先要写入hive,我们首先要从hive的元数据里拿到相关的hive表的信息,比如存储的路径是哪里,以便往那个目录写数据,还有存储的格式是什么,orc还是parquet,这样我们需要调用对应的实现类来进行写入,其次这个表是否是分区表,写入数据是动态分区还是静态分区,这些都会根据场景的不同而选择不同的写入策略。

写入数据的时候肯定不会把所有数据写入一个文件,那么文件的滚动策略是什么呢?写完了数据我们如何更新hive的元数据信息,以便我们可以及时读取到相应的数据呢?

我画了一个简单的流程图,大家可以先看下,接下来我们带着这些疑问,一步步的从源码里探索这些功能是如何实现的。

数据流处理

我们这次主要是分析flink如何将类似kafka的流式数据写入到hive表,我们先来一段简单的代码:

代码语言:javascript复制
		//构造hive catalog
		String name = "myhive";
		String defaultDatabase = "default";
		String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
		String version = "3.1.2";

		HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
		tEnv.registerCatalog("myhive", hive);
		tEnv.useCatalog("myhive");
		tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
		tEnv.useDatabase("db1");

		tEnv.createTemporaryView("kafka_source_table", dataStream);

		String insertSql = "insert into  hive.db1.fs_table SELECT userId, amount, "  
		                   " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM kafka_source_table";
		tEnv.executeSql(insertSql);

系统在启动的时候会首先解析sql,获取相应的属性,然后会通过java的SPI机制加载TableFactory的所有子类,包含TableSourceFactory和TableSinkFactory,之后,会根据从sql中解析的属性循环判断使用哪个工厂类,具体的操作是在TableFactoryUtil类的方法里面实现的。

比如对于上面的sql,解析之后,发现是要写入一个表名为hive.db1.fs_table的hive sink。所以系统在调用TableFactoryUtil#findAndCreateTableSink(TableSinkFactory.Context context)方法以后,得到了TableSinkFactory的子类HiveTableFactory,然后调用相应的createTableSink方法来创建相应的sink,也就是HiveTableSink。

我们来简单看下HiveTableSink的变量和结构。

代码语言:javascript复制
/**
 * Table sink to write to Hive tables.
 */
public class HiveTableSink implements AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink {

	private static final Logger LOG = LoggerFactory.getLogger(HiveTableSink.class);

	private final boolean userMrWriter;
	//是否有界,用来区分是批处理还是流处理
	private final boolean isBounded;
	private final JobConf jobConf;
	private final CatalogTable catalogTable;
	private final ObjectIdentifier identifier;
	private final TableSchema tableSchema;
	private final String hiveVersion;
	private final HiveShim hiveShim;

	private LinkedHashMap<String, String> staticPartitionSpec = new LinkedHashMap<>();

	private boolean overwrite = false;
	private boolean dynamicGrouping = false;

我们看到它实现了AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink三个接口,这三个接口决定了hive sink实现的功能,数据只能是append模式的,数据是可分区的、并且数据是可以被覆盖写的。

类里面的这些变量,看名字就大概知道是什么意思了,就不做解释了,讲一下HiveShim,我们在构造方法里看到hiveShim是和hive 的版本有关的,所以其实这个类我们可以理解为对不同hive版本操作的一层封装。

代码语言:javascript复制
hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);

tablesink处理数据流的方法是consumeDataStream,我们来重点分析下。

hive基本信息获取

首先会通过hive的配置连接到hive的元数据库,得到hive表的基本信息。

代码语言:javascript复制
		String[] partitionColumns = getPartitionKeys().toArray(new String[0]);
		String dbName = identifier.getDatabaseName();
		String tableName = identifier.getObjectName();
		try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(
				new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
			Table table = client.getTable(dbName, tableName);
			StorageDescriptor sd = table.getSd();
  • 获取到hive的表的信息,也就是Table对象。
  • 获取表的一些存储信息,StorageDescriptor对象,这里面包含了hive表的存储路径、存储格式等等。

流、批判断

接下来判断写入hive是批处理还是流处理

代码语言:javascript复制
if (isBounded){
   ......
   //batch
    
} else {
   ......
   //streaming
    
}

由于这次我们主要分析flink的流处理,所以对于batch就暂且跳过,进入else,也就是流处理。

在这里,定义了一些基本的配置:

  • 桶分配器TableBucketAssigner,简单来说就是如何确定数据的分区,比如按时间,还是按照字段的值等等。
  • 滚动策略,如何生成下一个文件,按照时间,还是文件的大小等等。
  • 构造bulkFactory,目前只有parquet和orc的列存储格式使用bulkFactory
代码语言:javascript复制
    //桶分配器
	TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
	
	//滚动策略
	TableRollingPolicy rollingPolicy = new TableRollingPolicy(
						true,
						conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
						conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());

    //构造bulkFactory
	Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);

createBulkWriterFactory方法主要是用于构造写入列存储格式的工厂类,目前只支持parquet和orc格式,首先定义用于构造工厂类的一些参数,比如字段的类型,名称等等,之后根据不同类型构造不同的工厂类。如果是parquet格式,最终构造的是ParquetWriterFactory工厂类,如果是orc格式,根据hive的版本不同,分别构造出OrcBulkWriterFactory或者是OrcNoHiveBulkWriterFactory。

写入格式判断

如果是使用MR的writer或者是行格式,进入if逻辑,使用HadoopPathBasedBulkFormatBuilder,如果是列存储格式,进入else逻辑,使用StreamingFileSink来写入数据.

代码语言:javascript复制
				if (userMrWriter || !bulkFactory.isPresent()) {
					HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
					builder = new HadoopPathBasedBulkFormatBuilder<>(
							new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner)
							.withRollingPolicy(rollingPolicy)
							.withOutputFileConfig(outputFileConfig);
					LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
				} else {
					builder = StreamingFileSink.forBulkFormat(
							new org.apache.flink.core.fs.Path(sd.getLocation()),
							new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
							.withBucketAssigner(assigner)
							.withRollingPolicy(rollingPolicy)
							.withOutputFileConfig(outputFileConfig);
					LOG.info("Hive streaming sink: Use native parquet&orc writer.");
				}

在大数据处理中,列式存储比行存储有着更好的查询效率,所以我们这次以列式存储为主,聊聊StreamingFileSink是如何写入列式数据的。通过代码我们看到在构造buckets builder的时候,使用了前面刚生成的bucket assigner、输出的配置、以及文件滚动的策略。

构造分区提交算子

在HiveTableSink#consumeDataStream方法的最后,进入了FileSystemTableSink#createStreamingSink方法,这个方法主要做了两件事情,一个是创建了用于流写入的算子StreamingFileWriter,另一个是当存在分区列并且在配置文件配置了分区文件提交策略的时候,构造了一个用于提交分区文件的算子StreamingFileCommitter,这个算子固定的只有一个并发度。

代码语言:javascript复制
		StreamingFileWriter fileWriter = new StreamingFileWriter(
				rollingCheckInterval,
				bucketsBuilder);
		DataStream<CommitMessage> writerStream = inputStream.transform(
				StreamingFileWriter.class.getSimpleName(),
				TypeExtractor.createTypeInfo(CommitMessage.class),
				fileWriter).setParallelism(inputStream.getParallelism());

		DataStream<?> returnStream = writerStream;

		// save committer when we don't need it.
		if (partitionKeys.size() > 0 && conf.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {
			StreamingFileCommitter committer = new StreamingFileCommitter(
					path, tableIdentifier, partitionKeys, msFactory, fsFactory, conf);
			returnStream = writerStream
					.transform(StreamingFileCommitter.class.getSimpleName(), Types.VOID, committer)
					.setParallelism(1)
					.setMaxParallelism(1);
		}

我们看到在代码中,inputStream经过transform方法,最终将要提交的数据转换成CommitMessage格式,然后发送给它的下游StreamingFileCommitter算子,也就是说StreamingFileCommitter将会接收StreamingFileWriter中收集的数据。

详解StreamingFileWriter

这个StreamingFileWriter我们可以理解为一个算子级别的写入文件的sink,它对StreamingFileSink进行了一些包装,然后添加了一些其他操作,比如提交分区信息等等。我们简单看下这个类的结构,并简单聊聊各个方法的作用。

代码语言:javascript复制
public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
		implements OneInputStreamOperator<RowData, CommitMessage>, BoundedOneInput{
		
	@Override
	public void initializeState(StateInitializationContext context) throws Exception {
	    .........................
	}		

		@Override
	public void snapshotState(StateSnapshotContext context) throws Exception {
	  .........................
	}

	@Override
	public void processWatermark(Watermark mark) throws Exception {
	  .........................
	}

	@Override
	public void processElement(StreamRecord<RowData> element) throws Exception {
	  .........................
	}

	/**
	 * Commit up to this checkpoint id, also send inactive partitions to downstream for committing.
	 */
	@Override
	public void notifyCheckpointComplete(long checkpointId) throws Exception {
		  .........................
	}
	    
		@Override
	public void endInput() throws Exception {
		  .........................
	}

	@Override
	public void dispose() throws Exception {
	  .........................
	}	    
		    

		   
}
  • initializeState :初始化状态的方法,在这里构造了要写入文件的buckets,以及具体写入文件的StreamingFileSinkHelper等等。
  • snapshotState:这个方法主要是进行每次checkpoint的时候调用。
  • processWatermark这个方法通过名字就能看出来,是处理水印的,比如往下游发送水印等等。
  • processElement:处理元素最核心的方法,每来一条数据,都会进入这个方法进行处理。
  • notifyCheckpointComplete,每次checkpoint完成的时候调用该方法。在这里,收集了一些要提交的分区的信息,用于分区提交。
  • endInput:不再有更多的数据进来,也就是输入结束的时候调用。
  • dispose:算子的生命周期结束的时候调用。

简述StreamingFileSink

StreamingFileSink我们来简单的描述下,通过名字我们就能看出来,这是一个用于将流式数据写入文件系统的sink,它集成了checkpoint提供exactly once语义。

在StreamingFileSink里有一个bucket的概念,我们可以理解为数据写入的目录,每个bucket下可以写入多个文件。它提供了一个BucketAssigner的概念用于生成bucket,进来的每一个数据在写入的时候都会判断下要写入哪个bucket,默认的实现是DateTimeBucketAssigner,每小时生成一个bucket。

它根据不同的写入格式分别使用StreamingFileSink#forRowFormat或者StreamingFileSink#forBulkFormat来进行相应的处理。

此外,该sink还提供了一个RollingPolicy用于决定数据的滚动策略,比如文件到达多大或者经过多久就关闭当前文件,开启下一个新文件。

具体的写入ORC格式的数据,可以参考下这个文章: flink 1.11 流式数据ORC格式写入file ,由于我们这次主要是讲整体写入hive的流程,这个sink就不做太具体的讲解了。

分区信息提交

StreamingFileWriter#notifyCheckpointComplete 调用commitUpToCheckpoint在checkpoint完成的时候触发了分区的提交操作。

代码语言:javascript复制
	private void commitUpToCheckpoint(long checkpointId) throws Exception {
		helper.commitUpToCheckpoint(checkpointId);
		CommitMessage message = new CommitMessage(
				checkpointId,
				getRuntimeContext().getIndexOfThisSubtask(),
				getRuntimeContext().getNumberOfParallelSubtasks(),
				new ArrayList<>(inactivePartitions));
		output.collect(new StreamRecord<>(message));
		inactivePartitions.clear();
	}

在这里,我们看到,使用inactivePartitions构造了CommitMessage对象,然后使用output.collect将这个提交数据收集起来,也就是上文我们提到的这里收集到的这个数据将会发给StreamingFileCommitter算子来处理。

而inactivePartitions里面的数据是什么时候添加进来的呢,也就是什么时候才会生成要提交的分区呢?我们跟踪一下代码,发现是给写入文件的buckets添加了一个监听器,在bucket成为非活跃状态之后,触发监听器,然后将对应的bucket id 添加到inactivePartitions集合。

代码语言:javascript复制
	@Override
	public void initializeState(StateInitializationContext context) throws Exception {
        ..........................
		buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {

			@Override
			public void bucketCreated(Bucket<RowData, String> bucket) {
			}

			@Override
			public void bucketInactive(Bucket<RowData, String> bucket) {
				inactivePartitions.add(bucket.getBucketId());
			}
		});
	}

而通知bucket变为非活动状态又是什么情况会触发呢?从代码注释我们看到,到目前为止该bucket已接收的所有记录都已提交后,则该bucket将变为非活动状态。

提交分区算子

这是一个单并行度的算子,用于提交写入文件系统的分区信息。具体的处理步骤如下:

  • 从上游收集要提交的分区信息
  • 判断某一个checkpoint下,所有的子任务是否都已经接收了分区的数据
  • 获取分区提交触发器。(目前支持partition-time和process-time)
  • 使用分区提交策略去依次提交分区信息(可以配置多个分区策略)

这里我们主要讲一下 StreamingFileCommitter#processElement方法是如何对进来的每个提交数据进行处理的。

代码语言:javascript复制
	@Override
	public void processElement(StreamRecord<CommitMessage> element) throws Exception {
		CommitMessage message = element.getValue();
		for (String partition : message.partitions) {
			trigger.addPartition(partition);
		}

		if (taskTracker == null) {
			taskTracker = new TaskTracker(message.numberOfTasks);
		}
		boolean needCommit = taskTracker.add(message.checkpointId, message.taskId);
		if (needCommit) {
			commitPartitions(message.checkpointId);
		}
	}

我们看到,从上游接收到CommitMessage元素,然后从里面得到要提交的分区,添加到PartitionCommitTrigger里(变量trigger),然后通过taskTracker来判断一下,该checkpoint每个子任务是否已经接收到了分区数据,最后通过commitPartitions方法来提交分区信息。

进入commitPartitions方法,看看是如何提交分区的。

代码语言:javascript复制
	private void commitPartitions(long checkpointId) throws Exception {
		List<String> partitions = checkpointId == Long.MAX_VALUE ?
				trigger.endInput() :
				trigger.committablePartitions(checkpointId);
		if (partitions.isEmpty()) {
			return;
		}

		try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {
			for (String partition : partitions) {
				LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(new Path(partition));
				LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier);
				Path path = new Path(locationPath, generatePartitionPath(partSpec));
				PartitionCommitPolicy.Context context = new PolicyContext(
						new ArrayList<>(partSpec.values()), path);
				for (PartitionCommitPolicy policy : policies) {
					if (policy instanceof MetastoreCommitPolicy) {
						((MetastoreCommitPolicy) policy).setMetastore(metaStore);
					}
					policy.commit(context);
				}
			}
		}
	}

从trigger中获取该checkpoint下的所有要提交的分区,放到一个List集合partitions中,在提交的分区不为空的情况下,循环遍历要配置的分区提交策略PartitionCommitPolicy,然后提交分区。

分区提交触发器

目前系统提供了两种分区提交的触发器,PartitionTimeCommitTigger和ProcTimeCommitTigger,分别用于处理什么时候提交分区。

  • ProcTimeCommitTigger 主要依赖于分区的创建时间和delay,当处理时间大于'partition creation time' 'delay'的时候,将提交这个分区
  • PartitionTimeCommitTigger 依赖于水印,当水印的值大于 partition-time delay的时候提交这个分区。

分区提交策略

目前系统提供了一个接口PartitionCommitPolicy,用于提交分区的信息,目前系统提供了以下几种方案,

  • 一种是METASTORE,主要是用于提交hive的分区,比如创建hive分区等等
  • 还有一种是SUCCESS_FILE,也就是往对应的分区目录下写一个success文件。
  • 此外,系统还提供了一个对外的自定义实现,用于用户自定义分区提交,比如提交分区之后合并小文件等等。自定义提交策略的时候,需要实现PartitionCommitPolicy接口,并将提交策略置为custom。

我在网上也看到过一些实现该接口用于合并小文件的示例,但是我个人觉得其实有点不太完美,因为这个合并小文件可能会涉及很多的问题:

  • 合并的时候如何保证事务,保证合并的同时如何有读操作不会发生脏读
  • 事务的一致性,如果合并出错了怎么回滚
  • 合并小文件的性能是否跟得上,目前flink只提供了一个单并行度的提交算子。
  • 如何多并发合并写入

所以暂时我也没有想到一个完美的方案用于flink来合并小文件。

总结

通过上述的描述,我们简单聊了一下flink是如何将流式数据写入hive的,但是可能每个人在做的过程中还是会遇到各种各种的环境问题导致的写入失败,比如window和linux系统的差异,hdfs版本的差异,系统时区的配置等等,在遇到一些个性化的问题之后,就可能需要大家去针对自己的问题去个性化的debug了。

0 人点赞