[Doris核心原理] Fe启动过程原理分析4 - 初始化QeService, FeServer, HttpServer

2022-07-24 09:33:30 浏览数 (1)

上一篇初略的讲解了[Doris核心原理] -- FE启动过程原理分析3 -- 初始化Catalog -- Fe中集元数据管理、回放日志管理、Doris插件管理、Fe角色管理、垃圾数据回收管理等 等 等 等功能于一体, 这一篇主要讲解在[Doris核心原理] -- FE启动过程原理分析3 -- 初始化Catalog 初始化后, Fe继续初始化 QeService,FeServer,HttpServer 三个核心服务.

初始化代码还是在[Doris核心原理] -- FE启动过程原理分析2 -- 启动类PaloFe.java中. 下面分别介绍三个服务初始化过程.

1. QeService, 一个SQL应用服务

QeService是一个SQL应用层服务, 启动后用户可以通过MySQL客户端像连接MySQL Server一样连接Doris, 并且执行SQL语句. Fe是通过构造函数初始化的, 代码如下:

代码语言:javascript复制
public QeService(int port, boolean nioEnabled, ConnectScheduler scheduler) {
  //加载帮助w文档

        try {
            HelpModule.getInstance().setUpModule();
        } catch (Exception e) {
            LOG.error("Help module failed, because:", e);
        }
        this.port = port;
        if (nioEnabled) {//NIO服务开启
            mysqlServer = new NMysqlServer(port, scheduler);
        } else {
            mysqlServer = new MysqlServer(port, scheduler);
        }
    }

我们先讲解传入参数:

port: SQL服务监听端口, 默认是9030, 可以通过Fe的配置项query_port更改.

nioEnabled: 是否用NIO实现的MySQL Server服务, 可以通过Fe的配置mysql_service_nio_enabled更改. 默认是true

scheduler: SQL请求计划处理器. 现在的逻辑是来一个SQL请求, 新开一个线程处理. 这个对象在执行环境初始化的时候初始化的.

接下来我们以NMysqlServer为例讲解初始化的过程. 我们先看看构造函数初始化的代码:

代码语言:javascript复制
public NMysqlServer(int port, ConnectScheduler connectScheduler) {
        this.port = port;
        this.xnioWorker = Xnio.getInstance().createWorkerBuilder()
                .setWorkerName("doris-mysql-nio")
                .setWorkerIoThreads(Config.mysql_service_io_threads_num)
                .setExternalExecutorService(taskService).build();
        // connectScheduler only used for idle check.
        this.acceptListener = new AcceptListener(connectScheduler);
    }

我们可以看到, NMysqlServer是NIO实现的服务. 这里将SQL调度器connectScheduler传入, 构造一个acceptListener实例, 在NMysqlServer初始化完成后, 会调用其start()方法启动.

AcceptListener的作用是监听SQL请求, 并处理. 其核心方法是handleEvent(), 部分核心如下代码:

代码语言:javascript复制
 public void handleEvent(AcceptingChannel<StreamConnection> channel) {
    try {
        StreamConnection connection = channel.accept();
        ...
        NConnectContext context = new NConnectContext(connection);
        context.setCatalog(Catalog.getCurrentCatalog());
        connectScheduler.submit(context);
        channel.getWorker().execute(() -> {
            try {
                ...
                ConnectProcessor processor = new ConnectProcessor(context);
                context.startAcceptQuery(processor);
            } catch (AfterConnectedException e) {
                context.cleanup();
            } catch (Exception e) {
                LOG.warn("connect processor exception because ", e);
                context.cleanup();
            } finally {
                ConnectContext.remove();
            }
        });
    } catch (IOException e) {
        LOG.warn("Connection accept failed.", e);
    }
}

当MySQL NIO Server收到一个SQL请求时, 会自动调用handleEvent(), 将请求封装成AcceptingChannel<StreamConnection>类型传递给我们处理.

通过 context.startAcceptQuery(processor)方法, 将注册一个ReadListener, 在这个监听器的handleEvent()方法中调用ConnectProcessor实例中的processOnce()方法, 通过调用dispatch()方法, 开始正式处理一个SQL请求, 代码如下所示:

代码语言:javascript复制
private void dispatch() throws IOException {
        int code = packetBuf.get();
        MysqlCommand command = MysqlCommand.fromCode(code);
        if (command == null) {
            ErrorReport.report(ErrorCode.ERR_UNKNOWN_COM_ERROR);
            ctx.getState().setError("Unknown command("   command   ")");
            LOG.warn("Unknown command("   command   ")");
            return;
        }
        ctx.setCommand(command);
        ctx.setStartTime();


        switch (command) {
            case COM_INIT_DB:
                handleInitDb();
                break;
            case COM_QUIT:
                handleQuit();
                break;
            case COM_QUERY:
                handleQuery();
                ctx.setStartTime();
                break;
            case COM_FIELD_LIST:
                handleFieldList();
                break;
            case COM_PING:
                handlePing();
                break;
            default:
                ctx.getState().setError("Unsupported command("   command   ")");
                LOG.warn("Unsupported command("   command   ")");
                break;
        }
    }

后面的逻辑则是对各种SQL语句的处理, 我们后面会一一详解.

2. FeServer, 一个RPC服务

FeServer其实是一个基于Thrift的RPC服务, 通过其构造函数我们可以很轻易的知道, 构造函数代码如下:

代码语言:javascript复制
public void start() throws IOException {

        TProcessor tprocessor = new FrontendService.Processor<FrontendService.Iface>(
                new FrontendServiceImpl(ExecuteEnv.getInstance()));
        server = new ThriftServer(port, tprocessor);
        server.start();
        LOG.info("thrift server started.");
    }

通过观察 TProcessor tprocessor = new FrontendService.Processor<FrontendService.Iface>( new FrontendServiceImpl(ExecuteEnv.getInstance())), 我们可以推测出启动了一个RPC服务处理器, 处理FrontendService.Iface接口中的方法, 这个接口的实现类我们可以通过IDEA轻松知道是FrontendServiceImpl.java.

通过之前的文章: Apache Doris概述和组件介绍, 我们可以得知Fe的RPC服务会被其他Fe和Be调用.

3. HttpServer, 一个Http服务

这个http服务是Doris自己实现的一个Http服务, 支持web页面请求和restful api两类http请求.

一般的, org.apache.doris.http.action包下的全部是web页面请求; org.apache.doris.http.rest包下的全部是restful api.

具体哪些是生效的需要查看HttpServer中registerActions()方法, 这个方法注册了全部的web页面请求和restful api. 当然也包含Prometheus Exporter接口.

本章节到此结束了, 希望大家阅读完后, 对三个服务的基本指责和运行过程有大致了解.

0 人点赞