Preface
This article examines how PowerJob reports and stores task instance logs.
OmsLoggerFactory.build
tech/powerjob/worker/log/OmsLoggerFactory.java
```java
public class OmsLoggerFactory {

    public static OmsLogger build(Long instanceId, String logConfig, WorkerRuntime workerRuntime) {
        LogConfig cfg;
        if (StringUtils.isEmpty(logConfig)) {
            cfg = new LogConfig();
        } else {
            try {
                cfg = JsonUtils.parseObject(logConfig, LogConfig.class);
            } catch (Exception ignore) {
                cfg = new LogConfig();
            }
        }

        switch (LogType.of(cfg.getType())) {
            case LOCAL:
                return new OmsLocalLogger(cfg);
            case STDOUT:
                return new OmsStdOutLogger(cfg);
            case NULL:
                return new OmsNullLogger();
            case LOCAL_AND_ONLINE:
                return new OmsServerAndLocalLogger(cfg, instanceId, workerRuntime.getOmsLogHandler());
            default:
                return new OmsServerLogger(cfg, instanceId, workerRuntime.getOmsLogHandler());
        }
    }
}
```
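By default logConfig is null, so cfg is a freshly constructed LogConfig. Its type matches none of the explicit cases, so the switch falls through to default and build returns an OmsServerLogger. An unparseable logConfig string is likewise replaced by the default LogConfig.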
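The selection logic amounts to "parse the config, fall back to the default on any failure, then dispatch on the type". The minimal sketch below illustrates that shape. The real code parses JSON into a LogConfig via JsonUtils; to stay dependency-free, the sketch treats the raw config as a bare integer type code. `SketchLogType`, its integer codes, the fallback to ONLINE, and `SketchLoggerFactory` are all hypothetical, not PowerJob's API.

```java
// Hypothetical log types; the integer codes are illustrative, not PowerJob's actual values.
enum SketchLogType {
    ONLINE(1), LOCAL(2), STDOUT(3), LOCAL_AND_ONLINE(4), NULL(999);

    final int code;

    SketchLogType(int code) {
        this.code = code;
    }

    // Missing or unknown codes fall back to ONLINE (an assumption made for this sketch).
    static SketchLogType of(Integer code) {
        if (code == null) {
            return ONLINE;
        }
        for (SketchLogType t : values()) {
            if (t.code == code) {
                return t;
            }
        }
        return ONLINE;
    }
}

final class SketchLoggerFactory {

    // Parses the raw config; any failure yields null, i.e. "use the default".
    static Integer parseType(String rawConfig) {
        if (rawConfig == null || rawConfig.trim().isEmpty()) {
            return null;
        }
        try {
            return Integer.valueOf(rawConfig.trim());
        } catch (NumberFormatException ignore) {
            return null;
        }
    }

    // Returns the name of the logger class the factory would build.
    static String chooseLogger(String rawConfig) {
        switch (SketchLogType.of(parseType(rawConfig))) {
            case LOCAL:
                return "OmsLocalLogger";
            case STDOUT:
                return "OmsStdOutLogger";
            case NULL:
                return "OmsNullLogger";
            case LOCAL_AND_ONLINE:
                return "OmsServerAndLocalLogger";
            default:
                return "OmsServerLogger";
        }
    }
}
```

Both a missing and a malformed config end up in the default branch, which is why server-side reporting is what most jobs get unless they opt out.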
OmsServerLogger
tech/powerjob/worker/log/impl/OmsServerLogger.java
```java
public class OmsServerLogger extends AbstractOmsLogger {

    private final long instanceId;
    private final OmsLogHandler omsLogHandler;

    public OmsServerLogger(LogConfig logConfig, long instanceId, OmsLogHandler omsLogHandler) {
        super(logConfig);
        this.instanceId = instanceId;
        this.omsLogHandler = omsLogHandler;
    }

    @Override
    public void debug0(String messagePattern, Object... args) {
        process(LogLevel.DEBUG, messagePattern, args);
    }

    @Override
    public void info0(String messagePattern, Object... args) {
        process(LogLevel.INFO, messagePattern, args);
    }

    @Override
    public void warn0(String messagePattern, Object... args) {
        process(LogLevel.WARN, messagePattern, args);
    }

    @Override
    public void error0(String messagePattern, Object... args) {
        process(LogLevel.ERROR, messagePattern, args);
    }

    private void process(LogLevel level, String messagePattern, Object... args) {
        String logContent = genLogContent(messagePattern, args);
        omsLogHandler.submitLog(instanceId, level, logContent);
    }
}
```
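All four level methods (debug0, info0, warn0, error0) delegate to process, which renders the message with genLogContent and then calls OmsLogHandler.submitLog with the instance ID, the level and the rendered content.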
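genLogContent itself is not shown above. Assuming it performs SLF4J-style `{}` substitution, the "render, then hand off" shape of process can be sketched as follows. `LogSink`, `PatternLogger` and `format` are hypothetical names; the sink stands in for OmsLogHandler.submitLog.

```java
// Hypothetical sink standing in for OmsLogHandler.submitLog.
interface LogSink {
    void submit(long instanceId, String level, String content);
}

final class PatternLogger {
    private final long instanceId;
    private final LogSink sink;

    PatternLogger(long instanceId, LogSink sink) {
        this.instanceId = instanceId;
        this.sink = sink;
    }

    // Replaces each "{}" with the next argument; surplus placeholders are kept verbatim.
    static String format(String pattern, Object... args) {
        StringBuilder sb = new StringBuilder();
        int argIdx = 0;
        int i = 0;
        while (i < pattern.length()) {
            if (argIdx < args.length && pattern.startsWith("{}", i)) {
                sb.append(args[argIdx++]);
                i += 2;
            } else {
                sb.append(pattern.charAt(i));
                i++;
            }
        }
        return sb.toString();
    }

    void info(String pattern, Object... args) {
        process("INFO", pattern, args);
    }

    void error(String pattern, Object... args) {
        process("ERROR", pattern, args);
    }

    // Same shape as OmsServerLogger.process: render the message, then delegate to the sink.
    private void process(String level, String pattern, Object... args) {
        sink.submit(instanceId, level, format(pattern, args));
    }
}
```

Rendering happens on the calling (job) thread, so only the finished string crosses into the asynchronous reporting path.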
submitLog
tech/powerjob/worker/background/OmsLogHandler.java
```java
@Slf4j
public class OmsLogHandler {

    private final String workerAddress;
    private final Transporter transporter;
    private final ServerDiscoveryService serverDiscoveryService;

    // Reporting task; must be started via a thread pool
    public final Runnable logSubmitter = new LogSubmitter();
    // Report lock: only one thread needs to report at a time
    private final Lock reportLock = new ReentrantLock();
    // Producer-consumer pattern: logs are uploaded asynchronously
    private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue(10240);

    // Number of log entries carried by each report
    private static final int BATCH_SIZE = 20;
    // Local backlog threshold
    private static final int REPORT_SIZE = 1024;

    public OmsLogHandler(String workerAddress, Transporter transporter, ServerDiscoveryService serverDiscoveryService) {
        this.workerAddress = workerAddress;
        this.transporter = transporter;
        this.serverDiscoveryService = serverDiscoveryService;
    }

    /**
     * Submit a log entry
     * @param instanceId task instance ID
     * @param logLevel log level
     * @param logContent log content
     */
    public void submitLog(long instanceId, LogLevel logLevel, String logContent) {

        if (logQueue.size() > REPORT_SIZE) {
            // A thread's lifecycle cannot be restarted: once a Thread object has terminated it cannot be
            // start()ed again, so a new one has to be created (and discarded) every time
            new Thread(logSubmitter).start();
        }

        InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent);
        boolean offerRet = logQueue.offer(tuple);
        if (!offerRet) {
            log.warn("[OmsLogHandler] [{}] submit log failed, maybe your log speed is too fast!", instanceId);
        }
    }

    // ......
}
```
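OmsLogHandler.submitLog first checks whether logQueue holds more than REPORT_SIZE (1024) entries; if so, it starts a new thread to run logSubmitter and drain the backlog. In either case it then wraps the entry in an InstanceLogContent and offers it to logQueue. Because the queue is bounded at 10240 entries, offer returns false once it is full, and the entry is dropped with a warning.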
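The producer side relies on two properties of a bounded LinkedBlockingQueue: offer returns false instead of blocking when the queue is full, and the size check only decides whether to request an extra flush. The sketch below reproduces that shape with a tiny capacity. `BoundedLogBuffer`, its constructor parameters and the `flushTrigger` callback are hypothetical; in PowerJob the trigger corresponds to `new Thread(logSubmitter).start()`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedLogBuffer {
    final BlockingQueue<String> queue;
    private final int reportSize;
    private final Runnable flushTrigger;
    final AtomicInteger dropped = new AtomicInteger();

    BoundedLogBuffer(int capacity, int reportSize, Runnable flushTrigger) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.reportSize = reportSize;
        this.flushTrigger = flushTrigger;
    }

    void submit(String line) {
        // Backlog above the threshold: request an extra flush before enqueuing.
        if (queue.size() > reportSize) {
            flushTrigger.run();
        }
        // The entry is always offered; offer() returns false instead of blocking when full.
        if (!queue.offer(line)) {
            dropped.incrementAndGet();
        }
    }
}
```

With a capacity of 3 and a threshold of 1, submitting five entries while no consumer runs triggers three flush requests and drops two entries. The caller (the job's own thread) is never blocked, at the cost of losing logs under sustained overload.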
LogSubmitter
tech/powerjob/worker/background/OmsLogHandler.java
```java
private class LogSubmitter implements Runnable {

    @Override
    public void run() {

        boolean lockResult = reportLock.tryLock();
        if (!lockResult) {
            return;
        }

        try {
            final String currentServerAddress = serverDiscoveryService.getCurrentServerAddress();
            // No server is currently available
            if (StringUtils.isEmpty(currentServerAddress)) {
                if (!logQueue.isEmpty()) {
                    logQueue.clear();
                    log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs.");
                }
                return;
            }

            List<InstanceLogContent> logs = Lists.newLinkedList();

            while (!logQueue.isEmpty()) {
                try {
                    InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS);
                    logs.add(logContent);

                    if (logs.size() >= BATCH_SIZE) {
                        WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs));
                        // Unreliable request: web-console logs do not need to be perfect
                        TransportUtils.reportLogs(req, currentServerAddress, transporter);
                        logs.clear();
                    }
                } catch (Exception ignore) {
                    break;
                }
            }

            if (!logs.isEmpty()) {
                WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs);
                TransportUtils.reportLogs(req, currentServerAddress, transporter);
            }

        } finally {
            reportLock.unlock();
        }
    }
}
```
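LogSubmitter first tries to acquire reportLock and returns immediately if another thread is already reporting. If no server is currently available, it clears logQueue and discards the backlog. Otherwise it keeps polling entries from logQueue, sends a batch to the server via TransportUtils.reportLogs each time logs reaches BATCH_SIZE (20) entries, and finally sends whatever is left over.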
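The drain loop can be sketched generically as "tryLock, poll, emit full batches, flush the tail". In this sketch `BatchDrainer` and the `reporter` callback are hypothetical. It also deliberately differs from LogSubmitter in one respect: it uses non-blocking poll() and stops at the first null, whereas the original polls with a 100 ms timeout while the queue is non-empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

final class BatchDrainer<T> {
    private final Lock reportLock = new ReentrantLock();
    private final int batchSize;
    private final Consumer<List<T>> reporter;

    BatchDrainer(int batchSize, Consumer<List<T>> reporter) {
        this.batchSize = batchSize;
        this.reporter = reporter;
    }

    // Returns false if another thread is already draining (same role as reportLock.tryLock()).
    boolean drain(Queue<T> queue) {
        if (!reportLock.tryLock()) {
            return false;
        }
        try {
            List<T> batch = new ArrayList<>();
            T item;
            while ((item = queue.poll()) != null) {
                batch.add(item);
                if (batch.size() >= batchSize) {
                    // Hand over a copy so the buffer can be reused, as LogSubmitter does.
                    reporter.accept(new ArrayList<>(batch));
                    batch.clear();
                }
            }
            // Flush the tail that did not fill a whole batch.
            if (!batch.isEmpty()) {
                reporter.accept(batch);
            }
            return true;
        } finally {
            reportLock.unlock();
        }
    }
}
```

Draining 45 queued entries with a batch size of 20 produces batches of 20, 20 and 5. Using tryLock rather than lock means a second submitter thread simply returns instead of queuing up behind the first.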
reportLogs
tech/powerjob/worker/common/utils/TransportUtils.java
```java
public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) {
    final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_LOG, address);
    transporter.tell(url, req);
}
```
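reportLogs builds the URL of the server's S4W_HANDLER_REPORT_LOG handler and sends the request with transporter.tell, i.e. without waiting for a response.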
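The difference between a fire-and-forget "tell" and a request/response call can be modelled with CompletableFuture. This sketch only illustrates the semantics; `FireAndForget`, its `tell`/`ask` methods and the `send` callback are hypothetical and are not PowerJob's Transporter implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class FireAndForget {
    private final ExecutorService io = Executors.newSingleThreadExecutor();

    // "tell": schedule the send and return immediately; failures are swallowed.
    void tell(String url, String payload, Consumer<String> send) {
        CompletableFuture.runAsync(() -> send.accept(url + " <- " + payload), io)
                .exceptionally(ex -> null);
    }

    // "ask": same send, but the caller receives a future it can wait on or inspect.
    CompletableFuture<Void> ask(String url, String payload, Consumer<String> send) {
        return CompletableFuture.runAsync(() -> send.accept(url + " <- " + payload), io);
    }

    void shutdown() {
        io.shutdown();
    }
}
```

With tell, a failed send never surfaces to the caller, which matches the "unreliable request" comment in LogSubmitter: a lost batch of web-console logs is acceptable.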
processWorkerLogReport
tech/powerjob/server/core/handler/AbWorkerRequestHandler.java
```java
@Override
@Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING)
public void processWorkerLogReport(WorkerLogReportReq req) {

    WorkerLogReportEvent event = new WorkerLogReportEvent()
            .setWorkerAddress(req.getWorkerAddress())
            .setLogNum(req.getInstanceLogContents().size());
    try {
        processWorkerLogReport0(req, event);
        event.setStatus(WorkerLogReportEvent.Status.SUCCESS);
    } catch (RejectedExecutionException re) {
        event.setStatus(WorkerLogReportEvent.Status.REJECTED);
    } catch (Throwable t) {
        event.setStatus(WorkerLogReportEvent.Status.EXCEPTION);
        log.warn("[WorkerRequestHandler] process worker report failed!", t);
    } finally {
        monitorService.monitor(event);
    }
}
```
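On the server side, processWorkerLogReport receives the WorkerLogReportReq and calls processWorkerLogReport0. The outcome (SUCCESS, REJECTED or EXCEPTION) is recorded on a WorkerLogReportEvent, which is handed to monitorService in the finally block.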
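The try/catch/finally above guarantees exactly one status per request. Presumably REJECTED is hit when the @Async executor behind submitLogs is saturated: Spring's TaskRejectedException extends java.util.concurrent.RejectedExecutionException. The sketch isolates that mapping; `ReportStatus`, `ReportOutcome` and `runAndMonitor` are hypothetical names.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

enum ReportStatus { SUCCESS, REJECTED, EXCEPTION }

final class ReportOutcome {
    // Runs the work and always reports exactly one status, mirroring processWorkerLogReport.
    static ReportStatus runAndMonitor(Runnable work, Consumer<ReportStatus> monitor) {
        ReportStatus status = null;
        try {
            work.run();
            status = ReportStatus.SUCCESS;
        } catch (RejectedExecutionException re) {
            // The executor refused the task: the batch was not accepted.
            status = ReportStatus.REJECTED;
        } catch (Throwable t) {
            status = ReportStatus.EXCEPTION;
        } finally {
            monitor.accept(status);
        }
        return status;
    }
}
```

The more specific RejectedExecutionException clause must come before the catch-all Throwable clause, otherwise the code would not compile.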
WorkerRequestHandlerImpl
tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java
```java
protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) {
    // This shouldn't be too slow... it's just a few checks and Map#get calls...
    instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
}
```
In WorkerRequestHandlerImpl, processWorkerLogReport0 simply delegates to instanceLogService.submitLogs.
submitLogs
tech/powerjob/server/core/instance/InstanceLogService.java
```java
@Async(value = PJThreadPool.LOCAL_DB_POOL)
public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {

    List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
        instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());

        LocalInstanceLogDO y = new LocalInstanceLogDO();
        BeanUtils.copyProperties(x, y);
        y.setWorkerAddress(workerAddress);
        return y;
    }).collect(Collectors.toList());

    try {
        CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));
    } catch (Exception e) {
        log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);
    }
}
```
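InstanceLogService.submitLogs is an @Async method that runs on the LOCAL_DB_POOL thread pool. It records the current time for each instance in instanceId2LastReportTime, converts each InstanceLogContent into a LocalInstanceLogDO (adding the worker address), and saves the batch with localInstanceLogRepository.saveAll, retrying via CommonUtils.executeWithRetry0. If persistence still fails, the logs are dropped and a warning is logged.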
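The retry policy of CommonUtils.executeWithRetry0 (attempt count, back-off) is not shown here. The sketch below is a generic fixed-attempt retry helper under assumed parameters; `Retry.executeWithRetry`, `maxAttempts` and `backoffMillis` are hypothetical and do not reflect PowerJob's actual signature.

```java
import java.util.concurrent.Callable;

final class Retry {
    // Calls the task up to maxAttempts times, sleeping between attempts; rethrows the last failure.
    static <T> T executeWithRetry(Callable<T> task, int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis);
                }
            }
        }
        // Reachable only after maxAttempts failures, so last is non-null here.
        throw last;
    }
}
```

In submitLogs the final failure is caught and the batch is dropped, so the retries only smooth over transient H2 errors; they do not make log delivery reliable.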
LocalJpaConfig
tech/powerjob/server/persistence/config/LocalJpaConfig.java
```java
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        // repository package
        basePackages = LocalJpaConfig.LOCAL_PACKAGES,
        // entity manager factory bean name
        entityManagerFactoryRef = "localEntityManagerFactory",
        // transaction manager bean name
        transactionManagerRef = "localTransactionManager"
)
public class LocalJpaConfig {

    public static final String LOCAL_PACKAGES = "tech.powerjob.server.persistence.local";

    private static Map<String, Object> genDatasourceProperties() {

        JpaProperties jpaProperties = new JpaProperties();
        jpaProperties.setOpenInView(false);
        jpaProperties.setShowSql(false);

        HibernateProperties hibernateProperties = new HibernateProperties();
        // Drop data on every startup (after a restart, the original instances have already failed over
        // to another server, so the old log data is no longer meaningful)
        hibernateProperties.setDdlAuto("create");
        return hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings());
    }

    @Bean(name = "localEntityManagerFactory")
    public LocalContainerEntityManagerFactoryBean initLocalEntityManagerFactory(@Qualifier("omsLocalDatasource") DataSource omsLocalDatasource, EntityManagerFactoryBuilder builder) {
        return builder
                .dataSource(omsLocalDatasource)
                .properties(genDatasourceProperties())
                .packages(LOCAL_PACKAGES)
                .persistenceUnit("localPersistenceUnit")
                .build();
    }

    @Bean(name = "localTransactionManager")
    public PlatformTransactionManager initLocalTransactionManager(@Qualifier("localEntityManagerFactory") LocalContainerEntityManagerFactoryBean localContainerEntityManagerFactoryBean) {
        return new JpaTransactionManager(Objects.requireNonNull(localContainerEntityManagerFactoryBean.getObject()));
    }

    @Bean(name = "localTransactionTemplate")
    public TransactionTemplate initTransactionTemplate(@Qualifier("localTransactionManager") PlatformTransactionManager ptm) {
        TransactionTemplate tt = new TransactionTemplate(ptm);
        // Set the isolation level
        tt.setIsolationLevel(TransactionDefinition.ISOLATION_DEFAULT);
        return tt;
    }
}
```
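LocalJpaConfig binds the repositories in the tech.powerjob.server.persistence.local package to the omsLocalDatasource data source. Because ddl-auto is set to create, the local tables are dropped and recreated on every server start.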
MultiDatasourceConfig
tech/powerjob/server/persistence/config/MultiDatasourceConfig.java
```java
@Configuration
public class MultiDatasourceConfig {

    private static final String H2_DRIVER_CLASS_NAME = "org.h2.Driver";
    private static final String H2_JDBC_URL_PATTERN = "jdbc:h2:file:%spowerjob_server_db";
    private static final int H2_MIN_SIZE = 4;
    private static final int H2_MAX_ACTIVE_SIZE = 10;

    @Primary
    @Bean("omsRemoteDatasource")
    @ConfigurationProperties(prefix = "spring.datasource.core")
    public DataSource initOmsCoreDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean("omsLocalDatasource")
    public DataSource initOmsLocalDatasource() {
        String h2Path = OmsFileUtils.genH2WorkPath();
        HikariConfig config = new HikariConfig();
        config.setDriverClassName(H2_DRIVER_CLASS_NAME);
        config.setJdbcUrl(String.format(H2_JDBC_URL_PATTERN, h2Path));
        config.setAutoCommit(true);
        // Minimum number of idle connections in the pool
        config.setMinimumIdle(H2_MIN_SIZE);
        // Maximum number of connections in the pool
        config.setMaximumPoolSize(H2_MAX_ACTIVE_SIZE);

        // Delete the files when the JVM shuts down
        try {
            FileUtils.forceDeleteOnExit(new File(h2Path));
        } catch (Exception ignore) {
        }
        return new HikariDataSource(config);
    }
}
```
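MultiDatasourceConfig defines two data sources: the primary omsRemoteDatasource, configured via spring.datasource.core (MySQL, for example), and omsLocalDatasource, an embedded H2 file database whose directory is scheduled for deletion when the JVM exits.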
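H2_JDBC_URL_PATTERN has no separator between `%s` and `powerjob_server_db`, so the directory returned by OmsFileUtils.genH2WorkPath must already end with a path separator for the database file to land inside it. Presumably it does. The sketch shows the formatting with an explicit guard added; `H2UrlSketch`, `jdbcUrl` and the `/tmp/...` paths are hypothetical, and the guard assumes Unix-style "/" separators.

```java
final class H2UrlSketch {
    // Same pattern as MultiDatasourceConfig: directory and file name are concatenated directly.
    static final String H2_JDBC_URL_PATTERN = "jdbc:h2:file:%spowerjob_server_db";

    static String jdbcUrl(String h2Dir) {
        // Guard added for this sketch: ensure the directory ends with a "/" separator.
        String dir = h2Dir.endsWith("/") ? h2Dir : h2Dir + "/";
        return String.format(H2_JDBC_URL_PATTERN, dir);
    }
}
```

Without the trailing separator, a directory such as `/tmp/powerjob/h2` would yield a database file named `h2powerjob_server_db` in `/tmp/powerjob`, outside the directory that forceDeleteOnExit cleans up.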
processFinishedInstance
tech/powerjob/server/core/instance/InstanceManager.java
```java
public void processFinishedInstance(Long instanceId, Long wfInstanceId, InstanceStatus status, String result) {

    log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name());

    // Sync the log data
    HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 60, TimeUnit.SECONDS);

    // Special handling for workflows
    if (wfInstanceId != null) {
        // A manual stop is also treated as a failure inside a workflow (should not happen in theory)
        workflowInstanceManager.move(wfInstanceId, instanceId, status, result);
    }

    // Alerting
    if (status == InstanceStatus.FAILED) {
        alert(instanceId, result);
    }
    // Proactively evict the cache to reduce memory usage
    instanceMetadataService.invalidateJobInfo(instanceId);
}
```
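InstanceManager.processFinishedInstance schedules instanceLogService.sync(instanceId) to run 60 seconds later, presumably to give log batches that are still in flight from the workers time to arrive.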
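HashedWheelTimerHolder.INACCURATE_TIMER is PowerJob's own hashed-wheel timer. The sketch below swaps in the JDK's ScheduledExecutorService to show the same "run once after a delay" idea. The delay is shortened to milliseconds in the usage so the example finishes quickly; `DelayedSync` and `scheduleSync` are hypothetical names.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

final class DelayedSync {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    // Schedules a one-shot sync of the given instance; PowerJob uses a 60-second delay here.
    ScheduledFuture<?> scheduleSync(long instanceId, long delay, TimeUnit unit, LongConsumer sync) {
        return timer.schedule(() -> sync.accept(instanceId), delay, unit);
    }

    void shutdown() {
        timer.shutdown();
    }
}
```

A hashed-wheel timer trades precision (hence "inaccurate") for cheap scheduling of many timeouts, which suits a delay where a few seconds of slack do not matter.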
sync
tech/powerjob/server/core/instance/InstanceLogService.java
```java
@Async(PJThreadPool.BACKGROUND_POOL)
public void sync(Long instanceId) {

    Stopwatch sw = Stopwatch.createStarted();
    try {
        // First persist to a local file
        File stableLogFile = genStableLogFile(instanceId);
        // Push the file to MongoDB
        FileLocation dfsFL = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId));
        try {
            dFsService.store(new StoreRequest().setLocalFile(stableLogFile).setFileLocation(dfsFL));
            log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop());
        } catch (Exception e) {
            log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e);
        }
    } catch (Exception e) {
        log.warn("[InstanceLog-{}] sync local instanceLogs failed.", instanceId, e);
    }
    // Delete the data from the local database
    try {
        instanceId2LastReportTime.remove(instanceId);
        CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
        log.info("[InstanceLog-{}] delete local instanceLog successfully.", instanceId);
    } catch (Exception e) {
        log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e);
    }
}
```
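InstanceLogService.sync first calls genStableLogFile to write the instance's logs into a local log file on the server. It then uploads that file itself to dFsService, under the bucket Constants.LOG_BUCKET and the name returned by genMongoFileName(instanceId). dFsService has four implementations (OSS, GridFS, MinIO and MySQL), and which one is active depends on the server configuration; with MySQL, the file content is stored in the powerjob_files table. Finally, whether or not the upload succeeded, sync removes the instance from instanceId2LastReportTime and deletes its rows from the LOCAL_INSTANCE_LOG table in H2 via localInstanceLogRepository.deleteByInstanceId.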
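The control flow of sync can be reduced to three steps with independent error handling. As far as this code shows, a failed upload does not prevent the H2 rows from being deleted, so the logs then survive only in the stable file on that server's disk. `SyncFlowSketch`, `Step` and the event strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SyncFlowSketch {
    interface Step {
        void run() throws Exception;
    }

    // Mirrors InstanceLogService.sync: upload failures are only recorded,
    // and the local cleanup runs regardless of whether the upload succeeded.
    static List<String> sync(Step persistLocally, Step uploadToDfs, Step deleteLocalRows) {
        List<String> events = new ArrayList<>();
        try {
            persistLocally.run();
            try {
                uploadToDfs.run();
                events.add("upload ok");
            } catch (Exception e) {
                events.add("upload failed");
            }
        } catch (Exception e) {
            events.add("persist failed");
        }
        try {
            deleteLocalRows.run();
            events.add("local rows deleted");
        } catch (Exception e) {
            events.add("delete failed");
        }
        return events;
    }
}
```

Calling it with an upload step that throws yields the events "upload failed" followed by "local rows deleted".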
genStableLogFile
```java
private File genStableLogFile(long instanceId) {
    String path = genLogFilePath(instanceId, true);
    int lockId = ("stFileLock-" + instanceId).hashCode();
    try {
        segmentLock.lockInterruptibleSafe(lockId);

        return localTransactionTemplate.execute(status -> {
            File f = new File(path);
            if (f.exists()) {
                return f;
            }
            try {
                // Create the parent directory (the file itself is created automatically when the stream is opened)
                FileUtils.forceMkdirParent(f);

                // Logs still exist locally: persist them from the local DB (the SYNC case)
                if (instanceId2LastReportTime.containsKey(instanceId)) {
                    try (Stream<LocalInstanceLogDO> allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
                        stream2File(allLogStream, f);
                    }
                } else {
                    FileLocation dfl = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId));
                    Optional<FileMeta> dflMetaOpt = dFsService.fetchFileMeta(dfl);
                    if (!dflMetaOpt.isPresent()) {
                        OmsFileUtils.string2File("SYSTEM: There is no online log for this job instance.", f);
                        return f;
                    }
                    dFsService.download(new DownloadRequest().setTarget(f).setFileLocation(dfl));
                }
                return f;
            } catch (Exception e) {
                CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(f));
                throw new RuntimeException(e);
            }
        });
    } finally {
        segmentLock.unlock(lockId);
    }
}

private static String genLogFilePath(long instanceId, boolean stable) {
    if (stable) {
        return OmsFileUtils.genLogDirPath() + String.format("%d-stable.log", instanceId);
    } else {
        return OmsFileUtils.genLogDirPath() + String.format("%d-temporary.log", instanceId);
    }
}
```
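genStableLogFile first checks whether the server already has the instance's stable log file (~/powerjob/server/online_log/%d-stable.log) and returns it if so. Otherwise, if instanceId2LastReportTime contains the instance (meaning its logs are still in H2), it streams them from localInstanceLogRepository, ordered by log time, into the file. If not, it fetches the file metadata from dFsService: when no such file exists it writes a placeholder message into the local file, otherwise it downloads the file from the DFS. The whole operation runs under a segment lock keyed on the instance ID and inside localTransactionTemplate, which the Stream-returning repository query needs in order to be consumed.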
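The lookup order of genStableLogFile can be isolated as a pure decision function, together with the path helper using the same naming scheme as genLogFilePath. `StableLogSource`, `StableLogResolver` and their parameters are hypothetical; `logDir` is assumed to end with a separator, as OmsFileUtils.genLogDirPath presumably guarantees.

```java
import java.util.Set;

enum StableLogSource { EXISTING_FILE, LOCAL_DB, DFS_DOWNLOAD, PLACEHOLDER }

final class StableLogResolver {
    // Same naming scheme as genLogFilePath(instanceId, true).
    static String stableLogPath(String logDir, long instanceId) {
        return logDir + String.format("%d-stable.log", instanceId);
    }

    // Decides where the stable log file comes from, in the same order as genStableLogFile.
    static StableLogSource resolve(boolean fileExists, Set<Long> recentlyReported, long instanceId, boolean dfsHasFile) {
        if (fileExists) {
            return StableLogSource.EXISTING_FILE;
        }
        if (recentlyReported.contains(instanceId)) {
            // Logs are still in H2 (the SYNC case): dump them ordered by log time.
            return StableLogSource.LOCAL_DB;
        }
        // Otherwise fall back to the DFS copy, or write a placeholder when none exists.
        return dfsHasFile ? StableLogSource.DFS_DOWNLOAD : StableLogSource.PLACEHOLDER;
    }
}
```

Checking the local file first makes repeated calls cheap, and because sync removes the instance from instanceId2LastReportTime, any server that lacks the file afterwards falls through to the DFS branch.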
Related table schemas
LOCAL_INSTANCE_LOG
```sql
CREATE TABLE PUBLIC.LOCAL_INSTANCE_LOG (
    ID BIGINT NOT NULL AUTO_INCREMENT,
    INSTANCE_ID BIGINT,
    LOG_CONTENT CHARACTER VARYING,
    LOG_LEVEL INTEGER,
    LOG_TIME BIGINT,
    WORKER_ADDRESS CHARACTER VARYING(255),
    CONSTRAINT CONSTRAINT_8 PRIMARY KEY (ID)
);
CREATE INDEX IDXPJ6CD8W5EAW8QBKMD84I8KYS7 ON PUBLIC.LOCAL_INSTANCE_LOG (INSTANCE_ID);
CREATE UNIQUE INDEX PRIMARY_KEY_8 ON PUBLIC.LOCAL_INSTANCE_LOG (ID);
```
powerjob_files
```sql
CREATE TABLE `powerjob_files` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `bucket` varchar(255) COLLATE utf8mb4_general_ci NOT NULL COMMENT 'bucket',
  `name` varchar(255) COLLATE utf8mb4_general_ci NOT NULL COMMENT 'file name',
  `version` varchar(255) COLLATE utf8mb4_general_ci NOT NULL COMMENT 'version',
  `meta` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'metadata',
  `length` bigint NOT NULL COMMENT 'length',
  `status` int NOT NULL COMMENT 'status',
  `data` longblob NOT NULL COMMENT 'file content',
  `extra` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'extra information',
  `gmt_create` datetime NOT NULL COMMENT 'creation time',
  `gmt_modified` datetime DEFAULT NULL COMMENT 'modification time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
```
Summary
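- On the worker side, OmsServerLogger.process calls OmsLogHandler.submitLog. submitLog starts a new logSubmitter thread when logQueue holds more than REPORT_SIZE (1024) entries, and always offers the entry to the bounded logQueue, where entries are dropped once it is full. LogSubmitter drains logQueue under reportLock and reports the logs to the server via TransportUtils.reportLogs in batches of BATCH_SIZE (20), sending any remainder at the end.
- On the server side, AbWorkerRequestHandler.processWorkerLogReport receives the WorkerLogReportReq and calls processWorkerLogReport0, which delegates to instanceLogService.submitLogs. submitLogs is an asynchronous method that converts each InstanceLogContent into a LocalInstanceLogDO and saves them with localInstanceLogRepository.saveAll. The server has two data sources, a remote one (e.g. MySQL) and a local H2 one, and these logs are stored in the LOCAL_INSTANCE_LOG table in H2.
- When a task instance finishes, the server calls InstanceManager.processFinishedInstance, which schedules instanceLogService.sync(instanceId) to run after 60 seconds. sync writes the logs into a local stable log file via genStableLogFile, uploads that file to dFsService (OSS, GridFS, MinIO or MySQL depending on the server configuration; with MySQL it lands in the powerjob_files table), and finally deletes the instance's rows from LOCAL_INSTANCE_LOG in H2 via localInstanceLogRepository.deleteByInstanceId.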