上一篇初略的讲解了[Doris核心原理] -- FE启动过程原理分析3 -- 初始化Catalog -- Fe中集元数据管理、回放日志管理、Doris插件管理、Fe角色管理、垃圾数据回收管理等 等 等 等功能于一体, 这一篇主要讲解在[Doris核心原理] -- FE启动过程原理分析3 -- 初始化Catalog 初始化后, Fe继续初始化 QeService,FeServer,HttpServer 三个核心服务.
初始化代码还是在[Doris核心原理] -- FE启动过程原理分析2 -- 启动类PaloFe.java中. 下面分别介绍三个服务初始化过程.
1. QeService, 一个SQL应用服务
QeService是一个SQL应用层服务, 启动后用户可以通过MySQL客户端像连接MySQL Server一样连接Doris, 并且执行SQL语句. Fe是通过构造函数初始化的, 代码如下:
代码语言:javascript复制public QeService(int port, boolean nioEnabled, ConnectScheduler scheduler) {
//加载帮助w文档
try {
HelpModule.getInstance().setUpModule();
} catch (Exception e) {
LOG.error("Help module failed, because:", e);
}
this.port = port;
if (nioEnabled) {//NIO服务开启
mysqlServer = new NMysqlServer(port, scheduler);
} else {
mysqlServer = new MysqlServer(port, scheduler);
}
}
我们先讲解传入参数:
port: SQL服务监听端口, 默认是9030, 可以通过Fe的配置项query_port更改.
nioEnabled: 是否用NIO实现的MySQL Server服务, 可以通过Fe的配置mysql_service_nio_enabled更改. 默认是true
scheduler: SQL请求计划处理器. 现在的逻辑是来一个SQL请求, 新开一个线程处理. 这个对象在执行环境初始化的时候初始化的.
接下来我们以NMysqlServer为例讲解初始化的过程. 我们先看看构造函数初始化的代码:
代码语言:javascript复制public NMysqlServer(int port, ConnectScheduler connectScheduler) {
this.port = port;
this.xnioWorker = Xnio.getInstance().createWorkerBuilder()
.setWorkerName("doris-mysql-nio")
.setWorkerIoThreads(Config.mysql_service_io_threads_num)
.setExternalExecutorService(taskService).build();
// connectScheduler only used for idle check.
this.acceptListener = new AcceptListener(connectScheduler);
}
我们可以看到, NMysqlServer是NIO实现的服务. 这里将SQL调度器connectScheduler传入, 构造一个acceptListener实例, 在NMysqlServer初始化完成后, 会调用其start()方法启动.
AcceptListener的作用是监听SQL请求, 并处理. 其核心方法是handleEvent(), 部分核心如下代码:
代码语言:javascript复制 public void handleEvent(AcceptingChannel<StreamConnection> channel) {
try {
StreamConnection connection = channel.accept();
...
NConnectContext context = new NConnectContext(connection);
context.setCatalog(Catalog.getCurrentCatalog());
connectScheduler.submit(context);
channel.getWorker().execute(() -> {
try {
...
ConnectProcessor processor = new ConnectProcessor(context);
context.startAcceptQuery(processor);
} catch (AfterConnectedException e) {
context.cleanup();
} catch (Exception e) {
LOG.warn("connect processor exception because ", e);
context.cleanup();
} finally {
ConnectContext.remove();
}
});
} catch (IOException e) {
LOG.warn("Connection accept failed.", e);
}
}
当MySQL NIO Server收到一个SQL请求时, 会自动调用handleEvent(), 将请求封装成AcceptingChannel<StreamConnection>类型传递给我们处理.
通过 context.startAcceptQuery(processor)方法, 将注册一个ReadListener, 在这个监听器的handleEvent()方法中调用ConnectProcessor实例中的processOnce()方法, 通过调用dispatch()方法, 开始正式处理一个SQL请求, 代码如下所示:
代码语言:javascript复制private void dispatch() throws IOException {
int code = packetBuf.get();
MysqlCommand command = MysqlCommand.fromCode(code);
if (command == null) {
ErrorReport.report(ErrorCode.ERR_UNKNOWN_COM_ERROR);
ctx.getState().setError("Unknown command(" command ")");
LOG.warn("Unknown command(" command ")");
return;
}
ctx.setCommand(command);
ctx.setStartTime();
switch (command) {
case COM_INIT_DB:
handleInitDb();
break;
case COM_QUIT:
handleQuit();
break;
case COM_QUERY:
handleQuery();
ctx.setStartTime();
break;
case COM_FIELD_LIST:
handleFieldList();
break;
case COM_PING:
handlePing();
break;
default:
ctx.getState().setError("Unsupported command(" command ")");
LOG.warn("Unsupported command(" command ")");
break;
}
}
后面的逻辑则是对各种SQL语句的处理, 我们后面会一一详解.
2. FeServer, 一个RPC服务
FeServer其实是一个基于Thrift的RPC服务, 通过其构造函数我们可以很轻易的知道, 构造函数代码如下:
代码语言:javascript复制public void start() throws IOException {
TProcessor tprocessor = new FrontendService.Processor<FrontendService.Iface>(
new FrontendServiceImpl(ExecuteEnv.getInstance()));
server = new ThriftServer(port, tprocessor);
server.start();
LOG.info("thrift server started.");
}
通过观察 TProcessor tprocessor = new FrontendService.Processor<FrontendService.Iface>( new FrontendServiceImpl(ExecuteEnv.getInstance())), 我们可以推测出启动了一个RPC服务处理器, 处理FrontendService.Iface接口中的方法, 这个接口的实现类我们可以通过IDEA轻松知道是FrontendServiceImpl.java.
通过之前的文章: Apache Doris概述和组件介绍, 我们可以得知Fe的RPC服务会被其他Fe和Be调用.
3. HttpServer, 一个Http服务
这个http服务是Doris自己实现的一个Http服务, 支持web页面请求和restful api两类http请求.
一般的, org.apache.doris.http.action包下的全部是web页面请求; org.apache.doris.http.rest包下的全部是restful api.
具体哪些是生效的需要查看HttpServer中registerActions()方法, 这个方法注册了全部的web页面请求和restful api. 当然也包含Prometheus Exporter接口.
本章节到此结束了, 希望大家阅读完后, 对三个服务的基本指责和运行过程有大致了解.