1、整体代码逻辑的流程:
创建基本环境 ——> 配置环境的基本配置 ——> 执行业务逻辑
需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。
所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。
env.execute();
2、基本环境的创建
代码语言:javascript复制// 1. 环境准备
// 1.1 设置操作 Hadoop 的用户名为 Hadoop 超级用户 atguigu
System.setProperty("HADOOP_USER_NAME", "atguigu");
// 1.2 获取流处理环境,并指定本地测试时启动 WebUI 所绑定的端口
Configuration conf = new Configuration();
conf.setInteger("rest.port", port);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
//需要执行SQL语句的时候,要创建SQL环境(非必要)
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
3、环境的配置
代码语言:javascript复制// 1.3 设置并行度
env.setParallelism(parallelism);
// 1.4 状态后端及检查点相关配置
// 1.4.1 设置状态后端
env.setStateBackend(new HashMapStateBackend());
// 1.4.2 开启 checkpoint
env.enableCheckpointing(5000);
// 1.4.3 设置 checkpoint 模式: 精准一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 1.4.4 checkpoint 存储
env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall2023/stream/" ckAndGroupId);
// 1.4.5 checkpoint 并发数
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 1.4.6 checkpoint 之间的最小间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
// 1.4.7 checkpoint 的超时时间
env.getCheckpointConfig().setCheckpointTimeout(10000);
// 1.4.8 job 取消时 checkpoint 保留策略
env.getCheckpointConfig().setExternalizedCheckpointCleanup(RETAIN_ON_CANCELLATION);
4、业务逻辑流程
在获取kafkasource的时候,会有这样的配置setValueOnlyDeserializer
里面默认重写方法deserialize
没有处理字符串为null的情况,因此需要重写该方法并将新的业务逻辑鞋子重写的该方法里,有
.setValueOnlyDeserializer(new DeserializationSchema<String>() {
@Override
public String deserialize(byte[] message) throws IOException {
if (message != null) {
return new String(message, StandardCharsets.UTF_8);
}
return null;
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public TypeInformation<String> getProducedType() {
return Types.STRING;
}
})
5、加分选项(redis分担压力)
使用了redis的作用:分担因数据量过大的而导致数据库( hbase )的读写性能变差的压力。
具体写法就是:
代码语言:javascript复制reduceBeanStream.map(new RichMapFunction<TradeSkuOrderBean, TradeSkuOrderBean>() {
Connection connection;
Jedis jedis;
@Override
public void open(Configuration parameters) throws Exception {
//第一次进入这个函数的时候将会执行ope的步骤再去执行map的逻辑步骤
//如果有什么在处理数据前要做的环境准备,可以在这一步骤准备
connection = HBaseUtil.getConnection();
jedis = RedisUtil.getJedis();
}
@Override
public void close() throws Exception {
//在离开函数的时候最后执行的函数 可以给open函数的准备的环境进行关闭
HBaseUtil.closeConnection(connection);
RedisUtil.closeJedis(jedis);
}
@Override
public TradeSkuOrderBean map(TradeSkuOrderBean bean) throws Exception {
//map的功能是把数据进行格式转化 flatmap是数据转换和数据清洗
// 1. 拼接对应的redisKey
String redisKey = RedisUtil.getRedisKey("dim_sku_info",bean.getSkuId());
// 2. 读取redis缓存的数据 -> 此处存储的redis的value值是 JSONObject 的字符串
String dim = jedis.get(redisKey);// 从redis里获取
JSONObject dimSkuInfo;
// 3. 判断redis读取到的数据是否为空
if ( dim == null || dim.length() == 0 ){ // 此时redis还没有此redisKey的数据
// redis没有对应缓存 需要读取HBase
System.out.println("没有缓存,读取hbase " redisKey);
dimSkuInfo = HBaseUtil.getCells(connection,Constant.HBASE_NAMESPACE,"dim_sku_info",bean.getSkuId());
// 读取到数据之后 存到redis
if ( dimSkuInfo.size() != 0 ){ // 24 * 60 * 60 是一天的秒值
jedis.setex( redisKey ,24 * 60 * 60 , dimSkuInfo.toJSONString() );
}
}else {
// redis有缓存 直接返回
System.out.println("有缓存直接返回redis数据 " redisKey);
dimSkuInfo = JSONObject.parseObject(dim);
}
if (dimSkuInfo.size() != 0){
// 进行维度关联
bean.setCategory3Id(dimSkuInfo.getString("category3_id"));
bean.setTrademarkId(dimSkuInfo.getString("tm_id"));
bean.setSpuId(dimSkuInfo.getString("spu_id"));
bean.setSkuName(dimSkuInfo.getString("sku_name"));
}else { // dimSkuInfo.size() == 0
System.out.println("没有对应的维度信息" bean);
}
return bean;
}
}).print();
6、异步IO
概念:
同步io是一个请求发出,只能占据线程等待返回结果,才能执行下一个请求。
异步io则是,一个请求发出后,其他请求也能在第一个请求等待返回结果的时间里发送出去。
一般来说,异步io提升的是请求的发送数量(因为一个请求等待返回结果的等待时间里也能有其他请求发送给服务器/数据库),但是不能提升服务器/数据库接受请求数量(这个是服务器/数据库的硬件问题),只能提升硬件资源才能解决
代码:
代码语言:javascript复制public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {
StatefulRedisConnection<String, String> redisAsyncConnection;
AsyncConnection hBaseAsyncConnection;
@Override
public void open(Configuration parameters) throws Exception {
redisAsyncConnection = RedisUtil.getRedisAsyncConnection();
hBaseAsyncConnection = HBaseUtil.getHBaseAsyncConnection();
}
@Override
public void close() throws Exception {
RedisUtil.closeRedisAsyncConnection(redisAsyncConnection);
HBaseUtil.closeAsyncConnection(hBaseAsyncConnection);
}
@Override
public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
// java的异步编程方式
String tableName = getTableName();
String rowKey = getId(input);
String redisKey = RedisUtil.getRedisKey(tableName, rowKey);
//Future是表示执行异步的一个接口
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
// 第一步异步访问得到的数据
RedisFuture<String> dimSkuInfoFuture = redisAsyncConnection.async().get(redisKey);
String dimInfo = null;
try {
dimInfo = dimSkuInfoFuture.get();
} catch (Exception e) {
e.printStackTrace();
}
return dimInfo;
}
}).thenApplyAsync(new Function<String, JSONObject>() { // thenApplyAsync是 再走一遍异步IO的访问
@Override
public JSONObject apply(String dimInfo) {
JSONObject dimJsonObj = null;
// 旁路缓存判断
if (dimInfo == null || dimInfo.length() == 0){
// 需要访问HBase
try {
dimJsonObj = HBaseUtil.getAsyncCells(hBaseAsyncConnection, Constant.HBASE_NAMESPACE, tableName, rowKey);
// 将读取的数据保存到redis
redisAsyncConnection.async().setex(redisKey,24 * 60 * 60 ,dimJsonObj.toJSONString());
} catch (IOException e) {
e.printStackTrace();
}
}else {
// redis中存在缓存数据
dimJsonObj = JSONObject.parseObject(dimInfo);
}
return dimJsonObj;
}
}).thenAccept(new Consumer<JSONObject>() {
@Override
public void accept(JSONObject dim) {
// 合并维度信息
if (dim == null){
// 无法关联到维度信息
System.out.println("无法关联当前的维度信息" tableName ":" rowKey);
}else {
join(input,dim);
}
// 返回结果
resultFuture.complete(Collections.singletonList(input));
}
});
}
}
代码语言:javascript复制public interface DimJoinFunction<T> {
public String getId( T input);
public String getTableName();
public void join(T input, JSONObject dim);
}
在给出的Java代码示例中,CompletableFuture
被用来异步地执行一系列操作,并且这些操作之间通过链式调用关联起来。以下是 supplyAsync()
, thenApplyAsync()
, 和 thenAccept()
各自承担的功能:
- supplyAsync():
- 它是
CompletableFuture
的一个静态方法,用于异步地执行一个Supplier
类型的任务(在这里是一个匿名内部类)。 - 在这个例子中,
supplyAsync()
异步地执行了一个从Redis中获取数据的操作。它返回一个CompletableFuture<String>
,这个Future
会在数据被成功检索(或发生异常)时完成。 - 这里的主要目标是开始一个异步的I/O操作(访问Redis),以便主线程可以继续执行其他任务,而不是被阻塞。
- 它是
- thenApplyAsync():
- 它是已经存在的
CompletableFuture
的一个方法,用于在原始Future
完成之后异步地执行一个Function
类型的任务。 - 在这个例子中,
thenApplyAsync()
接收来自supplyAsync()
方法的String
类型的结果,并检查这个结果是否为空或长度为零。如果是,它会尝试从HBase中获取数据,并将结果保存到Redis中。如果Redis中的数据不为空,它则直接将Redis中的数据解析为JSONObject
。 - 这个方法允许你基于原始异步操作的结果执行额外的异步逻辑。在这里,它执行了另一个可能的I/O操作(访问HBase)和另一个可能的写操作(保存到Redis)。
- 它是已经存在的
- thenAccept():
- 同样是
CompletableFuture
的一个方法,用于在原始Future
完成之后执行一个Consumer
类型的任务。 - 在这个例子中,
thenAccept()
接收来自thenApplyAsync()
方法的JSONObject
类型的结果,并检查这个结果是否为空。如果为空,它打印一条消息;否则,它调用join(input, dim)
方法来合并维度信息。最后,它使用resultFuture.complete(...)
来完成另一个Future
(可能是用于外部通信或通知的)。 - 与
thenApplyAsync()
不同,thenAccept()
的任务不返回任何结果,因此它适合用于那些不产生新结果但需要处理或消费结果的异步操作。
- 同样是
整个流程是一个典型的异步编程模式,其中多个I/O操作被链式地组织在一起,以便它们可以并行执行,并且主线程不会被阻塞。这种模式可以显著提高应用程序的吞吐量和响应性。