本文將按以下結構進行組織,說明tornado中協(xié)程的履行原理
Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.。 ―― [ 維基百科 ]
我們在平常編程中,更習慣使用的是子例程(subroutine),通俗的叫法是函數(shù),或進程。子例程,常常只有1個入口(函數(shù)調用,實參通過傳給形參開始履行),1個出口(函數(shù)return,履行終了,或引發(fā)異常,將控制權轉移給調用者)。但協(xié)程是子例程基礎上,1種更加寬泛定義的計算機程序模塊(子例程可以看作協(xié)程的特例),它可以有多個入口點,允許從1個入口點,履行到下1個入口點之前暫停,保存履行狀態(tài),等到適合的時機恢復履行狀態(tài),從下1個入口點重新開始履行,這也是協(xié)程應當具有的能力。
1個入口點和下1個入口點(或退出點)中的代碼。
由n個入口點代碼,和n個協(xié)程代碼塊組成。第1個入口點通常是1個函 數(shù)入口點。其組織情勢如:函數(shù)入口點->協(xié)程代碼塊->入口點->協(xié)程代碼塊…,入口點和代碼塊相間。
1個同步函數(shù)的函數(shù)體是線性履行的。也就是說1個模塊中的每行代碼,相繼履行,1個模塊在履行中,如果還沒有履行終了,不會去履行其他模塊的代碼。稱這樣的代碼模塊為線性模塊。
1個協(xié)程模塊,如果只含有單1入口點和單1協(xié)程代碼塊(假定這個協(xié)程代碼塊全是同步代碼),固然這個協(xié)程模塊是1個線性履行模塊,但是如果含有多個入口點和多個協(xié)程代碼塊,那末就不是1個線性模塊。那末履行1個協(xié)程模塊進程實際是分散的(不同的時間段,履行不同的協(xié)程代碼塊,協(xié)程代碼塊的履行時間段,彼此不相交),但也是順序的(后1個協(xié)程代碼塊在前1個協(xié)程代碼塊履行結束后才履行)。兩個屬于同1協(xié)程模塊的相繼協(xié)程代碼塊履行的中間時間間隙,可能有很多其他協(xié)程模塊的協(xié)程代碼片斷在履行。
談到協(xié)程,必須要說說python語義中的生成器(generator)。
在pep255中提到了”simple generator”和”yield語句”(此時還不是”yield表達式”)的實現(xiàn)。1個basic idea,提供1種函數(shù),能夠返回中間結果給調用者,然后保護函數(shù)的局部狀態(tài),以便函數(shù)當離開后,也能恢復履行。
prep255及第了1個簡單的例子,生成斐波那契數(shù)列:
def fib():
a, b = 0, 1
while 1:
yield b
a, b = b, a+b
a,b初始化為0,1。當yield b被履行,1被返回給調用者。當fib恢復履行,a,變成了1,b也是1,然后將1返回給調用者,如此循環(huán)。generator是1種非常自然的編程方式,由于對fib來講,它的功能不變,都還是不斷生成下1個斐波那契數(shù)。而對fib的調用者來講,fib像1個列表的迭代器,不斷迭代,可以獲得下1個斐波那契數(shù)。
def caller():
for num in fib():
print num
生成器是1個含有yield表達式的函數(shù),此時該函數(shù)叫生成器。1個生成器永久是異步的,即便生成器模塊中含有阻塞代碼。由于調用1個生成器,生成器的參數(shù)會綁定到生成器,結果返回是1個生成器對象,它的類型是types.GeneratorType,不會去履行生成器主模塊中的代碼。
每次調用1個GeneratorType對象的next方法,生成器函數(shù)履行到下1個yield語句或,或碰到1個return語句,或履行到生成器函數(shù)結束。
在pep342中,對Generator進1步加強,增加了GeneratorType的send方法,和yield表達式語義。yield表達式,可以作為等號右側的表達式。如果對Generator調用send(None)方法,生成器函數(shù)會從開始1直履行到y(tǒng)ield表達式。那末下1次對Generator調用send(argument),Generator恢復履行。那末可以在生成器函數(shù)體內取得這個argument,這個argument將會作為yield表達式的返回值。
從上面可以看到,Generator已具有協(xié)程的1些能力。如:能夠暫停履行,保存狀態(tài);能夠恢復履行;能夠異步履行。
但是此時Generator還不是1個協(xié)程。1個真實的協(xié)程能夠控制代碼甚么時候繼續(xù)履行。而1個Generator履行遇到1個yield表達式 或語句,會將履行控制權轉移給調用者。
However, it is still possible to implement coroutines on top of a generator facility, with the aid of a top-level dispatcher routine (a trampoline, essentially) that passes control explicitly to child generators identified by tokens passed back from the generators。 ―― [ 維基百科 ]
在維基百科中提到,可以實現(xiàn)1個頂級的調度子例程,將履行控制權轉移回Generator,從而讓它繼續(xù)履行。在tornado中,ioLoop就是這樣的頂級調度子例程,每一個協(xié)程模塊通過,函數(shù)裝潢器coroutine和ioLoop進行通訊,從而ioLoop可以在協(xié)程模塊履行暫停后,在適合的時機重新調度協(xié)程模塊履行。
不過,接下來還不能介紹coroutine和ioLoop,在介紹這二者之前,先得明白tornado中在協(xié)程環(huán)境中1個非常重要的類Future.
Future類位于tornado源碼的concurrent模塊中。Future類的完全代碼,請查看tornado的源碼。在這里截取1部份代碼作為分析之用
class Future(object):
def done(self):
return self._done
def result(self, timeout=None):
self._clear_tb_log()
if self._result is not None:
return self._result
if self._exc_info is not None:
raise_exc_info(self._exc_info)
self._check_done()
return self._result
def add_done_callback(self, fn):
if self._done:
fn(self)
else:
self._callbacks.append(fn)
def set_result(self, result):
self._result = result
self._set_done()
def _set_done(self):
self._done = True
for cb in self._callbacks:
try:
cb(self)
except Exception:
app_log.exception('exception calling callback %r for %r',
cb, self)
self._callbacks = None
Future的_result成員是不是被設置
獲得Future對象的結果
添加1個回調函數(shù)fn給Future對象。如果這個Future對象已done,則直接履行fn,否則將fn加入到Future類的1個成員列表中保存。
1個內部函數(shù),主要是遍歷列表,逐一調用列表中的callback函數(shù),也就是前面add_done_calback加如來的。
給Future對象設置result,并且調用_set_done。也就是說,當Future對象取得result后,所有add_done_callback加入的回調函數(shù)就會履行。
Future封裝了異步操作的結果。實際是它類似于在網(wǎng)頁html前端中,圖片異步加載的占位符,但加載后終究也是1個完全的圖片。Future也是一樣用途,tornado使用它,終究希望它被set_result,并且調用1些回調函數(shù)。Future對象實際是coroutine函數(shù)裝潢器和IOLoop的溝通使者,有著非常重要的作用。
tornado框架的底層核心類,位于tornado的ioloop模塊。功能方面類似win32窗口的消息循環(huán)。每一個窗口可以綁定1個窗口進程。窗口進程主要是1個消息循環(huán)在履行。消息循環(huán)主要任務是利用PeekMessage系統(tǒng)調用,從消息隊列中取出各種類型的消息,判斷消息的類型,然后交給特定的消息handler進行履行。
tornado中的IOLoop與此相比具有很大的類似性,在協(xié)程運行環(huán)境中擔負著協(xié)程調度器的角色, 和win32的消息循環(huán)本質上都是1種事件循環(huán),等待事件,然后運行對應的事件處理器(handler)。不過IOLoop主要調度處理的是IO事件(如讀,寫,毛病)。除此以外,還能調度callback和timeout事件。
在本博文中,我們暫時只關注callback事件,由于這個與協(xié)程調度的相干性最大。
def add_future(self, future, callback):
assert is_future(future)
callback = stack_context.wrap(callback)
future.add_done_callback(
lambda future: self.add_callback(callback, future))
add_future函數(shù)在基類IOLoop中實現(xiàn),函數(shù)參數(shù)是1個Future對象和1個callback函數(shù)。當Future對象被set_result,履行1個回調函數(shù),是個lambda函數(shù),在lambda函數(shù)中調用IOLoop的add_callback函數(shù)。將add_future的參數(shù)callback加入到IOLoop的統(tǒng)1調度中,讓callback在IOLoop下1次迭代中履行。
def add_callback(self, callback, *args, **kwargs):
with self._callback_lock:
if self._closing:
raise RuntimeError("IOLoop is closing")
list_empty = not self._callbacks
self._callbacks.append(functools.partial(
stack_context.wrap(callback), *args, **kwargs))
if list_empty and thread.get_ident() != self._thread_ident:
self._waker.wake()
add_callback函數(shù)主要在IOLoop的子類PollIOLoop中實現(xiàn)。也很容易理解。
將傳入的callback函數(shù),利用偏函數(shù)進行包裝,將所有callback真正運行時需要的參數(shù),都綁定到生成的偏函數(shù)中,實際上就是找個地方把callback運行時需要的參數(shù)保存起來。將包裝好的偏函數(shù)加入到回調函數(shù)列表。當IOLoop下1次迭代運行的時候,遍歷callback函數(shù)列表,運行偏函數(shù)的時候,就不再需要傳入?yún)?shù)履行,效果同等于用實參運行callback。
IOLoop對象調用start函數(shù),會運行event loop。在event loop中,首先遍歷callback列表,履行回調函數(shù),然后遍歷timeout列表,履行timeoutCallback。最后才履行ioHandler。
函數(shù)裝潢器本質是1個函數(shù),我們稱這個函數(shù)為裝潢器函數(shù)。裝潢器函數(shù)簽名含有1個 函數(shù)對象(可調用對象callable)參數(shù),返回的結果是1個裝潢器內部定義的1個新函數(shù)對象。如果返回的函數(shù)對象被調用,裝潢器函數(shù)的參數(shù)(函數(shù)對象)也會被調用。不過,會在這個參數(shù)(裝潢器函數(shù)參數(shù))調用前做1些事情,或在這個參數(shù)調用后做1些事情。實際上做的這些事情,就是利用內部自定義的函數(shù)對象對參數(shù)(原函數(shù))的1些裝潢(額外操作)
當1個函數(shù)被裝潢器裝潢。那末以后調用這個函數(shù)(此函數(shù)已非彼函數(shù))的時候,實際上調用的是裝潢器函數(shù)返回的內部函數(shù)對象。理解tornado中coroutine修飾的函數(shù)如何履行,主要是 理解coroutine這個裝潢器函數(shù)內部定義的新函數(shù)對象所做的那些事兒。
def coroutine(func, replace_callback=True):
return _make_coroutine_wrapper(func, replace_callback=True)
class Runner(object):
def __init__(self, gen, result_future, first_yielded):
self.gen = gen
self.result_future = result_future
self.future = _null_future
self.yield_point = None
self.pending_callbacks = None
self.results = None
self.running = False
self.finished = False
self.had_exception = False
self.io_loop = IOLoop.current()
self.stack_context_deactivate = None
if self.handle_yield(first_yielded):
self.run()
def run(self):
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if not future.done():
return
self.future = None
try:
try:
value = future.result()
except Exception:
self.had_exception = True
yielded = self.gen.throw(*sys.exc_info())
else:
yielded = self.gen.send(value)
except (StopIteration, Return) as e:
self.finished = True
self.future = _null_future
self.result_future.set_result(getattr(e, 'value', None))
self.result_future = None
return
except Exception:
self.finished = True
self.future = _null_future
self.result_future.set_exc_info(sys.exc_info())
self.result_future = None
return
if not self.handle_yield(yielded):
return
finally:
self.running = False
def handle_yield(self, yielded):
try:
self.future = convert_yielded(yielded)
except BadYieldError:
self.future = TracebackFuture()
self.future.set_exc_info(sys.exc_info())
if not self.future.done() or self.future is moment:
self.io_loop.add_future(
self.future, lambda f: self.run())
return False
return True
以上的代碼其實都對源碼進行了1些調劑。但函數(shù)調用進入到Runner的構造函數(shù)的時候,也就是說Generator的第1次履行已終了。那末接下來,調用的是,handle_yield,對第1次Generator履行的返回結果進行處理。固然返回的結果多是多種類型。多是1個Future對象,list,dict,或其他類型對象,或普通類型。通過convert_yield,self.future保存的是1個Future對象的援用(第1次Generator履行返回的結果)。此時如果self.future還沒被set_result。對為self.future綁定1個done_callback(lambda f: self.run()),加入到self.io_loop中。
在前文說到。ioloop的add_future函數(shù)中,實際上是只有當參數(shù)future,在某個地方調用了set_result, 才在履行done_callback時,將參數(shù)callback加入到IOLoop中調度。換句話說。Runner類中,self.run要等到self.future在某個代碼塊被set_result,IOLoop才有可能在下1次迭代的時候履行它,從而調度協(xié)程繼續(xù)恢復履行。而在self.run函數(shù)中,我們可以看到將會通過Generator的send函數(shù),恢復履行下1個協(xié)程代碼塊。所以關鍵的問題是我們需要明白Runner類中self.future,在甚么時候被set_result?
從這里我們可以看到Future類的重要作用。future.set_result起到的作用是:
發(fā)送1個信號,告知IOLoop去調度暫停的協(xié)程繼續(xù)履行。
我們結合下面的代碼例子就能夠明白協(xié)程調度的全部流程是如何進行的了。
import tornado.ioloop
from tornado.gen import coroutine
from tornado.concurrent import Future
@coroutine
def asyn_sum(a, b):
print("begin calculate:sum %d+%d"%(a,b))
future = Future()
def callback(a, b):
print("calculating the sum of %d+%d:"%(a,b))
future.set_result(a+b)
tornado.ioloop.IOLoop.instance().add_callback(callback, a, b)
result = yield future
print("after yielded")
print("the %d+%d=%d"%(a, b, result))
def main():
asyn_sum(2,3)
tornado.ioloop.IOLoop.instance().start()
if __name__ == "__main__":
main()
實際的運行場景是:1個協(xié)程(asyn_sum)遇到y(tǒng)ield表達式被暫停履行后,IOLoop調用另外1個代碼段(asyn_sum中的回調函數(shù)callback)履行,而在callback中,恰好可以訪問到屬于被暫停協(xié)程(asyn_sum)中的future對象(也就是Runner對象中的self.future的援用),callback中將future調用set_result,那末這個暫停的協(xié)程(asyn_sum)在IOLoop下1次迭代調度回調函數(shù)時中,被恢復履行。
tornado中的協(xié)程實現(xiàn)基于python語言的Generator并且結合1個全局的調度器IOLoop,Generator通過函數(shù)裝潢器coroutine和IOLoop進行通訊。IOLoop并沒有直接控制能力,調度恢復被暫停的協(xié)程繼續(xù)履行。future對象在協(xié)程中被yield。協(xié)程暫停,IOLoop調度另外1個代碼模塊履行,而在這個履行的代碼模塊中恰好,可以訪問這個future對象,將其set_result,結果通過IOLoop間接恢復暫停協(xié)程履行。不同履行代碼模塊中,同享future對象,彼此合作,協(xié)程調度得順利履行。
從這類意義上來講,future對象,像window中的Event內核對象的作用。window中的event用于線程中同步。而協(xié)程中的yield future相當于WaitForSingleObject(event_object), 而future.set_result(result)。相當于SetEvent(event_object)。而future和Event的不同點在于,協(xié)程借future來恢復履行,而線程借Event來進行線程間同步。