一些转换(例如,join
,coGroup
,keyBy
,groupBy
)要求在一组元素上定义一个key。其他转换(Reduce
,GroupReduce
,Aggregate
,Windows
)允许在使用这些函数之前根据key
对数据进行分组。
一个DataSet
进行分组如下:
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
DataStream
也可以指定一个key
:
DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*define key here*/)
.window(/*window specification*/);
Flink的数据模型不是基于键值对。因此,没有必要将数据集类型打包成keys
和values
。keys
是”虚拟”:它们只是被定义在实际数据之上的函数,以指导分组算子使用。
备注:
代码语言:javascript复制在下面的讨论中,我们将使用DataStream API和keyBy。对于DataSet API,你只需要替换为DataSet和groupBy即可。
下面介绍几种Flink
定义keys
方法。
1. 为Tuples类型定义keys
最简单的情况就是在元组的一个或多个字段上对元组进行分组。下面是在元组的第一个字段(整数类型)上进行分组:
Java版本:
代码语言:javascript复制DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
Scala版本:
代码语言:javascript复制val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0)
下面,我们将在复合key上对元组进行分组,复合key包含元组的第一个和第二个字段:
Java版本:
代码语言:javascript复制DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
Scala版本:
代码语言:javascript复制val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)
如果你有一个包含嵌套元组的DataStream
,例如:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
如果指定keyBy(0)
,则使用整个Tuple2
作为key
(以Integer
和Float
为key
)。如果要使用嵌套中Tuple2
的某个字段,则必须使用下面介绍的字段表达式指定keys
。
2. 使用字段表达式定义keys
你可以使用基于字符串的字段表达式来引用嵌套字段以及定义keys
来进行分组,排序,连接或coGrouping
。字段表达式可以非常容易地选择(嵌套)复合类型(如Tuple
和POJO
类型)中的字段。
在下面的例子中,我们有一个WC POJO
,它有两个字段word
和count
。如果想通过word
字段分组,我们只需将word
传递给keyBy()
函数即可。
// some ordinary POJO (Plain old Java Object)
public class WC {
public String word;
public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
字段表达式语法:
(1) 按其字段名称选择POJO
字段。例如,user
是指向POJO
类型的user
字段。
(2) 通过字段名称或0到offset
的数值字段索引来选择元组字段(field name or 0-offset field index)。例如,f0
和5
分别指向Java
元组类型的第一和第六字段。
(3) 你可以在POJO
和元组中选择嵌套字段。例如,user.zip
是指POJO
类型user
字段中的zip
字段。支持POJO
和Tuples
的任意嵌套和组合,如f1.user.zip
或user.f3.1.zip
。
(4) 你可以使用*
通配符表达式选择所有类型。这也适用于不是元组或POJO
类型的类型。
Example:
代码语言:javascript复制public static class WC {
public ComplexNestedClass complex; //nested POJO
private int count;
// getter / setter for private field (count)
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
下面是上述示例代码的有效字段表达式:
代码语言:javascript复制count:WC类中的count字段。
complex:递归地选择复合字段POJO类型ComplexNestedClass的所有字段。
complex.word.f2:选择嵌套字段Tuple3的最后一个字段。
complex.hadoopCitizen:选择Hadoop IntWritable类型。
3. 使用key Selector 函数定义keys
定义key
的另一种方法是key选择器
函数。key选择器函数将单个元素作为输入,并返回元素的key。key
可以是任何类型的。
以下示例显示了一个key选择器函数,它只返回一个对象的字段:
Java版本:
代码语言:javascript复制public class WC {
public String word; public int count;
}
DataStream<WC> words = // [...]
KeyedStream<WC> kyed = words.keyBy(new KeySelector<WC, String>() {
public String getKey(WC wc) { return wc.word; }
});
Scala版本:
代码语言:javascript复制case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )
备注:
代码语言:javascript复制Flink版本:1.4