timewheel简介

  • 时间轮是一个环形队列,底层实现就是一个固定长度的数组,数组中的每个元素存储一个双向列表,这个列表存放着该时间内需要执行的所有任务

例子

  • 抽象点来说时钟表盘就是1秒为一个时间刻度,一共(一天)会有86400个刻度的时间轮,当指针走到那个刻度的时候就可以把对应的任务全部取出执行
  • 也就是说这里我们定义了一个可以延迟86400秒的时候轮,不过当我们定一个86401秒后执行的任务怎么办
    • 方案一 多级时间轮 (这里不展开描述)
    • 方案二 定义circle参数 (本文), 一轮是86400秒的话,86401= 86400 + 1 也就是 1circle(一轮)加上一个刻度就可以取出该任务执行

结构体

type TimeWheel struct {
	interval     time.Duration 
	slots        []*list.List
	slotsNum     int64
	currentSlots int64
	ticker       *time.Ticker
	mt           sync.Mutex
	isRun        bool
	tasks        sync.Map
	addTaskCh    chan *Task
	removeTaskCh chan string
	closeCh      chan struct{}
}

type Task struct {
	ID         string
	createTime time.Time
	delay      time.Duration
	slots      int64
	circle     int64 // 多少圈
	job        Job
	times      int64 //执行多少次 -1 一直执行
}

启动时间轮

//定义定时器驱动时间轮
t.ticker = time.NewTicker(t.interval)
		
func (t *TimeWheel) run() {
	for {
		select {
		case _ = <-t.ticker.C:
			t.runTask()
		case task := <-t.addTaskCh:
			t.addTask(task, true)
		case id := <-t.removeTaskCh:
			t.delTask(id)
		case _ = <-t.closeCh:
			t.ticker.Stop()
			break
		}
	}
}

向时间轮添加任务

func (t *TimeWheel) AddTask(ID string, job Job, delay time.Duration, times ...int64) error {
	if ID == "" {
		return errors.New("ID is empty")
	}
	if delay < t.interval {
		return errors.New("the delay time must be greater than the interval time")
	}
	var timesInt64 int64 = 1
	if len(times) > 0 {
		timesInt64 = times[0]
	}
	_, ok := t.tasks.Load(ID)
	if ok {
		return errors.New("ID already exists")
	}
	task := &Task{
		ID:         ID,
		createTime: time.Now(),
		job:        job,
		delay:      delay,
		times:      timesInt64,
	}
	t.addTaskCh <- task
	return nil
}
func (t *TimeWheel) addTask(task *Task, first bool) {
	task.circle, task.slots = t.getCircleAndSlots(task.delay, first)
	ele := t.slots[task.slots].PushBack(task)
	t.tasks.Store(task.ID, ele)
}

func (t *TimeWheel) getCircleAndSlots(delay time.Duration, first bool) (circle, slots int64) {
	delaySed := int64(delay.Seconds())
	intervalSed := int64(t.interval.Seconds())
	circle = delaySed / intervalSed / t.slotsNum
	slots = delaySed - (t.slotsNum * intervalSed * circle) + t.currentSlots
	if slots == t.currentSlots && circle > 0 {
		circle--
	}
	//第一次加入时 当前秒(currentSlots)还未执行,比如当前是第一秒的slot(0) 延迟5秒计算得出为5 (0~5有6格所有需要-1)
	//第二次加入时 当前秒(currentSlots)已经执行,就不需要-1
	if slots > 0 && first {
		slots--
	}
	return
}

时间轮删除任务

func (t *TimeWheel) RemoveTask(ID string) error {
	_, ok := t.tasks.Load(ID)
	if !ok {
		return errors.New("ID does not exist")
	}
	t.removeTaskCh <- ID
	return nil
}

func (t *TimeWheel) delTask(id string) {
	if val, ok := t.tasks.Load(id); ok {
		task := val.(*list.Element).Value.(*Task)
		t.slots[task.slots].Remove(val.(*list.Element))
		t.tasks.Delete(task.ID)
	}
}

完整代码地址