Module mirai.adapters

此模块提供网络适配器相关。

网络适配器负责与 mirai-api-http 沟通,详见各子模块。

Expand source code
# -*- coding: utf-8 -*-
"""
此模块提供网络适配器相关。

网络适配器负责与 mirai-api-http 沟通,详见各子模块。
"""
from typing import TYPE_CHECKING

from .base import Adapter

if TYPE_CHECKING:
    from .compose import ComposeAdapter
    from .http import HTTPAdapter
    from .webhook import WebHookAdapter
    from .websocket import WebSocketAdapter


def __getattr__(name):
    import importlib
    MODULES = {
        'Adapter': '.base',
        'ComposeAdapter': '.compose',
        'HTTPAdapter': '.http',
        'WebHookAdapter': '.webhook',
        'WebSocketAdapter': '.websocket',
    }
    if name in MODULES:
        module_name = MODULES[name]
        module = importlib.import_module(module_name, __name__)
        return getattr(module, name)
    raise AttributeError(f'Module {__name__} has no attribute {name}')


__all__ = [
    'Adapter',
    'HTTPAdapter',
    'WebSocketAdapter',
    'WebHookAdapter',
    'ComposeAdapter',
]

Sub-modules

mirai.adapters.base

此模块提供网络适配器的一系列基础定义。

mirai.adapters.compose

此模块提供组合适配器,可以将两个适配器组合使用。

mirai.adapters.http

此模块提供 HTTP 轮询适配器,适用于 mirai-api-http 的 http adapter。

mirai.adapters.reverse_websocket

此模块提供反向 WebSocket 适配器,适用于 mirai-api-http 的 reverse websocket adapter。

mirai.adapters.webhook

此模块提供 HTTP 回调适配器,适用于 mirai-api-http 的 webhook adapter。

mirai.adapters.websocket

此模块提供正向 WebSocket 适配器,适用于 mirai-api-http 的 websocket adapter。

Classes

class Adapter (verify_key: Optional[str], single_mode: bool = False)

适配器基类,与 mirai-api-http 沟通的底层实现。

属性 buses 为适配器注册的事件总线集合。适配器被绑定到 bot 时,bot 会自动将自身的事件总线注册到适配器。

Args

verify_key
mirai-api-http 配置的认证 key,关闭认证时为 None。
single_mode
是否开启 single_mode,开启后与 session 将无效。
Expand source code
class Adapter(ApiProvider, AdapterInterface):
    """适配器基类,与 mirai-api-http 沟通的底层实现。

    属性 `buses` 为适配器注册的事件总线集合。适配器被绑定到 bot 时,bot 会自动将自身的事件总线注册到适配器。
    """
    verify_key: Optional[str]
    """mirai-api-http 配置的认证 key,关闭认证时为 None。"""
    single_mode: bool
    """是否开启 single_mode,开启后与 session 将无效。"""
    session: str
    """从 mirai-api-http 处获得的 session。"""
    buses: Set[AbstractEventBus]
    """注册的事件总线集合。"""
    background: Optional[asyncio.Task]
    """背景事件循环任务。"""
    def __init__(self, verify_key: Optional[str], single_mode: bool = False):
        """
        Args:
            verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。
            single_mode: 是否开启 single_mode,开启后与 session 将无效。
        """
        self.verify_key = verify_key
        self.single_mode = single_mode
        self.session = ''
        self.buses = set()
        self.background = None

    @property
    def adapter_info(self):
        return {
            'verify_key': self.verify_key,
            'session': self.session,
            'single_mode': self.single_mode,
        }

    @classmethod
    def via(cls, adapter_interface: AdapterInterface) -> "Adapter":
        """从适配器接口创建适配器。

        Args:
            adapter_interface: 适配器接口。

        Returns:
            Adapter: 创建的适配器。
        """
        info = adapter_interface.adapter_info
        adapter = cls(
            verify_key=info['verify_key'],
            **{
                key: info[key]
                for key in ['single_mode'] if info.get(key) is not None
            }
        )
        adapter.session = cast(str, info.get('session'))
        return adapter

    def register_event_bus(self, *buses: AbstractEventBus):
        """注册事件总线。

        Args:
            *buses: 一个或多个事件总线。
        """
        self.buses |= set(buses)

    def unregister_event_bus(self, *buses: AbstractEventBus):
        """解除注册事件总线。

        Args:
            *buses: 一个或多个事件总线。
        """
        self.buses -= set(buses)

    @abc.abstractmethod
    async def login(self, qq: int):
        """登录到 mirai-api-http。"""

    @abc.abstractmethod
    async def logout(self, terminate: bool = True):
        """登出。"""

    @abc.abstractmethod
    async def call_api(self, api: str, method: Method = Method.GET, **params):
        """调用 API。

        Args:
            api: API 名称,需与 mirai-api-http 中的定义一致。
            method: 调用方法。默认为 GET。
            **params: 参数。
        """

    @abc.abstractmethod
    async def _background(self):
        """背景事件循环,用于接收事件。"""

    async def start(self):
        """运行背景事件循环。"""
        if not self.buses:
            raise RuntimeError('事件总线未指定!')
        if not self.session:
            raise RuntimeError('未登录!')

        self.background = asyncio.create_task(self._background())

    async def shutdown(self):
        """停止背景事件循环。"""
        if self.background:
            await Tasks.cancel(self.background)

    async def emit(self, event: str, *args, **kwargs):
        """向事件总线发送一个事件。

        Args:
            event: 事件名称。
            *args: 事件参数。
            **kwargs: 事件参数。
        """
        coros = [bus.emit(event, *args, **kwargs) for bus in self.buses]
        return sum(await asyncio.gather(*coros), [])

Ancestors

Subclasses

Class variables

var background : Optional[_asyncio.Task]

背景事件循环任务。

var buses : Set[AbstractEventBus]

注册的事件总线集合。

var session : str

从 mirai-api-http 处获得的 session。

var single_mode : bool

是否开启 single_mode,开启后与 session 将无效。

var verify_key : Optional[str]

mirai-api-http 配置的认证 key,关闭认证时为 None。

Static methods

def via(adapter_interface: AdapterInterface) ‑> Adapter

从适配器接口创建适配器。

Args

adapter_interface
适配器接口。

Returns

Adapter
创建的适配器。
Expand source code
@classmethod
def via(cls, adapter_interface: AdapterInterface) -> "Adapter":
    """从适配器接口创建适配器。

    Args:
        adapter_interface: 适配器接口。

    Returns:
        Adapter: 创建的适配器。
    """
    info = adapter_interface.adapter_info
    adapter = cls(
        verify_key=info['verify_key'],
        **{
            key: info[key]
            for key in ['single_mode'] if info.get(key) is not None
        }
    )
    adapter.session = cast(str, info.get('session'))
    return adapter

Methods

async def call_api(self, api: str, method: Method = Method.GET, **params)

调用 API。

Args

api
API 名称,需与 mirai-api-http 中的定义一致。
method
调用方法。默认为 GET。
**params
参数。
Expand source code
@abc.abstractmethod
async def call_api(self, api: str, method: Method = Method.GET, **params):
    """调用 API。

    Args:
        api: API 名称,需与 mirai-api-http 中的定义一致。
        method: 调用方法。默认为 GET。
        **params: 参数。
    """
async def emit(self, event: str, *args, **kwargs)

向事件总线发送一个事件。

Args

event
事件名称。
*args
事件参数。
**kwargs
事件参数。
Expand source code
async def emit(self, event: str, *args, **kwargs):
    """向事件总线发送一个事件。

    Args:
        event: 事件名称。
        *args: 事件参数。
        **kwargs: 事件参数。
    """
    coros = [bus.emit(event, *args, **kwargs) for bus in self.buses]
    return sum(await asyncio.gather(*coros), [])
async def login(self, qq: int)

登录到 mirai-api-http。

Expand source code
@abc.abstractmethod
async def login(self, qq: int):
    """登录到 mirai-api-http。"""
async def logout(self, terminate: bool = True)

登出。

Expand source code
@abc.abstractmethod
async def logout(self, terminate: bool = True):
    """登出。"""
def register_event_bus(self, *buses: AbstractEventBus)

注册事件总线。

Args

*buses
一个或多个事件总线。
Expand source code
def register_event_bus(self, *buses: AbstractEventBus):
    """注册事件总线。

    Args:
        *buses: 一个或多个事件总线。
    """
    self.buses |= set(buses)
async def shutdown(self)

停止背景事件循环。

Expand source code
async def shutdown(self):
    """停止背景事件循环。"""
    if self.background:
        await Tasks.cancel(self.background)
async def start(self)

运行背景事件循环。

Expand source code
async def start(self):
    """运行背景事件循环。"""
    if not self.buses:
        raise RuntimeError('事件总线未指定!')
    if not self.session:
        raise RuntimeError('未登录!')

    self.background = asyncio.create_task(self._background())
def unregister_event_bus(self, *buses: AbstractEventBus)

解除注册事件总线。

Args

*buses
一个或多个事件总线。
Expand source code
def unregister_event_bus(self, *buses: AbstractEventBus):
    """解除注册事件总线。

    Args:
        *buses: 一个或多个事件总线。
    """
    self.buses -= set(buses)

Inherited members

class ComposeAdapter (api_channel: Adapter, event_channel: Adapter)

组合适配器。使用一个适配器提供 API 调用,另一个适配器提供事件处理。

Args

api_channel
提供 API 调用的适配器。
event_channel
提供事件处理的适配器。
Expand source code
class ComposeAdapter(Adapter):
    """组合适配器。使用一个适配器提供 API 调用,另一个适配器提供事件处理。"""
    api_channel: Adapter
    """提供 API 调用的适配器。"""
    event_channel: Adapter
    """提供事件处理的适配器。"""
    def __init__(self, api_channel: Adapter, event_channel: Adapter):
        """
        Args:
            api_channel: 提供 API 调用的适配器。
            event_channel: 提供事件处理的适配器。
        """
        super().__init__(
            verify_key=api_channel.verify_key,
            single_mode=api_channel.single_mode
        )
        if api_channel.verify_key != event_channel.verify_key:
            raise ValueError('组合适配器应使用相同的 verify_key。')

        self.api_channel = api_channel
        self.event_channel = event_channel

        event_channel.buses = self.buses

        self.verify_key = api_channel.verify_key
        self.single_mode = api_channel.single_mode

    @property
    def adapter_info(self):
        return self.api_channel.adapter_info

    async def login(self, qq: int):
        await self.api_channel.login(qq)
        # 绑定 session
        self.event_channel.session = self.api_channel.session
        self.session = self.api_channel.session
        await self.event_channel.login(qq)

    async def logout(self):
        await self.event_channel.logout()
        await self.api_channel.logout()

    async def call_api(self, api: str, method: Method = Method.GET, **params):
        return await self.api_channel.call_api(api, method, **params)

    async def _background(self):
        await self.event_channel._background()

Ancestors

Class variables

var api_channelAdapter

提供 API 调用的适配器。

var event_channelAdapter

提供事件处理的适配器。

Inherited members

class HTTPAdapter (verify_key: Optional[str], host: str, port: int, poll_interval: float = 1.0, single_mode: bool = False)

HTTP 轮询适配器。使用 HTTP 轮询的方式与 mirai-api-http 沟通。

Args

verify_key
mirai-api-http 配置的认证 key,关闭认证时为 None。
host
HTTP Server 的地址。
port
HTTP Server 的端口。
poll_interval
轮询时间间隔,单位秒。
single_mode
是否为单例模式。
Expand source code
class HTTPAdapter(Adapter):
    """HTTP 轮询适配器。使用 HTTP 轮询的方式与 mirai-api-http 沟通。"""
    host_name: str
    """mirai-api-http 的 HTTPAdapter Server 主机名。"""
    poll_interval: float
    """轮询时间间隔,单位秒。"""
    qq: int
    """机器人的 QQ 号。"""
    headers: httpx.Headers
    """HTTP 请求头。"""
    def __init__(
        self,
        verify_key: Optional[str],
        host: str,
        port: int,
        poll_interval: float = 1.,
        single_mode: bool = False
    ):
        """
        Args:
            verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。
            host: HTTP Server 的地址。
            port: HTTP Server 的端口。
            poll_interval: 轮询时间间隔,单位秒。
            single_mode: 是否为单例模式。
        """
        super().__init__(verify_key=verify_key, single_mode=single_mode)

        self._host = host
        self._port = port

        if host[:2] == '//':
            host = 'http:' + host
        elif host[:8] == 'https://':
            raise exceptions.NetworkError('不支持 HTTPS!')
        elif host[:7] != 'http://':
            host = 'http://' + host

        if host[-1:] == '/':
            host = host[:-1]

        self.host_name = f'{host}:{port}'

        self.poll_interval = poll_interval

        self.qq = 0
        self.headers = httpx.Headers()  # 使用 headers 传递 session
        self._tasks = Tasks()

    @property
    def adapter_info(self):
        return {
            'verify_key': self.verify_key,
            'session': self.session,
            'single_mode': self.single_mode,
            'host': self._host,
            'port': self._port,
            'poll_interval': self.poll_interval,
        }

    @classmethod
    def via(cls, adapter_interface: AdapterInterface) -> "HTTPAdapter":
        info = adapter_interface.adapter_info
        adapter = cls(
            verify_key=info['verify_key'],
            **{
                key: info[key]
                for key in ['host', 'port', 'poll_interval', 'single_mode']
                if key in info
            }
        )
        adapter.session = cast(str, info.get('session'))
        return adapter

    @_error_handler_async_local
    async def _post(self, client: httpx.AsyncClient, url: str,
                    json: dict) -> Optional[dict]:
        """调用 POST 方法。"""
        # 使用自定义的 json.dumps
        content = json_dumps(json).encode('utf-8')
        try:
            response = await client.post(
                url,
                content=content,
                headers={'Content-Type': 'application/json'},
                timeout=10.
            )
            logger.debug(
                f'[HTTP] 发送 POST 请求,地址{url},状态 {response.status_code}。'
            )
        except httpx.TimeoutException:
            logger.error(f'[HTTP] POST 请求超时,地址{url}。')
            return None
        return _parse_response(response)

    @_error_handler_async_local
    async def _get(self, client: httpx.AsyncClient, url: str,
                   params: dict) -> Optional[dict]:
        """调用 GET 方法。"""
        try:
            response = await client.get(url, params=params, timeout=10.)
            logger.debug(
                f'[HTTP] 发送 GET 请求,地址{url},状态 {response.status_code}。'
            )
        except httpx.TimeoutException:
            logger.error(f'[HTTP] GET 请求超时,地址{url}。')
            return None
        return _parse_response(response)

    @_error_handler_async_local
    async def _post_multipart(
        self, client: httpx.AsyncClient, url: str, data: dict, files: dict
    ) -> Optional[dict]:
        """调用 POST 方法,发送 multipart 数据。"""
        try:
            response = await client.post(
                url, data=data, files=files, timeout=30.
            )
            logger.debug(
                f'[HTTP] 发送 POST 请求,地址{url},状态 {response.status_code}。'
            )
        except httpx.TimeoutException:
            logger.error(f'[HTTP] POST 请求超时,地址{url}。')
            return None
        return _parse_response(response)

    @_error_handler_async_local
    async def login(self, qq: int):
        async with httpx.AsyncClient(
            base_url=self.host_name, headers=self.headers
        ) as client:
            if not self.session:
                if self.verify_key is not None:
                    self.session = (
                        await self._post(
                            client, '/verify', {
                                "verifyKey": self.verify_key,
                            }
                        )
                    )['session']
                else:
                    self.session = str(random.randint(1, 2**20))

            if not self.single_mode:
                await self._post(
                    client, '/bind', {
                        "sessionKey": self.session,
                        "qq": qq,
                    }
                )

            self.headers = httpx.Headers({'sessionKey': self.session})
            self.qq = qq
            logger.info(f'[HTTP] 成功登录到账号{qq}。')

    @_error_handler_async_local
    async def logout(self, terminate: bool = True):
        if self.session and not self.single_mode:
            if terminate:
                async with httpx.AsyncClient(
                    base_url=self.host_name, headers=self.headers
                ) as client:
                    await self._post(
                        client, '/release', {
                            "sessionKey": self.session,
                            "qq": self.qq,
                        }
                    )
        logger.info(f"[HTTP] 从账号{self.qq}退出。")

    async def poll_event(self):
        """进行一次轮询,获取并处理事件。"""
        async with httpx.AsyncClient(
            base_url=self.host_name, headers=self.headers
        ) as client:
            msg_count = (await self._get(client, '/countMessage', {}))['data']
            if msg_count > 0:
                msg_list = (
                    await
                    self._get(client, '/fetchMessage', {'count': msg_count})
                )['data']

                coros = [self.emit(msg['type'], msg) for msg in msg_list]
                await asyncio.gather(*coros)

    async def call_api(self,
                       api: str,
                       method: Method = Method.GET,
                       **params) -> Optional[dict]:
        async with httpx.AsyncClient(
            base_url=self.host_name, headers=self.headers
        ) as client:
            if method == Method.GET or method == Method.RESTGET:
                return await self._get(client, f'/{api}', params)
            if method == Method.POST or method == Method.RESTPOST:
                return await self._post(client, f'/{api}', params)
            if method == Method.MULTIPART:
                return await self._post_multipart(
                    client, f'/{api}', params['data'], params['files']
                )
            return None

    async def _background(self):
        """开始轮询。"""
        logger.info('[HTTP] 机器人开始运行。按 Ctrl + C 停止。')

        try:
            while True:
                self._tasks.create_task(self.poll_event())
                await asyncio.sleep(self.poll_interval)
        finally:
            await self._tasks.cancel_all()

Ancestors

Class variables

var headers : httpx.Headers

HTTP 请求头。

var host_name : str

mirai-api-http 的 HTTPAdapter Server 主机名。

var poll_interval : float

轮询时间间隔,单位秒。

var qq : int

机器人的 QQ 号。

Methods

async def poll_event(self)

进行一次轮询,获取并处理事件。

Expand source code
async def poll_event(self):
    """进行一次轮询,获取并处理事件。"""
    async with httpx.AsyncClient(
        base_url=self.host_name, headers=self.headers
    ) as client:
        msg_count = (await self._get(client, '/countMessage', {}))['data']
        if msg_count > 0:
            msg_list = (
                await
                self._get(client, '/fetchMessage', {'count': msg_count})
            )['data']

            coros = [self.emit(msg['type'], msg) for msg in msg_list]
            await asyncio.gather(*coros)

Inherited members

class WebHookAdapter (verify_key: Optional[str], route: str = '/', extra_headers: Optional[Mapping[str, str]] = None, enable_quick_response: bool = True, single_mode: bool = False)

WebHook 适配器。作为 HTTP 服务器与 mirai-api-http 沟通。

Args

verify_key
mirai-api-http 配置的认证 key,关闭认证时为 None。
route
适配器的路由,默认在根目录上提供服务。
extra_headers
额外请求头,与 mirai-api-http 的配置一致。
enable_quick_response
是否启用快速响应,当与其他适配器混合使用时,禁用可以提高响应速度。
single_mode
是否启用单例模式。
Expand source code
class WebHookAdapter(Adapter):
    """WebHook 适配器。作为 HTTP 服务器与 mirai-api-http 沟通。"""
    session: str
    """WebHook 不需要 session,此处为机器人的 QQ 号。"""
    route: str
    """适配器的路由。"""
    extra_headers: Mapping[str, str]
    """额外请求头。"""
    enable_quick_response: bool
    """是否启用快速响应。"""
    def __init__(
        self,
        verify_key: Optional[str],
        route: str = '/',
        extra_headers: Optional[Mapping[str, str]] = None,
        enable_quick_response: bool = True,
        single_mode: bool = False
    ):
        """
        Args:
            verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。
            route: 适配器的路由,默认在根目录上提供服务。
            extra_headers: 额外请求头,与 mirai-api-http 的配置一致。
            enable_quick_response: 是否启用快速响应,当与其他适配器混合使用时,禁用可以提高响应速度。
            single_mode: 是否启用单例模式。
        """
        super().__init__(verify_key=verify_key, single_mode=single_mode)
        self.route = route
        self.extra_headers = extra_headers or {}
        self.enable_quick_response = enable_quick_response

        async def endpoint(request: Request):
            # 鉴权(QQ 号和额外请求头)
            if request.headers.get('bot') != self.session:  # 验证 QQ 号
                logger.debug(f"收到来自其他账号({request.headers.get('bot')})的事件。")
                return
            for key in self.extra_headers:  # 验证请求头
                key = key.lower()  # HTTP headers 不区分大小写
                request_value = request.headers.get(key, '').lower()
                expect_value = self.extra_headers[key].lower()
                if (request_value != expect_value
                    ) and (request_value != '[' + expect_value + ']'):
                    logger.info(
                        f"请求头验证失败:expect [{expect_value}], " +
                        f"got {request_value}。"
                    )
                    return JSONResponse(
                        status_code=401, content={'error': 'Unauthorized'}
                    )
            # 处理事件
            event = await request.json()
            result = await self.handle_event(event)
            return YiriMiraiJSONResponse(result)

        ASGI().add_route(self.route, endpoint, methods=['POST'])

    class QuickResponse(BaseException):
        """WebHook 快速响应,以异常的方式跳出。"""
        def __init__(self, data: dict):
            self.data = data

    @property
    def adapter_info(self):
        return {
            'verify_key': self.verify_key,
            'session': self.session,
            'single_mode': self.single_mode,
            'route': self.route,
            'extra_headers': self.extra_headers,
            'enable_quick_response': self.enable_quick_response,
        }

    @classmethod
    def via(cls, adapter_interface: AdapterInterface) -> "WebHookAdapter":
        info = adapter_interface.adapter_info
        adapter = cls(
            verify_key=info['verify_key'],
            **{
                key: info[key]
                for key in [
                    'route', 'extra_headers', 'enable_quick_response',
                    'single_mode'
                ] if key in info
            }
        )
        adapter.session = cast(str, info.get('session'))
        return adapter

    async def login(self, qq: int):
        """WebHook 不需要登录。直接返回。"""
        self.session = str(qq)
        logger.info(f'[WebHook] 成功登录到账号{qq}。')

    async def logout(self, terminate: bool = True):
        """WebHook 不需要登出。直接返回。"""
        logger.info(f"[WebHook] 从账号{self.session}退出。")

    async def call_api(self, api: str, method: Method = Method.GET, **params):
        """调用 API。WebHook 的 API 调用只能在快速响应中发生。"""
        if self.enable_quick_response:
            content = {'command': api.replace('/', '_'), 'content': params}
            if method == Method.RESTGET:
                content['subCommand'] = 'get'
            elif method == Method.RESTPOST:
                content['subCommand'] = 'update'
            elif method == Method.MULTIPART:
                raise NotImplementedError(
                    "WebHook 适配器不支持上传操作。请使用 bot.use_adapter 临时调用 HTTP 适配器。"
                )

            logger.debug(f'[WebHook] WebHook 快速响应 {api}。')
            raise WebHookAdapter.QuickResponse(content)
        return None

    async def _background(self):
        """WebHook 不需要事件循环。直接返回。"""

    async def handle_event(self, event):
        try:
            tasks = await self.emit(event['type'], event)
            await asyncio.gather(*tasks)
        except WebHookAdapter.QuickResponse as response:
            # 快速响应,直接返回。
            return response.data

        return {}

Ancestors

Class variables

var QuickResponse

WebHook 快速响应,以异常的方式跳出。

var enable_quick_response : bool

是否启用快速响应。

var extra_headers : Mapping[str, str]

额外请求头。

var route : str

适配器的路由。

Methods

async def call_api(self, api: str, method: Method = Method.GET, **params)

调用 API。WebHook 的 API 调用只能在快速响应中发生。

Expand source code
async def call_api(self, api: str, method: Method = Method.GET, **params):
    """调用 API。WebHook 的 API 调用只能在快速响应中发生。"""
    if self.enable_quick_response:
        content = {'command': api.replace('/', '_'), 'content': params}
        if method == Method.RESTGET:
            content['subCommand'] = 'get'
        elif method == Method.RESTPOST:
            content['subCommand'] = 'update'
        elif method == Method.MULTIPART:
            raise NotImplementedError(
                "WebHook 适配器不支持上传操作。请使用 bot.use_adapter 临时调用 HTTP 适配器。"
            )

        logger.debug(f'[WebHook] WebHook 快速响应 {api}。')
        raise WebHookAdapter.QuickResponse(content)
    return None
async def handle_event(self, event)
Expand source code
async def handle_event(self, event):
    try:
        tasks = await self.emit(event['type'], event)
        await asyncio.gather(*tasks)
    except WebHookAdapter.QuickResponse as response:
        # 快速响应,直接返回。
        return response.data

    return {}
async def login(self, qq: int)

WebHook 不需要登录。直接返回。

Expand source code
async def login(self, qq: int):
    """WebHook 不需要登录。直接返回。"""
    self.session = str(qq)
    logger.info(f'[WebHook] 成功登录到账号{qq}。')
async def logout(self, terminate: bool = True)

WebHook 不需要登出。直接返回。

Expand source code
async def logout(self, terminate: bool = True):
    """WebHook 不需要登出。直接返回。"""
    logger.info(f"[WebHook] 从账号{self.session}退出。")

Inherited members

class WebSocketAdapter (verify_key: Optional[str], host: str, port: int, sync_id: str = '-1', single_mode: bool = False, heartbeat_interval: float = 60.0)

WebSocket 适配器。作为 WebSocket 客户端与 mirai-api-http 沟通。

Args

verify_key
mirai-api-http 配置的认证 key,关闭认证时为 None。
host
WebSocket Server 的地址。
port
WebSocket Server 的端口。
sync_id
mirai-api-http 配置的同步 ID。
single_mode
是否启用单例模式。
heartbeat_interval
每隔多久发送心跳包,单位秒。
Expand source code
class WebSocketAdapter(Adapter):
    """WebSocket 适配器。作为 WebSocket 客户端与 mirai-api-http 沟通。"""
    host_name: str
    """WebSocket Server 的地址。"""
    sync_id: str
    """mirai-api-http 配置的同步 ID。"""
    qq: int
    """机器人的 QQ 号。"""
    connection: Optional[WebSocketClientProtocol]
    """WebSocket 客户端连接。"""
    heartbeat_interval: float
    """每隔多久发送心跳包,单位:秒。"""
    def __init__(
        self,
        verify_key: Optional[str],
        host: str,
        port: int,
        sync_id: str = '-1',
        single_mode: bool = False,
        heartbeat_interval: float = 60.,
    ):
        """
        Args:
            verify_key: mirai-api-http 配置的认证 key,关闭认证时为 None。
            host: WebSocket Server 的地址。
            port: WebSocket Server 的端口。
            sync_id: mirai-api-http 配置的同步 ID。
            single_mode: 是否启用单例模式。
            heartbeat_interval: 每隔多久发送心跳包,单位秒。
        """
        super().__init__(verify_key=verify_key, single_mode=single_mode)

        self._host = host
        self._port = port

        if host[:2] == '//':
            host = 'ws:' + host
        elif host[:7] == 'http://' or host[:8] == 'https://':
            raise exceptions.NetworkError(f'{host} 不是一个可用的 WebSocket 地址!')
        elif host[:5] != 'ws://':
            host = 'ws://' + host

        if host[-1:] == '/':
            host = host[:-1]

        self.host_name = f'{host}:{port}/all'

        self.sync_id = sync_id  # 这个神奇的 sync_id,默认值 -1,居然是个字符串
        # 既然这样不如把 sync_id 全改成字符串好了

        self.qq = 0
        self.connection = None

        self.heartbeat_interval = heartbeat_interval

        # 接收 WebSocket 数据的 Task
        self._receiver_task: Optional[asyncio.Task] = None
        # 用于临时保存接收到的数据,以便根据 sync_id 进行同步识别
        self._recv_dict: Dict[str, deque] = defaultdict(deque)
        # 本地同步 ID,每次调用 API 递增。
        self._local_sync_id = random.randint(1, 1024) * 1024
        # 事件处理任务管理器
        self._tasks = Tasks()
        # 心跳机制(Keep-Alive):上次发送数据包的时间
        self._last_send_time: float = 0.

    @property
    def adapter_info(self):
        return {
            'verify_key': self.verify_key,
            'session': self.session,
            'single_mode': self.single_mode,
            'host': self._host,
            'port': self._port,
            'sync_id': self.sync_id,
        }

    @classmethod
    def via(cls, adapter_interface: AdapterInterface) -> "WebSocketAdapter":
        info = adapter_interface.adapter_info
        adapter = cls(
            verify_key=info['verify_key'],
            **{
                key: info[key]
                for key in ['host', 'port', 'sync_id', 'single_mode']
                if key in info
            }
        )
        adapter.session = cast(str, info.get('session'))
        return adapter

    @_error_handler_async_local
    async def _receiver(self):
        """开始接收 websocket 数据。"""
        if not self.connection:
            raise exceptions.NetworkError(
                f'WebSocket 通道 {self.host_name} 未连接!'
            )
        while True:
            try:
                # 数据格式:
                # {
                #   'syncId': '-1',
                #   'data': {
                #       // Event Content
                #   }
                # }
                response = json.loads(await self.connection.recv())
                data = response['data']

                logger.debug(
                    f"[WebSocket] 收到 WebSocket 数据,同步 ID:{response['syncId']}。"
                )
                self._recv_dict[response['syncId']].append(data)
            except KeyError:
                logger.error(f'[WebSocket] 不正确的数据:{response}')
            except ConnectionClosedOK:
                raise SystemExit()
            except ConnectionClosed as e:
                exit_message = f'[WebSocket] WebSocket 通道意外关闭。code: {e.code}, reason: {e.reason}'
                logger.error(exit_message)
                raise SystemExit(exit_message)

    async def _recv(self, sync_id: str = '-1', timeout: int = 600) -> dict:
        """接收并解析 websocket 数据。"""
        timer = range(timeout) if timeout > 0 else repeat(0)
        for _ in timer:
            if self._recv_dict[sync_id]:
                data = self._recv_dict[sync_id].popleft()

                if data.get('code', 0) != 0:
                    raise exceptions.ApiError(data)

                return data
                # 如果没有对应同步 ID 的数据,则等待 websocket 数据
                # 目前存在问题:如果 mah 发回的数据不含 sync_id,
                # 这里就会无限循环……
                # 所以还是限制次数好了。
            await asyncio.sleep(0.1)
        raise TimeoutError(
            f'[WebSocket] mirai-api-http 响应超时,可能是由于调用出错。同步 ID:{sync_id}。'
        )

    @_error_handler_async_local
    async def login(self, qq: int):
        headers = {
            'verifyKey': self.verify_key or
                         '',  # 关闭认证时,WebSocket 可传入任意 verify_key
            'qq': str(qq),
        }
        if self.session:
            headers['sessionKey'] = self.session

        self.connection = await connect(self.host_name, extra_headers=headers)
        self._receiver_task = asyncio.create_task(self._receiver())

        verify_response = await self._recv('')  # 神奇现象:这里的 syncId 是个空字符串
        self.session = verify_response['session']

        self.qq = qq
        logger.info(f'[WebSocket] 成功登录到账号{qq}。')

    @_error_handler_async_local
    async def logout(self, terminate: bool = True):
        if self.connection:
            await self.connection.close()

            await self._receiver_task

            logger.info(f"[WebSocket] 从账号{self.qq}退出。")

    async def poll_event(self):
        """获取并处理事件。"""
        event = await self._recv(self.sync_id, -1)

        self._tasks.create_task(self.emit(event['type'], event))

    async def call_api(self, api: str, method: Method = Method.GET, **params):
        if not self.connection:
            raise exceptions.NetworkError(
                f'WebSocket 通道 {self.host_name} 未连接!'
            )
        self._local_sync_id += 1  # 使用不同的 sync_id
        sync_id = str(self._local_sync_id)
        content = {
            'syncId': sync_id,
            'command': api.replace('/', '_'),
            'content': params
        }
        if method == Method.RESTGET:
            content['subCommand'] = 'get'
        elif method == Method.RESTPOST:
            content['subCommand'] = 'update'
        elif method == Method.MULTIPART:
            raise NotImplementedError(
                "WebSocket 适配器不支持上传操作。请使用 bot.use_adapter 临时调用 HTTP 适配器。"
            )

        await self.connection.send(json_dumps(content))
        self._last_send_time = time.time()
        logger.debug(f"[WebSocket] 发送 WebSocket 数据,同步 ID:{sync_id}。")
        try:
            return await self._recv(sync_id)
        except TimeoutError as e:
            logger.debug(e)

    async def _heartbeat(self):
        while True:
            await asyncio.sleep(self.heartbeat_interval)
            if time.time() - self._last_send_time > self.heartbeat_interval:
                await self.call_api('about')
                self._last_send_time = time.time()
                logger.debug("[WebSocket] 发送心跳包。")

    async def _background(self):
        """开始接收事件。"""
        logger.info('[WebSocket] 机器人开始运行。按 Ctrl + C 停止。')

        try:
            heartbeat = asyncio.create_task(self._heartbeat())
            while True:
                await self.poll_event()
        finally:
            await Tasks.cancel(heartbeat)
            await self._tasks.cancel_all()

Ancestors

Class variables

var connection : Optional[websockets.legacy.client.WebSocketClientProtocol]

WebSocket 客户端连接。

var heartbeat_interval : float

每隔多久发送心跳包,单位:秒。

var host_name : str

WebSocket Server 的地址。

var qq : int

机器人的 QQ 号。

var sync_id : str

mirai-api-http 配置的同步 ID。

Methods

async def poll_event(self)

获取并处理事件。

Expand source code
async def poll_event(self):
    """获取并处理事件。"""
    event = await self._recv(self.sync_id, -1)

    self._tasks.create_task(self.emit(event['type'], event))

Inherited members