January 27, 2022

设计一个分布式定时任务系统

尝试设计一个 "分布式" 定时任务系统,"功能设计" 和 "分布式设计" 作为切入点。不含任何的代码,只是一个设计思路。

需求和背景分析

一提到定时任务的使用场景,我们肯定能想到很多场景,比如:

  • 每天晚上12点执行一次清理数据的任务
  • 每天凌晨1点给符合条件的用户发送推广邮件
  • 每个月10号结算工资
  • 每隔5分钟检查一次服务器的状态
  • 每天根据用户的配置,给用户发送站内消息提醒

从常见的场景中,我们可以提炼出一些定时任务的特点:

序号 特点 说明
1 定时 执行时间有规则
2 可靠 可以延迟执行,但不能不执行;可以不执行,但是不能多执行
3 并发(可能) 某些场景下,可以运行多个 cron 进程来提高执行效率
4 可执行 这个有点废话了,但是这个关系到 cronjob 的设计,因此在这里还是提出来

但是作为一个系统来说,我们需要更多的功能来提升用户体验,保证平台的可靠和稳定。我们设想下以下的场景:

  • 定时任务已经触发了,但是有没有执行,执行结果是什么?
  • 如果一个定时任务长时间运行,那么它正常吗?
  • 一个定时任务还在运行,但是下一个触发时机又到来了,该怎么办?
  • 如果服务器资源已经处于高位,那么要被触发的任务还触发吗?

最后,基本的访问控制,权限分配和API设计,这些都是系统功能的一部分,但不作为本文的重点考虑对象。

一些概念

到这里我们可以提取一些概念,来帮助我们设计一个系统:

序号 名词 解释
1 CronJob 定时任务实例,描述定时任务资源的一个实体
2 Job 执行中的定时任务
3 Scheduler 调度器,负责触发 CronJob 执行和监控 Job 的执行状态;或许还会存储一些 CronJob 的状态
4 JobRuntime job 运行时,准备 Job 运行时需要的资源; 可以参考 k8s CRI

这里 JobRuntime 是抽象化的概念,因为 Job 可以是k8s上的POD,也可以是物理机的进程,还可以是一个进程中的coroutine。

这几个概念的关联关系如图:

总结: 把定时任务平台分为了运行前和运行中两个阶段,通过调度器居中协调,达到两个目的:

  1. Scheduler 不负责 CronJob 的运行,减轻 Scheduler 的职责。
  2. JobRuntime 抽象化并独立设计,方便扩展。

关键点

再总结一下设计过程中的关键点,如下图。

可以先忽略分布式相关的要点(large scale, reliable),先重点看下功能性的要点。

  1. CronJob Track「定时任务跟踪」

    感知 CronJob 的状态,并记录状态变化的历史记录,用于辅助 scheduler 实现一些策略调度。同时也能提供一些 CronJob 的运行数据,帮助用户调试和了解 CronJob 的健康状况。

  2. Idempotent「幂等保证」

    某些定时任务对执行次数是有要求的,作为提供服务的平台来说,需要尽最大努力保证任务执行。因此提出一些 “幂等级别” 供不同的定时任务使用。

    • 只执行一次。一个 “CronJob” 一定会触发一次,但是这个比较难保证。
    • 最多一次。可以不触发,但是不能触发多次。
    • 可以重复执行。可以触发多次。
  3. Realtime「实时性」

    CronJob 对于执行的时机也有要求,如果 CronJob 要求1分钟内执行完毕,如果1分钟后再运行会导致时效性的问题,那么平台也应该考虑相关的设计。

  4. Fault Tolerance「容错设计」

    CronJob 可能会因为一些外部因素 “执行失败” 或者 “超时”,这时候需要提供一些容错机制,比如:失败重试,超时关闭。

  5. Cocurrency「并发控制」

    好比于,k8s 中 Deployment 作为一个无状态服务,可以运行多个副本以提高效率;但是相对的,Statefule Set 作为有状态的应用,POD不能随意的扩容,所以需要提供一些并发控制机制,严格的限制 CronJob 的并发数量。

  6. Pre-Allocated「预分配」

    同样的,某些 CronJob 的优先级比较高,要保证这些任务一直稳定运行。那么需要提供一些预分配机制,可以避免再系统资源处于高位的时候,这些 CronJob 还能继续被调度运行。

总结: 这些要点体现在设计上就是,Schuduler 对 CronJob 的调度控制,来自于 CronJob 的配置和运行状态。CronJob Configuration 包含一些辅助调度控制的配置,如:

{
    // ...
    "concurrency": 1, // 1 表示这个 CronJob 只能有一个运行中的 Job 实例。
    "preAllocated": true, // true 表示这个 CronJob 可以被预分配,这样可以避免系统资源被占满,导致这个 CronJob 无法调度。
    "idempotent": "onceMost", // [onceOnly, onceMost, unlimit]
    "faultTolerance": {
        "maxStartupRetries": 3, // 最大启动失败重试次数
        "maxFailureRetries": 3, // 最大执行失败重试次数
        "maxFailureRetriesSeconds": "60s", // 最大执行失败重试间隔
        "activeDeadlineSeconds": "120s", // 最大执行时间,超过这个时间就会被认为是失败
    }, // 容错配置
    // ... 
    "resourceLimit": {
        "cpu": "0.1", // CPU 限制
        "memory": "128Mi", // 内存限制
    }, // 资源限制
}

向分布式演进

到目前为止,这个定时任务系统的功能已经差不多了,那么是不是意味着这样的设计已经满足需求了?当然没有,比如:

  • 单个服务器要承担 交互调度执行 的压力,那么它能承载的定时任务数量势必会被限制。
  • 同样,如果这一台机器出现问题,那么不是就导致所有的定时任务都无法正常运行,影响整个系统的运行。
  • 同样,如果我们需求的是一个 PasS 平台,单机不可能 邻近 所有的用户,那么用户请求就很可能经过一些低俗和拥堵的网络链路。

简单的说就是单点问题~,因此我们需要 分布式 设计。

那么怎么把它演进成一个分布式系统呢?

首先应用 CAP 理论,我们可以先确认这个系统应该是 CP 还是 AP 系统呢?对于定时任务系统,高可用不是最重要的,反而是可靠性非常重要。那么可靠性是怎么体现的呢?

  • 定时任务不会因为某些节点宕机而不执行
  • 强调了最多运行一次,也不会被反复调度执行
  • 因为系统异常而未被执行,可以被重新调度执行

因此,我们不需要这个系统有非常高的可用性,偶尔不可用是可以接受的(这是由定时任务的性质决定的)。

对一致性的思考

思考下,这里是否需要一致性?如果需要,那么是强一致还是最终一致呢? 按照我目前的理解,分布式分为两种(无状态和有状态),针对有状态的服务,可能需要强一致性,也可能最终一致就能满足服务正常运行的需求了。

这里我们分为几个问题来一步步理解:

  1. 分布式设计之后,对定时任务系统带来的问题是什么?
    • 按照我们上面的抽象,怎么确定由哪个节点来承担 scheduler 的职责?– 选主算法
    • 如果 scheduler 宕机,应该由谁来接替它的工作?– 选主算法
    • 接替者如何“正确的继续”(崩溃时,被调度未结束的和调度中的任务如何处理)?– 集群状态
    • 如果集群整体崩溃,那么下次集群重启时,如何弥补宕机期间的任务执行?– 集群状态
  2. 分布式集群有哪些 状态
    • 任务队列(调度器)
    • 调度中/执行中的任务(调度器)
    • 集群中节点清单和节点状态
    • 集群时间/上一次调度时间戳(调度器)
    • … 等等跟运行相关的数据

通过上面的QA,再来思考下分布式定时任务平台对于一致性的要求。如果你把整个系统的状态或者日志,放在外部的存储系统中,那么对于定时任务平台也没有特别的一致性要求,因为已经由外部存储系统来维护了。但是如果是保持在系统内部,那么就有要求了,如果说系统中各个节点对于集群的状态存储不一致,那么一旦执行调度任务的节点宕机,其他节点执行调度时上,这样就会导致定时任务的调度出现不一致,这就是一致性的问题。

按照我们的分析,设计的分布式定时任务系统算是一个 CP 系统。因此在设计的时候(暂定为集群内部维护状态),我们通过引入一致性协议来解决一致性问题。如果你学习过分布式理论,亦或者学习过一些分布式软件,如 k8s / etcd / consul 等等,就应该知道 RaftPaxos 这两个一致性协议。

Raft 或者 Paxos 是通过 Leader-Follower 的方式来实现强一致性,那么我们引入 Raft 之后就有如下的架构:

至此,整个分布式定时任务的框架已经成型,我们可以开始设计定时任务的调度器了。

定时任务的调度器

思考下,调度器的职责是什么?

调度器的职责说简单点就是,在确定在什么时间在哪个节点上运行某个定时任务。时间根据cron规则和cron任务配置的策略来确定,而节点是由调度器根据每个节点的资源负载同时配合cron任务配置的策略来确定。

但是,只是这样而已吗?别忘了现在已经是个分布式集群了,调度器是由leader节点来运行的。如果leadership发生变化,那么接替者还应该检查上一任正在处理的事情,并且把它接着做。举个例子:

A、B、C 三个节点,A作为第一任leader,A正在处理任务1且尚未分配到具体节点执行。这时A宕机了,B作为接替者,那么B成为leader后运行调度器,首先要做的是就是重建调度状态也就是集群状态,并根据集群状态来继续调度。那么怎么重建?

  • 加载所有的定时任务,重建时间轮或者定时器。
  • 检查 “运行中队列”
  • 检查 “待调度队列”
  • 检查 “重试队列”
  • 其他数据加载(调度时间)

队列为空,当然是最简单的状况,复杂的就是队列不为空情况下怎么处理?运行中不为空,那么恢复结果同步协程;待调度不为空,那么根据调度算法和任务配置的策略来调度;重试队列不为空,那么根据调度算法和任务配置的策略来调度。

任务配置的策略,如不能并发;可以重试;最晚调度时间等等,都会参与调度算法。

用代码来描述的话,就是:

def schedule(self):
    if leadership_changed:
        # cluster first run or leadership changed
        self.start_cron_timer()
        self.rebuild_wait_scheduled_queue()
        self.rebuild_retry_queue()
        self.rebuild_running_queue()

    for True:
        job = self.get_from_wait_schduled_or_retry_queue()
        if job is None:
            backoff_delay()
            continue
        node_id = self.match_node(job)
        # build cronjob's RunContext, put it into running queue and 
        # start a coroutine to synchroize the cronjob's running result.
        self.schedule_to_node(node_id, job)

定时任务运行时

任务运行时是为了方便抽象和扩展而来的一个概念,它的职责是接受Leader任务调度请求,运行定时任务并且同步运行结果。但运行时不一定是要集群内的服务器来担任,也可以是外部的系统,如:k8s, containerd, docker, etc,内部只需要把外部系统包装成符合运行时要求的资源即可。

这里,我们假定定时任务的资源类型为容器镜像,运行环境自然就是容器运行时。那么定时任务的运行时,就只需要包装一下k8s的API或者定义CRD即可。

type JobRuntime interface {
    // running job
    RunJobAsync(ctx context.Context, job *Job, resultCh chan<- *JobResult) (err error)
    // cancel job (timeout, etc)
    CancelJob(job *Job, reason Reason) (err error)
    // query job's status (running, success, failed, etc)
    QueryJobResult(job *Job) (result *JobResult, err error)
}

因为运行时也算一个无状态节点,所以也没有太多的设计要素。

Kubernetes CronJob

如果对k8s稍微了解一点,肯定会好奇,为啥不直接使用k8s的cronjob来满足定时任务系统的使用场景呢?当然是可以的,只是这里的主要目的是从头梳理一个(分布式)定时任务系统的功能和设计。在一般场景下 k8s cronjob 已经能够满足使用了,只有个别场景不能直接支持:

  • 预分配(优先保证重要的定时任务资源)
  • 幂等保证(并发/幂等精细控制)
  • 实时性(超时退出,过时不启动) spec.startingDeadlineSeconds
  • 异常告警

另外就是在使用上没有那么“方便”(运维可不一定会让开发直接接触k8s~)。

关于 CronJob 的详细控制可以参见 https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/cron-job-v1/#CronJobSpec

个人以为,中小型公司没必要花费大精力去开发分分布式定时任务系统,只需要在k8s的 CronJob/Job 的基础上稍微包装一下自己的控制模块就能满足要求。

总结

定时任务系统是最常用的服务之一,实现方式可大可小,也可以使用linux crontab 服务。本文在不依赖任何服务的前提下,来设计一个分布式定时任务系统,从功能到架构一一梳理。当然,文中提到的功能清单不一定全,只是站在我的角度看到的必要提及的通用功能。架构也不是一成不变的,可以根据自己的场景和目的来设计,只要达到目的即可。

水平有限,如有错误,欢迎勘误指正。

参考文献