Package mirai
YiriMirai
一个轻量级、低耦合的基于 mirai-api-http 的 Python SDK。
更多信息请看文档。
Expand source code
# -*- coding: utf-8 -*-
"""
# YiriMirai
一个轻量级、低耦合的基于 mirai-api-http 的 Python SDK。
更多信息请看[文档](https://yiri-mirai.vercel.app/)。
"""
__version__ = '0.2.6.2'
__author__ = '忘忧北萱草'
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from mirai.adapters import (
Adapter, ComposeAdapter, HTTPAdapter, WebHookAdapter, WebSocketAdapter
)
else:
from mirai.adapters import Adapter
from mirai.api_provider import Method
from mirai.bot import (
LifeSpan, Mirai, MiraiRunner, Shutdown, SimpleMirai, Startup
)
from mirai.bus import EventBus
from mirai.colorlog import ColoredFormatter
from mirai.exceptions import (
ApiError, NetworkError, SkipExecution, StopExecution, StopPropagation
)
from mirai.models import (
At, AtAll, Dice, Event, Face, FriendMessage, GroupMessage, Image,
MessageChain, MessageEvent, Plain, Poke, PokeNames, StrangerMessage,
TempMessage, Voice, deserialize, serialize
)
__all__ = [
'Mirai', 'SimpleMirai', 'MiraiRunner', 'LifeSpan', 'Startup', 'Shutdown',
'Adapter', 'Method', 'HTTPAdapter', 'WebSocketAdapter', 'WebHookAdapter',
'ComposeAdapter', 'EventBus', 'get_logger', 'Event', 'MessageEvent',
'FriendMessage', 'GroupMessage', 'TempMessage', 'StrangerMessage',
'MessageChain', 'Plain', 'At', 'AtAll', 'Dice', 'Face', 'Poke',
'PokeNames', 'Image', 'Voice', 'serialize', 'deserialize', 'ApiError',
'NetworkError', 'SkipExecution', 'StopExecution', 'StopPropagation'
]
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = ColoredFormatter(
'%(asctime)s - %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S'
)
ch.setFormatter(formatter)
logger.addHandler(ch)
def get_logger() -> logging.Logger:
"""获取 YiriMirai 的模块 Logger。
所有的模块的 Logger 都是此 Logger 的子 Logger,修改此 Logger 的属性以应用到 YiriMirai 全局。
Returns:
logging.Logger: 模块 Logger。
"""
return logger
def __getattr__(name):
if name in (
'HTTPAdapter', 'WebSocketAdapter', 'WebHookAdapter', 'ComposeAdapter'
):
import mirai.adapters
return getattr(mirai.adapters, name)
raise AttributeError(f'Module {__name__} has no attribute {name}')
Sub-modules
mirai.adapters-
此模块提供网络适配器相关。 …
mirai.api_provider-
此模块提供 ApiProvider 的基础定义。
mirai.asgi-
此模块提供公共 ASGI 前端。
mirai.bot-
此模块定义机器人类,包含处于 model 层之下的
SimpleMirai和建立在 model 层上的Mirai。 mirai.bus-
此模块提供基础事件总线相关。 …
mirai.colorlog-
此模块提供带颜色的格式化日志输出的功能。 …
mirai.exceptions-
此模块提供异常相关。
mirai.models-
此模块提供 model 层封装。 …
mirai.tasks-
此模块提供了管理多个异步任务的一种方式。
mirai.utils-
此模块提供一些实用的辅助方法。
Functions
def deserialize(s: str) ‑> str-
mirai 码去转义。
Args
s- 待去转义的字符串。
Returns
str- 去转义后的字符串。
Expand source code
def deserialize(s: str) -> str: """mirai 码去转义。 Args: s: 待去转义的字符串。 Returns: str: 去转义后的字符串。 """ return re.sub( r'\\([\[\]:,\\])', lambda match: match.group(1), s.replace('\\n', '\n').replace('\\r', '\r') ) def get_logger() ‑> logging.Logger-
获取 YiriMirai 的模块 Logger。
所有的模块的 Logger 都是此 Logger 的子 Logger,修改此 Logger 的属性以应用到 YiriMirai 全局。
Returns
logging.Logger- 模块 Logger。
Expand source code
def get_logger() -> logging.Logger: """获取 YiriMirai 的模块 Logger。 所有的模块的 Logger 都是此 Logger 的子 Logger,修改此 Logger 的属性以应用到 YiriMirai 全局。 Returns: logging.Logger: 模块 Logger。 """ return logger def serialize(s: str) ‑> str-
mirai 码转义。
Args
s- 待转义的字符串。
Returns
str- 去转义后的字符串。
Expand source code
def serialize(s: str) -> str: """mirai 码转义。 Args: s: 待转义的字符串。 Returns: str: 去转义后的字符串。 """ return re.sub(r'[\[\]:,\\]', lambda match: '\\' + match.group(0), s).replace('\n', '\\n').replace('\r', '\\r')
Classes
class Adapter (verify_key: Optional[str], single_mode: bool = False)-
适配器基类,与 mirai-api-http 沟通的底层实现。
属性
buses为适配器注册的事件总线集合。适配器被绑定到 bot 时,bot 会自动将自身的事件总线注册到适配器。Args
verify_key- mirai-api-http 配置的认证 key,关闭认证时为 None。
single_mode- 是否开启 single_mode,开启后与 session 将无效。
Expand source code
class Adapter(ApiProvider, AdapterInterface): """适配器基类,与 mirai-api-http 沟通的底层实现。 属性 `buses` 为适配器注册的事件总线集合。适配器被绑定到 bot 时,bot 会自动将自身的事件总线注册到适配器。 """ verify_key: Optional[str] """mirai-api-http 配置的认证 key,关闭认证时为 None。""" single_mode: bool """是否开启 single_mode,开启后与 session 将无效。""" session: str """从 mirai-api-http 处获得的 session。""" buses: Set[AbstractEventBus] """注册的事件总线集合。""" background: Optional[asyncio.Task] """背景事件循环任务。""" def __init__(self, verify_key: Optional[str], single_mode: bool = False): """ Args: verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。 single_mode: 是否开启 single_mode,开启后与 session 将无效。 """ self.verify_key = verify_key self.single_mode = single_mode self.session = '' self.buses = set() self.background = None @property def adapter_info(self): return { 'verify_key': self.verify_key, 'session': self.session, 'single_mode': self.single_mode, } @classmethod def via(cls, adapter_interface: AdapterInterface) -> "Adapter": """从适配器接口创建适配器。 Args: adapter_interface: 适配器接口。 Returns: Adapter: 创建的适配器。 """ info = adapter_interface.adapter_info adapter = cls( verify_key=info['verify_key'], **{ key: info[key] for key in ['single_mode'] if info.get(key) is not None } ) adapter.session = cast(str, info.get('session')) return adapter def register_event_bus(self, *buses: AbstractEventBus): """注册事件总线。 Args: *buses: 一个或多个事件总线。 """ self.buses |= set(buses) def unregister_event_bus(self, *buses: AbstractEventBus): """解除注册事件总线。 Args: *buses: 一个或多个事件总线。 """ self.buses -= set(buses) @abc.abstractmethod async def login(self, qq: int): """登录到 mirai-api-http。""" @abc.abstractmethod async def logout(self, terminate: bool = True): """登出。""" @abc.abstractmethod async def call_api(self, api: str, method: Method = Method.GET, **params): """调用 API。 Args: api: API 名称,需与 mirai-api-http 中的定义一致。 method: 调用方法。默认为 GET。 **params: 参数。 """ @abc.abstractmethod async def _background(self): """背景事件循环,用于接收事件。""" async def start(self): """运行背景事件循环。""" if not self.buses: raise RuntimeError('事件总线未指定!') if not self.session: raise RuntimeError('未登录!') self.background = asyncio.create_task(self._background()) async def shutdown(self): """停止背景事件循环。""" if self.background: await Tasks.cancel(self.background) async def emit(self, event: str, *args, **kwargs): """向事件总线发送一个事件。 Args: event: 事件名称。 *args: 事件参数。 **kwargs: 事件参数。 """ coros = [bus.emit(event, *args, **kwargs) for bus in self.buses] return sum(await asyncio.gather(*coros), [])Ancestors
- ApiProvider
- AdapterInterface
- abc.ABC
Subclasses
Class variables
var background : Optional[_asyncio.Task]-
背景事件循环任务。
var buses : Set[AbstractEventBus]-
注册的事件总线集合。
var session : str-
从 mirai-api-http 处获得的 session。
var single_mode : bool-
是否开启 single_mode,开启后与 session 将无效。
var verify_key : Optional[str]-
mirai-api-http 配置的认证 key,关闭认证时为 None。
Static methods
def via(adapter_interface: AdapterInterface) ‑> Adapter-
Expand source code
@classmethod def via(cls, adapter_interface: AdapterInterface) -> "Adapter": """从适配器接口创建适配器。 Args: adapter_interface: 适配器接口。 Returns: Adapter: 创建的适配器。 """ info = adapter_interface.adapter_info adapter = cls( verify_key=info['verify_key'], **{ key: info[key] for key in ['single_mode'] if info.get(key) is not None } ) adapter.session = cast(str, info.get('session')) return adapter
Methods
async def call_api(self, api: str, method: Method = Method.GET, **params)-
调用 API。
Args
api- API 名称,需与 mirai-api-http 中的定义一致。
method- 调用方法。默认为 GET。
**params- 参数。
Expand source code
@abc.abstractmethod async def call_api(self, api: str, method: Method = Method.GET, **params): """调用 API。 Args: api: API 名称,需与 mirai-api-http 中的定义一致。 method: 调用方法。默认为 GET。 **params: 参数。 """ async def emit(self, event: str, *args, **kwargs)-
向事件总线发送一个事件。
Args
event- 事件名称。
*args- 事件参数。
**kwargs- 事件参数。
Expand source code
async def emit(self, event: str, *args, **kwargs): """向事件总线发送一个事件。 Args: event: 事件名称。 *args: 事件参数。 **kwargs: 事件参数。 """ coros = [bus.emit(event, *args, **kwargs) for bus in self.buses] return sum(await asyncio.gather(*coros), []) async def login(self, qq: int)-
登录到 mirai-api-http。
Expand source code
@abc.abstractmethod async def login(self, qq: int): """登录到 mirai-api-http。""" async def logout(self, terminate: bool = True)-
登出。
Expand source code
@abc.abstractmethod async def logout(self, terminate: bool = True): """登出。""" def register_event_bus(self, *buses: AbstractEventBus)-
注册事件总线。
Args
*buses- 一个或多个事件总线。
Expand source code
def register_event_bus(self, *buses: AbstractEventBus): """注册事件总线。 Args: *buses: 一个或多个事件总线。 """ self.buses |= set(buses) async def shutdown(self)-
停止背景事件循环。
Expand source code
async def shutdown(self): """停止背景事件循环。""" if self.background: await Tasks.cancel(self.background) async def start(self)-
运行背景事件循环。
Expand source code
async def start(self): """运行背景事件循环。""" if not self.buses: raise RuntimeError('事件总线未指定!') if not self.session: raise RuntimeError('未登录!') self.background = asyncio.create_task(self._background()) def unregister_event_bus(self, *buses: AbstractEventBus)-
解除注册事件总线。
Args
*buses- 一个或多个事件总线。
Expand source code
def unregister_event_bus(self, *buses: AbstractEventBus): """解除注册事件总线。 Args: *buses: 一个或多个事件总线。 """ self.buses -= set(buses)
Inherited members
class ApiError (response: dict)-
调用 API 出错。
Args
response(
dict): mirai-api-http 的 API 返回结果。Expand source code
class ApiError(RuntimeError): """调用 API 出错。""" def __init__(self, response: dict): """ Args: response(`dict`): mirai-api-http 的 API 返回结果。 """ code = response['code'] self.code = code self.args = ( code, f'[ERROR {code}]' + API_ERROR_FMT.get(code, ''), response.get('msg', '') )Ancestors
- builtins.RuntimeError
- builtins.Exception
- builtins.BaseException
class At (*args, **kwargs)-
At某人。
Expand source code
class At(MessageComponent): """At某人。""" type: str = "At" """消息组件类型。""" target: int """群员 QQ 号。""" display: Optional[str] = None """At时显示的文字,发送消息时无效,自动使用群名片。""" def __eq__(self, other): return isinstance(other, At) and self.target == other.target def __str__(self): return f"@{self.display or self.target}" def as_mirai_code(self) -> str: return f"[mirai:at:{self.target}]"Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var display : Optional[str]-
At时显示的文字,发送消息时无效,自动使用群名片。
var target : int-
群员 QQ 号。
Inherited members
class AtAll (*args, **kwargs)-
At全体。
Expand source code
class AtAll(MessageComponent): """At全体。""" type: str = "AtAll" """消息组件类型。""" def __str__(self): return "@全体成员" def as_mirai_code(self) -> str: return f"[mirai:atall]"Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Inherited members
class ComposeAdapter (api_channel: Adapter, event_channel: Adapter)-
组合适配器。使用一个适配器提供 API 调用,另一个适配器提供事件处理。
Args
api_channel- 提供 API 调用的适配器。
event_channel- 提供事件处理的适配器。
Expand source code
class ComposeAdapter(Adapter): """组合适配器。使用一个适配器提供 API 调用,另一个适配器提供事件处理。""" api_channel: Adapter """提供 API 调用的适配器。""" event_channel: Adapter """提供事件处理的适配器。""" def __init__(self, api_channel: Adapter, event_channel: Adapter): """ Args: api_channel: 提供 API 调用的适配器。 event_channel: 提供事件处理的适配器。 """ super().__init__( verify_key=api_channel.verify_key, single_mode=api_channel.single_mode ) if api_channel.verify_key != event_channel.verify_key: raise ValueError('组合适配器应使用相同的 verify_key。') self.api_channel = api_channel self.event_channel = event_channel event_channel.buses = self.buses self.verify_key = api_channel.verify_key self.single_mode = api_channel.single_mode @property def adapter_info(self): return self.api_channel.adapter_info async def login(self, qq: int): await self.api_channel.login(qq) # 绑定 session self.event_channel.session = self.api_channel.session self.session = self.api_channel.session await self.event_channel.login(qq) async def logout(self): await self.event_channel.logout() await self.api_channel.logout() async def call_api(self, api: str, method: Method = Method.GET, **params): return await self.api_channel.call_api(api, method, **params) async def _background(self): await self.event_channel._background()Ancestors
- Adapter
- ApiProvider
- AdapterInterface
- abc.ABC
Class variables
var api_channel : Adapter-
提供 API 调用的适配器。
var event_channel : Adapter-
提供事件处理的适配器。
Inherited members
class Dice (*args, **kwargs)-
骰子。
Expand source code
class Dice(MessageComponent): """骰子。""" type: str = "Dice" """消息组件类型。""" value: int """点数。""" def __str__(self): return f'[骰子{self.value}]' def as_mirai_code(self) -> str: return f'[mirai:dice:{self.value}]'Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var value : int-
点数。
Inherited members
class Event (*args, **kwargs)-
事件基类。
Args
type- 事件名。
Expand source code
class Event(MiraiIndexedModel): """事件基类。 Args: type: 事件名。 """ type: str """事件名。""" def __repr__(self): return self.__class__.__name__ + '(' + ', '.join( ( f'{k}={repr(v)}' for k, v in self.__dict__.items() if k != 'type' and v ) ) + ')' @classmethod def parse_subtype(cls, obj: dict) -> 'Event': try: return cast(Event, super().parse_subtype(obj)) except ValueError: return Event(type=obj['type']) @classmethod def get_subtype(cls, name: str) -> Type['Event']: try: return cast(Type[Event], super().get_subtype(name)) except ValueError: return EventAncestors
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
- LifeSpan
- BotEvent
- CommandEvent
- FriendEvent
- FriendRecallEvent
- GroupEvent
- MessageEvent
- NudgeEvent
- mirai.models.events.OtherClientEvent
- RequestEvent
Class variables
var type : str-
事件名。
Inherited members
class EventBus (event_chain_generator: Callable[[str], Iterable[str]] = <function event_chain_single>)-
事件总线。
事件总线提供了一个简单的方法,用于分发事件。事件处理器可以通过
subscribe或on注册事件, 并通过emit来触发事件。事件链(Event Chain)是一种特殊的事件处理机制。事件链包含一系列事件,其中底层事件触发时,上层事件也会响应。
事件总线的构造函数中的
event_chain_generator参数规定了生成事件链的方式。 此模块中的event_chain_single和event_chain_separator可应用于此参数,分别生成单一事件的事件链和按照分隔符划分的事件链。Args
event_chain_generator- 一个函数, 输入事件名,返回一个生成此事件所在事件链的全部事件的事件名的生成器, 默认行为是事件链只包含单一事件。
Expand source code
class EventBus(AbstractEventBus): """事件总线。 事件总线提供了一个简单的方法,用于分发事件。事件处理器可以通过 `subscribe` 或 `on` 注册事件, 并通过 `emit` 来触发事件。 事件链(Event Chain)是一种特殊的事件处理机制。事件链包含一系列事件,其中底层事件触发时,上层事件也会响应。 事件总线的构造函数中的 `event_chain_generator` 参数规定了生成事件链的方式。 此模块中的 `event_chain_single` 和 `event_chain_separator` 可应用于此参数,分别生成单一事件的事件链和按照分隔符划分的事件链。 """ def __init__( self, event_chain_generator: Callable[[str], Iterable[str]] = event_chain_single, ): """ Args: event_chain_generator: 一个函数, 输入事件名,返回一个生成此事件所在事件链的全部事件的事件名的生成器, 默认行为是事件链只包含单一事件。 """ self._subscribers: Dict[ str, PriorityDict[Callable]] = defaultdict(PriorityDict) self.event_chain_generator = event_chain_generator def subscribe(self, event: str, func: Callable, priority: int = 0) -> None: """注册事件处理器。 Args: event: 事件名。 func: 事件处理器。 priority: 优先级,小者优先。 """ self._subscribers[event].add(priority, func) def unsubscribe(self, event: str, func: Callable) -> None: """移除事件处理器。 Args: event: 事件名。 func: 事件处理器。 """ try: self._subscribers[event].remove(func) except KeyError: logger.warning(f'试图移除事件 `{event}` 的一个不存在的事件处理器 `{func}`。') def on(self, event: str, priority: int = 0) -> Callable: """以装饰器的方式注册事件处理器。 例如: ```py @bus.on('Event.MyEvent') def my_event_handler(event): print(event) ``` Args: event: 事件名。 priority: 优先级,小者优先。 """ def decorator(func: Callable) -> Callable: self.subscribe(event, func, priority) return func return decorator async def emit(self, event: str, *args, **kwargs) -> List[Awaitable[Any]]: """触发一个事件。 异步执行说明:`await emit` 时执行事件处理器,所有事件处理器执行完后,并行运行所有快速响应。 Args: event: 要触发的事件名称。 *args: 传递给事件处理器的参数。 **kwargs: 传递给事件处理器的参数。 Returns: List[Awaitable[Any]]: 所有事件处理器的快速响应协程的 Task。 """ async def call(f) -> Optional[Awaitable[Any]]: result = await async_with_exception(f(*args, **kwargs)) # 快速响应:如果事件处理器返回一个协程,那么立即运行这个协程。 if inspect.isawaitable(result): return async_with_exception(result) # 当不使用快速响应时,返回值无意义。 return None coros: List[Optional[Awaitable[Any]]] = [] try: for m_event in self.event_chain_generator(event): try: # 使用 list 避免 _subscribers 被改变引起错误。 for listeners in list(self._subscribers[m_event]): try: # noinspection PyTypeChecker callee = (call(f) for f in listeners) coros += await asyncio.gather(*callee) except SkipExecution: continue except StopExecution: continue except StopPropagation: pass # 只保留快速响应的返回值。 return [asyncio.create_task(coro) for coro in filter(None, coros)]Ancestors
Subclasses
Inherited members
class Face (*args, **kwargs)-
表情。
Expand source code
class Face(MessageComponent): """表情。""" type: str = "Face" """消息组件类型。""" face_id: Optional[int] = None """QQ 表情编号,可选,优先度高于 name。""" name: Optional[str] = None """QQ表情名称,可选。""" def __init__(self, *args, **kwargs): if len(args) == 1: if isinstance(args[0], str): self.name = args[0] elif isinstance(args[0], int): self.face_id = args[0] super().__init__(**kwargs) super().__init__(*args, **kwargs) def __eq__(self, other): return isinstance(other, Face) and \ (self.face_id == other.face_id or self.name == other.name) def __str__(self): if self.name: return f'[{self.name}]' return '[表情]' def as_mirai_code(self): return f"[mirai:face:{self.face_id}]"Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var face_id : Optional[int]-
QQ 表情编号,可选,优先度高于 name。
var name : Optional[str]-
QQ表情名称,可选。
Inherited members
class FriendMessage (*args, **kwargs)-
好友消息。
Args
type- 事件名。
sender- 发送消息的好友。
message_chain- 消息内容。
Expand source code
class FriendMessage(MessageEvent): """好友消息。 Args: type: 事件名。 sender: 发送消息的好友。 message_chain: 消息内容。 """ type: str = 'FriendMessage' """事件名。""" sender: Friend """发送消息的好友。""" message_chain: MessageChain """消息内容。"""Ancestors
- MessageEvent
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var sender : Friend-
发送消息的好友。
Inherited members
class GroupMessage (*args, **kwargs)-
群消息。
Args
type- 事件名。
sender- 发送消息的群成员。
message_chain- 消息内容。
Expand source code
class GroupMessage(MessageEvent): """群消息。 Args: type: 事件名。 sender: 发送消息的群成员。 message_chain: 消息内容。 """ type: str = 'GroupMessage' """事件名。""" sender: GroupMember """发送消息的群成员。""" message_chain: MessageChain """消息内容。""" @property def group(self) -> Group: return self.sender.groupAncestors
- MessageEvent
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var sender : GroupMember-
发送消息的群成员。
Instance variables
var group : Group-
Expand source code
@property def group(self) -> Group: return self.sender.group
Inherited members
class HTTPAdapter (verify_key: Optional[str], host: str, port: int, poll_interval: float = 1.0, single_mode: bool = False)-
HTTP 轮询适配器。使用 HTTP 轮询的方式与 mirai-api-http 沟通。
Args
verify_key- mirai-api-http 配置的认证 key,关闭认证时为 None。
host- HTTP Server 的地址。
port- HTTP Server 的端口。
poll_interval- 轮询时间间隔,单位秒。
single_mode- 是否为单例模式。
Expand source code
class HTTPAdapter(Adapter): """HTTP 轮询适配器。使用 HTTP 轮询的方式与 mirai-api-http 沟通。""" host_name: str """mirai-api-http 的 HTTPAdapter Server 主机名。""" poll_interval: float """轮询时间间隔,单位秒。""" qq: int """机器人的 QQ 号。""" headers: httpx.Headers """HTTP 请求头。""" def __init__( self, verify_key: Optional[str], host: str, port: int, poll_interval: float = 1., single_mode: bool = False ): """ Args: verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。 host: HTTP Server 的地址。 port: HTTP Server 的端口。 poll_interval: 轮询时间间隔,单位秒。 single_mode: 是否为单例模式。 """ super().__init__(verify_key=verify_key, single_mode=single_mode) self._host = host self._port = port if host[:2] == '//': host = 'http:' + host elif host[:8] == 'https://': raise exceptions.NetworkError('不支持 HTTPS!') elif host[:7] != 'http://': host = 'http://' + host if host[-1:] == '/': host = host[:-1] self.host_name = f'{host}:{port}' self.poll_interval = poll_interval self.qq = 0 self.headers = httpx.Headers() # 使用 headers 传递 session self._tasks = Tasks() @property def adapter_info(self): return { 'verify_key': self.verify_key, 'session': self.session, 'single_mode': self.single_mode, 'host': self._host, 'port': self._port, 'poll_interval': self.poll_interval, } @classmethod def via(cls, adapter_interface: AdapterInterface) -> "HTTPAdapter": info = adapter_interface.adapter_info adapter = cls( verify_key=info['verify_key'], **{ key: info[key] for key in ['host', 'port', 'poll_interval', 'single_mode'] if key in info } ) adapter.session = cast(str, info.get('session')) return adapter @_error_handler_async_local async def _post(self, client: httpx.AsyncClient, url: str, json: dict) -> Optional[dict]: """调用 POST 方法。""" # 使用自定义的 json.dumps content = json_dumps(json).encode('utf-8') try: response = await client.post( url, content=content, headers={'Content-Type': 'application/json'}, timeout=10. ) logger.debug( f'[HTTP] 发送 POST 请求,地址{url},状态 {response.status_code}。' ) except httpx.TimeoutException: logger.error(f'[HTTP] POST 请求超时,地址{url}。') return None return _parse_response(response) @_error_handler_async_local async def _get(self, client: httpx.AsyncClient, url: str, params: dict) -> Optional[dict]: """调用 GET 方法。""" try: response = await client.get(url, params=params, timeout=10.) logger.debug( f'[HTTP] 发送 GET 请求,地址{url},状态 {response.status_code}。' ) except httpx.TimeoutException: logger.error(f'[HTTP] GET 请求超时,地址{url}。') return None return _parse_response(response) @_error_handler_async_local async def _post_multipart( self, client: httpx.AsyncClient, url: str, data: dict, files: dict ) -> Optional[dict]: """调用 POST 方法,发送 multipart 数据。""" try: response = await client.post( url, data=data, files=files, timeout=30. ) logger.debug( f'[HTTP] 发送 POST 请求,地址{url},状态 {response.status_code}。' ) except httpx.TimeoutException: logger.error(f'[HTTP] POST 请求超时,地址{url}。') return None return _parse_response(response) @_error_handler_async_local async def login(self, qq: int): async with httpx.AsyncClient( base_url=self.host_name, headers=self.headers ) as client: if not self.session: if self.verify_key is not None: self.session = ( await self._post( client, '/verify', { "verifyKey": self.verify_key, } ) )['session'] else: self.session = str(random.randint(1, 2**20)) if not self.single_mode: await self._post( client, '/bind', { "sessionKey": self.session, "qq": qq, } ) self.headers = httpx.Headers({'sessionKey': self.session}) self.qq = qq logger.info(f'[HTTP] 成功登录到账号{qq}。') @_error_handler_async_local async def logout(self, terminate: bool = True): if self.session and not self.single_mode: if terminate: async with httpx.AsyncClient( base_url=self.host_name, headers=self.headers ) as client: await self._post( client, '/release', { "sessionKey": self.session, "qq": self.qq, } ) logger.info(f"[HTTP] 从账号{self.qq}退出。") async def poll_event(self): """进行一次轮询,获取并处理事件。""" async with httpx.AsyncClient( base_url=self.host_name, headers=self.headers ) as client: msg_count = (await self._get(client, '/countMessage', {}))['data'] if msg_count > 0: msg_list = ( await self._get(client, '/fetchMessage', {'count': msg_count}) )['data'] coros = [self.emit(msg['type'], msg) for msg in msg_list] await asyncio.gather(*coros) async def call_api(self, api: str, method: Method = Method.GET, **params) -> Optional[dict]: async with httpx.AsyncClient( base_url=self.host_name, headers=self.headers ) as client: if method == Method.GET or method == Method.RESTGET: return await self._get(client, f'/{api}', params) if method == Method.POST or method == Method.RESTPOST: return await self._post(client, f'/{api}', params) if method == Method.MULTIPART: return await self._post_multipart( client, f'/{api}', params['data'], params['files'] ) return None async def _background(self): """开始轮询。""" logger.info('[HTTP] 机器人开始运行。按 Ctrl + C 停止。') try: while True: self._tasks.create_task(self.poll_event()) await asyncio.sleep(self.poll_interval) finally: await self._tasks.cancel_all()Ancestors
- Adapter
- ApiProvider
- AdapterInterface
- abc.ABC
Class variables
var headers : httpx.Headers-
HTTP 请求头。
var host_name : str-
mirai-api-http 的 HTTPAdapter Server 主机名。
var poll_interval : float-
轮询时间间隔,单位秒。
var qq : int-
机器人的 QQ 号。
Methods
async def poll_event(self)-
进行一次轮询,获取并处理事件。
Expand source code
async def poll_event(self): """进行一次轮询,获取并处理事件。""" async with httpx.AsyncClient( base_url=self.host_name, headers=self.headers ) as client: msg_count = (await self._get(client, '/countMessage', {}))['data'] if msg_count > 0: msg_list = ( await self._get(client, '/fetchMessage', {'count': msg_count}) )['data'] coros = [self.emit(msg['type'], msg) for msg in msg_list] await asyncio.gather(*coros)
Inherited members
class Image (*args, **kwargs)-
图片。
Expand source code
class Image(MessageComponent): """图片。""" type: str = "Image" """消息组件类型。""" image_id: Optional[str] = None """图片的 image_id,群图片与好友图片格式不同。不为空时将忽略 url 属性。""" url: Optional[HttpUrl] = None """图片的 URL,发送时可作网络图片的链接;接收时为腾讯图片服务器的链接,可用于图片下载。""" path: Union[str, Path, None] = None """图片的路径,发送本地图片。""" base64: Optional[str] = None """图片的 Base64 编码。""" width: int """图片的宽度""" height: int """图片的高度""" def __eq__(self, other): return isinstance( other, Image ) and self.type == other.type and self.uuid == other.uuid def __str__(self): return '[图片]' def as_mirai_code(self) -> str: return f"[mirai:image:{self.image_id}]" @validator('path') def validate_path(cls, path: Union[str, Path, None]): """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。""" if path: try: return str(Path(path).resolve(strict=True)) except FileNotFoundError: raise ValueError(f"无效路径:{path}") else: return path @property def uuid(self): image_id = self.image_id if image_id[0] == '{': # 群图片 image_id = image_id[1:37] elif image_id[0] == '/': # 好友图片 image_id = image_id[1:] return image_id def as_group_image(self) -> str: return f"{{{self.uuid.upper()}}}.jpg" def as_friend_image(self) -> str: return f"/{self.uuid.lower()}" def as_flash_image(self) -> "FlashImage": return FlashImage( image_id=self.image_id, url=self.url, path=self.path, base64=self.base64 ) async def download( self, filename: Union[str, Path, None] = None, directory: Union[str, Path, None] = None, determine_type: bool = True ): """下载图片到本地。 Args: filename: 下载到本地的文件路径。与 `directory` 二选一。 directory: 下载到本地的文件夹路径。与 `filename` 二选一。 determine_type: 是否自动根据图片类型确定拓展名,默认为 True。 """ if not self.url: logger.warning(f'图片 `{self.uuid}` 无 url 参数,下载失败。') return import httpx async with httpx.AsyncClient() as client: response = await client.get(self.url) response.raise_for_status() content = response.content if filename: path = Path(filename) if determine_type: import imghdr path = path.with_suffix( '.' + str(imghdr.what(None, content)) ) path.parent.mkdir(parents=True, exist_ok=True) elif directory: import imghdr path = Path(directory) path.mkdir(parents=True, exist_ok=True) path = path / f'{self.uuid}.{imghdr.what(None, content)}' else: raise ValueError("请指定文件路径或文件夹路径!") import aiofiles async with aiofiles.open(path, 'wb') as f: await f.write(content) return path @classmethod async def from_local( cls, filename: Union[str, Path, None] = None, content: Optional[bytes] = None, ) -> "Image": """从本地文件路径加载图片,以 base64 的形式传递。 Args: filename: 从本地文件路径加载图片,与 `content` 二选一。 content: 从本地文件内容加载图片,与 `filename` 二选一。 Returns: Image: 图片对象。 """ if content: pass elif filename: path = Path(filename) import aiofiles async with aiofiles.open(path, 'rb') as f: content = await f.read() else: raise ValueError("请指定图片路径或图片内容!") import base64 img = cls(base64=base64.b64encode(content).decode()) return img @classmethod def from_unsafe_path(cls, path: Union[str, Path]) -> "Image": """从不安全的路径加载图片。 Args: path: 从不安全的路径加载图片。 Returns: Image: 图片对象。 """ return cls.construct(path=str(path))Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
Class variables
var base64 : Optional[str]-
图片的 Base64 编码。
var height : int-
图片的高度
var image_id : Optional[str]-
图片的 image_id,群图片与好友图片格式不同。不为空时将忽略 url 属性。
var path : Union[str, pathlib.Path, NoneType]-
图片的路径,发送本地图片。
var url : Optional[pydantic.networks.HttpUrl]-
图片的 URL,发送时可作网络图片的链接;接收时为腾讯图片服务器的链接,可用于图片下载。
var width : int-
图片的宽度
Static methods
async def from_local(filename: Union[str, pathlib.Path, NoneType] = None, content: Optional[bytes] = None) ‑> Image-
从本地文件路径加载图片,以 base64 的形式传递。
Args
filename- 从本地文件路径加载图片,与
content二选一。 content- 从本地文件内容加载图片,与
filename二选一。
Returns
Image- 图片对象。
Expand source code
@classmethod async def from_local( cls, filename: Union[str, Path, None] = None, content: Optional[bytes] = None, ) -> "Image": """从本地文件路径加载图片,以 base64 的形式传递。 Args: filename: 从本地文件路径加载图片,与 `content` 二选一。 content: 从本地文件内容加载图片,与 `filename` 二选一。 Returns: Image: 图片对象。 """ if content: pass elif filename: path = Path(filename) import aiofiles async with aiofiles.open(path, 'rb') as f: content = await f.read() else: raise ValueError("请指定图片路径或图片内容!") import base64 img = cls(base64=base64.b64encode(content).decode()) return img def from_unsafe_path(path: Union[str, pathlib.Path]) ‑> Image-
Expand source code
@classmethod def from_unsafe_path(cls, path: Union[str, Path]) -> "Image": """从不安全的路径加载图片。 Args: path: 从不安全的路径加载图片。 Returns: Image: 图片对象。 """ return cls.construct(path=str(path)) def validate_path(path: Union[str, pathlib.Path, NoneType])-
修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。
Expand source code
@validator('path') def validate_path(cls, path: Union[str, Path, None]): """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。""" if path: try: return str(Path(path).resolve(strict=True)) except FileNotFoundError: raise ValueError(f"无效路径:{path}") else: return path
Instance variables
var uuid-
Expand source code
@property def uuid(self): image_id = self.image_id if image_id[0] == '{': # 群图片 image_id = image_id[1:37] elif image_id[0] == '/': # 好友图片 image_id = image_id[1:] return image_id
Methods
def as_flash_image(self) ‑> FlashImage-
Expand source code
def as_flash_image(self) -> "FlashImage": return FlashImage( image_id=self.image_id, url=self.url, path=self.path, base64=self.base64 ) def as_friend_image(self) ‑> str-
Expand source code
def as_friend_image(self) -> str: return f"/{self.uuid.lower()}" def as_group_image(self) ‑> str-
Expand source code
def as_group_image(self) -> str: return f"{{{self.uuid.upper()}}}.jpg" async def download(self, filename: Union[str, pathlib.Path, NoneType] = None, directory: Union[str, pathlib.Path, NoneType] = None, determine_type: bool = True)-
下载图片到本地。
Args
filename- 下载到本地的文件路径。与
directory二选一。 directory- 下载到本地的文件夹路径。与
filename二选一。 determine_type- 是否自动根据图片类型确定拓展名,默认为 True。
Expand source code
async def download( self, filename: Union[str, Path, None] = None, directory: Union[str, Path, None] = None, determine_type: bool = True ): """下载图片到本地。 Args: filename: 下载到本地的文件路径。与 `directory` 二选一。 directory: 下载到本地的文件夹路径。与 `filename` 二选一。 determine_type: 是否自动根据图片类型确定拓展名,默认为 True。 """ if not self.url: logger.warning(f'图片 `{self.uuid}` 无 url 参数,下载失败。') return import httpx async with httpx.AsyncClient() as client: response = await client.get(self.url) response.raise_for_status() content = response.content if filename: path = Path(filename) if determine_type: import imghdr path = path.with_suffix( '.' + str(imghdr.what(None, content)) ) path.parent.mkdir(parents=True, exist_ok=True) elif directory: import imghdr path = Path(directory) path.mkdir(parents=True, exist_ok=True) path = path / f'{self.uuid}.{imghdr.what(None, content)}' else: raise ValueError("请指定文件路径或文件夹路径!") import aiofiles async with aiofiles.open(path, 'wb') as f: await f.write(content) return path
Inherited members
class LifeSpan (*args, **kwargs)-
生命周期事件。
Expand source code
class LifeSpan(Event): """生命周期事件。""" type: str = 'LifeSpan' """事件名。"""Ancestors
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
Inherited members
class MessageChain-
消息链。
一个构造消息链的例子:
message_chain = MessageChain([ AtAll(), Plain("Hello World!"), ])Plain可以省略。message_chain = MessageChain([ AtAll(), "Hello World!", ])在调用 API 时,参数中需要 MessageChain 的,也可以使用
List[MessageComponent]代替。 例如,以下两种写法是等价的:await bot.send_friend_message(12345678, [ Plain("Hello World!") ])await bot.send_friend_message(12345678, MessageChain([ Plain("Hello World!") ]))使用
str(message_chain)获取消息链的字符串表示,字符串采用 mirai 码格式, 并自动按照 mirai 码的规定转义。 参看 mirai 的文档。 获取未转义的消息链字符串,可以使用deserialize()(str(message_chain))。可以使用 for 循环遍历消息链中的消息组件。
for component in message_chain: print(repr(component))可以使用
==运算符比较两个消息链是否相同。another_msg_chain = MessageChain([ { "type": "AtAll" }, { "type": "Plain", "text": "Hello World!" }, ]) print(message_chain == another_msg_chain) 'True'可以使用
in运算检查消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。if AtAll in message_chain: print('AtAll') if At(bot.qq) in message_chain: print('At Me') if MessageChain([At(bot.qq), Plain('Hello!')]) in message_chain: print('Hello!') if 'Hello' in message_chain: print('Hi!')消息链的
has方法和in等价。if message_chain.has(AtAll): print('AtAll')也可以使用
>=和<=运算符:if MessageChain([At(bot.qq), Plain('Hello!')]) <= message_chain: print('Hello!')需注意,此处的子消息链匹配会把 Plain 看成一个整体,而不是匹配其文本的一部分。 如需对文本进行部分匹配,请采用 mirai 码字符串匹配的方式。
消息链对索引操作进行了增强。以消息组件类型为索引,获取消息链中的全部该类型的消息组件。
plain_list = message_chain[Plain] '[Plain("Hello World!")]'以
类型, 数量为索引,获取前至多多少个该类型的消息组件。plain_list_first = message_chain[Plain, 1] '[Plain("Hello World!")]'消息链的
get方法和索引操作等价。plain_list_first = message_chain.get(Plain) '[Plain("Hello World!")]'消息链的
get方法还可指定第二个参数count,这相当于以类型, 数量为索引。plain_list_first = message_chain.get(Plain, 1) # 这等价于 plain_list_first = message_chain[Plain, 1]可以用加号连接两个消息链。
MessageChain(['Hello World!']) + MessageChain(['Goodbye World!']) # 返回 MessageChain([Plain("Hello World!"), Plain("Goodbye World!")])可以用
*运算符复制消息链。MessageChain(['Hello World!']) * 2 # 返回 MessageChain([Plain("Hello World!"), Plain("Hello World!")])除此之外,消息链还支持很多 list 拥有的操作,比如
index和count。message_chain = MessageChain([ AtAll(), "Hello World!", ]) message_chain.index(Plain) # 返回 0 message_chain.count(Plain) # 返回 1消息链对这些操作进行了拓展。在传入元素的地方,一般都可以传入元素的类型。
Expand source code
class MessageChain(MiraiBaseModel): """消息链。 一个构造消息链的例子: ```py message_chain = MessageChain([ AtAll(), Plain("Hello World!"), ]) ``` `Plain` 可以省略。 ```py message_chain = MessageChain([ AtAll(), "Hello World!", ]) ``` 在调用 API 时,参数中需要 MessageChain 的,也可以使用 `List[MessageComponent]` 代替。 例如,以下两种写法是等价的: ```py await bot.send_friend_message(12345678, [ Plain("Hello World!") ]) ``` ```py await bot.send_friend_message(12345678, MessageChain([ Plain("Hello World!") ])) ``` 使用`str(message_chain)`获取消息链的字符串表示,字符串采用 mirai 码格式, 并自动按照 mirai 码的规定转义。 参看 mirai 的[文档](https://github.com/mamoe/mirai/blob/dev/docs/Messages.md#mirai-%E7%A0%81)。 获取未转义的消息链字符串,可以使用`deserialize(str(message_chain))`。 可以使用 for 循环遍历消息链中的消息组件。 ```py for component in message_chain: print(repr(component)) ``` 可以使用 `==` 运算符比较两个消息链是否相同。 ```py another_msg_chain = MessageChain([ { "type": "AtAll" }, { "type": "Plain", "text": "Hello World!" }, ]) print(message_chain == another_msg_chain) 'True' ``` 可以使用 `in` 运算检查消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。 ```py if AtAll in message_chain: print('AtAll') if At(bot.qq) in message_chain: print('At Me') if MessageChain([At(bot.qq), Plain('Hello!')]) in message_chain: print('Hello!') if 'Hello' in message_chain: print('Hi!') ``` 消息链的 `has` 方法和 `in` 等价。 ```py if message_chain.has(AtAll): print('AtAll') ``` 也可以使用 `>=` 和 `<= `运算符: ```py if MessageChain([At(bot.qq), Plain('Hello!')]) <= message_chain: print('Hello!') ``` 需注意,此处的子消息链匹配会把 Plain 看成一个整体,而不是匹配其文本的一部分。 如需对文本进行部分匹配,请采用 mirai 码字符串匹配的方式。 消息链对索引操作进行了增强。以消息组件类型为索引,获取消息链中的全部该类型的消息组件。 ```py plain_list = message_chain[Plain] '[Plain("Hello World!")]' ``` 以 `类型, 数量` 为索引,获取前至多多少个该类型的消息组件。 ```py plain_list_first = message_chain[Plain, 1] '[Plain("Hello World!")]' ``` 消息链的 `get` 方法和索引操作等价。 ```py plain_list_first = message_chain.get(Plain) '[Plain("Hello World!")]' ``` 消息链的 `get` 方法还可指定第二个参数 `count`,这相当于以 `类型, 数量` 为索引。 ```py plain_list_first = message_chain.get(Plain, 1) # 这等价于 plain_list_first = message_chain[Plain, 1] ``` 可以用加号连接两个消息链。 ```py MessageChain(['Hello World!']) + MessageChain(['Goodbye World!']) # 返回 MessageChain([Plain("Hello World!"), Plain("Goodbye World!")]) ``` 可以用 `*` 运算符复制消息链。 ```py MessageChain(['Hello World!']) * 2 # 返回 MessageChain([Plain("Hello World!"), Plain("Hello World!")]) ``` 除此之外,消息链还支持很多 list 拥有的操作,比如 `index` 和 `count`。 ```py message_chain = MessageChain([ AtAll(), "Hello World!", ]) message_chain.index(Plain) # 返回 0 message_chain.count(Plain) # 返回 1 ``` 消息链对这些操作进行了拓展。在传入元素的地方,一般都可以传入元素的类型。 """ __root__: List[MessageComponent] @staticmethod def _parse_message_chain(msg_chain: Iterable): result = [] for msg in msg_chain: if isinstance(msg, dict): result.append(MessageComponent.parse_subtype(msg)) elif isinstance(msg, MessageComponent): result.append(msg) elif isinstance(msg, str): result.append(Plain(msg)) else: raise TypeError( f"消息链中元素需为 dict 或 str 或 MessageComponent,当前类型:{type(msg)}" ) return result @validator('__root__', always=True, pre=True) def _parse_component(cls, msg_chain): if isinstance(msg_chain, (str, MessageComponent)): msg_chain = [msg_chain] if not msg_chain: msg_chain = [] return cls._parse_message_chain(msg_chain) @classmethod def parse_obj(cls, msg_chain: Iterable): """通过列表形式的消息链,构造对应的 `MessageChain` 对象。 Args: msg_chain: 列表形式的消息链。 """ result = cls._parse_message_chain(msg_chain) return cls(__root__=result) def __init__(self, __root__: Iterable[MessageComponent] = None): super().__init__(__root__=__root__) def __str__(self): return "".join(str(component) for component in self.__root__) def as_mirai_code(self) -> str: """将消息链转换为 mirai 码字符串。 该方法会自动转换消息链中的元素。 Returns: mirai 码字符串。 """ return "".join( component.as_mirai_code() for component in self.__root__ ) def __repr__(self): return f'{self.__class__.__name__}({self.__root__!r})' def __iter__(self): yield from self.__root__ @overload def get(self, index: int) -> MessageComponent: ... @overload def get(self, index: slice) -> List[MessageComponent]: ... @overload def get(self, index: Type[TMessageComponent]) -> List[TMessageComponent]: ... @overload def get( self, index: Tuple[Type[TMessageComponent], int] ) -> List[TMessageComponent]: ... def get( self, index: Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]], count: Optional[int] = None ) -> Union[MessageComponent, List[MessageComponent], List[TMessageComponent]]: """获取消息链中的某个(某些)消息组件,或某类型的消息组件。 Args: index (`Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]]`): 如果为 `int`,则返回该索引处的消息组件。 如果为 `slice`,则返回该索引范围处的消息组件。 如果为 `Type[TMessageComponent]`,则返回该类型的全部消息组件。 如果为 `Tuple[Type[TMessageComponent], int]`,则返回该类型的至多 `index[1]` 个消息组件。 count: 如果为 `int`,则返回至多 `count` 个消息组件。 Returns: MessageComponent: 返回指定索引处的消息组件。 List[MessageComponent]: 返回指定索引范围的消息组件。 List[TMessageComponent]: 返回指定类型的消息组件构成的列表。 """ # 正常索引 if isinstance(index, int): return self.__root__[index] # 切片索引 if isinstance(index, slice): return self.__root__[index] # 指定 count if count: if isinstance(index, type): index = (index, count) elif isinstance(index, tuple): index = (index[0], count if count < index[1] else index[1]) # 索引对象为 MessageComponent 类,返回所有对应 component if isinstance(index, type): return [ component for component in self if type(component) is index ] # 索引对象为 MessageComponent 和 int 构成的 tuple, 返回指定数量的 component if isinstance(index, tuple): components = ( component for component in self if type(component) is index[0] ) return [ component for component, _ in zip(components, range(index[1])) ] raise TypeError(f"消息链索引需为 int 或 MessageComponent,当前类型:{type(index)}") def get_first(self, t: Type[TMessageComponent]) -> Optional[TMessageComponent]: """获取消息链中第一个符合类型的消息组件。""" for component in self: if isinstance(component, t): return component return None @overload def __getitem__(self, index: int) -> MessageComponent: ... @overload def __getitem__(self, index: slice) -> List[MessageComponent]: ... @overload def __getitem__(self, index: Type[TMessageComponent]) -> List[TMessageComponent]: ... @overload def __getitem__( self, index: Tuple[Type[TMessageComponent], int] ) -> List[TMessageComponent]: ... def __getitem__( self, index: Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]] ) -> Union[MessageComponent, List[MessageComponent], List[TMessageComponent]]: return self.get(index) def __setitem__( self, key: Union[int, slice], value: Union[MessageComponent, str, Iterable[Union[MessageComponent, str]]] ): if isinstance(value, str): value = Plain(value) if isinstance(value, Iterable): value = (Plain(c) if isinstance(c, str) else c for c in value) self.__root__[key] = value # type: ignore def __delitem__(self, key: Union[int, slice]): del self.__root__[key] def __reversed__(self) -> Iterable[MessageComponent]: return reversed(self.__root__) def has( self, sub: Union[MessageComponent, Type[MessageComponent], 'MessageChain', str] ) -> bool: """判断消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。 Args: sub (`Union[MessageComponent, Type[MessageComponent], 'MessageChain', str]`): 若为 `MessageComponent`,则判断该组件是否在消息链中。 若为 `Type[MessageComponent]`,则判断该组件类型是否在消息链中。 若为 `MessageChain`,则判断该子消息链是否在消息链中。 若为 `str`,则判断对应的 mirai 码中是否有某子字符串。 Returns: bool: 是否找到。 """ if isinstance(sub, type): # 检测消息链中是否有某种类型的对象 for i in self: if type(i) is sub: return True return False if isinstance(sub, MessageComponent): # 检查消息链中是否有某个组件 for i in self: if i == sub: return True return False if isinstance(sub, MessageChain): # 检查消息链中是否有某个子消息链 return bool(kmp(self, sub)) if isinstance(sub, str): # 检查消息中有无指定字符串子串 return sub in deserialize(str(self)) raise TypeError(f"类型不匹配,当前类型:{type(sub)}") def __contains__(self, sub) -> bool: return self.has(sub) def __ge__(self, other): return other in self def __len__(self) -> int: return len(self.__root__) def __add__( self, other: Union['MessageChain', MessageComponent, str] ) -> 'MessageChain': if isinstance(other, MessageChain): return self.__class__(self.__root__ + other.__root__) if isinstance(other, str): return self.__class__(self.__root__ + [Plain(other)]) if isinstance(other, MessageComponent): return self.__class__(self.__root__ + [other]) return NotImplemented def __radd__(self, other: Union[MessageComponent, str]) -> 'MessageChain': if isinstance(other, MessageComponent): return self.__class__([other] + self.__root__) if isinstance(other, str): return self.__class__( [cast(MessageComponent, Plain(other))] + self.__root__ ) return NotImplemented def __mul__(self, other: int): if isinstance(other, int): return self.__class__(self.__root__ * other) return NotImplemented def __rmul__(self, other: int): return self.__mul__(other) def __iadd__(self, other: Iterable[Union[MessageComponent, str]]): self.extend(other) def __imul__(self, other: int): if isinstance(other, int): self.__root__ *= other return NotImplemented def index( self, x: Union[MessageComponent, Type[MessageComponent]], i: int = 0, j: int = -1 ) -> int: """返回 x 在消息链中首次出现项的索引号(索引号在 i 或其后且在 j 之前)。 Args: x (`Union[MessageComponent, Type[MessageComponent]]`): 要查找的消息元素或消息元素类型。 i: 从哪个位置开始查找。 j: 查找到哪个位置结束。 Returns: int: 如果找到,则返回索引号。 Raises: ValueError: 没有找到。 TypeError: 类型不匹配。 """ if isinstance(x, type): l = len(self) if i < 0: i += l if i < 0: i = 0 if j < 0: j += l if j > l: j = l for index in range(i, j): if type(self[index]) is x: return index raise ValueError("消息链中不存在该类型的组件。") if isinstance(x, MessageComponent): return self.__root__.index(x, i, j) raise TypeError(f"类型不匹配,当前类型:{type(x)}") def count(self, x: Union[MessageComponent, Type[MessageComponent]]) -> int: """返回消息链中 x 出现的次数。 Args: x (`Union[MessageComponent, Type[MessageComponent]]`): 要查找的消息元素或消息元素类型。 Returns: int: 次数。 """ if isinstance(x, type): return sum(1 for i in self if type(i) is x) if isinstance(x, MessageComponent): return self.__root__.count(x) raise TypeError(f"类型不匹配,当前类型:{type(x)}") def extend(self, x: Iterable[Union[MessageComponent, str]]): """将另一个消息链中的元素添加到消息链末尾。 Args: x: 另一个消息链,也可为消息元素或字符串元素的序列。 """ self.__root__.extend(Plain(c) if isinstance(c, str) else c for c in x) def append(self, x: Union[MessageComponent, str]): """将一个消息元素或字符串元素添加到消息链末尾。 Args: x: 消息元素或字符串元素。 """ self.__root__.append(Plain(x) if isinstance(x, str) else x) def insert(self, i: int, x: Union[MessageComponent, str]): """将一个消息元素或字符串添加到消息链中指定位置。 Args: i: 插入位置。 x: 消息元素或字符串元素。 """ self.__root__.insert(i, Plain(x) if isinstance(x, str) else x) def pop(self, i: int = -1) -> MessageComponent: """从消息链中移除并返回指定位置的元素。 Args: i: 移除位置。默认为末尾。 Returns: MessageComponent: 移除的元素。 """ return self.__root__.pop(i) def remove(self, x: Union[MessageComponent, Type[MessageComponent]]): """从消息链中移除指定元素或指定类型的一个元素。 Args: x: 指定的元素或元素类型。 """ if isinstance(x, type): self.pop(self.index(x)) if isinstance(x, MessageComponent): self.__root__.remove(x) def exclude( self, x: Union[MessageComponent, Type[MessageComponent]], count: int = -1 ) -> 'MessageChain': """返回移除指定元素或指定类型的元素后剩余的消息链。 Args: x: 指定的元素或元素类型。 count: 至多移除的数量。默认为全部移除。 Returns: MessageChain: 剩余的消息链。 """ def _exclude(): nonlocal count x_is_type = isinstance(x, type) for c in self: if count > 0 and ((x_is_type and type(c) is x) or c == x): count -= 1 continue yield c return self.__class__(_exclude()) def reverse(self): """将消息链原地翻转。""" self.__root__.reverse() @classmethod def join(cls, *args: Iterable[Union[str, MessageComponent]]): return cls( Plain(c) if isinstance(c, str) else c for c in itertools.chain(*args) ) @property def source(self) -> Optional['Source']: """获取消息链中的 `Source` 对象。""" return self.get_first(Source) @property def message_id(self) -> int: """获取消息链的 message_id,若无法获取,返回 -1。""" source = self.source return source.id if source else -1Ancestors
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Static methods
def join(*args: Iterable[Union[MessageComponent, str]])-
Expand source code
@classmethod def join(cls, *args: Iterable[Union[str, MessageComponent]]): return cls( Plain(c) if isinstance(c, str) else c for c in itertools.chain(*args) ) def parse_obj(msg_chain: Iterable)-
Expand source code
@classmethod def parse_obj(cls, msg_chain: Iterable): """通过列表形式的消息链,构造对应的 `MessageChain` 对象。 Args: msg_chain: 列表形式的消息链。 """ result = cls._parse_message_chain(msg_chain) return cls(__root__=result)
Instance variables
var message_id : int-
获取消息链的 message_id,若无法获取,返回 -1。
Expand source code
@property def message_id(self) -> int: """获取消息链的 message_id,若无法获取,返回 -1。""" source = self.source return source.id if source else -1 var source : Optional[Source]-
获取消息链中的
Source对象。Expand source code
@property def source(self) -> Optional['Source']: """获取消息链中的 `Source` 对象。""" return self.get_first(Source)
Methods
def append(self, x: Union[MessageComponent, str])-
将一个消息元素或字符串元素添加到消息链末尾。
Args
x- 消息元素或字符串元素。
Expand source code
def append(self, x: Union[MessageComponent, str]): """将一个消息元素或字符串元素添加到消息链末尾。 Args: x: 消息元素或字符串元素。 """ self.__root__.append(Plain(x) if isinstance(x, str) else x) def as_mirai_code(self) ‑> str-
将消息链转换为 mirai 码字符串。
该方法会自动转换消息链中的元素。
Returns
mirai 码字符串。
Expand source code
def as_mirai_code(self) -> str: """将消息链转换为 mirai 码字符串。 该方法会自动转换消息链中的元素。 Returns: mirai 码字符串。 """ return "".join( component.as_mirai_code() for component in self.__root__ ) def count(self, x: Union[MessageComponent, Type[MessageComponent]]) ‑> int-
返回消息链中 x 出现的次数。
Args
x (
Union[MessageComponent, Type[MessageComponent]]): 要查找的消息元素或消息元素类型。Returns
int- 次数。
Expand source code
def count(self, x: Union[MessageComponent, Type[MessageComponent]]) -> int: """返回消息链中 x 出现的次数。 Args: x (`Union[MessageComponent, Type[MessageComponent]]`): 要查找的消息元素或消息元素类型。 Returns: int: 次数。 """ if isinstance(x, type): return sum(1 for i in self if type(i) is x) if isinstance(x, MessageComponent): return self.__root__.count(x) raise TypeError(f"类型不匹配,当前类型:{type(x)}") def exclude(self, x: Union[MessageComponent, Type[MessageComponent]], count: int = -1) ‑> MessageChain-
Expand source code
def exclude( self, x: Union[MessageComponent, Type[MessageComponent]], count: int = -1 ) -> 'MessageChain': """返回移除指定元素或指定类型的元素后剩余的消息链。 Args: x: 指定的元素或元素类型。 count: 至多移除的数量。默认为全部移除。 Returns: MessageChain: 剩余的消息链。 """ def _exclude(): nonlocal count x_is_type = isinstance(x, type) for c in self: if count > 0 and ((x_is_type and type(c) is x) or c == x): count -= 1 continue yield c return self.__class__(_exclude()) def extend(self, x: Iterable[Union[MessageComponent, str]])-
将另一个消息链中的元素添加到消息链末尾。
Args
x- 另一个消息链,也可为消息元素或字符串元素的序列。
Expand source code
def extend(self, x: Iterable[Union[MessageComponent, str]]): """将另一个消息链中的元素添加到消息链末尾。 Args: x: 另一个消息链,也可为消息元素或字符串元素的序列。 """ self.__root__.extend(Plain(c) if isinstance(c, str) else c for c in x) def get(self, index: Union[int, slice, Type[~TMessageComponent], Tuple[Type[~TMessageComponent], int]], count: Optional[int] = None) ‑> Union[MessageComponent, List[MessageComponent], List[~TMessageComponent]]-
获取消息链中的某个(某些)消息组件,或某类型的消息组件。
Args
index (
Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]]): 如果为int,则返回该索引处的消息组件。 如果为slice,则返回该索引范围处的消息组件。 如果为Type[TMessageComponent],则返回该类型的全部消息组件。 如果为Tuple[Type[TMessageComponent], int],则返回该类型的至多index[1]个消息组件。count- 如果为
int,则返回至多count个消息组件。
Returns
MessageComponent- 返回指定索引处的消息组件。
List[MessageComponent]- 返回指定索引范围的消息组件。
List[TMessageComponent]- 返回指定类型的消息组件构成的列表。
Expand source code
def get( self, index: Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]], count: Optional[int] = None ) -> Union[MessageComponent, List[MessageComponent], List[TMessageComponent]]: """获取消息链中的某个(某些)消息组件,或某类型的消息组件。 Args: index (`Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]]`): 如果为 `int`,则返回该索引处的消息组件。 如果为 `slice`,则返回该索引范围处的消息组件。 如果为 `Type[TMessageComponent]`,则返回该类型的全部消息组件。 如果为 `Tuple[Type[TMessageComponent], int]`,则返回该类型的至多 `index[1]` 个消息组件。 count: 如果为 `int`,则返回至多 `count` 个消息组件。 Returns: MessageComponent: 返回指定索引处的消息组件。 List[MessageComponent]: 返回指定索引范围的消息组件。 List[TMessageComponent]: 返回指定类型的消息组件构成的列表。 """ # 正常索引 if isinstance(index, int): return self.__root__[index] # 切片索引 if isinstance(index, slice): return self.__root__[index] # 指定 count if count: if isinstance(index, type): index = (index, count) elif isinstance(index, tuple): index = (index[0], count if count < index[1] else index[1]) # 索引对象为 MessageComponent 类,返回所有对应 component if isinstance(index, type): return [ component for component in self if type(component) is index ] # 索引对象为 MessageComponent 和 int 构成的 tuple, 返回指定数量的 component if isinstance(index, tuple): components = ( component for component in self if type(component) is index[0] ) return [ component for component, _ in zip(components, range(index[1])) ] raise TypeError(f"消息链索引需为 int 或 MessageComponent,当前类型:{type(index)}") def get_first(self, t: Type[~TMessageComponent]) ‑> Optional[~TMessageComponent]-
获取消息链中第一个符合类型的消息组件。
Expand source code
def get_first(self, t: Type[TMessageComponent]) -> Optional[TMessageComponent]: """获取消息链中第一个符合类型的消息组件。""" for component in self: if isinstance(component, t): return component return None def has(self, sub: Union[MessageComponent, Type[MessageComponent], ForwardRef('MessageChain'), str]) ‑> bool-
判断消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。
Args
sub (
Union[MessageComponent, Type[MessageComponent], 'MessageChain', str]): 若为MessageComponent,则判断该组件是否在消息链中。 若为Type[MessageComponent],则判断该组件类型是否在消息链中。 若为MessageChain,则判断该子消息链是否在消息链中。 若为str,则判断对应的 mirai 码中是否有某子字符串。Returns
bool- 是否找到。
Expand source code
def has( self, sub: Union[MessageComponent, Type[MessageComponent], 'MessageChain', str] ) -> bool: """判断消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。 Args: sub (`Union[MessageComponent, Type[MessageComponent], 'MessageChain', str]`): 若为 `MessageComponent`,则判断该组件是否在消息链中。 若为 `Type[MessageComponent]`,则判断该组件类型是否在消息链中。 若为 `MessageChain`,则判断该子消息链是否在消息链中。 若为 `str`,则判断对应的 mirai 码中是否有某子字符串。 Returns: bool: 是否找到。 """ if isinstance(sub, type): # 检测消息链中是否有某种类型的对象 for i in self: if type(i) is sub: return True return False if isinstance(sub, MessageComponent): # 检查消息链中是否有某个组件 for i in self: if i == sub: return True return False if isinstance(sub, MessageChain): # 检查消息链中是否有某个子消息链 return bool(kmp(self, sub)) if isinstance(sub, str): # 检查消息中有无指定字符串子串 return sub in deserialize(str(self)) raise TypeError(f"类型不匹配,当前类型:{type(sub)}") def index(self, x: Union[MessageComponent, Type[MessageComponent]], i: int = 0, j: int = -1) ‑> int-
返回 x 在消息链中首次出现项的索引号(索引号在 i 或其后且在 j 之前)。
Args
- x (
Union[MessageComponent, Type[MessageComponent]]): - 要查找的消息元素或消息元素类型。
i- 从哪个位置开始查找。
j- 查找到哪个位置结束。
Returns
int- 如果找到,则返回索引号。
Raises
ValueError- 没有找到。
TypeError- 类型不匹配。
Expand source code
def index( self, x: Union[MessageComponent, Type[MessageComponent]], i: int = 0, j: int = -1 ) -> int: """返回 x 在消息链中首次出现项的索引号(索引号在 i 或其后且在 j 之前)。 Args: x (`Union[MessageComponent, Type[MessageComponent]]`): 要查找的消息元素或消息元素类型。 i: 从哪个位置开始查找。 j: 查找到哪个位置结束。 Returns: int: 如果找到,则返回索引号。 Raises: ValueError: 没有找到。 TypeError: 类型不匹配。 """ if isinstance(x, type): l = len(self) if i < 0: i += l if i < 0: i = 0 if j < 0: j += l if j > l: j = l for index in range(i, j): if type(self[index]) is x: return index raise ValueError("消息链中不存在该类型的组件。") if isinstance(x, MessageComponent): return self.__root__.index(x, i, j) raise TypeError(f"类型不匹配,当前类型:{type(x)}") - x (
def insert(self, i: int, x: Union[MessageComponent, str])-
将一个消息元素或字符串添加到消息链中指定位置。
Args
i- 插入位置。
x- 消息元素或字符串元素。
Expand source code
def insert(self, i: int, x: Union[MessageComponent, str]): """将一个消息元素或字符串添加到消息链中指定位置。 Args: i: 插入位置。 x: 消息元素或字符串元素。 """ self.__root__.insert(i, Plain(x) if isinstance(x, str) else x) def pop(self, i: int = -1) ‑> MessageComponent-
从消息链中移除并返回指定位置的元素。
Args
i- 移除位置。默认为末尾。
Returns
MessageComponent- 移除的元素。
Expand source code
def pop(self, i: int = -1) -> MessageComponent: """从消息链中移除并返回指定位置的元素。 Args: i: 移除位置。默认为末尾。 Returns: MessageComponent: 移除的元素。 """ return self.__root__.pop(i) def remove(self, x: Union[MessageComponent, Type[MessageComponent]])-
从消息链中移除指定元素或指定类型的一个元素。
Args
x- 指定的元素或元素类型。
Expand source code
def remove(self, x: Union[MessageComponent, Type[MessageComponent]]): """从消息链中移除指定元素或指定类型的一个元素。 Args: x: 指定的元素或元素类型。 """ if isinstance(x, type): self.pop(self.index(x)) if isinstance(x, MessageComponent): self.__root__.remove(x) def reverse(self)-
将消息链原地翻转。
Expand source code
def reverse(self): """将消息链原地翻转。""" self.__root__.reverse()
class MessageEvent (*args, **kwargs)-
消息事件。
Args
type- 事件名。
message_chain- 消息内容。
Expand source code
class MessageEvent(Event): """消息事件。 Args: type: 事件名。 message_chain: 消息内容。 """ type: str """事件名。""" message_chain: MessageChain """消息内容。"""Ancestors
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
- FriendMessage
- FriendSyncMessage
- GroupMessage
- GroupSyncMessage
- OtherClientMessage
- StrangerMessage
- StrangerSyncMessage
- TempMessage
- TempSyncMessage
Class variables
var message_chain : MessageChain-
消息内容。
Inherited members
class Method (value, names=None, *, module=None, qualname=None, type=None, start=1)-
API 接口的调用方法。
Expand source code
class Method(str, Enum): """API 接口的调用方法。""" GET = "GET" """使用 GET 方法调用。""" POST = "POST" """使用 POST 方法调用。""" # 区分下面两个,是为了兼容 websocket RESTGET = "RESTGET" """表明这是一个对 RESTful 接口的 GET。""" RESTPOST = "RESTPOST" """表明这是一个对 RESTful 接口的 POST。""" MULTIPART = "MULTIPART" """表明这是一个使用了 multipart/form-data 的 POST。"""Ancestors
- builtins.str
- enum.Enum
Class variables
var GET-
使用 GET 方法调用。
var MULTIPART-
表明这是一个使用了 multipart/form-data 的 POST。
var POST-
使用 POST 方法调用。
var RESTGET-
表明这是一个对 RESTful 接口的 GET。
var RESTPOST-
表明这是一个对 RESTful 接口的 POST。
class Mirai (qq: int, adapter: Adapter)-
机器人主类。
使用了
__getattr__魔术方法,可以直接在对象上调用 API。Mirai: 类包含 model 层封装,API 名称经过转写以符合命名规范,所有的 API 全部使用小写字母及下划线命名。(API 名称也可使用原名。) API 参数可以使用具名参数,也可以使用位置参数,关于 API 参数的更多信息请参见模块
mirai.models.api。例如:
await bot.send_friend_message(12345678, [ Plain("Hello World!") ])也可以使用
call_api方法,须注意此方法直接继承自SimpleMirai,因此未经 model 层封装, 需要遵循SimpleMirai的规定。Args
qq- QQ 号。启用 Single Mode 时,可以随便传入,登陆后会自动获取正确的 QQ 号。
adapter- 适配器,负责与 mirai-api-http 沟通,详见模块`mirai.adapters。
Expand source code
class Mirai(SimpleMirai): """ 机器人主类。 使用了 `__getattr__` 魔术方法,可以直接在对象上调用 API。 Mirai: 类包含 model 层封装,API 名称经过转写以符合命名规范,所有的 API 全部使用小写字母及下划线命名。 (API 名称也可使用原名。) API 参数可以使用具名参数,也可以使用位置参数,关于 API 参数的更多信息请参见模块 `mirai.models.api`。 例如: ```py await bot.send_friend_message(12345678, [ Plain("Hello World!") ]) ``` 也可以使用 `call_api` 方法,须注意此方法直接继承自 `SimpleMirai`,因此未经 model 层封装, 需要遵循 `SimpleMirai` 的规定。 """ def __init__(self, qq: int, adapter: Adapter): super().__init__(qq=qq, adapter=adapter) # 将 bus 更换为 ModelEventBus adapter.unregister_event_bus(self._bus) self._bus: ModelEventBus = ModelEventBus() adapter.register_event_bus(self._bus.base_bus) @property def bus(self) -> ModelEventBus: return self._bus async def call_api(self, api: str, *args, **kwargs): """调用 API。 Args: api: API 名称。 *args: 参数。 **kwargs: 参数。 """ return await self._adapter.call_api(api, *args, **kwargs) def on( self, event_type: Union[Type[Event], str], priority: int = 0, ) -> Callable: """注册事件处理器。 用法举例: ```python @bot.on(FriendMessage) async def on_friend_message(event: FriendMessage): print(f"收到来自{event.sender.nickname}的消息。") ``` Args: event_type: 事件类或事件名。 priority: 优先级,较小者优先。 """ return self._bus.on(event_type, priority) def api(self, api: str) -> ApiModel.Proxy: """获取 API Proxy 对象。 API Proxy 提供更加简便的调用 API 的写法,详见 `mirai.models.api`。 `Mirai` 的 `__getattr__` 与此方法完全相同,可支持直接在对象上调用 API。 Args: api: API 名称。 Returns: ApiModel.Proxy: API Proxy 对象。 """ api_type = ApiModel.get_subtype(api) return api_type.Proxy(self, api_type) def __getattr__(self, api: str) -> ApiModel.Proxy: return self.api(api) async def send( self, target: Union[Entity, MessageEvent], message: TMessage, quote: bool = False ) -> int: """发送消息。可以从 `Friend` `Group` 等对象,或者从 `MessageEvent` 中自动识别消息发送对象。 Args: target: 目标对象。 message: 发送的消息。 quote: 是否以回复消息的形式发送,默认为 False。 Returns: int: 发送的消息的 message_id。 """ # 识别消息发送对象 if isinstance(target, TempMessage): quoting = target.message_chain.message_id if quote else None return ( await self.send_temp_message( qq=target.sender.id, group=target.group.id, message_chain=message, quote=quoting ) ).message_id if isinstance(target, MessageEvent): quoting = target.message_chain.message_id if quote else None target = target.sender else: quoting = None if isinstance(target, Friend): send_message = self.send_friend_message id_ = target.id elif isinstance(target, Group): send_message = self.send_group_message id_ = target.id elif isinstance(target, GroupMember): send_message = self.send_group_message id_ = target.group.id else: raise ValueError(f"{target} 不是有效的消息发送对象。") response = await send_message( target=id_, message_chain=message, quote=quoting ) return response.message_id if response else -1 async def get_friend(self, id_: int) -> Optional[Friend]: """获取好友对象。 Args: id_: 好友 QQ 号。 Returns: Friend: 好友对象。 None: 好友不存在。 """ friend_list = await self.friend_list.get() if not friend_list: return None for friend in cast(List[Friend], friend_list): if friend.id == id_: return friend return None async def get_group(self, id_: int) -> Optional[Group]: """获取群组对象。 Args: id_: 群号。 Returns: Group: 群组对象。 None: 群组不存在或 bot 未入群。 """ group_list = await self.group_list.get() if not group_list: return None for group in cast(List[Group], group_list): if group.id == id_: return group return None async def get_group_member(self, group: Union[Group, int], id_: int) -> Optional[GroupMember]: """获取群成员对象。 Args: group: 群组对象或群号。 id_: 群成员 QQ 号。 Returns: GroupMember: 群成员对象。 None: 群成员不存在。 """ if isinstance(group, Group): group = group.id member_list = await self.member_list.get(group) if not member_list: return None for member in cast(List[GroupMember], member_list): if member.id == id_: return member return None async def get_entity(self, subject: Subject) -> Optional[Entity]: """获取实体对象。 Args: subject: 以 `Subject` 表示的实体对象。 Returns: Entity: 实体对象。 None: 实体不存在。 """ if subject.kind == 'Friend': return await self.get_friend(subject.id) if subject.kind == 'Group': return await self.get_group(subject.id) return None @staticmethod async def is_admin(group: Group) -> bool: """判断机器人在群组中是否是管理员。 Args: group: 群组对象。 Returns: bool: 是否是管理员。 """ return group.permission in (Permission.Administrator, Permission.Owner) async def process_request( self, event: RequestEvent, operate: Union[int, RespOperate], message: str = '' ): """处理申请。 Args: event: 申请事件。 operate: 处理操作。 message: 回复的信息。 """ api_type = cast( Type[RespEvent], getattr(mirai.models.api, 'Resp' + event.type) ) api = api_type.from_event(event, operate, message) await api.call(self, 'POST') async def allow(self, event: RequestEvent, message: str = ''): """允许申请。 Args: event: 申请事件。 message: 回复的信息。 """ await self.process_request(event, RespOperate.ALLOW, message) async def decline( self, event: RequestEvent, message: str = '', ban: bool = False ): """拒绝申请。 Args: event: 申请事件。 message: 回复的信息。 ban: 是否拉黑,默认为 False。 """ await self.process_request( event, RespOperate.DECLINE & RespOperate.BAN if ban else RespOperate.DECLINE, message ) async def ignore( self, event: RequestEvent, message: str = '', ban: bool = False ): """忽略申请。 Args: event: 申请事件。 message: 回复的信息。 ban: 是否拉黑,默认为 False。 """ await self.process_request( event, RespOperate.IGNORE & RespOperate.BAN if ban else RespOperate.DECLINE, message )Ancestors
Class variables
var qq : int
Static methods
async def is_admin(group: Group) ‑> bool-
判断机器人在群组中是否是管理员。
Args
group- 群组对象。
Returns
bool- 是否是管理员。
Expand source code
@staticmethod async def is_admin(group: Group) -> bool: """判断机器人在群组中是否是管理员。 Args: group: 群组对象。 Returns: bool: 是否是管理员。 """ return group.permission in (Permission.Administrator, Permission.Owner)
Instance variables
var bus : ModelEventBus-
Expand source code
@property def bus(self) -> ModelEventBus: return self._bus
Methods
async def allow(self, event: RequestEvent, message: str = '')-
允许申请。
Args
event- 申请事件。
message- 回复的信息。
Expand source code
async def allow(self, event: RequestEvent, message: str = ''): """允许申请。 Args: event: 申请事件。 message: 回复的信息。 """ await self.process_request(event, RespOperate.ALLOW, message) def api(self, api: str) ‑> ApiModel.Proxy-
获取 API Proxy 对象。
API Proxy 提供更加简便的调用 API 的写法,详见
mirai.models.api。Mirai的__getattr__与此方法完全相同,可支持直接在对象上调用 API。Args
api- API 名称。
Returns
ApiModel.Proxy- API Proxy 对象。
Expand source code
def api(self, api: str) -> ApiModel.Proxy: """获取 API Proxy 对象。 API Proxy 提供更加简便的调用 API 的写法,详见 `mirai.models.api`。 `Mirai` 的 `__getattr__` 与此方法完全相同,可支持直接在对象上调用 API。 Args: api: API 名称。 Returns: ApiModel.Proxy: API Proxy 对象。 """ api_type = ApiModel.get_subtype(api) return api_type.Proxy(self, api_type) async def decline(self, event: RequestEvent, message: str = '', ban: bool = False)-
拒绝申请。
Args
event- 申请事件。
message- 回复的信息。
ban- 是否拉黑,默认为 False。
Expand source code
async def decline( self, event: RequestEvent, message: str = '', ban: bool = False ): """拒绝申请。 Args: event: 申请事件。 message: 回复的信息。 ban: 是否拉黑,默认为 False。 """ await self.process_request( event, RespOperate.DECLINE & RespOperate.BAN if ban else RespOperate.DECLINE, message ) async def get_entity(self, subject: Subject) ‑> Optional[Entity]-
获取实体对象。
Args
subject- 以
Subject表示的实体对象。
Returns
Entity- 实体对象。
None- 实体不存在。
Expand source code
async def get_entity(self, subject: Subject) -> Optional[Entity]: """获取实体对象。 Args: subject: 以 `Subject` 表示的实体对象。 Returns: Entity: 实体对象。 None: 实体不存在。 """ if subject.kind == 'Friend': return await self.get_friend(subject.id) if subject.kind == 'Group': return await self.get_group(subject.id) return None async def get_friend(self, id_: int) ‑> Optional[Friend]-
获取好友对象。
Args
id_- 好友 QQ 号。
Returns
Friend- 好友对象。
None- 好友不存在。
Expand source code
async def get_friend(self, id_: int) -> Optional[Friend]: """获取好友对象。 Args: id_: 好友 QQ 号。 Returns: Friend: 好友对象。 None: 好友不存在。 """ friend_list = await self.friend_list.get() if not friend_list: return None for friend in cast(List[Friend], friend_list): if friend.id == id_: return friend return None async def get_group(self, id_: int) ‑> Optional[Group]-
获取群组对象。
Args
id_- 群号。
Returns
Group- 群组对象。
None- 群组不存在或 bot 未入群。
Expand source code
async def get_group(self, id_: int) -> Optional[Group]: """获取群组对象。 Args: id_: 群号。 Returns: Group: 群组对象。 None: 群组不存在或 bot 未入群。 """ group_list = await self.group_list.get() if not group_list: return None for group in cast(List[Group], group_list): if group.id == id_: return group return None async def get_group_member(self, group: Union[Group, int], id_: int) ‑> Optional[GroupMember]-
获取群成员对象。
Args
group- 群组对象或群号。
id_- 群成员 QQ 号。
Returns
GroupMember- 群成员对象。
None- 群成员不存在。
Expand source code
async def get_group_member(self, group: Union[Group, int], id_: int) -> Optional[GroupMember]: """获取群成员对象。 Args: group: 群组对象或群号。 id_: 群成员 QQ 号。 Returns: GroupMember: 群成员对象。 None: 群成员不存在。 """ if isinstance(group, Group): group = group.id member_list = await self.member_list.get(group) if not member_list: return None for member in cast(List[GroupMember], member_list): if member.id == id_: return member return None async def ignore(self, event: RequestEvent, message: str = '', ban: bool = False)-
忽略申请。
Args
event- 申请事件。
message- 回复的信息。
ban- 是否拉黑,默认为 False。
Expand source code
async def ignore( self, event: RequestEvent, message: str = '', ban: bool = False ): """忽略申请。 Args: event: 申请事件。 message: 回复的信息。 ban: 是否拉黑,默认为 False。 """ await self.process_request( event, RespOperate.IGNORE & RespOperate.BAN if ban else RespOperate.DECLINE, message ) def on(self, event_type: Union[Type[Event], str], priority: int = 0) ‑> Callable-
注册事件处理器。
用法举例:
@bot.on(FriendMessage) async def on_friend_message(event: FriendMessage): print(f"收到来自{event.sender.nickname}的消息。")Args
event_type- 事件类或事件名。
priority- 优先级,较小者优先。
Expand source code
def on( self, event_type: Union[Type[Event], str], priority: int = 0, ) -> Callable: """注册事件处理器。 用法举例: ```python @bot.on(FriendMessage) async def on_friend_message(event: FriendMessage): print(f"收到来自{event.sender.nickname}的消息。") ``` Args: event_type: 事件类或事件名。 priority: 优先级,较小者优先。 """ return self._bus.on(event_type, priority) async def process_request(self, event: RequestEvent, operate: Union[int, RespOperate], message: str = '')-
处理申请。
Args
event- 申请事件。
operate- 处理操作。
message- 回复的信息。
Expand source code
async def process_request( self, event: RequestEvent, operate: Union[int, RespOperate], message: str = '' ): """处理申请。 Args: event: 申请事件。 operate: 处理操作。 message: 回复的信息。 """ api_type = cast( Type[RespEvent], getattr(mirai.models.api, 'Resp' + event.type) ) api = api_type.from_event(event, operate, message) await api.call(self, 'POST') async def send(self, target: Union[Entity, MessageEvent], message: Union[MessageChain, Iterable[Union[MessageComponent, str]], MessageComponent, str], quote: bool = False) ‑> int-
发送消息。可以从
FriendGroup等对象,或者从MessageEvent中自动识别消息发送对象。Args
target- 目标对象。
message- 发送的消息。
quote- 是否以回复消息的形式发送,默认为 False。
Returns
int- 发送的消息的 message_id。
Expand source code
async def send( self, target: Union[Entity, MessageEvent], message: TMessage, quote: bool = False ) -> int: """发送消息。可以从 `Friend` `Group` 等对象,或者从 `MessageEvent` 中自动识别消息发送对象。 Args: target: 目标对象。 message: 发送的消息。 quote: 是否以回复消息的形式发送,默认为 False。 Returns: int: 发送的消息的 message_id。 """ # 识别消息发送对象 if isinstance(target, TempMessage): quoting = target.message_chain.message_id if quote else None return ( await self.send_temp_message( qq=target.sender.id, group=target.group.id, message_chain=message, quote=quoting ) ).message_id if isinstance(target, MessageEvent): quoting = target.message_chain.message_id if quote else None target = target.sender else: quoting = None if isinstance(target, Friend): send_message = self.send_friend_message id_ = target.id elif isinstance(target, Group): send_message = self.send_group_message id_ = target.id elif isinstance(target, GroupMember): send_message = self.send_group_message id_ = target.group.id else: raise ValueError(f"{target} 不是有效的消息发送对象。") response = await send_message( target=id_, message_chain=message, quote=quoting ) return response.message_id if response else -1
Inherited members
class MiraiRunner (*args, **kwargs_)-
运行 SimpleMirai 对象的托管类。
使用此类以实现机器人的多例运行。
例如:
runner = MiraiRunner(mirai) runner.run(host='127.0.0.1', port=8000)Expand source code
class MiraiRunner(Singleton): """运行 SimpleMirai 对象的托管类。 使用此类以实现机器人的多例运行。 例如: ```py runner = MiraiRunner(mirai) runner.run(host='127.0.0.1', port=8000) ``` """ bots: Iterable[SimpleMirai] """运行的 SimpleMirai 对象。""" def __init__(self, *bots: SimpleMirai): """ Args: *bots: 要运行的机器人。 """ self.bots = bots self._asgi = ASGI() self._asgi.add_event_handler('startup', self.startup) self._asgi.add_event_handler('shutdown', self.shutdown) async def startup(self): """开始运行。""" coros = [bot.startup() for bot in self.bots] await asyncio.gather(*coros) async def shutdown(self): """结束运行。""" coros = [bot.shutdown() for bot in self.bots] await asyncio.gather(*coros) async def __call__(self, scope, recv, send): await self._asgi(scope, recv, send) async def _run(self): try: await self.startup() backgrounds = [bot.background() for bot in self.bots] await asyncio.gather(*backgrounds) finally: await self.shutdown() def run( self, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs ): """开始运行机器人。 一般情况下,此函数会进入主循环,不再返回。 """ if not asgi_serve( self, host=host, port=port, asgi_server=asgi_server, **kwargs ): import textwrap logger = logging.getLogger(__name__) logger.warning( textwrap.dedent( """ 未找到可用的 ASGI 服务,反向 WebSocket 和 WebHook 上报将不可用。 仅 HTTP 轮询与正向 WebSocket 可用。 建议安装 ASGI 服务器,如 `uvicorn` 或 `hypercorn`。 在命令行键入: pip install uvicorn 或者 pip install hypercorn """ ).strip() ) try: asyncio.run(self._run()) except (KeyboardInterrupt, SystemExit): exit()Ancestors
Class variables
var bots : Iterable[SimpleMirai]-
运行的 SimpleMirai 对象。
Methods
def run(self, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs)-
开始运行机器人。
一般情况下,此函数会进入主循环,不再返回。
Expand source code
def run( self, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs ): """开始运行机器人。 一般情况下,此函数会进入主循环,不再返回。 """ if not asgi_serve( self, host=host, port=port, asgi_server=asgi_server, **kwargs ): import textwrap logger = logging.getLogger(__name__) logger.warning( textwrap.dedent( """ 未找到可用的 ASGI 服务,反向 WebSocket 和 WebHook 上报将不可用。 仅 HTTP 轮询与正向 WebSocket 可用。 建议安装 ASGI 服务器,如 `uvicorn` 或 `hypercorn`。 在命令行键入: pip install uvicorn 或者 pip install hypercorn """ ).strip() ) try: asyncio.run(self._run()) except (KeyboardInterrupt, SystemExit): exit() async def shutdown(self)-
结束运行。
Expand source code
async def shutdown(self): """结束运行。""" coros = [bot.shutdown() for bot in self.bots] await asyncio.gather(*coros) async def startup(self)-
开始运行。
Expand source code
async def startup(self): """开始运行。""" coros = [bot.startup() for bot in self.bots] await asyncio.gather(*coros)
class NetworkError (*args, **kwargs)-
网络连接出错。
Expand source code
class NetworkError(RuntimeError): """网络连接出错。"""Ancestors
- builtins.RuntimeError
- builtins.Exception
- builtins.BaseException
class Plain (*args, **kwargs)-
纯文本。
Expand source code
class Plain(MessageComponent): """纯文本。""" type: str = "Plain" """消息组件类型。""" text: str """文字消息。""" def __str__(self): return self.text def as_mirai_code(self) -> str: return serialize(self.text) def __repr__(self): return f'Plain({self.text!r})'Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var text : str-
文字消息。
Inherited members
class Poke (*args, **kwargs)-
戳一戳。
Expand source code
class Poke(MessageComponent): """戳一戳。""" type: str = "Poke" """消息组件类型。""" name: PokeNames """名称。""" @property def poke_type(self): return POKE_TYPE[self.name] @property def poke_id(self): return POKE_ID[self.name] def __str__(self): return f'[{POKE_NAME[self.name]}]' def as_mirai_code(self) -> str: return f'[mirai:poke:{self.name},{self.poke_type},{self.poke_id}]'Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var name : PokeNames-
名称。
Instance variables
var poke_id-
Expand source code
@property def poke_id(self): return POKE_ID[self.name] var poke_type-
Expand source code
@property def poke_type(self): return POKE_TYPE[self.name]
Inherited members
class PokeNames (value, names=None, *, module=None, qualname=None, type=None, start=1)-
戳一戳名称。
Expand source code
class PokeNames(str, Enum): """戳一戳名称。""" ChuoYiChuo = "ChuoYiChuo" BiXin = "BiXin" DianZan = "DianZan" XinSui = "XinSui" LiuLiuLiu = "LiuLiuLiu" FangDaZhao = "FangDaZhao" BaoBeiQiu = "BaoBeiQiu" Rose = "Rose" ZhaoHuanShu = "ZhaoHuanShu" RangNiPi = "RangNiPi" JeiYin = "JeiYin" ShouLei = "ShouLei" GouYin = "GouYin" ZhuaYiXia = "ZhuaYiXia" SuiPing = "SuiPing" QiaoMen = "QiaoMen" def as_component(self) -> 'Poke': return Poke(name=self.value)Ancestors
- builtins.str
- enum.Enum
Class variables
var BaoBeiQiuvar BiXinvar ChuoYiChuovar DianZanvar FangDaZhaovar GouYinvar JeiYinvar LiuLiuLiuvar QiaoMenvar RangNiPivar Rosevar ShouLeivar SuiPingvar XinSuivar ZhaoHuanShuvar ZhuaYiXia
Methods
def as_component(self) ‑> Poke-
Expand source code
def as_component(self) -> 'Poke': return Poke(name=self.value)
class Shutdown (*args, **kwargs)-
关闭事件。
Expand source code
class Shutdown(LifeSpan): """关闭事件。""" type: str = 'Shutdown' """事件名。"""Ancestors
- LifeSpan
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Inherited members
class SimpleMirai (qq: int, adapter: Adapter)-
基于 adapter 和 bus,处于 model 层之下的机器人类。
使用了
__getattr__魔术方法,可以直接在对象上调用 API。通过
SimpleMirai调用 API 时,需注意此类不含 model 层封装, 因此 API 名称与参数名称需与 mirai-api-http 中的定义相同, 参数需要全部以具名参数的形式给出,并且需要指明使用的方法(GET/POST)。例如:
await bot.sendFriendMessage(target=12345678, messageChain=[ {"type": "Plain", "text": "Hello World!"} ], method="POST")也可以使用
call_api方法。对于名称的路由含有二级目录的 API,由于名称中含有斜杠,必须使用
call_api调用,例如:file_list = await bot.call_api( "file/list", id="", target=12345678, method="GET" )Args
qq- QQ 号。启用 Single Mode 时,可以随便传入,登陆后会自动获取正确的 QQ 号。
adapter- 适配器,负责与 mirai-api-http 沟通,详见模块`mirai.adapters。
Expand source code
class SimpleMirai(ApiProvider, AdapterInterface, AbstractEventBus): """ 基于 adapter 和 bus,处于 model 层之下的机器人类。 使用了 `__getattr__` 魔术方法,可以直接在对象上调用 API。 通过 `SimpleMirai` 调用 API 时,需注意此类不含 model 层封装, 因此 API 名称与参数名称需与 mirai-api-http 中的定义相同, 参数需要全部以具名参数的形式给出,并且需要指明使用的方法(GET/POST)。 例如: ```py await bot.sendFriendMessage(target=12345678, messageChain=[ {"type": "Plain", "text": "Hello World!"} ], method="POST") ``` 也可以使用 `call_api` 方法。 对于名称的路由含有二级目录的 API,由于名称中含有斜杠,必须使用 `call_api` 调用,例如: ```py file_list = await bot.call_api( "file/list", id="", target=12345678, method="GET" ) ``` """ qq: int def __init__(self, qq: int, adapter: Adapter): """ Args: qq: QQ 号。启用 Single Mode 时,可以随便传入,登陆后会自动获取正确的 QQ 号。 adapter: 适配器,负责与 mirai-api-http 沟通,详见模块`mirai.adapters。 """ self.qq = qq self._adapter = adapter self._bus = EventBus() self._adapter.register_event_bus(self._bus) @property def bus(self) -> EventBus: return self._bus def subscribe(self, event, func: Callable, priority: int = 0) -> None: self._bus.subscribe(event, func, priority) def unsubscribe(self, event, func: Callable) -> None: self._bus.unsubscribe(event, func) async def emit(self, event, *args, **kwargs) -> List[Awaitable[Any]]: return await self._bus.emit(event, *args, **kwargs) async def call_api(self, api: str, *args, **kwargs): """调用 API。 Args: api: API 名称。 *args: 参数。 **kwargs: 参数。 """ warnings.warn("SimpleMirai 已弃用,将在 0.3 后移除。", DeprecationWarning) return await self._adapter.call_api(api, *args, **kwargs) def on(self, event: str, priority: int = 0) -> Callable: """注册事件处理器。 用法举例: ```py @bot.on('FriendMessage') async def on_friend_message(event: dict): print(f"收到来自{event['sender']['nickname']}的消息。") ``` Args: event: 事件名。 priority: 优先级,小者优先。 """ return self._bus.on(event, priority=priority) @property def adapter_info(self) -> Dict[str, Any]: return self._adapter.adapter_info @contextlib.asynccontextmanager async def use_adapter(self, adapter: Adapter): """临时使用另一个适配器。 用法: ```py async with bot.use_adapter(HTTPAdapter.via(bot)): ... ``` Args: adapter: 使用的适配器。 """ origin_adapter = self._adapter await adapter.login(self.qq) self._adapter = adapter yield self._adapter = origin_adapter await adapter.logout(False) async def startup(self): """开始运行机器人(立即返回)。""" await self._adapter.login(self.qq) if self._adapter.single_mode: # Single Mode 下,QQ 号可以随便传入。这里从 session info 中获取正确的 QQ 号。 self.qq = (await self.call_api('sessionInfo'))['data']['qq']['id'] asyncio.create_task(self._adapter.emit("Startup", {'type': 'Startup'})) await self._adapter.start() async def background(self): """等待背景任务完成。""" await self._adapter.background async def shutdown(self): """结束运行机器人。""" await asyncio.create_task( self._adapter.emit("Shutdown", {'type': 'Shutdown'}) ) await self._adapter.logout() await self._adapter.shutdown() @property def session(self) -> str: """获取 session key,可用于调试。 Returns: str: session key。 """ return self._adapter.session @property def asgi(self) -> 'MiraiRunner': """ASGI 对象,用于使用 uvicorn 等启动。""" return MiraiRunner(self) @staticmethod def add_background_task(func: Union[Callable, Awaitable, None] = None): """注册背景任务,将在 bot 启动后自动运行。 Args: func(`Union[Callable, Awaitable, None]`): 背景任务,可以是函数或者协程,省略参数以作为装饰器调用。 """ asgi = ASGI() return asgi.add_background_task(func) def run( self, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs ): """开始运行机器人。 一般情况下,此函数会进入主循环,不再返回。 Args: host: YiriMirai 作为服务器的地址,默认为 127.0.0.1。 port: YiriMirai 作为服务器的端口,默认为 8000。 asgi_server: ASGI 服务器类型,可选项有 `'uvicorn'` `'hypercorn'` 和 `'auto'`。 **kwargs: 可选参数。多余的参数将传递给 `uvicorn.run` 和 `hypercorn.run`。 """ MiraiRunner(self).run(host, port, asgi_server, **kwargs)Ancestors
Subclasses
Class variables
var qq : int
Static methods
def add_background_task(func: Union[Callable, Awaitable, NoneType] = None)-
注册背景任务,将在 bot 启动后自动运行。
Args
func(
Union[Callable, Awaitable, None]): 背景任务,可以是函数或者协程,省略参数以作为装饰器调用。Expand source code
@staticmethod def add_background_task(func: Union[Callable, Awaitable, None] = None): """注册背景任务,将在 bot 启动后自动运行。 Args: func(`Union[Callable, Awaitable, None]`): 背景任务,可以是函数或者协程,省略参数以作为装饰器调用。 """ asgi = ASGI() return asgi.add_background_task(func)
Instance variables
var asgi : MiraiRunner-
ASGI 对象,用于使用 uvicorn 等启动。
Expand source code
@property def asgi(self) -> 'MiraiRunner': """ASGI 对象,用于使用 uvicorn 等启动。""" return MiraiRunner(self) var bus : EventBus-
Expand source code
@property def bus(self) -> EventBus: return self._bus var session : str-
获取 session key,可用于调试。
Returns
str- session key。
Expand source code
@property def session(self) -> str: """获取 session key,可用于调试。 Returns: str: session key。 """ return self._adapter.session
Methods
async def background(self)-
等待背景任务完成。
Expand source code
async def background(self): """等待背景任务完成。""" await self._adapter.background async def call_api(self, api: str, *args, **kwargs)-
调用 API。
Args
api- API 名称。
*args- 参数。
**kwargs- 参数。
Expand source code
async def call_api(self, api: str, *args, **kwargs): """调用 API。 Args: api: API 名称。 *args: 参数。 **kwargs: 参数。 """ warnings.warn("SimpleMirai 已弃用,将在 0.3 后移除。", DeprecationWarning) return await self._adapter.call_api(api, *args, **kwargs) def on(self, event: str, priority: int = 0) ‑> Callable-
注册事件处理器。
用法举例:
@bot.on('FriendMessage') async def on_friend_message(event: dict): print(f"收到来自{event['sender']['nickname']}的消息。")Args
event- 事件名。
priority- 优先级,小者优先。
Expand source code
def on(self, event: str, priority: int = 0) -> Callable: """注册事件处理器。 用法举例: ```py @bot.on('FriendMessage') async def on_friend_message(event: dict): print(f"收到来自{event['sender']['nickname']}的消息。") ``` Args: event: 事件名。 priority: 优先级,小者优先。 """ return self._bus.on(event, priority=priority) def run(self, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs)-
开始运行机器人。
一般情况下,此函数会进入主循环,不再返回。
Args
host- YiriMirai 作为服务器的地址,默认为 127.0.0.1。
port- YiriMirai 作为服务器的端口,默认为 8000。
asgi_server- ASGI 服务器类型,可选项有
'uvicorn''hypercorn'和'auto'。 **kwargs- 可选参数。多余的参数将传递给
uvicorn.run和hypercorn.run。
Expand source code
def run( self, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs ): """开始运行机器人。 一般情况下,此函数会进入主循环,不再返回。 Args: host: YiriMirai 作为服务器的地址,默认为 127.0.0.1。 port: YiriMirai 作为服务器的端口,默认为 8000。 asgi_server: ASGI 服务器类型,可选项有 `'uvicorn'` `'hypercorn'` 和 `'auto'`。 **kwargs: 可选参数。多余的参数将传递给 `uvicorn.run` 和 `hypercorn.run`。 """ MiraiRunner(self).run(host, port, asgi_server, **kwargs) async def shutdown(self)-
结束运行机器人。
Expand source code
async def shutdown(self): """结束运行机器人。""" await asyncio.create_task( self._adapter.emit("Shutdown", {'type': 'Shutdown'}) ) await self._adapter.logout() await self._adapter.shutdown() async def startup(self)-
开始运行机器人(立即返回)。
Expand source code
async def startup(self): """开始运行机器人(立即返回)。""" await self._adapter.login(self.qq) if self._adapter.single_mode: # Single Mode 下,QQ 号可以随便传入。这里从 session info 中获取正确的 QQ 号。 self.qq = (await self.call_api('sessionInfo'))['data']['qq']['id'] asyncio.create_task(self._adapter.emit("Startup", {'type': 'Startup'})) await self._adapter.start() async def use_adapter(self, adapter: Adapter)-
临时使用另一个适配器。
用法:
async with bot.use_adapter(HTTPAdapter.via(bot)): ...Args
adapter- 使用的适配器。
Expand source code
@contextlib.asynccontextmanager async def use_adapter(self, adapter: Adapter): """临时使用另一个适配器。 用法: ```py async with bot.use_adapter(HTTPAdapter.via(bot)): ... ``` Args: adapter: 使用的适配器。 """ origin_adapter = self._adapter await adapter.login(self.qq) self._adapter = adapter yield self._adapter = origin_adapter await adapter.logout(False)
Inherited members
class SkipExecution (*args, **kwargs)-
跳过同优先度的事件处理器,进入下一优先度。
Expand source code
class SkipExecution(Exception): """跳过同优先度的事件处理器,进入下一优先度。"""Ancestors
- builtins.Exception
- builtins.BaseException
class Startup (*args, **kwargs)-
启动事件。
Expand source code
class Startup(LifeSpan): """启动事件。""" type: str = 'Startup' """事件名。"""Ancestors
- LifeSpan
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Inherited members
class StopExecution (*args, **kwargs)-
终止事件处理器执行,但不阻止事件向上传播。
Expand source code
class StopExecution(Exception): """终止事件处理器执行,但不阻止事件向上传播。"""Ancestors
- builtins.Exception
- builtins.BaseException
class StopPropagation (*args, **kwargs)-
终止事件处理器执行,并停止事件向上传播。
Expand source code
class StopPropagation(Exception): """终止事件处理器执行,并停止事件向上传播。"""Ancestors
- builtins.Exception
- builtins.BaseException
class StrangerMessage (*args, **kwargs)-
陌生人消息。
Args
type- 事件名。
sender- 发送消息的人。
message_chain- 消息内容。
Expand source code
class StrangerMessage(MessageEvent): """陌生人消息。 Args: type: 事件名。 sender: 发送消息的人。 message_chain: 消息内容。 """ type: str = 'StrangerMessage' """事件名。""" sender: Friend """发送消息的人。""" message_chain: MessageChain """消息内容。"""Ancestors
- MessageEvent
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var sender : Friend-
发送消息的人。
Inherited members
class TempMessage (*args, **kwargs)-
群临时消息。
Args
type- 事件名。
sender- 发送消息的群成员。
message_chain- 消息内容。
Expand source code
class TempMessage(MessageEvent): """群临时消息。 Args: type: 事件名。 sender: 发送消息的群成员。 message_chain: 消息内容。 """ type: str = 'TempMessage' """事件名。""" sender: GroupMember """发送消息的群成员。""" message_chain: MessageChain """消息内容。""" @property def group(self) -> Group: return self.sender.groupAncestors
- MessageEvent
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var sender : GroupMember-
发送消息的群成员。
Instance variables
var group : Group-
Expand source code
@property def group(self) -> Group: return self.sender.group
Inherited members
class Voice (*args, **kwargs)-
语音。
Expand source code
class Voice(MessageComponent): """语音。""" type: str = "Voice" """消息组件类型。""" voice_id: Optional[str] = None """语音的 voice_id,不为空时将忽略 url 属性。""" url: Optional[str] = None """语音的 URL,发送时可作网络语音的链接;接收时为腾讯语音服务器的链接,可用于语音下载。""" path: Optional[str] = None """语音的路径,发送本地语音。""" base64: Optional[str] = None """语音的 Base64 编码。""" length: Optional[int] = None """语音的长度,单位为秒。""" @validator('path') def validate_path(cls, path: Optional[str]): """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。""" if path: try: return str(Path(path).resolve(strict=True)) except FileNotFoundError: raise ValueError(f"无效路径:{path}") else: return path def __str__(self): return '[语音]' async def download( self, filename: Union[str, Path, None] = None, directory: Union[str, Path, None] = None ): """下载语音到本地。 语音采用 silk v3 格式,silk 格式的编码解码请使用 [graiax-silkcoder](https://pypi.org/project/graiax-silkcoder/)。 Args: filename: 下载到本地的文件路径。与 `directory` 二选一。 directory: 下载到本地的文件夹路径。与 `filename` 二选一。 """ if not self.url: logger.warning(f'语音 `{self.voice_id}` 无 url 参数,下载失败。') return import httpx async with httpx.AsyncClient() as client: response = await client.get(self.url) response.raise_for_status() content = response.content if filename: path = Path(filename) path.parent.mkdir(parents=True, exist_ok=True) elif directory: path = Path(directory) path.mkdir(parents=True, exist_ok=True) path = path / f'{self.voice_id}.silk' else: raise ValueError("请指定文件路径或文件夹路径!") import aiofiles async with aiofiles.open(path, 'wb') as f: await f.write(content) @classmethod async def from_local( cls, filename: Union[str, Path, None] = None, content: Optional[bytes] = None, ) -> "Voice": """从本地文件路径加载语音,以 base64 的形式传递。 Args: filename: 从本地文件路径加载语音,与 `content` 二选一。 content: 从本地文件内容加载语音,与 `filename` 二选一。 """ if content: pass if filename: path = Path(filename) import aiofiles async with aiofiles.open(path, 'rb') as f: content = await f.read() else: raise ValueError("请指定语音路径或语音内容!") import base64 img = cls(base64=base64.b64encode(content).decode()) return imgAncestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var base64 : Optional[str]-
语音的 Base64 编码。
var length : Optional[int]-
语音的长度,单位为秒。
var path : Optional[str]-
语音的路径,发送本地语音。
var url : Optional[str]-
语音的 URL,发送时可作网络语音的链接;接收时为腾讯语音服务器的链接,可用于语音下载。
var voice_id : Optional[str]-
语音的 voice_id,不为空时将忽略 url 属性。
Static methods
async def from_local(filename: Union[str, pathlib.Path, NoneType] = None, content: Optional[bytes] = None) ‑> Voice-
从本地文件路径加载语音,以 base64 的形式传递。
Args
filename- 从本地文件路径加载语音,与
content二选一。 content- 从本地文件内容加载语音,与
filename二选一。
Expand source code
@classmethod async def from_local( cls, filename: Union[str, Path, None] = None, content: Optional[bytes] = None, ) -> "Voice": """从本地文件路径加载语音,以 base64 的形式传递。 Args: filename: 从本地文件路径加载语音,与 `content` 二选一。 content: 从本地文件内容加载语音,与 `filename` 二选一。 """ if content: pass if filename: path = Path(filename) import aiofiles async with aiofiles.open(path, 'rb') as f: content = await f.read() else: raise ValueError("请指定语音路径或语音内容!") import base64 img = cls(base64=base64.b64encode(content).decode()) return img def validate_path(path: Optional[str])-
修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。
Expand source code
@validator('path') def validate_path(cls, path: Optional[str]): """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。""" if path: try: return str(Path(path).resolve(strict=True)) except FileNotFoundError: raise ValueError(f"无效路径:{path}") else: return path
Methods
async def download(self, filename: Union[str, pathlib.Path, NoneType] = None, directory: Union[str, pathlib.Path, NoneType] = None)-
下载语音到本地。
语音采用 silk v3 格式,silk 格式的编码解码请使用 graiax-silkcoder。
Args
filename- 下载到本地的文件路径。与
directory二选一。 directory- 下载到本地的文件夹路径。与
filename二选一。
Expand source code
async def download( self, filename: Union[str, Path, None] = None, directory: Union[str, Path, None] = None ): """下载语音到本地。 语音采用 silk v3 格式,silk 格式的编码解码请使用 [graiax-silkcoder](https://pypi.org/project/graiax-silkcoder/)。 Args: filename: 下载到本地的文件路径。与 `directory` 二选一。 directory: 下载到本地的文件夹路径。与 `filename` 二选一。 """ if not self.url: logger.warning(f'语音 `{self.voice_id}` 无 url 参数,下载失败。') return import httpx async with httpx.AsyncClient() as client: response = await client.get(self.url) response.raise_for_status() content = response.content if filename: path = Path(filename) path.parent.mkdir(parents=True, exist_ok=True) elif directory: path = Path(directory) path.mkdir(parents=True, exist_ok=True) path = path / f'{self.voice_id}.silk' else: raise ValueError("请指定文件路径或文件夹路径!") import aiofiles async with aiofiles.open(path, 'wb') as f: await f.write(content)
Inherited members
class WebHookAdapter (verify_key: Optional[str], route: str = '/', extra_headers: Optional[Mapping[str, str]] = None, enable_quick_response: bool = True, single_mode: bool = False)-
WebHook 适配器。作为 HTTP 服务器与 mirai-api-http 沟通。
Args
verify_key- mirai-api-http 配置的认证 key,关闭认证时为 None。
route- 适配器的路由,默认在根目录上提供服务。
extra_headers- 额外请求头,与 mirai-api-http 的配置一致。
enable_quick_response- 是否启用快速响应,当与其他适配器混合使用时,禁用可以提高响应速度。
single_mode- 是否启用单例模式。
Expand source code
class WebHookAdapter(Adapter): """WebHook 适配器。作为 HTTP 服务器与 mirai-api-http 沟通。""" session: str """WebHook 不需要 session,此处为机器人的 QQ 号。""" route: str """适配器的路由。""" extra_headers: Mapping[str, str] """额外请求头。""" enable_quick_response: bool """是否启用快速响应。""" def __init__( self, verify_key: Optional[str], route: str = '/', extra_headers: Optional[Mapping[str, str]] = None, enable_quick_response: bool = True, single_mode: bool = False ): """ Args: verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。 route: 适配器的路由,默认在根目录上提供服务。 extra_headers: 额外请求头,与 mirai-api-http 的配置一致。 enable_quick_response: 是否启用快速响应,当与其他适配器混合使用时,禁用可以提高响应速度。 single_mode: 是否启用单例模式。 """ super().__init__(verify_key=verify_key, single_mode=single_mode) self.route = route self.extra_headers = extra_headers or {} self.enable_quick_response = enable_quick_response async def endpoint(request: Request): # 鉴权(QQ 号和额外请求头) if request.headers.get('bot') != self.session: # 验证 QQ 号 logger.debug(f"收到来自其他账号({request.headers.get('bot')})的事件。") return for key in self.extra_headers: # 验证请求头 key = key.lower() # HTTP headers 不区分大小写 request_value = request.headers.get(key, '').lower() expect_value = self.extra_headers[key].lower() if (request_value != expect_value ) and (request_value != '[' + expect_value + ']'): logger.info( f"请求头验证失败:expect [{expect_value}], " + f"got {request_value}。" ) return JSONResponse( status_code=401, content={'error': 'Unauthorized'} ) # 处理事件 event = await request.json() result = await self.handle_event(event) return YiriMiraiJSONResponse(result) ASGI().add_route(self.route, endpoint, methods=['POST']) class QuickResponse(BaseException): """WebHook 快速响应,以异常的方式跳出。""" def __init__(self, data: dict): self.data = data @property def adapter_info(self): return { 'verify_key': self.verify_key, 'session': self.session, 'single_mode': self.single_mode, 'route': self.route, 'extra_headers': self.extra_headers, 'enable_quick_response': self.enable_quick_response, } @classmethod def via(cls, adapter_interface: AdapterInterface) -> "WebHookAdapter": info = adapter_interface.adapter_info adapter = cls( verify_key=info['verify_key'], **{ key: info[key] for key in [ 'route', 'extra_headers', 'enable_quick_response', 'single_mode' ] if key in info } ) adapter.session = cast(str, info.get('session')) return adapter async def login(self, qq: int): """WebHook 不需要登录。直接返回。""" self.session = str(qq) logger.info(f'[WebHook] 成功登录到账号{qq}。') async def logout(self, terminate: bool = True): """WebHook 不需要登出。直接返回。""" logger.info(f"[WebHook] 从账号{self.session}退出。") async def call_api(self, api: str, method: Method = Method.GET, **params): """调用 API。WebHook 的 API 调用只能在快速响应中发生。""" if self.enable_quick_response: content = {'command': api.replace('/', '_'), 'content': params} if method == Method.RESTGET: content['subCommand'] = 'get' elif method == Method.RESTPOST: content['subCommand'] = 'update' elif method == Method.MULTIPART: raise NotImplementedError( "WebHook 适配器不支持上传操作。请使用 bot.use_adapter 临时调用 HTTP 适配器。" ) logger.debug(f'[WebHook] WebHook 快速响应 {api}。') raise WebHookAdapter.QuickResponse(content) return None async def _background(self): """WebHook 不需要事件循环。直接返回。""" async def handle_event(self, event): try: tasks = await self.emit(event['type'], event) await asyncio.gather(*tasks) except WebHookAdapter.QuickResponse as response: # 快速响应,直接返回。 return response.data return {}Ancestors
- Adapter
- ApiProvider
- AdapterInterface
- abc.ABC
Class variables
var QuickResponse-
WebHook 快速响应,以异常的方式跳出。
var enable_quick_response : bool-
是否启用快速响应。
var extra_headers : Mapping[str, str]-
额外请求头。
var route : str-
适配器的路由。
Methods
async def call_api(self, api: str, method: Method = Method.GET, **params)-
调用 API。WebHook 的 API 调用只能在快速响应中发生。
Expand source code
async def call_api(self, api: str, method: Method = Method.GET, **params): """调用 API。WebHook 的 API 调用只能在快速响应中发生。""" if self.enable_quick_response: content = {'command': api.replace('/', '_'), 'content': params} if method == Method.RESTGET: content['subCommand'] = 'get' elif method == Method.RESTPOST: content['subCommand'] = 'update' elif method == Method.MULTIPART: raise NotImplementedError( "WebHook 适配器不支持上传操作。请使用 bot.use_adapter 临时调用 HTTP 适配器。" ) logger.debug(f'[WebHook] WebHook 快速响应 {api}。') raise WebHookAdapter.QuickResponse(content) return None async def handle_event(self, event)-
Expand source code
async def handle_event(self, event): try: tasks = await self.emit(event['type'], event) await asyncio.gather(*tasks) except WebHookAdapter.QuickResponse as response: # 快速响应,直接返回。 return response.data return {} async def login(self, qq: int)-
WebHook 不需要登录。直接返回。
Expand source code
async def login(self, qq: int): """WebHook 不需要登录。直接返回。""" self.session = str(qq) logger.info(f'[WebHook] 成功登录到账号{qq}。') async def logout(self, terminate: bool = True)-
WebHook 不需要登出。直接返回。
Expand source code
async def logout(self, terminate: bool = True): """WebHook 不需要登出。直接返回。""" logger.info(f"[WebHook] 从账号{self.session}退出。")
Inherited members
class WebSocketAdapter (verify_key: Optional[str], host: str, port: int, sync_id: str = '-1', single_mode: bool = False, heartbeat_interval: float = 60.0)-
WebSocket 适配器。作为 WebSocket 客户端与 mirai-api-http 沟通。
Args
verify_key- mirai-api-http 配置的认证 key,关闭认证时为 None。
host- WebSocket Server 的地址。
port- WebSocket Server 的端口。
sync_id- mirai-api-http 配置的同步 ID。
single_mode- 是否启用单例模式。
heartbeat_interval- 每隔多久发送心跳包,单位秒。
Expand source code
class WebSocketAdapter(Adapter): """WebSocket 适配器。作为 WebSocket 客户端与 mirai-api-http 沟通。""" host_name: str """WebSocket Server 的地址。""" sync_id: str """mirai-api-http 配置的同步 ID。""" qq: int """机器人的 QQ 号。""" connection: Optional[WebSocketClientProtocol] """WebSocket 客户端连接。""" heartbeat_interval: float """每隔多久发送心跳包,单位:秒。""" def __init__( self, verify_key: Optional[str], host: str, port: int, sync_id: str = '-1', single_mode: bool = False, heartbeat_interval: float = 60., ): """ Args: verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。 host: WebSocket Server 的地址。 port: WebSocket Server 的端口。 sync_id: mirai-api-http 配置的同步 ID。 single_mode: 是否启用单例模式。 heartbeat_interval: 每隔多久发送心跳包,单位秒。 """ super().__init__(verify_key=verify_key, single_mode=single_mode) self._host = host self._port = port if host[:2] == '//': host = 'ws:' + host elif host[:7] == 'http://' or host[:8] == 'https://': raise exceptions.NetworkError(f'{host} 不是一个可用的 WebSocket 地址!') elif host[:5] != 'ws://': host = 'ws://' + host if host[-1:] == '/': host = host[:-1] self.host_name = f'{host}:{port}/all' self.sync_id = sync_id # 这个神奇的 sync_id,默认值 -1,居然是个字符串 # 既然这样不如把 sync_id 全改成字符串好了 self.qq = 0 self.connection = None self.heartbeat_interval = heartbeat_interval # 接收 WebSocket 数据的 Task self._receiver_task: Optional[asyncio.Task] = None # 用于临时保存接收到的数据,以便根据 sync_id 进行同步识别 self._recv_dict: Dict[str, deque] = defaultdict(deque) # 本地同步 ID,每次调用 API 递增。 self._local_sync_id = random.randint(1, 1024) * 1024 # 事件处理任务管理器 self._tasks = Tasks() # 心跳机制(Keep-Alive):上次发送数据包的时间 self._last_send_time: float = 0. @property def adapter_info(self): return { 'verify_key': self.verify_key, 'session': self.session, 'single_mode': self.single_mode, 'host': self._host, 'port': self._port, 'sync_id': self.sync_id, } @classmethod def via(cls, adapter_interface: AdapterInterface) -> "WebSocketAdapter": info = adapter_interface.adapter_info adapter = cls( verify_key=info['verify_key'], **{ key: info[key] for key in ['host', 'port', 'sync_id', 'single_mode'] if key in info } ) adapter.session = cast(str, info.get('session')) return adapter @_error_handler_async_local async def _receiver(self): """开始接收 websocket 数据。""" if not self.connection: raise exceptions.NetworkError( f'WebSocket 通道 {self.host_name} 未连接!' ) while True: try: # 数据格式: # { # 'syncId': '-1', # 'data': { # // Event Content # } # } response = json.loads(await self.connection.recv()) data = response['data'] logger.debug( f"[WebSocket] 收到 WebSocket 数据,同步 ID:{response['syncId']}。" ) self._recv_dict[response['syncId']].append(data) except KeyError: logger.error(f'[WebSocket] 不正确的数据:{response}') except ConnectionClosedOK: raise SystemExit() except ConnectionClosed as e: exit_message = f'[WebSocket] WebSocket 通道意外关闭。code: {e.code}, reason: {e.reason}' logger.error(exit_message) raise SystemExit(exit_message) async def _recv(self, sync_id: str = '-1', timeout: int = 600) -> dict: """接收并解析 websocket 数据。""" timer = range(timeout) if timeout > 0 else repeat(0) for _ in timer: if self._recv_dict[sync_id]: data = self._recv_dict[sync_id].popleft() if data.get('code', 0) != 0: raise exceptions.ApiError(data) return data # 如果没有对应同步 ID 的数据,则等待 websocket 数据 # 目前存在问题:如果 mah 发回的数据不含 sync_id, # 这里就会无限循环…… # 所以还是限制次数好了。 await asyncio.sleep(0.1) raise TimeoutError( f'[WebSocket] mirai-api-http 响应超时,可能是由于调用出错。同步 ID:{sync_id}。' ) @_error_handler_async_local async def login(self, qq: int): headers = { 'verifyKey': self.verify_key or '', # 关闭认证时,WebSocket 可传入任意 verify_key 'qq': str(qq), } if self.session: headers['sessionKey'] = self.session self.connection = await connect(self.host_name, extra_headers=headers) self._receiver_task = asyncio.create_task(self._receiver()) verify_response = await self._recv('') # 神奇现象:这里的 syncId 是个空字符串 self.session = verify_response['session'] self.qq = qq logger.info(f'[WebSocket] 成功登录到账号{qq}。') @_error_handler_async_local async def logout(self, terminate: bool = True): if self.connection: await self.connection.close() await self._receiver_task logger.info(f"[WebSocket] 从账号{self.qq}退出。") async def poll_event(self): """获取并处理事件。""" event = await self._recv(self.sync_id, -1) self._tasks.create_task(self.emit(event['type'], event)) async def call_api(self, api: str, method: Method = Method.GET, **params): if not self.connection: raise exceptions.NetworkError( f'WebSocket 通道 {self.host_name} 未连接!' ) self._local_sync_id += 1 # 使用不同的 sync_id sync_id = str(self._local_sync_id) content = { 'syncId': sync_id, 'command': api.replace('/', '_'), 'content': params } if method == Method.RESTGET: content['subCommand'] = 'get' elif method == Method.RESTPOST: content['subCommand'] = 'update' elif method == Method.MULTIPART: raise NotImplementedError( "WebSocket 适配器不支持上传操作。请使用 bot.use_adapter 临时调用 HTTP 适配器。" ) await self.connection.send(json_dumps(content)) self._last_send_time = time.time() logger.debug(f"[WebSocket] 发送 WebSocket 数据,同步 ID:{sync_id}。") try: return await self._recv(sync_id) except TimeoutError as e: logger.debug(e) async def _heartbeat(self): while True: await asyncio.sleep(self.heartbeat_interval) if time.time() - self._last_send_time > self.heartbeat_interval: await self.call_api('about') self._last_send_time = time.time() logger.debug("[WebSocket] 发送心跳包。") async def _background(self): """开始接收事件。""" logger.info('[WebSocket] 机器人开始运行。按 Ctrl + C 停止。') try: heartbeat = asyncio.create_task(self._heartbeat()) while True: await self.poll_event() finally: await Tasks.cancel(heartbeat) await self._tasks.cancel_all()Ancestors
- Adapter
- ApiProvider
- AdapterInterface
- abc.ABC
Class variables
var connection : Optional[websockets.legacy.client.WebSocketClientProtocol]-
WebSocket 客户端连接。
var heartbeat_interval : float-
每隔多久发送心跳包,单位:秒。
var host_name : str-
WebSocket Server 的地址。
var qq : int-
机器人的 QQ 号。
var sync_id : str-
mirai-api-http 配置的同步 ID。
Methods
async def poll_event(self)-
获取并处理事件。
Expand source code
async def poll_event(self): """获取并处理事件。""" event = await self._recv(self.sync_id, -1) self._tasks.create_task(self.emit(event['type'], event))
Inherited members