Module mirai.models.message
此模块提供消息链相关。
Expand source code
# -*- coding: utf-8 -*-
"""
此模块提供消息链相关。
"""
import itertools
import logging
import re
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import (
Iterable, List, Optional, Tuple, Type, TypeVar, Union, cast, overload
)
from pydantic import HttpUrl, validator
from mirai.models.base import (
MiraiBaseModel, MiraiIndexedMetaclass, MiraiIndexedModel
)
from mirai.models.entities import Friend, GroupMember
from mirai.utils import kmp
logger = logging.getLogger(__name__)
def serialize(s: str) -> str:
"""mirai 码转义。
Args:
s: 待转义的字符串。
Returns:
str: 去转义后的字符串。
"""
return re.sub(r'[\[\]:,\\]', lambda match: '\\' + match.group(0),
s).replace('\n', '\\n').replace('\r', '\\r')
def deserialize(s: str) -> str:
"""mirai 码去转义。
Args:
s: 待去转义的字符串。
Returns:
str: 去转义后的字符串。
"""
return re.sub(
r'\\([\[\]:,\\])', lambda match: match.group(1),
s.replace('\\n', '\n').replace('\\r', '\r')
)
class MessageComponentMetaclass(MiraiIndexedMetaclass):
"""消息组件元类。"""
__message_component__ = None
def __new__(cls, name, bases, attrs, **kwargs):
new_cls = super().__new__(cls, name, bases, attrs, **kwargs)
if name == 'MessageComponent':
cls.__message_component__ = new_cls
if not cls.__message_component__:
return new_cls
for base in bases:
if issubclass(base, cls.__message_component__):
# 获取字段名
if hasattr(new_cls, '__fields__'):
# 忽略 type 字段
new_cls.__parameter_names__ = list(new_cls.__fields__)[1:]
else:
new_cls.__parameter_names__ = []
break
return new_cls
class MessageComponent(MiraiIndexedModel, metaclass=MessageComponentMetaclass):
"""消息组件。"""
type: str
"""消息组件类型。"""
def __str__(self):
return ''
def as_mirai_code(self) -> str:
"""转化为 mirai 码。"""
return ''
def __repr__(self):
return self.__class__.__name__ + '(' + ', '.join(
(
f'{k}={repr(v)}'
for k, v in self.__dict__.items() if k != 'type' and v
)
) + ')'
def __init__(self, *args, **kwargs):
# 解析参数列表,将位置参数转化为具名参数
parameter_names = self.__parameter_names__
if len(args) > len(parameter_names):
raise TypeError(
f'`{self.type}`需要{len(parameter_names)}个参数,但传入了{len(args)}个。'
)
for name, value in zip(parameter_names, args):
if name in kwargs:
raise TypeError(f'在 `{self.type}` 中,具名参数 `{name}` 与位置参数重复。')
kwargs[name] = value
super().__init__(**kwargs)
TMessageComponent = TypeVar('TMessageComponent', bound=MessageComponent)
class MessageChain(MiraiBaseModel):
"""消息链。
一个构造消息链的例子:
```py
message_chain = MessageChain([
AtAll(),
Plain("Hello World!"),
])
```
`Plain` 可以省略。
```py
message_chain = MessageChain([
AtAll(),
"Hello World!",
])
```
在调用 API 时,参数中需要 MessageChain 的,也可以使用 `List[MessageComponent]` 代替。
例如,以下两种写法是等价的:
```py
await bot.send_friend_message(12345678, [
Plain("Hello World!")
])
```
```py
await bot.send_friend_message(12345678, MessageChain([
Plain("Hello World!")
]))
```
使用`str(message_chain)`获取消息链的字符串表示,字符串采用 mirai 码格式,
并自动按照 mirai 码的规定转义。
参看 mirai 的[文档](https://github.com/mamoe/mirai/blob/dev/docs/Messages.md#mirai-%E7%A0%81)。
获取未转义的消息链字符串,可以使用`deserialize(str(message_chain))`。
可以使用 for 循环遍历消息链中的消息组件。
```py
for component in message_chain:
print(repr(component))
```
可以使用 `==` 运算符比较两个消息链是否相同。
```py
another_msg_chain = MessageChain([
{
"type": "AtAll"
}, {
"type": "Plain",
"text": "Hello World!"
},
])
print(message_chain == another_msg_chain)
'True'
```
可以使用 `in` 运算检查消息链中:
1. 是否有某个消息组件。
2. 是否有某个类型的消息组件。
3. 是否有某个子消息链。
4. 对应的 mirai 码中是否有某子字符串。
```py
if AtAll in message_chain:
print('AtAll')
if At(bot.qq) in message_chain:
print('At Me')
if MessageChain([At(bot.qq), Plain('Hello!')]) in message_chain:
print('Hello!')
if 'Hello' in message_chain:
print('Hi!')
```
消息链的 `has` 方法和 `in` 等价。
```py
if message_chain.has(AtAll):
print('AtAll')
```
也可以使用 `>=` 和 `<= `运算符:
```py
if MessageChain([At(bot.qq), Plain('Hello!')]) <= message_chain:
print('Hello!')
```
需注意,此处的子消息链匹配会把 Plain 看成一个整体,而不是匹配其文本的一部分。
如需对文本进行部分匹配,请采用 mirai 码字符串匹配的方式。
消息链对索引操作进行了增强。以消息组件类型为索引,获取消息链中的全部该类型的消息组件。
```py
plain_list = message_chain[Plain]
'[Plain("Hello World!")]'
```
以 `类型, 数量` 为索引,获取前至多多少个该类型的消息组件。
```py
plain_list_first = message_chain[Plain, 1]
'[Plain("Hello World!")]'
```
消息链的 `get` 方法和索引操作等价。
```py
plain_list_first = message_chain.get(Plain)
'[Plain("Hello World!")]'
```
消息链的 `get` 方法还可指定第二个参数 `count`,这相当于以 `类型, 数量` 为索引。
```py
plain_list_first = message_chain.get(Plain, 1)
# 这等价于
plain_list_first = message_chain[Plain, 1]
```
可以用加号连接两个消息链。
```py
MessageChain(['Hello World!']) + MessageChain(['Goodbye World!'])
# 返回 MessageChain([Plain("Hello World!"), Plain("Goodbye World!")])
```
可以用 `*` 运算符复制消息链。
```py
MessageChain(['Hello World!']) * 2
# 返回 MessageChain([Plain("Hello World!"), Plain("Hello World!")])
```
除此之外,消息链还支持很多 list 拥有的操作,比如 `index` 和 `count`。
```py
message_chain = MessageChain([
AtAll(),
"Hello World!",
])
message_chain.index(Plain)
# 返回 0
message_chain.count(Plain)
# 返回 1
```
消息链对这些操作进行了拓展。在传入元素的地方,一般都可以传入元素的类型。
"""
__root__: List[MessageComponent]
@staticmethod
def _parse_message_chain(msg_chain: Iterable):
result = []
for msg in msg_chain:
if isinstance(msg, dict):
result.append(MessageComponent.parse_subtype(msg))
elif isinstance(msg, MessageComponent):
result.append(msg)
elif isinstance(msg, str):
result.append(Plain(msg))
else:
raise TypeError(
f"消息链中元素需为 dict 或 str 或 MessageComponent,当前类型:{type(msg)}"
)
return result
@validator('__root__', always=True, pre=True)
def _parse_component(cls, msg_chain):
if isinstance(msg_chain, (str, MessageComponent)):
msg_chain = [msg_chain]
if not msg_chain:
msg_chain = []
return cls._parse_message_chain(msg_chain)
@classmethod
def parse_obj(cls, msg_chain: Iterable):
"""通过列表形式的消息链,构造对应的 `MessageChain` 对象。
Args:
msg_chain: 列表形式的消息链。
"""
result = cls._parse_message_chain(msg_chain)
return cls(__root__=result)
def __init__(self, __root__: Iterable[MessageComponent] = None):
super().__init__(__root__=__root__)
def __str__(self):
return "".join(str(component) for component in self.__root__)
def as_mirai_code(self) -> str:
"""将消息链转换为 mirai 码字符串。
该方法会自动转换消息链中的元素。
Returns:
mirai 码字符串。
"""
return "".join(
component.as_mirai_code() for component in self.__root__
)
def __repr__(self):
return f'{self.__class__.__name__}({self.__root__!r})'
def __iter__(self):
yield from self.__root__
@overload
def get(self, index: int) -> MessageComponent:
...
@overload
def get(self, index: slice) -> List[MessageComponent]:
...
@overload
def get(self, index: Type[TMessageComponent]) -> List[TMessageComponent]:
...
@overload
def get(
self, index: Tuple[Type[TMessageComponent], int]
) -> List[TMessageComponent]:
...
def get(
self,
index: Union[int, slice, Type[TMessageComponent],
Tuple[Type[TMessageComponent], int]],
count: Optional[int] = None
) -> Union[MessageComponent, List[MessageComponent],
List[TMessageComponent]]:
"""获取消息链中的某个(某些)消息组件,或某类型的消息组件。
Args:
index (`Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]]`):
如果为 `int`,则返回该索引处的消息组件。
如果为 `slice`,则返回该索引范围处的消息组件。
如果为 `Type[TMessageComponent]`,则返回该类型的全部消息组件。
如果为 `Tuple[Type[TMessageComponent], int]`,则返回该类型的至多 `index[1]` 个消息组件。
count: 如果为 `int`,则返回至多 `count` 个消息组件。
Returns:
MessageComponent: 返回指定索引处的消息组件。
List[MessageComponent]: 返回指定索引范围的消息组件。
List[TMessageComponent]: 返回指定类型的消息组件构成的列表。
"""
# 正常索引
if isinstance(index, int):
return self.__root__[index]
# 切片索引
if isinstance(index, slice):
return self.__root__[index]
# 指定 count
if count:
if isinstance(index, type):
index = (index, count)
elif isinstance(index, tuple):
index = (index[0], count if count < index[1] else index[1])
# 索引对象为 MessageComponent 类,返回所有对应 component
if isinstance(index, type):
return [
component for component in self if type(component) is index
]
# 索引对象为 MessageComponent 和 int 构成的 tuple, 返回指定数量的 component
if isinstance(index, tuple):
components = (
component for component in self if type(component) is index[0]
)
return [
component for component, _ in zip(components, range(index[1]))
]
raise TypeError(f"消息链索引需为 int 或 MessageComponent,当前类型:{type(index)}")
def get_first(self,
t: Type[TMessageComponent]) -> Optional[TMessageComponent]:
"""获取消息链中第一个符合类型的消息组件。"""
for component in self:
if isinstance(component, t):
return component
return None
@overload
def __getitem__(self, index: int) -> MessageComponent:
...
@overload
def __getitem__(self, index: slice) -> List[MessageComponent]:
...
@overload
def __getitem__(self,
index: Type[TMessageComponent]) -> List[TMessageComponent]:
...
@overload
def __getitem__(
self, index: Tuple[Type[TMessageComponent], int]
) -> List[TMessageComponent]:
...
def __getitem__(
self, index: Union[int, slice, Type[TMessageComponent],
Tuple[Type[TMessageComponent], int]]
) -> Union[MessageComponent, List[MessageComponent],
List[TMessageComponent]]:
return self.get(index)
def __setitem__(
self, key: Union[int, slice],
value: Union[MessageComponent, str, Iterable[Union[MessageComponent,
str]]]
):
if isinstance(value, str):
value = Plain(value)
if isinstance(value, Iterable):
value = (Plain(c) if isinstance(c, str) else c for c in value)
self.__root__[key] = value # type: ignore
def __delitem__(self, key: Union[int, slice]):
del self.__root__[key]
def __reversed__(self) -> Iterable[MessageComponent]:
return reversed(self.__root__)
def has(
self, sub: Union[MessageComponent, Type[MessageComponent],
'MessageChain', str]
) -> bool:
"""判断消息链中:
1. 是否有某个消息组件。
2. 是否有某个类型的消息组件。
3. 是否有某个子消息链。
4. 对应的 mirai 码中是否有某子字符串。
Args:
sub (`Union[MessageComponent, Type[MessageComponent], 'MessageChain', str]`):
若为 `MessageComponent`,则判断该组件是否在消息链中。
若为 `Type[MessageComponent]`,则判断该组件类型是否在消息链中。
若为 `MessageChain`,则判断该子消息链是否在消息链中。
若为 `str`,则判断对应的 mirai 码中是否有某子字符串。
Returns:
bool: 是否找到。
"""
if isinstance(sub, type): # 检测消息链中是否有某种类型的对象
for i in self:
if type(i) is sub:
return True
return False
if isinstance(sub, MessageComponent): # 检查消息链中是否有某个组件
for i in self:
if i == sub:
return True
return False
if isinstance(sub, MessageChain): # 检查消息链中是否有某个子消息链
return bool(kmp(self, sub))
if isinstance(sub, str): # 检查消息中有无指定字符串子串
return sub in deserialize(str(self))
raise TypeError(f"类型不匹配,当前类型:{type(sub)}")
def __contains__(self, sub) -> bool:
return self.has(sub)
def __ge__(self, other):
return other in self
def __len__(self) -> int:
return len(self.__root__)
def __add__(
self, other: Union['MessageChain', MessageComponent, str]
) -> 'MessageChain':
if isinstance(other, MessageChain):
return self.__class__(self.__root__ + other.__root__)
if isinstance(other, str):
return self.__class__(self.__root__ + [Plain(other)])
if isinstance(other, MessageComponent):
return self.__class__(self.__root__ + [other])
return NotImplemented
def __radd__(self, other: Union[MessageComponent, str]) -> 'MessageChain':
if isinstance(other, MessageComponent):
return self.__class__([other] + self.__root__)
if isinstance(other, str):
return self.__class__(
[cast(MessageComponent, Plain(other))] + self.__root__
)
return NotImplemented
def __mul__(self, other: int):
if isinstance(other, int):
return self.__class__(self.__root__ * other)
return NotImplemented
def __rmul__(self, other: int):
return self.__mul__(other)
def __iadd__(self, other: Iterable[Union[MessageComponent, str]]):
self.extend(other)
def __imul__(self, other: int):
if isinstance(other, int):
self.__root__ *= other
return NotImplemented
def index(
self,
x: Union[MessageComponent, Type[MessageComponent]],
i: int = 0,
j: int = -1
) -> int:
"""返回 x 在消息链中首次出现项的索引号(索引号在 i 或其后且在 j 之前)。
Args:
x (`Union[MessageComponent, Type[MessageComponent]]`):
要查找的消息元素或消息元素类型。
i: 从哪个位置开始查找。
j: 查找到哪个位置结束。
Returns:
int: 如果找到,则返回索引号。
Raises:
ValueError: 没有找到。
TypeError: 类型不匹配。
"""
if isinstance(x, type):
l = len(self)
if i < 0:
i += l
if i < 0:
i = 0
if j < 0:
j += l
if j > l:
j = l
for index in range(i, j):
if type(self[index]) is x:
return index
raise ValueError("消息链中不存在该类型的组件。")
if isinstance(x, MessageComponent):
return self.__root__.index(x, i, j)
raise TypeError(f"类型不匹配,当前类型:{type(x)}")
def count(self, x: Union[MessageComponent, Type[MessageComponent]]) -> int:
"""返回消息链中 x 出现的次数。
Args:
x (`Union[MessageComponent, Type[MessageComponent]]`):
要查找的消息元素或消息元素类型。
Returns:
int: 次数。
"""
if isinstance(x, type):
return sum(1 for i in self if type(i) is x)
if isinstance(x, MessageComponent):
return self.__root__.count(x)
raise TypeError(f"类型不匹配,当前类型:{type(x)}")
def extend(self, x: Iterable[Union[MessageComponent, str]]):
"""将另一个消息链中的元素添加到消息链末尾。
Args:
x: 另一个消息链,也可为消息元素或字符串元素的序列。
"""
self.__root__.extend(Plain(c) if isinstance(c, str) else c for c in x)
def append(self, x: Union[MessageComponent, str]):
"""将一个消息元素或字符串元素添加到消息链末尾。
Args:
x: 消息元素或字符串元素。
"""
self.__root__.append(Plain(x) if isinstance(x, str) else x)
def insert(self, i: int, x: Union[MessageComponent, str]):
"""将一个消息元素或字符串添加到消息链中指定位置。
Args:
i: 插入位置。
x: 消息元素或字符串元素。
"""
self.__root__.insert(i, Plain(x) if isinstance(x, str) else x)
def pop(self, i: int = -1) -> MessageComponent:
"""从消息链中移除并返回指定位置的元素。
Args:
i: 移除位置。默认为末尾。
Returns:
MessageComponent: 移除的元素。
"""
return self.__root__.pop(i)
def remove(self, x: Union[MessageComponent, Type[MessageComponent]]):
"""从消息链中移除指定元素或指定类型的一个元素。
Args:
x: 指定的元素或元素类型。
"""
if isinstance(x, type):
self.pop(self.index(x))
if isinstance(x, MessageComponent):
self.__root__.remove(x)
def exclude(
self,
x: Union[MessageComponent, Type[MessageComponent]],
count: int = -1
) -> 'MessageChain':
"""返回移除指定元素或指定类型的元素后剩余的消息链。
Args:
x: 指定的元素或元素类型。
count: 至多移除的数量。默认为全部移除。
Returns:
MessageChain: 剩余的消息链。
"""
def _exclude():
nonlocal count
x_is_type = isinstance(x, type)
for c in self:
if count > 0 and ((x_is_type and type(c) is x) or c == x):
count -= 1
continue
yield c
return self.__class__(_exclude())
def reverse(self):
"""将消息链原地翻转。"""
self.__root__.reverse()
@classmethod
def join(cls, *args: Iterable[Union[str, MessageComponent]]):
return cls(
Plain(c) if isinstance(c, str) else c
for c in itertools.chain(*args)
)
@property
def source(self) -> Optional['Source']:
"""获取消息链中的 `Source` 对象。"""
return self.get_first(Source)
@property
def message_id(self) -> int:
"""获取消息链的 message_id,若无法获取,返回 -1。"""
source = self.source
return source.id if source else -1
TMessage = Union[MessageChain, Iterable[Union[MessageComponent, str]],
MessageComponent, str]
"""可以转化为 MessageChain 的类型。"""
class Source(MessageComponent):
"""源。包含消息的基本信息。"""
type: str = "Source"
"""消息组件类型。"""
id: int
"""消息的识别号,用于引用回复(Source 类型永远为 MessageChain 的第一个元素)。"""
time: datetime
"""消息时间。"""
class Plain(MessageComponent):
"""纯文本。"""
type: str = "Plain"
"""消息组件类型。"""
text: str
"""文字消息。"""
def __str__(self):
return self.text
def as_mirai_code(self) -> str:
return serialize(self.text)
def __repr__(self):
return f'Plain({self.text!r})'
class Quote(MessageComponent):
"""引用。"""
type: str = "Quote"
"""消息组件类型。"""
id: Optional[int] = None
"""被引用回复的原消息的 message_id。"""
group_id: Optional[int] = None
"""被引用回复的原消息所接收的群号,当为好友消息时为0。"""
sender_id: Optional[int] = None
"""被引用回复的原消息的发送者的QQ号。"""
target_id: Optional[int] = None
"""被引用回复的原消息的接收者者的QQ号(或群号)。"""
origin: MessageChain
"""被引用回复的原消息的消息链对象。"""
@validator("origin", always=True, pre=True)
def origin_formater(cls, v):
return MessageChain.parse_obj(v)
class At(MessageComponent):
"""At某人。"""
type: str = "At"
"""消息组件类型。"""
target: int
"""群员 QQ 号。"""
display: Optional[str] = None
"""At时显示的文字,发送消息时无效,自动使用群名片。"""
def __eq__(self, other):
return isinstance(other, At) and self.target == other.target
def __str__(self):
return f"@{self.display or self.target}"
def as_mirai_code(self) -> str:
return f"[mirai:at:{self.target}]"
class AtAll(MessageComponent):
"""At全体。"""
type: str = "AtAll"
"""消息组件类型。"""
def __str__(self):
return "@全体成员"
def as_mirai_code(self) -> str:
return f"[mirai:atall]"
class Face(MessageComponent):
"""表情。"""
type: str = "Face"
"""消息组件类型。"""
face_id: Optional[int] = None
"""QQ 表情编号,可选,优先度高于 name。"""
name: Optional[str] = None
"""QQ表情名称,可选。"""
def __init__(self, *args, **kwargs):
if len(args) == 1:
if isinstance(args[0], str):
self.name = args[0]
elif isinstance(args[0], int):
self.face_id = args[0]
super().__init__(**kwargs)
super().__init__(*args, **kwargs)
def __eq__(self, other):
return isinstance(other, Face) and \
(self.face_id == other.face_id or self.name == other.name)
def __str__(self):
if self.name:
return f'[{self.name}]'
return '[表情]'
def as_mirai_code(self):
return f"[mirai:face:{self.face_id}]"
class MarketFace(MessageComponent):
"""商店表情(目前只支持接收)。"""
type: str = "MarketFace"
"""消息组件类型。"""
id: int
"""商店表情编号。"""
name: str
"""商店表情名称。"""
class Image(MessageComponent):
"""图片。"""
type: str = "Image"
"""消息组件类型。"""
image_id: Optional[str] = None
"""图片的 image_id,群图片与好友图片格式不同。不为空时将忽略 url 属性。"""
url: Optional[HttpUrl] = None
"""图片的 URL,发送时可作网络图片的链接;接收时为腾讯图片服务器的链接,可用于图片下载。"""
path: Union[str, Path, None] = None
"""图片的路径,发送本地图片。"""
base64: Optional[str] = None
"""图片的 Base64 编码。"""
width: int
"""图片的宽度"""
height: int
"""图片的高度"""
def __eq__(self, other):
return isinstance(
other, Image
) and self.type == other.type and self.uuid == other.uuid
def __str__(self):
return '[图片]'
def as_mirai_code(self) -> str:
return f"[mirai:image:{self.image_id}]"
@validator('path')
def validate_path(cls, path: Union[str, Path, None]):
"""修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。"""
if path:
try:
return str(Path(path).resolve(strict=True))
except FileNotFoundError:
raise ValueError(f"无效路径:{path}")
else:
return path
@property
def uuid(self):
image_id = self.image_id
if image_id[0] == '{': # 群图片
image_id = image_id[1:37]
elif image_id[0] == '/': # 好友图片
image_id = image_id[1:]
return image_id
def as_group_image(self) -> str:
return f"{{{self.uuid.upper()}}}.jpg"
def as_friend_image(self) -> str:
return f"/{self.uuid.lower()}"
def as_flash_image(self) -> "FlashImage":
return FlashImage(
image_id=self.image_id,
url=self.url,
path=self.path,
base64=self.base64
)
async def download(
self,
filename: Union[str, Path, None] = None,
directory: Union[str, Path, None] = None,
determine_type: bool = True
):
"""下载图片到本地。
Args:
filename: 下载到本地的文件路径。与 `directory` 二选一。
directory: 下载到本地的文件夹路径。与 `filename` 二选一。
determine_type: 是否自动根据图片类型确定拓展名,默认为 True。
"""
if not self.url:
logger.warning(f'图片 `{self.uuid}` 无 url 参数,下载失败。')
return
import httpx
async with httpx.AsyncClient() as client:
response = await client.get(self.url)
response.raise_for_status()
content = response.content
if filename:
path = Path(filename)
if determine_type:
import imghdr
path = path.with_suffix(
'.' + str(imghdr.what(None, content))
)
path.parent.mkdir(parents=True, exist_ok=True)
elif directory:
import imghdr
path = Path(directory)
path.mkdir(parents=True, exist_ok=True)
path = path / f'{self.uuid}.{imghdr.what(None, content)}'
else:
raise ValueError("请指定文件路径或文件夹路径!")
import aiofiles
async with aiofiles.open(path, 'wb') as f:
await f.write(content)
return path
@classmethod
async def from_local(
cls,
filename: Union[str, Path, None] = None,
content: Optional[bytes] = None,
) -> "Image":
"""从本地文件路径加载图片,以 base64 的形式传递。
Args:
filename: 从本地文件路径加载图片,与 `content` 二选一。
content: 从本地文件内容加载图片,与 `filename` 二选一。
Returns:
Image: 图片对象。
"""
if content:
pass
elif filename:
path = Path(filename)
import aiofiles
async with aiofiles.open(path, 'rb') as f:
content = await f.read()
else:
raise ValueError("请指定图片路径或图片内容!")
import base64
img = cls(base64=base64.b64encode(content).decode())
return img
@classmethod
def from_unsafe_path(cls, path: Union[str, Path]) -> "Image":
"""从不安全的路径加载图片。
Args:
path: 从不安全的路径加载图片。
Returns:
Image: 图片对象。
"""
return cls.construct(path=str(path))
class Xml(MessageComponent):
"""XML。"""
type: str = "Xml"
"""消息组件类型。"""
xml: str
"""XML文本。"""
class Json(MessageComponent):
"""JSON。"""
type: str = "Json"
"""消息组件类型。"""
json_: str
"""JSON 文本。"""
class App(MessageComponent):
"""应用。"""
type: str = "App"
"""消息组件类型。"""
content: str
"""内容。"""
def as_json(self):
from json import loads as json_loads
return json_loads(self.content)
def __str__(self):
try:
json = self.as_json()
return json.get('prompt', '[应用消息]')
except:
return '[应用消息]'
def as_mirai_code(self) -> str:
return f'[mirai:app:{serialize(self.content)}]'
POKE_TYPE = {
"ChuoYiChuo": 1,
"BiXin": 2,
"DianZan": 3,
"XinSui": 4,
"LiuLiuLiu": 5,
"FangDaZhao": 6,
"BaoBeiQiu": 126,
"Rose": 126,
"ZhaoHuanShu": 126,
"RangNiPi": 126,
"JeiYin": 126,
"ShouLei": 126,
"GouYin": 126,
"ZhuaYiXia": 126,
"SuiPing": 126,
"QiaoMen": 126,
}
POKE_ID = {
"ChuoYiChuo": -1,
"BiXin": -1,
"DianZan": -1,
"XinSui": -1,
"LiuLiuLiu": -1,
"FangDaZhao": -1,
"BaoBeiQiu": 2011,
"Rose": 2007,
"ZhaoHuanShu": 2006,
"RangNiPi": 2009,
"JeiYin": 2005,
"ShouLei": 2004,
"GouYin": 2003,
"ZhuaYiXia": 2001,
"SuiPing": 2002,
"QiaoMen": 2002,
}
POKE_NAME = {
"ChuoYiChuo": '戳一戳',
"BiXin": '比心',
"DianZan": '点赞',
"XinSui": '心碎',
"LiuLiuLiu": '666',
"FangDaZhao": '放大招',
"BaoBeiQiu": '宝贝球',
"Rose": '玫瑰花',
"ZhaoHuanShu": '召唤术',
"RangNiPi": '让你皮',
"JeiYin": '结印',
"ShouLei": '手雷',
"GouYin": '勾引',
"ZhuaYiXia": '抓一下',
"SuiPing": '碎屏',
"QiaoMen": '敲门',
}
class PokeNames(str, Enum):
"""戳一戳名称。"""
ChuoYiChuo = "ChuoYiChuo"
BiXin = "BiXin"
DianZan = "DianZan"
XinSui = "XinSui"
LiuLiuLiu = "LiuLiuLiu"
FangDaZhao = "FangDaZhao"
BaoBeiQiu = "BaoBeiQiu"
Rose = "Rose"
ZhaoHuanShu = "ZhaoHuanShu"
RangNiPi = "RangNiPi"
JeiYin = "JeiYin"
ShouLei = "ShouLei"
GouYin = "GouYin"
ZhuaYiXia = "ZhuaYiXia"
SuiPing = "SuiPing"
QiaoMen = "QiaoMen"
def as_component(self) -> 'Poke':
return Poke(name=self.value)
class Poke(MessageComponent):
"""戳一戳。"""
type: str = "Poke"
"""消息组件类型。"""
name: PokeNames
"""名称。"""
@property
def poke_type(self):
return POKE_TYPE[self.name]
@property
def poke_id(self):
return POKE_ID[self.name]
def __str__(self):
return f'[{POKE_NAME[self.name]}]'
def as_mirai_code(self) -> str:
return f'[mirai:poke:{self.name},{self.poke_type},{self.poke_id}]'
class Unknown(MessageComponent):
"""未知。"""
type: str = "Unknown"
"""消息组件类型。"""
text: str
"""文本。"""
class FlashImage(Image):
"""闪照。"""
type: str = "FlashImage"
"""消息组件类型。"""
image_id: Optional[str] = None
"""图片的 image_id,群图片与好友图片格式不同。不为空时将忽略 url 属性。"""
url: Optional[HttpUrl] = None
"""图片的 URL,发送时可作网络图片的链接;接收时为腾讯图片服务器的链接,可用于图片下载。"""
path: Optional[str] = None
"""图片的路径,发送本地图片,路径相对于 `plugins/MiraiAPIHTTP/images`。"""
base64: Optional[str] = None
"""图片的 Base64 编码。"""
width: int
"""图片的宽度"""
height: int
"""图片的高度"""
def __str__(self):
return '[闪照]'
def as_mirai_code(self) -> str:
return f"[mirai:flash:{self.image_id}]"
def as_image(self) -> Image:
return Image(self.image_id, self.url)
class Voice(MessageComponent):
"""语音。"""
type: str = "Voice"
"""消息组件类型。"""
voice_id: Optional[str] = None
"""语音的 voice_id,不为空时将忽略 url 属性。"""
url: Optional[str] = None
"""语音的 URL,发送时可作网络语音的链接;接收时为腾讯语音服务器的链接,可用于语音下载。"""
path: Optional[str] = None
"""语音的路径,发送本地语音。"""
base64: Optional[str] = None
"""语音的 Base64 编码。"""
length: Optional[int] = None
"""语音的长度,单位为秒。"""
@validator('path')
def validate_path(cls, path: Optional[str]):
"""修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。"""
if path:
try:
return str(Path(path).resolve(strict=True))
except FileNotFoundError:
raise ValueError(f"无效路径:{path}")
else:
return path
def __str__(self):
return '[语音]'
async def download(
self,
filename: Union[str, Path, None] = None,
directory: Union[str, Path, None] = None
):
"""下载语音到本地。
语音采用 silk v3 格式,silk 格式的编码解码请使用 [graiax-silkcoder](https://pypi.org/project/graiax-silkcoder/)。
Args:
filename: 下载到本地的文件路径。与 `directory` 二选一。
directory: 下载到本地的文件夹路径。与 `filename` 二选一。
"""
if not self.url:
logger.warning(f'语音 `{self.voice_id}` 无 url 参数,下载失败。')
return
import httpx
async with httpx.AsyncClient() as client:
response = await client.get(self.url)
response.raise_for_status()
content = response.content
if filename:
path = Path(filename)
path.parent.mkdir(parents=True, exist_ok=True)
elif directory:
path = Path(directory)
path.mkdir(parents=True, exist_ok=True)
path = path / f'{self.voice_id}.silk'
else:
raise ValueError("请指定文件路径或文件夹路径!")
import aiofiles
async with aiofiles.open(path, 'wb') as f:
await f.write(content)
@classmethod
async def from_local(
cls,
filename: Union[str, Path, None] = None,
content: Optional[bytes] = None,
) -> "Voice":
"""从本地文件路径加载语音,以 base64 的形式传递。
Args:
filename: 从本地文件路径加载语音,与 `content` 二选一。
content: 从本地文件内容加载语音,与 `filename` 二选一。
"""
if content:
pass
if filename:
path = Path(filename)
import aiofiles
async with aiofiles.open(path, 'rb') as f:
content = await f.read()
else:
raise ValueError("请指定语音路径或语音内容!")
import base64
img = cls(base64=base64.b64encode(content).decode())
return img
class Dice(MessageComponent):
"""骰子。"""
type: str = "Dice"
"""消息组件类型。"""
value: int
"""点数。"""
def __str__(self):
return f'[骰子{self.value}]'
def as_mirai_code(self) -> str:
return f'[mirai:dice:{self.value}]'
class MusicShareKind(str, Enum):
"""音乐分享的来源。"""
NeteaseCloudMusic = "NeteaseCloudMusic"
QQMusic = "QQMusic"
MiguMusic = "MiguMusic"
KugouMusic = "KugouMusic"
KuwoMusic = "KuwoMusic"
class MusicShare(MessageComponent):
"""音乐分享。"""
type: str = "MusicShare"
"""消息组件类型。"""
kind: MusicShareKind
"""音乐分享的来源。"""
title: str
"""标题。"""
summary: str
"""歌手。"""
jump_url: HttpUrl
"""跳转路径。"""
picture_url: HttpUrl
"""封面路径。"""
music_url: HttpUrl
"""音源路径。"""
brief: str = ""
"""在消息列表中显示的内容。"""
def __str__(self):
return self.brief
class ForwardMessageNode(MiraiBaseModel):
"""合并转发中的一条消息。"""
sender_id: Optional[int] = None
"""发送人QQ号。"""
sender_name: Optional[str] = None
"""显示名称。"""
message_chain: Optional[MessageChain] = None
"""消息内容。"""
message_id: Optional[int] = None
"""消息的 message_id,可以只使用此属性,从缓存中读取消息内容。"""
time: Optional[datetime] = None
"""发送时间。"""
@validator('message_chain', check_fields=False)
def _validate_message_chain(cls, value: Union[MessageChain, list]):
if isinstance(value, list):
return MessageChain.parse_obj(value)
return value
@classmethod
def create(
cls, sender: Union[Friend, GroupMember], message: MessageChain
) -> 'ForwardMessageNode':
"""从消息链生成转发消息。
Args:
sender: 发送人。
message: 消息内容。
Returns:
ForwardMessageNode: 生成的一条消息。
"""
return ForwardMessageNode(
sender_id=sender.id,
sender_name=sender.get_name(),
message_chain=message
)
class ForwardMessageDiaplay(MiraiBaseModel):
"""合并转发组件的显示信息。"""
title: str = "群聊的聊天记录"
brief: str = "[聊天记录]"
source: str = "聊天记录"
preview: List[str] = []
summary: str = "查看x条转发消息"
@classmethod
def create(
cls, nodes: List[ForwardMessageNode]
):
"""从转发消息节点列表生成显示信息。"""
return ForwardMessageDiaplay(
preview=[nodes[0].sender_name+": "+str(nodes[0].message_chain)],
summary=f"查看{len(nodes)}条转发消息"
)
class Forward(MessageComponent):
"""合并转发。"""
type: str = "Forward"
"""消息组件类型。"""
display: Optional[ForwardMessageDiaplay]
"""转发组件的显示信息。"""
node_list: List[ForwardMessageNode]
"""转发消息节点列表。"""
def __init__(self, *args, **kwargs):
if len(args) == 1:
kwargs['node_list'] = args[0]
args=()
if 'display' not in kwargs:
kwargs['display'] = ForwardMessageDiaplay.create(kwargs['node_list'])
super().__init__(*args, **kwargs)
def __str__(self):
return '[聊天记录]'
class File(MessageComponent):
"""文件。"""
type: str = "File"
"""消息组件类型。"""
id: str
"""文件识别 ID。"""
name: str
"""文件名称。"""
size: int
"""文件大小。"""
def __str__(self):
return f'[文件]{self.name}'
class MiraiCode(MessageComponent):
"""Mirai 码。"""
type: str = "MiraiCode"
"""消息组件类型。"""
code: str
"""Mirai 码。"""
def __str__(self):
return serialize(self.code)
def __repr__(self):
return f'MiraiCode({self.code!r})'
__all__ = [
'App',
'At',
'AtAll',
'Dice',
'Face',
'File',
'FlashImage',
'Forward',
'ForwardMessageNode',
'Image',
'Json',
'MarketFace',
'MessageChain',
'MessageComponent',
'MusicShareKind',
'MusicShare',
'Plain',
'PokeNames',
'Poke',
'Quote',
'Source',
'Unknown',
'Voice',
'Xml',
'serialize',
'deserialize',
'TMessage',
]
Global variables
var TMessage
-
可以转化为 MessageChain 的类型。
Functions
def deserialize(s: str) ‑> str
-
mirai 码去转义。
Args
s
- 待去转义的字符串。
Returns
str
- 去转义后的字符串。
Expand source code
def deserialize(s: str) -> str: """mirai 码去转义。 Args: s: 待去转义的字符串。 Returns: str: 去转义后的字符串。 """ return re.sub( r'\\([\[\]:,\\])', lambda match: match.group(1), s.replace('\\n', '\n').replace('\\r', '\r') )
def serialize(s: str) ‑> str
-
mirai 码转义。
Args
s
- 待转义的字符串。
Returns
str
- 去转义后的字符串。
Expand source code
def serialize(s: str) -> str: """mirai 码转义。 Args: s: 待转义的字符串。 Returns: str: 去转义后的字符串。 """ return re.sub(r'[\[\]:,\\]', lambda match: '\\' + match.group(0), s).replace('\n', '\\n').replace('\r', '\\r')
Classes
class App (*args, **kwargs)
-
应用。
Expand source code
class App(MessageComponent): """应用。""" type: str = "App" """消息组件类型。""" content: str """内容。""" def as_json(self): from json import loads as json_loads return json_loads(self.content) def __str__(self): try: json = self.as_json() return json.get('prompt', '[应用消息]') except: return '[应用消息]' def as_mirai_code(self) -> str: return f'[mirai:app:{serialize(self.content)}]'
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var content : str
-
内容。
Methods
def as_json(self)
-
Expand source code
def as_json(self): from json import loads as json_loads return json_loads(self.content)
Inherited members
class At (*args, **kwargs)
-
At某人。
Expand source code
class At(MessageComponent): """At某人。""" type: str = "At" """消息组件类型。""" target: int """群员 QQ 号。""" display: Optional[str] = None """At时显示的文字,发送消息时无效,自动使用群名片。""" def __eq__(self, other): return isinstance(other, At) and self.target == other.target def __str__(self): return f"@{self.display or self.target}" def as_mirai_code(self) -> str: return f"[mirai:at:{self.target}]"
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var display : Optional[str]
-
At时显示的文字,发送消息时无效,自动使用群名片。
var target : int
-
群员 QQ 号。
Inherited members
class AtAll (*args, **kwargs)
-
At全体。
Expand source code
class AtAll(MessageComponent): """At全体。""" type: str = "AtAll" """消息组件类型。""" def __str__(self): return "@全体成员" def as_mirai_code(self) -> str: return f"[mirai:atall]"
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Inherited members
class Dice (*args, **kwargs)
-
骰子。
Expand source code
class Dice(MessageComponent): """骰子。""" type: str = "Dice" """消息组件类型。""" value: int """点数。""" def __str__(self): return f'[骰子{self.value}]' def as_mirai_code(self) -> str: return f'[mirai:dice:{self.value}]'
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var value : int
-
点数。
Inherited members
class Face (*args, **kwargs)
-
表情。
Expand source code
class Face(MessageComponent): """表情。""" type: str = "Face" """消息组件类型。""" face_id: Optional[int] = None """QQ 表情编号,可选,优先度高于 name。""" name: Optional[str] = None """QQ表情名称,可选。""" def __init__(self, *args, **kwargs): if len(args) == 1: if isinstance(args[0], str): self.name = args[0] elif isinstance(args[0], int): self.face_id = args[0] super().__init__(**kwargs) super().__init__(*args, **kwargs) def __eq__(self, other): return isinstance(other, Face) and \ (self.face_id == other.face_id or self.name == other.name) def __str__(self): if self.name: return f'[{self.name}]' return '[表情]' def as_mirai_code(self): return f"[mirai:face:{self.face_id}]"
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var face_id : Optional[int]
-
QQ 表情编号,可选,优先度高于 name。
var name : Optional[str]
-
QQ表情名称,可选。
Inherited members
class File (*args, **kwargs)
-
文件。
Expand source code
class File(MessageComponent): """文件。""" type: str = "File" """消息组件类型。""" id: str """文件识别 ID。""" name: str """文件名称。""" size: int """文件大小。""" def __str__(self): return f'[文件]{self.name}'
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var id : str
-
文件识别 ID。
var name : str
-
文件名称。
var size : int
-
文件大小。
Inherited members
class FlashImage (*args, **kwargs)
-
闪照。
Expand source code
class FlashImage(Image): """闪照。""" type: str = "FlashImage" """消息组件类型。""" image_id: Optional[str] = None """图片的 image_id,群图片与好友图片格式不同。不为空时将忽略 url 属性。""" url: Optional[HttpUrl] = None """图片的 URL,发送时可作网络图片的链接;接收时为腾讯图片服务器的链接,可用于图片下载。""" path: Optional[str] = None """图片的路径,发送本地图片,路径相对于 `plugins/MiraiAPIHTTP/images`。""" base64: Optional[str] = None """图片的 Base64 编码。""" width: int """图片的宽度""" height: int """图片的高度""" def __str__(self): return '[闪照]' def as_mirai_code(self) -> str: return f"[mirai:flash:{self.image_id}]" def as_image(self) -> Image: return Image(self.image_id, self.url)
Ancestors
- Image
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Methods
def as_image(self) ‑> Image
-
Expand source code
def as_image(self) -> Image: return Image(self.image_id, self.url)
Inherited members
class Forward (*args, **kwargs)
-
合并转发。
Expand source code
class Forward(MessageComponent): """合并转发。""" type: str = "Forward" """消息组件类型。""" display: Optional[ForwardMessageDiaplay] """转发组件的显示信息。""" node_list: List[ForwardMessageNode] """转发消息节点列表。""" def __init__(self, *args, **kwargs): if len(args) == 1: kwargs['node_list'] = args[0] args=() if 'display' not in kwargs: kwargs['display'] = ForwardMessageDiaplay.create(kwargs['node_list']) super().__init__(*args, **kwargs) def __str__(self): return '[聊天记录]'
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var display : Optional[mirai.models.message.ForwardMessageDiaplay]
-
转发组件的显示信息。
var node_list : List[ForwardMessageNode]
-
转发消息节点列表。
Inherited members
class ForwardMessageNode (*args, **kwargs)
-
合并转发中的一条消息。
Expand source code
class ForwardMessageNode(MiraiBaseModel): """合并转发中的一条消息。""" sender_id: Optional[int] = None """发送人QQ号。""" sender_name: Optional[str] = None """显示名称。""" message_chain: Optional[MessageChain] = None """消息内容。""" message_id: Optional[int] = None """消息的 message_id,可以只使用此属性,从缓存中读取消息内容。""" time: Optional[datetime] = None """发送时间。""" @validator('message_chain', check_fields=False) def _validate_message_chain(cls, value: Union[MessageChain, list]): if isinstance(value, list): return MessageChain.parse_obj(value) return value @classmethod def create( cls, sender: Union[Friend, GroupMember], message: MessageChain ) -> 'ForwardMessageNode': """从消息链生成转发消息。 Args: sender: 发送人。 message: 消息内容。 Returns: ForwardMessageNode: 生成的一条消息。 """ return ForwardMessageNode( sender_id=sender.id, sender_name=sender.get_name(), message_chain=message )
Ancestors
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var message_chain : Optional[MessageChain]
-
消息内容。
var message_id : Optional[int]
-
消息的 message_id,可以只使用此属性,从缓存中读取消息内容。
var sender_id : Optional[int]
-
发送人QQ号。
var sender_name : Optional[str]
-
显示名称。
var time : Optional[datetime.datetime]
-
发送时间。
Static methods
def create(sender: Union[Friend, GroupMember], message: MessageChain) ‑> ForwardMessageNode
-
Expand source code
@classmethod def create( cls, sender: Union[Friend, GroupMember], message: MessageChain ) -> 'ForwardMessageNode': """从消息链生成转发消息。 Args: sender: 发送人。 message: 消息内容。 Returns: ForwardMessageNode: 生成的一条消息。 """ return ForwardMessageNode( sender_id=sender.id, sender_name=sender.get_name(), message_chain=message )
class Image (*args, **kwargs)
-
图片。
Expand source code
class Image(MessageComponent): """图片。""" type: str = "Image" """消息组件类型。""" image_id: Optional[str] = None """图片的 image_id,群图片与好友图片格式不同。不为空时将忽略 url 属性。""" url: Optional[HttpUrl] = None """图片的 URL,发送时可作网络图片的链接;接收时为腾讯图片服务器的链接,可用于图片下载。""" path: Union[str, Path, None] = None """图片的路径,发送本地图片。""" base64: Optional[str] = None """图片的 Base64 编码。""" width: int """图片的宽度""" height: int """图片的高度""" def __eq__(self, other): return isinstance( other, Image ) and self.type == other.type and self.uuid == other.uuid def __str__(self): return '[图片]' def as_mirai_code(self) -> str: return f"[mirai:image:{self.image_id}]" @validator('path') def validate_path(cls, path: Union[str, Path, None]): """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。""" if path: try: return str(Path(path).resolve(strict=True)) except FileNotFoundError: raise ValueError(f"无效路径:{path}") else: return path @property def uuid(self): image_id = self.image_id if image_id[0] == '{': # 群图片 image_id = image_id[1:37] elif image_id[0] == '/': # 好友图片 image_id = image_id[1:] return image_id def as_group_image(self) -> str: return f"{{{self.uuid.upper()}}}.jpg" def as_friend_image(self) -> str: return f"/{self.uuid.lower()}" def as_flash_image(self) -> "FlashImage": return FlashImage( image_id=self.image_id, url=self.url, path=self.path, base64=self.base64 ) async def download( self, filename: Union[str, Path, None] = None, directory: Union[str, Path, None] = None, determine_type: bool = True ): """下载图片到本地。 Args: filename: 下载到本地的文件路径。与 `directory` 二选一。 directory: 下载到本地的文件夹路径。与 `filename` 二选一。 determine_type: 是否自动根据图片类型确定拓展名,默认为 True。 """ if not self.url: logger.warning(f'图片 `{self.uuid}` 无 url 参数,下载失败。') return import httpx async with httpx.AsyncClient() as client: response = await client.get(self.url) response.raise_for_status() content = response.content if filename: path = Path(filename) if determine_type: import imghdr path = path.with_suffix( '.' + str(imghdr.what(None, content)) ) path.parent.mkdir(parents=True, exist_ok=True) elif directory: import imghdr path = Path(directory) path.mkdir(parents=True, exist_ok=True) path = path / f'{self.uuid}.{imghdr.what(None, content)}' else: raise ValueError("请指定文件路径或文件夹路径!") import aiofiles async with aiofiles.open(path, 'wb') as f: await f.write(content) return path @classmethod async def from_local( cls, filename: Union[str, Path, None] = None, content: Optional[bytes] = None, ) -> "Image": """从本地文件路径加载图片,以 base64 的形式传递。 Args: filename: 从本地文件路径加载图片,与 `content` 二选一。 content: 从本地文件内容加载图片,与 `filename` 二选一。 Returns: Image: 图片对象。 """ if content: pass elif filename: path = Path(filename) import aiofiles async with aiofiles.open(path, 'rb') as f: content = await f.read() else: raise ValueError("请指定图片路径或图片内容!") import base64 img = cls(base64=base64.b64encode(content).decode()) return img @classmethod def from_unsafe_path(cls, path: Union[str, Path]) -> "Image": """从不安全的路径加载图片。 Args: path: 从不安全的路径加载图片。 Returns: Image: 图片对象。 """ return cls.construct(path=str(path))
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
Class variables
var base64 : Optional[str]
-
图片的 Base64 编码。
var height : int
-
图片的高度
var image_id : Optional[str]
-
图片的 image_id,群图片与好友图片格式不同。不为空时将忽略 url 属性。
var path : Union[str, pathlib.Path, NoneType]
-
图片的路径,发送本地图片。
var url : Optional[pydantic.networks.HttpUrl]
-
图片的 URL,发送时可作网络图片的链接;接收时为腾讯图片服务器的链接,可用于图片下载。
var width : int
-
图片的宽度
Static methods
async def from_local(filename: Union[str, pathlib.Path, NoneType] = None, content: Optional[bytes] = None) ‑> Image
-
从本地文件路径加载图片,以 base64 的形式传递。
Args
filename
- 从本地文件路径加载图片,与
content
二选一。 content
- 从本地文件内容加载图片,与
filename
二选一。
Returns
Image
- 图片对象。
Expand source code
@classmethod async def from_local( cls, filename: Union[str, Path, None] = None, content: Optional[bytes] = None, ) -> "Image": """从本地文件路径加载图片,以 base64 的形式传递。 Args: filename: 从本地文件路径加载图片,与 `content` 二选一。 content: 从本地文件内容加载图片,与 `filename` 二选一。 Returns: Image: 图片对象。 """ if content: pass elif filename: path = Path(filename) import aiofiles async with aiofiles.open(path, 'rb') as f: content = await f.read() else: raise ValueError("请指定图片路径或图片内容!") import base64 img = cls(base64=base64.b64encode(content).decode()) return img
def from_unsafe_path(path: Union[str, pathlib.Path]) ‑> Image
-
Expand source code
@classmethod def from_unsafe_path(cls, path: Union[str, Path]) -> "Image": """从不安全的路径加载图片。 Args: path: 从不安全的路径加载图片。 Returns: Image: 图片对象。 """ return cls.construct(path=str(path))
def validate_path(path: Union[str, pathlib.Path, NoneType])
-
修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。
Expand source code
@validator('path') def validate_path(cls, path: Union[str, Path, None]): """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。""" if path: try: return str(Path(path).resolve(strict=True)) except FileNotFoundError: raise ValueError(f"无效路径:{path}") else: return path
Instance variables
var uuid
-
Expand source code
@property def uuid(self): image_id = self.image_id if image_id[0] == '{': # 群图片 image_id = image_id[1:37] elif image_id[0] == '/': # 好友图片 image_id = image_id[1:] return image_id
Methods
def as_flash_image(self) ‑> FlashImage
-
Expand source code
def as_flash_image(self) -> "FlashImage": return FlashImage( image_id=self.image_id, url=self.url, path=self.path, base64=self.base64 )
def as_friend_image(self) ‑> str
-
Expand source code
def as_friend_image(self) -> str: return f"/{self.uuid.lower()}"
def as_group_image(self) ‑> str
-
Expand source code
def as_group_image(self) -> str: return f"{{{self.uuid.upper()}}}.jpg"
async def download(self, filename: Union[str, pathlib.Path, NoneType] = None, directory: Union[str, pathlib.Path, NoneType] = None, determine_type: bool = True)
-
下载图片到本地。
Args
filename
- 下载到本地的文件路径。与
directory
二选一。 directory
- 下载到本地的文件夹路径。与
filename
二选一。 determine_type
- 是否自动根据图片类型确定拓展名,默认为 True。
Expand source code
async def download( self, filename: Union[str, Path, None] = None, directory: Union[str, Path, None] = None, determine_type: bool = True ): """下载图片到本地。 Args: filename: 下载到本地的文件路径。与 `directory` 二选一。 directory: 下载到本地的文件夹路径。与 `filename` 二选一。 determine_type: 是否自动根据图片类型确定拓展名,默认为 True。 """ if not self.url: logger.warning(f'图片 `{self.uuid}` 无 url 参数,下载失败。') return import httpx async with httpx.AsyncClient() as client: response = await client.get(self.url) response.raise_for_status() content = response.content if filename: path = Path(filename) if determine_type: import imghdr path = path.with_suffix( '.' + str(imghdr.what(None, content)) ) path.parent.mkdir(parents=True, exist_ok=True) elif directory: import imghdr path = Path(directory) path.mkdir(parents=True, exist_ok=True) path = path / f'{self.uuid}.{imghdr.what(None, content)}' else: raise ValueError("请指定文件路径或文件夹路径!") import aiofiles async with aiofiles.open(path, 'wb') as f: await f.write(content) return path
Inherited members
class Json (*args, **kwargs)
-
JSON。
Expand source code
class Json(MessageComponent): """JSON。""" type: str = "Json" """消息组件类型。""" json_: str """JSON 文本。"""
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var json_ : str
-
JSON 文本。
Inherited members
class MarketFace (*args, **kwargs)
-
商店表情(目前只支持接收)。
Expand source code
class MarketFace(MessageComponent): """商店表情(目前只支持接收)。""" type: str = "MarketFace" """消息组件类型。""" id: int """商店表情编号。""" name: str """商店表情名称。"""
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var id : int
-
商店表情编号。
var name : str
-
商店表情名称。
Inherited members
class MessageChain
-
消息链。
一个构造消息链的例子:
message_chain = MessageChain([ AtAll(), Plain("Hello World!"), ])
Plain
可以省略。message_chain = MessageChain([ AtAll(), "Hello World!", ])
在调用 API 时,参数中需要 MessageChain 的,也可以使用
List[MessageComponent]
代替。 例如,以下两种写法是等价的:await bot.send_friend_message(12345678, [ Plain("Hello World!") ])
await bot.send_friend_message(12345678, MessageChain([ Plain("Hello World!") ]))
使用
str(message_chain)
获取消息链的字符串表示,字符串采用 mirai 码格式, 并自动按照 mirai 码的规定转义。 参看 mirai 的文档。 获取未转义的消息链字符串,可以使用deserialize()(str(message_chain))
。可以使用 for 循环遍历消息链中的消息组件。
for component in message_chain: print(repr(component))
可以使用
==
运算符比较两个消息链是否相同。another_msg_chain = MessageChain([ { "type": "AtAll" }, { "type": "Plain", "text": "Hello World!" }, ]) print(message_chain == another_msg_chain) 'True'
可以使用
in
运算检查消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。if AtAll in message_chain: print('AtAll') if At(bot.qq) in message_chain: print('At Me') if MessageChain([At(bot.qq), Plain('Hello!')]) in message_chain: print('Hello!') if 'Hello' in message_chain: print('Hi!')
消息链的
has
方法和in
等价。if message_chain.has(AtAll): print('AtAll')
也可以使用
>=
和<=
运算符:if MessageChain([At(bot.qq), Plain('Hello!')]) <= message_chain: print('Hello!')
需注意,此处的子消息链匹配会把 Plain 看成一个整体,而不是匹配其文本的一部分。 如需对文本进行部分匹配,请采用 mirai 码字符串匹配的方式。
消息链对索引操作进行了增强。以消息组件类型为索引,获取消息链中的全部该类型的消息组件。
plain_list = message_chain[Plain] '[Plain("Hello World!")]'
以
类型, 数量
为索引,获取前至多多少个该类型的消息组件。plain_list_first = message_chain[Plain, 1] '[Plain("Hello World!")]'
消息链的
get
方法和索引操作等价。plain_list_first = message_chain.get(Plain) '[Plain("Hello World!")]'
消息链的
get
方法还可指定第二个参数count
,这相当于以类型, 数量
为索引。plain_list_first = message_chain.get(Plain, 1) # 这等价于 plain_list_first = message_chain[Plain, 1]
可以用加号连接两个消息链。
MessageChain(['Hello World!']) + MessageChain(['Goodbye World!']) # 返回 MessageChain([Plain("Hello World!"), Plain("Goodbye World!")])
可以用
*
运算符复制消息链。MessageChain(['Hello World!']) * 2 # 返回 MessageChain([Plain("Hello World!"), Plain("Hello World!")])
除此之外,消息链还支持很多 list 拥有的操作,比如
index
和count
。message_chain = MessageChain([ AtAll(), "Hello World!", ]) message_chain.index(Plain) # 返回 0 message_chain.count(Plain) # 返回 1
消息链对这些操作进行了拓展。在传入元素的地方,一般都可以传入元素的类型。
Expand source code
class MessageChain(MiraiBaseModel): """消息链。 一个构造消息链的例子: ```py message_chain = MessageChain([ AtAll(), Plain("Hello World!"), ]) ``` `Plain` 可以省略。 ```py message_chain = MessageChain([ AtAll(), "Hello World!", ]) ``` 在调用 API 时,参数中需要 MessageChain 的,也可以使用 `List[MessageComponent]` 代替。 例如,以下两种写法是等价的: ```py await bot.send_friend_message(12345678, [ Plain("Hello World!") ]) ``` ```py await bot.send_friend_message(12345678, MessageChain([ Plain("Hello World!") ])) ``` 使用`str(message_chain)`获取消息链的字符串表示,字符串采用 mirai 码格式, 并自动按照 mirai 码的规定转义。 参看 mirai 的[文档](https://github.com/mamoe/mirai/blob/dev/docs/Messages.md#mirai-%E7%A0%81)。 获取未转义的消息链字符串,可以使用`deserialize(str(message_chain))`。 可以使用 for 循环遍历消息链中的消息组件。 ```py for component in message_chain: print(repr(component)) ``` 可以使用 `==` 运算符比较两个消息链是否相同。 ```py another_msg_chain = MessageChain([ { "type": "AtAll" }, { "type": "Plain", "text": "Hello World!" }, ]) print(message_chain == another_msg_chain) 'True' ``` 可以使用 `in` 运算检查消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。 ```py if AtAll in message_chain: print('AtAll') if At(bot.qq) in message_chain: print('At Me') if MessageChain([At(bot.qq), Plain('Hello!')]) in message_chain: print('Hello!') if 'Hello' in message_chain: print('Hi!') ``` 消息链的 `has` 方法和 `in` 等价。 ```py if message_chain.has(AtAll): print('AtAll') ``` 也可以使用 `>=` 和 `<= `运算符: ```py if MessageChain([At(bot.qq), Plain('Hello!')]) <= message_chain: print('Hello!') ``` 需注意,此处的子消息链匹配会把 Plain 看成一个整体,而不是匹配其文本的一部分。 如需对文本进行部分匹配,请采用 mirai 码字符串匹配的方式。 消息链对索引操作进行了增强。以消息组件类型为索引,获取消息链中的全部该类型的消息组件。 ```py plain_list = message_chain[Plain] '[Plain("Hello World!")]' ``` 以 `类型, 数量` 为索引,获取前至多多少个该类型的消息组件。 ```py plain_list_first = message_chain[Plain, 1] '[Plain("Hello World!")]' ``` 消息链的 `get` 方法和索引操作等价。 ```py plain_list_first = message_chain.get(Plain) '[Plain("Hello World!")]' ``` 消息链的 `get` 方法还可指定第二个参数 `count`,这相当于以 `类型, 数量` 为索引。 ```py plain_list_first = message_chain.get(Plain, 1) # 这等价于 plain_list_first = message_chain[Plain, 1] ``` 可以用加号连接两个消息链。 ```py MessageChain(['Hello World!']) + MessageChain(['Goodbye World!']) # 返回 MessageChain([Plain("Hello World!"), Plain("Goodbye World!")]) ``` 可以用 `*` 运算符复制消息链。 ```py MessageChain(['Hello World!']) * 2 # 返回 MessageChain([Plain("Hello World!"), Plain("Hello World!")]) ``` 除此之外,消息链还支持很多 list 拥有的操作,比如 `index` 和 `count`。 ```py message_chain = MessageChain([ AtAll(), "Hello World!", ]) message_chain.index(Plain) # 返回 0 message_chain.count(Plain) # 返回 1 ``` 消息链对这些操作进行了拓展。在传入元素的地方,一般都可以传入元素的类型。 """ __root__: List[MessageComponent] @staticmethod def _parse_message_chain(msg_chain: Iterable): result = [] for msg in msg_chain: if isinstance(msg, dict): result.append(MessageComponent.parse_subtype(msg)) elif isinstance(msg, MessageComponent): result.append(msg) elif isinstance(msg, str): result.append(Plain(msg)) else: raise TypeError( f"消息链中元素需为 dict 或 str 或 MessageComponent,当前类型:{type(msg)}" ) return result @validator('__root__', always=True, pre=True) def _parse_component(cls, msg_chain): if isinstance(msg_chain, (str, MessageComponent)): msg_chain = [msg_chain] if not msg_chain: msg_chain = [] return cls._parse_message_chain(msg_chain) @classmethod def parse_obj(cls, msg_chain: Iterable): """通过列表形式的消息链,构造对应的 `MessageChain` 对象。 Args: msg_chain: 列表形式的消息链。 """ result = cls._parse_message_chain(msg_chain) return cls(__root__=result) def __init__(self, __root__: Iterable[MessageComponent] = None): super().__init__(__root__=__root__) def __str__(self): return "".join(str(component) for component in self.__root__) def as_mirai_code(self) -> str: """将消息链转换为 mirai 码字符串。 该方法会自动转换消息链中的元素。 Returns: mirai 码字符串。 """ return "".join( component.as_mirai_code() for component in self.__root__ ) def __repr__(self): return f'{self.__class__.__name__}({self.__root__!r})' def __iter__(self): yield from self.__root__ @overload def get(self, index: int) -> MessageComponent: ... @overload def get(self, index: slice) -> List[MessageComponent]: ... @overload def get(self, index: Type[TMessageComponent]) -> List[TMessageComponent]: ... @overload def get( self, index: Tuple[Type[TMessageComponent], int] ) -> List[TMessageComponent]: ... def get( self, index: Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]], count: Optional[int] = None ) -> Union[MessageComponent, List[MessageComponent], List[TMessageComponent]]: """获取消息链中的某个(某些)消息组件,或某类型的消息组件。 Args: index (`Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]]`): 如果为 `int`,则返回该索引处的消息组件。 如果为 `slice`,则返回该索引范围处的消息组件。 如果为 `Type[TMessageComponent]`,则返回该类型的全部消息组件。 如果为 `Tuple[Type[TMessageComponent], int]`,则返回该类型的至多 `index[1]` 个消息组件。 count: 如果为 `int`,则返回至多 `count` 个消息组件。 Returns: MessageComponent: 返回指定索引处的消息组件。 List[MessageComponent]: 返回指定索引范围的消息组件。 List[TMessageComponent]: 返回指定类型的消息组件构成的列表。 """ # 正常索引 if isinstance(index, int): return self.__root__[index] # 切片索引 if isinstance(index, slice): return self.__root__[index] # 指定 count if count: if isinstance(index, type): index = (index, count) elif isinstance(index, tuple): index = (index[0], count if count < index[1] else index[1]) # 索引对象为 MessageComponent 类,返回所有对应 component if isinstance(index, type): return [ component for component in self if type(component) is index ] # 索引对象为 MessageComponent 和 int 构成的 tuple, 返回指定数量的 component if isinstance(index, tuple): components = ( component for component in self if type(component) is index[0] ) return [ component for component, _ in zip(components, range(index[1])) ] raise TypeError(f"消息链索引需为 int 或 MessageComponent,当前类型:{type(index)}") def get_first(self, t: Type[TMessageComponent]) -> Optional[TMessageComponent]: """获取消息链中第一个符合类型的消息组件。""" for component in self: if isinstance(component, t): return component return None @overload def __getitem__(self, index: int) -> MessageComponent: ... @overload def __getitem__(self, index: slice) -> List[MessageComponent]: ... @overload def __getitem__(self, index: Type[TMessageComponent]) -> List[TMessageComponent]: ... @overload def __getitem__( self, index: Tuple[Type[TMessageComponent], int] ) -> List[TMessageComponent]: ... def __getitem__( self, index: Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]] ) -> Union[MessageComponent, List[MessageComponent], List[TMessageComponent]]: return self.get(index) def __setitem__( self, key: Union[int, slice], value: Union[MessageComponent, str, Iterable[Union[MessageComponent, str]]] ): if isinstance(value, str): value = Plain(value) if isinstance(value, Iterable): value = (Plain(c) if isinstance(c, str) else c for c in value) self.__root__[key] = value # type: ignore def __delitem__(self, key: Union[int, slice]): del self.__root__[key] def __reversed__(self) -> Iterable[MessageComponent]: return reversed(self.__root__) def has( self, sub: Union[MessageComponent, Type[MessageComponent], 'MessageChain', str] ) -> bool: """判断消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。 Args: sub (`Union[MessageComponent, Type[MessageComponent], 'MessageChain', str]`): 若为 `MessageComponent`,则判断该组件是否在消息链中。 若为 `Type[MessageComponent]`,则判断该组件类型是否在消息链中。 若为 `MessageChain`,则判断该子消息链是否在消息链中。 若为 `str`,则判断对应的 mirai 码中是否有某子字符串。 Returns: bool: 是否找到。 """ if isinstance(sub, type): # 检测消息链中是否有某种类型的对象 for i in self: if type(i) is sub: return True return False if isinstance(sub, MessageComponent): # 检查消息链中是否有某个组件 for i in self: if i == sub: return True return False if isinstance(sub, MessageChain): # 检查消息链中是否有某个子消息链 return bool(kmp(self, sub)) if isinstance(sub, str): # 检查消息中有无指定字符串子串 return sub in deserialize(str(self)) raise TypeError(f"类型不匹配,当前类型:{type(sub)}") def __contains__(self, sub) -> bool: return self.has(sub) def __ge__(self, other): return other in self def __len__(self) -> int: return len(self.__root__) def __add__( self, other: Union['MessageChain', MessageComponent, str] ) -> 'MessageChain': if isinstance(other, MessageChain): return self.__class__(self.__root__ + other.__root__) if isinstance(other, str): return self.__class__(self.__root__ + [Plain(other)]) if isinstance(other, MessageComponent): return self.__class__(self.__root__ + [other]) return NotImplemented def __radd__(self, other: Union[MessageComponent, str]) -> 'MessageChain': if isinstance(other, MessageComponent): return self.__class__([other] + self.__root__) if isinstance(other, str): return self.__class__( [cast(MessageComponent, Plain(other))] + self.__root__ ) return NotImplemented def __mul__(self, other: int): if isinstance(other, int): return self.__class__(self.__root__ * other) return NotImplemented def __rmul__(self, other: int): return self.__mul__(other) def __iadd__(self, other: Iterable[Union[MessageComponent, str]]): self.extend(other) def __imul__(self, other: int): if isinstance(other, int): self.__root__ *= other return NotImplemented def index( self, x: Union[MessageComponent, Type[MessageComponent]], i: int = 0, j: int = -1 ) -> int: """返回 x 在消息链中首次出现项的索引号(索引号在 i 或其后且在 j 之前)。 Args: x (`Union[MessageComponent, Type[MessageComponent]]`): 要查找的消息元素或消息元素类型。 i: 从哪个位置开始查找。 j: 查找到哪个位置结束。 Returns: int: 如果找到,则返回索引号。 Raises: ValueError: 没有找到。 TypeError: 类型不匹配。 """ if isinstance(x, type): l = len(self) if i < 0: i += l if i < 0: i = 0 if j < 0: j += l if j > l: j = l for index in range(i, j): if type(self[index]) is x: return index raise ValueError("消息链中不存在该类型的组件。") if isinstance(x, MessageComponent): return self.__root__.index(x, i, j) raise TypeError(f"类型不匹配,当前类型:{type(x)}") def count(self, x: Union[MessageComponent, Type[MessageComponent]]) -> int: """返回消息链中 x 出现的次数。 Args: x (`Union[MessageComponent, Type[MessageComponent]]`): 要查找的消息元素或消息元素类型。 Returns: int: 次数。 """ if isinstance(x, type): return sum(1 for i in self if type(i) is x) if isinstance(x, MessageComponent): return self.__root__.count(x) raise TypeError(f"类型不匹配,当前类型:{type(x)}") def extend(self, x: Iterable[Union[MessageComponent, str]]): """将另一个消息链中的元素添加到消息链末尾。 Args: x: 另一个消息链,也可为消息元素或字符串元素的序列。 """ self.__root__.extend(Plain(c) if isinstance(c, str) else c for c in x) def append(self, x: Union[MessageComponent, str]): """将一个消息元素或字符串元素添加到消息链末尾。 Args: x: 消息元素或字符串元素。 """ self.__root__.append(Plain(x) if isinstance(x, str) else x) def insert(self, i: int, x: Union[MessageComponent, str]): """将一个消息元素或字符串添加到消息链中指定位置。 Args: i: 插入位置。 x: 消息元素或字符串元素。 """ self.__root__.insert(i, Plain(x) if isinstance(x, str) else x) def pop(self, i: int = -1) -> MessageComponent: """从消息链中移除并返回指定位置的元素。 Args: i: 移除位置。默认为末尾。 Returns: MessageComponent: 移除的元素。 """ return self.__root__.pop(i) def remove(self, x: Union[MessageComponent, Type[MessageComponent]]): """从消息链中移除指定元素或指定类型的一个元素。 Args: x: 指定的元素或元素类型。 """ if isinstance(x, type): self.pop(self.index(x)) if isinstance(x, MessageComponent): self.__root__.remove(x) def exclude( self, x: Union[MessageComponent, Type[MessageComponent]], count: int = -1 ) -> 'MessageChain': """返回移除指定元素或指定类型的元素后剩余的消息链。 Args: x: 指定的元素或元素类型。 count: 至多移除的数量。默认为全部移除。 Returns: MessageChain: 剩余的消息链。 """ def _exclude(): nonlocal count x_is_type = isinstance(x, type) for c in self: if count > 0 and ((x_is_type and type(c) is x) or c == x): count -= 1 continue yield c return self.__class__(_exclude()) def reverse(self): """将消息链原地翻转。""" self.__root__.reverse() @classmethod def join(cls, *args: Iterable[Union[str, MessageComponent]]): return cls( Plain(c) if isinstance(c, str) else c for c in itertools.chain(*args) ) @property def source(self) -> Optional['Source']: """获取消息链中的 `Source` 对象。""" return self.get_first(Source) @property def message_id(self) -> int: """获取消息链的 message_id,若无法获取,返回 -1。""" source = self.source return source.id if source else -1
Ancestors
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Static methods
def join(*args: Iterable[Union[MessageComponent, str]])
-
Expand source code
@classmethod def join(cls, *args: Iterable[Union[str, MessageComponent]]): return cls( Plain(c) if isinstance(c, str) else c for c in itertools.chain(*args) )
def parse_obj(msg_chain: Iterable)
-
Expand source code
@classmethod def parse_obj(cls, msg_chain: Iterable): """通过列表形式的消息链,构造对应的 `MessageChain` 对象。 Args: msg_chain: 列表形式的消息链。 """ result = cls._parse_message_chain(msg_chain) return cls(__root__=result)
Instance variables
var message_id : int
-
获取消息链的 message_id,若无法获取,返回 -1。
Expand source code
@property def message_id(self) -> int: """获取消息链的 message_id,若无法获取,返回 -1。""" source = self.source return source.id if source else -1
var source : Optional[Source]
-
获取消息链中的
Source
对象。Expand source code
@property def source(self) -> Optional['Source']: """获取消息链中的 `Source` 对象。""" return self.get_first(Source)
Methods
def append(self, x: Union[MessageComponent, str])
-
将一个消息元素或字符串元素添加到消息链末尾。
Args
x
- 消息元素或字符串元素。
Expand source code
def append(self, x: Union[MessageComponent, str]): """将一个消息元素或字符串元素添加到消息链末尾。 Args: x: 消息元素或字符串元素。 """ self.__root__.append(Plain(x) if isinstance(x, str) else x)
def as_mirai_code(self) ‑> str
-
将消息链转换为 mirai 码字符串。
该方法会自动转换消息链中的元素。
Returns
mirai 码字符串。
Expand source code
def as_mirai_code(self) -> str: """将消息链转换为 mirai 码字符串。 该方法会自动转换消息链中的元素。 Returns: mirai 码字符串。 """ return "".join( component.as_mirai_code() for component in self.__root__ )
def count(self, x: Union[MessageComponent, Type[MessageComponent]]) ‑> int
-
返回消息链中 x 出现的次数。
Args
x (
Union[MessageComponent, Type[MessageComponent]]
): 要查找的消息元素或消息元素类型。Returns
int
- 次数。
Expand source code
def count(self, x: Union[MessageComponent, Type[MessageComponent]]) -> int: """返回消息链中 x 出现的次数。 Args: x (`Union[MessageComponent, Type[MessageComponent]]`): 要查找的消息元素或消息元素类型。 Returns: int: 次数。 """ if isinstance(x, type): return sum(1 for i in self if type(i) is x) if isinstance(x, MessageComponent): return self.__root__.count(x) raise TypeError(f"类型不匹配,当前类型:{type(x)}")
def exclude(self, x: Union[MessageComponent, Type[MessageComponent]], count: int = -1) ‑> MessageChain
-
Expand source code
def exclude( self, x: Union[MessageComponent, Type[MessageComponent]], count: int = -1 ) -> 'MessageChain': """返回移除指定元素或指定类型的元素后剩余的消息链。 Args: x: 指定的元素或元素类型。 count: 至多移除的数量。默认为全部移除。 Returns: MessageChain: 剩余的消息链。 """ def _exclude(): nonlocal count x_is_type = isinstance(x, type) for c in self: if count > 0 and ((x_is_type and type(c) is x) or c == x): count -= 1 continue yield c return self.__class__(_exclude())
def extend(self, x: Iterable[Union[MessageComponent, str]])
-
将另一个消息链中的元素添加到消息链末尾。
Args
x
- 另一个消息链,也可为消息元素或字符串元素的序列。
Expand source code
def extend(self, x: Iterable[Union[MessageComponent, str]]): """将另一个消息链中的元素添加到消息链末尾。 Args: x: 另一个消息链,也可为消息元素或字符串元素的序列。 """ self.__root__.extend(Plain(c) if isinstance(c, str) else c for c in x)
def get(self, index: Union[int, slice, Type[~TMessageComponent], Tuple[Type[~TMessageComponent], int]], count: Optional[int] = None) ‑> Union[MessageComponent, List[MessageComponent], List[~TMessageComponent]]
-
获取消息链中的某个(某些)消息组件,或某类型的消息组件。
Args
index (
Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]]
): 如果为int
,则返回该索引处的消息组件。 如果为slice
,则返回该索引范围处的消息组件。 如果为Type[TMessageComponent]
,则返回该类型的全部消息组件。 如果为Tuple[Type[TMessageComponent], int]
,则返回该类型的至多index[1]
个消息组件。count
- 如果为
int
,则返回至多count
个消息组件。
Returns
MessageComponent
- 返回指定索引处的消息组件。
List[MessageComponent]
- 返回指定索引范围的消息组件。
List[TMessageComponent]
- 返回指定类型的消息组件构成的列表。
Expand source code
def get( self, index: Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]], count: Optional[int] = None ) -> Union[MessageComponent, List[MessageComponent], List[TMessageComponent]]: """获取消息链中的某个(某些)消息组件,或某类型的消息组件。 Args: index (`Union[int, slice, Type[TMessageComponent], Tuple[Type[TMessageComponent], int]]`): 如果为 `int`,则返回该索引处的消息组件。 如果为 `slice`,则返回该索引范围处的消息组件。 如果为 `Type[TMessageComponent]`,则返回该类型的全部消息组件。 如果为 `Tuple[Type[TMessageComponent], int]`,则返回该类型的至多 `index[1]` 个消息组件。 count: 如果为 `int`,则返回至多 `count` 个消息组件。 Returns: MessageComponent: 返回指定索引处的消息组件。 List[MessageComponent]: 返回指定索引范围的消息组件。 List[TMessageComponent]: 返回指定类型的消息组件构成的列表。 """ # 正常索引 if isinstance(index, int): return self.__root__[index] # 切片索引 if isinstance(index, slice): return self.__root__[index] # 指定 count if count: if isinstance(index, type): index = (index, count) elif isinstance(index, tuple): index = (index[0], count if count < index[1] else index[1]) # 索引对象为 MessageComponent 类,返回所有对应 component if isinstance(index, type): return [ component for component in self if type(component) is index ] # 索引对象为 MessageComponent 和 int 构成的 tuple, 返回指定数量的 component if isinstance(index, tuple): components = ( component for component in self if type(component) is index[0] ) return [ component for component, _ in zip(components, range(index[1])) ] raise TypeError(f"消息链索引需为 int 或 MessageComponent,当前类型:{type(index)}")
def get_first(self, t: Type[~TMessageComponent]) ‑> Optional[~TMessageComponent]
-
获取消息链中第一个符合类型的消息组件。
Expand source code
def get_first(self, t: Type[TMessageComponent]) -> Optional[TMessageComponent]: """获取消息链中第一个符合类型的消息组件。""" for component in self: if isinstance(component, t): return component return None
def has(self, sub: Union[MessageComponent, Type[MessageComponent], ForwardRef('MessageChain'), str]) ‑> bool
-
判断消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。
Args
sub (
Union[MessageComponent, Type[MessageComponent], 'MessageChain', str]
): 若为MessageComponent
,则判断该组件是否在消息链中。 若为Type[MessageComponent]
,则判断该组件类型是否在消息链中。 若为MessageChain
,则判断该子消息链是否在消息链中。 若为str
,则判断对应的 mirai 码中是否有某子字符串。Returns
bool
- 是否找到。
Expand source code
def has( self, sub: Union[MessageComponent, Type[MessageComponent], 'MessageChain', str] ) -> bool: """判断消息链中: 1. 是否有某个消息组件。 2. 是否有某个类型的消息组件。 3. 是否有某个子消息链。 4. 对应的 mirai 码中是否有某子字符串。 Args: sub (`Union[MessageComponent, Type[MessageComponent], 'MessageChain', str]`): 若为 `MessageComponent`,则判断该组件是否在消息链中。 若为 `Type[MessageComponent]`,则判断该组件类型是否在消息链中。 若为 `MessageChain`,则判断该子消息链是否在消息链中。 若为 `str`,则判断对应的 mirai 码中是否有某子字符串。 Returns: bool: 是否找到。 """ if isinstance(sub, type): # 检测消息链中是否有某种类型的对象 for i in self: if type(i) is sub: return True return False if isinstance(sub, MessageComponent): # 检查消息链中是否有某个组件 for i in self: if i == sub: return True return False if isinstance(sub, MessageChain): # 检查消息链中是否有某个子消息链 return bool(kmp(self, sub)) if isinstance(sub, str): # 检查消息中有无指定字符串子串 return sub in deserialize(str(self)) raise TypeError(f"类型不匹配,当前类型:{type(sub)}")
def index(self, x: Union[MessageComponent, Type[MessageComponent]], i: int = 0, j: int = -1) ‑> int
-
返回 x 在消息链中首次出现项的索引号(索引号在 i 或其后且在 j 之前)。
Args
- x (
Union[MessageComponent, Type[MessageComponent]]
): - 要查找的消息元素或消息元素类型。
i
- 从哪个位置开始查找。
j
- 查找到哪个位置结束。
Returns
int
- 如果找到,则返回索引号。
Raises
ValueError
- 没有找到。
TypeError
- 类型不匹配。
Expand source code
def index( self, x: Union[MessageComponent, Type[MessageComponent]], i: int = 0, j: int = -1 ) -> int: """返回 x 在消息链中首次出现项的索引号(索引号在 i 或其后且在 j 之前)。 Args: x (`Union[MessageComponent, Type[MessageComponent]]`): 要查找的消息元素或消息元素类型。 i: 从哪个位置开始查找。 j: 查找到哪个位置结束。 Returns: int: 如果找到,则返回索引号。 Raises: ValueError: 没有找到。 TypeError: 类型不匹配。 """ if isinstance(x, type): l = len(self) if i < 0: i += l if i < 0: i = 0 if j < 0: j += l if j > l: j = l for index in range(i, j): if type(self[index]) is x: return index raise ValueError("消息链中不存在该类型的组件。") if isinstance(x, MessageComponent): return self.__root__.index(x, i, j) raise TypeError(f"类型不匹配,当前类型:{type(x)}")
- x (
def insert(self, i: int, x: Union[MessageComponent, str])
-
将一个消息元素或字符串添加到消息链中指定位置。
Args
i
- 插入位置。
x
- 消息元素或字符串元素。
Expand source code
def insert(self, i: int, x: Union[MessageComponent, str]): """将一个消息元素或字符串添加到消息链中指定位置。 Args: i: 插入位置。 x: 消息元素或字符串元素。 """ self.__root__.insert(i, Plain(x) if isinstance(x, str) else x)
def pop(self, i: int = -1) ‑> MessageComponent
-
Expand source code
def pop(self, i: int = -1) -> MessageComponent: """从消息链中移除并返回指定位置的元素。 Args: i: 移除位置。默认为末尾。 Returns: MessageComponent: 移除的元素。 """ return self.__root__.pop(i)
def remove(self, x: Union[MessageComponent, Type[MessageComponent]])
-
从消息链中移除指定元素或指定类型的一个元素。
Args
x
- 指定的元素或元素类型。
Expand source code
def remove(self, x: Union[MessageComponent, Type[MessageComponent]]): """从消息链中移除指定元素或指定类型的一个元素。 Args: x: 指定的元素或元素类型。 """ if isinstance(x, type): self.pop(self.index(x)) if isinstance(x, MessageComponent): self.__root__.remove(x)
def reverse(self)
-
将消息链原地翻转。
Expand source code
def reverse(self): """将消息链原地翻转。""" self.__root__.reverse()
class MessageComponent (*args, **kwargs)
-
消息组件。
Expand source code
class MessageComponent(MiraiIndexedModel, metaclass=MessageComponentMetaclass): """消息组件。""" type: str """消息组件类型。""" def __str__(self): return '' def as_mirai_code(self) -> str: """转化为 mirai 码。""" return '' def __repr__(self): return self.__class__.__name__ + '(' + ', '.join( ( f'{k}={repr(v)}' for k, v in self.__dict__.items() if k != 'type' and v ) ) + ')' def __init__(self, *args, **kwargs): # 解析参数列表,将位置参数转化为具名参数 parameter_names = self.__parameter_names__ if len(args) > len(parameter_names): raise TypeError( f'`{self.type}`需要{len(parameter_names)}个参数,但传入了{len(args)}个。' ) for name, value in zip(parameter_names, args): if name in kwargs: raise TypeError(f'在 `{self.type}` 中,具名参数 `{name}` 与位置参数重复。') kwargs[name] = value super().__init__(**kwargs)
Ancestors
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
- App
- At
- AtAll
- Dice
- Face
- File
- Forward
- Image
- Json
- MarketFace
- mirai.models.message.MiraiCode
- MusicShare
- Plain
- Poke
- Quote
- Source
- Unknown
- Voice
- Xml
Class variables
var type : str
-
消息组件类型。
Methods
def as_mirai_code(self) ‑> str
-
转化为 mirai 码。
Expand source code
def as_mirai_code(self) -> str: """转化为 mirai 码。""" return ''
Inherited members
-
音乐分享。
Expand source code
class MusicShare(MessageComponent): """音乐分享。""" type: str = "MusicShare" """消息组件类型。""" kind: MusicShareKind """音乐分享的来源。""" title: str """标题。""" summary: str """歌手。""" jump_url: HttpUrl """跳转路径。""" picture_url: HttpUrl """封面路径。""" music_url: HttpUrl """音源路径。""" brief: str = "" """在消息列表中显示的内容。""" def __str__(self): return self.brief
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
-
在消息列表中显示的内容。
-
跳转路径。
-
音乐分享的来源。
-
音源路径。
-
封面路径。
-
歌手。
-
标题。
Inherited members
-
音乐分享的来源。
Expand source code
class MusicShareKind(str, Enum): """音乐分享的来源。""" NeteaseCloudMusic = "NeteaseCloudMusic" QQMusic = "QQMusic" MiguMusic = "MiguMusic" KugouMusic = "KugouMusic" KuwoMusic = "KuwoMusic"
Ancestors
- builtins.str
- enum.Enum
Class variables
class Plain (*args, **kwargs)
-
纯文本。
Expand source code
class Plain(MessageComponent): """纯文本。""" type: str = "Plain" """消息组件类型。""" text: str """文字消息。""" def __str__(self): return self.text def as_mirai_code(self) -> str: return serialize(self.text) def __repr__(self): return f'Plain({self.text!r})'
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var text : str
-
文字消息。
Inherited members
class Poke (*args, **kwargs)
-
戳一戳。
Expand source code
class Poke(MessageComponent): """戳一戳。""" type: str = "Poke" """消息组件类型。""" name: PokeNames """名称。""" @property def poke_type(self): return POKE_TYPE[self.name] @property def poke_id(self): return POKE_ID[self.name] def __str__(self): return f'[{POKE_NAME[self.name]}]' def as_mirai_code(self) -> str: return f'[mirai:poke:{self.name},{self.poke_type},{self.poke_id}]'
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var name : PokeNames
-
名称。
Instance variables
var poke_id
-
Expand source code
@property def poke_id(self): return POKE_ID[self.name]
var poke_type
-
Expand source code
@property def poke_type(self): return POKE_TYPE[self.name]
Inherited members
class PokeNames (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
戳一戳名称。
Expand source code
class PokeNames(str, Enum): """戳一戳名称。""" ChuoYiChuo = "ChuoYiChuo" BiXin = "BiXin" DianZan = "DianZan" XinSui = "XinSui" LiuLiuLiu = "LiuLiuLiu" FangDaZhao = "FangDaZhao" BaoBeiQiu = "BaoBeiQiu" Rose = "Rose" ZhaoHuanShu = "ZhaoHuanShu" RangNiPi = "RangNiPi" JeiYin = "JeiYin" ShouLei = "ShouLei" GouYin = "GouYin" ZhuaYiXia = "ZhuaYiXia" SuiPing = "SuiPing" QiaoMen = "QiaoMen" def as_component(self) -> 'Poke': return Poke(name=self.value)
Ancestors
- builtins.str
- enum.Enum
Class variables
var BaoBeiQiu
var BiXin
var ChuoYiChuo
var DianZan
var FangDaZhao
var GouYin
var JeiYin
var LiuLiuLiu
var QiaoMen
var RangNiPi
var Rose
var ShouLei
var SuiPing
var XinSui
var ZhaoHuanShu
var ZhuaYiXia
Methods
def as_component(self) ‑> Poke
-
Expand source code
def as_component(self) -> 'Poke': return Poke(name=self.value)
class Quote (*args, **kwargs)
-
引用。
Expand source code
class Quote(MessageComponent): """引用。""" type: str = "Quote" """消息组件类型。""" id: Optional[int] = None """被引用回复的原消息的 message_id。""" group_id: Optional[int] = None """被引用回复的原消息所接收的群号,当为好友消息时为0。""" sender_id: Optional[int] = None """被引用回复的原消息的发送者的QQ号。""" target_id: Optional[int] = None """被引用回复的原消息的接收者者的QQ号(或群号)。""" origin: MessageChain """被引用回复的原消息的消息链对象。""" @validator("origin", always=True, pre=True) def origin_formater(cls, v): return MessageChain.parse_obj(v)
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var group_id : Optional[int]
-
被引用回复的原消息所接收的群号,当为好友消息时为0。
var id : Optional[int]
-
被引用回复的原消息的 message_id。
var origin : MessageChain
-
被引用回复的原消息的消息链对象。
var sender_id : Optional[int]
-
被引用回复的原消息的发送者的QQ号。
var target_id : Optional[int]
-
被引用回复的原消息的接收者者的QQ号(或群号)。
Static methods
def origin_formater(v)
-
Expand source code
@validator("origin", always=True, pre=True) def origin_formater(cls, v): return MessageChain.parse_obj(v)
Inherited members
class Source (*args, **kwargs)
-
源。包含消息的基本信息。
Expand source code
class Source(MessageComponent): """源。包含消息的基本信息。""" type: str = "Source" """消息组件类型。""" id: int """消息的识别号,用于引用回复(Source 类型永远为 MessageChain 的第一个元素)。""" time: datetime """消息时间。"""
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var id : int
-
消息的识别号,用于引用回复(Source 类型永远为 MessageChain 的第一个元素)。
var time : datetime.datetime
-
消息时间。
Inherited members
class Unknown (*args, **kwargs)
-
未知。
Expand source code
class Unknown(MessageComponent): """未知。""" type: str = "Unknown" """消息组件类型。""" text: str """文本。"""
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var text : str
-
文本。
Inherited members
class Voice (*args, **kwargs)
-
语音。
Expand source code
class Voice(MessageComponent): """语音。""" type: str = "Voice" """消息组件类型。""" voice_id: Optional[str] = None """语音的 voice_id,不为空时将忽略 url 属性。""" url: Optional[str] = None """语音的 URL,发送时可作网络语音的链接;接收时为腾讯语音服务器的链接,可用于语音下载。""" path: Optional[str] = None """语音的路径,发送本地语音。""" base64: Optional[str] = None """语音的 Base64 编码。""" length: Optional[int] = None """语音的长度,单位为秒。""" @validator('path') def validate_path(cls, path: Optional[str]): """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。""" if path: try: return str(Path(path).resolve(strict=True)) except FileNotFoundError: raise ValueError(f"无效路径:{path}") else: return path def __str__(self): return '[语音]' async def download( self, filename: Union[str, Path, None] = None, directory: Union[str, Path, None] = None ): """下载语音到本地。 语音采用 silk v3 格式,silk 格式的编码解码请使用 [graiax-silkcoder](https://pypi.org/project/graiax-silkcoder/)。 Args: filename: 下载到本地的文件路径。与 `directory` 二选一。 directory: 下载到本地的文件夹路径。与 `filename` 二选一。 """ if not self.url: logger.warning(f'语音 `{self.voice_id}` 无 url 参数,下载失败。') return import httpx async with httpx.AsyncClient() as client: response = await client.get(self.url) response.raise_for_status() content = response.content if filename: path = Path(filename) path.parent.mkdir(parents=True, exist_ok=True) elif directory: path = Path(directory) path.mkdir(parents=True, exist_ok=True) path = path / f'{self.voice_id}.silk' else: raise ValueError("请指定文件路径或文件夹路径!") import aiofiles async with aiofiles.open(path, 'wb') as f: await f.write(content) @classmethod async def from_local( cls, filename: Union[str, Path, None] = None, content: Optional[bytes] = None, ) -> "Voice": """从本地文件路径加载语音,以 base64 的形式传递。 Args: filename: 从本地文件路径加载语音,与 `content` 二选一。 content: 从本地文件内容加载语音,与 `filename` 二选一。 """ if content: pass if filename: path = Path(filename) import aiofiles async with aiofiles.open(path, 'rb') as f: content = await f.read() else: raise ValueError("请指定语音路径或语音内容!") import base64 img = cls(base64=base64.b64encode(content).decode()) return img
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var base64 : Optional[str]
-
语音的 Base64 编码。
var length : Optional[int]
-
语音的长度,单位为秒。
var path : Optional[str]
-
语音的路径,发送本地语音。
var url : Optional[str]
-
语音的 URL,发送时可作网络语音的链接;接收时为腾讯语音服务器的链接,可用于语音下载。
var voice_id : Optional[str]
-
语音的 voice_id,不为空时将忽略 url 属性。
Static methods
async def from_local(filename: Union[str, pathlib.Path, NoneType] = None, content: Optional[bytes] = None) ‑> Voice
-
从本地文件路径加载语音,以 base64 的形式传递。
Args
filename
- 从本地文件路径加载语音,与
content
二选一。 content
- 从本地文件内容加载语音,与
filename
二选一。
Expand source code
@classmethod async def from_local( cls, filename: Union[str, Path, None] = None, content: Optional[bytes] = None, ) -> "Voice": """从本地文件路径加载语音,以 base64 的形式传递。 Args: filename: 从本地文件路径加载语音,与 `content` 二选一。 content: 从本地文件内容加载语音,与 `filename` 二选一。 """ if content: pass if filename: path = Path(filename) import aiofiles async with aiofiles.open(path, 'rb') as f: content = await f.read() else: raise ValueError("请指定语音路径或语音内容!") import base64 img = cls(base64=base64.b64encode(content).decode()) return img
def validate_path(path: Optional[str])
-
修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。
Expand source code
@validator('path') def validate_path(cls, path: Optional[str]): """修复 path 参数的行为,使之相对于 YiriMirai 的启动路径。""" if path: try: return str(Path(path).resolve(strict=True)) except FileNotFoundError: raise ValueError(f"无效路径:{path}") else: return path
Methods
async def download(self, filename: Union[str, pathlib.Path, NoneType] = None, directory: Union[str, pathlib.Path, NoneType] = None)
-
下载语音到本地。
语音采用 silk v3 格式,silk 格式的编码解码请使用 graiax-silkcoder。
Args
filename
- 下载到本地的文件路径。与
directory
二选一。 directory
- 下载到本地的文件夹路径。与
filename
二选一。
Expand source code
async def download( self, filename: Union[str, Path, None] = None, directory: Union[str, Path, None] = None ): """下载语音到本地。 语音采用 silk v3 格式,silk 格式的编码解码请使用 [graiax-silkcoder](https://pypi.org/project/graiax-silkcoder/)。 Args: filename: 下载到本地的文件路径。与 `directory` 二选一。 directory: 下载到本地的文件夹路径。与 `filename` 二选一。 """ if not self.url: logger.warning(f'语音 `{self.voice_id}` 无 url 参数,下载失败。') return import httpx async with httpx.AsyncClient() as client: response = await client.get(self.url) response.raise_for_status() content = response.content if filename: path = Path(filename) path.parent.mkdir(parents=True, exist_ok=True) elif directory: path = Path(directory) path.mkdir(parents=True, exist_ok=True) path = path / f'{self.voice_id}.silk' else: raise ValueError("请指定文件路径或文件夹路径!") import aiofiles async with aiofiles.open(path, 'wb') as f: await f.write(content)
Inherited members
class Xml (*args, **kwargs)
-
XML。
Expand source code
class Xml(MessageComponent): """XML。""" type: str = "Xml" """消息组件类型。""" xml: str """XML文本。"""
Ancestors
- MessageComponent
- MiraiIndexedModel
- MiraiBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var xml : str
-
XML文本。
Inherited members