1# -*- coding: utf-8 -*-
2
from asyncio import AbstractEventLoop, ensure_future, Future, iscoroutine
from typing import Any, Callable, cast, Dict, Optional, Set, Tuple

from pyee.base import EventEmitter
7
8__all__ = ["AsyncIOEventEmitter"]
9
10
class AsyncIOEventEmitter(EventEmitter):
    """An event emitter class which can run asyncio coroutines in addition to
    synchronous blocking functions. For example::

        @ee.on('event')
        async def async_handler(*args, **kwargs):
            await returns_a_future()

    On emit, the event emitter will automatically schedule the coroutine using
    ``asyncio.ensure_future`` and the configured event loop (defaults to
    ``asyncio.get_event_loop()``).

    Unlike the case with the EventEmitter, all exceptions raised by
    event handlers are automatically emitted on the ``error`` event. This is
    important for asyncio coroutines specifically but is also handled for
    synchronous functions for consistency.

    When ``loop`` is specified, the supplied event loop will be used when
    scheduling work with ``ensure_future``. Otherwise, the default asyncio
    event loop is used.

    For asyncio coroutine event handlers, calling emit is non-blocking.
    In other words, you do not have to await any results from emit, and the
    coroutine is scheduled in a fire-and-forget fashion. The emitter keeps a
    reference to every future it is still waiting on, so scheduled handlers
    are not garbage collected before they finish while the emitter is alive.
    """

    def __init__(self, loop: Optional[AbstractEventLoop] = None):
        """Create the emitter.

        :param loop: event loop used to schedule coroutine handlers; when
            ``None``, ``ensure_future`` picks the loop itself.
        """
        super().__init__()
        self._loop: Optional[AbstractEventLoop] = loop
        # Futures/tasks whose completion has not been observed yet. asyncio
        # itself only holds weak references to tasks, so without this set a
        # fire-and-forget task could be collected mid-execution.
        self._waiting: Set[Future] = set()

    def _emit_run(
        self,
        f: Callable,
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ):
        """Call handler ``f`` and route any failure to the ``error`` event.

        A synchronous exception raised by ``f`` is emitted on ``error``
        immediately. If ``f`` returns a coroutine, it is scheduled with
        ``ensure_future``; if it returns a ``Future``, that future is used
        as-is. In both cases a done-callback emits the future's exception (if
        any) on ``error``; cancelled futures are ignored. Any other return
        value is discarded.

        :param f: the event handler to invoke.
        :param args: positional arguments passed to ``f``.
        :param kwargs: keyword arguments passed to ``f``.
        """
        try:
            result: Any = f(*args, **kwargs)
        except Exception as exc:
            self.emit("error", exc)
            return

        if iscoroutine(result):
            # Compare against None explicitly: a configured loop must be
            # honoured regardless of how the loop object evaluates as a bool.
            if self._loop is not None:
                # ensure_future is *extremely* cranky about the types here,
                # but this is relatively well-tested and I think the types
                # are more strict than they should be
                fut: Any = ensure_future(cast(Any, result), loop=self._loop)
            else:
                fut = ensure_future(cast(Any, result))
        elif isinstance(result, Future):
            fut = cast(Any, result)
        else:
            # Plain synchronous handler: nothing left to wait for.
            return

        def callback(done: Any) -> None:
            # The outcome is being observed now, so the strong reference is
            # no longer needed.
            self._waiting.discard(done)

            # exception() raises on a cancelled future, so bail out first.
            if done.cancelled():
                return

            exc: Optional[BaseException] = done.exception()
            if exc is not None:
                self.emit("error", exc)

        # Register the reference before the callback so the callback's
        # discard() always runs after the add().
        self._waiting.add(fut)
        fut.add_done_callback(callback)
74