源代码
代码语言:txt复制public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.getConfig().disableSysoutLogging();
StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);
DataStream<String> dataStream = environment.addSource(new SourceFunction<String>() {
private String str1 = "{"name":"name-value","age":"28","sex":"1"}";
private long count = 0L;
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning && count<2){
synchronized (ctx.getCheckpointLock()){
ctx.collect(str1);
count ;
}
}
}
@Override
public void cancel() {
isRunning = false;
}
});
DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
@Override
public JsonNode map(String s) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode node = objectMapper.readTree(s);
return node;
}
});
DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
@Override
public Row map(JsonNode jsonNode) throws Exception {
int pos = 0;
Row row = new Row(jsonNode.size());
Iterator<String> iterator = jsonNode.fieldNames();
while (iterator.hasNext()){
String key = iterator.next();
row.setField(pos,jsonNode.get(key).asText());
pos ;
}
return row;
}
});
dataStreamRow.addSink(new SinkFunction<Row>() {
@Override
public void invoke(Row value) throws Exception {
System.out.println(value.getField(0));
}
});
Table myTable = tableEnvironment.fromDataStream(dataStreamRow);
Table result = myTable.select("f0");
DataStream<String> dataStreamResult = tableEnvironment.toAppendStream(result,String.class);
dataStreamResult.print();
environment.execute();
}
运行时报错
提示org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo.
处理
代码语言:txt复制DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
...
}).returns(new RowTypeInfo(Types.STRING, Types.STRING, ...)); // Add as many fields as your Row has