3分钟掌握Quartz.net分布式定时任务的姿势

2020-05-01 18:09:24 浏览数 (1)

引言

长话短说,今天聊一聊分布式定时任务,我的流水账笔记:

  • ASP.NET Core Quartz.Net实现web定时任务
  • AspNetCore结合Redis实践消息队列

细心朋友稍一分析,就知道还有问题: 水平扩展后的WebApp的Quartz.net定时任务会多次触发, 因为webapp实例使用的是默认的RAMJobStore, 多实例在内存中都维护了Job和Trigger的副本.

我的定时任务是同步任务,多次执行倒是没有太大问题,但对于特定业务的定时任务, 多次执行可能是致命问题。

基于此,来看看Quartz.net 分布式定时任务的姿势

AdoJobStore

很明显,水平扩展的多实例需要一个 独立于web实例的机制来存储Job和Trigger.

Quartz.NET提供ADO.NET JobStore来存储任务数据。

  1. 先使用SQL脚本在数据库中生成指定的表结构

执行脚本之后,会看到数据库中多出几个以 QRTZ_开头的表

  1. 配置Quartz.net使用AdoJobStore

可采用编码形式或者 quartz.config形式添加配置

快速实践

1. 预先生成Job、Trigger表

从https://github.com/quartznet/quartznet/tree/master/database/tables 下载合适的数据库表脚本, 生成指定的表结构

2. 添加AdoJobStore

本次使用编码方式添加AdoJobStore配置。 首次启动会将代码中Job和Trigger持久化到sqlite,后面就直接从sqlite中加载Job和Trigger

代码语言:javascript复制
using System;
using System.Collections.Specialized;
using System.Data;
using System.Threading.Tasks;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging;
using Quartz;
using Quartz.Impl;
using Quartz.Impl.AdoJobStore.Common;
using Quartz.Spi;

namespace EqidManager
{
    using IOCContainer = IServiceProvider;

    public class QuartzStartup
    {
        public IScheduler Scheduler { get; set; }

        private readonly ILogger _logger;
        private readonly IJobFactory iocJobfactory;
        public QuartzStartup(IOCContainer IocContainer, ILoggerFactory loggerFactory)
        {
            _logger = loggerFactory.CreateLogger<QuartzStartup>();
            iocJobfactory = new IOCJobFactory(IocContainer);

            DbProvider.RegisterDbMetadata("sqlite-custom", new DbMetadata()
            {
                AssemblyName = typeof(SqliteConnection).Assembly.GetName().Name,
                ConnectionType = typeof(SqliteConnection),
                CommandType = typeof(SqliteCommand),
                ParameterType = typeof(SqliteParameter),
                ParameterDbType = typeof(DbType),
                ParameterDbTypePropertyName = "DbType",
                ParameterNamePrefix = "@",
                ExceptionType = typeof(SqliteException),
                BindByName = true
            });

            var properties = new NameValueCollection
            {
                ["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz",
                ["quartz.jobStore.useProperties"] = "true",
                ["quartz.jobStore.dataSource"] = "default",
                ["quartz.jobStore.tablePrefix"] = "QRTZ_",
                ["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.SQLiteDelegate, Quartz",
                ["quartz.dataSource.default.provider"] = "sqlite-custom",
                ["quartz.dataSource.default.connectionString"] = "Data Source=EqidManager.db",
                ["quartz.jobStore.lockHandler.type"] = "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz",
                ["quartz.serializer.type"] = "binary"
            };

            var schedulerFactory = new StdSchedulerFactory(properties);
            Scheduler = schedulerFactory.GetScheduler().Result;
            Scheduler.JobFactory = iocJobfactory;
        }

        public async Task<IScheduler> ScheduleJob()
        {
            var _eqidCounterResetJob = JobBuilder.Create<EqidCounterResetJob>()
              .WithIdentity("EqidCounterResetJob")
              .Build();

            var _eqidCounterResetJobTrigger = TriggerBuilder.Create()
                .WithIdentity("EqidCounterResetCron")
                .StartNow()
                //每天凌晨0s
                .WithCronSchedule("0 0 0 * * ?")      Seconds,Minutes,Hours,Day-of-Month,Month,Day-of-Week,Year(optional field)
                .Build();
         
           // 这里一定要先判断是否已经从SQlite中加载了Job和Trigger
            if (!await Scheduler.CheckExists(new JobKey("EqidCounterResetJob")) &&
                !await Scheduler.CheckExists(new TriggerKey("EqidCounterResetCron")))
            {
                await Scheduler.ScheduleJob(_eqidCounterResetJob, _eqidCounterResetJobTrigger);
            }
            
            await Scheduler.Start();
            return Scheduler;
        }

        public void EndScheduler()
        {
            if (Scheduler == null)
            {
                return;
            }

            if (Scheduler.Shutdown(waitForJobsToComplete: true).Wait(30000))
                Scheduler = null;
            else
            {
            }
            _logger.LogError("Schedule job upload as application stopped");
        }
    }
}

上面是Quartz.NET 从sqlite中加载Job和Trigger的核心代码

这里要提示两点:

① IOCJobFactory 是自定义JobFactory,目的是与ASP.NET Core原生依赖注入结合 ② 在调度任务的时候,先判断是否已经从sqlite加载了Job和Trigger

3.添加Quartz.Net UI轮子

附赠Quartz.NET的调度UI: CrystalQuartz, 方便在界面管理和调度任务 ① Install-Package CrystalQuartz.AspNetCore -IncludePrerelease ② Startup启用CrystalQuartz

代码语言:javascript复制
using CrystalQuartz.AspNetCore;
/*
 * app is IAppBuilder
 * scheduler is your IScheduler (local or remote)
 */
var quartz = app.ApplicationServices.GetRequiredService<QuartzStartup>();
var _schedule = await  quartz.ScheduleJob();
app.UseCrystalQuartz(() => scheduler);

③ 在localhost:YOUR_PORT/quartz地址查看调度

总结输出

  1. Quartz.net以AdoJobStore支撑分布式定时任务,解决多实例多次触发的问题
  2. 快速抛出轮子:Quartz.Net UI库
  • https://www.quartz-scheduler.net/documentation/quartz-2.x/tutorial/job-stores.html
  • https://github.com/guryanovev/CrystalQuartz

0 人点赞