Package mirai_extensions.trigger
事件触发器:提供更多处理事件的方式。
Expand source code
# -*- coding: utf-8 -*-
"""
事件触发器:提供更多处理事件的方式。
"""
__version__ = '0.3.1'
__author__ = '忘忧北萱草'
from mirai_extensions.trigger.filter import Filter
from mirai_extensions.trigger.handler import HandlerControl
from mirai_extensions.trigger.interrupt import InterruptControl
from mirai_extensions.trigger.message import (
FriendMessageFilter, GroupMessageFilter, TempMessageFilter
)
from mirai_extensions.trigger.trigger import Trigger
__all__ = [
'Filter', 'Trigger', 'InterruptControl', 'HandlerControl',
'FriendMessageFilter', 'GroupMessageFilter', 'TempMessageFilter'
]
Sub-modules
mirai_extensions.trigger.filter
mirai_extensions.trigger.handler
-
本模块提供过滤器总线相关。
mirai_extensions.trigger.interrupt
-
本模块提供中断控制器相关。
mirai_extensions.trigger.message
mirai_extensions.trigger.trigger
-
此模块提供事件触发器相关。
Classes
class Filter (event_name: Type[~TEvent], mixin: Optional[Iterable[BaseFilter[~TEvent]]] = None, func: Optional[Callable[[~TEvent], Optional[Any]]] = None)
-
事件过滤器,允许用户传入自定义的捕获函数。
捕获函数可以在创建过滤器时设置,或者通过装饰器的方式设置。
# 方式一 def filter_one_func(event: FriendMessage): if event.sender.id == 12345678: return event.sender.nickname or '' filter_one = Filter(FriendMessage, func=filter_one_func) # 方式二 @Filter(FriendMessage) def filter_two(event: FriendMessage): if event.sender.id == 12345678: return event.sender.nickname or ''
当使用类装饰器的方式创建过滤器时,被装饰函数的名称将失效。比如上例中,
filter_two
将不再是函数, 而是成为 Filter 实例。上例中的过滤器将检测好友消息的发送对象,只有来自 12345678 的消息会被捕获。其他情况下,过滤器返回默认值 None, 不会被捕获。
Args
event_name
- 过滤器捕获的事件类型。
mixin
- 过滤器混入。 过滤器会先检查混入的过滤器,若任何一个未捕获,直接停止捕获,返回“未捕获”状态。
func
- 自定义的捕获函数。
Expand source code
class Filter(BaseFilter[TEvent]): """事件过滤器,允许用户传入自定义的捕获函数。 捕获函数可以在创建过滤器时设置,或者通过装饰器的方式设置。 ```python # 方式一 def filter_one_func(event: FriendMessage): if event.sender.id == 12345678: return event.sender.nickname or '' filter_one = Filter(FriendMessage, func=filter_one_func) # 方式二 @Filter(FriendMessage) def filter_two(event: FriendMessage): if event.sender.id == 12345678: return event.sender.nickname or '' ``` 当使用类装饰器的方式创建过滤器时,被装饰函数的名称将失效。比如上例中,`filter_two` 将不再是函数, 而是成为 Filter 实例。 上例中的过滤器将检测好友消息的发送对象,只有来自 12345678 的消息会被捕获。其他情况下,过滤器返回默认值 None, 不会被捕获。 """ event_name: Type[TEvent] """过滤器捕获的事件类型。""" def __init__( self, event_name: Type[TEvent], mixin: Optional[Iterable[BaseFilter[TEvent]]] = None, func: Optional[TFilter[TEvent]] = None ): """ Args: event_name: 过滤器捕获的事件类型。 mixin: 过滤器混入。 过滤器会先检查混入的过滤器,若任何一个未捕获,直接停止捕获,返回“未捕获”状态。 func: 自定义的捕获函数。 """ super().__init__(mixin or []) self.event_name = event_name self._func: Optional[TFilter[TEvent]] = func def __call__(self, func: TFilter[TEvent]): self._func = func return self def _catch(self, event: TEvent) -> Optional[Any]: if self._func is None: return event return self._func(event)
Ancestors
- BaseFilter
- typing.Generic
Subclasses
Class variables
var event_name : Type[~TEvent]
-
过滤器捕获的事件类型。
Inherited members
class FriendMessageFilter (friend: Union[mirai.models.entities.Friend, int, None] = None, quote: Union[mirai.models.events.MessageEvent, mirai.models.message.MessageChain, int, None] = None, func: Optional[Callable[[mirai.models.events.FriendMessage], Optional[Any]]] = None)
-
好友消息触发器。判断只接受指定好友发来的消息,或必需引用某条消息。
Args
friend
- 指定好友。
quote
- 指定引用某条消息。
priority
- 优先级,小者优先。
filter
- 过滤器。
Expand source code
class FriendMessageFilter(Filter[FriendMessage]): """好友消息触发器。判断只接受指定好友发来的消息,或必需引用某条消息。""" def __init__( self, friend: Union[Friend, int, None] = None, quote: Union[MessageEvent, MessageChain, int, None] = None, func: Optional[TFilter[FriendMessage]] = None ): """ Args: friend: 指定好友。 quote: 指定引用某条消息。 priority: 优先级,小者优先。 filter: 过滤器。 """ mixin = [] if friend: mixin.append(FriendFilter(friend)) if quote: mixin.append(QuoteFilter(quote)) super().__init__(FriendMessage, mixin, func) if quote: logger.warning( "FriendMessageTrigger 的 quote 参数目前由于不明原因,有时候不能正常使用,请注意。" )
Ancestors
- Filter
- BaseFilter
- typing.Generic
Inherited members
class GroupMessageFilter (group: Union[mirai.models.entities.Group, int, TNotSpecified, None] = <object object>, group_member: Union[mirai.models.entities.GroupMember, int, None] = None, quote: Union[mirai.models.events.MessageEvent, mirai.models.message.MessageChain, int, None] = None, func: Optional[Callable[[mirai.models.events.GroupMessage], Optional[Any]]] = None)
-
群消息触发器。判断只接受指定群的某个群成员发来的消息,或必需引用某条消息。
Args
group
- 指定群。
group_member
- 指定群成员。
quote
- 指定引用某条消息。
priority
- 优先级,小者优先。
filter
- 过滤器。
Expand source code
class GroupMessageFilter(Filter[GroupMessage]): """群消息触发器。判断只接受指定群的某个群成员发来的消息,或必需引用某条消息。""" def __init__( self, group: Union[Group, int, TNotSpecified, None] = _not_specified, group_member: Union[GroupMember, int, None] = None, quote: Union[MessageEvent, MessageChain, int, None] = None, func: Optional[TFilter[GroupMessage]] = None ): """ Args: group: 指定群。 group_member: 指定群成员。 quote: 指定引用某条消息。 priority: 优先级,小者优先。 filter: 过滤器。 """ mixin = [] if group_member: mixin.append(GroupMemberFilter(group_member, group)) elif group and group is not _not_specified: mixin.append(GroupFilter(cast(Union[Group, int], group))) if quote: mixin.append(QuoteFilter(quote)) super().__init__(GroupMessage, mixin, func)
Ancestors
- Filter
- BaseFilter
- typing.Generic
Inherited members
class HandlerControl (bus: mirai.bus.AbstractEventBus, priority: int = 0)
-
事件接收控制器,通过 Filter 实现事件的过滤和解析。
创建过滤器
filter_
后,可以通过hdc.on(filter_)
装饰器注册处理器。处理器是一个函数,接受两个参数
event
和payload
。event
是接收到的原始事件,payload
是过滤器解析的结果。hdc = HandlerControl(bot) @Filter(FriendMessage) async def filter_(event: FriendMessage): msg = str(event.message_chain) if msg.startswith('我是'): return msg[2:] @hdc.on(filter_) async def handler(event: FriendMessage, payload: str): ...
Args
bus
- 事件总线。
priority
- 事件接收控制器工作的优先级,小者优先。
Expand source code
class HandlerControl(AbstractEventBus): """事件接收控制器,通过 Filter 实现事件的过滤和解析。 创建过滤器 `filter_` 后,可以通过 `hdc.on(filter_)` 装饰器注册处理器。 处理器是一个函数,接受两个参数 `event` 和 `payload`。 `event` 是接收到的原始事件,`payload` 是过滤器解析的结果。 ```python hdc = HandlerControl(bot) @Filter(FriendMessage) async def filter_(event: FriendMessage): msg = str(event.message_chain) if msg.startswith('我是'): return msg[2:] @hdc.on(filter_) async def handler(event: FriendMessage, payload: str): ... ``` """ bus: AbstractEventBus def __init__(self, bus: AbstractEventBus, priority: int = 0): """ Args: bus: 事件总线。 priority: 事件接收控制器工作的优先级,小者优先。 """ self.bus = bus self.priority = priority self._handlers: Dict[Any, PriorityDict[Tuple[Filter, THandler]]] = {} def _new_handler(self, event_name): @self.bus.on(event_name, priority=self.priority) async def _(event): results = [] for filters in self._handlers[event_name]: for filter_, func in list(filters): payload = filter_.catch(event) if payload is not None: results.append(func(event, payload)) return asyncio.gather(*filter(inspect.isawaitable, results)) def subscribe(self, event: Filter, func: THandler, priority: int = 0): """注册一个过滤器的处理器。 Args: event: 过滤器。 func: 处理器。 priority: 处理器的优先级,小者优先。 """ filter_ = event event_name = filter_.event_name if event_name not in self._handlers: self._handlers[event_name] = PriorityDict() self._new_handler(event_name) self._handlers[event_name].add(priority, (filter_, func)) def unsubscribe(self, event: Filter, func: THandler): """取消一个过滤器的处理器。 Args: event: 过滤器。 func: 处理器。 """ try: self._handlers[event.event_name].remove((event, func)) except KeyError: logger.warning(f'试图移除过滤器 `{event}` 的一个不存在的处理器 `{func}`。') def on(self, event: Filter, priority: int = 0): """以装饰器的形式注册一个过滤器的处理器。 例如: ```python @hdc.on(filter) async def handler(event, payload): ... ``` Args: event: 过滤器。 priority: 处理器的优先级,小者优先。 """ def decorator(func: THandler) -> THandler: self.subscribe(event, func, priority) return func return decorator async def emit(self, event, *args, **kwargs) -> List[Awaitable[Any]]: """发送事件。 Args: event: 事件名。 *args: 发送的参数。 **kwargs: 发送的参数。 """ return await self.bus.emit(event, *args, **kwargs)
Ancestors
- mirai.bus.AbstractEventBus
Class variables
var bus : mirai.bus.AbstractEventBus
Methods
async def emit(self, event, *args, **kwargs) ‑> List[Awaitable[Any]]
-
发送事件。
Args
event
- 事件名。
*args
- 发送的参数。
**kwargs
- 发送的参数。
Expand source code
async def emit(self, event, *args, **kwargs) -> List[Awaitable[Any]]: """发送事件。 Args: event: 事件名。 *args: 发送的参数。 **kwargs: 发送的参数。 """ return await self.bus.emit(event, *args, **kwargs)
def on(self, event: Filter, priority: int = 0)
-
以装饰器的形式注册一个过滤器的处理器。
例如:
@hdc.on(filter) async def handler(event, payload): ...
Args
event
- 过滤器。
priority
- 处理器的优先级,小者优先。
Expand source code
def on(self, event: Filter, priority: int = 0): """以装饰器的形式注册一个过滤器的处理器。 例如: ```python @hdc.on(filter) async def handler(event, payload): ... ``` Args: event: 过滤器。 priority: 处理器的优先级,小者优先。 """ def decorator(func: THandler) -> THandler: self.subscribe(event, func, priority) return func return decorator
def subscribe(self, event: Filter, func: Callable[[Any, Any], Any], priority: int = 0)
-
注册一个过滤器的处理器。
Args
event
- 过滤器。
func
- 处理器。
priority
- 处理器的优先级,小者优先。
Expand source code
def subscribe(self, event: Filter, func: THandler, priority: int = 0): """注册一个过滤器的处理器。 Args: event: 过滤器。 func: 处理器。 priority: 处理器的优先级,小者优先。 """ filter_ = event event_name = filter_.event_name if event_name not in self._handlers: self._handlers[event_name] = PriorityDict() self._new_handler(event_name) self._handlers[event_name].add(priority, (filter_, func))
def unsubscribe(self, event: Filter, func: Callable[[Any, Any], Any])
-
取消一个过滤器的处理器。
Args
event
- 过滤器。
func
- 处理器。
Expand source code
def unsubscribe(self, event: Filter, func: THandler): """取消一个过滤器的处理器。 Args: event: 过滤器。 func: 处理器。 """ try: self._handlers[event.event_name].remove((event, func)) except KeyError: logger.warning(f'试图移除过滤器 `{event}` 的一个不存在的处理器 `{func}`。')
class InterruptControl (bus: mirai.bus.AbstractEventBus, priority: int = 15)
-
中断控制器,用于实现在一个消息处理器中间监听另一个消息事件。
使用 Filter 定义过滤器,接下来使用
inc.wait
创建触发器并等待。 也可提前创建过滤器,由inc.wait
等待。inc = InterruptControl(bot) @bot.on(FriendMessage) async def on_friend_message(event: FriendMessage): if str(event.message_chain).strip() == '你是谁': await bot.send(event, '我是 Yiri。你呢?') @Filter(FriendMessage) async def filter_(event_new: FriendMessage): if event.sender.id == event_new.sender.id: msg = str(event.message_chain) if msg.startswith('我是'): return msg[2:] name = await inc.wait(filter_, timeout=60.) if name: await bot.send(event, f'你好,{name}。')
inc.wait
的返回值为过滤器的返回值。当达到超时后,返回None
。Args
bus
- 事件总线。
priority
- 中断控制器工作的优先级,默认值 15。
Expand source code
class InterruptControl: """中断控制器,用于实现在一个消息处理器中间监听另一个消息事件。 使用 Filter 定义过滤器,接下来使用 `inc.wait` 创建触发器并等待。 也可提前创建过滤器,由 `inc.wait` 等待。 ```python inc = InterruptControl(bot) @bot.on(FriendMessage) async def on_friend_message(event: FriendMessage): if str(event.message_chain).strip() == '你是谁': await bot.send(event, '我是 Yiri。你呢?') @Filter(FriendMessage) async def filter_(event_new: FriendMessage): if event.sender.id == event_new.sender.id: msg = str(event.message_chain) if msg.startswith('我是'): return msg[2:] name = await inc.wait(filter_, timeout=60.) if name: await bot.send(event, f'你好,{name}。') ``` `inc.wait` 的返回值为过滤器的返回值。当达到超时后,返回 `None`。 """ def __init__(self, bus: AbstractEventBus, priority: int = 15): """ Args: bus: 事件总线。 priority: 中断控制器工作的优先级,默认值 15。 """ self.bus = bus self._triggers: Dict[Any, PriorityDict[Trigger]] = {} self.priority = priority def _new_handler(self, event_name): @self.bus.on(event_name, priority=self.priority) async def _(event): for triggers in self._triggers[event_name]: for trigger in list(triggers): if trigger.catch(event): break else: # 跳出两重循环 continue break async def wait( self, trigger: Union[Filter, Trigger], timeout: float = 0., priority: int = 0 ): """监听一个过滤器或触发器,等待其完成。 Args: trigger: 过滤器或触发器。 timeout: 超时时间,单位为秒。 Returns: Any: 触发器的结果。 None: 触发器超时。 """ event_name = trigger.event_name if isinstance(trigger, Filter): trigger = Trigger(trigger, priority=priority) if event_name not in self._triggers: self._triggers[event_name] = PriorityDict() self._new_handler(event_name) self._triggers[event_name].add(trigger.priority, trigger) @trigger.add_done_callback def _(_: Trigger): self._triggers[event_name].remove(trigger) logger.debug(f'[InterruptControl] 等待触发器 {trigger}。') return await trigger.wait(timeout)
Methods
async def wait(self, trigger: Union[Filter, Trigger], timeout: float = 0.0, priority: int = 0)
-
监听一个过滤器或触发器,等待其完成。
Args
trigger
- 过滤器或触发器。
timeout
- 超时时间,单位为秒。
Returns
Any
- 触发器的结果。
None
- 触发器超时。
Expand source code
async def wait( self, trigger: Union[Filter, Trigger], timeout: float = 0., priority: int = 0 ): """监听一个过滤器或触发器,等待其完成。 Args: trigger: 过滤器或触发器。 timeout: 超时时间,单位为秒。 Returns: Any: 触发器的结果。 None: 触发器超时。 """ event_name = trigger.event_name if isinstance(trigger, Filter): trigger = Trigger(trigger, priority=priority) if event_name not in self._triggers: self._triggers[event_name] = PriorityDict() self._new_handler(event_name) self._triggers[event_name].add(trigger.priority, trigger) @trigger.add_done_callback def _(_: Trigger): self._triggers[event_name].remove(trigger) logger.debug(f'[InterruptControl] 等待触发器 {trigger}。') return await trigger.wait(timeout)
class TempMessageFilter (group: Union[mirai.models.entities.Group, int, TNotSpecified, None] = <object object>, group_member: Union[mirai.models.entities.GroupMember, int, None] = None, quote: Union[mirai.models.events.MessageEvent, mirai.models.message.MessageChain, int, None] = None, func: Optional[Callable[[mirai.models.events.TempMessage], Optional[Any]]] = None)
-
临时消息触发器。判断只接受指定群的某个群成员发来的消息,或必需引用某条消息。
Args
group
- 指定群。
group_member
- 指定群成员。
quote
- 指定引用某条消息。
priority
- 优先级,小者优先。
filter
- 过滤器。
Expand source code
class TempMessageFilter(Filter[TempMessage]): """临时消息触发器。判断只接受指定群的某个群成员发来的消息,或必需引用某条消息。""" def __init__( self, group: Union[Group, int, TNotSpecified, None] = _not_specified, group_member: Union[GroupMember, int, None] = None, quote: Union[MessageEvent, MessageChain, int, None] = None, func: Optional[TFilter[TempMessage]] = None ): """ Args: group: 指定群。 group_member: 指定群成员。 quote: 指定引用某条消息。 priority: 优先级,小者优先。 filter: 过滤器。 """ mixin = [] if group_member: mixin.append(GroupMemberFilter(group_member, group)) elif group and group is not _not_specified: mixin.append(GroupFilter(cast(Union[Group, int], group))) if quote: mixin.append(QuoteFilter(quote)) super().__init__(GroupMessage, mixin, func)
Ancestors
- Filter
- BaseFilter
- typing.Generic
Inherited members
class Trigger (filter: Filter[~TEvent], priority: int = 0)
-
事件触发器,提供等待符合特定条件的事件的功能。
事件触发器类似于 asyncio.Future,有已完成和未完成两种状态。和 Future 一样,Trigger 也是低层级 API。 大多数情况下,你不需要用到 Trigger 的实现,而是通过中断控制器、事件接收控制器来使用 Trigger。
触发器创建后,默认为未完成状态。通过 catch 方法尝试捕获事件,捕获成功后将进入已完成状态。 使用 wait 方法等待一个触发器,直到触发器完成。
触发器内部的逻辑由过滤器(
Filter
)实现。在创建触发器时,指定使用的过滤器。事件被捕获后,触发器将进入已完成状态,同时触发器的结果会被设置为过滤器的返回值,在上例中就是好友的昵称。
触发器的状态可以通过 done 方法来查询。
使用 wait 方法等待触发器完成,并获得触发器的结果。
@Filter(FriendMessage) def filter(event: FriendMessage): if event.sender.id == 12345678: return event.sender.nickname or '' trigger = Trigger(filter) result = await trigger.wait()
wait
方法有一个可选的timeout
参数,表示等待的限时,如果超时则返回 None。使用
catch
方法尝试捕获事件。事件捕获成功后,触发器会进入已完成状态。Args
filter
- 过滤器。
priority
- 优先级,小者优先。
Expand source code
class Trigger(Generic[TEvent]): """事件触发器,提供等待符合特定条件的事件的功能。 事件触发器类似于 asyncio.Future,有已完成和未完成两种状态。和 Future 一样,Trigger 也是低层级 API。 大多数情况下,你不需要用到 Trigger 的实现,而是通过中断控制器、事件接收控制器来使用 Trigger。 触发器创建后,默认为未完成状态。通过 catch 方法尝试捕获事件,捕获成功后将进入已完成状态。 使用 wait 方法等待一个触发器,直到触发器完成。 触发器内部的逻辑由过滤器(`mirai_extensions.trigger.filter.Filter`)实现。在创建触发器时,指定使用的过滤器。 事件被捕获后,触发器将进入已完成状态,同时触发器的结果会被设置为过滤器的返回值,在上例中就是好友的昵称。 触发器的状态可以通过 done 方法来查询。 使用 wait 方法等待触发器完成,并获得触发器的结果。 ```python @Filter(FriendMessage) def filter(event: FriendMessage): if event.sender.id == 12345678: return event.sender.nickname or '' trigger = Trigger(filter) result = await trigger.wait() ``` `wait` 方法有一个可选的 `timeout` 参数,表示等待的限时,如果超时则返回 None。 使用 `catch` 方法尝试捕获事件。事件捕获成功后,触发器会进入已完成状态。 """ filter: Filter[TEvent] """过滤器。""" priority: int """优先级,小者优先。""" def __init__( self, filter: Filter[TEvent], priority: int = 0, ): """ Args: filter: 过滤器。 priority: 优先级,小者优先。 """ self.priority = priority self.filter = filter self._future: Optional[asyncio.Future] = None self._waited: bool = False def __del__(self): if self._future: self._future.cancel() @property def event_name(self) -> Type[TEvent]: """事件类型。""" return self.filter.event_name def catch(self, event: TEvent) -> bool: """尝试捕获一个事件。 事件会传递给过滤器,过滤器返回一个非 None 的值,表示事件被捕获, 事件触发器将把过滤器的返回值作为结果。 事件捕获成功后,触发器会进入已完成状态。 Args event: 事件。 Returns: bool: 捕获成功与否。 """ if self._future is None or self.done(): return False if self.filter is not None: try: result = self.filter.catch(event) if result is not None and not self._future.done(): self._future.set_result(result) return True except Exception as e: if not self._future.done(): self._future.set_exception(e) return True else: self._future.set_result(None) return True return False def done(self) -> bool: """触发器是否已经完成。""" return self._waited or bool(self._future and self._future.done()) def add_done_callback(self, callback: Callable[['Trigger'], Any]): """添加触发器完成后的回调。 回调函数接受唯一参数:触发器本身。 Args: callback: 回调函数。 """ if self._future: self._future.add_done_callback(lambda _: callback(self)) def reset(self) -> 'Trigger[TEvent]': """重置事件触发器。 此方法将丢弃当前结果,并将触发器设置为未完成状态。 注意,此方法必须被异步函数调用或间接调用。 Returns: Trigger[TEvent]: 触发器本身。 """ if self._future: self._future.cancel() self._future = asyncio.get_running_loop().create_future() self._waited = False return self async def wait(self, timeout: float = 0.): """等待事件触发,返回事件触发器的结果。 Args: timeout: 超时时间,单位为秒。 Returns: Any: 触发器的结果。 None: 触发器超时。 """ if self._waited: raise RuntimeError('触发器已被等待。') if self._future is None: self._future = asyncio.get_running_loop().create_future() try: if timeout > 0: return await asyncio.wait_for(self._future, timeout) else: return await self._future except asyncio.TimeoutError: return None finally: self._waited = self._future.done()
Ancestors
- typing.Generic
Class variables
var filter : Filter[~TEvent]
-
过滤器。
var priority : int
-
优先级,小者优先。
Instance variables
var event_name : Type[~TEvent]
-
事件类型。
Expand source code
@property def event_name(self) -> Type[TEvent]: """事件类型。""" return self.filter.event_name
Methods
def add_done_callback(self, callback: Callable[[ForwardRef('Trigger')], Any])
-
添加触发器完成后的回调。
回调函数接受唯一参数:触发器本身。
Args
callback
- 回调函数。
Expand source code
def add_done_callback(self, callback: Callable[['Trigger'], Any]): """添加触发器完成后的回调。 回调函数接受唯一参数:触发器本身。 Args: callback: 回调函数。 """ if self._future: self._future.add_done_callback(lambda _: callback(self))
def catch(self, event: ~TEvent) ‑> bool
-
尝试捕获一个事件。
事件会传递给过滤器,过滤器返回一个非 None 的值,表示事件被捕获, 事件触发器将把过滤器的返回值作为结果。
事件捕获成功后,触发器会进入已完成状态。
Args event: 事件。
Returns
bool
- 捕获成功与否。
Expand source code
def catch(self, event: TEvent) -> bool: """尝试捕获一个事件。 事件会传递给过滤器,过滤器返回一个非 None 的值,表示事件被捕获, 事件触发器将把过滤器的返回值作为结果。 事件捕获成功后,触发器会进入已完成状态。 Args event: 事件。 Returns: bool: 捕获成功与否。 """ if self._future is None or self.done(): return False if self.filter is not None: try: result = self.filter.catch(event) if result is not None and not self._future.done(): self._future.set_result(result) return True except Exception as e: if not self._future.done(): self._future.set_exception(e) return True else: self._future.set_result(None) return True return False
def done(self) ‑> bool
-
触发器是否已经完成。
Expand source code
def done(self) -> bool: """触发器是否已经完成。""" return self._waited or bool(self._future and self._future.done())
def reset(self) ‑> Trigger[~TEvent]
-
Expand source code
def reset(self) -> 'Trigger[TEvent]': """重置事件触发器。 此方法将丢弃当前结果,并将触发器设置为未完成状态。 注意,此方法必须被异步函数调用或间接调用。 Returns: Trigger[TEvent]: 触发器本身。 """ if self._future: self._future.cancel() self._future = asyncio.get_running_loop().create_future() self._waited = False return self
async def wait(self, timeout: float = 0.0)
-
等待事件触发,返回事件触发器的结果。
Args
timeout
- 超时时间,单位为秒。
Returns
Any
- 触发器的结果。
None
- 触发器超时。
Expand source code
async def wait(self, timeout: float = 0.): """等待事件触发,返回事件触发器的结果。 Args: timeout: 超时时间,单位为秒。 Returns: Any: 触发器的结果。 None: 触发器超时。 """ if self._waited: raise RuntimeError('触发器已被等待。') if self._future is None: self._future = asyncio.get_running_loop().create_future() try: if timeout > 0: return await asyncio.wait_for(self._future, timeout) else: return await self._future except asyncio.TimeoutError: return None finally: self._waited = self._future.done()