eventMesh源码学习

2023-08-31 13:45:07 浏览数 (2)

eventMesh在runtime启动之前,需要启动roketmq的nameServer和broker,然后启动即可。启动完成后,再启动eventMesh中的example的生产者和消费者。

一、eventmesh-runtime启动的整体过程

eventMesh中的模块eventmesh-runtime的org.apache.eventmesh.runtime.boot.EventMeshStartup启动,基于http/tcp/grpc三种协议做了下面这些事情:

代码语言:javascript复制
EventMeshHTTPConfiguration初始化
EventMeshTCPConfiguration初始化
EventMeshGrpcConfiguration初始化
EventMeshServer初始化init
EventMeshServer 基于http/tcp/grpc启动start 重要
EventMeshServer添加钩子关闭函数

这个过程在运行中是非常重要的。而这些过程中,EventMeshServer初始化init和启动start做的事情比较多,以http协议进行说明:

代码语言:javascript复制
initThreadPool() 初始化线程池
初始化HttpRetryer、HTTPMetricsServer
初始化ConsumerManager、ProducerManager
registerHTTPRequestProcessor 注册处理器 重要

其中初始化线程池和注册处理器很重要

二、初始化相关线程池

代码语言:javascript复制
batchMsgExecutor 批量消息线程池
sendMsgExecutor  发送消息线程池
remoteMsgExecutor 远程消息线程池
pushMsgExecutor  推送消息线程池
clientManageExecutor 客户端管理线程池
adminExecutor admin线程池
replyMsgExecutor 响应消息线程池

三、http协议下注册相关处理器

发送消息相关处理器:

代码语言:javascript复制
BatchSendMessageProcessor => requestCode:MSG_BATCH_SEND => 线程池 batchMsgExecutor
BatchSendMessageV2Processor=> requestCode:MSG_BATCH_SEND_V2 =>线程池 batchMsgExecutor
SendSyncMessageProcessor =>requestCode:MSG_SEND_SYNC => 线程池 sendMsgExecutor
SendAsyncMessageProcessor => requestCode:MSG_SEND_ASYNC => 线程池 sendMsgExecutor
SendAsyncEventProcessor => 线程池 sendMsgExecutor
SendAsyncRemoteEventProcessor => 线程池 remoteMsgExecutor

指标心跳相关处理器:

代码语言:javascript复制
AdminMetricsProcessor => requestCode:ADMIN_METRICS => 线程池 adminExecutor
HeartBeatProcessor =>  requestCode:HEARTBEAT => 线程池 clientManageExecutor  

消费订阅相关处理器:

代码语言:javascript复制
SubscribeProcessor=> requestCode:SUBSCRIBE => 线程池 clientManageExecutor
LocalSubscribeEventProcessor => 线程池 clientManageExecutor
RemoteSubscribeEventProcessor=> 线程池 clientManageExecutor
UnSubscribeProcessor=> requestCode:UNSUBSCRIBE => 线程池 clientManageExecutor
LocalUnSubscribeEventProcessor=> 线程池 clientManageExecutor
RemoteUnSubscribeEventProcessor=> 线程池 clientManageExecutor
ReplyMessageProcessor=> 线程池 clientManageExecutor
ReplyMessageProcessor=> requestCode:REPLY_MESSAGE =>线程池 clientManageExecutor

四、启动

代码语言:javascript复制
 // 执行httpServer启动
    @Override
    public void start() throws Exception {
        super.start();
        metrics.start();
        consumerManager.start();
        producerManager.start();
        httpRetryer.start();
        if (eventMeshHttpConfiguration.eventMeshServerRegistryEnable) {
            this.register();
        }
        logger.info("--------------------------EventMeshHTTPServer started");
    }

这里我们可以看到super.start()进行启动,而启动之前,会将对应的业务处理器进行放入。 此时会启动netty服务,这个时候会添加childHandler(new HttpsServerInitializer(sslContext)),也即HttpsServerInitializer。pipeline添加 httpHandler 重要 这里的httpHandler,就可以理解为业务处理器。这里会根据是否使用URI和是否使用链路追踪。分为processHttpRequest(ctx, asyncContext)和 processEventMeshRequest(ctx, asyncContext)。从代码上可以看到processHttpRequest(ctx, asyncContext)为空实现。所以处理eventmesh请求 重要 这里是eventMesh的请求,业务处理的核心部分,此时会走到对应的处理器。这里可以看到org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor这个处理器接口目前主要处理的一些处理器实现主要有:

代码语言:javascript复制
发送消息处理器 有同步/异步/批量
心跳处理器
订阅处理器
取消订阅处理器
admin指标监控处理器
admin关闭处理器

这里会处理具体的业务逻辑发送和订阅以及心跳。因此此时启动eventmesh-runtime之后,还需要启动生产者和消费者。

五、生产者和消费者启动

启动之后,处理器会根据当前生产者消息处理到发送消息处理器,执行具体的业务处理eventMeshProducer.send(sendMessageContext, new SendCallback()),最终调用org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl,也是调用发送消息核心实现。

可以看到eventmesh生产消息过程:

代码语言:javascript复制
创建EventMeshHttpClientConfig配置对象信息
创建EventMeshHttpProducer生产者对象信息
构建EventMeshMessage对象信息
执行消息发布EventMeshHttpProducer
进行消息发布,以http请求为例,此时会有对应的code=>SG_SEND_ASYNC: key=>code
可以看到在请求生产者和消费者之前,会先启动eventMesh的runtime模块,此时会注册SendAsyncMessageProcessor发送异步消息处理器,可以看到对应的code信息 =>RequestCode.MSG_SEND_ASYNC

这个时候发送消息后,会进行消息的放入putMessage,此时会将消息存入到rocketmq的broker中。

然后就是我们的消费消息和订阅消息消费消息的过程: 处理请求消费消息,可以看到SubscribeProcessor订阅进行消费消息或者调用roketmq的消费消息接口直接进行消息消费。可以看到eventMeshHttpConsumer.subscribe(topicList, url)这里会执行订阅操作,进行消息消费。

具体可以跟踪eventmesh-example:

当然除了这里看到的http外,还有基于grpc、tcp协议的。以及webhook的相关操作。同时还有基于spi扩展的相关插件。

0 人点赞