[Abp vNext 源码分析] - 12. 后台作业与后台工作者

2022-10-14,,,,

一、简要说明

文章信息:

基于的 abp vnext 版本:1.0.0

创作日期:2019 年 10 月 24 日晚

更新日期:暂无

abp vnext 提供了后台工作者和后台作业的支持,基本实现与原来的 abp 框架类似,并且 abp vnext 还提供了对 hangfire 和 rabbitmq 的后台作业集成。开发人员在使用这些第三方库的时候,基本就是开箱即用,不需要做其他复杂的配置。

后台作业在系统开发的过程当中,是比较常用的功能。因为总是有一些长耗时的任务,而这些任务我们不是立即响应的,例如 excel 文档导入、批量发送短信通知等。

后台工作者 的话,abp vnext 的实现就是在 clr 的 timer 之上封装了一层,周期性地执行用户逻辑。abp vnext 默认提供的 后台任务管理器,就是在后台工作者基础之上进行的封装。

涉及到后台任务、后台工作者的模块一共有 6 个,它们分别是:

  • volo.abp.threading :提供了一些常用的线程组件,其中 abptimer 就是在里面实现的。
  • volo.abp.backgroundworkers :后台工作者的定义和实现。

  • volo.abp.backgroundjobs.abstractions :后台任务的一些共有定义。
  • volo.abp.backgroundjobs :默认的后台任务管理器实现。
  • volo.abp.backgroundjobs.hangfire :基于 hangfire 库实现的后台任务管理器。
  • volo.abp.backgroundjobs.rabbitmq : 基于 rabbitmq 实现的后台任务管理器。

二、源码分析

线程组件

健壮的计时器

clr 为我们提供了多种计时器,我们一般使用的是 system.threading.timer ,它是基于 clr 线程池的一个周期计时器,会根据我们配置的 period (周期) 定时执行。在 clr 线程池中,所有的 timer 只有 1 个线程为其服务。这个线程直到下一个计时器的触发时间,当下一个 timer 对象到期时,这个线程就会将 timer 的回调方法通过 threadpool.queueuserworkitem() 扔到线程池去执行。

不过这带来了一个问题,即你的回调方法执行时间超过了计时器的周期,那么就会造成上一个任务还没执行完成又开始执行新的任务。

解决这个方法其实很简单,即启动之后,将周期设置为 timeout.infinite ,这样只会执行一次。当回调方法执行完成之后,就设置 duetime 参数说明下次执行要等待多久,并且周期还是 timeout.infinite

abp vnext 已经为我们提供了健壮的计时器,该类型的定义是 abptimer ,在内部用到了 volatile 关键字和 monitor 实现 条件变量模式 解决多线程环境下的问题。

public class abptimer : itransientdependency
{
    // 回调事件。
    public event eventhandler elapsed;

    // 执行周期。
    public int period { get; set; }

    // 定时器启动之后就开始运行,默认为 fasle。
    public bool runonstart { get; set; }

    // 日志记录器。
    public ilogger<abptimer> logger { get; set; }

    private readonly timer _tasktimer;
    // 定时器是否在执行任务,默认为 false。
    private volatile bool _performingtasks;
    // 定时器的运行状态,默认为 false。
    private volatile bool _isrunning;

    public abptimer()
    {
        logger = nulllogger<abptimer>.instance;

        // 回调函数是 timercallback,执行周期为永不执行。
        _tasktimer = new timer(timercallback, null, timeout.infinite, timeout.infinite);
    }

    public void start(cancellationtoken cancellationtoken = default)
    {
        // 如果传递的周期小于等于 0 ,则抛出异常。
        if (period <= 0)
        {
            throw new abpexception("period should be set before starting the timer!");
        }

        // 使用互斥锁,保证线程安全。
        lock (_tasktimer)
        {
            // 如果启动之后就需要马上执行,则设置为 0,马上执行任务,否则会等待 period 毫秒之后再执行(1 个周期)。
            _tasktimer.change(runonstart ? 0 : period, timeout.infinite);
            // 定时器成功运行了。
            _isrunning = true;
        }
        // 释放 _tasktimer 的互斥锁。
    }

    public void stop(cancellationtoken cancellationtoken = default)
    {
        // 使用互斥锁。
        lock (_tasktimer)
        {
            // 将内部定时器设置为永不执行的状态。
            _tasktimer.change(timeout.infinite, timeout.infinite);

            // 检测当前是否还有正在执行的任务,如果有则等待任务执行完成。
            while (_performingtasks)
            {
                // 临时释放锁,阻塞当前线程。但是其他线程可以获取 _timer 的互斥锁。
                monitor.wait(_tasktimer);
            }

            // 需要表示停止状态,所以标记状态为 false。
            _isrunning = false;
        }
    }

    private void timercallback(object state)
    {
        lock (_tasktimer)
        {
            // 如果有任务正在运行,或者内部定时器已经停止了,则不做任何事情。
            if (!_isrunning || _performingtasks)
            {
                return;
            }

            // 临时停止内部定时器。
            _tasktimer.change(timeout.infinite, timeout.infinite);
            // 表明马上需要执行任务了。
            _performingtasks = true;
        }

        try
        {
            // 调用绑定的事件。
            elapsed.invokesafely(this, new eventargs());
        }
        catch
        {
            // 注意,这里将会吞噬异常。
        }
        finally
        {
            lock (_tasktimer)
            {
                // 任务执行完成,更改状态。
                _performingtasks = false;

                // 如果定时器还在运行,没有被停止,则启动下一个 period 周期。
                if (_isrunning)
                {
                    _tasktimer.change(period, timeout.infinite);
                }

                // 解除因为释放锁而阻塞的线程。
                // 如果已经调用了 stop,则会唤醒那个因为 wait 阻塞的线程,就会使 _isrunning 置为 false。
                monitor.pulse(_tasktimer);
            }
        }
    }
}

这里对 _performingtasks_isrunning 字段设置为 volatile 防止指令重排和寄存器缓存。这是因为在 stop 方法内部使用到的 _performingtasks 可能会被优化,所以将该字段设置为了易失的。

irunnable 接口

abp vnext 为任务的启动和停止,抽象了一个 irunnable 接口。虽然描述说的是对线程的行为进行抽象,但千万千万不要手动调用 thread.abort() 。关于 thread.abort() 的坏处,这里不再多加赘述,可以参考 的描述,或者搜索其他的相关文章。

public interface irunnable
{
    // 启动这个服务。
    task startasync(cancellationtoken cancellationtoken = default);

    /// <summary>
    /// 停止这个服务。
    /// </summary>
    task stopasync(cancellationtoken cancellationtoken = default);
}

后台工作者

模块的构造

后台工作者的模块行为比较简单,它定义了在应用程序初始化和销毁时的行为。在初始化时,后台工作者管理器 获得所有 后台工作者,并开始启动它们。在销毁时,后台工作者管理器获得所有后台工作者,并开始停止他们,这样才能够做到优雅退出。

[dependson(
    typeof(abpthreadingmodule)
    )]
public class abpbackgroundworkersmodule : abpmodule
{
    public override void onapplicationinitialization(applicationinitializationcontext context)
    {
        var options = context.serviceprovider.getrequiredservice<ioptions<abpbackgroundworkeroptions>>().value;
        // 如果启用了后台工作者,那么获得后台工作者管理器的实例,并调用 startasync 启动所有后台工作者。
        if (options.isenabled)
        {
            asynchelper.runsync(
                () => context.serviceprovider
                    .getrequiredservice<ibackgroundworkermanager>()
                    .startasync()
            );
        }
    }

    public override void onapplicationshutdown(applicationshutdowncontext context)
    {
        var options = context.serviceprovider.getrequiredservice<ioptions<abpbackgroundworkeroptions>>().value;
        // 如果启用了后台工作者,那么获得后台工作者管理器的实例,并调用 stopasync 停止所有后台工作者。
        if (options.isenabled)
        {
            asynchelper.runsync(
                () => context.serviceprovider
                    .getrequiredservice<ibackgroundworkermanager>()
                    .stopasync()
            );
        }
    }
}

后台工作者的定义

首先看看 ibackgroundworker 接口的定义,是空的。不过继承了 isingletondependency 接口,说明我们的每个后台工作者都是 单例 的。

/// <summary>
/// 在后台运行,执行某些任务的工作程序(线程)的接口定义。
/// </summary>
public interface ibackgroundworker : irunnable, isingletondependency
{

}

abp vnext 为我们定义了一个抽象的后台工作者类型 backgroundworkerbase,这个基类的设计目的是提供一些常用组件(和 applicationservice 一样)。

public abstract class backgroundworkerbase : ibackgroundworker
{
    //todo: add uow, localization and other useful properties..?
    //todo: 是否应该提供工作单元、本地化以及其他常用的属性?

    public ilogger<backgroundworkerbase> logger { protected get; set; }

    protected backgroundworkerbase()
    {
        logger = nulllogger<backgroundworkerbase>.instance;
    }
    
    public virtual task startasync(cancellationtoken cancellationtoken = default)
    {
        logger.logdebug("started background worker: " + tostring());
        return task.completedtask;
    }

    public virtual task stopasync(cancellationtoken cancellationtoken = default)
    {
        logger.logdebug("stopped background worker: " + tostring());
        return task.completedtask;
    }

    public override string tostring()
    {
        return gettype().fullname;
    }
}

abp vnext 内部只有一个默认的后台工作者实现 periodicbackgroundworkerbase。从名字上来看,意思是就是周期执行的后台工作者,内部就是用的 abptimer 来实现,abp vnext 将其包装起来是为了实现统一的模式(后台工作者)。

public abstract class periodicbackgroundworkerbase : backgroundworkerbase
{
    protected readonly abptimer timer;

    // 也就意味着子类必须在其构造函数,指定 timer 的执行周期。
    protected periodicbackgroundworkerbase(abptimer timer)
    {
        timer = timer;
        timer.elapsed += timer_elapsed;
    }

    // 启动后台工作者。
    public override async task startasync(cancellationtoken cancellationtoken = default)
    {
        await base.startasync(cancellationtoken);
        timer.start(cancellationtoken);
    }

    // 停止后台工作者。
    public override async task stopasync(cancellationtoken cancellationtoken = default)
    {
        timer.stop(cancellationtoken);
        await base.stopasync(cancellationtoken);
    }
    
    // timer 关联的周期事件,之所以不直接挂载 dowork,是为了捕获异常。
    private void timer_elapsed(object sender, system.eventargs e)
    {
        try
        {
            dowork();
        }
        catch (exception ex)
        {
            logger.logexception(ex);
        }
    }

    // 你要周期执行的任务。
    protected abstract void dowork();
}

我们如果要实现自己的后台工作者,只需要继承该类,实现 dowork() 方法即可。

public class testworker : periodicbackgroundworkerbase
{
    public testworker(abptimer timer) : base(timer)
    {
        // 每五分钟执行一次。
        timer.period = (int)timespan.fromminutes(5).totalmilliseconds;
    }

    protected override void dowork()
    {
        console.writeline("后台工作者被执行了。");
    }
}

然后在我们自己模块的 onpreapplicationinitialization() 方法内解析出后台作业管理器(ibackgroundworkermanager),调用它的 add() 方法,将我们定义的 testworker 添加到管理器当中即可。

后台工作者管理器

所有的后台工作者都是通过 ibackgroundworkermanager 进行管理的,它提供了 startasync()stopasync()add() 方法。前面两个方法就是 irunnable 接口定义的,后台工作者管理器直接集成了该接口,后面的 add() 方法就是用来动态添加我们的后台工作者。

后台工作者管理器的默认实现是 backgroundworkermanager 类型,它内部做的事情很简单,就是维护一个后台工作者集合。每当调用 startasync()stopasync() 方法的时候,都从这个集合遍历后台工作者,执行他们的启动和停止方法。

这里值得注意的一点是,当我们调用 add() 方法添加了一个后台工作者之后,后台工作者管理器就会启动这个后台工作者。

public class backgroundworkermanager : ibackgroundworkermanager, isingletondependency, idisposable
{
    protected bool isrunning { get; private set; }

    private bool _isdisposed;

    private readonly list<ibackgroundworker> _backgroundworkers;

    public backgroundworkermanager()
    {
        _backgroundworkers = new list<ibackgroundworker>();
    }

    public virtual void add(ibackgroundworker worker)
    {
        _backgroundworkers.add(worker);

        // 如果当前后台工作者管理器还处于运行状态,则调用工作者的 startasync() 方法启动。
        if (isrunning)
        {
            asynchelper.runsync(
                () => worker.startasync()
            );
        }
    }

    public virtual void dispose()
    {
        if (_isdisposed)
        {
            return;
        }

        _isdisposed = true;

        //todo: ???
    }

    // 启动,则遍历集合启动。
    public virtual async task startasync(cancellationtoken cancellationtoken = default)
    {
        isrunning = true;

        foreach (var worker in _backgroundworkers)
        {
            await worker.startasync(cancellationtoken);
        }
    }

    // 停止, 则遍历集合停止。
    public virtual async task stopasync(cancellationtoken cancellationtoken = default)
    {
        isrunning = false;

        foreach (var worker in _backgroundworkers)
        {
            await worker.stopasync(cancellationtoken);
        }
    }
}

上述代码其实存在一个问题,即后台工作者被释放以后,是否还能执行 add() 操作。参考我 ,其实当对象被释放之后,就应该抛出 objectdisposedexception 异常。

后台作业

比起后台工作者,我们执行一次性任务的时候,一般会使用后台作业进行处理。比起只能设置固定周期的 periodicbackgroundworkerbase ,集成了 hangfire 的后台作业管理器,能够让我们使用 cron 表达式,更加灵活地设置任务的执行周期。

模块的构造

关于后台作业的模块,我们需要说道两处。第一处是位于 volo.abp.backgroundjobs.abstractions 项目的 abpbackgroundjobsabstractionsmodule ,第二出则是位于 volo.abp.backgroundjobs 项目的 abpbackgroundjobsmodule

abpbackgroundjobsabstractionsmodule 的主要行为是将符合条件的后台作业,添加到 abpbackgroundjoboptions 配置当中,以便后续进行使用。

public override void preconfigureservices(serviceconfigurationcontext context)
{
    registerjobs(context.services);
}

private static void registerjobs(iservicecollection services)
{
    var jobtypes = new list<type>();

    // 如果注册的类型符合 ibackgroundjob<> 泛型,则添加到集合当中。
    services.onregistred(context =>
    {
        if (reflectionhelper.isassignabletogenerictype(context.implementationtype, typeof(ibackgroundjob<>)))
        {
            jobtypes.add(context.implementationtype);
        }
    });

    services.configure<abpbackgroundjoboptions>(options =>
    {
        // 将数据赋值给配置类。
        foreach (var jobtype in jobtypes)
        {
            options.addjob(jobtype);
        }
    });
}

volo.abp.backgroundjobs 内部是 abp vnext 为我们提供的 默认后台作业管理器,这个后台作业管理器 本质上是一个后台工作者

这个后台工作者会周期性(取决于 abpbackgroundjobworkeroptions.jobpollperiod 值,默认为 5 秒种)地从 ibackgroundjobstore 捞出一堆后台任务,并且在后台执行。至于每次执行多少个后台任务,这也取决于 abpbackgroundjobworkeroptions.maxjobfetchcount 的值,默认值是 1000 个。

注意:

这里的 options 类是 abpbackgroundjobworkeroptions,别和 abpbackgroundworkeroptions 混淆了。

所以在 abpbackgroundjobsmodule 模块里面,只做了一件事情,就是将负责后台作业的后台工作者,添加到后台工作者管理器种,并开始周期性地执行。

public override void onapplicationinitialization(applicationinitializationcontext context)
{
    var options = context.serviceprovider.getrequiredservice<ioptions<abpbackgroundjoboptions>>().value;
    if (options.isjobexecutionenabled)
    {
        // 获得后台工作者管理器,并将负责后台作业的工作者添加进去。
        context.serviceprovider
            .getrequiredservice<ibackgroundworkermanager>()
            .add(context.serviceprovider.getrequiredservice<ibackgroundjobworker>()
            );
    }
}

后台作业的定义

在上一节里面看到,只要是实现 ibackgroundjob<targs> 类型的都视为一个后台作业。这个后台作业接口,只定义了一个行为,那就是执行(execute(targs))。这里的 targs 泛型作为执行后台作业时,需要传递的参数类型。

// 因为是传入的参数,所以泛型参数是逆变的。
public interface ibackgroundjob<in targs>
{
    void execute(targs args);
}

检查源码,发现 abp vnext 的邮箱模块定义了一个邮件发送任务 backgroundemailsendingjob,它的实现大概如下。

public class backgroundemailsendingjob : backgroundjob<backgroundemailsendingjobargs>, itransientdependency
{
    // ...
    
    public override void execute(backgroundemailsendingjobargs args)
    {
        asynchelper.runsync(() => emailsender.sendasync(args.to, args.subject, args.body, args.isbodyhtml));
    }
}

后台作业管理器

后台作业都是通过一个后台作业管理器(ibackgroundjobmanager)进行管理的,这个接口定义了一个入队方法(enqueueasync()),注意,我们的后台作业在入队后,不是马上执行的。

说一下这个入队处理逻辑:

  1. 首先我们会通过参数的类型,获取到任务的名称。(假设任务上面没有标注 backgroundjobnameattribute 特性,那么任务的名称就是参数类型的 fullname 。)
  2. 构造一个 backgroundjobinfo 对象。
  3. 通过 ibackgroundjobstore 持久化任务信息。
public virtual async task<string> enqueueasync<targs>(targs args, backgroundjobpriority priority = backgroundjobpriority.normal, timespan? delay = null)
{
    // 获取任务名称。
    var jobname = backgroundjobnameattribute.getname<targs>();
    var jobid = await enqueueasync(jobname, args, priority, delay);
    return jobid.tostring();
}

protected virtual async task<guid> enqueueasync(string jobname, object args, backgroundjobpriority priority = backgroundjobpriority.normal, timespan? delay = null)
{
    var jobinfo = new backgroundjobinfo
    {
        id = guidgenerator.create(),
        jobname = jobname,
        // 通过序列化器,序列化参数值,方便存储。这里内部其实使用的是 json.net 进行序列化。
        jobargs = serializer.serialize(args),
        priority = priority,
        creationtime = clock.now,
        nexttrytime = clock.now
    };

    // 如果任务有执行延迟,则任务的初始执行时间要加上这个延迟。
    if (delay.hasvalue)
    {
        jobinfo.nexttrytime = clock.now.add(delay.value);
    }

    // 持久化任务信息,方便后面执行后台作业的工作者能够取到。
    await store.insertasync(jobinfo);

    return jobinfo.id;
}

backgroundjobnameattribute 相关的方法:

public static string getname<tjobargs>()
{
    return getname(typeof(tjobargs));
}

public static string getname([notnull] type jobargstype)
{
    check.notnull(jobargstype, nameof(jobargstype));

    // 判断参数类型上面是否标注了特性,并且特性实现了 ibackgroundjobnameprovider 接口。
    return jobargstype
                .getcustomattributes(true)
                .oftype<ibackgroundjobnameprovider>()
                .firstordefault()
                ?.name
        // 拿不到名字,则使用类型的 fullname。
            ?? jobargstype.fullname;
}

后台作业的存储

后台作业的存储默认是放在内存的,这点可以从 inmemorybackgroundjobstore 类型实现看出来。在它的内部使用了一个并行字典,通过作业的 guid 与作业进行关联绑定。

除了内存实现,在 volo.abp.backgroundjobs.domain 模块还有一个 backgroundjobstore 实现,基本套路与 settingstore 一样,都是存储到数据库里面。

public class backgroundjobstore : ibackgroundjobstore, itransientdependency
{
    protected ibackgroundjobrepository backgroundjobrepository { get; }

    // ... 
    
    public backgroundjobinfo find(guid jobid)
    {
        return objectmapper.map<backgroundjobrecord, backgroundjobinfo>(
            backgroundjobrepository.find(jobid)
        );
    }
    
    // ...

    public void insert(backgroundjobinfo jobinfo)
    {
        backgroundjobrepository.insert(
            objectmapper.map<backgroundjobinfo, backgroundjobrecord>(jobinfo)
        );
    }

    // ...
}

后台作业的执行

默认的后台作业管理器是通过一个后台工作者来执行后台任务的,这个实现叫做 backgroundjobworker,这个后台工作者的生命周期也是单例的。后台作业的具体执行逻辑里面,涉及到了以下几个类型的交互。

类型 作用
abpbackgroundjoboptions 提供每个后台任务的配置信息,包括任务的类型、参数类型、任务名称数据。
abpbackgroundjobworkeroptions 提供后台作业工作者的配置信息,例如每个周期 最大执行的作业数量、后台
工作者的 执行周期、作业执行 超时时间 等。
backgroundjobconfiguration 后台任务的配置信息,作用是将持久化存储的作业信息与运行时类型进行绑定
和实例化,以便 abp vnext 来执行具体的任务。
ibackgroundjobexecuter 后台作业的执行器,当我们从持久化存储获取到后台作业信息时,将会通过
这个执行器来执行具体的后台作业。
ibackgroundjobserializer 后台作业序列化器,用于后台作业持久化时进行序列化的工具,默认采用的
是 json.net 进行实现。
jobexecutioncontext 执行器在执行后台作业时,是通过这个上下文参数进行执行的,在这个上下
文内部,包含了后台作业的具体类型、后台作业的参数值。
ibackgroundjobstore 前面已经讲过了,这个是用于后台作业的持久化存储,默认实现是存储在内存。
backgroundjobpriority 后台作业的执行优先级定义,abp vnext 在执行后台任务时,会根据任务的优
先级进行排序,以便在后面执行的时候优先级高的任务先执行。

我们来按照逻辑顺序走一遍它的实现,首先后台作业的执行工作者会从持久化存储内,获取 maxjobfetchcount 个任务用于执行。从持久化存储获取后台作业信息(backgroundjobinfo),是由 ibackgroundjobstore 提供的。

var store = scope.serviceprovider.getrequiredservice<ibackgroundjobstore>();

var waitingjobs = store.getwaitingjobs(workeroptions.maxjobfetchcount);

// 不存在任何后台作业,则直接结束本次调用。
if (!waitingjobs.any())
{
    return;
}

inmemorybackgroundjobstore 的相关实现:

public list<backgroundjobinfo> getwaitingjobs(int maxresultcount)
{
    return _jobs.values
        .where(t => !t.isabandoned && t.nexttrytime <= clock.now)
        .orderbydescending(t => t.priority)
        .thenby(t => t.trycount)
        .thenby(t => t.nexttrytime)
        .take(maxresultcount)
        .tolist();
}

上面的代码可以看出来,首先排除 被放弃的任务 ,包含达到执行时间的任务,然后根据任务的优先级从高到低进行排序。重试次数少的优先执行,预计执行时间越早的越先执行。最后从这些数据中,筛选出 maxresultcount 结果并返回。

说到这里,我们来看一下这个 nexttrytime 是如何被计算出来的?回想起最开始的后台作业管理器,我们在添加一个后台任务的时候,就会设置这个后台任务的 预计执行时间。第一个任务被添加到执行队列中时,它的值一般是 clock.now ,也就是它被添加到队列的时间。

不过 abp vnext 为了让那些经常执行失败的任务,有比较低的优先级再执行,就在每次任务执行失败之后,会将 nexttrytime 的值指数级进行增加。这块代码可以在 calculatenexttrytime 里面看到,也就是说某个任务的执行 失败次数越高,那么它下一次的预期执行时间就会越远。

protected virtual datetime? calculatenexttrytime(backgroundjobinfo jobinfo, iclock clock)
{
    // 一般来说,这个 defaultwaitfactor 因子的值是 2.0 。
    var nextwaitduration = workeroptions.defaultfirstwaitduration * (math.pow(workeroptions.defaultwaitfactor, jobinfo.trycount - 1)); // 同执行失败的次数进行挂钩。
    var nexttrydate = jobinfo.lasttrytime?.addseconds(nextwaitduration) ??
                        clock.now.addseconds(nextwaitduration);

    if (nexttrydate.subtract(jobinfo.creationtime).totalseconds > workeroptions.defaulttimeout)
    {
        return null;
    }

    return nexttrydate;
}

当预期的执行时间都超过 defaulttimeout 的超时时间时(默认为 2 天),说明这个任务确实没救了,就不要再执行了。

我们之前说到,从 ibackgroundjobstore 拿到了需要执行的后台任务信息集合,接下来我们就要开始执行后台任务了。

foreach (var jobinfo in waitingjobs)
{
    jobinfo.trycount++;
    jobinfo.lasttrytime = clock.now;

    try
    {
        // 根据任务名称获取任务的配置参数。
        var jobconfiguration = joboptions.getjob(jobinfo.jobname);
        // 根据配置里面存储的任务类型,将参数值进行反序列化。
        var jobargs = serializer.deserialize(jobinfo.jobargs, jobconfiguration.argstype);
        // 构造一个新的执行上下文,让执行器执行任务。
        var context = new jobexecutioncontext(scope.serviceprovider, jobconfiguration.jobtype, jobargs);

        try
        {
            jobexecuter.execute(context);

            // 如果任务执行成功则删除该任务。            
            store.delete(jobinfo.id);
        }
        catch (backgroundjobexecutionexception)
        {
            // 发生任务执行失败异常时,根据指定的公式计算下一次的执行时间。
            var nexttrytime = calculatenexttrytime(jobinfo, clock);

            if (nexttrytime.hasvalue)
            {
                jobinfo.nexttrytime = nexttrytime.value;
            }
            else
            {
                // 超过超时时间的时候,公式计算函数返回 null,该任务置为废弃任务。
                jobinfo.isabandoned = true;
            }

            tryupdate(store, jobinfo);
        }
    }
    catch (exception ex)
    {
        // 执行过程中,产生了未知异常,设置为废弃任务,并打印日志。
        logger.logexception(ex);
        jobinfo.isabandoned = true;
        tryupdate(store, jobinfo);
    }
}

执行后台任务的时候基本分为 5 步,它们分别是:

  1. 获得任务关联的配置参数,默认不用提供,因为在之前模块初始化的时候就已经配置了(你也可以显式指定)。
  2. 通过之前存储的配置参数,将参数值反序列化出来,构造具体实例。
  3. 构造一个执行上下文。
  4. 后台任务执行器执行具体的后台任务。
  5. 成功则删除任务,失败则更新任务下次的执行状态。

至于执行器里面的真正执行操作,你都拿到了参数值和任务类型了。就可以通过类型用 ioc 获取后台任务对象的实例,然后通过反射匹配方法签名,在实例上调用这个方法传入参数即可。

public virtual void execute(jobexecutioncontext context)
{
    // 构造具体的后台作业实例对象。
    var job = context.serviceprovider.getservice(context.jobtype);
    if (job == null)
    {
        throw new abpexception("the job type is not registered to di: " + context.jobtype);
    }

    // 获得需要执行的方法签名。
    var jobexecutemethod = context.jobtype.getmethod(nameof(ibackgroundjob<object>.execute));
    if (jobexecutemethod == null)
    {
        throw new abpexception($"given job type does not implement {typeof(ibackgroundjob<>).name}. the job type was: " + context.jobtype);
    }

    try
    {
        // 直接通过 methodinfo 的 invoke 方法调用,传入具体的实例对象和参数值即可。
        jobexecutemethod.invoke(job, new[] { context.jobargs });
    }
    catch (exception ex)
    {
        logger.logexception(ex);

        // 如果是执行方法内的异常,则包装进行处理,然后抛出。
        throw new backgroundjobexecutionexception("a background job execution is failed. see inner exception for details.", ex)
        {
            jobtype = context.jobtype.assemblyqualifiedname,
            jobargs = context.jobargs
        };
    }
}

集成 hangfire

abp vnext 对于 hangfire 的集成代码分布在 volo.abp.hangfirevolo.abp.backgroundjobs.hangfire 模块内部,前者是在模块配置里面,调用 hangfire 库的相关方法,注入组件到 ioc 容器当中。后者则是对后台作业进行了适配处理,替换了默认的 ibackgroundjobmanager 实现。

abphangfiremodule 模块内部,通过工厂创建出来一个 backgroudjobserver 实例,并将它的生命周期与应用程序的生命周期进行绑定,以便进行销毁处理。

public class abphangfiremodule : abpmodule
{
    private backgroundjobserver _backgroundjobserver;

    public override void configureservices(serviceconfigurationcontext context)
    {
        context.services.addhangfire(configuration =>
        {
            context.services.executepreconfiguredactions(configuration);
        });
    }

    public override void onapplicationinitialization(applicationinitializationcontext context)
    {
        var options = context.serviceprovider.getrequiredservice<ioptions<abphangfireoptions>>().value;
        _backgroundjobserver = options.backgroundjobserverfactory.invoke(context.serviceprovider);
    }

    public override void onapplicationshutdown(applicationshutdowncontext context)
    {
        //todo: abp may provide two methods for application shutdown: onpreapplicationshutdown & onapplicationshutdown
        _backgroundjobserver.sendstop();
        _backgroundjobserver.dispose();
    }
}

我们直奔主题,看一下基于 hangfire 的后台作业管理器是怎么实现的。

public class hangfirebackgroundjobmanager : ibackgroundjobmanager, itransientdependency
{
    public task<string> enqueueasync<targs>(targs args, backgroundjobpriority priority = backgroundjobpriority.normal,
        timespan? delay = null)
    {
        // 如果没有延迟参数,则直接通过 enqueue() 方法扔进执行对了。
        if (!delay.hasvalue)
        {
            return task.fromresult(
                backgroundjob.enqueue<hangfirejobexecutionadapter<targs>>(
                    adapter => adapter.execute(args)
                )
            );
        }
        else
        {
            return task.fromresult(
                backgroundjob.schedule<hangfirejobexecutionadapter<targs>>(
                    adapter => adapter.execute(args),
                    delay.value
                )
            );
        }
    }

上述代码中使用 hangfirejobexecutionadapter 进行了一个适配操作,因为 hangfire 要将一个后台任务扔进队列执行,不是用 targs 就能解决的。

转到这个适配器定义,提供了一个 execute(targs) 方法,当被添加到 hangfire 队列执行的时候。实际 hangfire 会调用适配器的 excetue(targs) 方法,然后内部还是使用的 ibackgroundjobexecuter 来执行具体定义的任务。

public class hangfirejobexecutionadapter<targs>
{
    protected abpbackgroundjoboptions options { get; }
    protected iservicescopefactory servicescopefactory { get; }
    protected ibackgroundjobexecuter jobexecuter { get; }

    public hangfirejobexecutionadapter(
        ioptions<abpbackgroundjoboptions> options, 
        ibackgroundjobexecuter jobexecuter, 
        iservicescopefactory servicescopefactory)
    {
        jobexecuter = jobexecuter;
        servicescopefactory = servicescopefactory;
        options = options.value;
    }

    public void execute(targs args)
    {
        using (var scope = servicescopefactory.createscope())
        {
            var jobtype = options.getjob(typeof(targs)).jobtype;
            var context = new jobexecutioncontext(scope.serviceprovider, jobtype, args);
            jobexecuter.execute(context);
        }
    }
}

集成 rabbitmq

基于 rabbitmq 的后台作业实现,我想放在分布式事件总线里面,对其一起进行讲解。

三、总结

abp vnext 为我们提供了多种后台作业管理器的实现,你可以根据自己的需求选用不同的后台作业管理器,又或者是自己动手造轮子。

需要看其他的 abp vnext 相关文章? 即可跳转到总目录。

《[Abp vNext 源码分析] - 12. 后台作业与后台工作者.doc》

下载本文的Word格式文档,以方便收藏与打印。