Module mirai.adapters
此模块提供网络适配器相关。
网络适配器负责与 mirai-api-http 沟通,详见各子模块。
Expand source code
# -*- coding: utf-8 -*-
"""
此模块提供网络适配器相关。
网络适配器负责与 mirai-api-http 沟通,详见各子模块。
"""
from typing import TYPE_CHECKING
from .base import Adapter
if TYPE_CHECKING:
from .compose import ComposeAdapter
from .http import HTTPAdapter
from .webhook import WebHookAdapter
from .websocket import WebSocketAdapter
def __getattr__(name):
import importlib
MODULES = {
'Adapter': '.base',
'ComposeAdapter': '.compose',
'HTTPAdapter': '.http',
'WebHookAdapter': '.webhook',
'WebSocketAdapter': '.websocket',
}
if name in MODULES:
module_name = MODULES[name]
module = importlib.import_module(module_name, __name__)
return getattr(module, name)
raise AttributeError(f'Module {__name__} has no attribute {name}')
__all__ = [
'Adapter',
'HTTPAdapter',
'WebSocketAdapter',
'WebHookAdapter',
'ComposeAdapter',
]
Sub-modules
mirai.adapters.base
-
此模块提供网络适配器的一系列基础定义。
mirai.adapters.compose
-
此模块提供组合适配器,可以将两个适配器组合使用。
mirai.adapters.http
-
此模块提供 HTTP 轮询适配器,适用于 mirai-api-http 的 http adapter。
mirai.adapters.reverse_websocket
-
此模块提供反向 WebSocket 适配器,适用于 mirai-api-http 的 reverse websocket adapter。
mirai.adapters.webhook
-
此模块提供 HTTP 回调适配器,适用于 mirai-api-http 的 webhook adapter。
mirai.adapters.websocket
-
此模块提供正向 WebSocket 适配器,适用于 mirai-api-http 的 websocket adapter。
Classes
class Adapter (verify_key: Optional[str], single_mode: bool = False)
-
适配器基类,与 mirai-api-http 沟通的底层实现。
属性
buses
为适配器注册的事件总线集合。适配器被绑定到 bot 时,bot 会自动将自身的事件总线注册到适配器。Args
verify_key
- mirai-api-http 配置的认证 key,关闭认证时为 None。
single_mode
- 是否开启 single_mode,开启后与 session 将无效。
Expand source code
class Adapter(ApiProvider, AdapterInterface): """适配器基类,与 mirai-api-http 沟通的底层实现。 属性 `buses` 为适配器注册的事件总线集合。适配器被绑定到 bot 时,bot 会自动将自身的事件总线注册到适配器。 """ verify_key: Optional[str] """mirai-api-http 配置的认证 key,关闭认证时为 None。""" single_mode: bool """是否开启 single_mode,开启后与 session 将无效。""" session: str """从 mirai-api-http 处获得的 session。""" buses: Set[AbstractEventBus] """注册的事件总线集合。""" background: Optional[asyncio.Task] """背景事件循环任务。""" def __init__(self, verify_key: Optional[str], single_mode: bool = False): """ Args: verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。 single_mode: 是否开启 single_mode,开启后与 session 将无效。 """ self.verify_key = verify_key self.single_mode = single_mode self.session = '' self.buses = set() self.background = None @property def adapter_info(self): return { 'verify_key': self.verify_key, 'session': self.session, 'single_mode': self.single_mode, } @classmethod def via(cls, adapter_interface: AdapterInterface) -> "Adapter": """从适配器接口创建适配器。 Args: adapter_interface: 适配器接口。 Returns: Adapter: 创建的适配器。 """ info = adapter_interface.adapter_info adapter = cls( verify_key=info['verify_key'], **{ key: info[key] for key in ['single_mode'] if info.get(key) is not None } ) adapter.session = cast(str, info.get('session')) return adapter def register_event_bus(self, *buses: AbstractEventBus): """注册事件总线。 Args: *buses: 一个或多个事件总线。 """ self.buses |= set(buses) def unregister_event_bus(self, *buses: AbstractEventBus): """解除注册事件总线。 Args: *buses: 一个或多个事件总线。 """ self.buses -= set(buses) @abc.abstractmethod async def login(self, qq: int): """登录到 mirai-api-http。""" @abc.abstractmethod async def logout(self, terminate: bool = True): """登出。""" @abc.abstractmethod async def call_api(self, api: str, method: Method = Method.GET, **params): """调用 API。 Args: api: API 名称,需与 mirai-api-http 中的定义一致。 method: 调用方法。默认为 GET。 **params: 参数。 """ @abc.abstractmethod async def _background(self): """背景事件循环,用于接收事件。""" async def start(self): """运行背景事件循环。""" if not self.buses: raise RuntimeError('事件总线未指定!') if not self.session: raise RuntimeError('未登录!') self.background = asyncio.create_task(self._background()) async def shutdown(self): """停止背景事件循环。""" if self.background: await Tasks.cancel(self.background) async def emit(self, event: str, *args, **kwargs): """向事件总线发送一个事件。 Args: event: 事件名称。 *args: 事件参数。 **kwargs: 事件参数。 """ coros = [bus.emit(event, *args, **kwargs) for bus in self.buses] return sum(await asyncio.gather(*coros), [])
Ancestors
- ApiProvider
- AdapterInterface
- abc.ABC
Subclasses
Class variables
var background : Optional[_asyncio.Task]
-
背景事件循环任务。
var buses : Set[AbstractEventBus]
-
注册的事件总线集合。
var session : str
-
从 mirai-api-http 处获得的 session。
var single_mode : bool
-
是否开启 single_mode,开启后与 session 将无效。
var verify_key : Optional[str]
-
mirai-api-http 配置的认证 key,关闭认证时为 None。
Static methods
def via(adapter_interface: AdapterInterface) ‑> Adapter
-
Expand source code
@classmethod def via(cls, adapter_interface: AdapterInterface) -> "Adapter": """从适配器接口创建适配器。 Args: adapter_interface: 适配器接口。 Returns: Adapter: 创建的适配器。 """ info = adapter_interface.adapter_info adapter = cls( verify_key=info['verify_key'], **{ key: info[key] for key in ['single_mode'] if info.get(key) is not None } ) adapter.session = cast(str, info.get('session')) return adapter
Methods
async def call_api(self, api: str, method: Method = Method.GET, **params)
-
调用 API。
Args
api
- API 名称,需与 mirai-api-http 中的定义一致。
method
- 调用方法。默认为 GET。
**params
- 参数。
Expand source code
@abc.abstractmethod async def call_api(self, api: str, method: Method = Method.GET, **params): """调用 API。 Args: api: API 名称,需与 mirai-api-http 中的定义一致。 method: 调用方法。默认为 GET。 **params: 参数。 """
async def emit(self, event: str, *args, **kwargs)
-
向事件总线发送一个事件。
Args
event
- 事件名称。
*args
- 事件参数。
**kwargs
- 事件参数。
Expand source code
async def emit(self, event: str, *args, **kwargs): """向事件总线发送一个事件。 Args: event: 事件名称。 *args: 事件参数。 **kwargs: 事件参数。 """ coros = [bus.emit(event, *args, **kwargs) for bus in self.buses] return sum(await asyncio.gather(*coros), [])
async def login(self, qq: int)
-
登录到 mirai-api-http。
Expand source code
@abc.abstractmethod async def login(self, qq: int): """登录到 mirai-api-http。"""
async def logout(self, terminate: bool = True)
-
登出。
Expand source code
@abc.abstractmethod async def logout(self, terminate: bool = True): """登出。"""
def register_event_bus(self, *buses: AbstractEventBus)
-
注册事件总线。
Args
*buses
- 一个或多个事件总线。
Expand source code
def register_event_bus(self, *buses: AbstractEventBus): """注册事件总线。 Args: *buses: 一个或多个事件总线。 """ self.buses |= set(buses)
async def shutdown(self)
-
停止背景事件循环。
Expand source code
async def shutdown(self): """停止背景事件循环。""" if self.background: await Tasks.cancel(self.background)
async def start(self)
-
运行背景事件循环。
Expand source code
async def start(self): """运行背景事件循环。""" if not self.buses: raise RuntimeError('事件总线未指定!') if not self.session: raise RuntimeError('未登录!') self.background = asyncio.create_task(self._background())
def unregister_event_bus(self, *buses: AbstractEventBus)
-
解除注册事件总线。
Args
*buses
- 一个或多个事件总线。
Expand source code
def unregister_event_bus(self, *buses: AbstractEventBus): """解除注册事件总线。 Args: *buses: 一个或多个事件总线。 """ self.buses -= set(buses)
Inherited members
class ComposeAdapter (api_channel: Adapter, event_channel: Adapter)
-
组合适配器。使用一个适配器提供 API 调用,另一个适配器提供事件处理。
Args
api_channel
- 提供 API 调用的适配器。
event_channel
- 提供事件处理的适配器。
Expand source code
class ComposeAdapter(Adapter): """组合适配器。使用一个适配器提供 API 调用,另一个适配器提供事件处理。""" api_channel: Adapter """提供 API 调用的适配器。""" event_channel: Adapter """提供事件处理的适配器。""" def __init__(self, api_channel: Adapter, event_channel: Adapter): """ Args: api_channel: 提供 API 调用的适配器。 event_channel: 提供事件处理的适配器。 """ super().__init__( verify_key=api_channel.verify_key, single_mode=api_channel.single_mode ) if api_channel.verify_key != event_channel.verify_key: raise ValueError('组合适配器应使用相同的 verify_key。') self.api_channel = api_channel self.event_channel = event_channel event_channel.buses = self.buses self.verify_key = api_channel.verify_key self.single_mode = api_channel.single_mode @property def adapter_info(self): return self.api_channel.adapter_info async def login(self, qq: int): await self.api_channel.login(qq) # 绑定 session self.event_channel.session = self.api_channel.session self.session = self.api_channel.session await self.event_channel.login(qq) async def logout(self): await self.event_channel.logout() await self.api_channel.logout() async def call_api(self, api: str, method: Method = Method.GET, **params): return await self.api_channel.call_api(api, method, **params) async def _background(self): await self.event_channel._background()
Ancestors
- Adapter
- ApiProvider
- AdapterInterface
- abc.ABC
Class variables
var api_channel : Adapter
-
提供 API 调用的适配器。
var event_channel : Adapter
-
提供事件处理的适配器。
Inherited members
class HTTPAdapter (verify_key: Optional[str], host: str, port: int, poll_interval: float = 1.0, single_mode: bool = False)
-
HTTP 轮询适配器。使用 HTTP 轮询的方式与 mirai-api-http 沟通。
Args
verify_key
- mirai-api-http 配置的认证 key,关闭认证时为 None。
host
- HTTP Server 的地址。
port
- HTTP Server 的端口。
poll_interval
- 轮询时间间隔,单位秒。
single_mode
- 是否为单例模式。
Expand source code
class HTTPAdapter(Adapter): """HTTP 轮询适配器。使用 HTTP 轮询的方式与 mirai-api-http 沟通。""" host_name: str """mirai-api-http 的 HTTPAdapter Server 主机名。""" poll_interval: float """轮询时间间隔,单位秒。""" qq: int """机器人的 QQ 号。""" headers: httpx.Headers """HTTP 请求头。""" def __init__( self, verify_key: Optional[str], host: str, port: int, poll_interval: float = 1., single_mode: bool = False ): """ Args: verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。 host: HTTP Server 的地址。 port: HTTP Server 的端口。 poll_interval: 轮询时间间隔,单位秒。 single_mode: 是否为单例模式。 """ super().__init__(verify_key=verify_key, single_mode=single_mode) self._host = host self._port = port if host[:2] == '//': host = 'http:' + host elif host[:8] == 'https://': raise exceptions.NetworkError('不支持 HTTPS!') elif host[:7] != 'http://': host = 'http://' + host if host[-1:] == '/': host = host[:-1] self.host_name = f'{host}:{port}' self.poll_interval = poll_interval self.qq = 0 self.headers = httpx.Headers() # 使用 headers 传递 session self._tasks = Tasks() @property def adapter_info(self): return { 'verify_key': self.verify_key, 'session': self.session, 'single_mode': self.single_mode, 'host': self._host, 'port': self._port, 'poll_interval': self.poll_interval, } @classmethod def via(cls, adapter_interface: AdapterInterface) -> "HTTPAdapter": info = adapter_interface.adapter_info adapter = cls( verify_key=info['verify_key'], **{ key: info[key] for key in ['host', 'port', 'poll_interval', 'single_mode'] if key in info } ) adapter.session = cast(str, info.get('session')) return adapter @_error_handler_async_local async def _post(self, client: httpx.AsyncClient, url: str, json: dict) -> Optional[dict]: """调用 POST 方法。""" # 使用自定义的 json.dumps content = json_dumps(json).encode('utf-8') try: response = await client.post( url, content=content, headers={'Content-Type': 'application/json'}, timeout=10. ) logger.debug( f'[HTTP] 发送 POST 请求,地址{url},状态 {response.status_code}。' ) except httpx.TimeoutException: logger.error(f'[HTTP] POST 请求超时,地址{url}。') return None return _parse_response(response) @_error_handler_async_local async def _get(self, client: httpx.AsyncClient, url: str, params: dict) -> Optional[dict]: """调用 GET 方法。""" try: response = await client.get(url, params=params, timeout=10.) logger.debug( f'[HTTP] 发送 GET 请求,地址{url},状态 {response.status_code}。' ) except httpx.TimeoutException: logger.error(f'[HTTP] GET 请求超时,地址{url}。') return None return _parse_response(response) @_error_handler_async_local async def _post_multipart( self, client: httpx.AsyncClient, url: str, data: dict, files: dict ) -> Optional[dict]: """调用 POST 方法,发送 multipart 数据。""" try: response = await client.post( url, data=data, files=files, timeout=30. ) logger.debug( f'[HTTP] 发送 POST 请求,地址{url},状态 {response.status_code}。' ) except httpx.TimeoutException: logger.error(f'[HTTP] POST 请求超时,地址{url}。') return None return _parse_response(response) @_error_handler_async_local async def login(self, qq: int): async with httpx.AsyncClient( base_url=self.host_name, headers=self.headers ) as client: if not self.session: if self.verify_key is not None: self.session = ( await self._post( client, '/verify', { "verifyKey": self.verify_key, } ) )['session'] else: self.session = str(random.randint(1, 2**20)) if not self.single_mode: await self._post( client, '/bind', { "sessionKey": self.session, "qq": qq, } ) self.headers = httpx.Headers({'sessionKey': self.session}) self.qq = qq logger.info(f'[HTTP] 成功登录到账号{qq}。') @_error_handler_async_local async def logout(self, terminate: bool = True): if self.session and not self.single_mode: if terminate: async with httpx.AsyncClient( base_url=self.host_name, headers=self.headers ) as client: await self._post( client, '/release', { "sessionKey": self.session, "qq": self.qq, } ) logger.info(f"[HTTP] 从账号{self.qq}退出。") async def poll_event(self): """进行一次轮询,获取并处理事件。""" async with httpx.AsyncClient( base_url=self.host_name, headers=self.headers ) as client: msg_count = (await self._get(client, '/countMessage', {}))['data'] if msg_count > 0: msg_list = ( await self._get(client, '/fetchMessage', {'count': msg_count}) )['data'] coros = [self.emit(msg['type'], msg) for msg in msg_list] await asyncio.gather(*coros) async def call_api(self, api: str, method: Method = Method.GET, **params) -> Optional[dict]: async with httpx.AsyncClient( base_url=self.host_name, headers=self.headers ) as client: if method == Method.GET or method == Method.RESTGET: return await self._get(client, f'/{api}', params) if method == Method.POST or method == Method.RESTPOST: return await self._post(client, f'/{api}', params) if method == Method.MULTIPART: return await self._post_multipart( client, f'/{api}', params['data'], params['files'] ) return None async def _background(self): """开始轮询。""" logger.info('[HTTP] 机器人开始运行。按 Ctrl + C 停止。') try: while True: self._tasks.create_task(self.poll_event()) await asyncio.sleep(self.poll_interval) finally: await self._tasks.cancel_all()
Ancestors
- Adapter
- ApiProvider
- AdapterInterface
- abc.ABC
Class variables
var headers : httpx.Headers
-
HTTP 请求头。
var host_name : str
-
mirai-api-http 的 HTTPAdapter Server 主机名。
var poll_interval : float
-
轮询时间间隔,单位秒。
var qq : int
-
机器人的 QQ 号。
Methods
async def poll_event(self)
-
进行一次轮询,获取并处理事件。
Expand source code
async def poll_event(self): """进行一次轮询,获取并处理事件。""" async with httpx.AsyncClient( base_url=self.host_name, headers=self.headers ) as client: msg_count = (await self._get(client, '/countMessage', {}))['data'] if msg_count > 0: msg_list = ( await self._get(client, '/fetchMessage', {'count': msg_count}) )['data'] coros = [self.emit(msg['type'], msg) for msg in msg_list] await asyncio.gather(*coros)
Inherited members
class WebHookAdapter (verify_key: Optional[str], route: str = '/', extra_headers: Optional[Mapping[str, str]] = None, enable_quick_response: bool = True, single_mode: bool = False)
-
WebHook 适配器。作为 HTTP 服务器与 mirai-api-http 沟通。
Args
verify_key
- mirai-api-http 配置的认证 key,关闭认证时为 None。
route
- 适配器的路由,默认在根目录上提供服务。
extra_headers
- 额外请求头,与 mirai-api-http 的配置一致。
enable_quick_response
- 是否启用快速响应,当与其他适配器混合使用时,禁用可以提高响应速度。
single_mode
- 是否启用单例模式。
Expand source code
class WebHookAdapter(Adapter): """WebHook 适配器。作为 HTTP 服务器与 mirai-api-http 沟通。""" session: str """WebHook 不需要 session,此处为机器人的 QQ 号。""" route: str """适配器的路由。""" extra_headers: Mapping[str, str] """额外请求头。""" enable_quick_response: bool """是否启用快速响应。""" def __init__( self, verify_key: Optional[str], route: str = '/', extra_headers: Optional[Mapping[str, str]] = None, enable_quick_response: bool = True, single_mode: bool = False ): """ Args: verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。 route: 适配器的路由,默认在根目录上提供服务。 extra_headers: 额外请求头,与 mirai-api-http 的配置一致。 enable_quick_response: 是否启用快速响应,当与其他适配器混合使用时,禁用可以提高响应速度。 single_mode: 是否启用单例模式。 """ super().__init__(verify_key=verify_key, single_mode=single_mode) self.route = route self.extra_headers = extra_headers or {} self.enable_quick_response = enable_quick_response async def endpoint(request: Request): # 鉴权(QQ 号和额外请求头) if request.headers.get('bot') != self.session: # 验证 QQ 号 logger.debug(f"收到来自其他账号({request.headers.get('bot')})的事件。") return for key in self.extra_headers: # 验证请求头 key = key.lower() # HTTP headers 不区分大小写 request_value = request.headers.get(key, '').lower() expect_value = self.extra_headers[key].lower() if (request_value != expect_value ) and (request_value != '[' + expect_value + ']'): logger.info( f"请求头验证失败:expect [{expect_value}], " + f"got {request_value}。" ) return JSONResponse( status_code=401, content={'error': 'Unauthorized'} ) # 处理事件 event = await request.json() result = await self.handle_event(event) return YiriMiraiJSONResponse(result) ASGI().add_route(self.route, endpoint, methods=['POST']) class QuickResponse(BaseException): """WebHook 快速响应,以异常的方式跳出。""" def __init__(self, data: dict): self.data = data @property def adapter_info(self): return { 'verify_key': self.verify_key, 'session': self.session, 'single_mode': self.single_mode, 'route': self.route, 'extra_headers': self.extra_headers, 'enable_quick_response': self.enable_quick_response, } @classmethod def via(cls, adapter_interface: AdapterInterface) -> "WebHookAdapter": info = adapter_interface.adapter_info adapter = cls( verify_key=info['verify_key'], **{ key: info[key] for key in [ 'route', 'extra_headers', 'enable_quick_response', 'single_mode' ] if key in info } ) adapter.session = cast(str, info.get('session')) return adapter async def login(self, qq: int): """WebHook 不需要登录。直接返回。""" self.session = str(qq) logger.info(f'[WebHook] 成功登录到账号{qq}。') async def logout(self, terminate: bool = True): """WebHook 不需要登出。直接返回。""" logger.info(f"[WebHook] 从账号{self.session}退出。") async def call_api(self, api: str, method: Method = Method.GET, **params): """调用 API。WebHook 的 API 调用只能在快速响应中发生。""" if self.enable_quick_response: content = {'command': api.replace('/', '_'), 'content': params} if method == Method.RESTGET: content['subCommand'] = 'get' elif method == Method.RESTPOST: content['subCommand'] = 'update' elif method == Method.MULTIPART: raise NotImplementedError( "WebHook 适配器不支持上传操作。请使用 bot.use_adapter 临时调用 HTTP 适配器。" ) logger.debug(f'[WebHook] WebHook 快速响应 {api}。') raise WebHookAdapter.QuickResponse(content) return None async def _background(self): """WebHook 不需要事件循环。直接返回。""" async def handle_event(self, event): try: tasks = await self.emit(event['type'], event) await asyncio.gather(*tasks) except WebHookAdapter.QuickResponse as response: # 快速响应,直接返回。 return response.data return {}
Ancestors
- Adapter
- ApiProvider
- AdapterInterface
- abc.ABC
Class variables
var QuickResponse
-
WebHook 快速响应,以异常的方式跳出。
var enable_quick_response : bool
-
是否启用快速响应。
var extra_headers : Mapping[str, str]
-
额外请求头。
var route : str
-
适配器的路由。
Methods
async def call_api(self, api: str, method: Method = Method.GET, **params)
-
调用 API。WebHook 的 API 调用只能在快速响应中发生。
Expand source code
async def call_api(self, api: str, method: Method = Method.GET, **params): """调用 API。WebHook 的 API 调用只能在快速响应中发生。""" if self.enable_quick_response: content = {'command': api.replace('/', '_'), 'content': params} if method == Method.RESTGET: content['subCommand'] = 'get' elif method == Method.RESTPOST: content['subCommand'] = 'update' elif method == Method.MULTIPART: raise NotImplementedError( "WebHook 适配器不支持上传操作。请使用 bot.use_adapter 临时调用 HTTP 适配器。" ) logger.debug(f'[WebHook] WebHook 快速响应 {api}。') raise WebHookAdapter.QuickResponse(content) return None
async def handle_event(self, event)
-
Expand source code
async def handle_event(self, event): try: tasks = await self.emit(event['type'], event) await asyncio.gather(*tasks) except WebHookAdapter.QuickResponse as response: # 快速响应,直接返回。 return response.data return {}
async def login(self, qq: int)
-
WebHook 不需要登录。直接返回。
Expand source code
async def login(self, qq: int): """WebHook 不需要登录。直接返回。""" self.session = str(qq) logger.info(f'[WebHook] 成功登录到账号{qq}。')
async def logout(self, terminate: bool = True)
-
WebHook 不需要登出。直接返回。
Expand source code
async def logout(self, terminate: bool = True): """WebHook 不需要登出。直接返回。""" logger.info(f"[WebHook] 从账号{self.session}退出。")
Inherited members
class WebSocketAdapter (verify_key: Optional[str], host: str, port: int, sync_id: str = '-1', single_mode: bool = False, heartbeat_interval: float = 60.0)
-
WebSocket 适配器。作为 WebSocket 客户端与 mirai-api-http 沟通。
Args
verify_key
- mirai-api-http 配置的认证 key,关闭认证时为 None。
host
- WebSocket Server 的地址。
port
- WebSocket Server 的端口。
sync_id
- mirai-api-http 配置的同步 ID。
single_mode
- 是否启用单例模式。
heartbeat_interval
- 每隔多久发送心跳包,单位秒。
Expand source code
class WebSocketAdapter(Adapter): """WebSocket 适配器。作为 WebSocket 客户端与 mirai-api-http 沟通。""" host_name: str """WebSocket Server 的地址。""" sync_id: str """mirai-api-http 配置的同步 ID。""" qq: int """机器人的 QQ 号。""" connection: Optional[WebSocketClientProtocol] """WebSocket 客户端连接。""" heartbeat_interval: float """每隔多久发送心跳包,单位:秒。""" def __init__( self, verify_key: Optional[str], host: str, port: int, sync_id: str = '-1', single_mode: bool = False, heartbeat_interval: float = 60., ): """ Args: verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。 host: WebSocket Server 的地址。 port: WebSocket Server 的端口。 sync_id: mirai-api-http 配置的同步 ID。 single_mode: 是否启用单例模式。 heartbeat_interval: 每隔多久发送心跳包,单位秒。 """ super().__init__(verify_key=verify_key, single_mode=single_mode) self._host = host self._port = port if host[:2] == '//': host = 'ws:' + host elif host[:7] == 'http://' or host[:8] == 'https://': raise exceptions.NetworkError(f'{host} 不是一个可用的 WebSocket 地址!') elif host[:5] != 'ws://': host = 'ws://' + host if host[-1:] == '/': host = host[:-1] self.host_name = f'{host}:{port}/all' self.sync_id = sync_id # 这个神奇的 sync_id,默认值 -1,居然是个字符串 # 既然这样不如把 sync_id 全改成字符串好了 self.qq = 0 self.connection = None self.heartbeat_interval = heartbeat_interval # 接收 WebSocket 数据的 Task self._receiver_task: Optional[asyncio.Task] = None # 用于临时保存接收到的数据,以便根据 sync_id 进行同步识别 self._recv_dict: Dict[str, deque] = defaultdict(deque) # 本地同步 ID,每次调用 API 递增。 self._local_sync_id = random.randint(1, 1024) * 1024 # 事件处理任务管理器 self._tasks = Tasks() # 心跳机制(Keep-Alive):上次发送数据包的时间 self._last_send_time: float = 0. @property def adapter_info(self): return { 'verify_key': self.verify_key, 'session': self.session, 'single_mode': self.single_mode, 'host': self._host, 'port': self._port, 'sync_id': self.sync_id, } @classmethod def via(cls, adapter_interface: AdapterInterface) -> "WebSocketAdapter": info = adapter_interface.adapter_info adapter = cls( verify_key=info['verify_key'], **{ key: info[key] for key in ['host', 'port', 'sync_id', 'single_mode'] if key in info } ) adapter.session = cast(str, info.get('session')) return adapter @_error_handler_async_local async def _receiver(self): """开始接收 websocket 数据。""" if not self.connection: raise exceptions.NetworkError( f'WebSocket 通道 {self.host_name} 未连接!' ) while True: try: # 数据格式: # { # 'syncId': '-1', # 'data': { # // Event Content # } # } response = json.loads(await self.connection.recv()) data = response['data'] logger.debug( f"[WebSocket] 收到 WebSocket 数据,同步 ID:{response['syncId']}。" ) self._recv_dict[response['syncId']].append(data) except KeyError: logger.error(f'[WebSocket] 不正确的数据:{response}') except ConnectionClosedOK: raise SystemExit() except ConnectionClosed as e: exit_message = f'[WebSocket] WebSocket 通道意外关闭。code: {e.code}, reason: {e.reason}' logger.error(exit_message) raise SystemExit(exit_message) async def _recv(self, sync_id: str = '-1', timeout: int = 600) -> dict: """接收并解析 websocket 数据。""" timer = range(timeout) if timeout > 0 else repeat(0) for _ in timer: if self._recv_dict[sync_id]: data = self._recv_dict[sync_id].popleft() if data.get('code', 0) != 0: raise exceptions.ApiError(data) return data # 如果没有对应同步 ID 的数据,则等待 websocket 数据 # 目前存在问题:如果 mah 发回的数据不含 sync_id, # 这里就会无限循环…… # 所以还是限制次数好了。 await asyncio.sleep(0.1) raise TimeoutError( f'[WebSocket] mirai-api-http 响应超时,可能是由于调用出错。同步 ID:{sync_id}。' ) @_error_handler_async_local async def login(self, qq: int): headers = { 'verifyKey': self.verify_key or '', # 关闭认证时,WebSocket 可传入任意 verify_key 'qq': str(qq), } if self.session: headers['sessionKey'] = self.session self.connection = await connect(self.host_name, extra_headers=headers) self._receiver_task = asyncio.create_task(self._receiver()) verify_response = await self._recv('') # 神奇现象:这里的 syncId 是个空字符串 self.session = verify_response['session'] self.qq = qq logger.info(f'[WebSocket] 成功登录到账号{qq}。') @_error_handler_async_local async def logout(self, terminate: bool = True): if self.connection: await self.connection.close() await self._receiver_task logger.info(f"[WebSocket] 从账号{self.qq}退出。") async def poll_event(self): """获取并处理事件。""" event = await self._recv(self.sync_id, -1) self._tasks.create_task(self.emit(event['type'], event)) async def call_api(self, api: str, method: Method = Method.GET, **params): if not self.connection: raise exceptions.NetworkError( f'WebSocket 通道 {self.host_name} 未连接!' ) self._local_sync_id += 1 # 使用不同的 sync_id sync_id = str(self._local_sync_id) content = { 'syncId': sync_id, 'command': api.replace('/', '_'), 'content': params } if method == Method.RESTGET: content['subCommand'] = 'get' elif method == Method.RESTPOST: content['subCommand'] = 'update' elif method == Method.MULTIPART: raise NotImplementedError( "WebSocket 适配器不支持上传操作。请使用 bot.use_adapter 临时调用 HTTP 适配器。" ) await self.connection.send(json_dumps(content)) self._last_send_time = time.time() logger.debug(f"[WebSocket] 发送 WebSocket 数据,同步 ID:{sync_id}。") try: return await self._recv(sync_id) except TimeoutError as e: logger.debug(e) async def _heartbeat(self): while True: await asyncio.sleep(self.heartbeat_interval) if time.time() - self._last_send_time > self.heartbeat_interval: await self.call_api('about') self._last_send_time = time.time() logger.debug("[WebSocket] 发送心跳包。") async def _background(self): """开始接收事件。""" logger.info('[WebSocket] 机器人开始运行。按 Ctrl + C 停止。') try: heartbeat = asyncio.create_task(self._heartbeat()) while True: await self.poll_event() finally: await Tasks.cancel(heartbeat) await self._tasks.cancel_all()
Ancestors
- Adapter
- ApiProvider
- AdapterInterface
- abc.ABC
Class variables
var connection : Optional[websockets.legacy.client.WebSocketClientProtocol]
-
WebSocket 客户端连接。
var heartbeat_interval : float
-
每隔多久发送心跳包,单位:秒。
var host_name : str
-
WebSocket Server 的地址。
var qq : int
-
机器人的 QQ 号。
var sync_id : str
-
mirai-api-http 配置的同步 ID。
Methods
async def poll_event(self)
-
获取并处理事件。
Expand source code
async def poll_event(self): """获取并处理事件。""" event = await self._recv(self.sync_id, -1) self._tasks.create_task(self.emit(event['type'], event))
Inherited members