先前写过实现简单的协程调度器 一文,指出调度器的核心是调度算法。即当要执行多个任务时,需要一种调度方法,决定某个任务什么时候执行,什么时候暂停。对于一个定时事件调度器,它需要的算法很简单,以时间先后进行调度分配任务的执行。
如何让任务达到时间先后执行?很简单,对时间排序。但是,任务个数通常是一个态的过程,如果每次有新任务添加进来或删除都进行排序(sorting)性能很差。我们可以使用堆的构建过程,即添加任务—往堆中添加元素。
因此,定时事件调度器的核心就是堆这个数据结构和相关的操作。我们可以在和思想上自己实现,但Python标准库自带了sched
模块,那么接下来使用它的模块接口。
sched模块的基本使用
简单的剖析
一个例子
基本使用方法 这个模块简单到直接通过源码就能学会使用它。scheduler
类的enter
方法的参数签名如下:
enter(delay, priority, action, args=(), kwargs=None)
支持delay、优先级别,action函数的参数。
enterabs(time, priority, action, args=(), kwargs=None)
enterabs则是指定绝对时间。
一个简单的延时运行例子 下面的两个延时事件分别打印相关的信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import schedimport timescheduler = sched.scheduler(time.time, time.sleep) def event_1 (delay, name ): now = time.time() elapsed = now - delay print ("event[%s]: elapsed=%s, now is %s" % (name, elapsed, time.ctime(now))) def event_2 (delay, name ): print ("event 2" , delay, name) def main (): scheduler.enter(2 , 1 , event_1, (time.time(), 'first' )) scheduler.enter(3 , 1 , event_2, (time.time(), 'second' )) print ("scheduler start at %s" % time.time()) scheduler.run() if __name__ == '__main__' : main()
默认调用run方法是阻塞,知道任务都运行退出。
运行和输出:
1 2 3 4 $python3 sched_delay.pyscheduler start at 1516230099.0156646 event[first]: elapsed=2.015648365020752, now is Thu Jan 18 07:01:41 2018 event 2 1516230099.0156646 second
重叠事件 我们知道run方法阻塞,事件调用也是单线程,因此如果某个事件运行过长,就超出了事件之间的延时,会出现重叠问题。这样并不会导致事件丢失。但可以通过延时后面的事件来解决问题。
接下来通过一个简单例子我们试试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import timeimport scheddef cpu_bound_event (name ): print ("begin: " , time.ctime()) time.sleep(3 ) print ("end: " , time.ctime()) def main (): scheduler = sched.scheduler() scheduler.enter(2 , 1 , cpu_bound_event, ('first' ,)) scheduler.enter(3 , 1 , cpu_bound_event, ('second' ,)) scheduler.run() if __name__ == '__main__' : main()
运行输出:
1 2 3 4 begin: Thu Jan 18 07:14:09 2018 end: Thu Jan 18 07:14:12 2018 begin: Thu Jan 18 07:14:12 2018 end: Thu Jan 18 07:14:15 2018
可以看到,第一个事件运行完后第二个事件马上调用,因为它等待得足够长事件了。可以理解,延时最后都是通过绝对事件来比较的。
sched模块还有的功能就是优先级设置和事件的取消。
sched模块简单剖析 简单说说sched模块。
事件 sched
模块是一个通用定时调度器,可以在指定的恰当时刻(延时delay)运行。
从源码可以sched对事件action做了简单的封装:
1 2 3 4 5 6 7 class Event (namedtuple('Event' , 'time, priority, action, argument, kwargs' )): __slots__ = [] def __eq__ (s, o ): return (s.time, s.priority) == (o.time, o.priority) def __lt__ (s, o ): return (s.time, s.priority) < (o.time, o.priority) def __le__ (s, o ): return (s.time, s.priority) <= (o.time, o.priority) def __gt__ (s, o ): return (s.time, s.priority) > (o.time, o.priority) def __ge__ (s, o ): return (s.time, s.priority) >= (o.time, o.priority)
注意到,事件除了delay事件,还有优先级。对比标准库中sched模块中Event类的这种写法其实,还有一种更简洁的写法,直接使用functools.total_ordering
装饰器只要实现一个不等比较即可。
scheduler类 scheduler类包括如下实例方法:
enter(delay, priority, action, args=(), kwargs=None)
enter方法可以指定action的参数和delay事件和优先级。它们都是Event类的参数。delay事件最后通过time = self.timefunc() + delay
转化为绝对事件。
enterabs(time, priority, action, args=(), kwargs=None)
enterabs可以指定action执行的绝对事件。
取消事件。它本质上就是把事件从调度堆中移除。
run方法让调度器运行,默认是非阻塞情况,即等待最近的事件来临,否则会返回阻塞事件,让开发者自己运用这端本来阻塞的事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class scheduler : def run (self, blocking=True ): lock = self._lock q = self._queue delayfunc = self.delayfunc timefunc = self.timefunc pop = heapq.heappop while True : with lock: if not q: break time, priority, action, argument, kwargs = q[0 ] now = timefunc() if time > now: delay = True else : delay = False pop(q) if delay: if not blocking: return time - now delayfunc(time - now) else : action(*argument, **kwargs) delayfunc(0 )
整个事件调度的核心就是run方法。通过源码可以理解它处理事件重叠的方法。
当然我们也可以重写run方法,让scheduler以线程的方式运行。
重写scheduler类
让任务以多线程的方式运行
让scheduler以多线程方式运行
让任务周期性运行
让任务以多线程的方式运行
用于enter
方法最终调用enterabs
方法,因此只需要重写enterabs
方法即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import schedimport threadingimport functoolsimport heapq_sentinel = object () def threadize (func ): @functools.wraps(func ) def wrapper (*args, **kwargs ): target = functools.partial(func, *args, **kwargs) task = threading.Thread(target=target, daemon=True ) task.start() return task.ident return wrapper class ThreadingEventScheduler (sched.scheduler): def enterabs (self, time, priority, action, argument=( ), kwargs=_sentinel ): if kwargs is _sentinel: kwargs = {} action = threadize(action) event = sched.Event(time, priority, action, argument, kwargs) with self._lock: heapq.heappush(self._queue, event) return event
让scheduler以多线程方式运行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import threadingimport schedimport timeclass ThreadingScheduler (sched.scheduler): def run (self ): threading.Thread(target=super ().run).start() s = ThreadingScheduler() s.enter(2 , 1 , print , ('first' ,)) s.run() print ('before' )time.sleep(3 )
转载请包括本文地址:https://allenwind.github.io/blog/5447 更多文章请参考:https://allenwind.github.io/blog/archives/