Module mirai.asgi
此模块提供公共 ASGI 前端。
Expand source code
# -*- coding: utf-8 -*-
"""此模块提供公共 ASGI 前端。"""
import asyncio
import functools
import logging
from inspect import iscoroutinefunction
from typing import (
TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Tuple, Union,
cast
)
if TYPE_CHECKING:
from typing_extensions import Literal
else:
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import RedirectResponse
from mirai.utils import Singleton
logger = logging.getLogger(__name__)
async def default_endpoint(_: Request):
return
class ASGI(Singleton):
"""单例类。公共 ASGI 前端。
对 Starlette 功能的一个扩充,支持当某一个 endpoint 无返回时,调用其他同一路由的 endpoint。
"""
app: Starlette
"""内部的 Starlette 实例。"""
def __init__(self):
self.app = Starlette()
self._routes: Dict[Tuple[str, str], List[Callable]] = {}
self.add_route('/', default_endpoint)
async def _global_endpoint(self, key, request: Request):
for endpoint in self._routes[key]:
result = await endpoint(request)
if result:
return result
return RedirectResponse(
'https://yiri-mirai.vercel.app', status_code=301
)
def add_route(
self,
path: str,
endpoint: Callable,
methods: Optional[List[str]] = None
) -> 'ASGI':
"""添加路由。
Args:
path: 路由的路径。
endpoint: 路由的处理函数。
methods: 路由的请求方法。默认为 `['GET']`。
Returns:
ASGI: 返回自身。
"""
methods = methods or ['GET']
for method in methods: # 拆分不同的 method
key = (path, method)
if key in self._routes:
self._routes[key].append(endpoint)
else:
self._routes[key] = [endpoint]
self.app.add_route(
path,
functools.partial(self._global_endpoint, key),
methods=[method]
)
return self
def add_event_handler(
self,
event_type: Literal["startup", "shutdown"],
handler: Optional[Callable] = None
):
"""注册生命周期事件处理函数。
Args:
event_type(`Literal["startup", "shutdown"]`): 事件类型,可选值为 `"startup"` 或 `"shutdown"`。
handler(`Optional[Callable]`): 事件处理函数,省略此参数以作为装饰器调用。
"""
if handler:
self.app.add_event_handler(event_type, handler)
return self
else: # 装饰器用法
def decorator(func):
self.app.add_event_handler(event_type, func)
return func
return decorator
def add_background_task(
self, func: Union[Callable, Awaitable, None] = None
):
"""注册背景任务,将在 bot 启动后自动运行。
Args:
func(`Union[Callable, Awaitable, None]`): 背景任务,可以是函数或者协程,省略参数以作为装饰器调用。
"""
if func is None:
def decorator(func_):
self.add_background_task(func_)
return func_
return decorator
if iscoroutinefunction(func): # 异步调用转化为传入协程
func = cast(Callable[..., Awaitable], func)()
if callable(func):
self.app.add_event_handler('startup', func)
else:
_task: Optional[asyncio.Task] = None
async def _startup():
nonlocal _task
_task = asyncio.create_task(func)
# 不阻塞
async def _shutdown():
if _task and not _task.done():
_task.cancel()
self.app.add_event_handler('startup', _startup)
self.app.add_event_handler('shutdown', _shutdown)
return func
def mount(self, path: str, app: Callable) -> 'ASGI':
"""挂载另一个 ASGI 服务器。通过这个方法,可以同时运行 FastAPI 之类的服务。
Args:
path: 要挂载的路径。
app: 要挂载的 ASGI 服务器。
Returns:
ASGI: 返回自身。
"""
if not path.startswith('/'):
path = '/' + path
self.app.mount(path, app)
logger.debug(f'向 {path} 挂载 {app}。')
return self
async def __call__(self, scope, recv, send):
await self.app(scope, recv, send)
# noinspection PyUnresolvedReferences
def asgi_serve(
app,
host: str = '127.0.0.1',
port: int = 8000,
asgi_server: str = 'auto',
**kwargs
):
"""运行一个 ASGI 服务器。
Args:
app: ASGI 应用程序。
host: 服务器地址,默认为 127.0.0.1。
port: 服务器端口,默认为 8000。
asgi_server: ASGI 服务器,可选的有 `hypercorn` `uvicorn` 和 `auto`。
如果设置为 `auto`,自动寻找是否已安装可用的 ASGI 服务(`unicorn` 或 `hypercorn`),并运行。
"""
if asgi_server == 'auto':
try:
from uvicorn import run
asgi = 'uvicorn'
except ImportError:
try:
import hypercorn.config as config
from hypercorn.asyncio import serve
asgi = 'hypercorn'
except ImportError:
asgi = 'none'
else:
asgi = asgi_server
if asgi_server == 'uvicorn':
from uvicorn import run
elif asgi_server == 'hypercorn':
import hypercorn.config as config
from hypercorn.asyncio import serve
if asgi == 'uvicorn':
run(app, host=host, port=port, debug=True, **kwargs)
return True
if asgi == 'hypercorn':
import asyncio
config = config.Config().from_mapping(bind=f'{host}:{port}', **kwargs)
asyncio.run(serve(app, config), debug=True)
return True
return False
Functions
def asgi_serve(app, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs)
-
运行一个 ASGI 服务器。
Args
app
- ASGI 应用程序。
host
- 服务器地址,默认为 127.0.0.1。
port
- 服务器端口,默认为 8000。
asgi_server
- ASGI 服务器,可选的有
hypercorn
uvicorn
和auto
。 如果设置为auto
,自动寻找是否已安装可用的 ASGI 服务(unicorn
或hypercorn
),并运行。
Expand source code
def asgi_serve( app, host: str = '127.0.0.1', port: int = 8000, asgi_server: str = 'auto', **kwargs ): """运行一个 ASGI 服务器。 Args: app: ASGI 应用程序。 host: 服务器地址,默认为 127.0.0.1。 port: 服务器端口,默认为 8000。 asgi_server: ASGI 服务器,可选的有 `hypercorn` `uvicorn` 和 `auto`。 如果设置为 `auto`,自动寻找是否已安装可用的 ASGI 服务(`unicorn` 或 `hypercorn`),并运行。 """ if asgi_server == 'auto': try: from uvicorn import run asgi = 'uvicorn' except ImportError: try: import hypercorn.config as config from hypercorn.asyncio import serve asgi = 'hypercorn' except ImportError: asgi = 'none' else: asgi = asgi_server if asgi_server == 'uvicorn': from uvicorn import run elif asgi_server == 'hypercorn': import hypercorn.config as config from hypercorn.asyncio import serve if asgi == 'uvicorn': run(app, host=host, port=port, debug=True, **kwargs) return True if asgi == 'hypercorn': import asyncio config = config.Config().from_mapping(bind=f'{host}:{port}', **kwargs) asyncio.run(serve(app, config), debug=True) return True return False
async def default_endpoint(_: starlette.requests.Request)
-
Expand source code
async def default_endpoint(_: Request): return
Classes
class ASGI (*args, **kwargs_)
-
单例类。公共 ASGI 前端。
对 Starlette 功能的一个扩充,支持当某一个 endpoint 无返回时,调用其他同一路由的 endpoint。
Expand source code
class ASGI(Singleton): """单例类。公共 ASGI 前端。 对 Starlette 功能的一个扩充,支持当某一个 endpoint 无返回时,调用其他同一路由的 endpoint。 """ app: Starlette """内部的 Starlette 实例。""" def __init__(self): self.app = Starlette() self._routes: Dict[Tuple[str, str], List[Callable]] = {} self.add_route('/', default_endpoint) async def _global_endpoint(self, key, request: Request): for endpoint in self._routes[key]: result = await endpoint(request) if result: return result return RedirectResponse( 'https://yiri-mirai.vercel.app', status_code=301 ) def add_route( self, path: str, endpoint: Callable, methods: Optional[List[str]] = None ) -> 'ASGI': """添加路由。 Args: path: 路由的路径。 endpoint: 路由的处理函数。 methods: 路由的请求方法。默认为 `['GET']`。 Returns: ASGI: 返回自身。 """ methods = methods or ['GET'] for method in methods: # 拆分不同的 method key = (path, method) if key in self._routes: self._routes[key].append(endpoint) else: self._routes[key] = [endpoint] self.app.add_route( path, functools.partial(self._global_endpoint, key), methods=[method] ) return self def add_event_handler( self, event_type: Literal["startup", "shutdown"], handler: Optional[Callable] = None ): """注册生命周期事件处理函数。 Args: event_type(`Literal["startup", "shutdown"]`): 事件类型,可选值为 `"startup"` 或 `"shutdown"`。 handler(`Optional[Callable]`): 事件处理函数,省略此参数以作为装饰器调用。 """ if handler: self.app.add_event_handler(event_type, handler) return self else: # 装饰器用法 def decorator(func): self.app.add_event_handler(event_type, func) return func return decorator def add_background_task( self, func: Union[Callable, Awaitable, None] = None ): """注册背景任务,将在 bot 启动后自动运行。 Args: func(`Union[Callable, Awaitable, None]`): 背景任务,可以是函数或者协程,省略参数以作为装饰器调用。 """ if func is None: def decorator(func_): self.add_background_task(func_) return func_ return decorator if iscoroutinefunction(func): # 异步调用转化为传入协程 func = cast(Callable[..., Awaitable], func)() if callable(func): self.app.add_event_handler('startup', func) else: _task: Optional[asyncio.Task] = None async def _startup(): nonlocal _task _task = asyncio.create_task(func) # 不阻塞 async def _shutdown(): if _task and not _task.done(): _task.cancel() self.app.add_event_handler('startup', _startup) self.app.add_event_handler('shutdown', _shutdown) return func def mount(self, path: str, app: Callable) -> 'ASGI': """挂载另一个 ASGI 服务器。通过这个方法,可以同时运行 FastAPI 之类的服务。 Args: path: 要挂载的路径。 app: 要挂载的 ASGI 服务器。 Returns: ASGI: 返回自身。 """ if not path.startswith('/'): path = '/' + path self.app.mount(path, app) logger.debug(f'向 {path} 挂载 {app}。') return self async def __call__(self, scope, recv, send): await self.app(scope, recv, send)
Ancestors
Class variables
var app : starlette.applications.Starlette
-
内部的 Starlette 实例。
Methods
def add_background_task(self, func: Union[Callable, Awaitable, NoneType] = None)
-
注册背景任务,将在 bot 启动后自动运行。
Args
func(
Union[Callable, Awaitable, None]
): 背景任务,可以是函数或者协程,省略参数以作为装饰器调用。Expand source code
def add_background_task( self, func: Union[Callable, Awaitable, None] = None ): """注册背景任务,将在 bot 启动后自动运行。 Args: func(`Union[Callable, Awaitable, None]`): 背景任务,可以是函数或者协程,省略参数以作为装饰器调用。 """ if func is None: def decorator(func_): self.add_background_task(func_) return func_ return decorator if iscoroutinefunction(func): # 异步调用转化为传入协程 func = cast(Callable[..., Awaitable], func)() if callable(func): self.app.add_event_handler('startup', func) else: _task: Optional[asyncio.Task] = None async def _startup(): nonlocal _task _task = asyncio.create_task(func) # 不阻塞 async def _shutdown(): if _task and not _task.done(): _task.cancel() self.app.add_event_handler('startup', _startup) self.app.add_event_handler('shutdown', _shutdown) return func
def add_event_handler(self, event_type: Literal['startup', 'shutdown'], handler: Optional[Callable] = None)
-
注册生命周期事件处理函数。
Args
event_type(
Literal["startup", "shutdown"]
): 事件类型,可选值为"startup"
或"shutdown"
。 handler(Optional[Callable]
): 事件处理函数,省略此参数以作为装饰器调用。Expand source code
def add_event_handler( self, event_type: Literal["startup", "shutdown"], handler: Optional[Callable] = None ): """注册生命周期事件处理函数。 Args: event_type(`Literal["startup", "shutdown"]`): 事件类型,可选值为 `"startup"` 或 `"shutdown"`。 handler(`Optional[Callable]`): 事件处理函数,省略此参数以作为装饰器调用。 """ if handler: self.app.add_event_handler(event_type, handler) return self else: # 装饰器用法 def decorator(func): self.app.add_event_handler(event_type, func) return func return decorator
def add_route(self, path: str, endpoint: Callable, methods: Optional[List[str]] = None) ‑> ASGI
-
Expand source code
def add_route( self, path: str, endpoint: Callable, methods: Optional[List[str]] = None ) -> 'ASGI': """添加路由。 Args: path: 路由的路径。 endpoint: 路由的处理函数。 methods: 路由的请求方法。默认为 `['GET']`。 Returns: ASGI: 返回自身。 """ methods = methods or ['GET'] for method in methods: # 拆分不同的 method key = (path, method) if key in self._routes: self._routes[key].append(endpoint) else: self._routes[key] = [endpoint] self.app.add_route( path, functools.partial(self._global_endpoint, key), methods=[method] ) return self
def mount(self, path: str, app: Callable) ‑> ASGI
-
Expand source code
def mount(self, path: str, app: Callable) -> 'ASGI': """挂载另一个 ASGI 服务器。通过这个方法,可以同时运行 FastAPI 之类的服务。 Args: path: 要挂载的路径。 app: 要挂载的 ASGI 服务器。 Returns: ASGI: 返回自身。 """ if not path.startswith('/'): path = '/' + path self.app.mount(path, app) logger.debug(f'向 {path} 挂载 {app}。') return self