如何保证定时任务幂等性?
- 只在一台机器添加定时任务
- 缺点:任务量大的时候,不能满足需求
- 在数据库能插入数据成功的可以执行
- 缺点:效率太低,且,很low
- 使用redis分布式锁,能够成功获取锁的才能够执行定时job
- 缺点:没有重试补偿机制,不能支持集群不支持路由策略
- 使用zk分布式锁,和redis原理相同
- 缺点:没有重试补偿机制,不能支持集群不支持路由策略
- 使用分布式任务调度平台
- 有点:具有重试补偿机制,具有路由策略,支持集群部署
分布式任务调度平台的原理:
分布式调度平台分为两个模块:
- 执行器注册中心: 执行器在启动时将自己的ip和端口信息上报到执行器注册中心
- 执行器管理中心:管理执行器的执行
1. 执行器在启动时将自己的ip和端口信息上报到执行器注册中心
2. 当要执行定时任务时,分布式调度中心先去执行器注册中心获取执行器地址列表
3. 分布式任务调度中心会根据相应的路由策略选出其中的一个或者多个,然后再本地执行定时任务
路由测试有多种:
4. 因为分布式调度中心和执行器实质是netty的服务器端和netty的客户端,两边保持长连接。当分布式任务调度中心的定时任务出发以后,会根据相应的地址去调用相应的执行器执行。如果我们的路由策略是类似于第一个,最后一个,轮询,随机这种(负载均衡算法,根据策略模式来做),就可以保证定时job的幂等性。
XXL-Job Admin平台搭建 任务调度中心
1. 官方下载XXL-Job Admin的源代码
2. 将xxl-job依赖的sql放入数据库中跑一道,添加相应的数据库以及数据库表
3. 在xxl-job-admin项目中,给jdbc连接的配置文件加上&serverTimezone=UTC
否则在启动的时候会报错
4. 启动xxl-job-admin项目,访问项目的管理地址:http://127.0.0.1:8080/xxl-job-admin/
账号密码为:admin 123456
这样,我们的任务调度平台的管理后台就搭建成功了。
之后我们在添加定时任务模块
mt-shop-service-job
mt-shop-service-member-job
添加Maven依赖
代码语言:javascript复制<dependencies>
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.1.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
添加配置文件
application.yml
代码语言:javascript复制# web port
server.port=8081
# log config
logging.config=classpath:logback.xml
### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### xxl-job executor address
xxl.job.executor.appname=mayikt-member-xxl-job-executor
xxl.job.executor.ip=
xxl.job.executor.port=9999
### xxl-job, access token
xxl.job.accessToken=
### xxl-job log path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job log retention days
xxl.job.executor.logretentiondays=30
bootstrap.yml
代码语言:javascript复制server:
port: 7070
spring:
cloud:
nacos:
discovery:
##服务的注册
server-addr: 127.0.0.1:8848
### nacos 配置中心
config:
server-addr: 127.0.0.1:8848
file-extension: yaml
datasource:
url: jdbc:mysql://localhost:3306/mayikt_member?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT+8
username: root
password: root
driver-class-name: com.mysql.jdbc.Driver
redis:
host: 148.70.123.61
port: 6369
password: 123456
application:
name: mayikt-member-xxl-job
profiles:
active: prd
mayikt:
member:
job:
WeChatActivitiePageSize: 2
logback.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false" scan="true" scanPeriod="1 seconds">
<contextName>logback</contextName>
<property name="log.path" value="/data/applogs/xxl-job/xxl-job-executor-sample-springboot.log"/>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}.%d{yyyy-MM-dd}.zip</fileNamePattern>
</rollingPolicy>
<encoder>
<pattern>�te %level [%thread] %logger{36} [%file : %line] %msg%n
</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="console"/>
<appender-ref ref="file"/>
</root>
</configuration>
添加xxl-job的配置类
XxlJobConfig
代码语言:javascript复制package com.mayikt.member.xxljob.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.executor.appname}")
private String appName;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppName(appName);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
添加jobHandler
WeChatActivityJob
代码语言:javascript复制package com.mayikt.member.xxljob.jobhandler;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;
@Component
public class WeChatActivityJob {
@XxlJob("weChatActivityHandler")
public ReturnT weChatActivityHandler(String param) {
System.out.println("执行器开始执行了");
System.out.println("执行器获取的参数为param:" param);
return ReturnT.SUCCESS;
}
}
添加启动类
MemberXxlJobApplication
代码语言:javascript复制package com.mayikt.member.xxljob;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MemberXxlJobApplication {
public static void main(String[] args) {
SpringApplication.run(MemberXxlJobApplication.class);
}
}
代码就添加完毕了,我们可以到测试平台添加任务进行测试了
1. 进入执行器管理,添加一个执行器,这里的appName就和application.yml中的appName对应
2. 添加完毕以后,进入任务管理,添加相应的任务
- 先切换到我们刚刚加入的执行器蚂蚁课堂会员定时任务
- 新增一个执行器
然后点击操作
就可以看到效果了