先前写过实现简单的协程调度器一文,指出调度器的核心是调度算法。即当要执行多个任务时,需要一种调度方法,决定某个任务什么时候执行,什么时候暂停。对于一个定时事件调度器,它需要的算法很简单,以时间先后进行调度分配任务的执行。

如何让任务达到时间先后执行?很简单,对时间排序。但是,任务个数通常是一个态的过程,如果每次有新任务添加进来或删除都进行排序(sorting)性能很差。我们可以使用堆的构建过程,即添加任务—往堆中添加元素。

因此,定时事件调度器的核心就是堆这个数据结构和相关的操作。我们可以在和思想上自己实现,但Python标准库自带了sched模块,那么接下来使用它的模块接口。

  1. sched模块的基本使用
  2. 简单的剖析
  3. 一个例子

基本使用方法

这个模块简单到直接通过源码就能学会使用它。scheduler类的enter方法的参数签名如下:

  • enter(delay, priority, action, args=(), kwargs=None)

支持delay、优先级别,action函数的参数。

  • enterabs(time, priority, action, args=(), kwargs=None)

enterabs则是指定绝对时间。

一个简单的延时运行例子

下面的两个延时事件分别打印相关的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)

def event_1(delay, name):
now = time.time()
elapsed = now - delay
print("event[%s]: elapsed=%s, now is %s" % (name, elapsed, time.ctime(now)))

def event_2(delay, name):
print("event 2", delay, name)

def main():
scheduler.enter(2, 1, event_1, (time.time(), 'first'))
scheduler.enter(3, 1, event_2, (time.time(), 'second'))
print("scheduler start at %s" % time.time())
scheduler.run()

if __name__ == '__main__':
main()

默认调用run方法是阻塞,知道任务都运行退出。

运行和输出:

1
2
3
4
$python3 sched_delay.py
scheduler start at 1516230099.0156646
event[first]: elapsed=2.015648365020752, now is Thu Jan 18 07:01:41 2018
event 2 1516230099.0156646 second

重叠事件

我们知道run方法阻塞,事件调用也是单线程,因此如果某个事件运行过长,就超出了事件之间的延时,会出现重叠问题。这样并不会导致事件丢失。但可以通过延时后面的事件来解决问题。

接下来通过一个简单例子我们试试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import time
import sched

def cpu_bound_event(name):
print("begin: ", time.ctime())
time.sleep(3)
print("end: ", time.ctime())

def main():
scheduler = sched.scheduler()
scheduler.enter(2, 1, cpu_bound_event, ('first',))
scheduler.enter(3, 1, cpu_bound_event, ('second',))
scheduler.run()

if __name__ == '__main__':
main()

运行输出:

1
2
3
4
begin:  Thu Jan 18 07:14:09 2018
end: Thu Jan 18 07:14:12 2018
begin: Thu Jan 18 07:14:12 2018
end: Thu Jan 18 07:14:15 2018

可以看到,第一个事件运行完后第二个事件马上调用,因为它等待得足够长事件了。可以理解,延时最后都是通过绝对事件来比较的。

sched模块还有的功能就是优先级设置和事件的取消。

sched模块简单剖析

简单说说sched模块。

事件

sched模块是一个通用定时调度器,可以在指定的恰当时刻(延时delay)运行。

从源码可以sched对事件action做了简单的封装:

1
2
3
4
5
6
7
class Event(namedtuple('Event', 'time, priority, action, argument, kwargs')):
__slots__ = []
def __eq__(s, o): return (s.time, s.priority) == (o.time, o.priority)
def __lt__(s, o): return (s.time, s.priority) < (o.time, o.priority)
def __le__(s, o): return (s.time, s.priority) <= (o.time, o.priority)
def __gt__(s, o): return (s.time, s.priority) > (o.time, o.priority)
def __ge__(s, o): return (s.time, s.priority) >= (o.time, o.priority)

注意到,事件除了delay事件,还有优先级。对比标准库中sched模块中Event类的这种写法其实,还有一种更简洁的写法,直接使用functools.total_ordering装饰器只要实现一个不等比较即可。

scheduler类

scheduler类包括如下实例方法:

  • enter(delay, priority, action, args=(), kwargs=None)

enter方法可以指定action的参数和delay事件和优先级。它们都是Event类的参数。delay事件最后通过time = self.timefunc() + delay转化为绝对事件。

  • enterabs(time, priority, action, args=(), kwargs=None)

enterabs可以指定action执行的绝对事件。

  • def cancel(self, event)

取消事件。它本质上就是把事件从调度堆中移除。

  • run(self, blocking=True)

run方法让调度器运行,默认是非阻塞情况,即等待最近的事件来临,否则会返回阻塞事件,让开发者自己运用这端本来阻塞的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class scheduler:

def run(self, blocking=True):
lock = self._lock
q = self._queue
delayfunc = self.delayfunc
timefunc = self.timefunc
pop = heapq.heappop
while True:
with lock:
if not q:
break
time, priority, action, argument, kwargs = q[0]
now = timefunc()
if time > now:
delay = True
else:
delay = False
pop(q)
if delay:
if not blocking:
return time - now
delayfunc(time - now)
else:
action(*argument, **kwargs)
delayfunc(0) # Let other threads run

整个事件调度的核心就是run方法。通过源码可以理解它处理事件重叠的方法。

当然我们也可以重写run方法,让scheduler以线程的方式运行。

重写scheduler类

  1. 让任务以多线程的方式运行
  2. 让scheduler以多线程方式运行
  3. 让任务周期性运行

  4. 让任务以多线程的方式运行

用于enter方法最终调用enterabs方法,因此只需要重写enterabs方法即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import sched
import threading
import functools
import heapq

_sentinel = object()

def threadize(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
target = functools.partial(func, *args, **kwargs)
task = threading.Thread(target=target, daemon=True)
task.start()
return task.ident
return wrapper

class ThreadingEventScheduler(sched.scheduler):

def enterabs(self, time, priority, action, argument=(), kwargs=_sentinel):
if kwargs is _sentinel:
kwargs = {}
action = threadize(action)
event = sched.Event(time, priority, action, argument, kwargs)
with self._lock:
heapq.heappush(self._queue, event)
return event
  1. 让scheduler以多线程方式运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import threading
import sched
import time

class ThreadingScheduler(sched.scheduler):

def run(self):
threading.Thread(target=super().run).start()

s = ThreadingScheduler()
s.enter(2, 1, print, ('first',))
s.run()
print('before')
time.sleep(3)

转载请包括本文地址:https://allenwind.github.io/blog/5447
更多文章请参考:https://allenwind.github.io/blog/archives/