Package mirai_extensions.trigger

事件触发器:提供更多处理事件的方式。

Expand source code
# -*- coding: utf-8 -*-
"""
事件触发器:提供更多处理事件的方式。
"""

__version__ = '0.3.1'
__author__ = '忘忧北萱草'

from mirai_extensions.trigger.filter import Filter
from mirai_extensions.trigger.handler import HandlerControl
from mirai_extensions.trigger.interrupt import InterruptControl
from mirai_extensions.trigger.message import (
    FriendMessageFilter, GroupMessageFilter, TempMessageFilter
)
from mirai_extensions.trigger.trigger import Trigger

__all__ = [
    'Filter', 'Trigger', 'InterruptControl', 'HandlerControl',
    'FriendMessageFilter', 'GroupMessageFilter', 'TempMessageFilter'
]

Sub-modules

mirai_extensions.trigger.filter
mirai_extensions.trigger.handler

本模块提供过滤器总线相关。

mirai_extensions.trigger.interrupt

本模块提供中断控制器相关。

mirai_extensions.trigger.message
mirai_extensions.trigger.trigger

此模块提供事件触发器相关。

Classes

class Filter (event_name: Type[~TEvent], mixin: Optional[Iterable[BaseFilter[~TEvent]]] = None, func: Optional[Callable[[~TEvent], Optional[Any]]] = None)

事件过滤器,允许用户传入自定义的捕获函数。

捕获函数可以在创建过滤器时设置,或者通过装饰器的方式设置。

# 方式一
def filter_one_func(event: FriendMessage):
    if event.sender.id == 12345678:
        return event.sender.nickname or ''
filter_one = Filter(FriendMessage, func=filter_one_func)

# 方式二
@Filter(FriendMessage)
def filter_two(event: FriendMessage):
    if event.sender.id == 12345678:
        return event.sender.nickname or ''

当使用类装饰器的方式创建过滤器时,被装饰函数的名称将失效。比如上例中,filter_two 将不再是函数, 而是成为 Filter 实例。

上例中的过滤器将检测好友消息的发送对象,只有来自 12345678 的消息会被捕获。其他情况下,过滤器返回默认值 None, 不会被捕获。

Args

event_name
过滤器捕获的事件类型。
mixin
过滤器混入。 过滤器会先检查混入的过滤器,若任何一个未捕获,直接停止捕获,返回“未捕获”状态。
func
自定义的捕获函数。
Expand source code
class Filter(BaseFilter[TEvent]):
    """事件过滤器,允许用户传入自定义的捕获函数。

    捕获函数可以在创建过滤器时设置,或者通过装饰器的方式设置。

    ```python
    # 方式一
    def filter_one_func(event: FriendMessage):
        if event.sender.id == 12345678:
            return event.sender.nickname or ''
    filter_one = Filter(FriendMessage, func=filter_one_func)

    # 方式二
    @Filter(FriendMessage)
    def filter_two(event: FriendMessage):
        if event.sender.id == 12345678:
            return event.sender.nickname or ''
    ```

    当使用类装饰器的方式创建过滤器时,被装饰函数的名称将失效。比如上例中,`filter_two` 将不再是函数,
    而是成为 Filter 实例。

    上例中的过滤器将检测好友消息的发送对象,只有来自 12345678 的消息会被捕获。其他情况下,过滤器返回默认值 None,
    不会被捕获。
    """
    event_name: Type[TEvent]
    """过滤器捕获的事件类型。"""

    def __init__(
        self,
        event_name: Type[TEvent],
        mixin: Optional[Iterable[BaseFilter[TEvent]]] = None,
        func: Optional[TFilter[TEvent]] = None
    ):
        """
        Args:
            event_name: 过滤器捕获的事件类型。
            mixin: 过滤器混入。
                过滤器会先检查混入的过滤器,若任何一个未捕获,直接停止捕获,返回“未捕获”状态。
            func: 自定义的捕获函数。
        """
        super().__init__(mixin or [])
        self.event_name = event_name
        self._func: Optional[TFilter[TEvent]] = func

    def __call__(self, func: TFilter[TEvent]):
        self._func = func
        return self

    def _catch(self, event: TEvent) -> Optional[Any]:
        if self._func is None:
            return event
        return self._func(event)

Ancestors

Subclasses

Class variables

var event_name : Type[~TEvent]

过滤器捕获的事件类型。

Inherited members

class FriendMessageFilter (friend: Union[mirai.models.entities.Friend, int, None] = None, quote: Union[mirai.models.events.MessageEvent, mirai.models.message.MessageChain, int, None] = None, func: Optional[Callable[[mirai.models.events.FriendMessage], Optional[Any]]] = None)

好友消息触发器。判断只接受指定好友发来的消息,或必需引用某条消息。

Args

friend
指定好友。
quote
指定引用某条消息。
priority
优先级,小者优先。
filter
过滤器。
Expand source code
class FriendMessageFilter(Filter[FriendMessage]):
    """好友消息触发器。判断只接受指定好友发来的消息,或必需引用某条消息。"""
    def __init__(
        self,
        friend: Union[Friend, int, None] = None,
        quote: Union[MessageEvent, MessageChain, int, None] = None,
        func: Optional[TFilter[FriendMessage]] = None
    ):
        """
        Args:
            friend: 指定好友。
            quote: 指定引用某条消息。
            priority: 优先级,小者优先。
            filter: 过滤器。
        """
        mixin = []
        if friend:
            mixin.append(FriendFilter(friend))
        if quote:
            mixin.append(QuoteFilter(quote))

        super().__init__(FriendMessage, mixin, func)

        if quote:
            logger.warning(
                "FriendMessageTrigger 的 quote 参数目前由于不明原因,有时候不能正常使用,请注意。"
            )

Ancestors

Inherited members

class GroupMessageFilter (group: Union[mirai.models.entities.Group, int, TNotSpecified, None] = <object object>, group_member: Union[mirai.models.entities.GroupMember, int, None] = None, quote: Union[mirai.models.events.MessageEvent, mirai.models.message.MessageChain, int, None] = None, func: Optional[Callable[[mirai.models.events.GroupMessage], Optional[Any]]] = None)

群消息触发器。判断只接受指定群的某个群成员发来的消息,或必需引用某条消息。

Args

group
指定群。
group_member
指定群成员。
quote
指定引用某条消息。
priority
优先级,小者优先。
filter
过滤器。
Expand source code
class GroupMessageFilter(Filter[GroupMessage]):
    """群消息触发器。判断只接受指定群的某个群成员发来的消息,或必需引用某条消息。"""
    def __init__(
        self,
        group: Union[Group, int, TNotSpecified, None] = _not_specified,
        group_member: Union[GroupMember, int, None] = None,
        quote: Union[MessageEvent, MessageChain, int, None] = None,
        func: Optional[TFilter[GroupMessage]] = None
    ):
        """
        Args:
            group: 指定群。
            group_member: 指定群成员。
            quote: 指定引用某条消息。
            priority: 优先级,小者优先。
            filter: 过滤器。
        """
        mixin = []
        if group_member:
            mixin.append(GroupMemberFilter(group_member, group))
        elif group and group is not _not_specified:
            mixin.append(GroupFilter(cast(Union[Group, int], group)))
        if quote:
            mixin.append(QuoteFilter(quote))

        super().__init__(GroupMessage, mixin, func)

Ancestors

Inherited members

class HandlerControl (bus: mirai.bus.AbstractEventBus, priority: int = 0)

事件接收控制器,通过 Filter 实现事件的过滤和解析。

创建过滤器 filter_ 后,可以通过 hdc.on(filter_) 装饰器注册处理器。

处理器是一个函数,接受两个参数 eventpayloadevent 是接收到的原始事件,payload 是过滤器解析的结果。

hdc = HandlerControl(bot)

@Filter(FriendMessage)
async def filter_(event: FriendMessage):
    msg = str(event.message_chain)
    if msg.startswith('我是'):
        return msg[2:]

@hdc.on(filter_)
async def handler(event: FriendMessage, payload: str):
    ...

Args

bus
事件总线。
priority
事件接收控制器工作的优先级,小者优先。
Expand source code
class HandlerControl(AbstractEventBus):
    """事件接收控制器,通过 Filter 实现事件的过滤和解析。

    创建过滤器 `filter_` 后,可以通过 `hdc.on(filter_)` 装饰器注册处理器。

    处理器是一个函数,接受两个参数 `event` 和 `payload`。
    `event` 是接收到的原始事件,`payload` 是过滤器解析的结果。

    ```python
    hdc = HandlerControl(bot)

    @Filter(FriendMessage)
    async def filter_(event: FriendMessage):
        msg = str(event.message_chain)
        if msg.startswith('我是'):
            return msg[2:]

    @hdc.on(filter_)
    async def handler(event: FriendMessage, payload: str):
        ...
    ```
    """
    bus: AbstractEventBus

    def __init__(self, bus: AbstractEventBus, priority: int = 0):
        """
        Args:
            bus: 事件总线。
            priority: 事件接收控制器工作的优先级,小者优先。
        """
        self.bus = bus
        self.priority = priority
        self._handlers: Dict[Any, PriorityDict[Tuple[Filter, THandler]]] = {}

    def _new_handler(self, event_name):
        @self.bus.on(event_name, priority=self.priority)
        async def _(event):
            results = []
            for filters in self._handlers[event_name]:
                for filter_, func in list(filters):
                    payload = filter_.catch(event)
                    if payload is not None:
                        results.append(func(event, payload))
            return asyncio.gather(*filter(inspect.isawaitable, results))

    def subscribe(self, event: Filter, func: THandler, priority: int = 0):
        """注册一个过滤器的处理器。

        Args:
            event: 过滤器。
            func: 处理器。
            priority: 处理器的优先级,小者优先。
        """
        filter_ = event
        event_name = filter_.event_name
        if event_name not in self._handlers:
            self._handlers[event_name] = PriorityDict()
            self._new_handler(event_name)
        self._handlers[event_name].add(priority, (filter_, func))

    def unsubscribe(self, event: Filter, func: THandler):
        """取消一个过滤器的处理器。

        Args:
            event: 过滤器。
            func: 处理器。
        """
        try:
            self._handlers[event.event_name].remove((event, func))
        except KeyError:
            logger.warning(f'试图移除过滤器 `{event}` 的一个不存在的处理器 `{func}`。')

    def on(self, event: Filter, priority: int = 0):
        """以装饰器的形式注册一个过滤器的处理器。

        例如:
        ```python
        @hdc.on(filter)
        async def handler(event, payload):
            ...
        ```

        Args:
            event: 过滤器。
            priority: 处理器的优先级,小者优先。
        """
        def decorator(func: THandler) -> THandler:
            self.subscribe(event, func, priority)
            return func

        return decorator

    async def emit(self, event, *args, **kwargs) -> List[Awaitable[Any]]:
        """发送事件。

        Args:
            event: 事件名。
            *args: 发送的参数。
            **kwargs: 发送的参数。
        """
        return await self.bus.emit(event, *args, **kwargs)

Ancestors

  • mirai.bus.AbstractEventBus

Class variables

var bus : mirai.bus.AbstractEventBus

Methods

async def emit(self, event, *args, **kwargs) ‑> List[Awaitable[Any]]

发送事件。

Args

event
事件名。
*args
发送的参数。
**kwargs
发送的参数。
Expand source code
async def emit(self, event, *args, **kwargs) -> List[Awaitable[Any]]:
    """发送事件。

    Args:
        event: 事件名。
        *args: 发送的参数。
        **kwargs: 发送的参数。
    """
    return await self.bus.emit(event, *args, **kwargs)
def on(self, event: Filter, priority: int = 0)

以装饰器的形式注册一个过滤器的处理器。

例如:

@hdc.on(filter)
async def handler(event, payload):
    ...

Args

event
过滤器。
priority
处理器的优先级,小者优先。
Expand source code
def on(self, event: Filter, priority: int = 0):
    """以装饰器的形式注册一个过滤器的处理器。

    例如:
    ```python
    @hdc.on(filter)
    async def handler(event, payload):
        ...
    ```

    Args:
        event: 过滤器。
        priority: 处理器的优先级,小者优先。
    """
    def decorator(func: THandler) -> THandler:
        self.subscribe(event, func, priority)
        return func

    return decorator
def subscribe(self, event: Filter, func: Callable[[Any, Any], Any], priority: int = 0)

注册一个过滤器的处理器。

Args

event
过滤器。
func
处理器。
priority
处理器的优先级,小者优先。
Expand source code
def subscribe(self, event: Filter, func: THandler, priority: int = 0):
    """注册一个过滤器的处理器。

    Args:
        event: 过滤器。
        func: 处理器。
        priority: 处理器的优先级,小者优先。
    """
    filter_ = event
    event_name = filter_.event_name
    if event_name not in self._handlers:
        self._handlers[event_name] = PriorityDict()
        self._new_handler(event_name)
    self._handlers[event_name].add(priority, (filter_, func))
def unsubscribe(self, event: Filter, func: Callable[[Any, Any], Any])

取消一个过滤器的处理器。

Args

event
过滤器。
func
处理器。
Expand source code
def unsubscribe(self, event: Filter, func: THandler):
    """取消一个过滤器的处理器。

    Args:
        event: 过滤器。
        func: 处理器。
    """
    try:
        self._handlers[event.event_name].remove((event, func))
    except KeyError:
        logger.warning(f'试图移除过滤器 `{event}` 的一个不存在的处理器 `{func}`。')
class InterruptControl (bus: mirai.bus.AbstractEventBus, priority: int = 15)

中断控制器,用于实现在一个消息处理器中间监听另一个消息事件。

使用 Filter 定义过滤器,接下来使用 inc.wait 创建触发器并等待。 也可提前创建过滤器,由 inc.wait 等待。

inc = InterruptControl(bot)

@bot.on(FriendMessage)
async def on_friend_message(event: FriendMessage):
    if str(event.message_chain).strip() == '你是谁':
        await bot.send(event, '我是 Yiri。你呢?')

        @Filter(FriendMessage)
        async def filter_(event_new: FriendMessage):
            if event.sender.id == event_new.sender.id:
                msg = str(event.message_chain)
                if msg.startswith('我是'):
                    return msg[2:]

        name = await inc.wait(filter_, timeout=60.)
        if name:
            await bot.send(event, f'你好,{name}。')

inc.wait 的返回值为过滤器的返回值。当达到超时后,返回 None

Args

bus
事件总线。
priority
中断控制器工作的优先级,默认值 15。
Expand source code
class InterruptControl:
    """中断控制器,用于实现在一个消息处理器中间监听另一个消息事件。

    使用 Filter 定义过滤器,接下来使用 `inc.wait` 创建触发器并等待。
    也可提前创建过滤器,由 `inc.wait` 等待。

    ```python
    inc = InterruptControl(bot)

    @bot.on(FriendMessage)
    async def on_friend_message(event: FriendMessage):
        if str(event.message_chain).strip() == '你是谁':
            await bot.send(event, '我是 Yiri。你呢?')

            @Filter(FriendMessage)
            async def filter_(event_new: FriendMessage):
                if event.sender.id == event_new.sender.id:
                    msg = str(event.message_chain)
                    if msg.startswith('我是'):
                        return msg[2:]

            name = await inc.wait(filter_, timeout=60.)
            if name:
                await bot.send(event, f'你好,{name}。')
    ```

    `inc.wait` 的返回值为过滤器的返回值。当达到超时后,返回 `None`。
    """
    def __init__(self, bus: AbstractEventBus, priority: int = 15):
        """
        Args:
            bus: 事件总线。
            priority: 中断控制器工作的优先级,默认值 15。
        """
        self.bus = bus
        self._triggers: Dict[Any, PriorityDict[Trigger]] = {}
        self.priority = priority

    def _new_handler(self, event_name):
        @self.bus.on(event_name, priority=self.priority)
        async def _(event):
            for triggers in self._triggers[event_name]:
                for trigger in list(triggers):
                    if trigger.catch(event):
                        break
                else:  # 跳出两重循环
                    continue
                break

    async def wait(
        self,
        trigger: Union[Filter, Trigger],
        timeout: float = 0.,
        priority: int = 0
    ):
        """监听一个过滤器或触发器,等待其完成。

        Args:
            trigger: 过滤器或触发器。
            timeout: 超时时间,单位为秒。

        Returns:
            Any: 触发器的结果。
            None: 触发器超时。
        """
        event_name = trigger.event_name
        if isinstance(trigger, Filter):
            trigger = Trigger(trigger, priority=priority)

        if event_name not in self._triggers:
            self._triggers[event_name] = PriorityDict()
            self._new_handler(event_name)

        self._triggers[event_name].add(trigger.priority, trigger)

        @trigger.add_done_callback
        def _(_: Trigger):
            self._triggers[event_name].remove(trigger)

        logger.debug(f'[InterruptControl] 等待触发器 {trigger}。')
        return await trigger.wait(timeout)

Methods

async def wait(self, trigger: Union[FilterTrigger], timeout: float = 0.0, priority: int = 0)

监听一个过滤器或触发器,等待其完成。

Args

trigger
过滤器或触发器。
timeout
超时时间,单位为秒。

Returns

Any
触发器的结果。
None
触发器超时。
Expand source code
async def wait(
    self,
    trigger: Union[Filter, Trigger],
    timeout: float = 0.,
    priority: int = 0
):
    """监听一个过滤器或触发器,等待其完成。

    Args:
        trigger: 过滤器或触发器。
        timeout: 超时时间,单位为秒。

    Returns:
        Any: 触发器的结果。
        None: 触发器超时。
    """
    event_name = trigger.event_name
    if isinstance(trigger, Filter):
        trigger = Trigger(trigger, priority=priority)

    if event_name not in self._triggers:
        self._triggers[event_name] = PriorityDict()
        self._new_handler(event_name)

    self._triggers[event_name].add(trigger.priority, trigger)

    @trigger.add_done_callback
    def _(_: Trigger):
        self._triggers[event_name].remove(trigger)

    logger.debug(f'[InterruptControl] 等待触发器 {trigger}。')
    return await trigger.wait(timeout)
class TempMessageFilter (group: Union[mirai.models.entities.Group, int, TNotSpecified, None] = <object object>, group_member: Union[mirai.models.entities.GroupMember, int, None] = None, quote: Union[mirai.models.events.MessageEvent, mirai.models.message.MessageChain, int, None] = None, func: Optional[Callable[[mirai.models.events.TempMessage], Optional[Any]]] = None)

临时消息触发器。判断只接受指定群的某个群成员发来的消息,或必需引用某条消息。

Args

group
指定群。
group_member
指定群成员。
quote
指定引用某条消息。
priority
优先级,小者优先。
filter
过滤器。
Expand source code
class TempMessageFilter(Filter[TempMessage]):
    """临时消息触发器。判断只接受指定群的某个群成员发来的消息,或必需引用某条消息。"""
    def __init__(
        self,
        group: Union[Group, int, TNotSpecified, None] = _not_specified,
        group_member: Union[GroupMember, int, None] = None,
        quote: Union[MessageEvent, MessageChain, int, None] = None,
        func: Optional[TFilter[TempMessage]] = None
    ):
        """
        Args:
            group: 指定群。
            group_member: 指定群成员。
            quote: 指定引用某条消息。
            priority: 优先级,小者优先。
            filter: 过滤器。
        """
        mixin = []
        if group_member:
            mixin.append(GroupMemberFilter(group_member, group))
        elif group and group is not _not_specified:
            mixin.append(GroupFilter(cast(Union[Group, int], group)))
        if quote:
            mixin.append(QuoteFilter(quote))

        super().__init__(GroupMessage, mixin, func)

Ancestors

Inherited members

class Trigger (filter: Filter[~TEvent], priority: int = 0)

事件触发器,提供等待符合特定条件的事件的功能。

事件触发器类似于 asyncio.Future,有已完成和未完成两种状态。和 Future 一样,Trigger 也是低层级 API。 大多数情况下,你不需要用到 Trigger 的实现,而是通过中断控制器、事件接收控制器来使用 Trigger。

触发器创建后,默认为未完成状态。通过 catch 方法尝试捕获事件,捕获成功后将进入已完成状态。 使用 wait 方法等待一个触发器,直到触发器完成。

触发器内部的逻辑由过滤器(Filter)实现。在创建触发器时,指定使用的过滤器。

事件被捕获后,触发器将进入已完成状态,同时触发器的结果会被设置为过滤器的返回值,在上例中就是好友的昵称。

触发器的状态可以通过 done 方法来查询。

使用 wait 方法等待触发器完成,并获得触发器的结果。

@Filter(FriendMessage)
def filter(event: FriendMessage):
    if event.sender.id == 12345678:
        return event.sender.nickname or ''

trigger = Trigger(filter)
result = await trigger.wait()

wait 方法有一个可选的 timeout 参数,表示等待的限时,如果超时则返回 None。

使用 catch 方法尝试捕获事件。事件捕获成功后,触发器会进入已完成状态。

Args

filter
过滤器。
priority
优先级,小者优先。
Expand source code
class Trigger(Generic[TEvent]):
    """事件触发器,提供等待符合特定条件的事件的功能。

    事件触发器类似于 asyncio.Future,有已完成和未完成两种状态。和 Future 一样,Trigger 也是低层级 API。
    大多数情况下,你不需要用到 Trigger 的实现,而是通过中断控制器、事件接收控制器来使用 Trigger。

    触发器创建后,默认为未完成状态。通过 catch 方法尝试捕获事件,捕获成功后将进入已完成状态。
    使用 wait 方法等待一个触发器,直到触发器完成。

    触发器内部的逻辑由过滤器(`mirai_extensions.trigger.filter.Filter`)实现。在创建触发器时,指定使用的过滤器。

    事件被捕获后,触发器将进入已完成状态,同时触发器的结果会被设置为过滤器的返回值,在上例中就是好友的昵称。

    触发器的状态可以通过 done 方法来查询。

    使用 wait 方法等待触发器完成,并获得触发器的结果。

    ```python
    @Filter(FriendMessage)
    def filter(event: FriendMessage):
        if event.sender.id == 12345678:
            return event.sender.nickname or ''

    trigger = Trigger(filter)
    result = await trigger.wait()
    ```

    `wait` 方法有一个可选的 `timeout` 参数,表示等待的限时,如果超时则返回 None。

    使用 `catch` 方法尝试捕获事件。事件捕获成功后,触发器会进入已完成状态。
    """
    filter: Filter[TEvent]
    """过滤器。"""
    priority: int
    """优先级,小者优先。"""
    def __init__(
        self,
        filter: Filter[TEvent],
        priority: int = 0,
    ):
        """
        Args:
            filter: 过滤器。
            priority: 优先级,小者优先。
        """
        self.priority = priority
        self.filter = filter
        self._future: Optional[asyncio.Future] = None
        self._waited: bool = False

    def __del__(self):
        if self._future:
            self._future.cancel()

    @property
    def event_name(self) -> Type[TEvent]:
        """事件类型。"""
        return self.filter.event_name

    def catch(self, event: TEvent) -> bool:
        """尝试捕获一个事件。

        事件会传递给过滤器,过滤器返回一个非 None 的值,表示事件被捕获,
        事件触发器将把过滤器的返回值作为结果。

        事件捕获成功后,触发器会进入已完成状态。

        Args
            event: 事件。

        Returns:
            bool: 捕获成功与否。
        """
        if self._future is None or self.done():
            return False
        if self.filter is not None:
            try:
                result = self.filter.catch(event)
                if result is not None and not self._future.done():
                    self._future.set_result(result)
                    return True
            except Exception as e:
                if not self._future.done():
                    self._future.set_exception(e)
                    return True
        else:
            self._future.set_result(None)
            return True

        return False

    def done(self) -> bool:
        """触发器是否已经完成。"""
        return self._waited or bool(self._future and self._future.done())

    def add_done_callback(self, callback: Callable[['Trigger'], Any]):
        """添加触发器完成后的回调。

        回调函数接受唯一参数:触发器本身。

        Args:
            callback: 回调函数。
        """
        if self._future:
            self._future.add_done_callback(lambda _: callback(self))

    def reset(self) -> 'Trigger[TEvent]':
        """重置事件触发器。

        此方法将丢弃当前结果,并将触发器设置为未完成状态。

        注意,此方法必须被异步函数调用或间接调用。

        Returns:
            Trigger[TEvent]: 触发器本身。
        """
        if self._future:
            self._future.cancel()
        self._future = asyncio.get_running_loop().create_future()
        self._waited = False
        return self

    async def wait(self, timeout: float = 0.):
        """等待事件触发,返回事件触发器的结果。

        Args:
            timeout: 超时时间,单位为秒。

        Returns:
            Any: 触发器的结果。
            None: 触发器超时。
        """
        if self._waited:
            raise RuntimeError('触发器已被等待。')

        if self._future is None:
            self._future = asyncio.get_running_loop().create_future()

        try:
            if timeout > 0:
                return await asyncio.wait_for(self._future, timeout)
            else:
                return await self._future
        except asyncio.TimeoutError:
            return None
        finally:
            self._waited = self._future.done()

Ancestors

  • typing.Generic

Class variables

var filterFilter[~TEvent]

过滤器。

var priority : int

优先级,小者优先。

Instance variables

var event_name : Type[~TEvent]

事件类型。

Expand source code
@property
def event_name(self) -> Type[TEvent]:
    """事件类型。"""
    return self.filter.event_name

Methods

def add_done_callback(self, callback: Callable[[ForwardRef('Trigger')], Any])

添加触发器完成后的回调。

回调函数接受唯一参数:触发器本身。

Args

callback
回调函数。
Expand source code
def add_done_callback(self, callback: Callable[['Trigger'], Any]):
    """添加触发器完成后的回调。

    回调函数接受唯一参数:触发器本身。

    Args:
        callback: 回调函数。
    """
    if self._future:
        self._future.add_done_callback(lambda _: callback(self))
def catch(self, event: ~TEvent) ‑> bool

尝试捕获一个事件。

事件会传递给过滤器,过滤器返回一个非 None 的值,表示事件被捕获, 事件触发器将把过滤器的返回值作为结果。

事件捕获成功后,触发器会进入已完成状态。

Args event: 事件。

Returns

bool
捕获成功与否。
Expand source code
def catch(self, event: TEvent) -> bool:
    """尝试捕获一个事件。

    事件会传递给过滤器,过滤器返回一个非 None 的值,表示事件被捕获,
    事件触发器将把过滤器的返回值作为结果。

    事件捕获成功后,触发器会进入已完成状态。

    Args
        event: 事件。

    Returns:
        bool: 捕获成功与否。
    """
    if self._future is None or self.done():
        return False
    if self.filter is not None:
        try:
            result = self.filter.catch(event)
            if result is not None and not self._future.done():
                self._future.set_result(result)
                return True
        except Exception as e:
            if not self._future.done():
                self._future.set_exception(e)
                return True
    else:
        self._future.set_result(None)
        return True

    return False
def done(self) ‑> bool

触发器是否已经完成。

Expand source code
def done(self) -> bool:
    """触发器是否已经完成。"""
    return self._waited or bool(self._future and self._future.done())
def reset(self) ‑> Trigger[~TEvent]

重置事件触发器。

此方法将丢弃当前结果,并将触发器设置为未完成状态。

注意,此方法必须被异步函数调用或间接调用。

Returns

Trigger[TEvent]
触发器本身。
Expand source code
def reset(self) -> 'Trigger[TEvent]':
    """重置事件触发器。

    此方法将丢弃当前结果,并将触发器设置为未完成状态。

    注意,此方法必须被异步函数调用或间接调用。

    Returns:
        Trigger[TEvent]: 触发器本身。
    """
    if self._future:
        self._future.cancel()
    self._future = asyncio.get_running_loop().create_future()
    self._waited = False
    return self
async def wait(self, timeout: float = 0.0)

等待事件触发,返回事件触发器的结果。

Args

timeout
超时时间,单位为秒。

Returns

Any
触发器的结果。
None
触发器超时。
Expand source code
async def wait(self, timeout: float = 0.):
    """等待事件触发,返回事件触发器的结果。

    Args:
        timeout: 超时时间,单位为秒。

    Returns:
        Any: 触发器的结果。
        None: 触发器超时。
    """
    if self._waited:
        raise RuntimeError('触发器已被等待。')

    if self._future is None:
        self._future = asyncio.get_running_loop().create_future()

    try:
        if timeout > 0:
            return await asyncio.wait_for(self._future, timeout)
        else:
            return await self._future
    except asyncio.TimeoutError:
        return None
    finally:
        self._waited = self._future.done()