ES作为强大的和流行的搜索引擎服务组件,为我们提供了方便的和高性能的搜索服务。在实际应用中也是用得比较爽,但如果能够理解更深入一点,那就更好了。虽然网上有许多的文章已经完整说明,ES是如何如何做到高性能,如何做到高可用的,以及有许多的避坑指南。但那些,毕竟还是太描述化。
就让我们以源码作为出发点,一探ES究竟吧,虽然也可能是片面的。
1:ES编译及准备
害,其实我们不想搞编译。一个是意义不大;二个是ES是用java编写的,打包后本质上它就是一个war包或者jar包;三个是编译需要拉取外部的许多jar包依赖,而这些依赖又是在国外网站速度又是超级慢。
简单的,直接在es官网下载个安装包就可以了。https://www.elastic.co/cn/downloads/elasticsearch 。这是实际应用的通常路径,但不是我们学习的路径。
如果要自己编译,也可以,直接下载源码包: https://github.com/elastic/elasticsearch.git ; 下载gradle: https://gradle.org/releases/ ; 安装jdk11 ...
安装方法, 直接切到elasticsearch 源码目录运行: gradlew idea ; 即可。这比起 ./configure; make; make install; 是简单不少的。java自行解决几乎所有的依赖,而非自行解决!
当然,中途你肯定会遇到许多问题,一般主要就是网络问题。主要就是里面依赖了许多国外网站的资源,可以进行修改:
代码语言:javascript复制# 搜索所有 repositories { , 将其网址替换为 aliyun 的地址,如下:
repositories {
// jcenter()
maven { url 'https://maven.aliyun.com/repository/gradle-plugin' }
maven { url 'https://maven.aliyun.com/repository/google' }
maven { url 'https://maven.aliyun.com/nexus/content/groups/public/' }
maven { url 'https://maven.aliyun.com/repository/jcenter'}
}
重新运行 gradle idea, 即可。如果再失败,就换个快的网络,重试,直到成功。
我们也可以直接将源码导致idea中,直接用ide进行编译即可。同样,需要替换相应的依赖地址。导入完成后,就可以看到整个gradle的目录了。如下:
2. ES server的启动流程
当环境准备好了,我们就可以顺利进入正题了。理论上,一个应用的启动流程都不会很复杂,我们就大致瞅瞅吧。
2.1. Elasticsearch入口类的作用
因为入口类为 Elasticsearch, 所以通过入口类,就可以知道它是如何开始,它是否承担了所有的工作,以及将下文都交给了谁。必然需要直接定位到入口:Elasticsearch#main
代码语言:javascript复制 // org.elasticsearch.bootstrap.Elasticsearch#main
/**
* Main entry point for starting elasticsearch
*/
public static void main(final String[] args) throws Exception {
overrideDnsCachePolicyProperties();
/*
* We want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
* presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy). This
* forces such policies to take effect immediately.
*/
System.setSecurityManager(new SecurityManager() {
@Override
public void checkPermission(Permission perm) {
// grant all permissions so that we can later set the security manager to the one that we want
}
});
LogConfigurator.registerErrorListener();
final Elasticsearch elasticsearch = new Elasticsearch();
// 看起来是转移到另一个 main() 方法了
int status = main(args, elasticsearch, Terminal.DEFAULT);
// 如果执行未返回OK, 则说明发生了异常,直接结束JVM。否则 es 进程将被继续后台执行
if (status != ExitCodes.OK) {
final String basePath = System.getProperty("es.logs.base_path");
// It's possible to fail before logging has been configured, in which case there's no point
// suggesting that the user look in the log file.
if (basePath != null) {
Terminal.DEFAULT.errorPrintln(
"ERROR: Elasticsearch did not exit normally - check the logs at "
basePath
System.getProperty("file.separator")
System.getProperty("es.logs.cluster_name") ".log"
);
}
exit(status);
}
}
static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
return elasticsearch.main(args, terminal);
}
入口比较简单,但好像啥也看不出来。但大致就是实例化一个 Elasticsearch, 然后调用其main() 方法。这样做有什么好处呢?这样就可以用 Elasticsearch 中定义的变量了,而不只是调用其静态方法和变量了。
我们先来看一下 Elasticsearch 的类继承图:Elasticsearch extends EnvironmentAwareCommand extends Command implements Closeable
Elasticsearch 的类构造里面也做了一些事情,实际上是增加几个变量的解析规则:
代码语言:javascript复制 // visible for testing
Elasticsearch() {
// beforeMain 为空方法
super("Starts Elasticsearch", () -> {}); // we configure logging later so we override the base class from configuring logging
versionOption = parser.acceptsAll(Arrays.asList("V", "version"),
"Prints Elasticsearch version information and exits");
daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),
"Starts Elasticsearch in the background")
.availableUnless(versionOption);
pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"),
"Creates a pid file in the specified path on start")
.availableUnless(versionOption)
.withRequiredArg()
.withValuesConvertedBy(new PathConverter());
quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"),
"Turns off standard output/error streams logging in console")
.availableUnless(versionOption)
.availableUnless(daemonizeOption);
}
而 elasticsearch.main() 则是调用的Command中定义的通用方法,主要目的在于使用一般的命令执行模板方法。整个 Elasticsearch 类可以看作是启动的门面类,它会很多的准备和验证工作。比如创建配置上下文,验证命令参数等等。所以通过它的执行,我们理解到,大体上需要关注什么参数,以及可能用户会遇到的报错情况。
代码语言:javascript复制 // org.elasticsearch.cli.Command#main
/** Parses options for this command from args and executes it. */
public final int main(String[] args, Terminal terminal) throws Exception {
if (addShutdownHook()) {
// 添加关闭钩子,做一些资源的关闭,避免数据损坏或丢失,但实际上此处为空执行
shutdownHookThread = new Thread(() -> {
try {
this.close();
} catch (final IOException e) {
try (
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw)) {
e.printStackTrace(pw);
terminal.errorPrintln(sw.toString());
} catch (final IOException impossible) {
// StringWriter#close declares a checked IOException from the Closeable interface but the Javadocs for StringWriter
// say that an exception here is impossible
throw new AssertionError(impossible);
}
}
});
Runtime.getRuntime().addShutdownHook(shutdownHookThread);
}
// 此处将被执行空转
beforeMain.run();
try {
// 同样是 Command 的私有实现
mainWithoutErrorHandling(args, terminal);
} catch (OptionException e) {
// print help to stderr on exceptions
printHelp(terminal, true);
terminal.errorPrintln(Terminal.Verbosity.SILENT, "ERROR: " e.getMessage());
return ExitCodes.USAGE;
} catch (UserException e) {
if (e.exitCode == ExitCodes.USAGE) {
printHelp(terminal, true);
}
if (e.getMessage() != null) {
terminal.errorPrintln(Terminal.Verbosity.SILENT, "ERROR: " e.getMessage());
}
// 异常返回
return e.exitCode;
}
// 正常情况下都返回 OK
return ExitCodes.OK;
}
/**
* Executes the command, but all errors are thrown.
*/
void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
// 命令行参数解析
final OptionSet options = parser.parse(args);
// -h 打印帮助文档
if (options.has(helpOption)) {
printHelp(terminal, false);
return;
}
if (options.has(silentOption)) {
terminal.setVerbosity(Terminal.Verbosity.SILENT);
} else if (options.has(verboseOption)) {
terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
} else {
terminal.setVerbosity(Terminal.Verbosity.NORMAL);
}
// 回调回 Elasticsearch, 先到 EnvironmentAwareCommand
execute(terminal, options);
}
// org.elasticsearch.cli.EnvironmentAwareCommand#execute(org.elasticsearch.cli.Terminal, joptsimple.OptionSet)
@Override
protected void execute(Terminal terminal, OptionSet options) throws Exception {
final Map<String, String> settings = new HashMap<>();
for (final KeyValuePair kvp : settingOption.values(options)) {
// 不能存在空值参数
if (kvp.value.isEmpty()) {
throw new UserException(ExitCodes.USAGE, "setting [" kvp.key "] must not be empty");
}
// 不能存在重复的参数
if (settings.containsKey(kvp.key)) {
final String message = String.format(
Locale.ROOT,
"setting [%s] already set, saw [%s] and [%s]",
kvp.key,
settings.get(kvp.key),
kvp.value);
throw new UserException(ExitCodes.USAGE, message);
}
settings.put(kvp.key, kvp.value);
}
putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
// 回调 Elasticsearch 的 execute 实现
execute(terminal, options, createEnv(settings));
}
// org.elasticsearch.bootstrap.Elasticsearch#execute
@Override
protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
if (options.nonOptionArguments().isEmpty() == false) {
throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " options.nonOptionArguments());
}
// 版本打印
if (options.has(versionOption)) {
final String versionOutput = String.format(
Locale.ROOT,
"Version: %s, Build: %s/%s/%s/%s, JVM: %s",
Build.CURRENT.getQualifiedVersion(),
Build.CURRENT.flavor().displayName(),
Build.CURRENT.type().displayName(),
Build.CURRENT.hash(),
Build.CURRENT.date(),
JvmInfo.jvmInfo().version()
);
terminal.println(versionOutput);
return;
}
final boolean daemonize = options.has(daemonizeOption);
final Path pidFile = pidfileOption.value(options);
final boolean quiet = options.has(quietOption);
// a misconfigured java.io.tmpdir can cause hard-to-diagnose problems later, so reject it immediately
try {
env.validateTmpFile();
} catch (IOException e) {
throw new UserException(ExitCodes.CONFIG, e.getMessage());
}
try {
// 初始化
init(daemonize, pidFile, quiet, env);
} catch (NodeValidationException e) {
throw new UserException(ExitCodes.CONFIG, e.getMessage());
}
}
// Elasticsearch.init
void init(final boolean daemonize, final Path pidFile, final boolean quiet, Environment initialEnv)
throws NodeValidationException, UserException {
try {
Bootstrap.init(!daemonize, pidFile, quiet, initialEnv);
} catch (BootstrapException | RuntimeException e) {
// format exceptions to the console in a special way
// to avoid 2MB stacktraces from guice, etc.
throw new StartupException(e);
}
}
以上,就是 Elasticsearch 类的使命了。两个重点:1. 创建配置环境上下文;2. 验证传入命令参数的合法性;3. 提交启动命令给到 Bootstrap 类。
2.2. Bootstrap启动流程解析
从表面意思来看,Bootstrap 更像是启动工作的实力担当。接过 Elasticsearch 类的配置上下文信息,Bootstrap 又如何展开进一步的工作呢?我们一起来看下。
代码语言:javascript复制 // 它是以静态方法 init() 作为切入点
// org.elasticsearch.bootstrap.Bootstrap#init
/**
* This method is invoked by {@link Elasticsearch#main(String[])} to startup elasticsearch.
*/
static void init(
final boolean foreground,
final Path pidFile,
final boolean quiet,
final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
// force the class initializer for BootstrapInfo to run before
// the security manager is installed
BootstrapInfo.init();
INSTANCE = new Bootstrap();
final SecureSettings keystore = loadSecureSettings(initialEnv);
final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());
// the LogConfigurator will replace System.out and System.err with redirects to our logfile, so we need to capture
// the stream objects before calling LogConfigurator to be able to close them when appropriate
final Runnable sysOutCloser = getSysOutCloser();
final Runnable sysErrorCloser = getSysErrorCloser();
LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
try {
LogConfigurator.configure(environment);
} catch (IOException e) {
throw new BootstrapException(e);
}
if (environment.pidFile() != null) {
try {
PidFile.create(environment.pidFile(), true);
} catch (IOException e) {
throw new BootstrapException(e);
}
}
try {
final boolean closeStandardStreams = (foreground == false) || quiet;
if (closeStandardStreams) {
final Logger rootLogger = LogManager.getRootLogger();
final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
if (maybeConsoleAppender != null) {
Loggers.removeAppender(rootLogger, maybeConsoleAppender);
}
sysOutCloser.run();
}
// fail if somebody replaced the lucene jars
checkLucene();
// install the default uncaught exception handler; must be done before security is
// initialized as we do not want to grant the runtime permission
// setDefaultUncaughtExceptionHandler
Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());
// 检查环境
INSTANCE.setup(true, environment);
try {
// any secure settings must be read during node construction
IOUtils.close(keystore);
} catch (IOException e) {
throw new BootstrapException(e);
}
// 启动服务
INSTANCE.start();
// We don't close stderr if `--quiet` is passed, because that
// hides fatal startup errors. For example, if Elasticsearch is
// running via systemd, the init script only specifies
// `--quiet`, not `-d`, so we want users to be able to see
// startup errors via journalctl.
if (foreground == false) {
sysErrorCloser.run();
}
} catch (NodeValidationException | RuntimeException e) {
// disable console logging, so user does not see the exception twice (jvm will show it already)
final Logger rootLogger = LogManager.getRootLogger();
final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
if (foreground && maybeConsoleAppender != null) {
Loggers.removeAppender(rootLogger, maybeConsoleAppender);
}
Logger logger = LogManager.getLogger(Bootstrap.class);
// HACK, it sucks to do this, but we will run users out of disk space otherwise
if (e instanceof CreationException) {
// guice: log the shortened exc to the log file
ByteArrayOutputStream os = new ByteArrayOutputStream();
PrintStream ps = null;
try {
ps = new PrintStream(os, false, "UTF-8");
} catch (UnsupportedEncodingException uee) {
assert false;
e.addSuppressed(uee);
}
new StartupException(e).printStackTrace(ps);
ps.flush();
try {
logger.error("Guice Exception: {}", os.toString("UTF-8"));
} catch (UnsupportedEncodingException uee) {
assert false;
e.addSuppressed(uee);
}
} else if (e instanceof NodeValidationException) {
logger.error("node validation exceptionn{}", e.getMessage());
} else {
// full exception
logger.error("Exception", e);
}
// re-enable it if appropriate, so they can see any logging during the shutdown process
if (foreground && maybeConsoleAppender != null) {
Loggers.addAppender(rootLogger, maybeConsoleAppender);
}
throw e;
}
}
以上就是BootStrap的启动框架了。大体分为几步:
1. 实例化BootStrap类到INSTANCE中; 2. 读取密码等安全信息; 3. 重新创建自己的环境上下文,主要是为加入更多配置如密码信息; 4. 加载日志实例; 5. 创建pid; 6. 检查lucene版本信息避免jar包被替换导致的异常; 7. Bootstrap进行准备工作; 8. Bootstrap进行启动工作; 9. 启动完成;
可见,整个框架还是很清晰的,但是又有一种意犹未尽的感觉。那是自然,因为框架只会有大概思路,并不会给你打通任督发二脉。除去一些检查性的工作,其中的核心是的准备工作和启动工作。下面细细分解下。
2.3. Bootstrap准备工作详解
上节说的两个重点之一:Bootstrap准备,都需要准备啥呢?
代码语言:javascript复制 // org.elasticsearch.bootstrap.Bootstrap#setup
private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
Settings settings = environment.settings();
try {
// 加载外部模块,独立进程contoller
spawner.spawnNativeControllers(environment, true);
} catch (IOException e) {
throw new BootstrapException(e);
}
// 初始化本地一些资源信息
initializeNatives(
environment.tmpFile(),
BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
BootstrapSettings.CTRLHANDLER_SETTING.get(settings));
// initialize probes before the security manager is installed
// 初始化各需要的探针, 保证实例加载可用
/**
static void initializeProbes() {
// Force probes to be loaded
ProcessProbe.getInstance();
OsProbe.getInstance();
JvmInfo.jvmInfo();
}
*/
initializeProbes();
// 关闭钩子,保证node和外部运行的contoller得到正常关闭
if (addShutdownHook) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
IOUtils.close(node, spawner);
LoggerContext context = (LoggerContext) LogManager.getContext(false);
Configurator.shutdown(context);
if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) {
throw new IllegalStateException("Node didn't stop within 10 seconds. "
"Any outstanding requests or tasks might get killed.");
}
} catch (IOException ex) {
throw new ElasticsearchException("failed to stop node", ex);
} catch (InterruptedException e) {
LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
Thread.currentThread().interrupt();
}
}
});
}
try {
// look for jar hell
// 检查重复类
final Logger logger = LogManager.getLogger(JarHell.class);
JarHell.checkJarHell(logger::debug);
} catch (IOException | URISyntaxException e) {
throw new BootstrapException(e);
}
// Log ifconfig output before SecurityManager is installed
// 打印 ifconfig 输出信息
IfConfig.logIfNecessary();
// install SM after natives, shutdown hooks, etc.
try {
// 配置 SecurityManager
Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
} catch (IOException | NoSuchAlgorithmException e) {
throw new BootstrapException(e);
}
// 终于实例化节点了
node = new Node(environment) {
@Override
protected void validateNodeBeforeAcceptingRequests(
final BootstrapContext context,
final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
// 将检查节点功能委托给 Bootstrap 处理
BootstrapChecks.check(context, boundTransportAddress, checks);
}
};
}
// org.elasticsearch.bootstrap.Spawner#spawnNativeControllers
/**
* Spawns the native controllers for each module.
*
* @param environment The node environment
* @param inheritIo Should the stdout and stderr of the spawned process inherit the
* stdout and stderr of the JVM spawning it?
* @throws IOException if an I/O error occurs reading the module or spawning a native process
*/
void spawnNativeControllers(final Environment environment, final boolean inheritIo) throws IOException {
if (spawned.compareAndSet(false, true) == false) {
throw new IllegalStateException("native controllers already spawned");
}
if (Files.exists(environment.modulesFile()) == false) {
throw new IllegalStateException("modules directory [" environment.modulesFile() "] not found");
}
/*
* For each module, attempt to spawn the controller daemon. Silently ignore any module that doesn't include a controller for the
* correct platform.
*/
// plugin 目录列举
List<Path> paths = PluginsService.findPluginDirs(environment.modulesFile());
for (final Path modules : paths) {
// 读取 plugin-descriptor.properties 信息
final PluginInfo info = PluginInfo.readFromProperties(modules);
final Path spawnPath = Platforms.nativeControllerPath(modules);
if (Files.isRegularFile(spawnPath) == false) {
continue;
}
if (info.hasNativeController() == false) {
final String message = String.format(
Locale.ROOT,
"module [%s] does not have permission to fork native controller",
modules.getFileName());
throw new IllegalArgumentException(message);
}
// 启动插件的 controller 进程
final Process process = spawnNativeController(spawnPath, environment.tmpFile(), inheritIo);
processes.add(process);
}
}
// org.elasticsearch.bootstrap.Spawner#spawnNativeController
/**
* Attempt to spawn the controller daemon for a given module. The spawned process will remain connected to this JVM via its stdin,
* stdout, and stderr streams, but the references to these streams are not available to code outside this package.
*/
private Process spawnNativeController(final Path spawnPath, final Path tmpPath, final boolean inheritIo) throws IOException {
final String command;
if (Constants.WINDOWS) {
/*
* We have to get the short path name or starting the process could fail due to max path limitations. The underlying issue here
* is that starting the process on Windows ultimately involves the use of CreateProcessW. CreateProcessW has a limitation that
* if its first argument (the application name) is null, then its second argument (the command line for the process to start) is
* restricted in length to 260 characters (cf. https://msdn.microsoft.com/en-us/library/windows/desktop/ms682425.aspx). Since
* this is exactly how the JDK starts the process on Windows (cf.
* http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/windows/native/java/lang/ProcessImpl_md.c#l319), this
* limitation is in force. As such, we use the short name to avoid any such problems.
*/
command = Natives.getShortPathName(spawnPath.toString());
} else {
command = spawnPath.toString();
}
final ProcessBuilder pb = new ProcessBuilder(command);
// the only environment variable passes on the path to the temporary directory
pb.environment().clear();
pb.environment().put("TMPDIR", tmpPath.toString());
// The process _shouldn't_ write any output via its stdout or stderr, but if it does then
// it will block if nothing is reading that output. To avoid this we can inherit the
// JVM's stdout and stderr (which are redirected to files in standard installations).
if (inheritIo) {
pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
pb.redirectError(ProcessBuilder.Redirect.INHERIT);
}
// the output stream of the process object corresponds to the daemon's stdin
return pb.start();
}
读取配置文件细节,感兴趣的同学可以深入查看下,主要是具体解析哪些变量的问题。
代码语言:javascript复制 // 1. 解析 plugin-descriptor.properties 文件
// org.elasticsearch.plugins.PluginInfo#readFromProperties
/**
* Reads the plugin descriptor file.
*
* @param path the path to the root directory for the plugin
* @return the plugin info
* @throws IOException if an I/O exception occurred reading the plugin descriptor
*/
public static PluginInfo readFromProperties(final Path path) throws IOException {
final Path descriptor = path.resolve(ES_PLUGIN_PROPERTIES);
final Map<String, String> propsMap;
{
final Properties props = new Properties();
try (InputStream stream = Files.newInputStream(descriptor)) {
props.load(stream);
}
propsMap = props.stringPropertyNames().stream().collect(Collectors.toMap(Function.identity(), props::getProperty));
}
final String name = propsMap.remove("name");
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException(
"property [name] is missing in [" descriptor "]");
}
final String description = propsMap.remove("description");
if (description == null) {
throw new IllegalArgumentException(
"property [description] is missing for plugin [" name "]");
}
final String version = propsMap.remove("version");
if (version == null) {
throw new IllegalArgumentException(
"property [version] is missing for plugin [" name "]");
}
final String esVersionString = propsMap.remove("elasticsearch.version");
if (esVersionString == null) {
throw new IllegalArgumentException(
"property [elasticsearch.version] is missing for plugin [" name "]");
}
final Version esVersion = Version.fromString(esVersionString);
final String javaVersionString = propsMap.remove("java.version");
if (javaVersionString == null) {
throw new IllegalArgumentException(
"property [java.version] is missing for plugin [" name "]");
}
JarHell.checkVersionFormat(javaVersionString);
final String extendedString = propsMap.remove("extended.plugins");
final List<String> extendedPlugins;
if (extendedString == null) {
extendedPlugins = Collections.emptyList();
} else {
extendedPlugins = Arrays.asList(Strings.delimitedListToStringArray(extendedString, ","));
}
final boolean hasNativeController = parseBooleanValue(name, "has.native.controller", propsMap.remove("has.native.controller"));
final PluginType type = getPluginType(name, propsMap.remove("type"));
final String classname = getClassname(name, type, propsMap.remove("classname"));
final String javaOpts = propsMap.remove("java.opts");
if (type != PluginType.BOOTSTRAP && Strings.isNullOrEmpty(javaOpts) == false) {
throw new IllegalArgumentException(
"[java.opts] can only have a value when [type] is set to [bootstrap] for plugin [" name "]"
);
}
boolean isLicensed = parseBooleanValue(name, "licensed", propsMap.remove("licensed"));
if (propsMap.isEmpty() == false) {
throw new IllegalArgumentException("Unknown properties for plugin [" name "] in plugin descriptor: " propsMap.keySet());
}
return new PluginInfo(name, description, version, esVersion, javaVersionString,
classname, extendedPlugins, hasNativeController, type, javaOpts, isLicensed);
}
// 2. 读取controller路径信息
/**
* The path to the native controller for a plugin with native components.
*/
public static Path nativeControllerPath(Path plugin) {
if (Constants.MAC_OS_X) {
return plugin
.resolve("platform")
.resolve(PLATFORM_NAME)
.resolve(PROGRAM_NAME ".app")
.resolve("Contents")
.resolve("MacOS")
.resolve(PROGRAM_NAME);
}
// 根据系统平台加载不同文件,如windows为 controller.exe, 其他为 contoller
return plugin
.resolve("platform")
.resolve(PLATFORM_NAME)
.resolve("bin")
.resolve(PROGRAM_NAME);
}
接下来是本地资源的初始化过程,如禁止root运行,检测系统支持功能接口情况等等:
代码语言:javascript复制 // org.elasticsearch.bootstrap.Bootstrap#initializeNatives
/** initialize native resources */
public static void initializeNatives(Path tmpFile, boolean mlockAll, boolean systemCallFilter, boolean ctrlHandler) {
final Logger logger = LogManager.getLogger(Bootstrap.class);
// check if the user is running as root, and bail
// 检查是否是用root运行,windows忽略,linux上通过native方法 JNACLibrary.geteuid() == 0 来判定
if (Natives.definitelyRunningAsRoot()) {
throw new RuntimeException("can not run elasticsearch as root");
}
// enable system call filter
// 如启动 linux 的 linuxImpl(),
if (systemCallFilter) {
Natives.tryInstallSystemCallFilter(tmpFile);
}
// mlockall if requested
if (mlockAll) {
if (Constants.WINDOWS) {
Natives.tryVirtualLock();
} else {
Natives.tryMlockall();
}
}
// listener for windows close event
if (ctrlHandler) {
Natives.addConsoleCtrlHandler(new ConsoleCtrlHandler() {
@Override
public boolean handle(int code) {
if (CTRL_CLOSE_EVENT == code) {
logger.info("running graceful exit on windows");
try {
Bootstrap.stop();
} catch (IOException e) {
throw new ElasticsearchException("failed to stop node", e);
}
return true;
}
return false;
}
});
}
// force remainder of JNA to be loaded (if available).
try {
/**
*
private static final class Holder {
private static final JNAKernel32Library instance = new JNAKernel32Library();
}
*/
JNAKernel32Library.getInstance();
} catch (Exception ignored) {
// we've already logged this.
}
Natives.trySetMaxNumberOfThreads();
Natives.trySetMaxSizeVirtualMemory();
Natives.trySetMaxFileSize();
// init lucene random seed. it will use /dev/urandom where available:
StringHelper.randomId();
}
整个过程如其方法名所示,初始化native的一系列支持性资源,实际上就是测试该运行平台上的各设备,是否可用,以预热处理。
以下是linux平台测试filter/mlockAll过程细节速览,详细可展开。(需JNA支持)
代码语言:javascript复制 // org.elasticsearch.bootstrap.SystemCallFilter#init
/**
* Attempt to drop the capability to execute for the process.
* <p>
* This is best effort and OS and architecture dependent. It may throw any Throwable.
* @return 0 if we can do this for application threads, 1 for the entire process
*/
static int init(Path tmpFile) throws Exception {
if (Constants.LINUX) {
return linuxImpl();
} else if (Constants.MAC_OS_X) {
// try to enable both mechanisms if possible
bsdImpl();
macImpl(tmpFile);
return 1;
} else if (Constants.SUN_OS) {
solarisImpl();
return 1;
} else if (Constants.FREE_BSD || OPENBSD) {
bsdImpl();
return 1;
} else if (Constants.WINDOWS) {
windowsImpl();
return 1;
} else {
throw new UnsupportedOperationException("syscall filtering not supported for OS: '" Constants.OS_NAME "'");
}
}
// org.elasticsearch.bootstrap.SystemCallFilter#linuxImpl
/** try to install our BPF filters via seccomp() or prctl() to block execution */
private static int linuxImpl() {
// first be defensive: we can give nice errors this way, at the very least.
// also, some of these security features get backported to old versions, checking kernel version here is a big no-no!
final Arch arch = ARCHITECTURES.get(Constants.OS_ARCH);
boolean supported = Constants.LINUX && arch != null;
if (supported == false) {
throw new UnsupportedOperationException("seccomp unavailable: '" Constants.OS_ARCH "' architecture unsupported");
}
// we couldn't link methods, could be some really ancient kernel (e.g. < 2.1.57) or some bug
if (linux_libc == null) {
throw new UnsupportedOperationException("seccomp unavailable: could not link methods. requires kernel 3.5 "
"with CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER compiled in");
}
// try to check system calls really are who they claim
// you never know (e.g. https://chromium.googlesource.com/chromium/src.git/ /master/sandbox/linux/seccomp-bpf/sandbox_bpf.cc#57)
final int bogusArg = 0xf7a46a5c;
// test seccomp(BOGUS)
long ret = linux_syscall(arch.seccomp, bogusArg);
if (ret != -1) {
throw new UnsupportedOperationException("seccomp unavailable: seccomp(BOGUS_OPERATION) returned " ret);
} else {
int errno = Native.getLastError();
switch (errno) {
case ENOSYS: break; // ok
case EINVAL: break; // ok
default: throw new UnsupportedOperationException("seccomp(BOGUS_OPERATION): " JNACLibrary.strerror(errno));
}
}
// test seccomp(VALID, BOGUS)
ret = linux_syscall(arch.seccomp, SECCOMP_SET_MODE_FILTER, bogusArg);
if (ret != -1) {
throw new UnsupportedOperationException("seccomp unavailable: seccomp(SECCOMP_SET_MODE_FILTER, BOGUS_FLAG) returned " ret);
} else {
int errno = Native.getLastError();
switch (errno) {
case ENOSYS: break; // ok
case EINVAL: break; // ok
default: throw new UnsupportedOperationException("seccomp(SECCOMP_SET_MODE_FILTER, BOGUS_FLAG): "
JNACLibrary.strerror(errno));
}
}
// test prctl(BOGUS)
ret = linux_prctl(bogusArg, 0, 0, 0, 0);
if (ret != -1) {
throw new UnsupportedOperationException("seccomp unavailable: prctl(BOGUS_OPTION) returned " ret);
} else {
int errno = Native.getLastError();
switch (errno) {
case ENOSYS: break; // ok
case EINVAL: break; // ok
default: throw new UnsupportedOperationException("prctl(BOGUS_OPTION): " JNACLibrary.strerror(errno));
}
}
// now just normal defensive checks
// check for GET_NO_NEW_PRIVS
switch (linux_prctl(PR_GET_NO_NEW_PRIVS, 0, 0, 0, 0)) {
case 0: break; // not yet set
case 1: break; // already set by caller
default:
int errno = Native.getLastError();
if (errno == EINVAL) {
// friendly error, this will be the typical case for an old kernel
throw new UnsupportedOperationException("seccomp unavailable: requires kernel 3.5 with"
" CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER compiled in");
} else {
throw new UnsupportedOperationException("prctl(PR_GET_NO_NEW_PRIVS): " JNACLibrary.strerror(errno));
}
}
// check for SECCOMP
switch (linux_prctl(PR_GET_SECCOMP, 0, 0, 0, 0)) {
case 0: break; // not yet set
case 2: break; // already in filter mode by caller
default:
int errno = Native.getLastError();
if (errno == EINVAL) {
throw new UnsupportedOperationException("seccomp unavailable: CONFIG_SECCOMP not compiled into kernel,"
" CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER are needed");
} else {
throw new UnsupportedOperationException("prctl(PR_GET_SECCOMP): " JNACLibrary.strerror(errno));
}
}
// check for SECCOMP_MODE_FILTER
if (linux_prctl(PR_SET_SECCOMP, SECCOMP_MODE_FILTER, 0, 0, 0) != 0) {
int errno = Native.getLastError();
switch (errno) {
case EFAULT: break; // available
case EINVAL: throw new UnsupportedOperationException("seccomp unavailable: CONFIG_SECCOMP_FILTER not"
" compiled into kernel, CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER are needed");
default: throw new UnsupportedOperationException("prctl(PR_SET_SECCOMP): " JNACLibrary.strerror(errno));
}
}
// ok, now set PR_SET_NO_NEW_PRIVS, needed to be able to set a seccomp filter as ordinary user
if (linux_prctl(PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0) != 0) {
throw new UnsupportedOperationException("prctl(PR_SET_NO_NEW_PRIVS): " JNACLibrary.strerror(Native.getLastError()));
}
// check it worked
if (linux_prctl(PR_GET_NO_NEW_PRIVS, 0, 0, 0, 0) != 1) {
throw new UnsupportedOperationException("seccomp filter did not really succeed: prctl(PR_GET_NO_NEW_PRIVS): "
JNACLibrary.strerror(Native.getLastError()));
}
// BPF installed to check arch, limit, then syscall.
// See https://www.kernel.org/doc/Documentation/prctl/seccomp_filter.txt for details.
SockFilter insns[] = {
/* 1 */ BPF_STMT(BPF_LD BPF_W BPF_ABS, SECCOMP_DATA_ARCH_OFFSET), //
/* 2 */ BPF_JUMP(BPF_JMP BPF_JEQ BPF_K, arch.audit, 0, 7), // if (arch != audit) goto fail;
/* 3 */ BPF_STMT(BPF_LD BPF_W BPF_ABS, SECCOMP_DATA_NR_OFFSET), //
/* 4 */ BPF_JUMP(BPF_JMP BPF_JGT BPF_K, arch.limit, 5, 0), // if (syscall > LIMIT) goto fail;
/* 5 */ BPF_JUMP(BPF_JMP BPF_JEQ BPF_K, arch.fork, 4, 0), // if (syscall == FORK) goto fail;
/* 6 */ BPF_JUMP(BPF_JMP BPF_JEQ BPF_K, arch.vfork, 3, 0), // if (syscall == VFORK) goto fail;
/* 7 */ BPF_JUMP(BPF_JMP BPF_JEQ BPF_K, arch.execve, 2, 0), // if (syscall == EXECVE) goto fail;
/* 8 */ BPF_JUMP(BPF_JMP BPF_JEQ BPF_K, arch.execveat, 1, 0), // if (syscall == EXECVEAT) goto fail;
/* 9 */ BPF_STMT(BPF_RET BPF_K, SECCOMP_RET_ALLOW), // pass: return OK;
/* 10 */ BPF_STMT(BPF_RET BPF_K, SECCOMP_RET_ERRNO | (EACCES & SECCOMP_RET_DATA)), // fail: return EACCES;
};
// seccomp takes a long, so we pass it one explicitly to keep the JNA simple
SockFProg prog = new SockFProg(insns);
prog.write();
long pointer = Pointer.nativeValue(prog.getPointer());
int method = 1;
// install filter, if this works, after this there is no going back!
// first try it with seccomp(SECCOMP_SET_MODE_FILTER), falling back to prctl()
if (linux_syscall(arch.seccomp, SECCOMP_SET_MODE_FILTER, SECCOMP_FILTER_FLAG_TSYNC, new NativeLong(pointer)) != 0) {
method = 0;
int errno1 = Native.getLastError();
if (logger.isDebugEnabled()) {
logger.debug("seccomp(SECCOMP_SET_MODE_FILTER): {}, falling back to prctl(PR_SET_SECCOMP)...",
JNACLibrary.strerror(errno1));
}
if (linux_prctl(PR_SET_SECCOMP, SECCOMP_MODE_FILTER, pointer, 0, 0) != 0) {
int errno2 = Native.getLastError();
throw new UnsupportedOperationException("seccomp(SECCOMP_SET_MODE_FILTER): " JNACLibrary.strerror(errno1)
", prctl(PR_SET_SECCOMP): " JNACLibrary.strerror(errno2));
}
}
// now check that the filter was really installed, we should be in filter mode.
if (linux_prctl(PR_GET_SECCOMP, 0, 0, 0, 0) != 2) {
throw new UnsupportedOperationException("seccomp filter installation did not really succeed. seccomp(PR_GET_SECCOMP): "
JNACLibrary.strerror(Native.getLastError()));
}
logger.debug("Linux seccomp filter installation successful, threads: [{}]", method == 1 ? "all" : "app" );
return method;
}
// org.elasticsearch.bootstrap.Natives#tryMlockall
static void tryMlockall() {
if (JNA_AVAILABLE == false) {
logger.warn("cannot mlockall because JNA is not available");
return;
}
JNANatives.tryMlockall();
}
// org.elasticsearch.bootstrap.JNANatives#tryMlockall
static void tryMlockall() {
int errno = Integer.MIN_VALUE;
String errMsg = null;
boolean rlimitSuccess = false;
long softLimit = 0;
long hardLimit = 0;
try {
int result = JNACLibrary.mlockall(JNACLibrary.MCL_CURRENT);
if (result == 0) {
LOCAL_MLOCKALL = true;
return;
}
errno = Native.getLastError();
errMsg = JNACLibrary.strerror(errno);
if (Constants.LINUX || Constants.MAC_OS_X) {
// we only know RLIMIT_MEMLOCK for these two at the moment.
JNACLibrary.Rlimit rlimit = new JNACLibrary.Rlimit();
if (JNACLibrary.getrlimit(JNACLibrary.RLIMIT_MEMLOCK, rlimit) == 0) {
rlimitSuccess = true;
softLimit = rlimit.rlim_cur.longValue();
hardLimit = rlimit.rlim_max.longValue();
} else {
logger.warn("Unable to retrieve resource limits: {}", JNACLibrary.strerror(Native.getLastError()));
}
}
} catch (UnsatisfiedLinkError e) {
// this will have already been logged by CLibrary, no need to repeat it
return;
}
// mlockall failed for some reason
logger.warn("Unable to lock JVM Memory: error={}, reason={}", errno , errMsg);
logger.warn("This can result in part of the JVM being swapped out.");
if (errno == JNACLibrary.ENOMEM) {
if (rlimitSuccess) {
logger.warn("Increase RLIMIT_MEMLOCK, soft limit: {}, hard limit: {}", rlimitToString(softLimit),
rlimitToString(hardLimit));
if (Constants.LINUX) {
// give specific instructions for the linux case to make it easy
String user = System.getProperty("user.name");
logger.warn("These can be adjusted by modifying /etc/security/limits.conf, for example: n"
"t# allow user '{}' mlockalln"
"t{} soft memlock unlimitedn"
"t{} hard memlock unlimited",
user, user, user
);
logger.warn("If you are logged in interactively, you will have to re-login for the new limits to take effect.");
}
} else {
logger.warn("Increase RLIMIT_MEMLOCK (ulimit).");
}
}
}
检查重复类的实现,主要是看是否存在重复jar包,以及类名,详情可戳。
代码语言:javascript复制 // org.elasticsearch.bootstrap.JarHell#checkJarHell
/**
* Checks the current classpath for duplicate classes
* @param output A {@link String} {@link Consumer} to which debug output will be sent
* @throws IllegalStateException if jar hell was found
*/
public static void checkJarHell(Consumer<String> output) throws IOException, URISyntaxException {
ClassLoader loader = JarHell.class.getClassLoader();
output.accept("java.class.path: " System.getProperty("java.class.path"));
output.accept("sun.boot.class.path: " System.getProperty("sun.boot.class.path"));
if (loader instanceof URLClassLoader) {
output.accept("classloader urls: " Arrays.toString(((URLClassLoader)loader).getURLs()));
}
checkJarHell(parseClassPath(), output);
}
/**
* Checks the set of URLs for duplicate classes
* @param urls A set of URLs from the classpath to be checked for conflicting jars
* @param output A {@link String} {@link Consumer} to which debug output will be sent
* @throws IllegalStateException if jar hell was found
*/
@SuppressForbidden(reason = "needs JarFile for speed, just reading entries")
public static void checkJarHell(Set<URL> urls, Consumer<String> output) throws URISyntaxException, IOException {
// we don't try to be sneaky and use deprecated/internal/not portable stuff
// like sun.boot.class.path, and with jigsaw we don't yet have a way to get
// a "list" at all. So just exclude any elements underneath the java home
String javaHome = System.getProperty("java.home");
output.accept("java.home: " javaHome);
final Map<String,Path> clazzes = new HashMap<>(32768);
Set<Path> seenJars = new HashSet<>();
for (final URL url : urls) {
final Path path = PathUtils.get(url.toURI());
// exclude system resources
if (path.startsWith(javaHome)) {
output.accept("excluding system resource: " path);
continue;
}
if (path.toString().endsWith(".jar")) {
// jar包重复
if (seenJars.add(path) == false) {
throw new IllegalStateException("jar hell!" System.lineSeparator()
"duplicate jar on classpath: " path);
}
output.accept("examining jar: " path);
try (JarFile file = new JarFile(path.toString())) {
Manifest manifest = file.getManifest();
if (manifest != null) {
// 检查 MANIFEST.MF, 版本号...
checkManifest(manifest, path);
}
// inspect entries
Enumeration<JarEntry> elements = file.entries();
while (elements.hasMoreElements()) {
String entry = elements.nextElement().getName();
if (entry.endsWith(".class")) {
// for jar format, the separator is defined as /
entry = entry.replace('/', '.').substring(0, entry.length() - 6);
checkClass(clazzes, entry, path);
}
}
}
} else {
output.accept("examining directory: " path);
// case for tests: where we have class files in the classpath
final Path root = PathUtils.get(url.toURI());
final String sep = root.getFileSystem().getSeparator();
// don't try and walk class or resource directories that don't exist
// gradle will add these to the classpath even if they never get created
if (Files.exists(root)) {
Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
String entry = root.relativize(file).toString();
if (entry.endsWith(".class")) {
// normalize with the os separator, remove '.class'
entry = entry.replace(sep, ".").substring(0, entry.length() - ".class".length());
checkClass(clazzes, entry, path);
}
return super.visitFile(file, attrs);
}
});
}
}
}
}
// class 检查
private static void checkClass(Map<String, Path> clazzes, String clazz, Path jarpath) {
if (clazz.equals("module-info") || clazz.endsWith(".module-info")) {
// Ignore jigsaw module descriptions
return;
}
Path previous = clazzes.put(clazz, jarpath);
if (previous != null) {
if (previous.equals(jarpath)) {
if (clazz.startsWith("org.apache.xmlbeans")) {
return; // https://issues.apache.org/jira/browse/XMLBEANS-499
}
// throw a better exception in this ridiculous case.
// unfortunately the zip file format allows this buggy possibility
// UweSays: It can, but should be considered as bug :-)
throw new IllegalStateException("jar hell!" System.lineSeparator()
"class: " clazz System.lineSeparator()
"exists multiple times in jar: " jarpath " !!!!!!!!!");
} else {
throw new IllegalStateException("jar hell!" System.lineSeparator()
"class: " clazz System.lineSeparator()
"jar1: " previous System.lineSeparator()
"jar2: " jarpath);
}
}
}
设置 SecurityManager 如下:
代码语言:javascript复制 // org.elasticsearch.bootstrap.Security#configure
/**
* Initializes SecurityManager for the environment
* Can only happen once!
* @param environment configuration for generating dynamic permissions
* @param filterBadDefaults true if we should filter out bad java defaults in the system policy.
*/
static void configure(Environment environment, boolean filterBadDefaults) throws IOException, NoSuchAlgorithmException {
// enable security policy: union of template and environment-based paths, and possibly plugin permissions
Map<String, URL> codebases = PolicyUtil.getCodebaseJarMap(JarHell.parseClassPath());
Policy.setPolicy(new ESPolicy(codebases, createPermissions(environment),
getPluginAndModulePermissions(environment), filterBadDefaults, createRecursiveDataPathPermission(environment)));
// enable security manager
final String[] classesThatCanExit =
new String[]{
// SecureSM matches class names as regular expressions so we escape the $ that arises from the nested class name
ElasticsearchUncaughtExceptionHandler.PrivilegedHaltAction.class.getName().replace("$", "\$"),
Command.class.getName()};
System.setSecurityManager(new SecureSM(classesThatCanExit));
// do some basic tests
selfTest();
}
/** Simple checks that everything is ok */
@SuppressForbidden(reason = "accesses jvm default tempdir as a self-test")
static void selfTest() throws IOException {
// check we can manipulate temporary files
try {
// 创建和删除临时文件,以测试 SM 有效性
Path p = Files.createTempFile(null, null);
try {
Files.delete(p);
} catch (IOException ignored) {
// potentially virus scanner
}
} catch (SecurityException problem) {
throw new SecurityException("Security misconfiguration: cannot access java.io.tmpdir", problem);
}
}
以上,就是整个es的setup过程了,native环境检查,依赖jar包的检查,钩子安装,sm安装。。。当然了还有最重要的node的创建,这个我们下节再说。
2.4. Node的创建与检查
每个es-server实际上都是作为一个集群的一个节点运行的,而它的核心工作也是以Node形式呈现的。所以单独谈谈node的创建。
node是在setup中实例化的,而且是另外实现了一个Node, 主要是为了覆盖validateNodeBeforeAcceptingRequests() .
代码语言:javascript复制 // org.elasticsearch.bootstrap.Bootstrap#setup
private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
...
node = new Node(environment) {
@Override
protected void validateNodeBeforeAcceptingRequests(
final BootstrapContext context,
final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
BootstrapChecks.check(context, boundTransportAddress, checks);
}
};
}
所以除去校验工作是在 Bootstrap 中完成外,其他工作都是在 Node 的原生实现中完成,当然这里指的是构造方法。
代码语言:javascript复制 // org.elasticsearch.node.Node#Node
public Node(Environment environment) {
this(environment, Collections.emptyList(), true);
}
/**
* Constructs a node
*
* @param initialEnvironment the initial environment for this node, which will be added to by plugins
* @param classpathPlugins the plugins to be loaded from the classpath
* @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
* test framework for tests that rely on being able to set private settings
*/
protected Node(final Environment initialEnvironment,
Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
boolean success = false;
try {
Settings tmpSettings = Settings.builder().put(initialEnvironment.settings())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
final JvmInfo jvmInfo = JvmInfo.jvmInfo();
logger.info(
"version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
Build.CURRENT.getQualifiedVersion(),
jvmInfo.pid(),
Build.CURRENT.flavor().displayName(),
Build.CURRENT.type().displayName(),
Build.CURRENT.hash(),
Build.CURRENT.date(),
Constants.OS_NAME,
Constants.OS_VERSION,
Constants.OS_ARCH,
Constants.JVM_VENDOR,
Constants.JVM_NAME,
Constants.JAVA_VERSION,
Constants.JVM_VERSION);
if (jvmInfo.getBundledJdk()) {
logger.info("JVM home [{}], using bundled JDK [{}]", System.getProperty("java.home"), jvmInfo.getUsingBundledJdk());
} else {
logger.info("JVM home [{}]", System.getProperty("java.home"));
deprecationLogger.deprecate(
DeprecationCategory.OTHER,
"no-jdk",
"no-jdk distributions that do not bundle a JDK are deprecated and will be removed in a future release");
}
logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
if (Build.CURRENT.isProductionRelease() == false) {
logger.warn(
"version [{}] is a pre-release version of Elasticsearch and is not suitable for production",
Build.CURRENT.getQualifiedVersion());
}
if (logger.isDebugEnabled()) {
logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",
initialEnvironment.configFile(), Arrays.toString(initialEnvironment.dataFiles()),
initialEnvironment.logsFile(), initialEnvironment.pluginsFile());
}
// 1. 插件服务实例化
this.pluginsService = new PluginsService(tmpSettings, initialEnvironment.configFile(), initialEnvironment.modulesFile(),
initialEnvironment.pluginsFile(), classpathPlugins);
final Settings settings = pluginsService.updatedSettings();
final Set<DiscoveryNodeRole> additionalRoles = pluginsService.filterPlugins(Plugin.class)
.stream()
.map(Plugin::getRoles)
.flatMap(Set::stream)
.collect(Collectors.toSet());
// role保存
DiscoveryNode.setAdditionalRoles(additionalRoles);
/*
* Create the environment based on the finalized view of the settings. This is to ensure that components get the same setting
* values, no matter they ask for them from.
*/
this.environment = new Environment(settings, initialEnvironment.configFile());
Environment.assertEquivalent(initialEnvironment, this.environment);
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
logger.info("node name [{}], node ID [{}], cluster name [{}], roles {}",
NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value(),
DiscoveryNode.getRolesFromSettings(settings).stream()
.map(DiscoveryNodeRole::roleName)
.collect(Collectors.toCollection(LinkedHashSet::new)));
resourcesToClose.add(nodeEnvironment);
localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
// 2. 创建各执行线程池实例
final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
// 这个 ThreadPool 包含了许多类型的请求线程池, 如get/search/post...
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
resourcesToClose.add(resourceWatcherService);
// adds the context to the DeprecationLogger so that it does not need to be injected everywhere
HeaderWarning.setThreadContext(threadPool.getThreadContext());
resourcesToClose.add(() -> HeaderWarning.removeThreadContext(threadPool.getThreadContext()));
final List<Setting<?>> additionalSettings = new ArrayList<>();
// register the node.data, node.ingest, node.master, node.remote_cluster_client settings here so we can mark them private
additionalSettings.add(NODE_DATA_SETTING);
additionalSettings.add(NODE_INGEST_SETTING);
additionalSettings.add(NODE_MASTER_SETTING);
additionalSettings.add(NODE_REMOTE_CLUSTER_CLIENT);
additionalSettings.addAll(pluginsService.getPluginSettings());
final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
additionalSettings.addAll(builder.getRegisteredSettings());
}
// 创建NodeClient实例
client = new NodeClient(settings, threadPool);
// ScriptPlugin 实例载入
final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
final ScriptService scriptService = newScriptService(settings, scriptModule.engines, scriptModule.contexts);
// AnalysisPlugin 实例载入
AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
// this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
// so we might be late here already
final Set<SettingUpgrader<?>> settingsUpgraders = pluginsService.filterPlugins(Plugin.class)
.stream()
.map(Plugin::getSettingUpgraders)
.flatMap(List::stream)
.collect(Collectors.toSet());
final SettingsModule settingsModule =
new SettingsModule(settings, additionalSettings, additionalSettingsFilter, settingsUpgraders);
scriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
// DiscoveryPlugin 实例载入
final NetworkService networkService = new NetworkService(
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
// Cluster 服务初始化
List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
clusterService.addStateApplier(scriptService);
resourcesToClose.add(clusterService);
final Set<Setting<?>> consistentSettings = settingsModule.getConsistentSettings();
if (consistentSettings.isEmpty() == false) {
clusterService.addLocalNodeMasterListener(
new ConsistentSettingsService(settings, clusterService, consistentSettings).newHashPublisher());
}
// IngestService 服务初始化
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
scriptService, analysisModule.getAnalysisRegistry(),
pluginsService.filterPlugins(IngestPlugin.class), client);
final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
final UsageService usageService = new UsageService();
ModulesBuilder modules = new ModulesBuilder();
// 监控服务初始化
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
// 健康检查服务初始化
final FsHealthService fsHealthService = new FsHealthService(settings, clusterService.getClusterSettings(), threadPool,
nodeEnvironment);
final SetOnce<RerouteService> rerouteServiceReference = new SetOnce<>();
// snapshotsInfoService 服务初始化
final InternalSnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService(settings, clusterService,
repositoriesServiceReference::get, rerouteServiceReference::get);
final ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService,
snapshotsInfoService, threadPool.getThreadContext());
modules.add(clusterModule);
// 索引模块服务 初始化
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
modules.add(indicesModule);
// 搜索模块服务 初始化
SearchModule searchModule = new SearchModule(settings, pluginsService.filterPlugins(SearchPlugin.class));
// CircuitBreakerPlugin 实例载入
List<BreakerSettings> pluginCircuitBreakers = pluginsService.filterPlugins(CircuitBreakerPlugin.class)
.stream()
.map(plugin -> plugin.getCircuitBreaker(settings))
.collect(Collectors.toList());
final CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
pluginCircuitBreakers,
settingsModule.getClusterSettings());
pluginsService.filterPlugins(CircuitBreakerPlugin.class)
.forEach(plugin -> {
CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
plugin.setCircuitBreaker(breaker);
});
resourcesToClose.add(circuitBreakerService);
// GatewayModule 载入
modules.add(new GatewayModule());
PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
modules.add(settingsModule);
List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
NetworkModule.getNamedWriteables().stream(),
IndicesModule.getNamedWriteables().stream(),
searchModule.getNamedWriteables().stream(),
pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getNamedWriteables().stream()),
ClusterModule.getNamedWriteables().stream())
.flatMap(Function.identity()).collect(Collectors.toList());
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(
NetworkModule.getNamedXContents().stream(),
IndicesModule.getNamedXContents().stream(),
searchModule.getNamedXContents().stream(),
pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getNamedXContent().stream()),
ClusterModule.getNamedXWriteables().stream())
.flatMap(Function.identity()).collect(toList()));
// metaStateService 初始化
final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);
// 各服务工厂实例化
final PersistedClusterStateService lucenePersistedStateFactory
= new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays, clusterService.getClusterSettings(),
threadPool::relativeTimeInMillis);
// collect engine factory providers from plugins
final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders =
enginePlugins.stream().map(plugin -> (Function<IndexSettings, Optional<EngineFactory>>)plugin::getEngineFactory)
.collect(Collectors.toList());
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories =
pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getDirectoryFactories)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories =
pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getRecoveryStateFactories)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final List<IndexStorePlugin.IndexFoldersDeletionListener> indexFoldersDeletionListeners =
pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getIndexFoldersDeletionListeners)
.flatMap(List::stream)
.collect(Collectors.toList());
final Map<String, IndexStorePlugin.SnapshotCommitSupplier> snapshotCommitSuppliers =
pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getSnapshotCommitSuppliers)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final Map<String, Collection<SystemIndexDescriptor>> systemIndexDescriptorMap = pluginsService
.filterPlugins(SystemIndexPlugin.class)
.stream()
.collect(Collectors.toUnmodifiableMap(
plugin -> plugin.getClass().getSimpleName(),
plugin -> plugin.getSystemIndexDescriptors(settings)));
final SystemIndices systemIndices = new SystemIndices(systemIndexDescriptorMap);
final SystemIndexManager systemIndexManager = new SystemIndexManager(systemIndices, client);
clusterService.addListener(systemIndexManager);
final RerouteService rerouteService
= new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
rerouteServiceReference.set(rerouteService);
clusterService.setRerouteService(rerouteService);
// 索引服务实例化,带入以上解析的许多参数
final IndicesService indicesService =
new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptService,
clusterService, client, metaStateService, engineFactoryProviders, indexStoreFactories,
searchModule.getValuesSourceRegistry(), recoveryStateFactories, indexFoldersDeletionListeners,
snapshotCommitSuppliers);
final AliasValidator aliasValidator = new AliasValidator();
final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService);
final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(
settings,
clusterService,
indicesService,
clusterModule.getAllocationService(),
aliasValidator,
shardLimitValidator,
environment,
settingsModule.getIndexScopedSettings(),
threadPool,
xContentRegistry,
systemIndices,
forbidPrivateIndexSettings
);
pluginsService.filterPlugins(Plugin.class)
.forEach(p -> p.getAdditionalIndexSettingProviders()
.forEach(metadataCreateIndexService::addAdditionalIndexSettingProvider));
final MetadataCreateDataStreamService metadataCreateDataStreamService =
new MetadataCreateDataStreamService(threadPool, clusterService, metadataCreateIndexService);
Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
scriptService, xContentRegistry, environment, nodeEnvironment,
namedWriteableRegistry, clusterModule.getIndexNameExpressionResolver(),
repositoriesServiceReference::get).stream())
.collect(Collectors.toList());
ActionModule actionModule = new ActionModule(settings, clusterModule.getIndexNameExpressionResolver(),
settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, systemIndices);
modules.add(actionModule);
// restController 接入
final RestController restController = actionModule.getRestController();
final NetworkModule networkModule = new NetworkModule(settings, pluginsService.filterPlugins(NetworkPlugin.class),
threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
networkService, restController, clusterService.getClusterSettings());
Collection<UnaryOperator<Map<String, IndexTemplateMetadata>>> indexTemplateMetadataUpgraders =
pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getIndexTemplateMetadataUpgrader)
.collect(Collectors.toList());
final MetadataUpgrader metadataUpgrader = new MetadataUpgrader(indexTemplateMetadataUpgraders);
final MetadataIndexUpgradeService metadataIndexUpgradeService = new MetadataIndexUpgradeService(settings, xContentRegistry,
indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), scriptService);
if (DiscoveryNode.isMasterNode(settings)) {
clusterService.addListener(new SystemIndexMetadataUpgradeService(systemIndices, clusterService));
}
new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders);
final Transport transport = networkModule.getTransportSupplier().get();
Set<String> taskHeaders = Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
Stream.of(Task.X_OPAQUE_ID)
).collect(Collectors.toSet());
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
final GatewayMetaState gatewayMetaState = new GatewayMetaState();
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
final SearchTransportService searchTransportService = new SearchTransportService(transportService, client,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);
final IndexingPressure indexingLimits = new IndexingPressure(settings);
final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment,
pluginsService.filterPlugins(RepositoryPlugin.class), transportService, clusterService, bigArrays, xContentRegistry,
recoverySettings);
RepositoriesService repositoryService = repositoriesModule.getRepositoryService();
repositoriesServiceReference.set(repositoryService);
SnapshotsService snapshotsService = new SnapshotsService(settings, clusterService,
clusterModule.getIndexNameExpressionResolver(), repositoryService, transportService, actionModule.getActionFilters());
SnapshotShardsService snapshotShardsService = new SnapshotShardsService(settings, clusterService, repositoryService,
transportService, indicesService);
// restoreService 服务初始化
RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
metadataCreateIndexService, metadataIndexUpgradeService, clusterService.getClusterSettings(), shardLimitValidator);
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService);
clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);
// 服务发现模块初始化
final DiscoveryModule discoveryModule = new DiscoveryModule(settings, transportService, namedWriteableRegistry,
networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService,
fsHealthService);
this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
transportService, indicesService, pluginsService, circuitBreakerService, scriptService,
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
searchTransportService, indexingLimits, searchModule.getValuesSourceRegistry().getUsageService());
final SearchService searchService = newSearchService(clusterService, indicesService,
threadPool, scriptService, bigArrays, searchModule.getFetchPhase(),
responseCollectorService, circuitBreakerService);
final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
.filterPlugins(PersistentTaskPlugin.class).stream()
.map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule,
clusterModule.getIndexNameExpressionResolver()))
.flatMap(List::stream)
.collect(toList());
final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(tasksExecutors);
final PersistentTasksClusterService persistentTasksClusterService =
new PersistentTasksClusterService(settings, registry, clusterService, threadPool);
resourcesToClose.add(persistentTasksClusterService);
final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
// EDSL, 依赖注入
modules.add(b -> {
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(Client.class).toInstance(client);
b.bind(NodeClient.class).toInstance(client);
b.bind(Environment.class).toInstance(this.environment);
b.bind(ThreadPool.class).toInstance(threadPool);
b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
b.bind(ScriptService.class).toInstance(scriptService);
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
b.bind(IngestService.class).toInstance(ingestService);
b.bind(IndexingPressure.class).toInstance(indexingLimits);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
b.bind(MetadataUpgrader.class).toInstance(metadataUpgrader);
b.bind(MetaStateService.class).toInstance(metaStateService);
b.bind(PersistedClusterStateService.class).toInstance(lucenePersistedStateFactory);
b.bind(IndicesService.class).toInstance(indicesService);
b.bind(AliasValidator.class).toInstance(aliasValidator);
b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService);
b.bind(MetadataCreateDataStreamService.class).toInstance(metadataCreateDataStreamService);
b.bind(SearchService.class).toInstance(searchService);
b.bind(SearchTransportService.class).toInstance(searchTransportService);
b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(
namedWriteableRegistry, searchService::aggReduceContextBuilder));
b.bind(Transport.class).toInstance(transport);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptService));
b.bind(MetadataIndexUpgradeService.class).toInstance(metadataIndexUpgradeService);
b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
b.bind(SnapshotsInfoService.class).toInstance(snapshotsInfoService);
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
{
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(transportService,
indicesService, recoverySettings));
b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(threadPool,
transportService, recoverySettings, clusterService));
}
b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
b.bind(RepositoriesService.class).toInstance(repositoryService);
b.bind(SnapshotsService.class).toInstance(snapshotsService);
b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
b.bind(RestoreService.class).toInstance(restoreService);
b.bind(RerouteService.class).toInstance(rerouteService);
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
b.bind(FsHealthService.class).toInstance(fsHealthService);
b.bind(SystemIndices.class).toInstance(systemIndices);
}
);
injector = modules.createInjector();
// We allocate copies of existing shards by looking for a viable copy of the shard in the cluster and assigning the shard there.
// The search for viable copies is triggered by an allocation attempt (i.e. a reroute) and is performed asynchronously. When it
// completes we trigger another reroute to try the allocation again. This means there is a circular dependency: the allocation
// service needs access to the existing shards allocators (e.g. the GatewayAllocator) which need to be able to trigger a
// reroute, which needs to call into the allocation service. We close the loop here:
clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class));
List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
.filter(p -> p instanceof LifecycleComponent)
.map(p -> (LifecycleComponent) p).collect(Collectors.toList());
resourcesToClose.addAll(pluginLifecycleComponents);
resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {}),
transportService.getTaskManager(),
() -> clusterService.localNode().getId(),
transportService.getLocalNodeConnection(),
transportService.getRemoteClusterService(),
namedWriteableRegistry);
this.namedWriteableRegistry = namedWriteableRegistry;
logger.debug("initializing HTTP handlers ...");
actionModule.initRestHandlers(() -> clusterService.state().nodes());
logger.info("initialized");
success = true;
} catch (IOException ex) {
throw new ElasticsearchException("failed to bind service", ex);
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(resourcesToClose);
}
}
}
怎么样?一看就很复杂吧。是的,不然怎么叫核心呢?当然了,里面有许多的是检查和警告日志的操作。从中,我们也可以看出,pluginsService 是个重要的入口服务,它提供了许多的功能筛选。而 ModulesBuilder 则作为一个聚合各服务的所有者,将相关服务聚到一起。并最终以依赖注入的形式,为后续快速使用各服务打下了基础。线程池是ES的重要组件,它使用一个 ThreadPool, 将所有使用到的线程池都封装起来,并供其他场景使用。最后,将client初始化,以及提供rest接口的相关服务绑定,完成node创建。
代码语言:javascript复制
// 插件过滤实现
@SuppressWarnings("unchecked")
public <T> List<T> filterPlugins(Class<T> type) {
return plugins.stream().filter(x -> type.isAssignableFrom(x.v2().getClass()))
.map(p -> ((T)p.v2())).collect(Collectors.toList());
}
// 线程池创建过程
// org.elasticsearch.threadpool.ThreadPool#ThreadPool
@SuppressWarnings({"rawtypes", "unchecked"})
public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
assert Node.NODE_NAME_SETTING.exists(settings);
final Map<String, ExecutorBuilder> builders = new HashMap<>();
final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors);
final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors);
final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512);
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000, false));
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000, false));
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16, false));
builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000, true));
builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, true));
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.FETCH_SHARD_STARTED,
new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)));
builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false));
builders.put(Names.FETCH_SHARD_STORE,
new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)));
builders.put(Names.SYSTEM_READ, new FixedExecutorBuilder(settings, Names.SYSTEM_READ, halfProcMaxAt5, 2000, false));
builders.put(Names.SYSTEM_WRITE, new FixedExecutorBuilder(settings, Names.SYSTEM_WRITE, halfProcMaxAt5, 1000, false));
for (final ExecutorBuilder<?> builder : customBuilders) {
if (builders.containsKey(builder.name())) {
throw new IllegalArgumentException("builder with name [" builder.name() "] already exists");
}
builders.put(builder.name(), builder);
}
this.builders = Collections.unmodifiableMap(builders);
threadContext = new ThreadContext(settings);
final Map<String, ExecutorHolder> executors = new HashMap<>();
for (final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
if (executors.containsKey(executorHolder.info.getName())) {
throw new IllegalStateException("duplicate executors with name [" executorHolder.info.getName() "] registered");
}
logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
// 各自类型保存各自的线程池实例,以便将来取用
executors.put(entry.getKey(), executorHolder);
}
executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
this.executors = unmodifiableMap(executors);
final List<Info> infos =
executors
.values()
.stream()
.filter(holder -> holder.info.getName().equals("same") == false)
.map(holder -> holder.info)
.collect(Collectors.toList());
this.threadPoolInfo = new ThreadPoolInfo(infos);
this.scheduler = Scheduler.initScheduler(settings);
TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
this.cachedTimeThread.start();
}
// Guice 依赖注入创建
// org.elasticsearch.common.inject.ModulesBuilder#createInjector
public Injector createInjector() {
Injector injector = Guice.createInjector(modules);
((InjectorImpl) injector).clearCache();
// in ES, we always create all instances as if they are eager singletons
// this allows for considerable memory savings (no need to store construction info) as well as cycles
((InjectorImpl) injector).readOnlyAllSingletons();
return injector;
}
// org.elasticsearch.common.inject.Guice#createInjector
/**
* Creates an injector for the given set of modules.
*
* @throws CreationException if one or more errors occur during Injector
* creation
*/
public static Injector createInjector(Iterable<? extends Module> modules) {
return createInjector(Stage.DEVELOPMENT, modules);
}
rest处理器注册过程,详情可戳。
代码语言:javascript复制 // org.elasticsearch.action.ActionModule#initRestHandlers
public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
List<AbstractCatAction> catActions = new ArrayList<>();
Consumer<RestHandler> registerHandler = handler -> {
if (handler instanceof AbstractCatAction) {
catActions.add((AbstractCatAction) handler);
}
restController.registerHandler(handler);
};
// 以下是各处理器的注册,我们从中可以看出ES支持的操作大类,每个Action包含了具体实现
registerHandler.accept(new RestAddVotingConfigExclusionAction());
registerHandler.accept(new RestClearVotingConfigExclusionsAction());
registerHandler.accept(new RestMainAction());
registerHandler.accept(new RestNodesInfoAction(settingsFilter));
registerHandler.accept(new RestRemoteClusterInfoAction());
registerHandler.accept(new RestNodesStatsAction());
registerHandler.accept(new RestNodesUsageAction());
registerHandler.accept(new RestNodesHotThreadsAction());
registerHandler.accept(new RestClusterAllocationExplainAction());
registerHandler.accept(new RestClusterStatsAction());
registerHandler.accept(new RestClusterStateAction(settingsFilter, threadPool));
registerHandler.accept(new RestClusterHealthAction());
registerHandler.accept(new RestClusterUpdateSettingsAction());
registerHandler.accept(new RestClusterGetSettingsAction(settings, clusterSettings, settingsFilter));
registerHandler.accept(new RestClusterRerouteAction(settingsFilter));
registerHandler.accept(new RestClusterSearchShardsAction());
registerHandler.accept(new RestPendingClusterTasksAction());
registerHandler.accept(new RestPutRepositoryAction());
registerHandler.accept(new RestGetRepositoriesAction(settingsFilter));
registerHandler.accept(new RestDeleteRepositoryAction());
registerHandler.accept(new RestVerifyRepositoryAction());
registerHandler.accept(new RestCleanupRepositoryAction());
registerHandler.accept(new RestGetSnapshotsAction());
registerHandler.accept(new RestCreateSnapshotAction());
registerHandler.accept(new RestCloneSnapshotAction());
registerHandler.accept(new RestRestoreSnapshotAction());
registerHandler.accept(new RestDeleteSnapshotAction());
registerHandler.accept(new RestSnapshotsStatusAction());
registerHandler.accept(new RestGetIndicesAction());
registerHandler.accept(new RestIndicesStatsAction());
registerHandler.accept(new RestIndicesSegmentsAction());
registerHandler.accept(new RestIndicesShardStoresAction());
registerHandler.accept(new RestGetAliasesAction());
registerHandler.accept(new RestIndexDeleteAliasesAction());
registerHandler.accept(new RestIndexPutAliasAction());
registerHandler.accept(new RestIndicesAliasesAction());
registerHandler.accept(new RestCreateIndexAction());
registerHandler.accept(new RestResizeHandler.RestShrinkIndexAction());
registerHandler.accept(new RestResizeHandler.RestSplitIndexAction());
registerHandler.accept(new RestResizeHandler.RestCloneIndexAction());
registerHandler.accept(new RestRolloverIndexAction());
registerHandler.accept(new RestDeleteIndexAction());
registerHandler.accept(new RestCloseIndexAction());
registerHandler.accept(new RestOpenIndexAction());
registerHandler.accept(new RestAddIndexBlockAction());
registerHandler.accept(new RestUpdateSettingsAction());
registerHandler.accept(new RestGetSettingsAction());
registerHandler.accept(new RestAnalyzeAction());
registerHandler.accept(new RestGetIndexTemplateAction());
registerHandler.accept(new RestPutIndexTemplateAction());
registerHandler.accept(new RestDeleteIndexTemplateAction());
registerHandler.accept(new RestPutComponentTemplateAction());
registerHandler.accept(new RestGetComponentTemplateAction());
registerHandler.accept(new RestDeleteComponentTemplateAction());
registerHandler.accept(new RestPutComposableIndexTemplateAction());
registerHandler.accept(new RestGetComposableIndexTemplateAction());
registerHandler.accept(new RestDeleteComposableIndexTemplateAction());
registerHandler.accept(new RestSimulateIndexTemplateAction());
registerHandler.accept(new RestSimulateTemplateAction());
registerHandler.accept(new RestPutMappingAction());
registerHandler.accept(new RestGetMappingAction(threadPool));
registerHandler.accept(new RestGetFieldMappingAction());
registerHandler.accept(new RestRefreshAction());
registerHandler.accept(new RestFlushAction());
registerHandler.accept(new RestSyncedFlushAction());
registerHandler.accept(new RestForceMergeAction());
registerHandler.accept(new RestClearIndicesCacheAction());
registerHandler.accept(new RestResolveIndexAction());
registerHandler.accept(new RestIndexAction());
registerHandler.accept(new CreateHandler());
registerHandler.accept(new AutoIdHandler(nodesInCluster));
registerHandler.accept(new RestGetAction());
registerHandler.accept(new RestGetSourceAction());
registerHandler.accept(new RestMultiGetAction(settings));
registerHandler.accept(new RestDeleteAction());
registerHandler.accept(new RestCountAction());
registerHandler.accept(new RestTermVectorsAction());
registerHandler.accept(new RestMultiTermVectorsAction());
registerHandler.accept(new RestBulkAction(settings));
registerHandler.accept(new RestUpdateAction());
registerHandler.accept(new RestSearchAction());
registerHandler.accept(new RestSearchScrollAction());
registerHandler.accept(new RestClearScrollAction());
registerHandler.accept(new RestMultiSearchAction(settings));
registerHandler.accept(new RestValidateQueryAction());
registerHandler.accept(new RestExplainAction());
registerHandler.accept(new RestRecoveryAction());
registerHandler.accept(new RestReloadSecureSettingsAction());
// Scripts API
registerHandler.accept(new RestGetStoredScriptAction());
registerHandler.accept(new RestPutStoredScriptAction());
registerHandler.accept(new RestDeleteStoredScriptAction());
registerHandler.accept(new RestGetScriptContextAction());
registerHandler.accept(new RestGetScriptLanguageAction());
registerHandler.accept(new RestFieldCapabilitiesAction());
// Tasks API
registerHandler.accept(new RestListTasksAction(nodesInCluster));
registerHandler.accept(new RestGetTaskAction());
registerHandler.accept(new RestCancelTasksAction(nodesInCluster));
// Ingest API
registerHandler.accept(new RestPutPipelineAction());
registerHandler.accept(new RestGetPipelineAction());
registerHandler.accept(new RestDeletePipelineAction());
registerHandler.accept(new RestSimulatePipelineAction());
// Dangling indices API
registerHandler.accept(new RestListDanglingIndicesAction());
registerHandler.accept(new RestImportDanglingIndexAction());
registerHandler.accept(new RestDeleteDanglingIndexAction());
// CAT API
registerHandler.accept(new RestAllocationAction());
registerHandler.accept(new RestShardsAction());
registerHandler.accept(new RestMasterAction());
registerHandler.accept(new RestNodesAction());
registerHandler.accept(new RestTasksAction(nodesInCluster));
registerHandler.accept(new RestIndicesAction());
registerHandler.accept(new RestSegmentsAction());
// Fully qualified to prevent interference with rest.action.count.RestCountAction
registerHandler.accept(new org.elasticsearch.rest.action.cat.RestCountAction());
// Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction
registerHandler.accept(new RestCatRecoveryAction());
registerHandler.accept(new RestHealthAction());
registerHandler.accept(new org.elasticsearch.rest.action.cat.RestPendingClusterTasksAction());
registerHandler.accept(new RestAliasAction());
registerHandler.accept(new RestThreadPoolAction());
registerHandler.accept(new RestPluginsAction());
registerHandler.accept(new RestFielddataAction());
registerHandler.accept(new RestNodeAttrsAction());
registerHandler.accept(new RestRepositoriesAction());
registerHandler.accept(new RestSnapshotAction());
registerHandler.accept(new RestTemplatesAction());
for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(settings, restController, clusterSettings, indexScopedSettings,
settingsFilter, indexNameExpressionResolver, nodesInCluster)) {
registerHandler.accept(handler);
}
}
registerHandler.accept(new RestCatAction(catActions));
}
// org.elasticsearch.rest.RestController#registerHandler(org.elasticsearch.rest.RestHandler)
/**
* Registers a REST handler with the controller. The REST handler declares the {@code method}
* and {@code path} combinations.
*/
public void registerHandler(final RestHandler restHandler) {
restHandler.routes().forEach(route -> registerHandler(route.getMethod(), route.getPath(), restHandler));
restHandler.deprecatedRoutes().forEach(route ->
registerAsDeprecatedHandler(route.getMethod(), route.getPath(), restHandler, route.getDeprecationMessage()));
restHandler.replacedRoutes().forEach(route -> registerWithDeprecatedHandler(route.getMethod(), route.getPath(),
restHandler, route.getDeprecatedMethod(), route.getDeprecatedPath()));
}
2.5. Bootstrap的启动
有了上面如此之多的准备工作,接下来就是真正启动ES了。即前面看到的 INSTANCE.start();
代码语言:javascript复制 // org.elasticsearch.bootstrap.Bootstrap#start
private void start() throws NodeValidationException {
// 主要是node操作es节点, keepAliveThread 只是做一个锁等待,避免es进程退出。
node.start();
keepAliveThread.start();
}
// org.elasticsearch.node.Node#start
/**
* Start the node. If the node is already started, this method is no-op.
*/
public Node start() throws NodeValidationException {
if (lifecycle.moveToStarted() == false) {
return this;
}
logger.info("starting ...");
// 通知各生命周期组件,各自start()
pluginLifecycleComponents.forEach(LifecycleComponent::start);
// 通知核心服务启动
injector.getInstance(MappingUpdatedAction.class).setClient(client);
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(SnapshotShardsService.class).start();
injector.getInstance(RepositoriesService.class).start();
injector.getInstance(SearchService.class).start();
injector.getInstance(FsHealthService.class).start();
nodeService.getMonitorService().start();
final ClusterService clusterService = injector.getInstance(ClusterService.class);
final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
nodeConnectionsService.start();
clusterService.setNodeConnectionsService(nodeConnectionsService);
injector.getInstance(GatewayService.class).start();
Discovery discovery = injector.getInstance(Discovery.class);
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
// Start the transport service now so the publish address will be added to the local disco node in ClusterService
TransportService transportService = injector.getInstance(TransportService.class);
transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
transportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(transportService));
transportService.start();
assert localNodeFactory.getNode() != null;
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
: "transportService has a different local node than the factory provided";
injector.getInstance(PeerRecoverySourceService.class).start();
// Load (and maybe upgrade) the metadata stored on disk
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class),
injector.getInstance(MetadataIndexUpgradeService.class), injector.getInstance(MetadataUpgrader.class),
injector.getInstance(PersistedClusterStateService.class));
if (Assertions.ENABLED) {
try {
assert injector.getInstance(MetaStateService.class).loadFullState().v1().isEmpty();
final NodeMetadata nodeMetadata = NodeMetadata.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY,
nodeEnvironment.nodeDataPaths());
assert nodeMetadata != null;
assert nodeMetadata.nodeVersion().equals(Version.CURRENT);
assert nodeMetadata.nodeId().equals(localNodeFactory.getNode().getId());
} catch (IOException e) {
assert false : e;
}
}
// we load the global state here (the persistent part of the cluster state stored on disk) to
// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
final Metadata onDiskMetadata = gatewayMetaState.getPersistedState().getLastAcceptedState().metadata();
assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
validateNodeBeforeAcceptingRequests(new BootstrapContext(environment, onDiskMetadata), transportService.boundAddress(),
pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
clusterService.addStateApplier(transportService.getTaskManager());
// start after transport service so the local disco is known
discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
clusterService.start();
assert clusterService.localNode().equals(localNodeFactory.getNode())
: "clusterService has a different local node than the factory provided";
transportService.acceptIncomingRequests();
discovery.startInitialJoin();
final TimeValue initialStateTimeout = INITIAL_STATE_TIMEOUT_SETTING.get(settings());
configureNodeAndClusterIdStateListener(clusterService);
if (initialStateTimeout.millis() > 0) {
final ThreadPool thread = injector.getInstance(ThreadPool.class);
ClusterState clusterState = clusterService.state();
ClusterStateObserver observer =
new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
if (clusterState.nodes().getMasterNodeId() == null) {
logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
final CountDownLatch latch = new CountDownLatch(1);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) { latch.countDown(); }
@Override
public void onClusterServiceClose() {
latch.countDown();
}
@Override
public void onTimeout(TimeValue timeout) {
logger.warn("timed out while waiting for initial discovery state - timeout: {}",
initialStateTimeout);
latch.countDown();
}
}, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);
try {
latch.await();
} catch (InterruptedException e) {
throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
}
}
}
// http 服务启动,即打开socket端口,接受 rest 请求了
injector.getInstance(HttpServerTransport.class).start();
if (WRITE_PORTS_FILE_SETTING.get(settings())) {
TransportService transport = injector.getInstance(TransportService.class);
writePortsFile("transport", transport.boundAddress());
HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
writePortsFile("http", http.boundAddress());
}
logger.info("started");
// 通知 ClusterPlugin 组件节点启动完成
pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);
return this;
}
Ok, 以上就是ES节点的启动过程了。node作为一个调度节点或者框架,协调各个服务进行启动,从而使整个ES有序工作起来。这很合理,也只能这样实现。但我们可以通过这个框架,瞧出具体涉及哪些服务启动,以及其先后顺序如何。这也达到了为我们解开谜团的作用,就够了。
3. 一点闲话
本文讨论的是ES的启动流程。一般地,一个系统的启动流程都大概是这样:进入入口main(), 命令参数校验,创建必要数据结构,加载必要模块配置,打开端口,启动循环服务;对于非分布式的应用,往往不会很复杂,甚至一致无持久化特性的系统,更是无所顾忌。而对于需要进行数据恢复,集群协调的应用,则往往会难上许多。
而对于一些比较核心或者细节的东西,我们在做快速浏览时又往往是不关注的。这又需要更多的时间与精力去领会。
虽然只是走马观花看启动,但毕竟看到了各种该该出场的组件,这也将以后更单独理解组件必然有帮助。
欲知后事如何,且听下回分解。
看完本文记得给作者点赞 在看哦~~~大家的支持,是作者源源不断出文的动力
作者:等你归去来
出处:https://www.cnblogs.com/yougewe/p/14495459.html