Raymond's Notebook
日拱一卒无有尽,功不唐捐终入海
使用时间轮算法实现延迟队列 (一)(Golang)

实际项目当中,经常会使用延迟队列,执行一些延迟任务,比如:

  • 如果第一次通知失败,就延迟 X 分钟再通知一次,这在构建webhook的项目中很常见。
  • 如果用户成功消费,X 天后没有评论,就默认给予好评。

这些任务具有的共同点可以抽象为:在某项任务执行完毕后,延迟一定时间执行另外一项任务。由于这些任务分布在随机的时间点上,构建定时JOB去跑这些任务必定是一件非常繁琐的工程。即使采用每次计算好执行时间,然后存储到缓存或者DB里面,仍然面临每次需要遍历庞大任务数据,并且执行时间判断造成的误差会导致某些任务也许无法执行的窘境。那么一个设计优良的延迟队列,为了解决上述问题,应该具备哪些特性呢?我觉得,它应该具备以下特性:

  • 延迟时间要精准,保证在延迟到达时,执行预先设定的任务。
  • 任务的执行时间,不应太过于影响时间计时器的运行。
  • 延迟粒度可以自由配置,时间间隔可大可小。

要实现延迟队列,目前比较常用的方式,是使用时间轮算法,该算法的结构大体如下图所示:

它由三部分组成:

  • 一个计时器,或者称为时间发生器。
  • 一个循环队列,一般通过算法由数组来实现。
  • 一个存储任务(task)的数据结构,可以是数组或者链表。

其中前两个决定了延迟的粒度,最后一个决定任务的存储方式;当任务执行完毕后,需要将任务从队列中删除,因此采用链表来存储任务,应该效率要比数组高很多(因为整个任务执行过程,不存在随机读取任务的需求)。

现在我们就用go语言来实现一个相对通用的延迟队列:

  1. 先来构建一个简单任务执行接口,具体的业务对象可以实现它来执行任务。
type Executor interface {
    DoDelayTask(contents string) error
}  
  1. 然后构建存储在时间轮中的任务,每个任务节点,我们采用链表实现。

//定义一个获取实现具体任务对象的工厂方法,该方法需要在业务项目中定义并实现
type BuildExecutor func(taskType string) Executor

//任务采用链表结构
type Task struct {
    //任务在时间轮上的循环次数,等于0时,执行该任务
    CycleCount   int
    //任务类型,用于工厂方法判断该使用哪个实现对象
    TaskType     string
    //任务方法参数
    TaskParams   string
    //任务工厂方法指针
    TaskExecutor BuildExecutor
    Next         *Task
}

//通过工厂方法获取具体实现,然后调用方法,执行任务
func (t *Task) Execute() error {
    if t.TaskExecutor != nil {
        executor := t.TaskExecutor(t.TaskType)
        if executor != nil {
            return executor.DoDelayTask(t.TaskParams)
        } else {
            return errors.New("executor is nil")
        }
    } else {
        return errors.New("task build executor is nil")
    }

}
  1. 最后,用时间轮算法实现时间轮,并将任务通知与延迟队列结合。
const (
    //轮的时间长度,目前设置为一个小时,也就是时间轮每循环一次需要1小时;默认时间轮上面的每走一步的最小粒度为1秒。
    WHEEL_SIZE = 3600
)

//定义一个获取实现具体任务对象的工厂方法,该方法需要在业务项目中定义并实现
type BuildExecutor func(taskType string) Executor

var onceNew sync.Once
var onceStart sync.Once

var delayQueueInstance *delayQueue

type wheel struct {
    //所有在时间轮上的任务均采用链表形式存储
    //如果采用数组,对应已经执行过的任务,会造成不必要的空间浪费或者数组移动造成的时间复杂度
    NotifyTasks *Task
}

type delayQueue struct {
    //循环队列
    TimeWheel    [WHEEL_SIZE]wheel
    CurrentIndex uint //时间轮当前指针
    Persistence
    //任务工厂方法指针, 需要在对象创建时初始化它
    TaskExecutor BuildExecutor
}

//单列方法,使用默认的redis持久方案
func GetDelayQueue(serviceBuilder BuildExecutor) *delayQueue {
    //保证只初始化一次
    onceNew.Do(func() {
        delayQueueInstance = &delayQueue{
            Persistence:  getRedisDb(),
            TaskExecutor: serviceBuilder,
        }
    })
    return delayQueueInstance
}

//单列方法,使用外部传入的持久方案
func GetDelayQueueWithPersis(serviceBuilder BuildExecutor, persistence Persistence) *delayQueue {
    if persistence == nil {
        log.Fatalf("persistance is null")
    }
    //保证只初始化一次
    onceNew.Do(func() {
        delayQueueInstance = &delayQueue{
            Persistence:  persistence,
            TaskExecutor: serviceBuilder,
        }
    })
    return delayQueueInstance
}

func (dq *delayQueue) Start() {
    //保证只会有一个时间轮计时器
    onceStart.Do(dq.init)
}

func (dq *delayQueue) init() {
    go func() {
        //从缓存中加载持久化的任务
        dq.loadTasksFromDb()
        for {
            select {
            //默认时间轮上的最小粒度为1秒
            case <-time.After(time.Second * 1):

                if dq.CurrentIndex >= WHEEL_SIZE {
                    dq.CurrentIndex = dq.CurrentIndex % WHEEL_SIZE
                }

                taskLinkHead := dq.TimeWheel[dq.CurrentIndex].NotifyTasks
                //遍历链表
                //当前节点前一指针
                prev := taskLinkHead
                //当前节点指针
                p := taskLinkHead
                for p != nil {
                    if p.CycleCount == 0 {
                        taskId := p.Id
                        //开启新的go routing 去做通知,加快每次遍历的速度,确保不会拖慢时间轮的运行
                        //如果任务有异常,尽量让具体的业务对象去处理,延迟队列不处理具体业务异常,
                        //这样可以保证延迟队列的业务单纯性,避免难以维护的问题。如果具体业务出现问题,需要重复通知,可以将任务重新加入队列即可。
                        go dq.ExecuteTask(p.TaskType, p.TaskParams)
                        //删除链表节点 task
                        //如果是第一个节点
                        if prev == p {
                            dq.TimeWheel[dq.CurrentIndex].NotifyTasks = p.Next
                            prev = p.Next
                            p = p.Next
                        } else {
                            //如果不是第一个节点
                            prev.Next = p.Next
                            p = p.Next
                        }
                        //从持久对象上删除该任务
                        dq.Persistence.Delete(taskId)

                    } else {
                        p.CycleCount--
                        prev = p
                        p = p.Next
                    }

                }

                dq.CurrentIndex++

            }
        }
    }()
}

func (dq *delayQueue) loadTasksFromDb() {
    tasks := dq.Persistence.GetList()
    if tasks != nil && len(tasks) > 0 {
        for _, task := range tasks {
            // fmt.Printf("%v\n", task)
            delaySeconds := ((task.CycleCount + 1) * WHEEL_SIZE) + task.WheelPosition
            if delaySeconds > 0 {
                dq.internalPush(time.Duration(delaySeconds)*time.Second, task.Id, task.TaskType, task.TaskParams, false)
            }
        }
    }
}

//将任务加入延迟队列
func (dq *delayQueue) Push(delaySeconds time.Duration, taskType string, taskParams interface{}) error {

    var pms string
    result, ok := taskParams.(string)
    if !ok {
        tp, _ := json.Marshal(taskParams)
        pms = string(tp)
    } else {
        pms = result
    }

    return dq.internalPush(delaySeconds, "", taskType, pms, true)
}

func (dq *delayQueue) internalPush(delaySeconds time.Duration, taskId string, taskType string, taskParams string, notNeedPresis bool) error {
    if int(delaySeconds.Seconds()) == 0 {
        errorMsg := fmt.Sprintf("the delay time cannot be less than 1 second, current is: %v", delaySeconds)
        log.Println(errorMsg)
        return errors.New(errorMsg)
    }
    //从当前时间指针处开始计时
    calculateValue := int(dq.CurrentIndex) + int(delaySeconds.Seconds())

    cycle := calculateValue / WHEEL_SIZE
    if cycle > 0 {
        cycle--
    }
    index := calculateValue % WHEEL_SIZE

    if taskId == "" {
        u := uuid.New()
        taskId = u.String()
    }
    task := &Task{
        Id:            taskId,
        CycleCount:    cycle,
        WheelPosition: index,
        TaskType:      taskType,
        TaskParams:    taskParams,
    }
    if dq.TimeWheel[index].NotifyTasks == nil {
        dq.TimeWheel[index].NotifyTasks = task
        // log.Println(dq.TimeWheel[index].NotifyTasks)
    } else {
        //将新任务插入链表头,由于任务之间没有顺序关系,这种实现最为简单
        head := dq.TimeWheel[index].NotifyTasks
        task.Next = head
        dq.TimeWheel[index].NotifyTasks = task
        // log.Println(dq.TimeWheel[index].NotifyTasks)
    }
    if notNeedPresis {
        //持久化任务
        dq.Persistence.Save(task)
    }

    return nil
}

//通过工厂方法获取具体实现,然后调用方法,执行任务
func (dq *delayQueue) ExecuteTask(taskType, taskParams string) error {
    if dq.TaskExecutor != nil {
        executor := dq.TaskExecutor(taskType)
        if executor != nil {
            log.Printf("Execute task: %s with params: %s\n", taskType, taskParams)

            return executor.DoDelayTask(taskParams)
        } else {
            return errors.New("executor is nil")
        }
    } else {
        return errors.New("task build executor is nil")
    }

}

//延迟队列上,某个时间轮上的任务数量
func (dq *delayQueue) WheelTaskQuantity(index int) int {
    tasks := dq.TimeWheel[index].NotifyTasks
    if tasks == nil {
        return 0
    }
    k := 0
    for p := tasks; p != nil; p = p.Next {
        k++
    }

    return k
}

上面我们用单列模式来构建延迟队列,因为很多时候,我们程序当中需要保证任何时刻只能有一个时间轮队列存在,不然会造成资源浪费。我们设置时间轮的大小和最小粒度分别为一个小时和一秒钟,这也是最为常用的一种方式。 用一个1秒的时间发生器配合select来驱动时间轮的运行,在每个轮节点上,遍历链表,查找需要执行的任务;任务执行时,无须处理异常,让具体业务对象去处理业务异常,保证时间轮的单纯性。
最后,我们写一个测试用例来测试时间轮的运行:

//创建一个测试业务对象,让它实现 Executor 接口   
type businessNotify struct {
}

func (bn *businessNotify) DoDelayTask(contents string) error {
    fmt.Println(fmt.Sprintf("Do task.....%s", contents))
    return nil
}

//一个工厂方法,通过类型去构建任务执行对象
func commonFactory(taskType string) Executor {
    //do filter business service by taskType ...
    //...
    return &businessNotify{}
}
//测试任务的执行
func TestTaskExecute(t *testing.T) {
    task := &Task{
        CycleCount:   1,
        TaskType:     "RetryNotify",
        TaskParams:   `{name: "raymond"}`,
        TaskExecutor: commonFactory,
    }
    task.Execute()
}

//测试延迟队列的运行
func TestRunDelayQueue(t *testing.T) {
    c := make(chan struct{})

    queue := GetDelayQueue()
    queue.Start()

    go func() {
        //模拟在其他地方将数据压入延迟队列
        q := GetDelayQueue()
        q.Push(5, "RetryNotify", `{name: "raymond"}`, commonFactory)
        q.Push(15, "GoodComments", `{name: "raymond"}`, commonFactory)
        q.Push(25, "TaskOne", `{name: "raymond"}`, commonFactory)
        q.Push(35, "TaskThree", `{name: "raymond"}`, commonFactory)
        q.Push(45, "TaskFour", `{name: "raymond"}`, commonFactory)

    }()

    c <- struct{}{}
}

现在各方面无误的话,可以看到时间轮正常运行,并在合适的时间执行对应的任务。

按理说一个简单的延迟队列就算实现完毕了,是不是我们就可以把它 run on production?且慢。。。一个真正可以在生产环境当中运行的延迟队列,还有很多事情需要考虑,比如:

  • 任务是否需要持久化?如果不持久化,如何保证程序挂掉后,未执行的任务可以恢复执行?
  • 采用何种持久化方案,更能适配当前的业务?
  • 任务出错后,该采用什么补救方案(比如重试),才能保证任务执行的可靠性。

下次,我们将慢慢细化该延迟队列,将它打造成一个可以在生产环境稳定运行的程序 。。。

(该项目源代码,请移步这里)