执行器示例
下面是xxl-job
提供的不同方式集成执行器示例代码:
最常用的当然是springboot
集成方式,下面我们就以该示例研究下xxl-job
客户端执行器的启动流程。
启动流程
客户端执行器启动流程入口在XxlJobSpringExecutor
类中,通过实现spring扩展SmartInitializingSingleton
,当IOC的单例Bean加载完成调用方法afterSingletonsInstantiated()
:
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository (for method)
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Bean类型任务解析
首先,我们来分析下initJobHandlerMethodRepository(applicationContext)
方法,该方法
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
// init job handler from method
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
Object bean = applicationContext.getBean(beanDefinitionName);
Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" beanDefinitionName "].", ex);
}
if (annotatedMethods==null || annotatedMethods.isEmpty()) {
continue;
}
......
}
}
上面逻辑就是遍历IoC
容器中Bean
,获取解析带有@XxlJob
注解的方法,最后返回Map<Method, XxlJob>
结果类型,key
就是带有@XxlJob
注解的Method
,value
就是该方法上解析的@XxlJob
注解信息。
解析获取到带有@XxlJob
注解的Method
信息后,下面看下如何进一步处理:
// 遍历
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method method = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
if (xxlJob == null) {
continue;
}
// 获取@XxlJob注解value配置
String name = xxlJob.value();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" bean.getClass() "#" method.getName() "] .");
}
// 判断是否同名的已经加载,有则抛异常
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" name "] naming conflicts.");
}
// @XxlJob注解方法参数校验:必须只有一个参数,且参数类型是String,否则抛出异常
if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" bean.getClass() "#" method.getName() "] , " "The correct method format like " public ReturnT<String> execute(String param) " .");
}
// @XxlJob注解方法返回值校验,必须返回ReturnT类型
if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" bean.getClass() "#" method.getName() "] , " "The correct method format like " public ReturnT<String> execute(String param) " .");
}
method.setAccessible(true);
// init and destory
Method initMethod = null;
Method destroyMethod = null;
// 解析@XxlJob注解init配置
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" bean.getClass() "#" method.getName() "] .");
}
}
// 解析@XxlJob注解destroy配置
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" bean.getClass() "#" method.getName() "] .");
}
}
// 将@XxlJob注解的Method、initMethod和destroyMethod封装成MethodJobHandler,并放入到Map中完成注册,key=@XxlJob.value
registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
}
GlueFactory初始化
GlueFactory
主要处理GLUE(Java)
类型作业,对GLUE(Java)
类型作业源码进行编译、创建实例进行调用,可以支持spring依赖注入,如源码中支持@Autowired
、@Resource
、@Qualifier
等。
执行器启动流程
super.start()
这句才会真正进入执行器启动流程XxlJobExecutor#start
:
public void start() throws Exception {
// 初始化日志路径
XxlJobFileAppender.initLogPath(logPath);
// 初始化AdminBizClient,用于和admin远程交互
initAdminBizList(adminAddresses, accessToken);
// 初始化日志清理线程,用于清理日志目录下过期日志文件
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// 初始化回调线程,triggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server
initEmbedServer(address, ip, port, appname, accessToken);
}
这里比较主要的是后面两条语句,TriggerCallbackThread.getInstance().start();
主要启动用于作业执行完成后回调将结果传递给admin模块,具体见下节客户端执行器作业执行流程分析。initEmbedServer(address, ip, port, appname, accessToken);
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
// fill ip port
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
// generate address
if (address==null || address.trim().length()==0) {
String ip_port_address = IpUtil.getIpPort(ip, port);
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}
// start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
上面一堆逻辑是处理ip:port
解析,关键是后面两句,EmbedServer#start
内部创建一个线程Thread,并启动:
public void start(final String address, final int port, final String appname, final String accessToken) {
executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() {
@Override
public void run() {
......
}
});
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
}
下面再来看下这个线程内部干了什么:
代码语言:javascript复制EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});
try {
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}})
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
ChannelFuture future = bootstrap.bind(port).sync();
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
// start registry
startRegistry(appname, address);
// wait util stop
future.channel().closeFuture().sync();
}
上面代码一大堆,其实逻辑很简单,主要干了两件事:
- 使用
netty
初始化启动了一个http server
,主要用于接收admin模块
向执行器发送指令,比如下发执行作业指令、kill作业指令,主要处理逻辑封装在EmbedHttpServerHandler
中; startRegistry(appname, address)
启动客户端执行器定时向admin模块
注册线程,逻辑代码在ExecutorRegistryThread#start
方法中,比较简单;
xxl-job
客户端执行器注册流程大致如下图:
1、客户端利用adminBiz.registry(registryParam)
定时周期向admin模块
发送注册信息;
2、admin模块
接收到客户端注册信息后,插入|更新xxl_job_registry
表的update_time
字段时间值;
3、admin模块
在JobRegistryMonitorHelper
中启动线程定时扫描xxl_job_registry
表,将超时移除,并将在线实例集合拼接在一起更新到执行器地址为自动注册的执行器address_list
字段信息中。
总结
xxl-job
客户端执行器启动流程还是比较简单,核心主要有两点:
- 利用
netty
启动一个http server
容器,并将IP:PORT
利用注册信息带到admin
模块,这样admin
就可以给执行器下发运行作业、杀死作业等指令; - 执行器定时(默认30秒)向
admin
注册一次,当admin
超过90秒未收到执行器注册信息,即认为该执行器超时,进行下线移除;