Package mirai

YiriMirai

一个轻量级、低耦合的基于 mirai-api-http 的 Python SDK。

更多信息请看文档

Expand source code
# -*- coding: utf-8 -*-
"""
# YiriMirai
一个轻量级、低耦合的基于 mirai-api-http 的 Python SDK。

更多信息请看[文档](https://yiri-mirai.vercel.app/)。
"""
__version__ = '0.2.6.2'
__author__ = '忘忧北萱草'

import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from mirai.adapters import (
        Adapter, ComposeAdapter, HTTPAdapter, WebHookAdapter, WebSocketAdapter
    )
else:
    from mirai.adapters import Adapter

from mirai.api_provider import Method
from mirai.bot import (
    LifeSpan, Mirai, MiraiRunner, Shutdown, SimpleMirai, Startup
)
from mirai.bus import EventBus
from mirai.colorlog import ColoredFormatter
from mirai.exceptions import (
    ApiError, NetworkError, SkipExecution, StopExecution, StopPropagation
)
from mirai.models import (
    At, AtAll, Dice, Event, Face, FriendMessage, GroupMessage, Image,
    MessageChain, MessageEvent, Plain, Poke, PokeNames, StrangerMessage,
    TempMessage, Voice, deserialize, serialize
)

__all__ = [
    'Mirai', 'SimpleMirai', 'MiraiRunner', 'LifeSpan', 'Startup', 'Shutdown',
    'Adapter', 'Method', 'HTTPAdapter', 'WebSocketAdapter', 'WebHookAdapter',
    'ComposeAdapter', 'EventBus', 'get_logger', 'Event', 'MessageEvent',
    'FriendMessage', 'GroupMessage', 'TempMessage', 'StrangerMessage',
    'MessageChain', 'Plain', 'At', 'AtAll', 'Dice', 'Face', 'Poke',
    'PokeNames', 'Image', 'Voice', 'serialize', 'deserialize', 'ApiError',
    'NetworkError', 'SkipExecution', 'StopExecution', 'StopPropagation'
]

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

formatter = ColoredFormatter(
    '%(asctime)s - %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S'
)
ch.setFormatter(formatter)

logger.addHandler(ch)


def get_logger() -> logging.Logger:
    """获取 YiriMirai 的模块 Logger。

    所有的模块的 Logger 都是此 Logger 的子 Logger,修改此 Logger 的属性以应用到 YiriMirai 全局。

    Returns:
        logging.Logger: 模块 Logger。
    """
    return logger


def __getattr__(name):
    if name in (
        'HTTPAdapter', 'WebSocketAdapter', 'WebHookAdapter', 'ComposeAdapter'
    ):
        import mirai.adapters
        return getattr(mirai.adapters, name)
    raise AttributeError(f'Module {__name__} has no attribute {name}')

Sub-modules

mirai.adapters

此模块提供网络适配器相关。 …

mirai.api_provider

此模块提供 ApiProvider 的基础定义。

mirai.asgi

此模块提供公共 ASGI 前端。

mirai.bot

此模块定义机器人类,包含处于 model 层之下的 SimpleMirai 和建立在 model 层上的 Mirai

mirai.bus

此模块提供基础事件总线相关。 …

mirai.colorlog

此模块提供带颜色的格式化日志输出的功能。 …

mirai.exceptions

此模块提供异常相关。

mirai.models

此模块提供 model 层封装。 …

mirai.tasks

此模块提供了管理多个异步任务的一种方式。

mirai.utils

此模块提供一些实用的辅助方法。

Functions

def deserialize(s: str) ‑> str

mirai 码去转义。

Args

s
待去转义的字符串。

Returns

str
去转义后的字符串。
Expand source code
def deserialize(s: str) -> str:
    """mirai 码去转义。

    Args:
        s: 待去转义的字符串。

    Returns:
        str: 去转义后的字符串。
    """
    return re.sub(
        r'\\([\[\]:,\\])', lambda match: match.group(1),
        s.replace('\\n', '\n').replace('\\r', '\r')
    )
def get_logger() ‑> logging.Logger

获取 YiriMirai 的模块 Logger。

所有的模块的 Logger 都是此 Logger 的子 Logger,修改此 Logger 的属性以应用到 YiriMirai 全局。

Returns

logging.Logger
模块 Logger。
Expand source code
def get_logger() -> logging.Logger:
    """获取 YiriMirai 的模块 Logger。

    所有的模块的 Logger 都是此 Logger 的子 Logger,修改此 Logger 的属性以应用到 YiriMirai 全局。

    Returns:
        logging.Logger: 模块 Logger。
    """
    return logger
def serialize(s: str) ‑> str

mirai 码转义。

Args

s
待转义的字符串。

Returns

str
去转义后的字符串。
Expand source code
def serialize(s: str) -> str:
    """mirai 码转义。

    Args:
        s: 待转义的字符串。

    Returns:
        str: 去转义后的字符串。
    """
    return re.sub(r'[\[\]:,\\]', lambda match: '\\' + match.group(0),
                  s).replace('\n', '\\n').replace('\r', '\\r')

Classes

class Adapter (verify_key: Optional[str], single_mode: bool = False)

适配器基类,与 mirai-api-http 沟通的底层实现。

属性 buses 为适配器注册的事件总线集合。适配器被绑定到 bot 时,bot 会自动将自身的事件总线注册到适配器。

Args

verify_key
mirai-api-http 配置的认证 key,关闭认证时为 None。
single_mode
是否开启 single_mode,开启后与 session 将无效。
Expand source code
class Adapter(ApiProvider, AdapterInterface):
    """适配器基类,与 mirai-api-http 沟通的底层实现。

    属性 `buses` 为适配器注册的事件总线集合。适配器被绑定到 bot 时,bot 会自动将自身的事件总线注册到适配器。
    """
    verify_key: Optional[str]
    """mirai-api-http 配置的认证 key,关闭认证时为 None。"""
    single_mode: bool
    """是否开启 single_mode,开启后与 session 将无效。"""
    session: str
    """从 mirai-api-http 处获得的 session。"""
    buses: Set[AbstractEventBus]
    """注册的事件总线集合。"""
    background: Optional[asyncio.Task]
    """背景事件循环任务。"""
    def __init__(self, verify_key: Optional[str], single_mode: bool = False):
        """
        Args:
            verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。
            single_mode: 是否开启 single_mode,开启后与 session 将无效。
        """
        self.verify_key = verify_key
        self.single_mode = single_mode
        self.session = ''
        self.buses = set()
        self.background = None

    @property
    def adapter_info(self):
        return {
            'verify_key': self.verify_key,
            'session': self.session,
            'single_mode': self.single_mode,
        }

    @classmethod
    def via(cls, adapter_interface: AdapterInterface) -> "Adapter":
        """从适配器接口创建适配器。

        Args:
            adapter_interface: 适配器接口。

        Returns:
            Adapter: 创建的适配器。
        """
        info = adapter_interface.adapter_info
        adapter = cls(
            verify_key=info['verify_key'],
            **{
                key: info[key]
                for key in ['single_mode'] if info.get(key) is not None
            }
        )
        adapter.session = cast(str, info.get('session'))
        return adapter

    def register_event_bus(self, *buses: AbstractEventBus):
        """注册事件总线。

        Args:
            *buses: 一个或多个事件总线。
        """
        self.buses |= set(buses)

    def unregister_event_bus(self, *buses: AbstractEventBus):
        """解除注册事件总线。

        Args:
            *buses: 一个或多个事件总线。
        """
        self.buses -= set(buses)

    @abc.abstractmethod
    async def login(self, qq: int):
        """登录到 mirai-api-http。"""

    @abc.abstractmethod
    async def logout(self, terminate: bool = True):
        """登出。"""

    @abc.abstractmethod
    async def call_api(self, api: str, method: Method = Method.GET, **params):
        """调用 API。

        Args:
            api: API 名称,需与 mirai-api-http 中的定义一致。
            method: 调用方法。默认为 GET。
            **params: 参数。
        """

    @abc.abstractmethod
    async def _background(self):
        """背景事件循环,用于接收事件。"""

    async def start(self):
        """运行背景事件循环。"""
        if not self.buses:
            raise RuntimeError('事件总线未指定!')
        if not self.session:
            raise RuntimeError('未登录!')

        self.background = asyncio.create_task(self._background())

    async def shutdown(self):
        """停止背景事件循环。"""
        if self.background:
            await Tasks.cancel(self.background)

    async def emit(self, event: str, *args, **kwargs):
        """向事件总线发送一个事件。

        Args:
            event: 事件名称。
            *args: 事件参数。
            **kwargs: 事件参数。
        """
        coros = [bus.emit(event, *args, **kwargs) for bus in self.buses]
        return sum(await asyncio.gather(*coros), [])

Ancestors

Subclasses

Class variables

var background : Optional[_asyncio.Task]

背景事件循环任务。

var buses : Set[AbstractEventBus]

注册的事件总线集合。

var session : str

从 mirai-api-http 处获得的 session。

var single_mode : bool

是否开启 single_mode,开启后与 session 将无效。

var verify_key : Optional[str]

mirai-api-http 配置的认证 key,关闭认证时为 None。

Static methods

def via(adapter_interface: AdapterInterface) ‑> Adapter

从适配器接口创建适配器。

Args

adapter_interface
适配器接口。

Returns

Adapter
创建的适配器。
Expand source code
@classmethod
def via(cls, adapter_interface: AdapterInterface) -> "Adapter":
    """从适配器接口创建适配器。

    Args:
        adapter_interface: 适配器接口。

    Returns:
        Adapter: 创建的适配器。
    """
    info = adapter_interface.adapter_info
    adapter = cls(
        verify_key=info['verify_key'],
        **{
            key: info[key]
            for key in ['single_mode'] if info.get(key) is not None
        }
    )
    adapter.session = cast(str, info.get('session'))
    return adapter

Methods

async def call_api(self, api: str, method: Method = Method.GET, **params)

调用 API。

Args

api
API 名称,需与 mirai-api-http 中的定义一致。
method
调用方法。默认为 GET。
**params
参数。
Expand source code
@abc.abstractmethod
async def call_api(self, api: str, method: Method = Method.GET, **params):
    """调用 API。

    Args:
        api: API 名称,需与 mirai-api-http 中的定义一致。
        method: 调用方法。默认为 GET。
        **params: 参数。
    """
async def emit(self, event: str, *args, **kwargs)

向事件总线发送一个事件。

Args

event
事件名称。
*args
事件参数。
**kwargs
事件参数。
Expand source code
async def emit(self, event: str, *args, **kwargs):
    """向事件总线发送一个事件。

    Args:
        event: 事件名称。
        *args: 事件参数。
        **kwargs: 事件参数。
    """
    coros = [bus.emit(event, *args, **kwargs) for bus in self.buses]
    return sum(await asyncio.gather(*coros), [])
async def login(self, qq: int)

登录到 mirai-api-http。

Expand source code
@abc.abstractmethod
async def login(self, qq: int):
    """登录到 mirai-api-http。"""
async def logout(self, terminate: bool = True)

登出。

Expand source code
@abc.abstractmethod
async def logout(self, terminate: bool = True):
    """登出。"""
def register_event_bus(self, *buses: AbstractEventBus)

注册事件总线。

Args

*buses
一个或多个事件总线。
Expand source code
def register_event_bus(self, *buses: AbstractEventBus):
    """注册事件总线。

    Args:
        *buses: 一个或多个事件总线。
    """
    self.buses |= set(buses)
async def shutdown(self)

停止背景事件循环。

Expand source code
async def shutdown(self):
    """停止背景事件循环。"""
    if self.background:
        await Tasks.cancel(self.background)
async def start(self)

运行背景事件循环。

Expand source code
async def start(self):
    """运行背景事件循环。"""
    if not self.buses:
        raise RuntimeError('事件总线未指定!')
    if not self.session:
        raise RuntimeError('未登录!')

    self.background = asyncio.create_task(self._background())
def unregister_event_bus(self, *buses: AbstractEventBus)

解除注册事件总线。

Args

*buses
一个或多个事件总线。
Expand source code
def unregister_event_bus(self, *buses: AbstractEventBus):
    """解除注册事件总线。

    Args:
        *buses: 一个或多个事件总线。
    """
    self.buses -= set(buses)

Inherited members

class ApiError (response: dict)

调用 API 出错。

Args

response(dict): mirai-api-http 的 API 返回结果。

Expand source code
class ApiError(RuntimeError):
    """调用 API 出错。"""
    def __init__(self, response: dict):
        """
        Args:
            response(`dict`): mirai-api-http 的 API 返回结果。
        """
        code = response['code']
        self.code = code
        self.args = (
            code, f'[ERROR {code}]' + API_ERROR_FMT.get(code, ''),
            response.get('msg', '')
        )

Ancestors

  • builtins.RuntimeError
  • builtins.Exception
  • builtins.BaseException
class At (*args, **kwargs)

At某人。

Expand source code
class At(MessageComponent):
    """At某人。"""
    type: str = "At"
    """消息组件类型。"""
    target: int
    """群员 QQ 号。"""
    display: Optional[str] = None
    """At时显示的文字,发送消息时无效,自动使用群名片。"""
    def __eq__(self, other):
        return isinstance(other, At) and self.target == other.target

    def __str__(self):
        return f"@{self.display or self.target}"

    def as_mirai_code(self) -> str:
        return f"[mirai:at:{self.target}]"

Ancestors

Class variables

var display : Optional[str]

At时显示的文字,发送消息时无效,自动使用群名片。

var target : int

群员 QQ 号。

Inherited members

class AtAll (*args, **kwargs)

At全体。

Expand source code
class AtAll(MessageComponent):
    """At全体。"""
    type: str = "AtAll"
    """消息组件类型。"""
    def __str__(self):
        return "@全体成员"

    def as_mirai_code(self) -> str:
        return f"[mirai:atall]"

Ancestors

Inherited members

class ComposeAdapter (api_channel: Adapter, event_channel: Adapter)

组合适配器。使用一个适配器提供 API 调用,另一个适配器提供事件处理。

Args

api_channel
提供 API 调用的适配器。
event_channel
提供事件处理的适配器。
Expand source code
class ComposeAdapter(Adapter):
    """组合适配器。使用一个适配器提供 API 调用,另一个适配器提供事件处理。"""
    api_channel: Adapter
    """提供 API 调用的适配器。"""
    event_channel: Adapter
    """提供事件处理的适配器。"""
    def __init__(self, api_channel: Adapter, event_channel: Adapter):
        """
        Args:
            api_channel: 提供 API 调用的适配器。
            event_channel: 提供事件处理的适配器。
        """
        super().__init__(
            verify_key=api_channel.verify_key,
            single_mode=api_channel.single_mode
        )
        if api_channel.verify_key != event_channel.verify_key:
            raise ValueError('组合适配器应使用相同的 verify_key。')

        self.api_channel = api_channel
        self.event_channel = event_channel

        event_channel.buses = self.buses

        self.verify_key = api_channel.verify_key
        self.single_mode = api_channel.single_mode

    @property
    def adapter_info(self):
        return self.api_channel.adapter_info

    async def login(self, qq: int):
        await self.api_channel.login(qq)
        # 绑定 session
        self.event_channel.session = self.api_channel.session
        self.session = self.api_channel.session
        await self.event_channel.login(qq)

    async def logout(self):
        await self.event_channel.logout()
        await self.api_channel.logout()

    async def call_api(self, api: str, method: Method = Method.GET, **params):
        return await self.api_channel.call_api(api, method, **params)

    async def _background(self):
        await self.event_channel._background()

Ancestors

Class variables

var api_channelAdapter

提供 API 调用的适配器。

var event_channelAdapter

提供事件处理的适配器。

Inherited members

class Dice (*args, **kwargs)

骰子。

Expand source code
class Dice(MessageComponent):
    """骰子。"""
    type: str = "Dice"
    """消息组件类型。"""
    value: int
    """点数。"""
    def __str__(self):
        return f'[骰子{self.value}]'

    def as_mirai_code(self) -> str:
        return f'[mirai:dice:{self.value}]'

Ancestors

Class variables

var value : int

点数。

Inherited members

class Event (*args, **kwargs)

事件基类。

Args

type
事件名。
Expand source code
class Event(MiraiIndexedModel):
    """事件基类。

    Args:
        type: 事件名。
    """
    type: str
    """事件名。"""
    def __repr__(self):
        return self.__class__.__name__ + '(' + ', '.join(
            (
                f'{k}={repr(v)}'
                for k, v in self.__dict__.items() if k != 'type' and v
            )
        ) + ')'

    @classmethod
    def parse_subtype(cls, obj: dict) -> 'Event':
        try:
            return cast(Event, super().parse_subtype(obj))
        except ValueError:
            return Event(type=obj['type'])

    @classmethod
    def get_subtype(cls, name: str) -> Type['Event']:
        try:
            return cast(Type[Event], super().get_subtype(name))
        except ValueError:
            return Event

Ancestors

Subclasses

Class variables

var type : str

事件名。

Inherited members

class EventBus (event_chain_generator: Callable[[str], Iterable[str]] = <function event_chain_single>)

事件总线。

事件总线提供了一个简单的方法,用于分发事件。事件处理器可以通过 subscribeon 注册事件, 并通过 emit 来触发事件。

事件链(Event Chain)是一种特殊的事件处理机制。事件链包含一系列事件,其中底层事件触发时,上层事件也会响应。

事件总线的构造函数中的 event_chain_generator 参数规定了生成事件链的方式。 此模块中的 event_chain_singleevent_chain_separator 可应用于此参数,分别生成单一事件的事件链和按照分隔符划分的事件链。

Args

event_chain_generator
一个函数, 输入事件名,返回一个生成此事件所在事件链的全部事件的事件名的生成器, 默认行为是事件链只包含单一事件。
Expand source code
class EventBus(AbstractEventBus):
    """事件总线。

    事件总线提供了一个简单的方法,用于分发事件。事件处理器可以通过 `subscribe` 或 `on` 注册事件,
    并通过 `emit` 来触发事件。

    事件链(Event Chain)是一种特殊的事件处理机制。事件链包含一系列事件,其中底层事件触发时,上层事件也会响应。

    事件总线的构造函数中的 `event_chain_generator` 参数规定了生成事件链的方式。
    此模块中的 `event_chain_single` 和 `event_chain_separator` 可应用于此参数,分别生成单一事件的事件链和按照分隔符划分的事件链。
    """
    def __init__(
        self,
        event_chain_generator: Callable[[str],
                                        Iterable[str]] = event_chain_single,
    ):
        """
        Args:
            event_chain_generator: 一个函数,
                输入事件名,返回一个生成此事件所在事件链的全部事件的事件名的生成器,
                默认行为是事件链只包含单一事件。
        """
        self._subscribers: Dict[
            str, PriorityDict[Callable]] = defaultdict(PriorityDict)
        self.event_chain_generator = event_chain_generator

    def subscribe(self, event: str, func: Callable, priority: int = 0) -> None:
        """注册事件处理器。

        Args:
            event: 事件名。
            func: 事件处理器。
            priority: 优先级,小者优先。
        """
        self._subscribers[event].add(priority, func)

    def unsubscribe(self, event: str, func: Callable) -> None:
        """移除事件处理器。

        Args:
            event: 事件名。
            func: 事件处理器。
        """
        try:
            self._subscribers[event].remove(func)
        except KeyError:
            logger.warning(f'试图移除事件 `{event}` 的一个不存在的事件处理器 `{func}`。')

    def on(self, event: str, priority: int = 0) -> Callable:
        """以装饰器的方式注册事件处理器。

        例如:
        ```py
        @bus.on('Event.MyEvent')
        def my_event_handler(event):
            print(event)
        ```

        Args:
            event: 事件名。
            priority: 优先级,小者优先。
        """
        def decorator(func: Callable) -> Callable:
            self.subscribe(event, func, priority)
            return func

        return decorator

    async def emit(self, event: str, *args, **kwargs) -> List[Awaitable[Any]]:
        """触发一个事件。

        异步执行说明:`await emit` 时执行事件处理器,所有事件处理器执行完后,并行运行所有快速响应。

        Args:
            event: 要触发的事件名称。
            *args: 传递给事件处理器的参数。
            **kwargs: 传递给事件处理器的参数。

        Returns:
            List[Awaitable[Any]]: 所有事件处理器的快速响应协程的 Task。
        """
        async def call(f) -> Optional[Awaitable[Any]]:
            result = await async_with_exception(f(*args, **kwargs))
            # 快速响应:如果事件处理器返回一个协程,那么立即运行这个协程。
            if inspect.isawaitable(result):
                return async_with_exception(result)
            # 当不使用快速响应时,返回值无意义。
            return None

        coros: List[Optional[Awaitable[Any]]] = []
        try:
            for m_event in self.event_chain_generator(event):
                try:
                    # 使用 list 避免 _subscribers 被改变引起错误。
                    for listeners in list(self._subscribers[m_event]):
                        try:
                            # noinspection PyTypeChecker
                            callee = (call(f) for f in listeners)
                            coros += await asyncio.gather(*callee)
                        except SkipExecution:
                            continue
                except StopExecution:
                    continue
        except StopPropagation:
            pass

        # 只保留快速响应的返回值。
        return [asyncio.create_task(coro) for coro in filter(None, coros)]

Ancestors

Subclasses

Inherited members

class Face (*args, **kwargs)

表情。

Expand source code
class Face(MessageComponent):
    """表情。"""
    type: str = "Face"
    """消息组件类型。"""
    face_id: Optional[int] = None
    """QQ 表情编号,可选,优先度高于 name。"""
    name: Optional[str] = None
    """QQ表情名称,可选。"""
    def __init__(self, *args, **kwargs):
        if len(args) == 1:
            if isinstance(args[0], str):
                self.name = args[0]
            elif isinstance(args[0], int):
                self.face_id = args[0]
            super().__init__(**kwargs)
        super().__init__(*args, **kwargs)

    def __eq__(self, other):
        return isinstance(other, Face) and \
            (self.face_id == other.face_id or self.name == other.name)

    def __str__(self):
        if self.name:
            return f'[{self.name}]'
        return '[表情]'

    def as_mirai_code(self):
        return f"[mirai:face:{self.face_id}]"

Ancestors

Class variables

var face_id : Optional[int]

QQ 表情编号,可选,优先度高于 name。

var name : Optional[str]

QQ表情名称,可选。

Inherited members

class FriendMessage (*args, **kwargs)

好友消息。

Args

type
事件名。
sender
发送消息的好友。
message_chain
消息内容。
Expand source code
class FriendMessage(MessageEvent):
    """好友消息。

    Args:
        type: 事件名。
        sender: 发送消息的好友。
        message_chain: 消息内容。
    """
    type: str = 'FriendMessage'
    """事件名。"""
    sender: Friend
    """发送消息的好友。"""
    message_chain: MessageChain
    """消息内容。"""

Ancestors

Class variables

var senderFriend

发送消息的好友。

Inherited members

class GroupMessage (*args, **kwargs)

群消息。

Args

type
事件名。
sender
发送消息的群成员。
message_chain
消息内容。
Expand source code
class GroupMessage(MessageEvent):
    """群消息。

    Args:
        type: 事件名。
        sender: 发送消息的群成员。
        message_chain: 消息内容。
    """
    type: str = 'GroupMessage'
    """事件名。"""
    sender: GroupMember
    """发送消息的群成员。"""
    message_chain: MessageChain
    """消息内容。"""
    @property
    def group(self) -> Group:
        return self.sender.group

Ancestors

Class variables

var senderGroupMember

发送消息的群成员。

Instance variables

var groupGroup
Expand source code
@property
def group(self) -> Group:
    return self.sender.group

Inherited members

class HTTPAdapter (verify_key: Optional[str], host: str, port: int, poll_interval: float = 1.0, single_mode: bool = False)

HTTP 轮询适配器。使用 HTTP 轮询的方式与 mirai-api-http 沟通。

Args

verify_key
mirai-api-http 配置的认证 key,关闭认证时为 None。
host
HTTP Server 的地址。
port
HTTP Server 的端口。
poll_interval
轮询时间间隔,单位秒。
single_mode
是否为单例模式。
Expand source code
class HTTPAdapter(Adapter):
    """HTTP 轮询适配器。使用 HTTP 轮询的方式与 mirai-api-http 沟通。"""
    host_name: str
    """mirai-api-http 的 HTTPAdapter Server 主机名。"""
    poll_interval: float
    """轮询时间间隔,单位秒。"""
    qq: int
    """机器人的 QQ 号。"""
    headers: httpx.Headers
    """HTTP 请求头。"""
    def __init__(
        self,
        verify_key: Optional[str],
        host: str,
        port: int,
        poll_interval: float = 1.,
        single_mode: bool = False
    ):
        """
        Args:
            verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。
            host: HTTP Server 的地址。
            port: HTTP Server 的端口。
            poll_interval: 轮询时间间隔,单位秒。
            single_mode: 是否为单例模式。
        """
        super().__init__(verify_key=verify_key, single_mode=single_mode)

        self._host = host
        self._port = port

        if host[:2] == '//':
            host = 'http:' + host
        elif host[:8] == 'https://':
            raise exceptions.NetworkError('不支持 HTTPS!')
        elif host[:7] != 'http://':
            host = 'http://' + host

        if host[-1:] == '/':
            host = host[:-1]

        self.host_name = f'{host}:{port}'

        self.poll_interval = poll_interval

        self.qq = 0
        self.headers = httpx.Headers()  # 使用 headers 传递 session
        self._tasks = Tasks()

    @property
    def adapter_info(self):
        return {
            'verify_key': self.verify_key,
            'session': self.session,
            'single_mode': self.single_mode,
            'host': self._host,
            'port': self._port,
            'poll_interval': self.poll_interval,
        }

    @classmethod
    def via(cls, adapter_interface: AdapterInterface) -> "HTTPAdapter":
        info = adapter_interface.adapter_info
        adapter = cls(
            verify_key=info['verify_key'],
            **{
                key: info[key]
                for key in ['host', 'port', 'poll_interval', 'single_mode']
                if key in info
            }
        )
        adapter.session = cast(str, info.get('session'))
        return adapter

    @_error_handler_async_local
    async def _post(self, client: httpx.AsyncClient, url: str,
                    json: dict) -> Optional[dict]:
        """调用 POST 方法。"""
        # 使用自定义的 json.dumps
        content = json_dumps(json).encode('utf-8')
        try:
            response = await client.post(
                url,
                content=content,
                headers={'Content-Type': 'application/json'},
                timeout=10.
            )
            logger.debug(
                f'[HTTP] 发送 POST 请求,地址{url},状态 {response.status_code}。'
            )
        except httpx.TimeoutException:
            logger.error(f'[HTTP] POST 请求超时,地址{url}。')
            return None
        return _parse_response(response)

    @_error_handler_async_local
    async def _get(self, client: httpx.AsyncClient, url: str,
                   params: dict) -> Optional[dict]:
        """调用 GET 方法。"""
        try:
            response = await client.get(url, params=params, timeout=10.)
            logger.debug(
                f'[HTTP] 发送 GET 请求,地址{url},状态 {response.status_code}。'
            )
        except httpx.TimeoutException:
            logger.error(f'[HTTP] GET 请求超时,地址{url}。')
            return None
        return _parse_response(response)

    @_error_handler_async_local
    async def _post_multipart(
        self, client: httpx.AsyncClient, url: str, data: dict, files: dict
    ) -> Optional[dict]:
        """调用 POST 方法,发送 multipart 数据。"""
        try:
            response = await client.post(
                url, data=data, files=files, timeout=30.
            )
            logger.debug(
                f'[HTTP] 发送 POST 请求,地址{url},状态 {response.status_code}。'
            )
        except httpx.TimeoutException:
            logger.error(f'[HTTP] POST 请求超时,地址{url}。')
            return None
        return _parse_response(response)

    @_error_handler_async_local
    async def login(self, qq: int):
        async with httpx.AsyncClient(
            base_url=self.host_name, headers=self.headers
        ) as client:
            if not self.session:
                if self.verify_key is not None:
                    self.session = (
                        await self._post(
                            client, '/verify', {
                                "verifyKey": self.verify_key,
                            }
                        )
                    )['session']
                else:
                    self.session = str(random.randint(1, 2**20))

            if not self.single_mode:
                await self._post(
                    client, '/bind', {
                        "sessionKey": self.session,
                        "qq": qq,
                    }
                )

            self.headers = httpx.Headers({'sessionKey': self.session})
            self.qq = qq
            logger.info(f'[HTTP] 成功登录到账号{qq}。')

    @_error_handler_async_local
    async def logout(self, terminate: bool = True):
        if self.session and not self.single_mode:
            if terminate:
                async with httpx.AsyncClient(
                    base_url=self.host_name, headers=self.headers
                ) as client:
                    await self._post(
                        client, '/release', {
                            "sessionKey": self.session,
                            "qq": self.qq,
                        }
                    )
        logger.info(f"[HTTP] 从账号{self.qq}退出。")

    async def poll_event(self):
        """进行一次轮询,获取并处理事件。"""
        async with httpx.AsyncClient(
            base_url=self.host_name, headers=self.headers
        ) as client:
            msg_count = (await self._get(client, '/countMessage', {}))['data']
            if msg_count > 0:
                msg_list = (
                    await
                    self._get(client, '/fetchMessage', {'count': msg_count})
                )['data']

                coros = [self.emit(msg['type'], msg) for msg in msg_list]
                await asyncio.gather(*coros)

    async def call_api(self,
                       api: str,
                       method: Method = Method.GET,
                       **params) -> Optional[dict]:
        async with httpx.AsyncClient(
            base_url=self.host_name, headers=self.headers
        ) as client:
            if method == Method.GET or method == Method.RESTGET:
                return await self._get(client, f'/{api}', params)
            if method == Method.POST or method == Method.RESTPOST:
                return await self._post(client, f'/{api}', params)
            if method == Method.MULTIPART:
                return await self._post_multipart(
                    client, f'/{api}', params['data'], params['files']
                )
            return None

    async def _background(self):
        """开始轮询。"""
        logger.info('[HTTP] 机器人开始运行。按 Ctrl + C 停止。')

        try:
            while True:
                self._tasks.create_task(self.poll_event())
                await asyncio.sleep(self.poll_interval)
        finally:
            await self._tasks.cancel_all()

Ancestors

Class variables

var headers : httpx.Headers

HTTP 请求头。

var host_name : str

mirai-api-http 的 HTTPAdapter Server 主机名。

var poll_interval : float

轮询时间间隔,单位秒。

var qq : int

机器人的 QQ 号。

Methods

async def poll_event(self)

进行一次轮询,获取并处理事件。

Expand source code
async def poll_event(self):
    """进行一次轮询,获取并处理事件。"""
    async with httpx.AsyncClient(
        base_url=self.host_name, headers=self.headers
    ) as client:
        msg_count = (await self._get(client, '/countMessage', {}))['data']
        if msg_count > 0:
            msg_list = (
                await
                self._get(client, '/fetchMessage', {'count': msg_count})
            )['data']

            coros = [self.emit(msg['type'], msg) for msg in msg_list]
            await asyncio.gather(*coros)

Inherited members

class Image (*args, **kwargs)

图片。

Expand source code
class Image(MessageComponent):
    """图片。"""
    type: str = "Image"
    """消息组件类型。"""
    image_id: Optional[str] = None
    """图片的 image_id,群图片与好友图片格式不同。不为空时将忽略 url 属性。"""
    url: Optional[HttpUrl] = None
    """图片的 URL,发送时可作网络图片的链接;接收时为腾讯图片服务器的链接,可用于图片下载。"""
    path: Union[str, Path, None] = None
    """图片的路径,发送本地图片。"""
    base64: Optional[str] = None
    """图片的 Base64 编码。"""
    width: int
    """图片的宽度"""
    height: int
    """图片的高度"""
    def __eq__(self, other):
        return isinstance(
            other, Image
        ) and self.type == other.type and self.uuid == other.uuid

    def __str__(self):
        return '[图片]'

    def as_mirai_code(self) -> str:
        return f"[mirai:image:{self.image_id}]"

    @validator('path')
    def validate_path(cls, path: Union[str, Path, None]):
        """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。"""
        if path:
            try:
                return str(Path(path).resolve(strict=True))
            except FileNotFoundError:
                raise ValueError(f"无效路径:{path}")
        else:
            return path

    @property
    def uuid(self):
        image_id = self.image_id
        if image_id[0] == '{':  # 群图片
            image_id = image_id[1:37]
        elif image_id[0] == '/':  # 好友图片
            image_id = image_id[1:]
        return image_id

    def as_group_image(self) -> str:
        return f"{{{self.uuid.upper()}}}.jpg"

    def as_friend_image(self) -> str:
        return f"/{self.uuid.lower()}"

    def as_flash_image(self) -> "FlashImage":
        return FlashImage(
            image_id=self.image_id,
            url=self.url,
            path=self.path,
            base64=self.base64
        )

    async def download(
        self,
        filename: Union[str, Path, None] = None,
        directory: Union[str, Path, None] = None,
        determine_type: bool = True
    ):
        """下载图片到本地。

        Args:
            filename: 下载到本地的文件路径。与 `directory` 二选一。
            directory: 下载到本地的文件夹路径。与 `filename` 二选一。
            determine_type: 是否自动根据图片类型确定拓展名,默认为 True。
        """
        if not self.url:
            logger.warning(f'图片 `{self.uuid}` 无 url 参数,下载失败。')
            return

        import httpx
        async with httpx.AsyncClient() as client:
            response = await client.get(self.url)
            response.raise_for_status()
            content = response.content

            if filename:
                path = Path(filename)
                if determine_type:
                    import imghdr
                    path = path.with_suffix(
                        '.' + str(imghdr.what(None, content))
                    )
                path.parent.mkdir(parents=True, exist_ok=True)
            elif directory:
                import imghdr
                path = Path(directory)
                path.mkdir(parents=True, exist_ok=True)
                path = path / f'{self.uuid}.{imghdr.what(None, content)}'
            else:
                raise ValueError("请指定文件路径或文件夹路径!")

            import aiofiles
            async with aiofiles.open(path, 'wb') as f:
                await f.write(content)

            return path

    @classmethod
    async def from_local(
        cls,
        filename: Union[str, Path, None] = None,
        content: Optional[bytes] = None,
    ) -> "Image":
        """从本地文件路径加载图片,以 base64 的形式传递。

        Args:
            filename: 从本地文件路径加载图片,与 `content` 二选一。
            content: 从本地文件内容加载图片,与 `filename` 二选一。

        Returns:
            Image: 图片对象。
        """
        if content:
            pass
        elif filename:
            path = Path(filename)
            import aiofiles
            async with aiofiles.open(path, 'rb') as f:
                content = await f.read()
        else:
            raise ValueError("请指定图片路径或图片内容!")
        import base64
        img = cls(base64=base64.b64encode(content).decode())
        return img

    @classmethod
    def from_unsafe_path(cls, path: Union[str, Path]) -> "Image":
        """从不安全的路径加载图片。

        Args:
            path: 从不安全的路径加载图片。

        Returns:
            Image: 图片对象。
        """
        return cls.construct(path=str(path))

Ancestors

Subclasses

Class variables

var base64 : Optional[str]

图片的 Base64 编码。

var height : int

图片的高度

var image_id : Optional[str]

图片的 image_id,群图片与好友图片格式不同。不为空时将忽略 url 属性。

var path : Union[str, pathlib.Path, NoneType]

图片的路径,发送本地图片。

var url : Optional[pydantic.networks.HttpUrl]

图片的 URL,发送时可作网络图片的链接;接收时为腾讯图片服务器的链接,可用于图片下载。

var width : int

图片的宽度

Static methods

async def from_local(filename: Union[str, pathlib.Path, NoneType] = None, content: Optional[bytes] = None) ‑> Image

从本地文件路径加载图片,以 base64 的形式传递。

Args

filename
从本地文件路径加载图片,与 content 二选一。
content
从本地文件内容加载图片,与 filename 二选一。

Returns

Image
图片对象。
Expand source code
@classmethod
async def from_local(
    cls,
    filename: Union[str, Path, None] = None,
    content: Optional[bytes] = None,
) -> "Image":
    """从本地文件路径加载图片,以 base64 的形式传递。

    Args:
        filename: 从本地文件路径加载图片,与 `content` 二选一。
        content: 从本地文件内容加载图片,与 `filename` 二选一。

    Returns:
        Image: 图片对象。
    """
    if content:
        pass
    elif filename:
        path = Path(filename)
        import aiofiles
        async with aiofiles.open(path, 'rb') as f:
            content = await f.read()
    else:
        raise ValueError("请指定图片路径或图片内容!")
    import base64
    img = cls(base64=base64.b64encode(content).decode())
    return img
def from_unsafe_path(path: Union[str, pathlib.Path]) ‑> Image

从不安全的路径加载图片。

Args

path
从不安全的路径加载图片。

Returns

Image
图片对象。
Expand source code
@classmethod
def from_unsafe_path(cls, path: Union[str, Path]) -> "Image":
    """从不安全的路径加载图片。

    Args:
        path: 从不安全的路径加载图片。

    Returns:
        Image: 图片对象。
    """
    return cls.construct(path=str(path))
def validate_path(path: Union[str, pathlib.Path, NoneType])

修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。

Expand source code
@validator('path')
def validate_path(cls, path: Union[str, Path, None]):
    """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。"""
    if path:
        try:
            return str(Path(path).resolve(strict=True))
        except FileNotFoundError:
            raise ValueError(f"无效路径:{path}")
    else:
        return path

Instance variables

var uuid
Expand source code
@property
def uuid(self):
    image_id = self.image_id
    if image_id[0] == '{':  # 群图片
        image_id = image_id[1:37]
    elif image_id[0] == '/':  # 好友图片
        image_id = image_id[1:]
    return image_id

Methods

def as_flash_image(self) ‑> FlashImage
Expand source code
def as_flash_image(self) -> "FlashImage":
    return FlashImage(
        image_id=self.image_id,
        url=self.url,
        path=self.path,
        base64=self.base64
    )
def as_friend_image(self) ‑> str
Expand source code
def as_friend_image(self) -> str:
    return f"/{self.uuid.lower()}"
def as_group_image(self) ‑> str
Expand source code
def as_group_image(self) -> str:
    return f"{{{self.uuid.upper()}}}.jpg"
async def download(self, filename: Union[str, pathlib.Path, NoneType] = None, directory: Union[str, pathlib.Path, NoneType] = None, determine_type: bool = True)

下载图片到本地。

Args

filename
下载到本地的文件路径。与 directory 二选一。
directory
下载到本地的文件夹路径。与 filename 二选一。
determine_type
是否自动根据图片类型确定拓展名,默认为 True。
Expand source code
async def download(
    self,
    filename: Union[str, Path, None] = None,
    directory: Union[str, Path, None] = None,
    determine_type: bool = True
):
    """下载图片到本地。

    Args:
        filename: 下载到本地的文件路径。与 `directory` 二选一。
        directory: 下载到本地的文件夹路径。与 `filename` 二选一。
        determine_type: 是否自动根据图片类型确定拓展名,默认为 True。
    """
    if not self.url:
        logger.warning(f'图片 `{self.uuid}` 无 url 参数,下载失败。')
        return

    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.get(self.url)
        response.raise_for_status()
        content = response.content

        if filename:
            path = Path(filename)
            if determine_type:
                import imghdr
                path = path.with_suffix(
                    '.' + str(imghdr.what(None, content))
                )
            path.parent.mkdir(parents=True, exist_ok=True)
        elif directory:
            import imghdr
            path = Path(directory)
            path.mkdir(parents=True, exist_ok=True)
            path = path / f'{self.uuid}.{imghdr.what(None, content)}'
        else:
            raise ValueError("请指定文件路径或文件夹路径!")

        import aiofiles
        async with aiofiles.open(path, 'wb') as f:
            await f.write(content)

        return path

Inherited members

class LifeSpan (*args, **kwargs)

生命周期事件。

Expand source code
class LifeSpan(Event):
    """生命周期事件。"""
    type: str = 'LifeSpan'
    """事件名。"""

Ancestors

Subclasses

Inherited members

class MessageChain

消息链。

一个构造消息链的例子:

message_chain = MessageChain([
    AtAll(),
    Plain("Hello World!"),
])

Plain 可以省略。

message_chain = MessageChain([
    AtAll(),
    "Hello World!",
])

在调用 API 时,参数中需要 MessageChain 的,也可以使用 List[MessageComponent] 代替。 例如,以下两种写法是等价的:

await bot.send_friend_message(12345678, [
    Plain("Hello World!")
])
await bot.send_friend_message(12345678, MessageChain([
    Plain("Hello World!")
]))

使用str(message_chain)获取消息链的字符串表示,字符串采用 mirai 码格式, 并自动按照 mirai 码的规定转义。 参看 mirai 的文档。 获取未转义的消息链字符串,可以使用deserialize()(str(message_chain))

可以使用 for 循环遍历消息链中的消息组件。

for component in message_chain:
    print(repr(component))

可以使用 == 运算符比较两个消息链是否相同。

another_msg_chain = MessageChain([
    {
        "type": "AtAll"
    }, {
        "type": "Plain",
        "text": "Hello World!"
    },
])
print(message_chain == another_msg_chain)
'True'

可以使用 in 运算检查消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。

if AtAll in message_chain:
    print('AtAll')

if At(bot.qq) in message_chain:
    print('At Me')

if MessageChain([At(bot.qq), Plain('Hello!')]) in message_chain:
    print('Hello!')

if 'Hello' in message_chain:
    print('Hi!')

消息链的 has 方法和 in 等价。

if message_chain.has(AtAll):
    print('AtAll')

也可以使用 >=<=运算符:

if MessageChain([At(bot.qq), Plain('Hello!')]) <= message_chain:
    print('Hello!')

需注意,此处的子消息链匹配会把 Plain 看成一个整体,而不是匹配其文本的一部分。 如需对文本进行部分匹配,请采用 mirai 码字符串匹配的方式。

消息链对索引操作进行了增强。以消息组件类型为索引,获取消息链中的全部该类型的消息组件。

plain_list = message_chain[Plain]
'[Plain("Hello World!")]'

类型, 数量 为索引,获取前至多多少个该类型的消息组件。

plain_list_first = message_chain[Plain, 1]
'[Plain("Hello World!")]'

消息链的 get 方法和索引操作等价。

plain_list_first = message_chain.get(Plain)
'[Plain("Hello World!")]'

消息链的 get 方法还可指定第二个参数 count,这相当于以 类型, 数量 为索引。

plain_list_first = message_chain.get(Plain, 1)
# 这等价于
plain_list_first = message_chain[Plain, 1]

可以用加号连接两个消息链。

MessageChain(['Hello World!']) + MessageChain(['Goodbye World!'])
# 返回 MessageChain([Plain("Hello World!"), Plain("Goodbye World!")])

可以用 * 运算符复制消息链。

MessageChain(['Hello World!']) * 2
# 返回 MessageChain([Plain("Hello World!"), Plain("Hello World!")])

除此之外,消息链还支持很多 list 拥有的操作,比如 indexcount

message_chain = MessageChain([
    AtAll(),
    "Hello World!",
])
message_chain.index(Plain)
# 返回 0
message_chain.count(Plain)
# 返回 1

消息链对这些操作进行了拓展。在传入元素的地方,一般都可以传入元素的类型。

Expand source code
class MessageChain(MiraiBaseModel):
    """消息链。

    一个构造消息链的例子:
    ```py
    message_chain = MessageChain([
        AtAll(),
        Plain("Hello World!"),
    ])
    ```

    `Plain` 可以省略。
    ```py
    message_chain = MessageChain([
        AtAll(),
        "Hello World!",
    ])
    ```

    在调用 API 时,参数中需要 MessageChain 的,也可以使用 `List[MessageComponent]` 代替。
    例如,以下两种写法是等价的:
    ```py
    await bot.send_friend_message(12345678, [
        Plain("Hello World!")
    ])
    ```
    ```py
    await bot.send_friend_message(12345678, MessageChain([
        Plain("Hello World!")
    ]))
    ```

    使用`str(message_chain)`获取消息链的字符串表示,字符串采用 mirai 码格式,
    并自动按照 mirai 码的规定转义。
    参看 mirai 的[文档](https://github.com/mamoe/mirai/blob/dev/docs/Messages.md#mirai-%E7%A0%81)。
    获取未转义的消息链字符串,可以使用`deserialize(str(message_chain))`。

    可以使用 for 循环遍历消息链中的消息组件。
    ```py
    for component in message_chain:
        print(repr(component))
    ```

    可以使用 `==` 运算符比较两个消息链是否相同。
    ```py
    another_msg_chain = MessageChain([
        {
            "type": "AtAll"
        }, {
            "type": "Plain",
            "text": "Hello World!"
        },
    ])
    print(message_chain == another_msg_chain)
    'True'
    ```

    可以使用 `in` 运算检查消息链中:
    1. 是否有某个消息组件。
    2. 是否有某个类型的消息组件。
    3. 是否有某个子消息链。
    4. 对应的 mirai 码中是否有某子字符串。

    ```py
    if AtAll in message_chain:
        print('AtAll')

    if At(bot.qq) in message_chain:
        print('At Me')

    if MessageChain([At(bot.qq), Plain('Hello!')]) in message_chain:
        print('Hello!')

    if 'Hello' in message_chain:
        print('Hi!')
    ```

    消息链的 `has` 方法和 `in` 等价。
    ```py
    if message_chain.has(AtAll):
        print('AtAll')
    ```

    也可以使用 `>=` 和 `<= `运算符:
    ```py
    if MessageChain([At(bot.qq), Plain('Hello!')]) <= message_chain:
        print('Hello!')
    ```

    需注意,此处的子消息链匹配会把 Plain 看成一个整体,而不是匹配其文本的一部分。
    如需对文本进行部分匹配,请采用 mirai 码字符串匹配的方式。

    消息链对索引操作进行了增强。以消息组件类型为索引,获取消息链中的全部该类型的消息组件。
    ```py
    plain_list = message_chain[Plain]
    '[Plain("Hello World!")]'
    ```

    以 `类型, 数量` 为索引,获取前至多多少个该类型的消息组件。
    ```py
    plain_list_first = message_chain[Plain, 1]
    '[Plain("Hello World!")]'
    ```

    消息链的 `get` 方法和索引操作等价。
    ```py
    plain_list_first = message_chain.get(Plain)
    '[Plain("Hello World!")]'
    ```

    消息链的 `get` 方法还可指定第二个参数 `count`,这相当于以 `类型, 数量` 为索引。
    ```py
    plain_list_first = message_chain.get(Plain, 1)
    # 这等价于
    plain_list_first = message_chain[Plain, 1]
    ```

    可以用加号连接两个消息链。
    ```py
    MessageChain(['Hello World!']) + MessageChain(['Goodbye World!'])
    # 返回 MessageChain([Plain("Hello World!"), Plain("Goodbye World!")])
    ```

    可以用 `*` 运算符复制消息链。
    ```py
    MessageChain(['Hello World!']) * 2
    # 返回 MessageChain([Plain("Hello World!"), Plain("Hello World!")])
    ```

    除此之外,消息链还支持很多 list 拥有的操作,比如 `index` 和 `count`。
    ```py
    message_chain = MessageChain([
        AtAll(),
        "Hello World!",
    ])
    message_chain.index(Plain)
    # 返回 0
    message_chain.count(Plain)
    # 返回 1
    ```

    消息链对这些操作进行了拓展。在传入元素的地方,一般都可以传入元素的类型。
    """
    __root__: List[MessageComponent]

    @staticmethod
    def _parse_message_chain(msg_chain: Iterable):
        result = []
        for msg in msg_chain:
            if isinstance(msg, dict):
                result.append(MessageComponent.parse_subtype(msg))
            elif isinstance(msg, MessageComponent):
                result.append(msg)
            elif isinstance(msg, str):
                result.append(Plain(msg))
            else:
                raise TypeError(
                    f"消息链中元素需为 dict 或 str 或 MessageComponent,当前类型:{type(msg)}"
                )
        return result

    @validator('__root__', always=True, pre=True)
    def _parse_component(cls, msg_chain):
        if isinstance(msg_chain, (str, MessageComponent)):
            msg_chain = [msg_chain]
        if not msg_chain:
            msg_chain = []
        return cls._parse_message_chain(msg_chain)

    @classmethod
    def parse_obj(cls, msg_chain: Iterable):
        """通过列表形式的消息链,构造对应的 `MessageChain` 对象。

        Args:
            msg_chain: 列表形式的消息链。
        """
        result = cls._parse_message_chain(msg_chain)
        return cls(__root__=result)

    def __init__(self, __root__: Iterable[MessageComponent] = None):
        super().__init__(__root__=__root__)

    def __str__(self):
        return "".join(str(component) for component in self.__root__)

    def as_mirai_code(self) -> str:
        """将消息链转换为 mirai 码字符串。

        该方法会自动转换消息链中的元素。

        Returns:
            mirai 码字符串。
        """
        return "".join(
            component.as_mirai_code() for component in self.__root__
        )

    def __repr__(self):
        return f'{self.__class__.__name__}({self.__root__!r})'

    def __iter__(self):
        yield from self.__root__

    @overload
    def get(self, index: int) -> MessageComponent:
        ...

    @overload
    def get(self, index: slice) -> List[MessageComponent]:
        ...

    @overload
    def get(self, index: Type[TMessageComponent]) -> List[TMessageComponent]:
        ...

    @overload
    def get(
        self, index: Tuple[Type[TMessageComponent], int]
    ) -> List[TMessageComponent]:
        ...

    def get(
        self,
        index: Union[int, slice, Type[TMessageComponent],
                     Tuple[Type[TMessageComponent], int]],
        count: Optional[int] = None
    ) -> Union[MessageComponent, List[MessageComponent],
               List[TMessageComponent]]:
        """获取消息链中的某个(某些)消息组件,或某类型的消息组件。

        Args:
            index (`Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]]`):
                如果为 `int`,则返回该索引处的消息组件。
                如果为 `slice`,则返回该索引范围处的消息组件。
                如果为 `Type[TMessageComponent]`,则返回该类型的全部消息组件。
                如果为 `Tuple[Type[TMessageComponent], int]`,则返回该类型的至多 `index[1]` 个消息组件。

            count: 如果为 `int`,则返回至多 `count` 个消息组件。

        Returns:
            MessageComponent: 返回指定索引处的消息组件。
            List[MessageComponent]: 返回指定索引范围的消息组件。
            List[TMessageComponent]: 返回指定类型的消息组件构成的列表。
        """
        # 正常索引
        if isinstance(index, int):
            return self.__root__[index]
        # 切片索引
        if isinstance(index, slice):
            return self.__root__[index]
        # 指定 count
        if count:
            if isinstance(index, type):
                index = (index, count)
            elif isinstance(index, tuple):
                index = (index[0], count if count < index[1] else index[1])
        # 索引对象为 MessageComponent 类,返回所有对应 component
        if isinstance(index, type):
            return [
                component for component in self if type(component) is index
            ]
        # 索引对象为 MessageComponent 和 int 构成的 tuple, 返回指定数量的 component
        if isinstance(index, tuple):
            components = (
                component for component in self if type(component) is index[0]
            )
            return [
                component for component, _ in zip(components, range(index[1]))
            ]
        raise TypeError(f"消息链索引需为 int 或 MessageComponent,当前类型:{type(index)}")

    def get_first(self,
                  t: Type[TMessageComponent]) -> Optional[TMessageComponent]:
        """获取消息链中第一个符合类型的消息组件。"""
        for component in self:
            if isinstance(component, t):
                return component
        return None

    @overload
    def __getitem__(self, index: int) -> MessageComponent:
        ...

    @overload
    def __getitem__(self, index: slice) -> List[MessageComponent]:
        ...

    @overload
    def __getitem__(self,
                    index: Type[TMessageComponent]) -> List[TMessageComponent]:
        ...

    @overload
    def __getitem__(
        self, index: Tuple[Type[TMessageComponent], int]
    ) -> List[TMessageComponent]:
        ...

    def __getitem__(
        self, index: Union[int, slice, Type[TMessageComponent],
                           Tuple[Type[TMessageComponent], int]]
    ) -> Union[MessageComponent, List[MessageComponent],
               List[TMessageComponent]]:
        return self.get(index)

    def __setitem__(
        self, key: Union[int, slice],
        value: Union[MessageComponent, str, Iterable[Union[MessageComponent,
                                                           str]]]
    ):
        if isinstance(value, str):
            value = Plain(value)
        if isinstance(value, Iterable):
            value = (Plain(c) if isinstance(c, str) else c for c in value)
        self.__root__[key] = value  # type: ignore

    def __delitem__(self, key: Union[int, slice]):
        del self.__root__[key]

    def __reversed__(self) -> Iterable[MessageComponent]:
        return reversed(self.__root__)

    def has(
        self, sub: Union[MessageComponent, Type[MessageComponent],
                         'MessageChain', str]
    ) -> bool:
        """判断消息链中:
        1. 是否有某个消息组件。
        2. 是否有某个类型的消息组件。
        3. 是否有某个子消息链。
        4. 对应的 mirai 码中是否有某子字符串。

        Args:
            sub (`Union[MessageComponent, Type[MessageComponent], 'MessageChain', str]`):
                若为 `MessageComponent`,则判断该组件是否在消息链中。
                若为 `Type[MessageComponent]`,则判断该组件类型是否在消息链中。
                若为 `MessageChain`,则判断该子消息链是否在消息链中。
                若为 `str`,则判断对应的 mirai 码中是否有某子字符串。

        Returns:
            bool: 是否找到。
        """
        if isinstance(sub, type):  # 检测消息链中是否有某种类型的对象
            for i in self:
                if type(i) is sub:
                    return True
            return False
        if isinstance(sub, MessageComponent):  # 检查消息链中是否有某个组件
            for i in self:
                if i == sub:
                    return True
            return False
        if isinstance(sub, MessageChain):  # 检查消息链中是否有某个子消息链
            return bool(kmp(self, sub))
        if isinstance(sub, str):  # 检查消息中有无指定字符串子串
            return sub in deserialize(str(self))
        raise TypeError(f"类型不匹配,当前类型:{type(sub)}")

    def __contains__(self, sub) -> bool:
        return self.has(sub)

    def __ge__(self, other):
        return other in self

    def __len__(self) -> int:
        return len(self.__root__)

    def __add__(
        self, other: Union['MessageChain', MessageComponent, str]
    ) -> 'MessageChain':
        if isinstance(other, MessageChain):
            return self.__class__(self.__root__ + other.__root__)
        if isinstance(other, str):
            return self.__class__(self.__root__ + [Plain(other)])
        if isinstance(other, MessageComponent):
            return self.__class__(self.__root__ + [other])
        return NotImplemented

    def __radd__(self, other: Union[MessageComponent, str]) -> 'MessageChain':
        if isinstance(other, MessageComponent):
            return self.__class__([other] + self.__root__)
        if isinstance(other, str):
            return self.__class__(
                [cast(MessageComponent, Plain(other))] + self.__root__
            )
        return NotImplemented

    def __mul__(self, other: int):
        if isinstance(other, int):
            return self.__class__(self.__root__ * other)
        return NotImplemented

    def __rmul__(self, other: int):
        return self.__mul__(other)

    def __iadd__(self, other: Iterable[Union[MessageComponent, str]]):
        self.extend(other)

    def __imul__(self, other: int):
        if isinstance(other, int):
            self.__root__ *= other
        return NotImplemented

    def index(
        self,
        x: Union[MessageComponent, Type[MessageComponent]],
        i: int = 0,
        j: int = -1
    ) -> int:
        """返回 x 在消息链中首次出现项的索引号(索引号在 i 或其后且在 j 之前)。

        Args:
            x (`Union[MessageComponent, Type[MessageComponent]]`):
                要查找的消息元素或消息元素类型。
            i: 从哪个位置开始查找。
            j: 查找到哪个位置结束。

        Returns:
            int: 如果找到,则返回索引号。

        Raises:
            ValueError: 没有找到。
            TypeError: 类型不匹配。
        """
        if isinstance(x, type):
            l = len(self)
            if i < 0:
                i += l
            if i < 0:
                i = 0
            if j < 0:
                j += l
            if j > l:
                j = l
            for index in range(i, j):
                if type(self[index]) is x:
                    return index
            raise ValueError("消息链中不存在该类型的组件。")
        if isinstance(x, MessageComponent):
            return self.__root__.index(x, i, j)
        raise TypeError(f"类型不匹配,当前类型:{type(x)}")

    def count(self, x: Union[MessageComponent, Type[MessageComponent]]) -> int:
        """返回消息链中 x 出现的次数。

        Args:
            x (`Union[MessageComponent, Type[MessageComponent]]`):
                要查找的消息元素或消息元素类型。

        Returns:
            int: 次数。
        """
        if isinstance(x, type):
            return sum(1 for i in self if type(i) is x)
        if isinstance(x, MessageComponent):
            return self.__root__.count(x)
        raise TypeError(f"类型不匹配,当前类型:{type(x)}")

    def extend(self, x: Iterable[Union[MessageComponent, str]]):
        """将另一个消息链中的元素添加到消息链末尾。

        Args:
            x: 另一个消息链,也可为消息元素或字符串元素的序列。
        """
        self.__root__.extend(Plain(c) if isinstance(c, str) else c for c in x)

    def append(self, x: Union[MessageComponent, str]):
        """将一个消息元素或字符串元素添加到消息链末尾。

        Args:
            x: 消息元素或字符串元素。
        """
        self.__root__.append(Plain(x) if isinstance(x, str) else x)

    def insert(self, i: int, x: Union[MessageComponent, str]):
        """将一个消息元素或字符串添加到消息链中指定位置。

        Args:
            i: 插入位置。
            x: 消息元素或字符串元素。
        """
        self.__root__.insert(i, Plain(x) if isinstance(x, str) else x)

    def pop(self, i: int = -1) -> MessageComponent:
        """从消息链中移除并返回指定位置的元素。

        Args:
            i: 移除位置。默认为末尾。

        Returns:
            MessageComponent: 移除的元素。
        """
        return self.__root__.pop(i)

    def remove(self, x: Union[MessageComponent, Type[MessageComponent]]):
        """从消息链中移除指定元素或指定类型的一个元素。

        Args:
            x: 指定的元素或元素类型。
        """
        if isinstance(x, type):
            self.pop(self.index(x))
        if isinstance(x, MessageComponent):
            self.__root__.remove(x)

    def exclude(
        self,
        x: Union[MessageComponent, Type[MessageComponent]],
        count: int = -1
    ) -> 'MessageChain':
        """返回移除指定元素或指定类型的元素后剩余的消息链。

        Args:
            x: 指定的元素或元素类型。
            count: 至多移除的数量。默认为全部移除。

        Returns:
            MessageChain: 剩余的消息链。
        """
        def _exclude():
            nonlocal count
            x_is_type = isinstance(x, type)
            for c in self:
                if count > 0 and ((x_is_type and type(c) is x) or c == x):
                    count -= 1
                    continue
                yield c

        return self.__class__(_exclude())

    def reverse(self):
        """将消息链原地翻转。"""
        self.__root__.reverse()

    @classmethod
    def join(cls, *args: Iterable[Union[str, MessageComponent]]):
        return cls(
            Plain(c) if isinstance(c, str) else c
            for c in itertools.chain(*args)
        )

    @property
    def source(self) -> Optional['Source']:
        """获取消息链中的 `Source` 对象。"""
        return self.get_first(Source)

    @property
    def message_id(self) -> int:
        """获取消息链的 message_id,若无法获取,返回 -1。"""
        source = self.source
        return source.id if source else -1

Ancestors

  • MiraiBaseModel
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Static methods

def join(*args: Iterable[Union[MessageComponent, str]])
Expand source code
@classmethod
def join(cls, *args: Iterable[Union[str, MessageComponent]]):
    return cls(
        Plain(c) if isinstance(c, str) else c
        for c in itertools.chain(*args)
    )
def parse_obj(msg_chain: Iterable)

通过列表形式的消息链,构造对应的 MessageChain 对象。

Args

msg_chain
列表形式的消息链。
Expand source code
@classmethod
def parse_obj(cls, msg_chain: Iterable):
    """通过列表形式的消息链,构造对应的 `MessageChain` 对象。

    Args:
        msg_chain: 列表形式的消息链。
    """
    result = cls._parse_message_chain(msg_chain)
    return cls(__root__=result)

Instance variables

var message_id : int

获取消息链的 message_id,若无法获取,返回 -1。

Expand source code
@property
def message_id(self) -> int:
    """获取消息链的 message_id,若无法获取,返回 -1。"""
    source = self.source
    return source.id if source else -1
var source : Optional[Source]

获取消息链中的 Source 对象。

Expand source code
@property
def source(self) -> Optional['Source']:
    """获取消息链中的 `Source` 对象。"""
    return self.get_first(Source)

Methods

def append(self, x: Union[MessageComponent, str])

将一个消息元素或字符串元素添加到消息链末尾。

Args

x
消息元素或字符串元素。
Expand source code
def append(self, x: Union[MessageComponent, str]):
    """将一个消息元素或字符串元素添加到消息链末尾。

    Args:
        x: 消息元素或字符串元素。
    """
    self.__root__.append(Plain(x) if isinstance(x, str) else x)
def as_mirai_code(self) ‑> str

将消息链转换为 mirai 码字符串。

该方法会自动转换消息链中的元素。

Returns

mirai 码字符串。

Expand source code
def as_mirai_code(self) -> str:
    """将消息链转换为 mirai 码字符串。

    该方法会自动转换消息链中的元素。

    Returns:
        mirai 码字符串。
    """
    return "".join(
        component.as_mirai_code() for component in self.__root__
    )
def count(self, x: Union[MessageComponent, Type[MessageComponent]]) ‑> int

返回消息链中 x 出现的次数。

Args

x (Union[MessageComponent, Type[MessageComponent]]): 要查找的消息元素或消息元素类型。

Returns

int
次数。
Expand source code
def count(self, x: Union[MessageComponent, Type[MessageComponent]]) -> int:
    """返回消息链中 x 出现的次数。

    Args:
        x (`Union[MessageComponent, Type[MessageComponent]]`):
            要查找的消息元素或消息元素类型。

    Returns:
        int: 次数。
    """
    if isinstance(x, type):
        return sum(1 for i in self if type(i) is x)
    if isinstance(x, MessageComponent):
        return self.__root__.count(x)
    raise TypeError(f"类型不匹配,当前类型:{type(x)}")
def exclude(self, x: Union[MessageComponent, Type[MessageComponent]], count: int = -1) ‑> MessageChain

返回移除指定元素或指定类型的元素后剩余的消息链。

Args

x
指定的元素或元素类型。
count
至多移除的数量。默认为全部移除。

Returns

MessageChain
剩余的消息链。
Expand source code
def exclude(
    self,
    x: Union[MessageComponent, Type[MessageComponent]],
    count: int = -1
) -> 'MessageChain':
    """返回移除指定元素或指定类型的元素后剩余的消息链。

    Args:
        x: 指定的元素或元素类型。
        count: 至多移除的数量。默认为全部移除。

    Returns:
        MessageChain: 剩余的消息链。
    """
    def _exclude():
        nonlocal count
        x_is_type = isinstance(x, type)
        for c in self:
            if count > 0 and ((x_is_type and type(c) is x) or c == x):
                count -= 1
                continue
            yield c

    return self.__class__(_exclude())
def extend(self, x: Iterable[Union[MessageComponent, str]])

将另一个消息链中的元素添加到消息链末尾。

Args

x
另一个消息链,也可为消息元素或字符串元素的序列。
Expand source code
def extend(self, x: Iterable[Union[MessageComponent, str]]):
    """将另一个消息链中的元素添加到消息链末尾。

    Args:
        x: 另一个消息链,也可为消息元素或字符串元素的序列。
    """
    self.__root__.extend(Plain(c) if isinstance(c, str) else c for c in x)
def get(self, index: Union[int, slice, Type[~TMessageComponent], Tuple[Type[~TMessageComponent], int]], count: Optional[int] = None) ‑> Union[MessageComponent, List[MessageComponent], List[~TMessageComponent]]

获取消息链中的某个(某些)消息组件,或某类型的消息组件。

Args

index (Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]]): 如果为 int,则返回该索引处的消息组件。 如果为 slice,则返回该索引范围处的消息组件。 如果为 Type[TMessageComponent],则返回该类型的全部消息组件。 如果为 Tuple[Type[TMessageComponent], int],则返回该类型的至多 index[1] 个消息组件。

count
如果为 int,则返回至多 count 个消息组件。

Returns

MessageComponent
返回指定索引处的消息组件。
List[MessageComponent]
返回指定索引范围的消息组件。
List[TMessageComponent]
返回指定类型的消息组件构成的列表。
Expand source code
def get(
    self,
    index: Union[int, slice, Type[TMessageComponent],
                 Tuple[Type[TMessageComponent], int]],
    count: Optional[int] = None
) -> Union[MessageComponent, List[MessageComponent],
           List[TMessageComponent]]:
    """获取消息链中的某个(某些)消息组件,或某类型的消息组件。

    Args:
        index (`Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]]`):
            如果为 `int`,则返回该索引处的消息组件。
            如果为 `slice`,则返回该索引范围处的消息组件。
            如果为 `Type[TMessageComponent]`,则返回该类型的全部消息组件。
            如果为 `Tuple[Type[TMessageComponent], int]`,则返回该类型的至多 `index[1]` 个消息组件。

        count: 如果为 `int`,则返回至多 `count` 个消息组件。

    Returns:
        MessageComponent: 返回指定索引处的消息组件。
        List[MessageComponent]: 返回指定索引范围的消息组件。
        List[TMessageComponent]: 返回指定类型的消息组件构成的列表。
    """
    # 正常索引
    if isinstance(index, int):
        return self.__root__[index]
    # 切片索引
    if isinstance(index, slice):
        return self.__root__[index]
    # 指定 count
    if count:
        if isinstance(index, type):
            index = (index, count)
        elif isinstance(index, tuple):
            index = (index[0], count if count < index[1] else index[1])
    # 索引对象为 MessageComponent 类,返回所有对应 component
    if isinstance(index, type):
        return [
            component for component in self if type(component) is index
        ]
    # 索引对象为 MessageComponent 和 int 构成的 tuple, 返回指定数量的 component
    if isinstance(index, tuple):
        components = (
            component for component in self if type(component) is index[0]
        )
        return [
            component for component, _ in zip(components, range(index[1]))
        ]
    raise TypeError(f"消息链索引需为 int 或 MessageComponent,当前类型:{type(index)}")
def get_first(self, t: Type[~TMessageComponent]) ‑> Optional[~TMessageComponent]

获取消息链中第一个符合类型的消息组件。

Expand source code
def get_first(self,
              t: Type[TMessageComponent]) -> Optional[TMessageComponent]:
    """获取消息链中第一个符合类型的消息组件。"""
    for component in self:
        if isinstance(component, t):
            return component
    return None
def has(self, sub: Union[MessageComponent, Type[MessageComponent], ForwardRef('MessageChain'), str]) ‑> bool

判断消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。

Args

sub (Union[MessageComponent, Type[MessageComponent], 'MessageChain', str]): 若为 MessageComponent,则判断该组件是否在消息链中。 若为 Type[MessageComponent],则判断该组件类型是否在消息链中。 若为 MessageChain,则判断该子消息链是否在消息链中。 若为 str,则判断对应的 mirai 码中是否有某子字符串。

Returns

bool
是否找到。
Expand source code
def has(
    self, sub: Union[MessageComponent, Type[MessageComponent],
                     'MessageChain', str]
) -> bool:
    """判断消息链中:
    1. 是否有某个消息组件。
    2. 是否有某个类型的消息组件。
    3. 是否有某个子消息链。
    4. 对应的 mirai 码中是否有某子字符串。

    Args:
        sub (`Union[MessageComponent, Type[MessageComponent], 'MessageChain', str]`):
            若为 `MessageComponent`,则判断该组件是否在消息链中。
            若为 `Type[MessageComponent]`,则判断该组件类型是否在消息链中。
            若为 `MessageChain`,则判断该子消息链是否在消息链中。
            若为 `str`,则判断对应的 mirai 码中是否有某子字符串。

    Returns:
        bool: 是否找到。
    """
    if isinstance(sub, type):  # 检测消息链中是否有某种类型的对象
        for i in self:
            if type(i) is sub:
                return True
        return False
    if isinstance(sub, MessageComponent):  # 检查消息链中是否有某个组件
        for i in self:
            if i == sub:
                return True
        return False
    if isinstance(sub, MessageChain):  # 检查消息链中是否有某个子消息链
        return bool(kmp(self, sub))
    if isinstance(sub, str):  # 检查消息中有无指定字符串子串
        return sub in deserialize(str(self))
    raise TypeError(f"类型不匹配,当前类型:{type(sub)}")
def index(self, x: Union[MessageComponent, Type[MessageComponent]], i: int = 0, j: int = -1) ‑> int

返回 x 在消息链中首次出现项的索引号(索引号在 i 或其后且在 j 之前)。

Args

x (Union[MessageComponent, Type[MessageComponent]]):
要查找的消息元素或消息元素类型。
i
从哪个位置开始查找。
j
查找到哪个位置结束。

Returns

int
如果找到,则返回索引号。

Raises

ValueError
没有找到。
TypeError
类型不匹配。
Expand source code
def index(
    self,
    x: Union[MessageComponent, Type[MessageComponent]],
    i: int = 0,
    j: int = -1
) -> int:
    """返回 x 在消息链中首次出现项的索引号(索引号在 i 或其后且在 j 之前)。

    Args:
        x (`Union[MessageComponent, Type[MessageComponent]]`):
            要查找的消息元素或消息元素类型。
        i: 从哪个位置开始查找。
        j: 查找到哪个位置结束。

    Returns:
        int: 如果找到,则返回索引号。

    Raises:
        ValueError: 没有找到。
        TypeError: 类型不匹配。
    """
    if isinstance(x, type):
        l = len(self)
        if i < 0:
            i += l
        if i < 0:
            i = 0
        if j < 0:
            j += l
        if j > l:
            j = l
        for index in range(i, j):
            if type(self[index]) is x:
                return index
        raise ValueError("消息链中不存在该类型的组件。")
    if isinstance(x, MessageComponent):
        return self.__root__.index(x, i, j)
    raise TypeError(f"类型不匹配,当前类型:{type(x)}")
def insert(self, i: int, x: Union[MessageComponent, str])

将一个消息元素或字符串添加到消息链中指定位置。

Args

i
插入位置。
x
消息元素或字符串元素。
Expand source code
def insert(self, i: int, x: Union[MessageComponent, str]):
    """将一个消息元素或字符串添加到消息链中指定位置。

    Args:
        i: 插入位置。
        x: 消息元素或字符串元素。
    """
    self.__root__.insert(i, Plain(x) if isinstance(x, str) else x)
def pop(self, i: int = -1) ‑> MessageComponent

从消息链中移除并返回指定位置的元素。

Args

i
移除位置。默认为末尾。

Returns

MessageComponent
移除的元素。
Expand source code
def pop(self, i: int = -1) -> MessageComponent:
    """从消息链中移除并返回指定位置的元素。

    Args:
        i: 移除位置。默认为末尾。

    Returns:
        MessageComponent: 移除的元素。
    """
    return self.__root__.pop(i)
def remove(self, x: Union[MessageComponent, Type[MessageComponent]])

从消息链中移除指定元素或指定类型的一个元素。

Args

x
指定的元素或元素类型。
Expand source code
def remove(self, x: Union[MessageComponent, Type[MessageComponent]]):
    """从消息链中移除指定元素或指定类型的一个元素。

    Args:
        x: 指定的元素或元素类型。
    """
    if isinstance(x, type):
        self.pop(self.index(x))
    if isinstance(x, MessageComponent):
        self.__root__.remove(x)
def reverse(self)

将消息链原地翻转。

Expand source code
def reverse(self):
    """将消息链原地翻转。"""
    self.__root__.reverse()
class MessageEvent (*args, **kwargs)

消息事件。

Args

type
事件名。
message_chain
消息内容。
Expand source code
class MessageEvent(Event):
    """消息事件。

    Args:
        type: 事件名。
        message_chain: 消息内容。
    """
    type: str
    """事件名。"""
    message_chain: MessageChain
    """消息内容。"""

Ancestors

Subclasses

Class variables

var message_chainMessageChain

消息内容。

Inherited members

class Method (value, names=None, *, module=None, qualname=None, type=None, start=1)

API 接口的调用方法。

Expand source code
class Method(str, Enum):
    """API 接口的调用方法。"""
    GET = "GET"
    """使用 GET 方法调用。"""
    POST = "POST"
    """使用 POST 方法调用。"""
    # 区分下面两个,是为了兼容 websocket
    RESTGET = "RESTGET"
    """表明这是一个对 RESTful 接口的 GET。"""
    RESTPOST = "RESTPOST"
    """表明这是一个对 RESTful 接口的 POST。"""
    MULTIPART = "MULTIPART"
    """表明这是一个使用了 multipart/form-data 的 POST。"""

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var GET

使用 GET 方法调用。

var MULTIPART

表明这是一个使用了 multipart/form-data 的 POST。

var POST

使用 POST 方法调用。

var RESTGET

表明这是一个对 RESTful 接口的 GET。

var RESTPOST

表明这是一个对 RESTful 接口的 POST。

class Mirai (qq: int, adapter: Adapter)

机器人主类。

使用了 __getattr__ 魔术方法,可以直接在对象上调用 API。

Mirai: 类包含 model 层封装,API 名称经过转写以符合命名规范,所有的 API 全部使用小写字母及下划线命名。

(API 名称也可使用原名。) API 参数可以使用具名参数,也可以使用位置参数,关于 API 参数的更多信息请参见模块 mirai.models.api

例如:

await bot.send_friend_message(12345678, [
    Plain("Hello World!")
])

也可以使用 call_api 方法,须注意此方法直接继承自 SimpleMirai,因此未经 model 层封装, 需要遵循 SimpleMirai 的规定。

Args

qq
QQ 号。启用 Single Mode 时,可以随便传入,登陆后会自动获取正确的 QQ 号。
adapter
适配器,负责与 mirai-api-http 沟通,详见模块`mirai.adapters。
Expand source code
class Mirai(SimpleMirai):
    """
    机器人主类。

    使用了 `__getattr__` 魔术方法,可以直接在对象上调用 API。

        Mirai: 类包含 model 层封装,API 名称经过转写以符合命名规范,所有的 API 全部使用小写字母及下划线命名。
    (API 名称也可使用原名。)
    API 参数可以使用具名参数,也可以使用位置参数,关于 API 参数的更多信息请参见模块 `mirai.models.api`。

    例如:
    ```py
    await bot.send_friend_message(12345678, [
        Plain("Hello World!")
    ])
    ```

    也可以使用 `call_api` 方法,须注意此方法直接继承自 `SimpleMirai`,因此未经 model 层封装,
    需要遵循 `SimpleMirai` 的规定。
    """
    def __init__(self, qq: int, adapter: Adapter):
        super().__init__(qq=qq, adapter=adapter)
        # 将 bus 更换为 ModelEventBus
        adapter.unregister_event_bus(self._bus)
        self._bus: ModelEventBus = ModelEventBus()
        adapter.register_event_bus(self._bus.base_bus)

    @property
    def bus(self) -> ModelEventBus:
        return self._bus

    async def call_api(self, api: str, *args, **kwargs):
        """调用 API。

        Args:
            api: API 名称。
            *args: 参数。
            **kwargs: 参数。
        """
        return await self._adapter.call_api(api, *args, **kwargs)

    def on(
        self,
        event_type: Union[Type[Event], str],
        priority: int = 0,
    ) -> Callable:
        """注册事件处理器。

        用法举例:
        ```python
        @bot.on(FriendMessage)
        async def on_friend_message(event: FriendMessage):
            print(f"收到来自{event.sender.nickname}的消息。")
        ```

        Args:
            event_type: 事件类或事件名。
            priority: 优先级,较小者优先。
        """
        return self._bus.on(event_type, priority)

    def api(self, api: str) -> ApiModel.Proxy:
        """获取 API Proxy 对象。

        API Proxy 提供更加简便的调用 API 的写法,详见 `mirai.models.api`。

        `Mirai` 的 `__getattr__` 与此方法完全相同,可支持直接在对象上调用 API。

        Args:
            api: API 名称。

        Returns:
            ApiModel.Proxy: API Proxy 对象。
        """
        api_type = ApiModel.get_subtype(api)
        return api_type.Proxy(self, api_type)

    def __getattr__(self, api: str) -> ApiModel.Proxy:
        return self.api(api)

    async def send(
        self,
        target: Union[Entity, MessageEvent],
        message: TMessage,
        quote: bool = False
    ) -> int:
        """发送消息。可以从 `Friend` `Group` 等对象,或者从 `MessageEvent` 中自动识别消息发送对象。

        Args:
            target: 目标对象。
            message: 发送的消息。
            quote: 是否以回复消息的形式发送,默认为 False。

        Returns:
            int: 发送的消息的 message_id。
        """
        # 识别消息发送对象
        if isinstance(target, TempMessage):
            quoting = target.message_chain.message_id if quote else None
            return (
                await self.send_temp_message(
                    qq=target.sender.id,
                    group=target.group.id,
                    message_chain=message,
                    quote=quoting
                )
            ).message_id

        if isinstance(target, MessageEvent):
            quoting = target.message_chain.message_id if quote else None
            target = target.sender
        else:
            quoting = None

        if isinstance(target, Friend):
            send_message = self.send_friend_message
            id_ = target.id
        elif isinstance(target, Group):
            send_message = self.send_group_message
            id_ = target.id
        elif isinstance(target, GroupMember):
            send_message = self.send_group_message
            id_ = target.group.id
        else:
            raise ValueError(f"{target} 不是有效的消息发送对象。")

        response = await send_message(
            target=id_, message_chain=message, quote=quoting
        )

        return response.message_id if response else -1

    async def get_friend(self, id_: int) -> Optional[Friend]:
        """获取好友对象。

        Args:
            id_: 好友 QQ 号。

        Returns:
            Friend: 好友对象。
            None: 好友不存在。
        """
        friend_list = await self.friend_list.get()
        if not friend_list:
            return None
        for friend in cast(List[Friend], friend_list):
            if friend.id == id_:
                return friend
        return None

    async def get_group(self, id_: int) -> Optional[Group]:
        """获取群组对象。

        Args:
            id_: 群号。

        Returns:
            Group: 群组对象。
            None: 群组不存在或 bot 未入群。
        """
        group_list = await self.group_list.get()
        if not group_list:
            return None
        for group in cast(List[Group], group_list):
            if group.id == id_:
                return group
        return None

    async def get_group_member(self, group: Union[Group, int],
                               id_: int) -> Optional[GroupMember]:
        """获取群成员对象。

        Args:
            group: 群组对象或群号。
            id_: 群成员 QQ 号。

        Returns:
            GroupMember: 群成员对象。
            None: 群成员不存在。
        """
        if isinstance(group, Group):
            group = group.id
        member_list = await self.member_list.get(group)
        if not member_list:
            return None
        for member in cast(List[GroupMember], member_list):
            if member.id == id_:
                return member
        return None

    async def get_entity(self, subject: Subject) -> Optional[Entity]:
        """获取实体对象。

        Args:
            subject: 以 `Subject` 表示的实体对象。

        Returns:
            Entity: 实体对象。
            None: 实体不存在。
        """
        if subject.kind == 'Friend':
            return await self.get_friend(subject.id)
        if subject.kind == 'Group':
            return await self.get_group(subject.id)
        return None

    @staticmethod
    async def is_admin(group: Group) -> bool:
        """判断机器人在群组中是否是管理员。

        Args:
            group: 群组对象。

        Returns:
            bool: 是否是管理员。
        """
        return group.permission in (Permission.Administrator, Permission.Owner)

    async def process_request(
        self,
        event: RequestEvent,
        operate: Union[int, RespOperate],
        message: str = ''
    ):
        """处理申请。

        Args:
            event: 申请事件。
            operate: 处理操作。
            message: 回复的信息。
        """
        api_type = cast(
            Type[RespEvent], getattr(mirai.models.api, 'Resp' + event.type)
        )
        api = api_type.from_event(event, operate, message)
        await api.call(self, 'POST')

    async def allow(self, event: RequestEvent, message: str = ''):
        """允许申请。

        Args:
            event: 申请事件。
            message: 回复的信息。
        """
        await self.process_request(event, RespOperate.ALLOW, message)

    async def decline(
        self, event: RequestEvent, message: str = '', ban: bool = False
    ):
        """拒绝申请。

        Args:
            event: 申请事件。
            message: 回复的信息。
            ban: 是否拉黑,默认为 False。
        """
        await self.process_request(
            event, RespOperate.DECLINE
            & RespOperate.BAN if ban else RespOperate.DECLINE, message
        )

    async def ignore(
        self, event: RequestEvent, message: str = '', ban: bool = False
    ):
        """忽略申请。

        Args:
            event: 申请事件。
            message: 回复的信息。
            ban: 是否拉黑,默认为 False。
        """
        await self.process_request(
            event, RespOperate.IGNORE
            & RespOperate.BAN if ban else RespOperate.DECLINE, message
        )

Ancestors

Class variables

var qq : int

Static methods

async def is_admin(group: Group) ‑> bool

判断机器人在群组中是否是管理员。

Args

group
群组对象。

Returns

bool
是否是管理员。
Expand source code
@staticmethod
async def is_admin(group: Group) -> bool:
    """判断机器人在群组中是否是管理员。

    Args:
        group: 群组对象。

    Returns:
        bool: 是否是管理员。
    """
    return group.permission in (Permission.Administrator, Permission.Owner)

Instance variables

var busModelEventBus
Expand source code
@property
def bus(self) -> ModelEventBus:
    return self._bus

Methods

async def allow(self, event: RequestEvent, message: str = '')

允许申请。

Args

event
申请事件。
message
回复的信息。
Expand source code
async def allow(self, event: RequestEvent, message: str = ''):
    """允许申请。

    Args:
        event: 申请事件。
        message: 回复的信息。
    """
    await self.process_request(event, RespOperate.ALLOW, message)
def api(self, api: str) ‑> ApiModel.Proxy

获取 API Proxy 对象。

API Proxy 提供更加简便的调用 API 的写法,详见 mirai.models.api

Mirai__getattr__ 与此方法完全相同,可支持直接在对象上调用 API。

Args

api
API 名称。

Returns

ApiModel.Proxy
API Proxy 对象。
Expand source code
def api(self, api: str) -> ApiModel.Proxy:
    """获取 API Proxy 对象。

    API Proxy 提供更加简便的调用 API 的写法,详见 `mirai.models.api`。

    `Mirai` 的 `__getattr__` 与此方法完全相同,可支持直接在对象上调用 API。

    Args:
        api: API 名称。

    Returns:
        ApiModel.Proxy: API Proxy 对象。
    """
    api_type = ApiModel.get_subtype(api)
    return api_type.Proxy(self, api_type)
async def decline(self, event: RequestEvent, message: str = '', ban: bool = False)

拒绝申请。

Args

event
申请事件。
message
回复的信息。
ban
是否拉黑,默认为 False。
Expand source code
async def decline(
    self, event: RequestEvent, message: str = '', ban: bool = False
):
    """拒绝申请。

    Args:
        event: 申请事件。
        message: 回复的信息。
        ban: 是否拉黑,默认为 False。
    """
    await self.process_request(
        event, RespOperate.DECLINE
        & RespOperate.BAN if ban else RespOperate.DECLINE, message
    )
async def get_entity(self, subject: Subject) ‑> Optional[Entity]

获取实体对象。

Args

subject
Subject 表示的实体对象。

Returns

Entity
实体对象。
None
实体不存在。
Expand source code
async def get_entity(self, subject: Subject) -> Optional[Entity]:
    """获取实体对象。

    Args:
        subject: 以 `Subject` 表示的实体对象。

    Returns:
        Entity: 实体对象。
        None: 实体不存在。
    """
    if subject.kind == 'Friend':
        return await self.get_friend(subject.id)
    if subject.kind == 'Group':
        return await self.get_group(subject.id)
    return None
async def get_friend(self, id_: int) ‑> Optional[Friend]

获取好友对象。

Args

id_
好友 QQ 号。

Returns

Friend
好友对象。
None
好友不存在。
Expand source code
async def get_friend(self, id_: int) -> Optional[Friend]:
    """获取好友对象。

    Args:
        id_: 好友 QQ 号。

    Returns:
        Friend: 好友对象。
        None: 好友不存在。
    """
    friend_list = await self.friend_list.get()
    if not friend_list:
        return None
    for friend in cast(List[Friend], friend_list):
        if friend.id == id_:
            return friend
    return None
async def get_group(self, id_: int) ‑> Optional[Group]

获取群组对象。

Args

id_
群号。

Returns

Group
群组对象。
None
群组不存在或 bot 未入群。
Expand source code
async def get_group(self, id_: int) -> Optional[Group]:
    """获取群组对象。

    Args:
        id_: 群号。

    Returns:
        Group: 群组对象。
        None: 群组不存在或 bot 未入群。
    """
    group_list = await self.group_list.get()
    if not group_list:
        return None
    for group in cast(List[Group], group_list):
        if group.id == id_:
            return group
    return None
async def get_group_member(self, group: Union[Group, int], id_: int) ‑> Optional[GroupMember]

获取群成员对象。

Args

group
群组对象或群号。
id_
群成员 QQ 号。

Returns

GroupMember
群成员对象。
None
群成员不存在。
Expand source code
async def get_group_member(self, group: Union[Group, int],
                           id_: int) -> Optional[GroupMember]:
    """获取群成员对象。

    Args:
        group: 群组对象或群号。
        id_: 群成员 QQ 号。

    Returns:
        GroupMember: 群成员对象。
        None: 群成员不存在。
    """
    if isinstance(group, Group):
        group = group.id
    member_list = await self.member_list.get(group)
    if not member_list:
        return None
    for member in cast(List[GroupMember], member_list):
        if member.id == id_:
            return member
    return None
async def ignore(self, event: RequestEvent, message: str = '', ban: bool = False)

忽略申请。

Args

event
申请事件。
message
回复的信息。
ban
是否拉黑,默认为 False。
Expand source code
async def ignore(
    self, event: RequestEvent, message: str = '', ban: bool = False
):
    """忽略申请。

    Args:
        event: 申请事件。
        message: 回复的信息。
        ban: 是否拉黑,默认为 False。
    """
    await self.process_request(
        event, RespOperate.IGNORE
        & RespOperate.BAN if ban else RespOperate.DECLINE, message
    )
def on(self, event_type: Union[Type[Event], str], priority: int = 0) ‑> Callable

注册事件处理器。

用法举例:

@bot.on(FriendMessage)
async def on_friend_message(event: FriendMessage):
    print(f"收到来自{event.sender.nickname}的消息。")

Args

event_type
事件类或事件名。
priority
优先级,较小者优先。
Expand source code
def on(
    self,
    event_type: Union[Type[Event], str],
    priority: int = 0,
) -> Callable:
    """注册事件处理器。

    用法举例:
    ```python
    @bot.on(FriendMessage)
    async def on_friend_message(event: FriendMessage):
        print(f"收到来自{event.sender.nickname}的消息。")
    ```

    Args:
        event_type: 事件类或事件名。
        priority: 优先级,较小者优先。
    """
    return self._bus.on(event_type, priority)
async def process_request(self, event: RequestEvent, operate: Union[int, RespOperate], message: str = '')

处理申请。

Args

event
申请事件。
operate
处理操作。
message
回复的信息。
Expand source code
async def process_request(
    self,
    event: RequestEvent,
    operate: Union[int, RespOperate],
    message: str = ''
):
    """处理申请。

    Args:
        event: 申请事件。
        operate: 处理操作。
        message: 回复的信息。
    """
    api_type = cast(
        Type[RespEvent], getattr(mirai.models.api, 'Resp' + event.type)
    )
    api = api_type.from_event(event, operate, message)
    await api.call(self, 'POST')
async def send(self, target: Union[EntityMessageEvent], message: Union[MessageChain, Iterable[Union[MessageComponent, str]], MessageComponent, str], quote: bool = False) ‑> int

发送消息。可以从 Friend Group 等对象,或者从 MessageEvent 中自动识别消息发送对象。

Args

target
目标对象。
message
发送的消息。
quote
是否以回复消息的形式发送,默认为 False。

Returns

int
发送的消息的 message_id。
Expand source code
async def send(
    self,
    target: Union[Entity, MessageEvent],
    message: TMessage,
    quote: bool = False
) -> int:
    """发送消息。可以从 `Friend` `Group` 等对象,或者从 `MessageEvent` 中自动识别消息发送对象。

    Args:
        target: 目标对象。
        message: 发送的消息。
        quote: 是否以回复消息的形式发送,默认为 False。

    Returns:
        int: 发送的消息的 message_id。
    """
    # 识别消息发送对象
    if isinstance(target, TempMessage):
        quoting = target.message_chain.message_id if quote else None
        return (
            await self.send_temp_message(
                qq=target.sender.id,
                group=target.group.id,
                message_chain=message,
                quote=quoting
            )
        ).message_id

    if isinstance(target, MessageEvent):
        quoting = target.message_chain.message_id if quote else None
        target = target.sender
    else:
        quoting = None

    if isinstance(target, Friend):
        send_message = self.send_friend_message
        id_ = target.id
    elif isinstance(target, Group):
        send_message = self.send_group_message
        id_ = target.id
    elif isinstance(target, GroupMember):
        send_message = self.send_group_message
        id_ = target.group.id
    else:
        raise ValueError(f"{target} 不是有效的消息发送对象。")

    response = await send_message(
        target=id_, message_chain=message, quote=quoting
    )

    return response.message_id if response else -1

Inherited members

class MiraiRunner (*args, **kwargs_)

运行 SimpleMirai 对象的托管类。

使用此类以实现机器人的多例运行。

例如:

runner = MiraiRunner(mirai)
runner.run(host='127.0.0.1', port=8000)
Expand source code
class MiraiRunner(Singleton):
    """运行 SimpleMirai 对象的托管类。

    使用此类以实现机器人的多例运行。

    例如:
    ```py
    runner = MiraiRunner(mirai)
    runner.run(host='127.0.0.1', port=8000)
    ```
    """
    bots: Iterable[SimpleMirai]
    """运行的 SimpleMirai 对象。"""
    def __init__(self, *bots: SimpleMirai):
        """
        Args:
            *bots: 要运行的机器人。
        """
        self.bots = bots
        self._asgi = ASGI()
        self._asgi.add_event_handler('startup', self.startup)
        self._asgi.add_event_handler('shutdown', self.shutdown)

    async def startup(self):
        """开始运行。"""
        coros = [bot.startup() for bot in self.bots]
        await asyncio.gather(*coros)

    async def shutdown(self):
        """结束运行。"""
        coros = [bot.shutdown() for bot in self.bots]
        await asyncio.gather(*coros)

    async def __call__(self, scope, recv, send):
        await self._asgi(scope, recv, send)

    async def _run(self):
        try:
            await self.startup()
            backgrounds = [bot.background() for bot in self.bots]
            await asyncio.gather(*backgrounds)
        finally:
            await self.shutdown()

    def run(
        self,
        host: str = '127.0.0.1',
        port: int = 8000,
        asgi_server: str = 'auto',
        **kwargs
    ):
        """开始运行机器人。

        一般情况下,此函数会进入主循环,不再返回。
        """
        if not asgi_serve(
            self, host=host, port=port, asgi_server=asgi_server, **kwargs
        ):
            import textwrap
            logger = logging.getLogger(__name__)
            logger.warning(
                textwrap.dedent(
                    """
                    未找到可用的 ASGI 服务,反向 WebSocket 和 WebHook 上报将不可用。
                    仅 HTTP 轮询与正向 WebSocket 可用。
                    建议安装 ASGI 服务器,如 `uvicorn` 或 `hypercorn`。
                    在命令行键入:
                        pip install uvicorn
                    或者
                        pip install hypercorn
                    """
                ).strip()
            )
            try:
                asyncio.run(self._run())
            except (KeyboardInterrupt, SystemExit):
                exit()

Ancestors

Class variables

var bots : Iterable[SimpleMirai]

运行的 SimpleMirai 对象。

Methods

def run(self, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs)

开始运行机器人。

一般情况下,此函数会进入主循环,不再返回。

Expand source code
def run(
    self,
    host: str = '127.0.0.1',
    port: int = 8000,
    asgi_server: str = 'auto',
    **kwargs
):
    """开始运行机器人。

    一般情况下,此函数会进入主循环,不再返回。
    """
    if not asgi_serve(
        self, host=host, port=port, asgi_server=asgi_server, **kwargs
    ):
        import textwrap
        logger = logging.getLogger(__name__)
        logger.warning(
            textwrap.dedent(
                """
                未找到可用的 ASGI 服务,反向 WebSocket 和 WebHook 上报将不可用。
                仅 HTTP 轮询与正向 WebSocket 可用。
                建议安装 ASGI 服务器,如 `uvicorn` 或 `hypercorn`。
                在命令行键入:
                    pip install uvicorn
                或者
                    pip install hypercorn
                """
            ).strip()
        )
        try:
            asyncio.run(self._run())
        except (KeyboardInterrupt, SystemExit):
            exit()
async def shutdown(self)

结束运行。

Expand source code
async def shutdown(self):
    """结束运行。"""
    coros = [bot.shutdown() for bot in self.bots]
    await asyncio.gather(*coros)
async def startup(self)

开始运行。

Expand source code
async def startup(self):
    """开始运行。"""
    coros = [bot.startup() for bot in self.bots]
    await asyncio.gather(*coros)
class NetworkError (*args, **kwargs)

网络连接出错。

Expand source code
class NetworkError(RuntimeError):
    """网络连接出错。"""

Ancestors

  • builtins.RuntimeError
  • builtins.Exception
  • builtins.BaseException
class Plain (*args, **kwargs)

纯文本。

Expand source code
class Plain(MessageComponent):
    """纯文本。"""
    type: str = "Plain"
    """消息组件类型。"""
    text: str
    """文字消息。"""
    def __str__(self):
        return self.text

    def as_mirai_code(self) -> str:
        return serialize(self.text)

    def __repr__(self):
        return f'Plain({self.text!r})'

Ancestors

Class variables

var text : str

文字消息。

Inherited members

class Poke (*args, **kwargs)

戳一戳。

Expand source code
class Poke(MessageComponent):
    """戳一戳。"""
    type: str = "Poke"
    """消息组件类型。"""
    name: PokeNames
    """名称。"""
    @property
    def poke_type(self):
        return POKE_TYPE[self.name]

    @property
    def poke_id(self):
        return POKE_ID[self.name]

    def __str__(self):
        return f'[{POKE_NAME[self.name]}]'

    def as_mirai_code(self) -> str:
        return f'[mirai:poke:{self.name},{self.poke_type},{self.poke_id}]'

Ancestors

Class variables

var namePokeNames

名称。

Instance variables

var poke_id
Expand source code
@property
def poke_id(self):
    return POKE_ID[self.name]
var poke_type
Expand source code
@property
def poke_type(self):
    return POKE_TYPE[self.name]

Inherited members

class PokeNames (value, names=None, *, module=None, qualname=None, type=None, start=1)

戳一戳名称。

Expand source code
class PokeNames(str, Enum):
    """戳一戳名称。"""
    ChuoYiChuo = "ChuoYiChuo"
    BiXin = "BiXin"
    DianZan = "DianZan"
    XinSui = "XinSui"
    LiuLiuLiu = "LiuLiuLiu"
    FangDaZhao = "FangDaZhao"
    BaoBeiQiu = "BaoBeiQiu"
    Rose = "Rose"
    ZhaoHuanShu = "ZhaoHuanShu"
    RangNiPi = "RangNiPi"
    JeiYin = "JeiYin"
    ShouLei = "ShouLei"
    GouYin = "GouYin"
    ZhuaYiXia = "ZhuaYiXia"
    SuiPing = "SuiPing"
    QiaoMen = "QiaoMen"

    def as_component(self) -> 'Poke':
        return Poke(name=self.value)

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var BaoBeiQiu
var BiXin
var ChuoYiChuo
var DianZan
var FangDaZhao
var GouYin
var JeiYin
var LiuLiuLiu
var QiaoMen
var RangNiPi
var Rose
var ShouLei
var SuiPing
var XinSui
var ZhaoHuanShu
var ZhuaYiXia

Methods

def as_component(self) ‑> Poke
Expand source code
def as_component(self) -> 'Poke':
    return Poke(name=self.value)
class Shutdown (*args, **kwargs)

关闭事件。

Expand source code
class Shutdown(LifeSpan):
    """关闭事件。"""
    type: str = 'Shutdown'
    """事件名。"""

Ancestors

Inherited members

class SimpleMirai (qq: int, adapter: Adapter)

基于 adapter 和 bus,处于 model 层之下的机器人类。

使用了 __getattr__ 魔术方法,可以直接在对象上调用 API。

通过 SimpleMirai 调用 API 时,需注意此类不含 model 层封装, 因此 API 名称与参数名称需与 mirai-api-http 中的定义相同, 参数需要全部以具名参数的形式给出,并且需要指明使用的方法(GET/POST)。

例如:

await bot.sendFriendMessage(target=12345678, messageChain=[
    {"type": "Plain", "text": "Hello World!"}
], method="POST")

也可以使用 call_api 方法。

对于名称的路由含有二级目录的 API,由于名称中含有斜杠,必须使用 call_api 调用,例如:

file_list = await bot.call_api(
    "file/list", id="", target=12345678, method="GET"
)

Args

qq
QQ 号。启用 Single Mode 时,可以随便传入,登陆后会自动获取正确的 QQ 号。
adapter
适配器,负责与 mirai-api-http 沟通,详见模块`mirai.adapters。
Expand source code
class SimpleMirai(ApiProvider, AdapterInterface, AbstractEventBus):
    """
    基于 adapter 和 bus,处于 model 层之下的机器人类。

    使用了 `__getattr__` 魔术方法,可以直接在对象上调用 API。

    通过 `SimpleMirai` 调用 API 时,需注意此类不含 model 层封装,
    因此 API 名称与参数名称需与 mirai-api-http 中的定义相同,
    参数需要全部以具名参数的形式给出,并且需要指明使用的方法(GET/POST)。

    例如:
    ```py
    await bot.sendFriendMessage(target=12345678, messageChain=[
        {"type": "Plain", "text": "Hello World!"}
    ], method="POST")
    ```

    也可以使用 `call_api` 方法。

    对于名称的路由含有二级目录的 API,由于名称中含有斜杠,必须使用 `call_api` 调用,例如:
    ```py
    file_list = await bot.call_api(
        "file/list", id="", target=12345678, method="GET"
    )
    ```
    """
    qq: int

    def __init__(self, qq: int, adapter: Adapter):
        """
        Args:
            qq: QQ 号。启用 Single Mode 时,可以随便传入,登陆后会自动获取正确的 QQ 号。
            adapter: 适配器,负责与 mirai-api-http 沟通,详见模块`mirai.adapters。
        """
        self.qq = qq

        self._adapter = adapter
        self._bus = EventBus()
        self._adapter.register_event_bus(self._bus)

    @property
    def bus(self) -> EventBus:
        return self._bus

    def subscribe(self, event, func: Callable, priority: int = 0) -> None:
        self._bus.subscribe(event, func, priority)

    def unsubscribe(self, event, func: Callable) -> None:
        self._bus.unsubscribe(event, func)

    async def emit(self, event, *args, **kwargs) -> List[Awaitable[Any]]:
        return await self._bus.emit(event, *args, **kwargs)

    async def call_api(self, api: str, *args, **kwargs):
        """调用 API。

        Args:
            api: API 名称。
            *args: 参数。
            **kwargs: 参数。
        """
        warnings.warn("SimpleMirai 已弃用,将在 0.3 后移除。", DeprecationWarning)
        return await self._adapter.call_api(api, *args, **kwargs)

    def on(self, event: str, priority: int = 0) -> Callable:
        """注册事件处理器。

        用法举例:
        ```py
        @bot.on('FriendMessage')
        async def on_friend_message(event: dict):
            print(f"收到来自{event['sender']['nickname']}的消息。")
        ```

        Args:
            event: 事件名。
            priority: 优先级,小者优先。
        """
        return self._bus.on(event, priority=priority)

    @property
    def adapter_info(self) -> Dict[str, Any]:
        return self._adapter.adapter_info

    @contextlib.asynccontextmanager
    async def use_adapter(self, adapter: Adapter):
        """临时使用另一个适配器。

        用法:
        ```py
        async with bot.use_adapter(HTTPAdapter.via(bot)):
            ...
        ```

        Args:
            adapter: 使用的适配器。
        """
        origin_adapter = self._adapter
        await adapter.login(self.qq)
        self._adapter = adapter
        yield
        self._adapter = origin_adapter
        await adapter.logout(False)

    async def startup(self):
        """开始运行机器人(立即返回)。"""
        await self._adapter.login(self.qq)

        if self._adapter.single_mode:
            # Single Mode 下,QQ 号可以随便传入。这里从 session info 中获取正确的 QQ 号。
            self.qq = (await self.call_api('sessionInfo'))['data']['qq']['id']

        asyncio.create_task(self._adapter.emit("Startup", {'type': 'Startup'}))
        await self._adapter.start()

    async def background(self):
        """等待背景任务完成。"""
        await self._adapter.background

    async def shutdown(self):
        """结束运行机器人。"""
        await asyncio.create_task(
            self._adapter.emit("Shutdown", {'type': 'Shutdown'})
        )
        await self._adapter.logout()
        await self._adapter.shutdown()

    @property
    def session(self) -> str:
        """获取 session key,可用于调试。

        Returns:
            str: session key。
        """
        return self._adapter.session

    @property
    def asgi(self) -> 'MiraiRunner':
        """ASGI 对象,用于使用 uvicorn 等启动。"""
        return MiraiRunner(self)

    @staticmethod
    def add_background_task(func: Union[Callable, Awaitable, None] = None):
        """注册背景任务,将在 bot 启动后自动运行。

        Args:
            func(`Union[Callable, Awaitable, None]`): 背景任务,可以是函数或者协程,省略参数以作为装饰器调用。
        """
        asgi = ASGI()
        return asgi.add_background_task(func)

    def run(
        self,
        host: str = '127.0.0.1',
        port: int = 8000,
        asgi_server: str = 'auto',
        **kwargs
    ):
        """开始运行机器人。

        一般情况下,此函数会进入主循环,不再返回。

        Args:
            host: YiriMirai 作为服务器的地址,默认为 127.0.0.1。
            port: YiriMirai 作为服务器的端口,默认为 8000。
            asgi_server: ASGI 服务器类型,可选项有 `'uvicorn'` `'hypercorn'` 和 `'auto'`。
            **kwargs: 可选参数。多余的参数将传递给 `uvicorn.run` 和 `hypercorn.run`。
        """
        MiraiRunner(self).run(host, port, asgi_server, **kwargs)

Ancestors

Subclasses

Class variables

var qq : int

Static methods

def add_background_task(func: Union[Callable, Awaitable, NoneType] = None)

注册背景任务,将在 bot 启动后自动运行。

Args

func(Union[Callable, Awaitable, None]): 背景任务,可以是函数或者协程,省略参数以作为装饰器调用。

Expand source code
@staticmethod
def add_background_task(func: Union[Callable, Awaitable, None] = None):
    """注册背景任务,将在 bot 启动后自动运行。

    Args:
        func(`Union[Callable, Awaitable, None]`): 背景任务,可以是函数或者协程,省略参数以作为装饰器调用。
    """
    asgi = ASGI()
    return asgi.add_background_task(func)

Instance variables

var asgiMiraiRunner

ASGI 对象,用于使用 uvicorn 等启动。

Expand source code
@property
def asgi(self) -> 'MiraiRunner':
    """ASGI 对象,用于使用 uvicorn 等启动。"""
    return MiraiRunner(self)
var busEventBus
Expand source code
@property
def bus(self) -> EventBus:
    return self._bus
var session : str

获取 session key,可用于调试。

Returns

str
session key。
Expand source code
@property
def session(self) -> str:
    """获取 session key,可用于调试。

    Returns:
        str: session key。
    """
    return self._adapter.session

Methods

async def background(self)

等待背景任务完成。

Expand source code
async def background(self):
    """等待背景任务完成。"""
    await self._adapter.background
async def call_api(self, api: str, *args, **kwargs)

调用 API。

Args

api
API 名称。
*args
参数。
**kwargs
参数。
Expand source code
async def call_api(self, api: str, *args, **kwargs):
    """调用 API。

    Args:
        api: API 名称。
        *args: 参数。
        **kwargs: 参数。
    """
    warnings.warn("SimpleMirai 已弃用,将在 0.3 后移除。", DeprecationWarning)
    return await self._adapter.call_api(api, *args, **kwargs)
def on(self, event: str, priority: int = 0) ‑> Callable

注册事件处理器。

用法举例:

@bot.on('FriendMessage')
async def on_friend_message(event: dict):
    print(f"收到来自{event['sender']['nickname']}的消息。")

Args

event
事件名。
priority
优先级,小者优先。
Expand source code
def on(self, event: str, priority: int = 0) -> Callable:
    """注册事件处理器。

    用法举例:
    ```py
    @bot.on('FriendMessage')
    async def on_friend_message(event: dict):
        print(f"收到来自{event['sender']['nickname']}的消息。")
    ```

    Args:
        event: 事件名。
        priority: 优先级,小者优先。
    """
    return self._bus.on(event, priority=priority)
def run(self, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs)

开始运行机器人。

一般情况下,此函数会进入主循环,不再返回。

Args

host
YiriMirai 作为服务器的地址,默认为 127.0.0.1。
port
YiriMirai 作为服务器的端口,默认为 8000。
asgi_server
ASGI 服务器类型,可选项有 'uvicorn' 'hypercorn''auto'
**kwargs
可选参数。多余的参数将传递给 uvicorn.runhypercorn.run
Expand source code
def run(
    self,
    host: str = '127.0.0.1',
    port: int = 8000,
    asgi_server: str = 'auto',
    **kwargs
):
    """开始运行机器人。

    一般情况下,此函数会进入主循环,不再返回。

    Args:
        host: YiriMirai 作为服务器的地址,默认为 127.0.0.1。
        port: YiriMirai 作为服务器的端口,默认为 8000。
        asgi_server: ASGI 服务器类型,可选项有 `'uvicorn'` `'hypercorn'` 和 `'auto'`。
        **kwargs: 可选参数。多余的参数将传递给 `uvicorn.run` 和 `hypercorn.run`。
    """
    MiraiRunner(self).run(host, port, asgi_server, **kwargs)
async def shutdown(self)

结束运行机器人。

Expand source code
async def shutdown(self):
    """结束运行机器人。"""
    await asyncio.create_task(
        self._adapter.emit("Shutdown", {'type': 'Shutdown'})
    )
    await self._adapter.logout()
    await self._adapter.shutdown()
async def startup(self)

开始运行机器人(立即返回)。

Expand source code
async def startup(self):
    """开始运行机器人(立即返回)。"""
    await self._adapter.login(self.qq)

    if self._adapter.single_mode:
        # Single Mode 下,QQ 号可以随便传入。这里从 session info 中获取正确的 QQ 号。
        self.qq = (await self.call_api('sessionInfo'))['data']['qq']['id']

    asyncio.create_task(self._adapter.emit("Startup", {'type': 'Startup'}))
    await self._adapter.start()
async def use_adapter(self, adapter: Adapter)

临时使用另一个适配器。

用法:

async with bot.use_adapter(HTTPAdapter.via(bot)):
    ...

Args

adapter
使用的适配器。
Expand source code
@contextlib.asynccontextmanager
async def use_adapter(self, adapter: Adapter):
    """临时使用另一个适配器。

    用法:
    ```py
    async with bot.use_adapter(HTTPAdapter.via(bot)):
        ...
    ```

    Args:
        adapter: 使用的适配器。
    """
    origin_adapter = self._adapter
    await adapter.login(self.qq)
    self._adapter = adapter
    yield
    self._adapter = origin_adapter
    await adapter.logout(False)

Inherited members

class SkipExecution (*args, **kwargs)

跳过同优先度的事件处理器,进入下一优先度。

Expand source code
class SkipExecution(Exception):
    """跳过同优先度的事件处理器,进入下一优先度。"""

Ancestors

  • builtins.Exception
  • builtins.BaseException
class Startup (*args, **kwargs)

启动事件。

Expand source code
class Startup(LifeSpan):
    """启动事件。"""
    type: str = 'Startup'
    """事件名。"""

Ancestors

Inherited members

class StopExecution (*args, **kwargs)

终止事件处理器执行,但不阻止事件向上传播。

Expand source code
class StopExecution(Exception):
    """终止事件处理器执行,但不阻止事件向上传播。"""

Ancestors

  • builtins.Exception
  • builtins.BaseException
class StopPropagation (*args, **kwargs)

终止事件处理器执行,并停止事件向上传播。

Expand source code
class StopPropagation(Exception):
    """终止事件处理器执行,并停止事件向上传播。"""

Ancestors

  • builtins.Exception
  • builtins.BaseException
class StrangerMessage (*args, **kwargs)

陌生人消息。

Args

type
事件名。
sender
发送消息的人。
message_chain
消息内容。
Expand source code
class StrangerMessage(MessageEvent):
    """陌生人消息。

    Args:
        type: 事件名。
        sender: 发送消息的人。
        message_chain: 消息内容。
    """
    type: str = 'StrangerMessage'
    """事件名。"""
    sender: Friend
    """发送消息的人。"""
    message_chain: MessageChain
    """消息内容。"""

Ancestors

Class variables

var senderFriend

发送消息的人。

Inherited members

class TempMessage (*args, **kwargs)

群临时消息。

Args

type
事件名。
sender
发送消息的群成员。
message_chain
消息内容。
Expand source code
class TempMessage(MessageEvent):
    """群临时消息。

    Args:
        type: 事件名。
        sender: 发送消息的群成员。
        message_chain: 消息内容。
    """
    type: str = 'TempMessage'
    """事件名。"""
    sender: GroupMember
    """发送消息的群成员。"""
    message_chain: MessageChain
    """消息内容。"""
    @property
    def group(self) -> Group:
        return self.sender.group

Ancestors

Class variables

var senderGroupMember

发送消息的群成员。

Instance variables

var groupGroup
Expand source code
@property
def group(self) -> Group:
    return self.sender.group

Inherited members

class Voice (*args, **kwargs)

语音。

Expand source code
class Voice(MessageComponent):
    """语音。"""
    type: str = "Voice"
    """消息组件类型。"""
    voice_id: Optional[str] = None
    """语音的 voice_id,不为空时将忽略 url 属性。"""
    url: Optional[str] = None
    """语音的 URL,发送时可作网络语音的链接;接收时为腾讯语音服务器的链接,可用于语音下载。"""
    path: Optional[str] = None
    """语音的路径,发送本地语音。"""
    base64: Optional[str] = None
    """语音的 Base64 编码。"""
    length: Optional[int] = None
    """语音的长度,单位为秒。"""
    @validator('path')
    def validate_path(cls, path: Optional[str]):
        """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。"""
        if path:
            try:
                return str(Path(path).resolve(strict=True))
            except FileNotFoundError:
                raise ValueError(f"无效路径:{path}")
        else:
            return path

    def __str__(self):
        return '[语音]'

    async def download(
        self,
        filename: Union[str, Path, None] = None,
        directory: Union[str, Path, None] = None
    ):
        """下载语音到本地。

        语音采用 silk v3 格式,silk 格式的编码解码请使用 [graiax-silkcoder](https://pypi.org/project/graiax-silkcoder/)。

        Args:
            filename: 下载到本地的文件路径。与 `directory` 二选一。
            directory: 下载到本地的文件夹路径。与 `filename` 二选一。
        """
        if not self.url:
            logger.warning(f'语音 `{self.voice_id}` 无 url 参数,下载失败。')
            return

        import httpx
        async with httpx.AsyncClient() as client:
            response = await client.get(self.url)
            response.raise_for_status()
            content = response.content

            if filename:
                path = Path(filename)
                path.parent.mkdir(parents=True, exist_ok=True)
            elif directory:
                path = Path(directory)
                path.mkdir(parents=True, exist_ok=True)
                path = path / f'{self.voice_id}.silk'
            else:
                raise ValueError("请指定文件路径或文件夹路径!")

            import aiofiles
            async with aiofiles.open(path, 'wb') as f:
                await f.write(content)

    @classmethod
    async def from_local(
        cls,
        filename: Union[str, Path, None] = None,
        content: Optional[bytes] = None,
    ) -> "Voice":
        """从本地文件路径加载语音,以 base64 的形式传递。

        Args:
            filename: 从本地文件路径加载语音,与 `content` 二选一。
            content: 从本地文件内容加载语音,与 `filename` 二选一。
        """
        if content:
            pass
        if filename:
            path = Path(filename)
            import aiofiles
            async with aiofiles.open(path, 'rb') as f:
                content = await f.read()
        else:
            raise ValueError("请指定语音路径或语音内容!")
        import base64
        img = cls(base64=base64.b64encode(content).decode())
        return img

Ancestors

Class variables

var base64 : Optional[str]

语音的 Base64 编码。

var length : Optional[int]

语音的长度,单位为秒。

var path : Optional[str]

语音的路径,发送本地语音。

var url : Optional[str]

语音的 URL,发送时可作网络语音的链接;接收时为腾讯语音服务器的链接,可用于语音下载。

var voice_id : Optional[str]

语音的 voice_id,不为空时将忽略 url 属性。

Static methods

async def from_local(filename: Union[str, pathlib.Path, NoneType] = None, content: Optional[bytes] = None) ‑> Voice

从本地文件路径加载语音,以 base64 的形式传递。

Args

filename
从本地文件路径加载语音,与 content 二选一。
content
从本地文件内容加载语音,与 filename 二选一。
Expand source code
@classmethod
async def from_local(
    cls,
    filename: Union[str, Path, None] = None,
    content: Optional[bytes] = None,
) -> "Voice":
    """从本地文件路径加载语音,以 base64 的形式传递。

    Args:
        filename: 从本地文件路径加载语音,与 `content` 二选一。
        content: 从本地文件内容加载语音,与 `filename` 二选一。
    """
    if content:
        pass
    if filename:
        path = Path(filename)
        import aiofiles
        async with aiofiles.open(path, 'rb') as f:
            content = await f.read()
    else:
        raise ValueError("请指定语音路径或语音内容!")
    import base64
    img = cls(base64=base64.b64encode(content).decode())
    return img
def validate_path(path: Optional[str])

修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。

Expand source code
@validator('path')
def validate_path(cls, path: Optional[str]):
    """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。"""
    if path:
        try:
            return str(Path(path).resolve(strict=True))
        except FileNotFoundError:
            raise ValueError(f"无效路径:{path}")
    else:
        return path

Methods

async def download(self, filename: Union[str, pathlib.Path, NoneType] = None, directory: Union[str, pathlib.Path, NoneType] = None)

下载语音到本地。

语音采用 silk v3 格式,silk 格式的编码解码请使用 graiax-silkcoder

Args

filename
下载到本地的文件路径。与 directory 二选一。
directory
下载到本地的文件夹路径。与 filename 二选一。
Expand source code
async def download(
    self,
    filename: Union[str, Path, None] = None,
    directory: Union[str, Path, None] = None
):
    """下载语音到本地。

    语音采用 silk v3 格式,silk 格式的编码解码请使用 [graiax-silkcoder](https://pypi.org/project/graiax-silkcoder/)。

    Args:
        filename: 下载到本地的文件路径。与 `directory` 二选一。
        directory: 下载到本地的文件夹路径。与 `filename` 二选一。
    """
    if not self.url:
        logger.warning(f'语音 `{self.voice_id}` 无 url 参数,下载失败。')
        return

    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.get(self.url)
        response.raise_for_status()
        content = response.content

        if filename:
            path = Path(filename)
            path.parent.mkdir(parents=True, exist_ok=True)
        elif directory:
            path = Path(directory)
            path.mkdir(parents=True, exist_ok=True)
            path = path / f'{self.voice_id}.silk'
        else:
            raise ValueError("请指定文件路径或文件夹路径!")

        import aiofiles
        async with aiofiles.open(path, 'wb') as f:
            await f.write(content)

Inherited members

class WebHookAdapter (verify_key: Optional[str], route: str = '/', extra_headers: Optional[Mapping[str, str]] = None, enable_quick_response: bool = True, single_mode: bool = False)

WebHook 适配器。作为 HTTP 服务器与 mirai-api-http 沟通。

Args

verify_key
mirai-api-http 配置的认证 key,关闭认证时为 None。
route
适配器的路由,默认在根目录上提供服务。
extra_headers
额外请求头,与 mirai-api-http 的配置一致。
enable_quick_response
是否启用快速响应,当与其他适配器混合使用时,禁用可以提高响应速度。
single_mode
是否启用单例模式。
Expand source code
class WebHookAdapter(Adapter):
    """WebHook 适配器。作为 HTTP 服务器与 mirai-api-http 沟通。"""
    session: str
    """WebHook 不需要 session,此处为机器人的 QQ 号。"""
    route: str
    """适配器的路由。"""
    extra_headers: Mapping[str, str]
    """额外请求头。"""
    enable_quick_response: bool
    """是否启用快速响应。"""
    def __init__(
        self,
        verify_key: Optional[str],
        route: str = '/',
        extra_headers: Optional[Mapping[str, str]] = None,
        enable_quick_response: bool = True,
        single_mode: bool = False
    ):
        """
        Args:
            verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。
            route: 适配器的路由,默认在根目录上提供服务。
            extra_headers: 额外请求头,与 mirai-api-http 的配置一致。
            enable_quick_response: 是否启用快速响应,当与其他适配器混合使用时,禁用可以提高响应速度。
            single_mode: 是否启用单例模式。
        """
        super().__init__(verify_key=verify_key, single_mode=single_mode)
        self.route = route
        self.extra_headers = extra_headers or {}
        self.enable_quick_response = enable_quick_response

        async def endpoint(request: Request):
            # 鉴权(QQ 号和额外请求头)
            if request.headers.get('bot') != self.session:  # 验证 QQ 号
                logger.debug(f"收到来自其他账号({request.headers.get('bot')})的事件。")
                return
            for key in self.extra_headers:  # 验证请求头
                key = key.lower()  # HTTP headers 不区分大小写
                request_value = request.headers.get(key, '').lower()
                expect_value = self.extra_headers[key].lower()
                if (request_value != expect_value
                    ) and (request_value != '[' + expect_value + ']'):
                    logger.info(
                        f"请求头验证失败:expect [{expect_value}], " +
                        f"got {request_value}。"
                    )
                    return JSONResponse(
                        status_code=401, content={'error': 'Unauthorized'}
                    )
            # 处理事件
            event = await request.json()
            result = await self.handle_event(event)
            return YiriMiraiJSONResponse(result)

        ASGI().add_route(self.route, endpoint, methods=['POST'])

    class QuickResponse(BaseException):
        """WebHook 快速响应,以异常的方式跳出。"""
        def __init__(self, data: dict):
            self.data = data

    @property
    def adapter_info(self):
        return {
            'verify_key': self.verify_key,
            'session': self.session,
            'single_mode': self.single_mode,
            'route': self.route,
            'extra_headers': self.extra_headers,
            'enable_quick_response': self.enable_quick_response,
        }

    @classmethod
    def via(cls, adapter_interface: AdapterInterface) -> "WebHookAdapter":
        info = adapter_interface.adapter_info
        adapter = cls(
            verify_key=info['verify_key'],
            **{
                key: info[key]
                for key in [
                    'route', 'extra_headers', 'enable_quick_response',
                    'single_mode'
                ] if key in info
            }
        )
        adapter.session = cast(str, info.get('session'))
        return adapter

    async def login(self, qq: int):
        """WebHook 不需要登录。直接返回。"""
        self.session = str(qq)
        logger.info(f'[WebHook] 成功登录到账号{qq}。')

    async def logout(self, terminate: bool = True):
        """WebHook 不需要登出。直接返回。"""
        logger.info(f"[WebHook] 从账号{self.session}退出。")

    async def call_api(self, api: str, method: Method = Method.GET, **params):
        """调用 API。WebHook 的 API 调用只能在快速响应中发生。"""
        if self.enable_quick_response:
            content = {'command': api.replace('/', '_'), 'content': params}
            if method == Method.RESTGET:
                content['subCommand'] = 'get'
            elif method == Method.RESTPOST:
                content['subCommand'] = 'update'
            elif method == Method.MULTIPART:
                raise NotImplementedError(
                    "WebHook 适配器不支持上传操作。请使用 bot.use_adapter 临时调用 HTTP 适配器。"
                )

            logger.debug(f'[WebHook] WebHook 快速响应 {api}。')
            raise WebHookAdapter.QuickResponse(content)
        return None

    async def _background(self):
        """WebHook 不需要事件循环。直接返回。"""

    async def handle_event(self, event):
        try:
            tasks = await self.emit(event['type'], event)
            await asyncio.gather(*tasks)
        except WebHookAdapter.QuickResponse as response:
            # 快速响应,直接返回。
            return response.data

        return {}

Ancestors

Class variables

var QuickResponse

WebHook 快速响应,以异常的方式跳出。

var enable_quick_response : bool

是否启用快速响应。

var extra_headers : Mapping[str, str]

额外请求头。

var route : str

适配器的路由。

Methods

async def call_api(self, api: str, method: Method = Method.GET, **params)

调用 API。WebHook 的 API 调用只能在快速响应中发生。

Expand source code
async def call_api(self, api: str, method: Method = Method.GET, **params):
    """调用 API。WebHook 的 API 调用只能在快速响应中发生。"""
    if self.enable_quick_response:
        content = {'command': api.replace('/', '_'), 'content': params}
        if method == Method.RESTGET:
            content['subCommand'] = 'get'
        elif method == Method.RESTPOST:
            content['subCommand'] = 'update'
        elif method == Method.MULTIPART:
            raise NotImplementedError(
                "WebHook 适配器不支持上传操作。请使用 bot.use_adapter 临时调用 HTTP 适配器。"
            )

        logger.debug(f'[WebHook] WebHook 快速响应 {api}。')
        raise WebHookAdapter.QuickResponse(content)
    return None
async def handle_event(self, event)
Expand source code
async def handle_event(self, event):
    try:
        tasks = await self.emit(event['type'], event)
        await asyncio.gather(*tasks)
    except WebHookAdapter.QuickResponse as response:
        # 快速响应,直接返回。
        return response.data

    return {}
async def login(self, qq: int)

WebHook 不需要登录。直接返回。

Expand source code
async def login(self, qq: int):
    """WebHook 不需要登录。直接返回。"""
    self.session = str(qq)
    logger.info(f'[WebHook] 成功登录到账号{qq}。')
async def logout(self, terminate: bool = True)

WebHook 不需要登出。直接返回。

Expand source code
async def logout(self, terminate: bool = True):
    """WebHook 不需要登出。直接返回。"""
    logger.info(f"[WebHook] 从账号{self.session}退出。")

Inherited members

class WebSocketAdapter (verify_key: Optional[str], host: str, port: int, sync_id: str = '-1', single_mode: bool = False, heartbeat_interval: float = 60.0)

WebSocket 适配器。作为 WebSocket 客户端与 mirai-api-http 沟通。

Args

verify_key
mirai-api-http 配置的认证 key,关闭认证时为 None。
host
WebSocket Server 的地址。
port
WebSocket Server 的端口。
sync_id
mirai-api-http 配置的同步 ID。
single_mode
是否启用单例模式。
heartbeat_interval
每隔多久发送心跳包,单位秒。
Expand source code
class WebSocketAdapter(Adapter):
    """WebSocket 适配器。作为 WebSocket 客户端与 mirai-api-http 沟通。"""
    host_name: str
    """WebSocket Server 的地址。"""
    sync_id: str
    """mirai-api-http 配置的同步 ID。"""
    qq: int
    """机器人的 QQ 号。"""
    connection: Optional[WebSocketClientProtocol]
    """WebSocket 客户端连接。"""
    heartbeat_interval: float
    """每隔多久发送心跳包,单位:秒。"""
    def __init__(
        self,
        verify_key: Optional[str],
        host: str,
        port: int,
        sync_id: str = '-1',
        single_mode: bool = False,
        heartbeat_interval: float = 60.,
    ):
        """
        Args:
            verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。
            host: WebSocket Server 的地址。
            port: WebSocket Server 的端口。
            sync_id: mirai-api-http 配置的同步 ID。
            single_mode: 是否启用单例模式。
            heartbeat_interval: 每隔多久发送心跳包,单位秒。
        """
        super().__init__(verify_key=verify_key, single_mode=single_mode)

        self._host = host
        self._port = port

        if host[:2] == '//':
            host = 'ws:' + host
        elif host[:7] == 'http://' or host[:8] == 'https://':
            raise exceptions.NetworkError(f'{host} 不是一个可用的 WebSocket 地址!')
        elif host[:5] != 'ws://':
            host = 'ws://' + host

        if host[-1:] == '/':
            host = host[:-1]

        self.host_name = f'{host}:{port}/all'

        self.sync_id = sync_id  # 这个神奇的 sync_id,默认值 -1,居然是个字符串
        # 既然这样不如把 sync_id 全改成字符串好了

        self.qq = 0
        self.connection = None

        self.heartbeat_interval = heartbeat_interval

        # 接收 WebSocket 数据的 Task
        self._receiver_task: Optional[asyncio.Task] = None
        # 用于临时保存接收到的数据,以便根据 sync_id 进行同步识别
        self._recv_dict: Dict[str, deque] = defaultdict(deque)
        # 本地同步 ID,每次调用 API 递增。
        self._local_sync_id = random.randint(1, 1024) * 1024
        # 事件处理任务管理器
        self._tasks = Tasks()
        # 心跳机制(Keep-Alive):上次发送数据包的时间
        self._last_send_time: float = 0.

    @property
    def adapter_info(self):
        return {
            'verify_key': self.verify_key,
            'session': self.session,
            'single_mode': self.single_mode,
            'host': self._host,
            'port': self._port,
            'sync_id': self.sync_id,
        }

    @classmethod
    def via(cls, adapter_interface: AdapterInterface) -> "WebSocketAdapter":
        info = adapter_interface.adapter_info
        adapter = cls(
            verify_key=info['verify_key'],
            **{
                key: info[key]
                for key in ['host', 'port', 'sync_id', 'single_mode']
                if key in info
            }
        )
        adapter.session = cast(str, info.get('session'))
        return adapter

    @_error_handler_async_local
    async def _receiver(self):
        """开始接收 websocket 数据。"""
        if not self.connection:
            raise exceptions.NetworkError(
                f'WebSocket 通道 {self.host_name} 未连接!'
            )
        while True:
            try:
                # 数据格式:
                # {
                #   'syncId': '-1',
                #   'data': {
                #       // Event Content
                #   }
                # }
                response = json.loads(await self.connection.recv())
                data = response['data']

                logger.debug(
                    f"[WebSocket] 收到 WebSocket 数据,同步 ID:{response['syncId']}。"
                )
                self._recv_dict[response['syncId']].append(data)
            except KeyError:
                logger.error(f'[WebSocket] 不正确的数据:{response}')
            except ConnectionClosedOK:
                raise SystemExit()
            except ConnectionClosed as e:
                exit_message = f'[WebSocket] WebSocket 通道意外关闭。code: {e.code}, reason: {e.reason}'
                logger.error(exit_message)
                raise SystemExit(exit_message)

    async def _recv(self, sync_id: str = '-1', timeout: int = 600) -> dict:
        """接收并解析 websocket 数据。"""
        timer = range(timeout) if timeout > 0 else repeat(0)
        for _ in timer:
            if self._recv_dict[sync_id]:
                data = self._recv_dict[sync_id].popleft()

                if data.get('code', 0) != 0:
                    raise exceptions.ApiError(data)

                return data
                # 如果没有对应同步 ID 的数据,则等待 websocket 数据
                # 目前存在问题:如果 mah 发回的数据不含 sync_id,
                # 这里就会无限循环……
                # 所以还是限制次数好了。
            await asyncio.sleep(0.1)
        raise TimeoutError(
            f'[WebSocket] mirai-api-http 响应超时,可能是由于调用出错。同步 ID:{sync_id}。'
        )

    @_error_handler_async_local
    async def login(self, qq: int):
        headers = {
            'verifyKey': self.verify_key or
                         '',  # 关闭认证时,WebSocket 可传入任意 verify_key
            'qq': str(qq),
        }
        if self.session:
            headers['sessionKey'] = self.session

        self.connection = await connect(self.host_name, extra_headers=headers)
        self._receiver_task = asyncio.create_task(self._receiver())

        verify_response = await self._recv('')  # 神奇现象:这里的 syncId 是个空字符串
        self.session = verify_response['session']

        self.qq = qq
        logger.info(f'[WebSocket] 成功登录到账号{qq}。')

    @_error_handler_async_local
    async def logout(self, terminate: bool = True):
        if self.connection:
            await self.connection.close()

            await self._receiver_task

            logger.info(f"[WebSocket] 从账号{self.qq}退出。")

    async def poll_event(self):
        """获取并处理事件。"""
        event = await self._recv(self.sync_id, -1)

        self._tasks.create_task(self.emit(event['type'], event))

    async def call_api(self, api: str, method: Method = Method.GET, **params):
        if not self.connection:
            raise exceptions.NetworkError(
                f'WebSocket 通道 {self.host_name} 未连接!'
            )
        self._local_sync_id += 1  # 使用不同的 sync_id
        sync_id = str(self._local_sync_id)
        content = {
            'syncId': sync_id,
            'command': api.replace('/', '_'),
            'content': params
        }
        if method == Method.RESTGET:
            content['subCommand'] = 'get'
        elif method == Method.RESTPOST:
            content['subCommand'] = 'update'
        elif method == Method.MULTIPART:
            raise NotImplementedError(
                "WebSocket 适配器不支持上传操作。请使用 bot.use_adapter 临时调用 HTTP 适配器。"
            )

        await self.connection.send(json_dumps(content))
        self._last_send_time = time.time()
        logger.debug(f"[WebSocket] 发送 WebSocket 数据,同步 ID:{sync_id}。")
        try:
            return await self._recv(sync_id)
        except TimeoutError as e:
            logger.debug(e)

    async def _heartbeat(self):
        while True:
            await asyncio.sleep(self.heartbeat_interval)
            if time.time() - self._last_send_time > self.heartbeat_interval:
                await self.call_api('about')
                self._last_send_time = time.time()
                logger.debug("[WebSocket] 发送心跳包。")

    async def _background(self):
        """开始接收事件。"""
        logger.info('[WebSocket] 机器人开始运行。按 Ctrl + C 停止。')

        try:
            heartbeat = asyncio.create_task(self._heartbeat())
            while True:
                await self.poll_event()
        finally:
            await Tasks.cancel(heartbeat)
            await self._tasks.cancel_all()

Ancestors

Class variables

var connection : Optional[websockets.legacy.client.WebSocketClientProtocol]

WebSocket 客户端连接。

var heartbeat_interval : float

每隔多久发送心跳包,单位:秒。

var host_name : str

WebSocket Server 的地址。

var qq : int

机器人的 QQ 号。

var sync_id : str

mirai-api-http 配置的同步 ID。

Methods

async def poll_event(self)

获取并处理事件。

Expand source code
async def poll_event(self):
    """获取并处理事件。"""
    event = await self._recv(self.sync_id, -1)

    self._tasks.create_task(self.emit(event['type'], event))

Inherited members