flink维表关联系列之kafka维表关联:广播方式

2022-04-18 11:54:55 浏览数 (1)

维表关联系列目录:

一、维表服务与Flink异步IO 二、Mysql维表关联:全量加载 三、Hbase维表关联:LRU策略 四、Redis维表关联:实时查询 五、kafka维表关联:广播方式 六、自定义异步查询

Flink中广播状态

假设存在这样一种场景,一个是用户行为数据,一个是规则数据,要求通过规则去匹配用户行为找到符合规则的用户,并且规则是可以实时变更的,在用户行为匹配中也能根据规则的实时变更作出相应的调整。这个时候就可以使用广播状态,将用户行为数据看做是一个流userActionStream,规则数据也看做是一个流ruleStream,将ruleStream流中数据下发到userActionStream流中,使得在userActionStream流中每一个Task都能获取到ruleStream流中所有数据,这种行为在Flink中称之为广播,ruleStream流称之为广播流,userActionStream称之为非广播流,流入到userActionStream流中的rule数据称之为广播数据,放入到Flink的状态中就称之为广播状态。

定义一条广播流:

代码语言:javascript复制
 val broadcastStateDesc=new MapStateDescriptor[String,String]("broadcast-state",BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO)

    val broadcastRuleStream=ruleStream.broadcast()
 

broadcastStateDesc定义了一个广播状态的描述,只能是 MapStateDescriptor类型,在后续的处理中可通过该描述获取到广播状态;广播流通过broadcast方式定义,其内部实现实际上是定义了该流数据分区方式为广播方式,由BroadcastPartitioner来对数据进行分区,在数据选择分区channel 会选择所有的channel, 也就是一条数据会发送到下游所有的Task中

广播流使用:

代码语言:javascript复制
val connectedStream=userActionStream.connect(broadcastRuleStream) 

通过connect方式连接一条广播流,那么广播流broadcastRuleStream就会被广播到userActionStream非广播流中,得到的是一个BroadcastConnectedStream的流,该流包含两个输入流broadcastRuleStream与userActionStream,之后可以通过:

代码语言:javascript复制
connectedStream.process(...) 

process中可为KeyedBroadcastProcessFunction或者BroadcastProcessFunction这两种类型的function, 取决于userActionStream的类型,如果为KeyedStream,则需要使用KeyedBroadcastProcessFunction,否则BroadcastProcessFunction。这两个function的区别在于BroadcastProcessFunction无法提供定时注册,因为定时注册只能在keyedStream中,在使用上都有两个方法:processElement处理非connected流数据并且只可读取广播状态,processBroadcastElement处理connectedStream流数据并且可读写广播状态。

在这里思考一个问题:在KeyedStream中状态都是与具体的key绑定的,在keyedStream中广播状态很显然是非key绑定的,否则就没法全局有效了,看下普通keyed状态存储类型:StateTable<K, N, SV>, SV表示具体的状态 ,可以是value/map/list任意类型,但是都与K有绑定关系,看下广播状态存储类型:HeapBroadcastState中Map<K, V>,是一个普通的map存储结构,其类型就是我们定义的broadcastStateDesc的类型,并没有具体的key绑定,所在在非broadcast流key切换对其并不产生影响,仍然可以读取全局的广播数据。

广播状态用于维表关联

如果需求上存在要求低延时感知维表数据的更新,而又担心实时查询对外部存储维表数据的影响,那么就可以使用广播方式将维表数据广播出去,既能满足实时性、又能满足不对外部存储产生影响,仍然以用户行为规则匹配为例,其实现步骤如下:

  1. 上层业务在规则数据变更的同时发送一条变更数据到kafka,或者直接通过binlog方式发送到kafka中
  2. 将规则数据流定义成为广播流,广播到用户行为数据流中
  3. 定义一个广播状态存储规则数据,在用户行为处理中查询广播数据进行规则匹配,符合要求则发送出去。

代码实现如下:

代码语言:javascript复制
val env=StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(60000)

    val kafkaConfig = new Properties();

    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");

    val ruleConsumer = new FlinkKafkaConsumer011[String]("topic1", new SimpleStringSchema(), kafkaConfig)



    val ruleStream=env.addSource(ruleConsumer)

      .map(x=>{

        val a=x.split(",")

        Rule(a(0),a(1).toBoolean)

      })

    val broadcastStateDesc=new MapStateDescriptor[String,Rule]("broadcast-state",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint[Rule] {}))

    val broadcastRuleStream=ruleStream.broadcast()



    val userActionConsumer = new FlinkKafkaConsumer011[String]("topic2", new SimpleStringSchema(), kafkaConfig)



    val userActionStream=env.addSource(userActionConsumer).map(x=>{

      val a=x.split(",")

      UserAction(a(0),a(1),a(2))

    }).keyBy(_.userId)

    val connectedStream=userActionStream.connect(broadcastRuleStream)



    connectedStream.process(new KeyedBroadcastProcessFunction[String,UserAction,Rule,String] {



      override def processElement(value: UserAction, ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#ReadOnlyContext, out: Collector[String]): Unit = {



        val state=ctx.getBroadcastState(broadcastStateDesc)

        if(state.contains(value.actionType))

          {

            out.collect(Tuple4.apply(value.userId,value.actionType,value.time,"true").toString())

          }

      }

      override def processBroadcastElement(value: Rule, ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#Context, out: Collector[String]): Unit = {



        ctx.getBroadcastState(broadcastStateDesc).put(value.actionType,value)

      }

    })



    env.execute()
 

以上就是简易版使用广播状态来实现维表关联的实现,由于将维表数据存储在广播状态中,但是广播状态是非key的,而rocksdb类型statebackend只能存储keyed状态类型,所以广播维表数据只能存储在内存中,因此在使用中需要注意维表的大小以免撑爆内存。

end

0 人点赞