Package mirai
YiriMirai
一个轻量级、低耦合的基于 mirai-api-http 的 Python SDK。
更多信息请看文档。
Expand source code
# -*- coding: utf-8 -*-
"""
# YiriMirai
一个轻量级、低耦合的基于 mirai-api-http 的 Python SDK。
更多信息请看[文档](https://yiri-mirai.vercel.app/)。
"""
__version__ = '0.2.6.2'
__author__ = '忘忧北萱草'
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from mirai.adapters import (
Adapter, ComposeAdapter, HTTPAdapter, WebHookAdapter, WebSocketAdapter
)
else:
from mirai.adapters import Adapter
from mirai.api_provider import Method
from mirai.bot import (
LifeSpan, Mirai, MiraiRunner, Shutdown, SimpleMirai, Startup
)
from mirai.bus import EventBus
from mirai.colorlog import ColoredFormatter
from mirai.exceptions import (
ApiError, NetworkError, SkipExecution, StopExecution, StopPropagation
)
from mirai.models import (
At, AtAll, Dice, Event, Face, FriendMessage, GroupMessage, Image,
MessageChain, MessageEvent, Plain, Poke, PokeNames, StrangerMessage,
TempMessage, Voice, deserialize, serialize
)
__all__ = [
'Mirai', 'SimpleMirai', 'MiraiRunner', 'LifeSpan', 'Startup', 'Shutdown',
'Adapter', 'Method', 'HTTPAdapter', 'WebSocketAdapter', 'WebHookAdapter',
'ComposeAdapter', 'EventBus', 'get_logger', 'Event', 'MessageEvent',
'FriendMessage', 'GroupMessage', 'TempMessage', 'StrangerMessage',
'MessageChain', 'Plain', 'At', 'AtAll', 'Dice', 'Face', 'Poke',
'PokeNames', 'Image', 'Voice', 'serialize', 'deserialize', 'ApiError',
'NetworkError', 'SkipExecution', 'StopExecution', 'StopPropagation'
]
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = ColoredFormatter(
'%(asctime)s - %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S'
)
ch.setFormatter(formatter)
logger.addHandler(ch)
def get_logger() -> logging.Logger:
"""获取 YiriMirai 的模块 Logger。
所有的模块的 Logger 都是此 Logger 的子 Logger,修改此 Logger 的属性以应用到 YiriMirai 全局。
Returns:
logging.Logger: 模块 Logger。
"""
return logger
def __getattr__(name):
if name in (
'HTTPAdapter', 'WebSocketAdapter', 'WebHookAdapter', 'ComposeAdapter'
):
import mirai.adapters
return getattr(mirai.adapters, name)
raise AttributeError(f'Module {__name__} has no attribute {name}')
Sub-modules
mirai.adapters
-
此模块提供网络适配器相关。 …
mirai.api_provider
-
此模块提供 ApiProvider 的基础定义。
mirai.asgi
-
此模块提供公共 ASGI 前端。
mirai.bot
-
此模块定义机器人类,包含处于 model 层之下的
SimpleMirai
和建立在 model 层上的Mirai
。 mirai.bus
-
此模块提供基础事件总线相关。 …
mirai.colorlog
-
此模块提供带颜色的格式化日志输出的功能。 …
mirai.exceptions
-
此模块提供异常相关。
mirai.models
-
此模块提供 model 层封装。 …
mirai.tasks
-
此模块提供了管理多个异步任务的一种方式。
mirai.utils
-
此模块提供一些实用的辅助方法。
Functions
def deserialize(s: str) ‑> str
-
mirai 码去转义。
Args
s
- 待去转义的字符串。
Returns
str
- 去转义后的字符串。
Expand source code
def deserialize(s: str) -> str: """mirai 码去转义。 Args: s: 待去转义的字符串。 Returns: str: 去转义后的字符串。 """ return re.sub( r'\\([\[\]:,\\])', lambda match: match.group(1), s.replace('\\n', '\n').replace('\\r', '\r') )
def get_logger() ‑> logging.Logger
-
获取 YiriMirai 的模块 Logger。
所有的模块的 Logger 都是此 Logger 的子 Logger,修改此 Logger 的属性以应用到 YiriMirai 全局。
Returns
logging.Logger
- 模块 Logger。
Expand source code
def get_logger() -> logging.Logger: """获取 YiriMirai 的模块 Logger。 所有的模块的 Logger 都是此 Logger 的子 Logger,修改此 Logger 的属性以应用到 YiriMirai 全局。 Returns: logging.Logger: 模块 Logger。 """ return logger
def serialize(s: str) ‑> str
-
mirai 码转义。
Args
s
- 待转义的字符串。
Returns
str
- 去转义后的字符串。
Expand source code
def serialize(s: str) -> str: """mirai 码转义。 Args: s: 待转义的字符串。 Returns: str: 去转义后的字符串。 """ return re.sub(r'[\[\]:,\\]', lambda match: '\\' + match.group(0), s).replace('\n', '\\n').replace('\r', '\\r')
Classes
class Adapter (verify_key: Optional[str], single_mode: bool = False)
-
适配器基类,与 mirai-api-http 沟通的底层实现。
属性
buses
为适配器注册的事件总线集合。适配器被绑定到 bot 时,bot 会自动将自身的事件总线注册到适配器。Args
verify_key
- mirai-api-http 配置的认证 key,关闭认证时为 None。
single_mode
- 是否开启 single_mode,开启后与 session 将无效。
Expand source code
class Adapter(ApiProvider, AdapterInterface): """适配器基类,与 mirai-api-http 沟通的底层实现。 属性 `buses` 为适配器注册的事件总线集合。适配器被绑定到 bot 时,bot 会自动将自身的事件总线注册到适配器。 """ verify_key: Optional[str] """mirai-api-http 配置的认证 key,关闭认证时为 None。""" single_mode: bool """是否开启 single_mode,开启后与 session 将无效。""" session: str """从 mirai-api-http 处获得的 session。""" buses: Set[AbstractEventBus] """注册的事件总线集合。""" background: Optional[asyncio.Task] """背景事件循环任务。""" def __init__(self, verify_key: Optional[str], single_mode: bool = False): """ Args: verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。 single_mode: 是否开启 single_mode,开启后与 session 将无效。 """ self.verify_key = verify_key self.single_mode = single_mode self.session = '' self.buses = set() self.background = None @property def adapter_info(self): return { 'verify_key': self.verify_key, 'session': self.session, 'single_mode': self.single_mode, } @classmethod def via(cls, adapter_interface: AdapterInterface) -> "Adapter": """从适配器接口创建适配器。 Args: adapter_interface: 适配器接口。 Returns: Adapter: 创建的适配器。 """ info = adapter_interface.adapter_info adapter = cls( verify_key=info['verify_key'], **{ key: info[key] for key in ['single_mode'] if info.get(key) is not None } ) adapter.session = cast(str, info.get('session')) return adapter def register_event_bus(self, *buses: AbstractEventBus): """注册事件总线。 Args: *buses: 一个或多个事件总线。 """ self.buses |= set(buses) def unregister_event_bus(self, *buses: AbstractEventBus): """解除注册事件总线。 Args: *buses: 一个或多个事件总线。 """ self.buses -= set(buses) @abc.abstractmethod async def login(self, qq: int): """登录到 mirai-api-http。""" @abc.abstractmethod async def logout(self, terminate: bool = True): """登出。""" @abc.abstractmethod async def call_api(self, api: str, method: Method = Method.GET, **params): """调用 API。 Args: api: API 名称,需与 mirai-api-http 中的定义一致。 method: 调用方法。默认为 GET。 **params: 参数。 """ @abc.abstractmethod async def _background(self): """背景事件循环,用于接收事件。""" async def start(self): """运行背景事件循环。""" if not self.buses: raise RuntimeError('事件总线未指定!') if not self.session: raise RuntimeError('未登录!') self.background = asyncio.create_task(self._background()) async def shutdown(self): """停止背景事件循环。""" if self.background: await Tasks.cancel(self.background) async def emit(self, event: str, *args, **kwargs): """向事件总线发送一个事件。 Args: event: 事件名称。 *args: 事件参数。 **kwargs: 事件参数。 """ coros = [bus.emit(event, *args, **kwargs) for bus in self.buses] return sum(await asyncio.gather(*coros), [])
Ancestors
- ApiProvider
- AdapterInterface
- abc.ABC
Subclasses
Class variables
var background : Optional[_asyncio.Task]
-
背景事件循环任务。
var buses : Set[AbstractEventBus]
-
注册的事件总线集合。
var session : str
-
从 mirai-api-http 处获得的 session。
var single_mode : bool
-
是否开启 single_mode,开启后与 session 将无效。
var verify_key : Optional[str]
-
mirai-api-http 配置的认证 key,关闭认证时为 None。
Static methods
def via(adapter_interface: AdapterInterface) ‑> Adapter
-
Expand source code
@classmethod def via(cls, adapter_interface: AdapterInterface) -> "Adapter": """从适配器接口创建适配器。 Args: adapter_interface: 适配器接口。 Returns: Adapter: 创建的适配器。 """ info = adapter_interface.adapter_info adapter = cls( verify_key=info['verify_key'], **{ key: info[key] for key in ['single_mode'] if info.get(key) is not None } ) adapter.session = cast(str, info.get('session')) return adapter
Methods
async def call_api(self, api: str, method: Method = Method.GET, **params)
-
调用 API。
Args
api
- API 名称,需与 mirai-api-http 中的定义一致。
method
- 调用方法。默认为 GET。
**params
- 参数。
Expand source code
@abc.abstractmethod async def call_api(self, api: str, method: Method = Method.GET, **params): """调用 API。 Args: api: API 名称,需与 mirai-api-http 中的定义一致。 method: 调用方法。默认为 GET。 **params: 参数。 """
async def emit(self, event: str, *args, **kwargs)
-
向事件总线发送一个事件。
Args
event
- 事件名称。
*args
- 事件参数。
**kwargs
- 事件参数。
Expand source code
async def emit(self, event: str, *args, **kwargs): """向事件总线发送一个事件。 Args: event: 事件名称。 *args: 事件参数。 **kwargs: 事件参数。 """ coros = [bus.emit(event, *args, **kwargs) for bus in self.buses] return sum(await asyncio.gather(*coros), [])
async def login(self, qq: int)
-
登录到 mirai-api-http。
Expand source code
@abc.abstractmethod async def login(self, qq: int): """登录到 mirai-api-http。"""
async def logout(self, terminate: bool = True)
-
登出。
Expand source code
@abc.abstractmethod async def logout(self, terminate: bool = True): """登出。"""
def register_event_bus(self, *buses: AbstractEventBus)
-
注册事件总线。
Args
*buses
- 一个或多个事件总线。
Expand source code
def register_event_bus(self, *buses: AbstractEventBus): """注册事件总线。 Args: *buses: 一个或多个事件总线。 """ self.buses |= set(buses)
async def shutdown(self)
-
停止背景事件循环。
Expand source code
async def shutdown(self): """停止背景事件循环。""" if self.background: await Tasks.cancel(self.background)
async def start(self)
-
运行背景事件循环。
Expand source code
async def start(self): """运行背景事件循环。""" if not self.buses: raise RuntimeError('事件总线未指定!') if not self.session: raise RuntimeError('未登录!') self.background = asyncio.create_task(self._background())
def unregister_event_bus(self, *buses: AbstractEventBus)
-
解除注册事件总线。
Args
*buses
- 一个或多个事件总线。
Expand source code
def unregister_event_bus(self, *buses: AbstractEventBus): """解除注册事件总线。 Args: *buses: 一个或多个事件总线。 """ self.buses -= set(buses)
Inherited members
class ApiError (response: dict)
-
调用 API 出错。
Args
response(
dict
): mirai-api-http 的 API 返回结果。Expand source code
class ApiError(RuntimeError): """调用 API 出错。""" def __init__(self, response: dict): """ Args: response(`dict`): mirai-api-http 的 API 返回结果。 """ code = response['code'] self.code = code self.args = ( code, f'[ERROR {code}]' + API_ERROR_FMT.get(code, ''), response.get('msg', '') )
Ancestors
- builtins.RuntimeError
- builtins.Exception
- builtins.BaseException
class At (*args, **kwargs)
-
At某人。
Expand source code
class At(MessageComponent): """At某人。""" type: str = "At" """消息组件类型。""" target: int """群员 QQ 号。""" display: Optional[str] = None """At时显示的文字,发送消息时无效,自动使用群名片。""" def __eq__(self, other): return isinstance(other, At) and self.target == other.target def __str__(self): return f"@{self.display or self.target}" def as_mirai_code(self) -> str: return f"[mirai:at:{self.target}]"
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var display : Optional[str]
-
At时显示的文字,发送消息时无效,自动使用群名片。
var target : int
-
群员 QQ 号。
Inherited members
class AtAll (*args, **kwargs)
-
At全体。
Expand source code
class AtAll(MessageComponent): """At全体。""" type: str = "AtAll" """消息组件类型。""" def __str__(self): return "@全体成员" def as_mirai_code(self) -> str: return f"[mirai:atall]"
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Inherited members
class ComposeAdapter (api_channel: Adapter, event_channel: Adapter)
-
组合适配器。使用一个适配器提供 API 调用,另一个适配器提供事件处理。
Args
api_channel
- 提供 API 调用的适配器。
event_channel
- 提供事件处理的适配器。
Expand source code
class ComposeAdapter(Adapter): """组合适配器。使用一个适配器提供 API 调用,另一个适配器提供事件处理。""" api_channel: Adapter """提供 API 调用的适配器。""" event_channel: Adapter """提供事件处理的适配器。""" def __init__(self, api_channel: Adapter, event_channel: Adapter): """ Args: api_channel: 提供 API 调用的适配器。 event_channel: 提供事件处理的适配器。 """ super().__init__( verify_key=api_channel.verify_key, single_mode=api_channel.single_mode ) if api_channel.verify_key != event_channel.verify_key: raise ValueError('组合适配器应使用相同的 verify_key。') self.api_channel = api_channel self.event_channel = event_channel event_channel.buses = self.buses self.verify_key = api_channel.verify_key self.single_mode = api_channel.single_mode @property def adapter_info(self): return self.api_channel.adapter_info async def login(self, qq: int): await self.api_channel.login(qq) # 绑定 session self.event_channel.session = self.api_channel.session self.session = self.api_channel.session await self.event_channel.login(qq) async def logout(self): await self.event_channel.logout() await self.api_channel.logout() async def call_api(self, api: str, method: Method = Method.GET, **params): return await self.api_channel.call_api(api, method, **params) async def _background(self): await self.event_channel._background()
Ancestors
- Adapter
- ApiProvider
- AdapterInterface
- abc.ABC
Class variables
var api_channel : Adapter
-
提供 API 调用的适配器。
var event_channel : Adapter
-
提供事件处理的适配器。
Inherited members
class Dice (*args, **kwargs)
-
骰子。
Expand source code
class Dice(MessageComponent): """骰子。""" type: str = "Dice" """消息组件类型。""" value: int """点数。""" def __str__(self): return f'[骰子{self.value}]' def as_mirai_code(self) -> str: return f'[mirai:dice:{self.value}]'
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var value : int
-
点数。
Inherited members
class Event (*args, **kwargs)
-
事件基类。
Args
type
- 事件名。
Expand source code
class Event(MiraiIndexedModel): """事件基类。 Args: type: 事件名。 """ type: str """事件名。""" def __repr__(self): return self.__class__.__name__ + '(' + ', '.join( ( f'{k}={repr(v)}' for k, v in self.__dict__.items() if k != 'type' and v ) ) + ')' @classmethod def parse_subtype(cls, obj: dict) -> 'Event': try: return cast(Event, super().parse_subtype(obj)) except ValueError: return Event(type=obj['type']) @classmethod def get_subtype(cls, name: str) -> Type['Event']: try: return cast(Type[Event], super().get_subtype(name)) except ValueError: return Event
Ancestors
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
- LifeSpan
- BotEvent
- CommandEvent
- FriendEvent
- FriendRecallEvent
- GroupEvent
- MessageEvent
- NudgeEvent
- mirai.models.events.OtherClientEvent
- RequestEvent
Class variables
var type : str
-
事件名。
Inherited members
class EventBus (event_chain_generator: Callable[[str], Iterable[str]] = <function event_chain_single>)
-
事件总线。
事件总线提供了一个简单的方法,用于分发事件。事件处理器可以通过
subscribe
或on
注册事件, 并通过emit
来触发事件。事件链(Event Chain)是一种特殊的事件处理机制。事件链包含一系列事件,其中底层事件触发时,上层事件也会响应。
事件总线的构造函数中的
event_chain_generator
参数规定了生成事件链的方式。 此模块中的event_chain_single
和event_chain_separator
可应用于此参数,分别生成单一事件的事件链和按照分隔符划分的事件链。Args
event_chain_generator
- 一个函数, 输入事件名,返回一个生成此事件所在事件链的全部事件的事件名的生成器, 默认行为是事件链只包含单一事件。
Expand source code
class EventBus(AbstractEventBus): """事件总线。 事件总线提供了一个简单的方法,用于分发事件。事件处理器可以通过 `subscribe` 或 `on` 注册事件, 并通过 `emit` 来触发事件。 事件链(Event Chain)是一种特殊的事件处理机制。事件链包含一系列事件,其中底层事件触发时,上层事件也会响应。 事件总线的构造函数中的 `event_chain_generator` 参数规定了生成事件链的方式。 此模块中的 `event_chain_single` 和 `event_chain_separator` 可应用于此参数,分别生成单一事件的事件链和按照分隔符划分的事件链。 """ def __init__( self, event_chain_generator: Callable[[str], Iterable[str]] = event_chain_single, ): """ Args: event_chain_generator: 一个函数, 输入事件名,返回一个生成此事件所在事件链的全部事件的事件名的生成器, 默认行为是事件链只包含单一事件。 """ self._subscribers: Dict[ str, PriorityDict[Callable]] = defaultdict(PriorityDict) self.event_chain_generator = event_chain_generator def subscribe(self, event: str, func: Callable, priority: int = 0) -> None: """注册事件处理器。 Args: event: 事件名。 func: 事件处理器。 priority: 优先级,小者优先。 """ self._subscribers[event].add(priority, func) def unsubscribe(self, event: str, func: Callable) -> None: """移除事件处理器。 Args: event: 事件名。 func: 事件处理器。 """ try: self._subscribers[event].remove(func) except KeyError: logger.warning(f'试图移除事件 `{event}` 的一个不存在的事件处理器 `{func}`。') def on(self, event: str, priority: int = 0) -> Callable: """以装饰器的方式注册事件处理器。 例如: ```py @bus.on('Event.MyEvent') def my_event_handler(event): print(event) ``` Args: event: 事件名。 priority: 优先级,小者优先。 """ def decorator(func: Callable) -> Callable: self.subscribe(event, func, priority) return func return decorator async def emit(self, event: str, *args, **kwargs) -> List[Awaitable[Any]]: """触发一个事件。 异步执行说明:`await emit` 时执行事件处理器,所有事件处理器执行完后,并行运行所有快速响应。 Args: event: 要触发的事件名称。 *args: 传递给事件处理器的参数。 **kwargs: 传递给事件处理器的参数。 Returns: List[Awaitable[Any]]: 所有事件处理器的快速响应协程的 Task。 """ async def call(f) -> Optional[Awaitable[Any]]: result = await async_with_exception(f(*args, **kwargs)) # 快速响应:如果事件处理器返回一个协程,那么立即运行这个协程。 if inspect.isawaitable(result): return async_with_exception(result) # 当不使用快速响应时,返回值无意义。 return None coros: List[Optional[Awaitable[Any]]] = [] try: for m_event in self.event_chain_generator(event): try: # 使用 list 避免 _subscribers 被改变引起错误。 for listeners in list(self._subscribers[m_event]): try: # noinspection PyTypeChecker callee = (call(f) for f in listeners) coros += await asyncio.gather(*callee) except SkipExecution: continue except StopExecution: continue except StopPropagation: pass # 只保留快速响应的返回值。 return [asyncio.create_task(coro) for coro in filter(None, coros)]
Ancestors
Subclasses
Inherited members
class Face (*args, **kwargs)
-
表情。
Expand source code
class Face(MessageComponent): """表情。""" type: str = "Face" """消息组件类型。""" face_id: Optional[int] = None """QQ 表情编号,可选,优先度高于 name。""" name: Optional[str] = None """QQ表情名称,可选。""" def __init__(self, *args, **kwargs): if len(args) == 1: if isinstance(args[0], str): self.name = args[0] elif isinstance(args[0], int): self.face_id = args[0] super().__init__(**kwargs) super().__init__(*args, **kwargs) def __eq__(self, other): return isinstance(other, Face) and \ (self.face_id == other.face_id or self.name == other.name) def __str__(self): if self.name: return f'[{self.name}]' return '[表情]' def as_mirai_code(self): return f"[mirai:face:{self.face_id}]"
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var face_id : Optional[int]
-
QQ 表情编号,可选,优先度高于 name。
var name : Optional[str]
-
QQ表情名称,可选。
Inherited members
class FriendMessage (*args, **kwargs)
-
好友消息。
Args
type
- 事件名。
sender
- 发送消息的好友。
message_chain
- 消息内容。
Expand source code
class FriendMessage(MessageEvent): """好友消息。 Args: type: 事件名。 sender: 发送消息的好友。 message_chain: 消息内容。 """ type: str = 'FriendMessage' """事件名。""" sender: Friend """发送消息的好友。""" message_chain: MessageChain """消息内容。"""
Ancestors
- MessageEvent
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var sender : Friend
-
发送消息的好友。
Inherited members
class GroupMessage (*args, **kwargs)
-
群消息。
Args
type
- 事件名。
sender
- 发送消息的群成员。
message_chain
- 消息内容。
Expand source code
class GroupMessage(MessageEvent): """群消息。 Args: type: 事件名。 sender: 发送消息的群成员。 message_chain: 消息内容。 """ type: str = 'GroupMessage' """事件名。""" sender: GroupMember """发送消息的群成员。""" message_chain: MessageChain """消息内容。""" @property def group(self) -> Group: return self.sender.group
Ancestors
- MessageEvent
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var sender : GroupMember
-
发送消息的群成员。
Instance variables
var group : Group
-
Expand source code
@property def group(self) -> Group: return self.sender.group
Inherited members
class HTTPAdapter (verify_key: Optional[str], host: str, port: int, poll_interval: float = 1.0, single_mode: bool = False)
-
HTTP 轮询适配器。使用 HTTP 轮询的方式与 mirai-api-http 沟通。
Args
verify_key
- mirai-api-http 配置的认证 key,关闭认证时为 None。
host
- HTTP Server 的地址。
port
- HTTP Server 的端口。
poll_interval
- 轮询时间间隔,单位秒。
single_mode
- 是否为单例模式。
Expand source code
class HTTPAdapter(Adapter): """HTTP 轮询适配器。使用 HTTP 轮询的方式与 mirai-api-http 沟通。""" host_name: str """mirai-api-http 的 HTTPAdapter Server 主机名。""" poll_interval: float """轮询时间间隔,单位秒。""" qq: int """机器人的 QQ 号。""" headers: httpx.Headers """HTTP 请求头。""" def __init__( self, verify_key: Optional[str], host: str, port: int, poll_interval: float = 1., single_mode: bool = False ): """ Args: verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。 host: HTTP Server 的地址。 port: HTTP Server 的端口。 poll_interval: 轮询时间间隔,单位秒。 single_mode: 是否为单例模式。 """ super().__init__(verify_key=verify_key, single_mode=single_mode) self._host = host self._port = port if host[:2] == '//': host = 'http:' + host elif host[:8] == 'https://': raise exceptions.NetworkError('不支持 HTTPS!') elif host[:7] != 'http://': host = 'http://' + host if host[-1:] == '/': host = host[:-1] self.host_name = f'{host}:{port}' self.poll_interval = poll_interval self.qq = 0 self.headers = httpx.Headers() # 使用 headers 传递 session self._tasks = Tasks() @property def adapter_info(self): return { 'verify_key': self.verify_key, 'session': self.session, 'single_mode': self.single_mode, 'host': self._host, 'port': self._port, 'poll_interval': self.poll_interval, } @classmethod def via(cls, adapter_interface: AdapterInterface) -> "HTTPAdapter": info = adapter_interface.adapter_info adapter = cls( verify_key=info['verify_key'], **{ key: info[key] for key in ['host', 'port', 'poll_interval', 'single_mode'] if key in info } ) adapter.session = cast(str, info.get('session')) return adapter @_error_handler_async_local async def _post(self, client: httpx.AsyncClient, url: str, json: dict) -> Optional[dict]: """调用 POST 方法。""" # 使用自定义的 json.dumps content = json_dumps(json).encode('utf-8') try: response = await client.post( url, content=content, headers={'Content-Type': 'application/json'}, timeout=10. ) logger.debug( f'[HTTP] 发送 POST 请求,地址{url},状态 {response.status_code}。' ) except httpx.TimeoutException: logger.error(f'[HTTP] POST 请求超时,地址{url}。') return None return _parse_response(response) @_error_handler_async_local async def _get(self, client: httpx.AsyncClient, url: str, params: dict) -> Optional[dict]: """调用 GET 方法。""" try: response = await client.get(url, params=params, timeout=10.) logger.debug( f'[HTTP] 发送 GET 请求,地址{url},状态 {response.status_code}。' ) except httpx.TimeoutException: logger.error(f'[HTTP] GET 请求超时,地址{url}。') return None return _parse_response(response) @_error_handler_async_local async def _post_multipart( self, client: httpx.AsyncClient, url: str, data: dict, files: dict ) -> Optional[dict]: """调用 POST 方法,发送 multipart 数据。""" try: response = await client.post( url, data=data, files=files, timeout=30. ) logger.debug( f'[HTTP] 发送 POST 请求,地址{url},状态 {response.status_code}。' ) except httpx.TimeoutException: logger.error(f'[HTTP] POST 请求超时,地址{url}。') return None return _parse_response(response) @_error_handler_async_local async def login(self, qq: int): async with httpx.AsyncClient( base_url=self.host_name, headers=self.headers ) as client: if not self.session: if self.verify_key is not None: self.session = ( await self._post( client, '/verify', { "verifyKey": self.verify_key, } ) )['session'] else: self.session = str(random.randint(1, 2**20)) if not self.single_mode: await self._post( client, '/bind', { "sessionKey": self.session, "qq": qq, } ) self.headers = httpx.Headers({'sessionKey': self.session}) self.qq = qq logger.info(f'[HTTP] 成功登录到账号{qq}。') @_error_handler_async_local async def logout(self, terminate: bool = True): if self.session and not self.single_mode: if terminate: async with httpx.AsyncClient( base_url=self.host_name, headers=self.headers ) as client: await self._post( client, '/release', { "sessionKey": self.session, "qq": self.qq, } ) logger.info(f"[HTTP] 从账号{self.qq}退出。") async def poll_event(self): """进行一次轮询,获取并处理事件。""" async with httpx.AsyncClient( base_url=self.host_name, headers=self.headers ) as client: msg_count = (await self._get(client, '/countMessage', {}))['data'] if msg_count > 0: msg_list = ( await self._get(client, '/fetchMessage', {'count': msg_count}) )['data'] coros = [self.emit(msg['type'], msg) for msg in msg_list] await asyncio.gather(*coros) async def call_api(self, api: str, method: Method = Method.GET, **params) -> Optional[dict]: async with httpx.AsyncClient( base_url=self.host_name, headers=self.headers ) as client: if method == Method.GET or method == Method.RESTGET: return await self._get(client, f'/{api}', params) if method == Method.POST or method == Method.RESTPOST: return await self._post(client, f'/{api}', params) if method == Method.MULTIPART: return await self._post_multipart( client, f'/{api}', params['data'], params['files'] ) return None async def _background(self): """开始轮询。""" logger.info('[HTTP] 机器人开始运行。按 Ctrl + C 停止。') try: while True: self._tasks.create_task(self.poll_event()) await asyncio.sleep(self.poll_interval) finally: await self._tasks.cancel_all()
Ancestors
- Adapter
- ApiProvider
- AdapterInterface
- abc.ABC
Class variables
var headers : httpx.Headers
-
HTTP 请求头。
var host_name : str
-
mirai-api-http 的 HTTPAdapter Server 主机名。
var poll_interval : float
-
轮询时间间隔,单位秒。
var qq : int
-
机器人的 QQ 号。
Methods
async def poll_event(self)
-
进行一次轮询,获取并处理事件。
Expand source code
async def poll_event(self): """进行一次轮询,获取并处理事件。""" async with httpx.AsyncClient( base_url=self.host_name, headers=self.headers ) as client: msg_count = (await self._get(client, '/countMessage', {}))['data'] if msg_count > 0: msg_list = ( await self._get(client, '/fetchMessage', {'count': msg_count}) )['data'] coros = [self.emit(msg['type'], msg) for msg in msg_list] await asyncio.gather(*coros)
Inherited members
class Image (*args, **kwargs)
-
图片。
Expand source code
class Image(MessageComponent): """图片。""" type: str = "Image" """消息组件类型。""" image_id: Optional[str] = None """图片的 image_id,群图片与好友图片格式不同。不为空时将忽略 url 属性。""" url: Optional[HttpUrl] = None """图片的 URL,发送时可作网络图片的链接;接收时为腾讯图片服务器的链接,可用于图片下载。""" path: Union[str, Path, None] = None """图片的路径,发送本地图片。""" base64: Optional[str] = None """图片的 Base64 编码。""" width: int """图片的宽度""" height: int """图片的高度""" def __eq__(self, other): return isinstance( other, Image ) and self.type == other.type and self.uuid == other.uuid def __str__(self): return '[图片]' def as_mirai_code(self) -> str: return f"[mirai:image:{self.image_id}]" @validator('path') def validate_path(cls, path: Union[str, Path, None]): """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。""" if path: try: return str(Path(path).resolve(strict=True)) except FileNotFoundError: raise ValueError(f"无效路径:{path}") else: return path @property def uuid(self): image_id = self.image_id if image_id[0] == '{': # 群图片 image_id = image_id[1:37] elif image_id[0] == '/': # 好友图片 image_id = image_id[1:] return image_id def as_group_image(self) -> str: return f"{{{self.uuid.upper()}}}.jpg" def as_friend_image(self) -> str: return f"/{self.uuid.lower()}" def as_flash_image(self) -> "FlashImage": return FlashImage( image_id=self.image_id, url=self.url, path=self.path, base64=self.base64 ) async def download( self, filename: Union[str, Path, None] = None, directory: Union[str, Path, None] = None, determine_type: bool = True ): """下载图片到本地。 Args: filename: 下载到本地的文件路径。与 `directory` 二选一。 directory: 下载到本地的文件夹路径。与 `filename` 二选一。 determine_type: 是否自动根据图片类型确定拓展名,默认为 True。 """ if not self.url: logger.warning(f'图片 `{self.uuid}` 无 url 参数,下载失败。') return import httpx async with httpx.AsyncClient() as client: response = await client.get(self.url) response.raise_for_status() content = response.content if filename: path = Path(filename) if determine_type: import imghdr path = path.with_suffix( '.' + str(imghdr.what(None, content)) ) path.parent.mkdir(parents=True, exist_ok=True) elif directory: import imghdr path = Path(directory) path.mkdir(parents=True, exist_ok=True) path = path / f'{self.uuid}.{imghdr.what(None, content)}' else: raise ValueError("请指定文件路径或文件夹路径!") import aiofiles async with aiofiles.open(path, 'wb') as f: await f.write(content) return path @classmethod async def from_local( cls, filename: Union[str, Path, None] = None, content: Optional[bytes] = None, ) -> "Image": """从本地文件路径加载图片,以 base64 的形式传递。 Args: filename: 从本地文件路径加载图片,与 `content` 二选一。 content: 从本地文件内容加载图片,与 `filename` 二选一。 Returns: Image: 图片对象。 """ if content: pass elif filename: path = Path(filename) import aiofiles async with aiofiles.open(path, 'rb') as f: content = await f.read() else: raise ValueError("请指定图片路径或图片内容!") import base64 img = cls(base64=base64.b64encode(content).decode()) return img @classmethod def from_unsafe_path(cls, path: Union[str, Path]) -> "Image": """从不安全的路径加载图片。 Args: path: 从不安全的路径加载图片。 Returns: Image: 图片对象。 """ return cls.construct(path=str(path))
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
Class variables
var base64 : Optional[str]
-
图片的 Base64 编码。
var height : int
-
图片的高度
var image_id : Optional[str]
-
图片的 image_id,群图片与好友图片格式不同。不为空时将忽略 url 属性。
var path : Union[str, pathlib.Path, NoneType]
-
图片的路径,发送本地图片。
var url : Optional[pydantic.networks.HttpUrl]
-
图片的 URL,发送时可作网络图片的链接;接收时为腾讯图片服务器的链接,可用于图片下载。
var width : int
-
图片的宽度
Static methods
async def from_local(filename: Union[str, pathlib.Path, NoneType] = None, content: Optional[bytes] = None) ‑> Image
-
从本地文件路径加载图片,以 base64 的形式传递。
Args
filename
- 从本地文件路径加载图片,与
content
二选一。 content
- 从本地文件内容加载图片,与
filename
二选一。
Returns
Image
- 图片对象。
Expand source code
@classmethod async def from_local( cls, filename: Union[str, Path, None] = None, content: Optional[bytes] = None, ) -> "Image": """从本地文件路径加载图片,以 base64 的形式传递。 Args: filename: 从本地文件路径加载图片,与 `content` 二选一。 content: 从本地文件内容加载图片,与 `filename` 二选一。 Returns: Image: 图片对象。 """ if content: pass elif filename: path = Path(filename) import aiofiles async with aiofiles.open(path, 'rb') as f: content = await f.read() else: raise ValueError("请指定图片路径或图片内容!") import base64 img = cls(base64=base64.b64encode(content).decode()) return img
def from_unsafe_path(path: Union[str, pathlib.Path]) ‑> Image
-
Expand source code
@classmethod def from_unsafe_path(cls, path: Union[str, Path]) -> "Image": """从不安全的路径加载图片。 Args: path: 从不安全的路径加载图片。 Returns: Image: 图片对象。 """ return cls.construct(path=str(path))
def validate_path(path: Union[str, pathlib.Path, NoneType])
-
修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。
Expand source code
@validator('path') def validate_path(cls, path: Union[str, Path, None]): """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。""" if path: try: return str(Path(path).resolve(strict=True)) except FileNotFoundError: raise ValueError(f"无效路径:{path}") else: return path
Instance variables
var uuid
-
Expand source code
@property def uuid(self): image_id = self.image_id if image_id[0] == '{': # 群图片 image_id = image_id[1:37] elif image_id[0] == '/': # 好友图片 image_id = image_id[1:] return image_id
Methods
def as_flash_image(self) ‑> FlashImage
-
Expand source code
def as_flash_image(self) -> "FlashImage": return FlashImage( image_id=self.image_id, url=self.url, path=self.path, base64=self.base64 )
def as_friend_image(self) ‑> str
-
Expand source code
def as_friend_image(self) -> str: return f"/{self.uuid.lower()}"
def as_group_image(self) ‑> str
-
Expand source code
def as_group_image(self) -> str: return f"{{{self.uuid.upper()}}}.jpg"
async def download(self, filename: Union[str, pathlib.Path, NoneType] = None, directory: Union[str, pathlib.Path, NoneType] = None, determine_type: bool = True)
-
下载图片到本地。
Args
filename
- 下载到本地的文件路径。与
directory
二选一。 directory
- 下载到本地的文件夹路径。与
filename
二选一。 determine_type
- 是否自动根据图片类型确定拓展名,默认为 True。
Expand source code
async def download( self, filename: Union[str, Path, None] = None, directory: Union[str, Path, None] = None, determine_type: bool = True ): """下载图片到本地。 Args: filename: 下载到本地的文件路径。与 `directory` 二选一。 directory: 下载到本地的文件夹路径。与 `filename` 二选一。 determine_type: 是否自动根据图片类型确定拓展名,默认为 True。 """ if not self.url: logger.warning(f'图片 `{self.uuid}` 无 url 参数,下载失败。') return import httpx async with httpx.AsyncClient() as client: response = await client.get(self.url) response.raise_for_status() content = response.content if filename: path = Path(filename) if determine_type: import imghdr path = path.with_suffix( '.' + str(imghdr.what(None, content)) ) path.parent.mkdir(parents=True, exist_ok=True) elif directory: import imghdr path = Path(directory) path.mkdir(parents=True, exist_ok=True) path = path / f'{self.uuid}.{imghdr.what(None, content)}' else: raise ValueError("请指定文件路径或文件夹路径!") import aiofiles async with aiofiles.open(path, 'wb') as f: await f.write(content) return path
Inherited members
class LifeSpan (*args, **kwargs)
-
生命周期事件。
Expand source code
class LifeSpan(Event): """生命周期事件。""" type: str = 'LifeSpan' """事件名。"""
Ancestors
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
Inherited members
class MessageChain
-
消息链。
一个构造消息链的例子:
message_chain = MessageChain([ AtAll(), Plain("Hello World!"), ])
Plain
可以省略。message_chain = MessageChain([ AtAll(), "Hello World!", ])
在调用 API 时,参数中需要 MessageChain 的,也可以使用
List[MessageComponent]
代替。 例如,以下两种写法是等价的:await bot.send_friend_message(12345678, [ Plain("Hello World!") ])
await bot.send_friend_message(12345678, MessageChain([ Plain("Hello World!") ]))
使用
str(message_chain)
获取消息链的字符串表示,字符串采用 mirai 码格式, 并自动按照 mirai 码的规定转义。 参看 mirai 的文档。 获取未转义的消息链字符串,可以使用deserialize()(str(message_chain))
。可以使用 for 循环遍历消息链中的消息组件。
for component in message_chain: print(repr(component))
可以使用
==
运算符比较两个消息链是否相同。another_msg_chain = MessageChain([ { "type": "AtAll" }, { "type": "Plain", "text": "Hello World!" }, ]) print(message_chain == another_msg_chain) 'True'
可以使用
in
运算检查消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。if AtAll in message_chain: print('AtAll') if At(bot.qq) in message_chain: print('At Me') if MessageChain([At(bot.qq), Plain('Hello!')]) in message_chain: print('Hello!') if 'Hello' in message_chain: print('Hi!')
消息链的
has
方法和in
等价。if message_chain.has(AtAll): print('AtAll')
也可以使用
>=
和<=
运算符:if MessageChain([At(bot.qq), Plain('Hello!')]) <= message_chain: print('Hello!')
需注意,此处的子消息链匹配会把 Plain 看成一个整体,而不是匹配其文本的一部分。 如需对文本进行部分匹配,请采用 mirai 码字符串匹配的方式。
消息链对索引操作进行了增强。以消息组件类型为索引,获取消息链中的全部该类型的消息组件。
plain_list = message_chain[Plain] '[Plain("Hello World!")]'
以
类型, 数量
为索引,获取前至多多少个该类型的消息组件。plain_list_first = message_chain[Plain, 1] '[Plain("Hello World!")]'
消息链的
get
方法和索引操作等价。plain_list_first = message_chain.get(Plain) '[Plain("Hello World!")]'
消息链的
get
方法还可指定第二个参数count
,这相当于以类型, 数量
为索引。plain_list_first = message_chain.get(Plain, 1) # 这等价于 plain_list_first = message_chain[Plain, 1]
可以用加号连接两个消息链。
MessageChain(['Hello World!']) + MessageChain(['Goodbye World!']) # 返回 MessageChain([Plain("Hello World!"), Plain("Goodbye World!")])
可以用
*
运算符复制消息链。MessageChain(['Hello World!']) * 2 # 返回 MessageChain([Plain("Hello World!"), Plain("Hello World!")])
除此之外,消息链还支持很多 list 拥有的操作,比如
index
和count
。message_chain = MessageChain([ AtAll(), "Hello World!", ]) message_chain.index(Plain) # 返回 0 message_chain.count(Plain) # 返回 1
消息链对这些操作进行了拓展。在传入元素的地方,一般都可以传入元素的类型。
Expand source code
class MessageChain(MiraiBaseModel): """消息链。 一个构造消息链的例子: ```py message_chain = MessageChain([ AtAll(), Plain("Hello World!"), ]) ``` `Plain` 可以省略。 ```py message_chain = MessageChain([ AtAll(), "Hello World!", ]) ``` 在调用 API 时,参数中需要 MessageChain 的,也可以使用 `List[MessageComponent]` 代替。 例如,以下两种写法是等价的: ```py await bot.send_friend_message(12345678, [ Plain("Hello World!") ]) ``` ```py await bot.send_friend_message(12345678, MessageChain([ Plain("Hello World!") ])) ``` 使用`str(message_chain)`获取消息链的字符串表示,字符串采用 mirai 码格式, 并自动按照 mirai 码的规定转义。 参看 mirai 的[文档](https://github.com/mamoe/mirai/blob/dev/docs/Messages.md#mirai-%E7%A0%81)。 获取未转义的消息链字符串,可以使用`deserialize(str(message_chain))`。 可以使用 for 循环遍历消息链中的消息组件。 ```py for component in message_chain: print(repr(component)) ``` 可以使用 `==` 运算符比较两个消息链是否相同。 ```py another_msg_chain = MessageChain([ { "type": "AtAll" }, { "type": "Plain", "text": "Hello World!" }, ]) print(message_chain == another_msg_chain) 'True' ``` 可以使用 `in` 运算检查消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。 ```py if AtAll in message_chain: print('AtAll') if At(bot.qq) in message_chain: print('At Me') if MessageChain([At(bot.qq), Plain('Hello!')]) in message_chain: print('Hello!') if 'Hello' in message_chain: print('Hi!') ``` 消息链的 `has` 方法和 `in` 等价。 ```py if message_chain.has(AtAll): print('AtAll') ``` 也可以使用 `>=` 和 `<= `运算符: ```py if MessageChain([At(bot.qq), Plain('Hello!')]) <= message_chain: print('Hello!') ``` 需注意,此处的子消息链匹配会把 Plain 看成一个整体,而不是匹配其文本的一部分。 如需对文本进行部分匹配,请采用 mirai 码字符串匹配的方式。 消息链对索引操作进行了增强。以消息组件类型为索引,获取消息链中的全部该类型的消息组件。 ```py plain_list = message_chain[Plain] '[Plain("Hello World!")]' ``` 以 `类型, 数量` 为索引,获取前至多多少个该类型的消息组件。 ```py plain_list_first = message_chain[Plain, 1] '[Plain("Hello World!")]' ``` 消息链的 `get` 方法和索引操作等价。 ```py plain_list_first = message_chain.get(Plain) '[Plain("Hello World!")]' ``` 消息链的 `get` 方法还可指定第二个参数 `count`,这相当于以 `类型, 数量` 为索引。 ```py plain_list_first = message_chain.get(Plain, 1) # 这等价于 plain_list_first = message_chain[Plain, 1] ``` 可以用加号连接两个消息链。 ```py MessageChain(['Hello World!']) + MessageChain(['Goodbye World!']) # 返回 MessageChain([Plain("Hello World!"), Plain("Goodbye World!")]) ``` 可以用 `*` 运算符复制消息链。 ```py MessageChain(['Hello World!']) * 2 # 返回 MessageChain([Plain("Hello World!"), Plain("Hello World!")]) ``` 除此之外,消息链还支持很多 list 拥有的操作,比如 `index` 和 `count`。 ```py message_chain = MessageChain([ AtAll(), "Hello World!", ]) message_chain.index(Plain) # 返回 0 message_chain.count(Plain) # 返回 1 ``` 消息链对这些操作进行了拓展。在传入元素的地方,一般都可以传入元素的类型。 """ __root__: List[MessageComponent] @staticmethod def _parse_message_chain(msg_chain: Iterable): result = [] for msg in msg_chain: if isinstance(msg, dict): result.append(MessageComponent.parse_subtype(msg)) elif isinstance(msg, MessageComponent): result.append(msg) elif isinstance(msg, str): result.append(Plain(msg)) else: raise TypeError( f"消息链中元素需为 dict 或 str 或 MessageComponent,当前类型:{type(msg)}" ) return result @validator('__root__', always=True, pre=True) def _parse_component(cls, msg_chain): if isinstance(msg_chain, (str, MessageComponent)): msg_chain = [msg_chain] if not msg_chain: msg_chain = [] return cls._parse_message_chain(msg_chain) @classmethod def parse_obj(cls, msg_chain: Iterable): """通过列表形式的消息链,构造对应的 `MessageChain` 对象。 Args: msg_chain: 列表形式的消息链。 """ result = cls._parse_message_chain(msg_chain) return cls(__root__=result) def __init__(self, __root__: Iterable[MessageComponent] = None): super().__init__(__root__=__root__) def __str__(self): return "".join(str(component) for component in self.__root__) def as_mirai_code(self) -> str: """将消息链转换为 mirai 码字符串。 该方法会自动转换消息链中的元素。 Returns: mirai 码字符串。 """ return "".join( component.as_mirai_code() for component in self.__root__ ) def __repr__(self): return f'{self.__class__.__name__}({self.__root__!r})' def __iter__(self): yield from self.__root__ @overload def get(self, index: int) -> MessageComponent: ... @overload def get(self, index: slice) -> List[MessageComponent]: ... @overload def get(self, index: Type[TMessageComponent]) -> List[TMessageComponent]: ... @overload def get( self, index: Tuple[Type[TMessageComponent], int] ) -> List[TMessageComponent]: ... def get( self, index: Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]], count: Optional[int] = None ) -> Union[MessageComponent, List[MessageComponent], List[TMessageComponent]]: """获取消息链中的某个(某些)消息组件,或某类型的消息组件。 Args: index (`Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]]`): 如果为 `int`,则返回该索引处的消息组件。 如果为 `slice`,则返回该索引范围处的消息组件。 如果为 `Type[TMessageComponent]`,则返回该类型的全部消息组件。 如果为 `Tuple[Type[TMessageComponent], int]`,则返回该类型的至多 `index[1]` 个消息组件。 count: 如果为 `int`,则返回至多 `count` 个消息组件。 Returns: MessageComponent: 返回指定索引处的消息组件。 List[MessageComponent]: 返回指定索引范围的消息组件。 List[TMessageComponent]: 返回指定类型的消息组件构成的列表。 """ # 正常索引 if isinstance(index, int): return self.__root__[index] # 切片索引 if isinstance(index, slice): return self.__root__[index] # 指定 count if count: if isinstance(index, type): index = (index, count) elif isinstance(index, tuple): index = (index[0], count if count < index[1] else index[1]) # 索引对象为 MessageComponent 类,返回所有对应 component if isinstance(index, type): return [ component for component in self if type(component) is index ] # 索引对象为 MessageComponent 和 int 构成的 tuple, 返回指定数量的 component if isinstance(index, tuple): components = ( component for component in self if type(component) is index[0] ) return [ component for component, _ in zip(components, range(index[1])) ] raise TypeError(f"消息链索引需为 int 或 MessageComponent,当前类型:{type(index)}") def get_first(self, t: Type[TMessageComponent]) -> Optional[TMessageComponent]: """获取消息链中第一个符合类型的消息组件。""" for component in self: if isinstance(component, t): return component return None @overload def __getitem__(self, index: int) -> MessageComponent: ... @overload def __getitem__(self, index: slice) -> List[MessageComponent]: ... @overload def __getitem__(self, index: Type[TMessageComponent]) -> List[TMessageComponent]: ... @overload def __getitem__( self, index: Tuple[Type[TMessageComponent], int] ) -> List[TMessageComponent]: ... def __getitem__( self, index: Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]] ) -> Union[MessageComponent, List[MessageComponent], List[TMessageComponent]]: return self.get(index) def __setitem__( self, key: Union[int, slice], value: Union[MessageComponent, str, Iterable[Union[MessageComponent, str]]] ): if isinstance(value, str): value = Plain(value) if isinstance(value, Iterable): value = (Plain(c) if isinstance(c, str) else c for c in value) self.__root__[key] = value # type: ignore def __delitem__(self, key: Union[int, slice]): del self.__root__[key] def __reversed__(self) -> Iterable[MessageComponent]: return reversed(self.__root__) def has( self, sub: Union[MessageComponent, Type[MessageComponent], 'MessageChain', str] ) -> bool: """判断消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。 Args: sub (`Union[MessageComponent, Type[MessageComponent], 'MessageChain', str]`): 若为 `MessageComponent`,则判断该组件是否在消息链中。 若为 `Type[MessageComponent]`,则判断该组件类型是否在消息链中。 若为 `MessageChain`,则判断该子消息链是否在消息链中。 若为 `str`,则判断对应的 mirai 码中是否有某子字符串。 Returns: bool: 是否找到。 """ if isinstance(sub, type): # 检测消息链中是否有某种类型的对象 for i in self: if type(i) is sub: return True return False if isinstance(sub, MessageComponent): # 检查消息链中是否有某个组件 for i in self: if i == sub: return True return False if isinstance(sub, MessageChain): # 检查消息链中是否有某个子消息链 return bool(kmp(self, sub)) if isinstance(sub, str): # 检查消息中有无指定字符串子串 return sub in deserialize(str(self)) raise TypeError(f"类型不匹配,当前类型:{type(sub)}") def __contains__(self, sub) -> bool: return self.has(sub) def __ge__(self, other): return other in self def __len__(self) -> int: return len(self.__root__) def __add__( self, other: Union['MessageChain', MessageComponent, str] ) -> 'MessageChain': if isinstance(other, MessageChain): return self.__class__(self.__root__ + other.__root__) if isinstance(other, str): return self.__class__(self.__root__ + [Plain(other)]) if isinstance(other, MessageComponent): return self.__class__(self.__root__ + [other]) return NotImplemented def __radd__(self, other: Union[MessageComponent, str]) -> 'MessageChain': if isinstance(other, MessageComponent): return self.__class__([other] + self.__root__) if isinstance(other, str): return self.__class__( [cast(MessageComponent, Plain(other))] + self.__root__ ) return NotImplemented def __mul__(self, other: int): if isinstance(other, int): return self.__class__(self.__root__ * other) return NotImplemented def __rmul__(self, other: int): return self.__mul__(other) def __iadd__(self, other: Iterable[Union[MessageComponent, str]]): self.extend(other) def __imul__(self, other: int): if isinstance(other, int): self.__root__ *= other return NotImplemented def index( self, x: Union[MessageComponent, Type[MessageComponent]], i: int = 0, j: int = -1 ) -> int: """返回 x 在消息链中首次出现项的索引号(索引号在 i 或其后且在 j 之前)。 Args: x (`Union[MessageComponent, Type[MessageComponent]]`): 要查找的消息元素或消息元素类型。 i: 从哪个位置开始查找。 j: 查找到哪个位置结束。 Returns: int: 如果找到,则返回索引号。 Raises: ValueError: 没有找到。 TypeError: 类型不匹配。 """ if isinstance(x, type): l = len(self) if i < 0: i += l if i < 0: i = 0 if j < 0: j += l if j > l: j = l for index in range(i, j): if type(self[index]) is x: return index raise ValueError("消息链中不存在该类型的组件。") if isinstance(x, MessageComponent): return self.__root__.index(x, i, j) raise TypeError(f"类型不匹配,当前类型:{type(x)}") def count(self, x: Union[MessageComponent, Type[MessageComponent]]) -> int: """返回消息链中 x 出现的次数。 Args: x (`Union[MessageComponent, Type[MessageComponent]]`): 要查找的消息元素或消息元素类型。 Returns: int: 次数。 """ if isinstance(x, type): return sum(1 for i in self if type(i) is x) if isinstance(x, MessageComponent): return self.__root__.count(x) raise TypeError(f"类型不匹配,当前类型:{type(x)}") def extend(self, x: Iterable[Union[MessageComponent, str]]): """将另一个消息链中的元素添加到消息链末尾。 Args: x: 另一个消息链,也可为消息元素或字符串元素的序列。 """ self.__root__.extend(Plain(c) if isinstance(c, str) else c for c in x) def append(self, x: Union[MessageComponent, str]): """将一个消息元素或字符串元素添加到消息链末尾。 Args: x: 消息元素或字符串元素。 """ self.__root__.append(Plain(x) if isinstance(x, str) else x) def insert(self, i: int, x: Union[MessageComponent, str]): """将一个消息元素或字符串添加到消息链中指定位置。 Args: i: 插入位置。 x: 消息元素或字符串元素。 """ self.__root__.insert(i, Plain(x) if isinstance(x, str) else x) def pop(self, i: int = -1) -> MessageComponent: """从消息链中移除并返回指定位置的元素。 Args: i: 移除位置。默认为末尾。 Returns: MessageComponent: 移除的元素。 """ return self.__root__.pop(i) def remove(self, x: Union[MessageComponent, Type[MessageComponent]]): """从消息链中移除指定元素或指定类型的一个元素。 Args: x: 指定的元素或元素类型。 """ if isinstance(x, type): self.pop(self.index(x)) if isinstance(x, MessageComponent): self.__root__.remove(x) def exclude( self, x: Union[MessageComponent, Type[MessageComponent]], count: int = -1 ) -> 'MessageChain': """返回移除指定元素或指定类型的元素后剩余的消息链。 Args: x: 指定的元素或元素类型。 count: 至多移除的数量。默认为全部移除。 Returns: MessageChain: 剩余的消息链。 """ def _exclude(): nonlocal count x_is_type = isinstance(x, type) for c in self: if count > 0 and ((x_is_type and type(c) is x) or c == x): count -= 1 continue yield c return self.__class__(_exclude()) def reverse(self): """将消息链原地翻转。""" self.__root__.reverse() @classmethod def join(cls, *args: Iterable[Union[str, MessageComponent]]): return cls( Plain(c) if isinstance(c, str) else c for c in itertools.chain(*args) ) @property def source(self) -> Optional['Source']: """获取消息链中的 `Source` 对象。""" return self.get_first(Source) @property def message_id(self) -> int: """获取消息链的 message_id,若无法获取,返回 -1。""" source = self.source return source.id if source else -1
Ancestors
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Static methods
def join(*args: Iterable[Union[MessageComponent, str]])
-
Expand source code
@classmethod def join(cls, *args: Iterable[Union[str, MessageComponent]]): return cls( Plain(c) if isinstance(c, str) else c for c in itertools.chain(*args) )
def parse_obj(msg_chain: Iterable)
-
Expand source code
@classmethod def parse_obj(cls, msg_chain: Iterable): """通过列表形式的消息链,构造对应的 `MessageChain` 对象。 Args: msg_chain: 列表形式的消息链。 """ result = cls._parse_message_chain(msg_chain) return cls(__root__=result)
Instance variables
var message_id : int
-
获取消息链的 message_id,若无法获取,返回 -1。
Expand source code
@property def message_id(self) -> int: """获取消息链的 message_id,若无法获取,返回 -1。""" source = self.source return source.id if source else -1
var source : Optional[Source]
-
获取消息链中的
Source
对象。Expand source code
@property def source(self) -> Optional['Source']: """获取消息链中的 `Source` 对象。""" return self.get_first(Source)
Methods
def append(self, x: Union[MessageComponent, str])
-
将一个消息元素或字符串元素添加到消息链末尾。
Args
x
- 消息元素或字符串元素。
Expand source code
def append(self, x: Union[MessageComponent, str]): """将一个消息元素或字符串元素添加到消息链末尾。 Args: x: 消息元素或字符串元素。 """ self.__root__.append(Plain(x) if isinstance(x, str) else x)
def as_mirai_code(self) ‑> str
-
将消息链转换为 mirai 码字符串。
该方法会自动转换消息链中的元素。
Returns
mirai 码字符串。
Expand source code
def as_mirai_code(self) -> str: """将消息链转换为 mirai 码字符串。 该方法会自动转换消息链中的元素。 Returns: mirai 码字符串。 """ return "".join( component.as_mirai_code() for component in self.__root__ )
def count(self, x: Union[MessageComponent, Type[MessageComponent]]) ‑> int
-
返回消息链中 x 出现的次数。
Args
x (
Union[MessageComponent, Type[MessageComponent]]
): 要查找的消息元素或消息元素类型。Returns
int
- 次数。
Expand source code
def count(self, x: Union[MessageComponent, Type[MessageComponent]]) -> int: """返回消息链中 x 出现的次数。 Args: x (`Union[MessageComponent, Type[MessageComponent]]`): 要查找的消息元素或消息元素类型。 Returns: int: 次数。 """ if isinstance(x, type): return sum(1 for i in self if type(i) is x) if isinstance(x, MessageComponent): return self.__root__.count(x) raise TypeError(f"类型不匹配,当前类型:{type(x)}")
def exclude(self, x: Union[MessageComponent, Type[MessageComponent]], count: int = -1) ‑> MessageChain
-
Expand source code
def exclude( self, x: Union[MessageComponent, Type[MessageComponent]], count: int = -1 ) -> 'MessageChain': """返回移除指定元素或指定类型的元素后剩余的消息链。 Args: x: 指定的元素或元素类型。 count: 至多移除的数量。默认为全部移除。 Returns: MessageChain: 剩余的消息链。 """ def _exclude(): nonlocal count x_is_type = isinstance(x, type) for c in self: if count > 0 and ((x_is_type and type(c) is x) or c == x): count -= 1 continue yield c return self.__class__(_exclude())
def extend(self, x: Iterable[Union[MessageComponent, str]])
-
将另一个消息链中的元素添加到消息链末尾。
Args
x
- 另一个消息链,也可为消息元素或字符串元素的序列。
Expand source code
def extend(self, x: Iterable[Union[MessageComponent, str]]): """将另一个消息链中的元素添加到消息链末尾。 Args: x: 另一个消息链,也可为消息元素或字符串元素的序列。 """ self.__root__.extend(Plain(c) if isinstance(c, str) else c for c in x)
def get(self, index: Union[int, slice, Type[~TMessageComponent], Tuple[Type[~TMessageComponent], int]], count: Optional[int] = None) ‑> Union[MessageComponent, List[MessageComponent], List[~TMessageComponent]]
-
获取消息链中的某个(某些)消息组件,或某类型的消息组件。
Args
index (
Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]]
): 如果为int
,则返回该索引处的消息组件。 如果为slice
,则返回该索引范围处的消息组件。 如果为Type[TMessageComponent]
,则返回该类型的全部消息组件。 如果为Tuple[Type[TMessageComponent], int]
,则返回该类型的至多index[1]
个消息组件。count
- 如果为
int
,则返回至多count
个消息组件。
Returns
MessageComponent
- 返回指定索引处的消息组件。
List[MessageComponent]
- 返回指定索引范围的消息组件。
List[TMessageComponent]
- 返回指定类型的消息组件构成的列表。
Expand source code
def get( self, index: Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]], count: Optional[int] = None ) -> Union[MessageComponent, List[MessageComponent], List[TMessageComponent]]: """获取消息链中的某个(某些)消息组件,或某类型的消息组件。 Args: index (`Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]]`): 如果为 `int`,则返回该索引处的消息组件。 如果为 `slice`,则返回该索引范围处的消息组件。 如果为 `Type[TMessageComponent]`,则返回该类型的全部消息组件。 如果为 `Tuple[Type[TMessageComponent], int]`,则返回该类型的至多 `index[1]` 个消息组件。 count: 如果为 `int`,则返回至多 `count` 个消息组件。 Returns: MessageComponent: 返回指定索引处的消息组件。 List[MessageComponent]: 返回指定索引范围的消息组件。 List[TMessageComponent]: 返回指定类型的消息组件构成的列表。 """ # 正常索引 if isinstance(index, int): return self.__root__[index] # 切片索引 if isinstance(index, slice): return self.__root__[index] # 指定 count if count: if isinstance(index, type): index = (index, count) elif isinstance(index, tuple): index = (index[0], count if count < index[1] else index[1]) # 索引对象为 MessageComponent 类,返回所有对应 component if isinstance(index, type): return [ component for component in self if type(component) is index ] # 索引对象为 MessageComponent 和 int 构成的 tuple, 返回指定数量的 component if isinstance(index, tuple): components = ( component for component in self if type(component) is index[0] ) return [ component for component, _ in zip(components, range(index[1])) ] raise TypeError(f"消息链索引需为 int 或 MessageComponent,当前类型:{type(index)}")
def get_first(self, t: Type[~TMessageComponent]) ‑> Optional[~TMessageComponent]
-
获取消息链中第一个符合类型的消息组件。
Expand source code
def get_first(self, t: Type[TMessageComponent]) -> Optional[TMessageComponent]: """获取消息链中第一个符合类型的消息组件。""" for component in self: if isinstance(component, t): return component return None
def has(self, sub: Union[MessageComponent, Type[MessageComponent], ForwardRef('MessageChain'), str]) ‑> bool
-
判断消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。
Args
sub (
Union[MessageComponent, Type[MessageComponent], 'MessageChain', str]
): 若为MessageComponent
,则判断该组件是否在消息链中。 若为Type[MessageComponent]
,则判断该组件类型是否在消息链中。 若为MessageChain
,则判断该子消息链是否在消息链中。 若为str
,则判断对应的 mirai 码中是否有某子字符串。Returns
bool
- 是否找到。
Expand source code
def has( self, sub: Union[MessageComponent, Type[MessageComponent], 'MessageChain', str] ) -> bool: """判断消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。 Args: sub (`Union[MessageComponent, Type[MessageComponent], 'MessageChain', str]`): 若为 `MessageComponent`,则判断该组件是否在消息链中。 若为 `Type[MessageComponent]`,则判断该组件类型是否在消息链中。 若为 `MessageChain`,则判断该子消息链是否在消息链中。 若为 `str`,则判断对应的 mirai 码中是否有某子字符串。 Returns: bool: 是否找到。 """ if isinstance(sub, type): # 检测消息链中是否有某种类型的对象 for i in self: if type(i) is sub: return True return False if isinstance(sub, MessageComponent): # 检查消息链中是否有某个组件 for i in self: if i == sub: return True return False if isinstance(sub, MessageChain): # 检查消息链中是否有某个子消息链 return bool(kmp(self, sub)) if isinstance(sub, str): # 检查消息中有无指定字符串子串 return sub in deserialize(str(self)) raise TypeError(f"类型不匹配,当前类型:{type(sub)}")
def index(self, x: Union[MessageComponent, Type[MessageComponent]], i: int = 0, j: int = -1) ‑> int
-
返回 x 在消息链中首次出现项的索引号(索引号在 i 或其后且在 j 之前)。
Args
- x (
Union[MessageComponent, Type[MessageComponent]]
): - 要查找的消息元素或消息元素类型。
i
- 从哪个位置开始查找。
j
- 查找到哪个位置结束。
Returns
int
- 如果找到,则返回索引号。
Raises
ValueError
- 没有找到。
TypeError
- 类型不匹配。
Expand source code
def index( self, x: Union[MessageComponent, Type[MessageComponent]], i: int = 0, j: int = -1 ) -> int: """返回 x 在消息链中首次出现项的索引号(索引号在 i 或其后且在 j 之前)。 Args: x (`Union[MessageComponent, Type[MessageComponent]]`): 要查找的消息元素或消息元素类型。 i: 从哪个位置开始查找。 j: 查找到哪个位置结束。 Returns: int: 如果找到,则返回索引号。 Raises: ValueError: 没有找到。 TypeError: 类型不匹配。 """ if isinstance(x, type): l = len(self) if i < 0: i += l if i < 0: i = 0 if j < 0: j += l if j > l: j = l for index in range(i, j): if type(self[index]) is x: return index raise ValueError("消息链中不存在该类型的组件。") if isinstance(x, MessageComponent): return self.__root__.index(x, i, j) raise TypeError(f"类型不匹配,当前类型:{type(x)}")
- x (
def insert(self, i: int, x: Union[MessageComponent, str])
-
将一个消息元素或字符串添加到消息链中指定位置。
Args
i
- 插入位置。
x
- 消息元素或字符串元素。
Expand source code
def insert(self, i: int, x: Union[MessageComponent, str]): """将一个消息元素或字符串添加到消息链中指定位置。 Args: i: 插入位置。 x: 消息元素或字符串元素。 """ self.__root__.insert(i, Plain(x) if isinstance(x, str) else x)
def pop(self, i: int = -1) ‑> MessageComponent
-
从消息链中移除并返回指定位置的元素。
Args
i
- 移除位置。默认为末尾。
Returns
MessageComponent
- 移除的元素。
Expand source code
def pop(self, i: int = -1) -> MessageComponent: """从消息链中移除并返回指定位置的元素。 Args: i: 移除位置。默认为末尾。 Returns: MessageComponent: 移除的元素。 """ return self.__root__.pop(i)
def remove(self, x: Union[MessageComponent, Type[MessageComponent]])
-
从消息链中移除指定元素或指定类型的一个元素。
Args
x
- 指定的元素或元素类型。
Expand source code
def remove(self, x: Union[MessageComponent, Type[MessageComponent]]): """从消息链中移除指定元素或指定类型的一个元素。 Args: x: 指定的元素或元素类型。 """ if isinstance(x, type): self.pop(self.index(x)) if isinstance(x, MessageComponent): self.__root__.remove(x)
def reverse(self)
-
将消息链原地翻转。
Expand source code
def reverse(self): """将消息链原地翻转。""" self.__root__.reverse()
class MessageEvent (*args, **kwargs)
-
消息事件。
Args
type
- 事件名。
message_chain
- 消息内容。
Expand source code
class MessageEvent(Event): """消息事件。 Args: type: 事件名。 message_chain: 消息内容。 """ type: str """事件名。""" message_chain: MessageChain """消息内容。"""
Ancestors
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
- FriendMessage
- FriendSyncMessage
- GroupMessage
- GroupSyncMessage
- OtherClientMessage
- StrangerMessage
- StrangerSyncMessage
- TempMessage
- TempSyncMessage
Class variables
var message_chain : MessageChain
-
消息内容。
Inherited members
class Method (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
API 接口的调用方法。
Expand source code
class Method(str, Enum): """API 接口的调用方法。""" GET = "GET" """使用 GET 方法调用。""" POST = "POST" """使用 POST 方法调用。""" # 区分下面两个,是为了兼容 websocket RESTGET = "RESTGET" """表明这是一个对 RESTful 接口的 GET。""" RESTPOST = "RESTPOST" """表明这是一个对 RESTful 接口的 POST。""" MULTIPART = "MULTIPART" """表明这是一个使用了 multipart/form-data 的 POST。"""
Ancestors
- builtins.str
- enum.Enum
Class variables
var GET
-
使用 GET 方法调用。
var MULTIPART
-
表明这是一个使用了 multipart/form-data 的 POST。
var POST
-
使用 POST 方法调用。
var RESTGET
-
表明这是一个对 RESTful 接口的 GET。
var RESTPOST
-
表明这是一个对 RESTful 接口的 POST。
class Mirai (qq: int, adapter: Adapter)
-
机器人主类。
使用了
__getattr__
魔术方法,可以直接在对象上调用 API。Mirai: 类包含 model 层封装,API 名称经过转写以符合命名规范,所有的 API 全部使用小写字母及下划线命名。
(API 名称也可使用原名。) API 参数可以使用具名参数,也可以使用位置参数,关于 API 参数的更多信息请参见模块
mirai.models.api
。例如:
await bot.send_friend_message(12345678, [ Plain("Hello World!") ])
也可以使用
call_api
方法,须注意此方法直接继承自SimpleMirai
,因此未经 model 层封装, 需要遵循SimpleMirai
的规定。Args
qq
- QQ 号。启用 Single Mode 时,可以随便传入,登陆后会自动获取正确的 QQ 号。
adapter
- 适配器,负责与 mirai-api-http 沟通,详见模块`mirai.adapters。
Expand source code
class Mirai(SimpleMirai): """ 机器人主类。 使用了 `__getattr__` 魔术方法,可以直接在对象上调用 API。 Mirai: 类包含 model 层封装,API 名称经过转写以符合命名规范,所有的 API 全部使用小写字母及下划线命名。 (API 名称也可使用原名。) API 参数可以使用具名参数,也可以使用位置参数,关于 API 参数的更多信息请参见模块 `mirai.models.api`。 例如: ```py await bot.send_friend_message(12345678, [ Plain("Hello World!") ]) ``` 也可以使用 `call_api` 方法,须注意此方法直接继承自 `SimpleMirai`,因此未经 model 层封装, 需要遵循 `SimpleMirai` 的规定。 """ def __init__(self, qq: int, adapter: Adapter): super().__init__(qq=qq, adapter=adapter) # 将 bus 更换为 ModelEventBus adapter.unregister_event_bus(self._bus) self._bus: ModelEventBus = ModelEventBus() adapter.register_event_bus(self._bus.base_bus) @property def bus(self) -> ModelEventBus: return self._bus async def call_api(self, api: str, *args, **kwargs): """调用 API。 Args: api: API 名称。 *args: 参数。 **kwargs: 参数。 """ return await self._adapter.call_api(api, *args, **kwargs) def on( self, event_type: Union[Type[Event], str], priority: int = 0, ) -> Callable: """注册事件处理器。 用法举例: ```python @bot.on(FriendMessage) async def on_friend_message(event: FriendMessage): print(f"收到来自{event.sender.nickname}的消息。") ``` Args: event_type: 事件类或事件名。 priority: 优先级,较小者优先。 """ return self._bus.on(event_type, priority) def api(self, api: str) -> ApiModel.Proxy: """获取 API Proxy 对象。 API Proxy 提供更加简便的调用 API 的写法,详见 `mirai.models.api`。 `Mirai` 的 `__getattr__` 与此方法完全相同,可支持直接在对象上调用 API。 Args: api: API 名称。 Returns: ApiModel.Proxy: API Proxy 对象。 """ api_type = ApiModel.get_subtype(api) return api_type.Proxy(self, api_type) def __getattr__(self, api: str) -> ApiModel.Proxy: return self.api(api) async def send( self, target: Union[Entity, MessageEvent], message: TMessage, quote: bool = False ) -> int: """发送消息。可以从 `Friend` `Group` 等对象,或者从 `MessageEvent` 中自动识别消息发送对象。 Args: target: 目标对象。 message: 发送的消息。 quote: 是否以回复消息的形式发送,默认为 False。 Returns: int: 发送的消息的 message_id。 """ # 识别消息发送对象 if isinstance(target, TempMessage): quoting = target.message_chain.message_id if quote else None return ( await self.send_temp_message( qq=target.sender.id, group=target.group.id, message_chain=message, quote=quoting ) ).message_id if isinstance(target, MessageEvent): quoting = target.message_chain.message_id if quote else None target = target.sender else: quoting = None if isinstance(target, Friend): send_message = self.send_friend_message id_ = target.id elif isinstance(target, Group): send_message = self.send_group_message id_ = target.id elif isinstance(target, GroupMember): send_message = self.send_group_message id_ = target.group.id else: raise ValueError(f"{target} 不是有效的消息发送对象。") response = await send_message( target=id_, message_chain=message, quote=quoting ) return response.message_id if response else -1 async def get_friend(self, id_: int) -> Optional[Friend]: """获取好友对象。 Args: id_: 好友 QQ 号。 Returns: Friend: 好友对象。 None: 好友不存在。 """ friend_list = await self.friend_list.get() if not friend_list: return None for friend in cast(List[Friend], friend_list): if friend.id == id_: return friend return None async def get_group(self, id_: int) -> Optional[Group]: """获取群组对象。 Args: id_: 群号。 Returns: Group: 群组对象。 None: 群组不存在或 bot 未入群。 """ group_list = await self.group_list.get() if not group_list: return None for group in cast(List[Group], group_list): if group.id == id_: return group return None async def get_group_member(self, group: Union[Group, int], id_: int) -> Optional[GroupMember]: """获取群成员对象。 Args: group: 群组对象或群号。 id_: 群成员 QQ 号。 Returns: GroupMember: 群成员对象。 None: 群成员不存在。 """ if isinstance(group, Group): group = group.id member_list = await self.member_list.get(group) if not member_list: return None for member in cast(List[GroupMember], member_list): if member.id == id_: return member return None async def get_entity(self, subject: Subject) -> Optional[Entity]: """获取实体对象。 Args: subject: 以 `Subject` 表示的实体对象。 Returns: Entity: 实体对象。 None: 实体不存在。 """ if subject.kind == 'Friend': return await self.get_friend(subject.id) if subject.kind == 'Group': return await self.get_group(subject.id) return None @staticmethod async def is_admin(group: Group) -> bool: """判断机器人在群组中是否是管理员。 Args: group: 群组对象。 Returns: bool: 是否是管理员。 """ return group.permission in (Permission.Administrator, Permission.Owner) async def process_request( self, event: RequestEvent, operate: Union[int, RespOperate], message: str = '' ): """处理申请。 Args: event: 申请事件。 operate: 处理操作。 message: 回复的信息。 """ api_type = cast( Type[RespEvent], getattr(mirai.models.api, 'Resp' + event.type) ) api = api_type.from_event(event, operate, message) await api.call(self, 'POST') async def allow(self, event: RequestEvent, message: str = ''): """允许申请。 Args: event: 申请事件。 message: 回复的信息。 """ await self.process_request(event, RespOperate.ALLOW, message) async def decline( self, event: RequestEvent, message: str = '', ban: bool = False ): """拒绝申请。 Args: event: 申请事件。 message: 回复的信息。 ban: 是否拉黑,默认为 False。 """ await self.process_request( event, RespOperate.DECLINE & RespOperate.BAN if ban else RespOperate.DECLINE, message ) async def ignore( self, event: RequestEvent, message: str = '', ban: bool = False ): """忽略申请。 Args: event: 申请事件。 message: 回复的信息。 ban: 是否拉黑,默认为 False。 """ await self.process_request( event, RespOperate.IGNORE & RespOperate.BAN if ban else RespOperate.DECLINE, message )
Ancestors
Class variables
var qq : int
Static methods
async def is_admin(group: Group) ‑> bool
-
判断机器人在群组中是否是管理员。
Args
group
- 群组对象。
Returns
bool
- 是否是管理员。
Expand source code
@staticmethod async def is_admin(group: Group) -> bool: """判断机器人在群组中是否是管理员。 Args: group: 群组对象。 Returns: bool: 是否是管理员。 """ return group.permission in (Permission.Administrator, Permission.Owner)
Instance variables
var bus : ModelEventBus
-
Expand source code
@property def bus(self) -> ModelEventBus: return self._bus
Methods
async def allow(self, event: RequestEvent, message: str = '')
-
允许申请。
Args
event
- 申请事件。
message
- 回复的信息。
Expand source code
async def allow(self, event: RequestEvent, message: str = ''): """允许申请。 Args: event: 申请事件。 message: 回复的信息。 """ await self.process_request(event, RespOperate.ALLOW, message)
def api(self, api: str) ‑> ApiModel.Proxy
-
获取 API Proxy 对象。
API Proxy 提供更加简便的调用 API 的写法,详见
mirai.models.api
。Mirai
的__getattr__
与此方法完全相同,可支持直接在对象上调用 API。Args
api
- API 名称。
Returns
ApiModel.Proxy
- API Proxy 对象。
Expand source code
def api(self, api: str) -> ApiModel.Proxy: """获取 API Proxy 对象。 API Proxy 提供更加简便的调用 API 的写法,详见 `mirai.models.api`。 `Mirai` 的 `__getattr__` 与此方法完全相同,可支持直接在对象上调用 API。 Args: api: API 名称。 Returns: ApiModel.Proxy: API Proxy 对象。 """ api_type = ApiModel.get_subtype(api) return api_type.Proxy(self, api_type)
async def decline(self, event: RequestEvent, message: str = '', ban: bool = False)
-
拒绝申请。
Args
event
- 申请事件。
message
- 回复的信息。
ban
- 是否拉黑,默认为 False。
Expand source code
async def decline( self, event: RequestEvent, message: str = '', ban: bool = False ): """拒绝申请。 Args: event: 申请事件。 message: 回复的信息。 ban: 是否拉黑,默认为 False。 """ await self.process_request( event, RespOperate.DECLINE & RespOperate.BAN if ban else RespOperate.DECLINE, message )
async def get_entity(self, subject: Subject) ‑> Optional[Entity]
-
获取实体对象。
Args
subject
- 以
Subject
表示的实体对象。
Returns
Entity
- 实体对象。
None
- 实体不存在。
Expand source code
async def get_entity(self, subject: Subject) -> Optional[Entity]: """获取实体对象。 Args: subject: 以 `Subject` 表示的实体对象。 Returns: Entity: 实体对象。 None: 实体不存在。 """ if subject.kind == 'Friend': return await self.get_friend(subject.id) if subject.kind == 'Group': return await self.get_group(subject.id) return None
async def get_friend(self, id_: int) ‑> Optional[Friend]
-
获取好友对象。
Args
id_
- 好友 QQ 号。
Returns
Friend
- 好友对象。
None
- 好友不存在。
Expand source code
async def get_friend(self, id_: int) -> Optional[Friend]: """获取好友对象。 Args: id_: 好友 QQ 号。 Returns: Friend: 好友对象。 None: 好友不存在。 """ friend_list = await self.friend_list.get() if not friend_list: return None for friend in cast(List[Friend], friend_list): if friend.id == id_: return friend return None
async def get_group(self, id_: int) ‑> Optional[Group]
-
获取群组对象。
Args
id_
- 群号。
Returns
Group
- 群组对象。
None
- 群组不存在或 bot 未入群。
Expand source code
async def get_group(self, id_: int) -> Optional[Group]: """获取群组对象。 Args: id_: 群号。 Returns: Group: 群组对象。 None: 群组不存在或 bot 未入群。 """ group_list = await self.group_list.get() if not group_list: return None for group in cast(List[Group], group_list): if group.id == id_: return group return None
async def get_group_member(self, group: Union[Group, int], id_: int) ‑> Optional[GroupMember]
-
获取群成员对象。
Args
group
- 群组对象或群号。
id_
- 群成员 QQ 号。
Returns
GroupMember
- 群成员对象。
None
- 群成员不存在。
Expand source code
async def get_group_member(self, group: Union[Group, int], id_: int) -> Optional[GroupMember]: """获取群成员对象。 Args: group: 群组对象或群号。 id_: 群成员 QQ 号。 Returns: GroupMember: 群成员对象。 None: 群成员不存在。 """ if isinstance(group, Group): group = group.id member_list = await self.member_list.get(group) if not member_list: return None for member in cast(List[GroupMember], member_list): if member.id == id_: return member return None
async def ignore(self, event: RequestEvent, message: str = '', ban: bool = False)
-
忽略申请。
Args
event
- 申请事件。
message
- 回复的信息。
ban
- 是否拉黑,默认为 False。
Expand source code
async def ignore( self, event: RequestEvent, message: str = '', ban: bool = False ): """忽略申请。 Args: event: 申请事件。 message: 回复的信息。 ban: 是否拉黑,默认为 False。 """ await self.process_request( event, RespOperate.IGNORE & RespOperate.BAN if ban else RespOperate.DECLINE, message )
def on(self, event_type: Union[Type[Event], str], priority: int = 0) ‑> Callable
-
注册事件处理器。
用法举例:
@bot.on(FriendMessage) async def on_friend_message(event: FriendMessage): print(f"收到来自{event.sender.nickname}的消息。")
Args
event_type
- 事件类或事件名。
priority
- 优先级,较小者优先。
Expand source code
def on( self, event_type: Union[Type[Event], str], priority: int = 0, ) -> Callable: """注册事件处理器。 用法举例: ```python @bot.on(FriendMessage) async def on_friend_message(event: FriendMessage): print(f"收到来自{event.sender.nickname}的消息。") ``` Args: event_type: 事件类或事件名。 priority: 优先级,较小者优先。 """ return self._bus.on(event_type, priority)
async def process_request(self, event: RequestEvent, operate: Union[int, RespOperate], message: str = '')
-
处理申请。
Args
event
- 申请事件。
operate
- 处理操作。
message
- 回复的信息。
Expand source code
async def process_request( self, event: RequestEvent, operate: Union[int, RespOperate], message: str = '' ): """处理申请。 Args: event: 申请事件。 operate: 处理操作。 message: 回复的信息。 """ api_type = cast( Type[RespEvent], getattr(mirai.models.api, 'Resp' + event.type) ) api = api_type.from_event(event, operate, message) await api.call(self, 'POST')
async def send(self, target: Union[Entity, MessageEvent], message: Union[MessageChain, Iterable[Union[MessageComponent, str]], MessageComponent, str], quote: bool = False) ‑> int
-
发送消息。可以从
Friend
Group
等对象,或者从MessageEvent
中自动识别消息发送对象。Args
target
- 目标对象。
message
- 发送的消息。
quote
- 是否以回复消息的形式发送,默认为 False。
Returns
int
- 发送的消息的 message_id。
Expand source code
async def send( self, target: Union[Entity, MessageEvent], message: TMessage, quote: bool = False ) -> int: """发送消息。可以从 `Friend` `Group` 等对象,或者从 `MessageEvent` 中自动识别消息发送对象。 Args: target: 目标对象。 message: 发送的消息。 quote: 是否以回复消息的形式发送,默认为 False。 Returns: int: 发送的消息的 message_id。 """ # 识别消息发送对象 if isinstance(target, TempMessage): quoting = target.message_chain.message_id if quote else None return ( await self.send_temp_message( qq=target.sender.id, group=target.group.id, message_chain=message, quote=quoting ) ).message_id if isinstance(target, MessageEvent): quoting = target.message_chain.message_id if quote else None target = target.sender else: quoting = None if isinstance(target, Friend): send_message = self.send_friend_message id_ = target.id elif isinstance(target, Group): send_message = self.send_group_message id_ = target.id elif isinstance(target, GroupMember): send_message = self.send_group_message id_ = target.group.id else: raise ValueError(f"{target} 不是有效的消息发送对象。") response = await send_message( target=id_, message_chain=message, quote=quoting ) return response.message_id if response else -1
Inherited members
class MiraiRunner (*args, **kwargs_)
-
运行 SimpleMirai 对象的托管类。
使用此类以实现机器人的多例运行。
例如:
runner = MiraiRunner(mirai) runner.run(host='127.0.0.1', port=8000)
Expand source code
class MiraiRunner(Singleton): """运行 SimpleMirai 对象的托管类。 使用此类以实现机器人的多例运行。 例如: ```py runner = MiraiRunner(mirai) runner.run(host='127.0.0.1', port=8000) ``` """ bots: Iterable[SimpleMirai] """运行的 SimpleMirai 对象。""" def __init__(self, *bots: SimpleMirai): """ Args: *bots: 要运行的机器人。 """ self.bots = bots self._asgi = ASGI() self._asgi.add_event_handler('startup', self.startup) self._asgi.add_event_handler('shutdown', self.shutdown) async def startup(self): """开始运行。""" coros = [bot.startup() for bot in self.bots] await asyncio.gather(*coros) async def shutdown(self): """结束运行。""" coros = [bot.shutdown() for bot in self.bots] await asyncio.gather(*coros) async def __call__(self, scope, recv, send): await self._asgi(scope, recv, send) async def _run(self): try: await self.startup() backgrounds = [bot.background() for bot in self.bots] await asyncio.gather(*backgrounds) finally: await self.shutdown() def run( self, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs ): """开始运行机器人。 一般情况下,此函数会进入主循环,不再返回。 """ if not asgi_serve( self, host=host, port=port, asgi_server=asgi_server, **kwargs ): import textwrap logger = logging.getLogger(__name__) logger.warning( textwrap.dedent( """ 未找到可用的 ASGI 服务,反向 WebSocket 和 WebHook 上报将不可用。 仅 HTTP 轮询与正向 WebSocket 可用。 建议安装 ASGI 服务器,如 `uvicorn` 或 `hypercorn`。 在命令行键入: pip install uvicorn 或者 pip install hypercorn """ ).strip() ) try: asyncio.run(self._run()) except (KeyboardInterrupt, SystemExit): exit()
Ancestors
Class variables
var bots : Iterable[SimpleMirai]
-
运行的 SimpleMirai 对象。
Methods
def run(self, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs)
-
开始运行机器人。
一般情况下,此函数会进入主循环,不再返回。
Expand source code
def run( self, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs ): """开始运行机器人。 一般情况下,此函数会进入主循环,不再返回。 """ if not asgi_serve( self, host=host, port=port, asgi_server=asgi_server, **kwargs ): import textwrap logger = logging.getLogger(__name__) logger.warning( textwrap.dedent( """ 未找到可用的 ASGI 服务,反向 WebSocket 和 WebHook 上报将不可用。 仅 HTTP 轮询与正向 WebSocket 可用。 建议安装 ASGI 服务器,如 `uvicorn` 或 `hypercorn`。 在命令行键入: pip install uvicorn 或者 pip install hypercorn """ ).strip() ) try: asyncio.run(self._run()) except (KeyboardInterrupt, SystemExit): exit()
async def shutdown(self)
-
结束运行。
Expand source code
async def shutdown(self): """结束运行。""" coros = [bot.shutdown() for bot in self.bots] await asyncio.gather(*coros)
async def startup(self)
-
开始运行。
Expand source code
async def startup(self): """开始运行。""" coros = [bot.startup() for bot in self.bots] await asyncio.gather(*coros)
class NetworkError (*args, **kwargs)
-
网络连接出错。
Expand source code
class NetworkError(RuntimeError): """网络连接出错。"""
Ancestors
- builtins.RuntimeError
- builtins.Exception
- builtins.BaseException
class Plain (*args, **kwargs)
-
纯文本。
Expand source code
class Plain(MessageComponent): """纯文本。""" type: str = "Plain" """消息组件类型。""" text: str """文字消息。""" def __str__(self): return self.text def as_mirai_code(self) -> str: return serialize(self.text) def __repr__(self): return f'Plain({self.text!r})'
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var text : str
-
文字消息。
Inherited members
class Poke (*args, **kwargs)
-
戳一戳。
Expand source code
class Poke(MessageComponent): """戳一戳。""" type: str = "Poke" """消息组件类型。""" name: PokeNames """名称。""" @property def poke_type(self): return POKE_TYPE[self.name] @property def poke_id(self): return POKE_ID[self.name] def __str__(self): return f'[{POKE_NAME[self.name]}]' def as_mirai_code(self) -> str: return f'[mirai:poke:{self.name},{self.poke_type},{self.poke_id}]'
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var name : PokeNames
-
名称。
Instance variables
var poke_id
-
Expand source code
@property def poke_id(self): return POKE_ID[self.name]
var poke_type
-
Expand source code
@property def poke_type(self): return POKE_TYPE[self.name]
Inherited members
class PokeNames (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
戳一戳名称。
Expand source code
class PokeNames(str, Enum): """戳一戳名称。""" ChuoYiChuo = "ChuoYiChuo" BiXin = "BiXin" DianZan = "DianZan" XinSui = "XinSui" LiuLiuLiu = "LiuLiuLiu" FangDaZhao = "FangDaZhao" BaoBeiQiu = "BaoBeiQiu" Rose = "Rose" ZhaoHuanShu = "ZhaoHuanShu" RangNiPi = "RangNiPi" JeiYin = "JeiYin" ShouLei = "ShouLei" GouYin = "GouYin" ZhuaYiXia = "ZhuaYiXia" SuiPing = "SuiPing" QiaoMen = "QiaoMen" def as_component(self) -> 'Poke': return Poke(name=self.value)
Ancestors
- builtins.str
- enum.Enum
Class variables
var BaoBeiQiu
var BiXin
var ChuoYiChuo
var DianZan
var FangDaZhao
var GouYin
var JeiYin
var LiuLiuLiu
var QiaoMen
var RangNiPi
var Rose
var ShouLei
var SuiPing
var XinSui
var ZhaoHuanShu
var ZhuaYiXia
Methods
def as_component(self) ‑> Poke
-
Expand source code
def as_component(self) -> 'Poke': return Poke(name=self.value)
class Shutdown (*args, **kwargs)
-
关闭事件。
Expand source code
class Shutdown(LifeSpan): """关闭事件。""" type: str = 'Shutdown' """事件名。"""
Ancestors
- LifeSpan
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Inherited members
class SimpleMirai (qq: int, adapter: Adapter)
-
基于 adapter 和 bus,处于 model 层之下的机器人类。
使用了
__getattr__
魔术方法,可以直接在对象上调用 API。通过
SimpleMirai
调用 API 时,需注意此类不含 model 层封装, 因此 API 名称与参数名称需与 mirai-api-http 中的定义相同, 参数需要全部以具名参数的形式给出,并且需要指明使用的方法(GET/POST)。例如:
await bot.sendFriendMessage(target=12345678, messageChain=[ {"type": "Plain", "text": "Hello World!"} ], method="POST")
也可以使用
call_api
方法。对于名称的路由含有二级目录的 API,由于名称中含有斜杠,必须使用
call_api
调用,例如:file_list = await bot.call_api( "file/list", id="", target=12345678, method="GET" )
Args
qq
- QQ 号。启用 Single Mode 时,可以随便传入,登陆后会自动获取正确的 QQ 号。
adapter
- 适配器,负责与 mirai-api-http 沟通,详见模块`mirai.adapters。
Expand source code
class SimpleMirai(ApiProvider, AdapterInterface, AbstractEventBus): """ 基于 adapter 和 bus,处于 model 层之下的机器人类。 使用了 `__getattr__` 魔术方法,可以直接在对象上调用 API。 通过 `SimpleMirai` 调用 API 时,需注意此类不含 model 层封装, 因此 API 名称与参数名称需与 mirai-api-http 中的定义相同, 参数需要全部以具名参数的形式给出,并且需要指明使用的方法(GET/POST)。 例如: ```py await bot.sendFriendMessage(target=12345678, messageChain=[ {"type": "Plain", "text": "Hello World!"} ], method="POST") ``` 也可以使用 `call_api` 方法。 对于名称的路由含有二级目录的 API,由于名称中含有斜杠,必须使用 `call_api` 调用,例如: ```py file_list = await bot.call_api( "file/list", id="", target=12345678, method="GET" ) ``` """ qq: int def __init__(self, qq: int, adapter: Adapter): """ Args: qq: QQ 号。启用 Single Mode 时,可以随便传入,登陆后会自动获取正确的 QQ 号。 adapter: 适配器,负责与 mirai-api-http 沟通,详见模块`mirai.adapters。 """ self.qq = qq self._adapter = adapter self._bus = EventBus() self._adapter.register_event_bus(self._bus) @property def bus(self) -> EventBus: return self._bus def subscribe(self, event, func: Callable, priority: int = 0) -> None: self._bus.subscribe(event, func, priority) def unsubscribe(self, event, func: Callable) -> None: self._bus.unsubscribe(event, func) async def emit(self, event, *args, **kwargs) -> List[Awaitable[Any]]: return await self._bus.emit(event, *args, **kwargs) async def call_api(self, api: str, *args, **kwargs): """调用 API。 Args: api: API 名称。 *args: 参数。 **kwargs: 参数。 """ warnings.warn("SimpleMirai 已弃用,将在 0.3 后移除。", DeprecationWarning) return await self._adapter.call_api(api, *args, **kwargs) def on(self, event: str, priority: int = 0) -> Callable: """注册事件处理器。 用法举例: ```py @bot.on('FriendMessage') async def on_friend_message(event: dict): print(f"收到来自{event['sender']['nickname']}的消息。") ``` Args: event: 事件名。 priority: 优先级,小者优先。 """ return self._bus.on(event, priority=priority) @property def adapter_info(self) -> Dict[str, Any]: return self._adapter.adapter_info @contextlib.asynccontextmanager async def use_adapter(self, adapter: Adapter): """临时使用另一个适配器。 用法: ```py async with bot.use_adapter(HTTPAdapter.via(bot)): ... ``` Args: adapter: 使用的适配器。 """ origin_adapter = self._adapter await adapter.login(self.qq) self._adapter = adapter yield self._adapter = origin_adapter await adapter.logout(False) async def startup(self): """开始运行机器人(立即返回)。""" await self._adapter.login(self.qq) if self._adapter.single_mode: # Single Mode 下,QQ 号可以随便传入。这里从 session info 中获取正确的 QQ 号。 self.qq = (await self.call_api('sessionInfo'))['data']['qq']['id'] asyncio.create_task(self._adapter.emit("Startup", {'type': 'Startup'})) await self._adapter.start() async def background(self): """等待背景任务完成。""" await self._adapter.background async def shutdown(self): """结束运行机器人。""" await asyncio.create_task( self._adapter.emit("Shutdown", {'type': 'Shutdown'}) ) await self._adapter.logout() await self._adapter.shutdown() @property def session(self) -> str: """获取 session key,可用于调试。 Returns: str: session key。 """ return self._adapter.session @property def asgi(self) -> 'MiraiRunner': """ASGI 对象,用于使用 uvicorn 等启动。""" return MiraiRunner(self) @staticmethod def add_background_task(func: Union[Callable, Awaitable, None] = None): """注册背景任务,将在 bot 启动后自动运行。 Args: func(`Union[Callable, Awaitable, None]`): 背景任务,可以是函数或者协程,省略参数以作为装饰器调用。 """ asgi = ASGI() return asgi.add_background_task(func) def run( self, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs ): """开始运行机器人。 一般情况下,此函数会进入主循环,不再返回。 Args: host: YiriMirai 作为服务器的地址,默认为 127.0.0.1。 port: YiriMirai 作为服务器的端口,默认为 8000。 asgi_server: ASGI 服务器类型,可选项有 `'uvicorn'` `'hypercorn'` 和 `'auto'`。 **kwargs: 可选参数。多余的参数将传递给 `uvicorn.run` 和 `hypercorn.run`。 """ MiraiRunner(self).run(host, port, asgi_server, **kwargs)
Ancestors
Subclasses
Class variables
var qq : int
Static methods
def add_background_task(func: Union[Callable, Awaitable, NoneType] = None)
-
注册背景任务,将在 bot 启动后自动运行。
Args
func(
Union[Callable, Awaitable, None]
): 背景任务,可以是函数或者协程,省略参数以作为装饰器调用。Expand source code
@staticmethod def add_background_task(func: Union[Callable, Awaitable, None] = None): """注册背景任务,将在 bot 启动后自动运行。 Args: func(`Union[Callable, Awaitable, None]`): 背景任务,可以是函数或者协程,省略参数以作为装饰器调用。 """ asgi = ASGI() return asgi.add_background_task(func)
Instance variables
var asgi : MiraiRunner
-
ASGI 对象,用于使用 uvicorn 等启动。
Expand source code
@property def asgi(self) -> 'MiraiRunner': """ASGI 对象,用于使用 uvicorn 等启动。""" return MiraiRunner(self)
var bus : EventBus
-
Expand source code
@property def bus(self) -> EventBus: return self._bus
var session : str
-
获取 session key,可用于调试。
Returns
str
- session key。
Expand source code
@property def session(self) -> str: """获取 session key,可用于调试。 Returns: str: session key。 """ return self._adapter.session
Methods
async def background(self)
-
等待背景任务完成。
Expand source code
async def background(self): """等待背景任务完成。""" await self._adapter.background
async def call_api(self, api: str, *args, **kwargs)
-
调用 API。
Args
api
- API 名称。
*args
- 参数。
**kwargs
- 参数。
Expand source code
async def call_api(self, api: str, *args, **kwargs): """调用 API。 Args: api: API 名称。 *args: 参数。 **kwargs: 参数。 """ warnings.warn("SimpleMirai 已弃用,将在 0.3 后移除。", DeprecationWarning) return await self._adapter.call_api(api, *args, **kwargs)
def on(self, event: str, priority: int = 0) ‑> Callable
-
注册事件处理器。
用法举例:
@bot.on('FriendMessage') async def on_friend_message(event: dict): print(f"收到来自{event['sender']['nickname']}的消息。")
Args
event
- 事件名。
priority
- 优先级,小者优先。
Expand source code
def on(self, event: str, priority: int = 0) -> Callable: """注册事件处理器。 用法举例: ```py @bot.on('FriendMessage') async def on_friend_message(event: dict): print(f"收到来自{event['sender']['nickname']}的消息。") ``` Args: event: 事件名。 priority: 优先级,小者优先。 """ return self._bus.on(event, priority=priority)
def run(self, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs)
-
开始运行机器人。
一般情况下,此函数会进入主循环,不再返回。
Args
host
- YiriMirai 作为服务器的地址,默认为 127.0.0.1。
port
- YiriMirai 作为服务器的端口,默认为 8000。
asgi_server
- ASGI 服务器类型,可选项有
'uvicorn'
'hypercorn'
和'auto'
。 **kwargs
- 可选参数。多余的参数将传递给
uvicorn.run
和hypercorn.run
。
Expand source code
def run( self, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs ): """开始运行机器人。 一般情况下,此函数会进入主循环,不再返回。 Args: host: YiriMirai 作为服务器的地址,默认为 127.0.0.1。 port: YiriMirai 作为服务器的端口,默认为 8000。 asgi_server: ASGI 服务器类型,可选项有 `'uvicorn'` `'hypercorn'` 和 `'auto'`。 **kwargs: 可选参数。多余的参数将传递给 `uvicorn.run` 和 `hypercorn.run`。 """ MiraiRunner(self).run(host, port, asgi_server, **kwargs)
async def shutdown(self)
-
结束运行机器人。
Expand source code
async def shutdown(self): """结束运行机器人。""" await asyncio.create_task( self._adapter.emit("Shutdown", {'type': 'Shutdown'}) ) await self._adapter.logout() await self._adapter.shutdown()
async def startup(self)
-
开始运行机器人(立即返回)。
Expand source code
async def startup(self): """开始运行机器人(立即返回)。""" await self._adapter.login(self.qq) if self._adapter.single_mode: # Single Mode 下,QQ 号可以随便传入。这里从 session info 中获取正确的 QQ 号。 self.qq = (await self.call_api('sessionInfo'))['data']['qq']['id'] asyncio.create_task(self._adapter.emit("Startup", {'type': 'Startup'})) await self._adapter.start()
async def use_adapter(self, adapter: Adapter)
-
临时使用另一个适配器。
用法:
async with bot.use_adapter(HTTPAdapter.via(bot)): ...
Args
adapter
- 使用的适配器。
Expand source code
@contextlib.asynccontextmanager async def use_adapter(self, adapter: Adapter): """临时使用另一个适配器。 用法: ```py async with bot.use_adapter(HTTPAdapter.via(bot)): ... ``` Args: adapter: 使用的适配器。 """ origin_adapter = self._adapter await adapter.login(self.qq) self._adapter = adapter yield self._adapter = origin_adapter await adapter.logout(False)
Inherited members
class SkipExecution (*args, **kwargs)
-
跳过同优先度的事件处理器,进入下一优先度。
Expand source code
class SkipExecution(Exception): """跳过同优先度的事件处理器,进入下一优先度。"""
Ancestors
- builtins.Exception
- builtins.BaseException
class Startup (*args, **kwargs)
-
启动事件。
Expand source code
class Startup(LifeSpan): """启动事件。""" type: str = 'Startup' """事件名。"""
Ancestors
- LifeSpan
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Inherited members
class StopExecution (*args, **kwargs)
-
终止事件处理器执行,但不阻止事件向上传播。
Expand source code
class StopExecution(Exception): """终止事件处理器执行,但不阻止事件向上传播。"""
Ancestors
- builtins.Exception
- builtins.BaseException
class StopPropagation (*args, **kwargs)
-
终止事件处理器执行,并停止事件向上传播。
Expand source code
class StopPropagation(Exception): """终止事件处理器执行,并停止事件向上传播。"""
Ancestors
- builtins.Exception
- builtins.BaseException
class StrangerMessage (*args, **kwargs)
-
陌生人消息。
Args
type
- 事件名。
sender
- 发送消息的人。
message_chain
- 消息内容。
Expand source code
class StrangerMessage(MessageEvent): """陌生人消息。 Args: type: 事件名。 sender: 发送消息的人。 message_chain: 消息内容。 """ type: str = 'StrangerMessage' """事件名。""" sender: Friend """发送消息的人。""" message_chain: MessageChain """消息内容。"""
Ancestors
- MessageEvent
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var sender : Friend
-
发送消息的人。
Inherited members
class TempMessage (*args, **kwargs)
-
群临时消息。
Args
type
- 事件名。
sender
- 发送消息的群成员。
message_chain
- 消息内容。
Expand source code
class TempMessage(MessageEvent): """群临时消息。 Args: type: 事件名。 sender: 发送消息的群成员。 message_chain: 消息内容。 """ type: str = 'TempMessage' """事件名。""" sender: GroupMember """发送消息的群成员。""" message_chain: MessageChain """消息内容。""" @property def group(self) -> Group: return self.sender.group
Ancestors
- MessageEvent
- Event
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var sender : GroupMember
-
发送消息的群成员。
Instance variables
var group : Group
-
Expand source code
@property def group(self) -> Group: return self.sender.group
Inherited members
class Voice (*args, **kwargs)
-
语音。
Expand source code
class Voice(MessageComponent): """语音。""" type: str = "Voice" """消息组件类型。""" voice_id: Optional[str] = None """语音的 voice_id,不为空时将忽略 url 属性。""" url: Optional[str] = None """语音的 URL,发送时可作网络语音的链接;接收时为腾讯语音服务器的链接,可用于语音下载。""" path: Optional[str] = None """语音的路径,发送本地语音。""" base64: Optional[str] = None """语音的 Base64 编码。""" length: Optional[int] = None """语音的长度,单位为秒。""" @validator('path') def validate_path(cls, path: Optional[str]): """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。""" if path: try: return str(Path(path).resolve(strict=True)) except FileNotFoundError: raise ValueError(f"无效路径:{path}") else: return path def __str__(self): return '[语音]' async def download( self, filename: Union[str, Path, None] = None, directory: Union[str, Path, None] = None ): """下载语音到本地。 语音采用 silk v3 格式,silk 格式的编码解码请使用 [graiax-silkcoder](https://pypi.org/project/graiax-silkcoder/)。 Args: filename: 下载到本地的文件路径。与 `directory` 二选一。 directory: 下载到本地的文件夹路径。与 `filename` 二选一。 """ if not self.url: logger.warning(f'语音 `{self.voice_id}` 无 url 参数,下载失败。') return import httpx async with httpx.AsyncClient() as client: response = await client.get(self.url) response.raise_for_status() content = response.content if filename: path = Path(filename) path.parent.mkdir(parents=True, exist_ok=True) elif directory: path = Path(directory) path.mkdir(parents=True, exist_ok=True) path = path / f'{self.voice_id}.silk' else: raise ValueError("请指定文件路径或文件夹路径!") import aiofiles async with aiofiles.open(path, 'wb') as f: await f.write(content) @classmethod async def from_local( cls, filename: Union[str, Path, None] = None, content: Optional[bytes] = None, ) -> "Voice": """从本地文件路径加载语音,以 base64 的形式传递。 Args: filename: 从本地文件路径加载语音,与 `content` 二选一。 content: 从本地文件内容加载语音,与 `filename` 二选一。 """ if content: pass if filename: path = Path(filename) import aiofiles async with aiofiles.open(path, 'rb') as f: content = await f.read() else: raise ValueError("请指定语音路径或语音内容!") import base64 img = cls(base64=base64.b64encode(content).decode()) return img
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var base64 : Optional[str]
-
语音的 Base64 编码。
var length : Optional[int]
-
语音的长度,单位为秒。
var path : Optional[str]
-
语音的路径,发送本地语音。
var url : Optional[str]
-
语音的 URL,发送时可作网络语音的链接;接收时为腾讯语音服务器的链接,可用于语音下载。
var voice_id : Optional[str]
-
语音的 voice_id,不为空时将忽略 url 属性。
Static methods
async def from_local(filename: Union[str, pathlib.Path, NoneType] = None, content: Optional[bytes] = None) ‑> Voice
-
从本地文件路径加载语音,以 base64 的形式传递。
Args
filename
- 从本地文件路径加载语音,与
content
二选一。 content
- 从本地文件内容加载语音,与
filename
二选一。
Expand source code
@classmethod async def from_local( cls, filename: Union[str, Path, None] = None, content: Optional[bytes] = None, ) -> "Voice": """从本地文件路径加载语音,以 base64 的形式传递。 Args: filename: 从本地文件路径加载语音,与 `content` 二选一。 content: 从本地文件内容加载语音,与 `filename` 二选一。 """ if content: pass if filename: path = Path(filename) import aiofiles async with aiofiles.open(path, 'rb') as f: content = await f.read() else: raise ValueError("请指定语音路径或语音内容!") import base64 img = cls(base64=base64.b64encode(content).decode()) return img
def validate_path(path: Optional[str])
-
修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。
Expand source code
@validator('path') def validate_path(cls, path: Optional[str]): """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。""" if path: try: return str(Path(path).resolve(strict=True)) except FileNotFoundError: raise ValueError(f"无效路径:{path}") else: return path
Methods
async def download(self, filename: Union[str, pathlib.Path, NoneType] = None, directory: Union[str, pathlib.Path, NoneType] = None)
-
下载语音到本地。
语音采用 silk v3 格式,silk 格式的编码解码请使用 graiax-silkcoder。
Args
filename
- 下载到本地的文件路径。与
directory
二选一。 directory
- 下载到本地的文件夹路径。与
filename
二选一。
Expand source code
async def download( self, filename: Union[str, Path, None] = None, directory: Union[str, Path, None] = None ): """下载语音到本地。 语音采用 silk v3 格式,silk 格式的编码解码请使用 [graiax-silkcoder](https://pypi.org/project/graiax-silkcoder/)。 Args: filename: 下载到本地的文件路径。与 `directory` 二选一。 directory: 下载到本地的文件夹路径。与 `filename` 二选一。 """ if not self.url: logger.warning(f'语音 `{self.voice_id}` 无 url 参数,下载失败。') return import httpx async with httpx.AsyncClient() as client: response = await client.get(self.url) response.raise_for_status() content = response.content if filename: path = Path(filename) path.parent.mkdir(parents=True, exist_ok=True) elif directory: path = Path(directory) path.mkdir(parents=True, exist_ok=True) path = path / f'{self.voice_id}.silk' else: raise ValueError("请指定文件路径或文件夹路径!") import aiofiles async with aiofiles.open(path, 'wb') as f: await f.write(content)
Inherited members
class WebHookAdapter (verify_key: Optional[str], route: str = '/', extra_headers: Optional[Mapping[str, str]] = None, enable_quick_response: bool = True, single_mode: bool = False)
-
WebHook 适配器。作为 HTTP 服务器与 mirai-api-http 沟通。
Args
verify_key
- mirai-api-http 配置的认证 key,关闭认证时为 None。
route
- 适配器的路由,默认在根目录上提供服务。
extra_headers
- 额外请求头,与 mirai-api-http 的配置一致。
enable_quick_response
- 是否启用快速响应,当与其他适配器混合使用时,禁用可以提高响应速度。
single_mode
- 是否启用单例模式。
Expand source code
class WebHookAdapter(Adapter): """WebHook 适配器。作为 HTTP 服务器与 mirai-api-http 沟通。""" session: str """WebHook 不需要 session,此处为机器人的 QQ 号。""" route: str """适配器的路由。""" extra_headers: Mapping[str, str] """额外请求头。""" enable_quick_response: bool """是否启用快速响应。""" def __init__( self, verify_key: Optional[str], route: str = '/', extra_headers: Optional[Mapping[str, str]] = None, enable_quick_response: bool = True, single_mode: bool = False ): """ Args: verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。 route: 适配器的路由,默认在根目录上提供服务。 extra_headers: 额外请求头,与 mirai-api-http 的配置一致。 enable_quick_response: 是否启用快速响应,当与其他适配器混合使用时,禁用可以提高响应速度。 single_mode: 是否启用单例模式。 """ super().__init__(verify_key=verify_key, single_mode=single_mode) self.route = route self.extra_headers = extra_headers or {} self.enable_quick_response = enable_quick_response async def endpoint(request: Request): # 鉴权(QQ 号和额外请求头) if request.headers.get('bot') != self.session: # 验证 QQ 号 logger.debug(f"收到来自其他账号({request.headers.get('bot')})的事件。") return for key in self.extra_headers: # 验证请求头 key = key.lower() # HTTP headers 不区分大小写 request_value = request.headers.get(key, '').lower() expect_value = self.extra_headers[key].lower() if (request_value != expect_value ) and (request_value != '[' + expect_value + ']'): logger.info( f"请求头验证失败:expect [{expect_value}], " + f"got {request_value}。" ) return JSONResponse( status_code=401, content={'error': 'Unauthorized'} ) # 处理事件 event = await request.json() result = await self.handle_event(event) return YiriMiraiJSONResponse(result) ASGI().add_route(self.route, endpoint, methods=['POST']) class QuickResponse(BaseException): """WebHook 快速响应,以异常的方式跳出。""" def __init__(self, data: dict): self.data = data @property def adapter_info(self): return { 'verify_key': self.verify_key, 'session': self.session, 'single_mode': self.single_mode, 'route': self.route, 'extra_headers': self.extra_headers, 'enable_quick_response': self.enable_quick_response, } @classmethod def via(cls, adapter_interface: AdapterInterface) -> "WebHookAdapter": info = adapter_interface.adapter_info adapter = cls( verify_key=info['verify_key'], **{ key: info[key] for key in [ 'route', 'extra_headers', 'enable_quick_response', 'single_mode' ] if key in info } ) adapter.session = cast(str, info.get('session')) return adapter async def login(self, qq: int): """WebHook 不需要登录。直接返回。""" self.session = str(qq) logger.info(f'[WebHook] 成功登录到账号{qq}。') async def logout(self, terminate: bool = True): """WebHook 不需要登出。直接返回。""" logger.info(f"[WebHook] 从账号{self.session}退出。") async def call_api(self, api: str, method: Method = Method.GET, **params): """调用 API。WebHook 的 API 调用只能在快速响应中发生。""" if self.enable_quick_response: content = {'command': api.replace('/', '_'), 'content': params} if method == Method.RESTGET: content['subCommand'] = 'get' elif method == Method.RESTPOST: content['subCommand'] = 'update' elif method == Method.MULTIPART: raise NotImplementedError( "WebHook 适配器不支持上传操作。请使用 bot.use_adapter 临时调用 HTTP 适配器。" ) logger.debug(f'[WebHook] WebHook 快速响应 {api}。') raise WebHookAdapter.QuickResponse(content) return None async def _background(self): """WebHook 不需要事件循环。直接返回。""" async def handle_event(self, event): try: tasks = await self.emit(event['type'], event) await asyncio.gather(*tasks) except WebHookAdapter.QuickResponse as response: # 快速响应,直接返回。 return response.data return {}
Ancestors
- Adapter
- ApiProvider
- AdapterInterface
- abc.ABC
Class variables
var QuickResponse
-
WebHook 快速响应,以异常的方式跳出。
var enable_quick_response : bool
-
是否启用快速响应。
var extra_headers : Mapping[str, str]
-
额外请求头。
var route : str
-
适配器的路由。
Methods
async def call_api(self, api: str, method: Method = Method.GET, **params)
-
调用 API。WebHook 的 API 调用只能在快速响应中发生。
Expand source code
async def call_api(self, api: str, method: Method = Method.GET, **params): """调用 API。WebHook 的 API 调用只能在快速响应中发生。""" if self.enable_quick_response: content = {'command': api.replace('/', '_'), 'content': params} if method == Method.RESTGET: content['subCommand'] = 'get' elif method == Method.RESTPOST: content['subCommand'] = 'update' elif method == Method.MULTIPART: raise NotImplementedError( "WebHook 适配器不支持上传操作。请使用 bot.use_adapter 临时调用 HTTP 适配器。" ) logger.debug(f'[WebHook] WebHook 快速响应 {api}。') raise WebHookAdapter.QuickResponse(content) return None
async def handle_event(self, event)
-
Expand source code
async def handle_event(self, event): try: tasks = await self.emit(event['type'], event) await asyncio.gather(*tasks) except WebHookAdapter.QuickResponse as response: # 快速响应,直接返回。 return response.data return {}
async def login(self, qq: int)
-
WebHook 不需要登录。直接返回。
Expand source code
async def login(self, qq: int): """WebHook 不需要登录。直接返回。""" self.session = str(qq) logger.info(f'[WebHook] 成功登录到账号{qq}。')
async def logout(self, terminate: bool = True)
-
WebHook 不需要登出。直接返回。
Expand source code
async def logout(self, terminate: bool = True): """WebHook 不需要登出。直接返回。""" logger.info(f"[WebHook] 从账号{self.session}退出。")
Inherited members
class WebSocketAdapter (verify_key: Optional[str], host: str, port: int, sync_id: str = '-1', single_mode: bool = False, heartbeat_interval: float = 60.0)
-
WebSocket 适配器。作为 WebSocket 客户端与 mirai-api-http 沟通。
Args
verify_key
- mirai-api-http 配置的认证 key,关闭认证时为 None。
host
- WebSocket Server 的地址。
port
- WebSocket Server 的端口。
sync_id
- mirai-api-http 配置的同步 ID。
single_mode
- 是否启用单例模式。
heartbeat_interval
- 每隔多久发送心跳包,单位秒。
Expand source code
class WebSocketAdapter(Adapter): """WebSocket 适配器。作为 WebSocket 客户端与 mirai-api-http 沟通。""" host_name: str """WebSocket Server 的地址。""" sync_id: str """mirai-api-http 配置的同步 ID。""" qq: int """机器人的 QQ 号。""" connection: Optional[WebSocketClientProtocol] """WebSocket 客户端连接。""" heartbeat_interval: float """每隔多久发送心跳包,单位:秒。""" def __init__( self, verify_key: Optional[str], host: str, port: int, sync_id: str = '-1', single_mode: bool = False, heartbeat_interval: float = 60., ): """ Args: verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。 host: WebSocket Server 的地址。 port: WebSocket Server 的端口。 sync_id: mirai-api-http 配置的同步 ID。 single_mode: 是否启用单例模式。 heartbeat_interval: 每隔多久发送心跳包,单位秒。 """ super().__init__(verify_key=verify_key, single_mode=single_mode) self._host = host self._port = port if host[:2] == '//': host = 'ws:' + host elif host[:7] == 'http://' or host[:8] == 'https://': raise exceptions.NetworkError(f'{host} 不是一个可用的 WebSocket 地址!') elif host[:5] != 'ws://': host = 'ws://' + host if host[-1:] == '/': host = host[:-1] self.host_name = f'{host}:{port}/all' self.sync_id = sync_id # 这个神奇的 sync_id,默认值 -1,居然是个字符串 # 既然这样不如把 sync_id 全改成字符串好了 self.qq = 0 self.connection = None self.heartbeat_interval = heartbeat_interval # 接收 WebSocket 数据的 Task self._receiver_task: Optional[asyncio.Task] = None # 用于临时保存接收到的数据,以便根据 sync_id 进行同步识别 self._recv_dict: Dict[str, deque] = defaultdict(deque) # 本地同步 ID,每次调用 API 递增。 self._local_sync_id = random.randint(1, 1024) * 1024 # 事件处理任务管理器 self._tasks = Tasks() # 心跳机制(Keep-Alive):上次发送数据包的时间 self._last_send_time: float = 0. @property def adapter_info(self): return { 'verify_key': self.verify_key, 'session': self.session, 'single_mode': self.single_mode, 'host': self._host, 'port': self._port, 'sync_id': self.sync_id, } @classmethod def via(cls, adapter_interface: AdapterInterface) -> "WebSocketAdapter": info = adapter_interface.adapter_info adapter = cls( verify_key=info['verify_key'], **{ key: info[key] for key in ['host', 'port', 'sync_id', 'single_mode'] if key in info } ) adapter.session = cast(str, info.get('session')) return adapter @_error_handler_async_local async def _receiver(self): """开始接收 websocket 数据。""" if not self.connection: raise exceptions.NetworkError( f'WebSocket 通道 {self.host_name} 未连接!' ) while True: try: # 数据格式: # { # 'syncId': '-1', # 'data': { # // Event Content # } # } response = json.loads(await self.connection.recv()) data = response['data'] logger.debug( f"[WebSocket] 收到 WebSocket 数据,同步 ID:{response['syncId']}。" ) self._recv_dict[response['syncId']].append(data) except KeyError: logger.error(f'[WebSocket] 不正确的数据:{response}') except ConnectionClosedOK: raise SystemExit() except ConnectionClosed as e: exit_message = f'[WebSocket] WebSocket 通道意外关闭。code: {e.code}, reason: {e.reason}' logger.error(exit_message) raise SystemExit(exit_message) async def _recv(self, sync_id: str = '-1', timeout: int = 600) -> dict: """接收并解析 websocket 数据。""" timer = range(timeout) if timeout > 0 else repeat(0) for _ in timer: if self._recv_dict[sync_id]: data = self._recv_dict[sync_id].popleft() if data.get('code', 0) != 0: raise exceptions.ApiError(data) return data # 如果没有对应同步 ID 的数据,则等待 websocket 数据 # 目前存在问题:如果 mah 发回的数据不含 sync_id, # 这里就会无限循环…… # 所以还是限制次数好了。 await asyncio.sleep(0.1) raise TimeoutError( f'[WebSocket] mirai-api-http 响应超时,可能是由于调用出错。同步 ID:{sync_id}。' ) @_error_handler_async_local async def login(self, qq: int): headers = { 'verifyKey': self.verify_key or '', # 关闭认证时,WebSocket 可传入任意 verify_key 'qq': str(qq), } if self.session: headers['sessionKey'] = self.session self.connection = await connect(self.host_name, extra_headers=headers) self._receiver_task = asyncio.create_task(self._receiver()) verify_response = await self._recv('') # 神奇现象:这里的 syncId 是个空字符串 self.session = verify_response['session'] self.qq = qq logger.info(f'[WebSocket] 成功登录到账号{qq}。') @_error_handler_async_local async def logout(self, terminate: bool = True): if self.connection: await self.connection.close() await self._receiver_task logger.info(f"[WebSocket] 从账号{self.qq}退出。") async def poll_event(self): """获取并处理事件。""" event = await self._recv(self.sync_id, -1) self._tasks.create_task(self.emit(event['type'], event)) async def call_api(self, api: str, method: Method = Method.GET, **params): if not self.connection: raise exceptions.NetworkError( f'WebSocket 通道 {self.host_name} 未连接!' ) self._local_sync_id += 1 # 使用不同的 sync_id sync_id = str(self._local_sync_id) content = { 'syncId': sync_id, 'command': api.replace('/', '_'), 'content': params } if method == Method.RESTGET: content['subCommand'] = 'get' elif method == Method.RESTPOST: content['subCommand'] = 'update' elif method == Method.MULTIPART: raise NotImplementedError( "WebSocket 适配器不支持上传操作。请使用 bot.use_adapter 临时调用 HTTP 适配器。" ) await self.connection.send(json_dumps(content)) self._last_send_time = time.time() logger.debug(f"[WebSocket] 发送 WebSocket 数据,同步 ID:{sync_id}。") try: return await self._recv(sync_id) except TimeoutError as e: logger.debug(e) async def _heartbeat(self): while True: await asyncio.sleep(self.heartbeat_interval) if time.time() - self._last_send_time > self.heartbeat_interval: await self.call_api('about') self._last_send_time = time.time() logger.debug("[WebSocket] 发送心跳包。") async def _background(self): """开始接收事件。""" logger.info('[WebSocket] 机器人开始运行。按 Ctrl + C 停止。') try: heartbeat = asyncio.create_task(self._heartbeat()) while True: await self.poll_event() finally: await Tasks.cancel(heartbeat) await self._tasks.cancel_all()
Ancestors
- Adapter
- ApiProvider
- AdapterInterface
- abc.ABC
Class variables
var connection : Optional[websockets.legacy.client.WebSocketClientProtocol]
-
WebSocket 客户端连接。
var heartbeat_interval : float
-
每隔多久发送心跳包,单位:秒。
var host_name : str
-
WebSocket Server 的地址。
var qq : int
-
机器人的 QQ 号。
var sync_id : str
-
mirai-api-http 配置的同步 ID。
Methods
async def poll_event(self)
-
获取并处理事件。
Expand source code
async def poll_event(self): """获取并处理事件。""" event = await self._recv(self.sync_id, -1) self._tasks.create_task(self.emit(event['type'], event))
Inherited members