Module mirai.models.bus

此模块提供模型事件总线相关。

关于一般的事件总线,参看模块 mirai.bus

Expand source code
# -*- coding: utf-8 -*-
"""
此模块提供模型事件总线相关。

关于一般的事件总线,参看模块 `mirai.bus`。
"""
import logging
from collections import defaultdict
from typing import Any, Awaitable, Callable, List, Type, Union, cast

from mirai.bus import EventBus
from mirai.models.events import Event
from mirai.utils import async_with_exception

logger = logging.getLogger(__name__)


def event_chain_parents(event: str):
    """包含事件及所有父事件的事件链。

    例如:`FriendMessage` 的事件链为 `['FriendMessage', 'MessageEvent', 'Event']`。
    """
    event_type = Event.get_subtype(event)
    while issubclass(event_type, Event):
        yield event_type.__name__
        event_type = event_type.__base__


class ModelEventBus(EventBus):
    """模型事件总线,实现底层事件总线上的事件再分发,以将事件解析到 Event 对象。

    `ModelEventBus` 在注册事件处理器时,可使用 `Event` 类或事件名。关于可用的 `Event` 类,
    参见模块 `mirai.models.events`。

    模型事件总线支持的事件处理器接受唯一的参数`event`,该参数是一个 `Event` 对象,包含触发的事件的信息。

    事件触发时,会自动按照 `Event` 类的继承关系向上级传播。
    """
    def __init__(self):
        self.base_bus = EventBus(event_chain_generator=event_chain_parents)
        self._middlewares = defaultdict(type(None))

    def subscribe(
        self,
        event_type: Union[Type[Event], str],
        func: Callable,
        priority: int = 0
    ) -> None:
        """注册事件处理器。

        Args:
            event: 事件类型。
            func: 事件处理器。
            priority: 优先级,小者优先。
        """
        if isinstance(event_type, str):
            event_type = cast(Type[Event], Event.get_subtype(event_type))

        async def middleware(event: dict):
            """中间件。负责与底层 bus 沟通,将 event dict 解析为 Event 对象。"""
            event_model = cast(Event, Event.parse_subtype(event))
            logger.debug(f'收到事件 {event_model.type}。')
            return await async_with_exception(func(event_model))

        self._middlewares[func] = middleware
        self.base_bus.subscribe(event_type.__name__, middleware, priority)
        logger.debug(f'注册事件 {event_type.__name__} at {func}。')

    def unsubscribe(
        self, event_type: Union[Type[Event], str], func: Callable
    ) -> None:
        """移除事件处理器。

        Args:
            event_type: 事件类型。
            func: 事件处理器。
        """
        if isinstance(event_type, str):
            event_type = cast(Type[Event], Event.get_subtype(event_type))

        self.base_bus.unsubscribe(event_type.__name__, self._middlewares[func])
        del self._middlewares[func]
        logger.debug(f'解除事件注册 {event_type.__name__} at {func}。')

    def on(
        self,
        event_type: Union[Type[Event], str],
        priority: int = 0,
    ) -> Callable:
        """以装饰器的方式注册事件处理器。

        例如:
        ```py
        @bus.on(FriendMessage)
        def my_event_handler(event: FriendMessage):
            print(event.sender.id)
        ```

        Args:
            event_type: 事件类型或事件名。
            priority: 优先级,小者优先。
        """
        def decorator(func: Callable) -> Callable:
            self.subscribe(event_type, func, priority)
            return func

        return decorator

    async def emit(self, event: Union[Event, str], *args,
                   **kwargs) -> List[Awaitable[Any]]:
        """触发一个事件。

        Args:
            event: 要触发的事件。
            *args: 参数。
            **kwargs: 参数。
        """
        if isinstance(event, str):
            return await super().emit(event, *args, **kwargs)

        return await self.base_bus.emit(
            event.type, event.dict(by_alias=True, exclude_none=True)
        )

Functions

def event_chain_parents(event: str)

包含事件及所有父事件的事件链。

例如:FriendMessage 的事件链为 ['FriendMessage', 'MessageEvent', 'Event']

Expand source code
def event_chain_parents(event: str):
    """包含事件及所有父事件的事件链。

    例如:`FriendMessage` 的事件链为 `['FriendMessage', 'MessageEvent', 'Event']`。
    """
    event_type = Event.get_subtype(event)
    while issubclass(event_type, Event):
        yield event_type.__name__
        event_type = event_type.__base__

Classes

class ModelEventBus

模型事件总线,实现底层事件总线上的事件再分发,以将事件解析到 Event 对象。

ModelEventBus 在注册事件处理器时,可使用 Event 类或事件名。关于可用的 Event 类, 参见模块 mirai.models.events

模型事件总线支持的事件处理器接受唯一的参数event,该参数是一个 Event 对象,包含触发的事件的信息。

事件触发时,会自动按照 Event 类的继承关系向上级传播。

Args

event_chain_generator
一个函数, 输入事件名,返回一个生成此事件所在事件链的全部事件的事件名的生成器, 默认行为是事件链只包含单一事件。
Expand source code
class ModelEventBus(EventBus):
    """模型事件总线,实现底层事件总线上的事件再分发,以将事件解析到 Event 对象。

    `ModelEventBus` 在注册事件处理器时,可使用 `Event` 类或事件名。关于可用的 `Event` 类,
    参见模块 `mirai.models.events`。

    模型事件总线支持的事件处理器接受唯一的参数`event`,该参数是一个 `Event` 对象,包含触发的事件的信息。

    事件触发时,会自动按照 `Event` 类的继承关系向上级传播。
    """
    def __init__(self):
        self.base_bus = EventBus(event_chain_generator=event_chain_parents)
        self._middlewares = defaultdict(type(None))

    def subscribe(
        self,
        event_type: Union[Type[Event], str],
        func: Callable,
        priority: int = 0
    ) -> None:
        """注册事件处理器。

        Args:
            event: 事件类型。
            func: 事件处理器。
            priority: 优先级,小者优先。
        """
        if isinstance(event_type, str):
            event_type = cast(Type[Event], Event.get_subtype(event_type))

        async def middleware(event: dict):
            """中间件。负责与底层 bus 沟通,将 event dict 解析为 Event 对象。"""
            event_model = cast(Event, Event.parse_subtype(event))
            logger.debug(f'收到事件 {event_model.type}。')
            return await async_with_exception(func(event_model))

        self._middlewares[func] = middleware
        self.base_bus.subscribe(event_type.__name__, middleware, priority)
        logger.debug(f'注册事件 {event_type.__name__} at {func}。')

    def unsubscribe(
        self, event_type: Union[Type[Event], str], func: Callable
    ) -> None:
        """移除事件处理器。

        Args:
            event_type: 事件类型。
            func: 事件处理器。
        """
        if isinstance(event_type, str):
            event_type = cast(Type[Event], Event.get_subtype(event_type))

        self.base_bus.unsubscribe(event_type.__name__, self._middlewares[func])
        del self._middlewares[func]
        logger.debug(f'解除事件注册 {event_type.__name__} at {func}。')

    def on(
        self,
        event_type: Union[Type[Event], str],
        priority: int = 0,
    ) -> Callable:
        """以装饰器的方式注册事件处理器。

        例如:
        ```py
        @bus.on(FriendMessage)
        def my_event_handler(event: FriendMessage):
            print(event.sender.id)
        ```

        Args:
            event_type: 事件类型或事件名。
            priority: 优先级,小者优先。
        """
        def decorator(func: Callable) -> Callable:
            self.subscribe(event_type, func, priority)
            return func

        return decorator

    async def emit(self, event: Union[Event, str], *args,
                   **kwargs) -> List[Awaitable[Any]]:
        """触发一个事件。

        Args:
            event: 要触发的事件。
            *args: 参数。
            **kwargs: 参数。
        """
        if isinstance(event, str):
            return await super().emit(event, *args, **kwargs)

        return await self.base_bus.emit(
            event.type, event.dict(by_alias=True, exclude_none=True)
        )

Ancestors

Methods

async def emit(self, event: Union[Event, str], *args, **kwargs) ‑> List[Awaitable[Any]]

触发一个事件。

Args

event
要触发的事件。
*args
参数。
**kwargs
参数。
Expand source code
async def emit(self, event: Union[Event, str], *args,
               **kwargs) -> List[Awaitable[Any]]:
    """触发一个事件。

    Args:
        event: 要触发的事件。
        *args: 参数。
        **kwargs: 参数。
    """
    if isinstance(event, str):
        return await super().emit(event, *args, **kwargs)

    return await self.base_bus.emit(
        event.type, event.dict(by_alias=True, exclude_none=True)
    )
def on(self, event_type: Union[Type[Event], str], priority: int = 0) ‑> Callable

以装饰器的方式注册事件处理器。

例如:

@bus.on(FriendMessage)
def my_event_handler(event: FriendMessage):
    print(event.sender.id)

Args

event_type
事件类型或事件名。
priority
优先级,小者优先。
Expand source code
def on(
    self,
    event_type: Union[Type[Event], str],
    priority: int = 0,
) -> Callable:
    """以装饰器的方式注册事件处理器。

    例如:
    ```py
    @bus.on(FriendMessage)
    def my_event_handler(event: FriendMessage):
        print(event.sender.id)
    ```

    Args:
        event_type: 事件类型或事件名。
        priority: 优先级,小者优先。
    """
    def decorator(func: Callable) -> Callable:
        self.subscribe(event_type, func, priority)
        return func

    return decorator
def subscribe(self, event_type: Union[Type[Event], str], func: Callable, priority: int = 0) ‑> NoneType

注册事件处理器。

Args

event
事件类型。
func
事件处理器。
priority
优先级,小者优先。
Expand source code
def subscribe(
    self,
    event_type: Union[Type[Event], str],
    func: Callable,
    priority: int = 0
) -> None:
    """注册事件处理器。

    Args:
        event: 事件类型。
        func: 事件处理器。
        priority: 优先级,小者优先。
    """
    if isinstance(event_type, str):
        event_type = cast(Type[Event], Event.get_subtype(event_type))

    async def middleware(event: dict):
        """中间件。负责与底层 bus 沟通,将 event dict 解析为 Event 对象。"""
        event_model = cast(Event, Event.parse_subtype(event))
        logger.debug(f'收到事件 {event_model.type}。')
        return await async_with_exception(func(event_model))

    self._middlewares[func] = middleware
    self.base_bus.subscribe(event_type.__name__, middleware, priority)
    logger.debug(f'注册事件 {event_type.__name__} at {func}。')
def unsubscribe(self, event_type: Union[Type[Event], str], func: Callable) ‑> NoneType

移除事件处理器。

Args

event_type
事件类型。
func
事件处理器。
Expand source code
def unsubscribe(
    self, event_type: Union[Type[Event], str], func: Callable
) -> None:
    """移除事件处理器。

    Args:
        event_type: 事件类型。
        func: 事件处理器。
    """
    if isinstance(event_type, str):
        event_type = cast(Type[Event], Event.get_subtype(event_type))

    self.base_bus.unsubscribe(event_type.__name__, self._middlewares[func])
    del self._middlewares[func]
    logger.debug(f'解除事件注册 {event_type.__name__} at {func}。')