1# -*- coding: utf-8 -*- 2 3from asyncio import AbstractEventLoop, ensure_future, Future, iscoroutine 4from typing import Any, Callable, cast, Dict, Optional, Tuple 5 6from pyee.base import EventEmitter 7 8__all__ = ["AsyncIOEventEmitter"] 9 10 11class AsyncIOEventEmitter(EventEmitter): 12 """An event emitter class which can run asyncio coroutines in addition to 13 synchronous blocking functions. For example:: 14 15 @ee.on('event') 16 async def async_handler(*args, **kwargs): 17 await returns_a_future() 18 19 On emit, the event emitter will automatically schedule the coroutine using 20 ``asyncio.ensure_future`` and the configured event loop (defaults to 21 ``asyncio.get_event_loop()``). 22 23 Unlike the case with the EventEmitter, all exceptions raised by 24 event handlers are automatically emitted on the ``error`` event. This is 25 important for asyncio coroutines specifically but is also handled for 26 synchronous functions for consistency. 27 28 When ``loop`` is specified, the supplied event loop will be used when 29 scheduling work with ``ensure_future``. Otherwise, the default asyncio 30 event loop is used. 31 32 For asyncio coroutine event handlers, calling emit is non-blocking. 33 In other words, you do not have to await any results from emit, and the 34 coroutine is scheduled in a fire-and-forget fashion. 35 """ 36 37 def __init__(self, loop: Optional[AbstractEventLoop] = None): 38 super(AsyncIOEventEmitter, self).__init__() 39 self._loop: Optional[AbstractEventLoop] = loop 40 41 def _emit_run( 42 self, 43 f: Callable, 44 args: Tuple[Any, ...], 45 kwargs: Dict[str, Any], 46 ): 47 try: 48 coro: Any = f(*args, **kwargs) 49 except Exception as exc: 50 self.emit("error", exc) 51 else: 52 if iscoroutine(coro): 53 if self._loop: 54 # ensure_future is *extremely* cranky about the types here, 55 # but this is relatively well-tested and I think the types 56 # are more strict than they should be 57 fut: Any = ensure_future(cast(Any, coro), loop=self._loop) 58 else: 59 fut = ensure_future(cast(Any, coro)) 60 elif isinstance(coro, Future): 61 fut = cast(Any, coro) 62 else: 63 return 64 65 def callback(f): 66 if f.cancelled(): 67 return 68 69 exc: Exception = f.exception() 70 if exc: 71 self.emit("error", exc) 72 73 fut.add_done_callback(callback) 74