diff --git a/astrbot/cli/__init__.py b/astrbot/cli/__init__.py new file mode 100644 index 00000000..37f46012 --- /dev/null +++ b/astrbot/cli/__init__.py @@ -0,0 +1 @@ +__version__ = "3.5.6" diff --git a/astrbot/cli/__main__.py b/astrbot/cli/__main__.py index 05a0493d..c8181247 100644 --- a/astrbot/cli/__main__.py +++ b/astrbot/cli/__main__.py @@ -1,8 +1,11 @@ +""" +AstrBot CLI入口 +""" + import click import sys -from astrbot.core.config.default import VERSION -from .commands.cmd_init import init -from .commands.cmd_run import run +from . import __version__ +from .commands import init, run, plug logo_tmpl = r""" ___ _______.___________..______ .______ ______ .___________. @@ -15,12 +18,12 @@ logo_tmpl = r""" @click.group() -@click.version_option(VERSION, prog_name="AstrBot") +@click.version_option(__version__, prog_name="AstrBot") def cli() -> None: """The AstrBot CLI""" click.echo(logo_tmpl) click.echo("Welcome to AstrBot CLI!") - click.echo(f"AstrBot version: {VERSION}") + click.echo(f"AstrBot CLI version: {__version__}") @click.command() @@ -49,6 +52,7 @@ def help(command_name: str | None) -> None: cli.add_command(init) cli.add_command(run) cli.add_command(help) +cli.add_command(plug) if __name__ == "__main__": - cli() \ No newline at end of file + cli() diff --git a/astrbot/cli/commands/__init__.py b/astrbot/cli/commands/__init__.py new file mode 100644 index 00000000..d250c8c0 --- /dev/null +++ b/astrbot/cli/commands/__init__.py @@ -0,0 +1,5 @@ +from .cmd_init import init +from .cmd_run import run +from .cmd_plug import plug + +__all__ = ["init", "run", "plug"] diff --git a/astrbot/cli/commands/cmd_init.py b/astrbot/cli/commands/cmd_init.py index 58d5e8c1..d9a42f82 100644 --- a/astrbot/cli/commands/cmd_init.py +++ b/astrbot/cli/commands/cmd_init.py @@ -1,44 +1,55 @@ -import shutil +import asyncio import click -import asyncio -from pathlib import Path -from ..utils import get_astrbot_root, check_astrbot_root, check_dashboard +from filelock import FileLock, Timeout + +from ..utils import check_dashboard, get_astrbot_root + + +async def initialize_astrbot(astrbot_root) -> None: + """执行 AstrBot 初始化逻辑""" + dot_astrbot = astrbot_root / ".astrbot" + + if not dot_astrbot.exists(): + click.echo(f"Current Directory: {astrbot_root}") + click.echo( + "如果你确认这是 Astrbot root directory, 你需要在当前目录下创建一个 .astrbot 文件标记该目录为 AstrBot 的数据目录。" + ) + if click.confirm( + f"请检查当前目录是否正确,确认正确请回车: {astrbot_root}", + default=True, + abort=True, + ): + dot_astrbot.touch() + click.echo(f"Created {dot_astrbot}") + + paths = { + "data": astrbot_root / "data", + "config": astrbot_root / "data" / "config", + "plugins": astrbot_root / "data" / "plugins", + "temp": astrbot_root / "data" / "temp", + } + + for name, path in paths.items(): + path.mkdir(parents=True, exist_ok=True) + click.echo(f"{'Created' if not path.exists() else 'Directory exists'}: {path}") + + await check_dashboard(astrbot_root / "data") @click.command() -@click.option("--path", "-p", help="AstrBot 数据目录") -@click.option("--force", "-f", is_flag=True, help="强制初始化") -def init(path: str | None, force: bool) -> None: +def init() -> None: """初始化 AstrBot""" click.echo("Initializing AstrBot...") - astrbot_root = get_astrbot_root(path) - if force: - if click.confirm( - "强制初始化会删除当前目录下的所有文件,是否继续?", - default=False, - abort=True, - ): - click.echo("正在删除当前目录下的所有文件...") - shutil.rmtree(astrbot_root, ignore_errors=True) + astrbot_root = get_astrbot_root() + lock_file = astrbot_root / "astrbot.lock" + lock = FileLock(lock_file, timeout=5) - check_astrbot_root(astrbot_root) + try: + with lock.acquire(): + asyncio.run(initialize_astrbot(astrbot_root)) + except Timeout: + raise click.ClickException("无法获取锁文件,请检查是否有其他实例正在运行") - click.echo(f"AstrBot root directory: {astrbot_root}") - - if not astrbot_root.exists(): - astrbot_root.mkdir(parents=True, exist_ok=True) - click.echo(f"Created directory: {astrbot_root}") - else: - click.echo(f"Directory already exists: {astrbot_root}") - - config_path: Path = astrbot_root / "config" - plugins_path: Path = astrbot_root / "plugins" - temp_path: Path = astrbot_root / "temp" - config_path.mkdir(parents=True, exist_ok=True) - plugins_path.mkdir(parents=True, exist_ok=True) - temp_path.mkdir(parents=True, exist_ok=True) - - click.echo(f"Created directories: {config_path}, {plugins_path}, {temp_path}") - - asyncio.run(_check_dashboard(astrbot_root)) \ No newline at end of file + except Exception as e: + raise click.ClickException(f"初始化失败: {e!s}") diff --git a/astrbot/cli/commands/cmd_plug.py b/astrbot/cli/commands/cmd_plug.py new file mode 100644 index 00000000..b250ede4 --- /dev/null +++ b/astrbot/cli/commands/cmd_plug.py @@ -0,0 +1,247 @@ +import re +from pathlib import Path + +import click +import shutil + + +from ..utils import ( + get_git_repo, + build_plug_list, + manage_plugin, + PluginStatus, + check_astrbot_root, + get_astrbot_root, +) + + +@click.group() +def plug(): + """插件管理""" + pass + + +def _get_data_path() -> Path: + base = get_astrbot_root() + if not check_astrbot_root(base): + raise click.ClickException( + f"{base}不是有效的 AstrBot 根目录,如需初始化请使用 astrbot init" + ) + return (base / "data").resolve() + + +def display_plugins(plugins, title=None, color=None): + if title: + click.echo(click.style(title, fg=color, bold=True)) + + click.echo(f"{'名称':<20} {'版本':<10} {'状态':<10} {'作者':<15} {'描述':<30}") + click.echo("-" * 85) + + for p in plugins: + desc = p["desc"][:30] + ("..." if len(p["desc"]) > 30 else "") + click.echo( + f"{p['name']:<20} {p['version']:<10} {p['status']:<10} " + f"{p['author']:<15} {desc:<30}" + ) + + +@plug.command() +@click.argument("name") +def new(name: str): + """创建新插件""" + base_path = _get_data_path() + plug_path = base_path / "plugins" / name + + if plug_path.exists(): + raise click.ClickException(f"插件 {name} 已存在") + + author = click.prompt("请输入插件作者", type=str) + desc = click.prompt("请输入插件描述", type=str) + version = click.prompt("请输入插件版本", type=str) + if not re.match(r"^\d+\.\d+(\.\d+)?$", version.lower().lstrip("v")): + raise click.ClickException("版本号必须为 x.y 或 x.y.z 格式") + repo = click.prompt("请输入插件仓库:", type=str) + if not repo.startswith("http"): + raise click.ClickException("仓库地址必须以 http 开头") + + click.echo("下载插件模板...") + get_git_repo( + "https://github.com/Soulter/helloworld", + plug_path, + ) + + click.echo("重写插件信息...") + # 重写 metadata.yaml + with open(plug_path / "metadata.yaml", "w", encoding="utf-8") as f: + f.write( + f"name: {name}\n" + f"desc: {desc}\n" + f"version: {version}\n" + f"author: {author}\n" + f"repo: {repo}\n" + ) + + # 重写 README.md + with open(plug_path / "README.md", "w", encoding="utf-8") as f: + f.write(f"# {name}\n\n{desc}\n\n# 支持\n\n[帮助文档](https://astrbot.app)\n") + + # 重写 main.py + with open(plug_path / "main.py", "r", encoding="utf-8") as f: + content = f.read() + + new_content = content.replace( + '@register("helloworld", "YourName", "一个简单的 Hello World 插件", "1.0.0")', + f'@register("{name}", "{author}", "{desc}", "{version}")', + ) + + with open(plug_path / "main.py", "w", encoding="utf-8") as f: + f.write(new_content) + + click.echo(f"插件 {name} 创建成功") + + +@plug.command() +@click.option("--all", "-a", is_flag=True, help="列出未安装的插件") +def list(all: bool): + """列出插件""" + base_path = _get_data_path() + plugins = build_plug_list(base_path / "plugins") + + # 未发布的插件 + not_published_plugins = [ + p for p in plugins if p["status"] == PluginStatus.NOT_PUBLISHED + ] + if not_published_plugins: + display_plugins(not_published_plugins, "未发布的插件", "red") + + # 需要更新的插件 + need_update_plugins = [ + p for p in plugins if p["status"] == PluginStatus.NEED_UPDATE + ] + if need_update_plugins: + display_plugins(need_update_plugins, "需要更新的插件", "yellow") + + # 已安装的插件 + installed_plugins = [p for p in plugins if p["status"] == PluginStatus.INSTALLED] + if installed_plugins: + display_plugins(installed_plugins, "已安装的插件", "green") + + # 未安装的插件 + not_installed_plugins = [ + p for p in plugins if p["status"] == PluginStatus.NOT_INSTALLED + ] + if not_installed_plugins and all: + display_plugins(not_installed_plugins, "未安装的插件", "blue") + + if ( + not any([not_published_plugins, need_update_plugins, installed_plugins]) + and not all + ): + click.echo("未安装任何插件") + + +@plug.command() +@click.argument("name") +@click.option("--proxy", help="代理服务器地址") +def install(name: str, proxy: str | None): + """安装插件""" + base_path = _get_data_path() + plug_path = base_path / "plugins" + plugins = build_plug_list(base_path / "plugins") + + plugin = next( + ( + p + for p in plugins + if p["name"] == name and p["status"] == PluginStatus.NOT_INSTALLED + ), + None, + ) + + if not plugin: + raise click.ClickException(f"未找到可安装的插件 {name},可能是不存在或已安装") + + manage_plugin(plugin, plug_path, is_update=False, proxy=proxy) + + +@plug.command() +@click.argument("name") +def remove(name: str): + """卸载插件""" + base_path = _get_data_path() + plugins = build_plug_list(base_path / "plugins") + plugin = next((p for p in plugins if p["name"] == name), None) + + if not plugin or not plugin.get("local_path"): + raise click.ClickException(f"插件 {name} 不存在或未安装") + + plugin_path = plugin["local_path"] + + click.confirm(f"确定要卸载插件 {name} 吗?", default=False, abort=True) + + try: + shutil.rmtree(plugin_path) + click.echo(f"插件 {name} 已卸载") + except Exception as e: + raise click.ClickException(f"卸载插件 {name} 失败: {e}") + + +@plug.command() +@click.argument("name", required=False) +@click.option("--proxy", help="Github代理地址") +def update(name: str, proxy: str | None): + """更新插件""" + base_path = _get_data_path() + plug_path = base_path / "plugins" + plugins = build_plug_list(base_path / "plugins") + + if name: + plugin = next( + ( + p + for p in plugins + if p["name"] == name and p["status"] == PluginStatus.NEED_UPDATE + ), + None, + ) + + if not plugin: + raise click.ClickException(f"插件 {name} 不需要更新或无法更新") + + manage_plugin(plugin, plug_path, is_update=True, proxy=proxy) + else: + need_update_plugins = [ + p for p in plugins if p["status"] == PluginStatus.NEED_UPDATE + ] + + if not need_update_plugins: + click.echo("没有需要更新的插件") + return + + click.echo(f"发现 {len(need_update_plugins)} 个插件需要更新") + for plugin in need_update_plugins: + plugin_name = plugin["name"] + click.echo(f"正在更新插件 {plugin_name}...") + manage_plugin(plugin, plug_path, is_update=True, proxy=proxy) + + +@plug.command() +@click.argument("query") +def search(query: str): + """搜索插件""" + base_path = _get_data_path() + plugins = build_plug_list(base_path / "plugins") + + matched_plugins = [ + p + for p in plugins + if query.lower() in p["name"].lower() + or query.lower() in p["desc"].lower() + or query.lower() in p["author"].lower() + ] + + if not matched_plugins: + click.echo(f"未找到匹配 '{query}' 的插件") + return + + display_plugins(matched_plugins, f"搜索结果: '{query}'", "cyan") diff --git a/astrbot/cli/commands/cmd_run.py b/astrbot/cli/commands/cmd_run.py index 98f6e7ba..5622c2f9 100644 --- a/astrbot/cli/commands/cmd_run.py +++ b/astrbot/cli/commands/cmd_run.py @@ -1,32 +1,62 @@ +import os +import sys +from pathlib import Path + import click import asyncio -from ..utils import get_astrbot_root, check_astrbot_root, check_dashboard + +from filelock import FileLock, Timeout + +from ..utils import check_dashboard, check_astrbot_root, get_astrbot_root -@click.command() -@click.option("--path", "-p", help="AstrBot 数据目录") -def run(path: str | None = None) -> None: +async def run_astrbot(astrbot_root: Path): """运行 AstrBot""" - try: - from ..core.log import LogBroker - from ..core import db_helper - from ..core.initial_loader import InitialLoader - except ImportError: - from astrbot.core.log import LogBroker - from astrbot.core import db_helper - from astrbot.core.initial_loader import InitialLoader + from astrbot.core import logger, LogManager, LogBroker, db_helper + from astrbot.core.initial_loader import InitialLoader - astrbot_root = get_astrbot_root(path) - check_astrbot_root(astrbot_root) - asyncio.run(check_dashboard(astrbot_root)) + await check_dashboard(astrbot_root / "data") log_broker = LogBroker() + LogManager.set_queue_handler(logger, log_broker) db = db_helper core_lifecycle = InitialLoader(db, log_broker) + + await core_lifecycle.start() + + +@click.option("--reload", "-r", is_flag=True, help="插件自动重载") +@click.option("--port", "-p", help="Astrbot Dashboard端口", required=False, type=str) +@click.command() +def run(reload: bool, port: str) -> None: + """运行 AstrBot""" try: - asyncio.run(core_lifecycle.start()) + os.environ["ASTRBOT_CLI"] = "1" + astrbot_root = get_astrbot_root() + + if not check_astrbot_root(astrbot_root): + raise click.ClickException( + f"{astrbot_root}不是有效的 AstrBot 根目录,如需初始化请使用 astrbot init" + ) + + os.environ["ASTRBOT_ROOT"] = str(astrbot_root) + sys.path.insert(0, str(astrbot_root)) + + if port: + os.environ["DASHBOARD_PORT"] = port + + if reload: + click.echo("启用插件自动重载") + os.environ["ASTROBOT_RELOAD"] = "1" + + lock_file = astrbot_root / "astrbot.lock" + lock = FileLock(lock_file, timeout=5) + with lock.acquire(): + asyncio.run(run_astrbot(astrbot_root)) except KeyboardInterrupt: - click.echo("接收到退出信号,正在关闭 AstrBot...") + click.echo("AstrBot 已关闭...") + except Timeout: + raise click.ClickException("无法获取锁文件,请检查是否有其他实例正在运行") except Exception as e: - click.echo(f"运行时出现错误: {e}") \ No newline at end of file + raise click.ClickException(f"运行时出现错误: {e!s}") diff --git a/astrbot/cli/utils/__init__.py b/astrbot/cli/utils/__init__.py index 4af69bf9..9989dcf2 100644 --- a/astrbot/cli/utils/__init__.py +++ b/astrbot/cli/utils/__init__.py @@ -1,3 +1,18 @@ -from basic import get_astrbot_root,check_astrbot_root,check_dashboard +from .basic import ( + get_astrbot_root, + check_astrbot_root, + check_dashboard, +) +from .plugin import get_git_repo, manage_plugin, build_plug_list, PluginStatus +from .version_comparator import VersionComparator -__all__ = ["get_astrbot_root", "check_astrbot_root", "check_dashboard"] +__all__ = [ + "get_astrbot_root", + "check_astrbot_root", + "check_dashboard", + "get_git_repo", + "manage_plugin", + "build_plug_list", + "VersionComparator", + "PluginStatus", +] diff --git a/astrbot/cli/utils/basic.py b/astrbot/cli/utils/basic.py index a243cdd7..d9d7e10a 100644 --- a/astrbot/cli/utils/basic.py +++ b/astrbot/cli/utils/basic.py @@ -1,66 +1,29 @@ -import os -import sys from pathlib import Path + import click -from astrbot.core.config.default import VERSION -def get_astrbot_root(path: str | None) -> Path: - """获取astrbot根目录""" - match path: - case None: - match ASTRBOT_ROOT := os.getenv("ASTRBOT_ROOT"): - case None: - astrbot_root = Path.cwd() / "data" - case _: - astrbot_root = Path(ASTRBOT_ROOT).resolve() - case str(): - astrbot_root = Path(path).resolve() - - dot_astrbot = astrbot_root / ".astrbot" - if not dot_astrbot.exists(): - if click.confirm( - f"运行前必须先执行初始化!请检查当前目录是否正确,回车以继续: {astrbot_root}", - default=True, - abort=True, - ): - dot_astrbot.touch() - astrbot_root.mkdir(parents=True, exist_ok=True) - click.echo(f"Created {dot_astrbot}") - - return astrbot_root +def check_astrbot_root(path: str | Path) -> bool: + """检查路径是否为 AstrBot 根目录""" + if not isinstance(path, Path): + path = Path(path) + if not path.exists() or not path.is_dir(): + return False + if not (path / ".astrbot").exists(): + return False + return True -def check_astrbot_root(astrbot_root: Path) -> None: - """验证""" - dot_astrbot = astrbot_root / ".astrbot" - if not astrbot_root.exists(): - click.echo(f"AstrBot root directory does not exist: {astrbot_root}") - click.echo("Please run 'astrbot init' to create the directory.") - sys.exit(1) - else: - click.echo(f"AstrBot root directory exists: {astrbot_root}") - if not dot_astrbot.exists(): - click.echo( - "如果你确认这是 Astrbot root directory, 你需要在当前目录下创建一个 .astrbot 文件标记该目录为 AstrBot 的数据目录。" - ) - if click.confirm( - f"请检查当前目录是否正确,确认正确请回车: {astrbot_root}", - default=True, - abort=True, - ): - dot_astrbot.touch() - click.echo(f"Created {dot_astrbot}") - else: - click.echo(f"Welcome back! AstrBot root directory: {astrbot_root}") +def get_astrbot_root() -> Path: + """获取Astrbot根目录路径""" + return Path.cwd() async def check_dashboard(astrbot_root: Path) -> None: """检查是否安装了dashboard""" - try: - from ..core.utils.io import get_dashboard_version, download_dashboard - except ImportError: - from astrbot.core.utils.io import get_dashboard_version, download_dashboard + from astrbot.core.utils.io import get_dashboard_version, download_dashboard + from astrbot.core.config.default import VERSION + from .version_comparator import VersionComparator try: dashboard_version = await get_dashboard_version() @@ -79,8 +42,9 @@ async def check_dashboard(astrbot_root: Path) -> None: click.echo("管理面板安装完成") case str(): - if dashboard_version == f"v{VERSION}": - click.echo("无需更新") + if VersionComparator.compare_version(VERSION, dashboard_version) <= 0: + click.echo("管理面板已是最新版本") + return else: try: version = dashboard_version.split("v")[1] @@ -100,4 +64,4 @@ async def check_dashboard(astrbot_root: Path) -> None: click.echo("管理面板初始化完成") except Exception as e: click.echo(f"下载管理面板失败: {e}") - return \ No newline at end of file + return diff --git a/astrbot/cli/utils/plugin.py b/astrbot/cli/utils/plugin.py new file mode 100644 index 00000000..84563715 --- /dev/null +++ b/astrbot/cli/utils/plugin.py @@ -0,0 +1,258 @@ +import shutil +import tempfile +import os + +import httpx +import yaml +import re +from enum import Enum +from io import BytesIO +from pathlib import Path +from zipfile import ZipFile + +import click +from .version_comparator import VersionComparator + + +class PluginStatus(str, Enum): + INSTALLED = "已安装" + NEED_UPDATE = "需更新" + NOT_INSTALLED = "未安装" + NOT_PUBLISHED = "未发布" + + +def get_git_repo(url: str, target_path: Path, proxy: str | None = None): + """从 Git 仓库下载代码并解压到指定路径""" + temp_dir = Path(tempfile.mkdtemp()) + try: + # 解析仓库信息 + repo_namespace = url.split("/")[-2:] + author = repo_namespace[0] + repo = repo_namespace[1] + + # 尝试获取最新的 release + release_url = f"https://api.github.com/repos/{author}/{repo}/releases" + try: + with httpx.Client( + proxy=proxy if proxy else None, follow_redirects=True + ) as client: + resp = client.get(release_url) + resp.raise_for_status() + releases = resp.json() + + if releases: + # 使用最新的 release + download_url = releases[0]["zipball_url"] + else: + # 没有 release,使用默认分支 + click.echo(f"正在从默认分支下载 {author}/{repo}") + download_url = f"https://github.com/{author}/{repo}/archive/refs/heads/master.zip" + except Exception as e: + click.echo(f"获取 release 信息失败: {e},将直接使用提供的 URL") + download_url = url + + # 应用代理 + if proxy: + download_url = f"{proxy}/{download_url}" + + # 下载并解压 + with httpx.Client( + proxy=proxy if proxy else None, follow_redirects=True + ) as client: + resp = client.get(download_url) + resp.raise_for_status() + zip_content = BytesIO(resp.content) + with ZipFile(zip_content) as z: + z.extractall(temp_dir) + namelist = z.namelist() + root_dir = Path(namelist[0]).parts[0] if namelist else "" + if target_path.exists(): + shutil.rmtree(target_path) + shutil.move(temp_dir / root_dir, target_path) + finally: + if temp_dir.exists(): + shutil.rmtree(temp_dir, ignore_errors=True) + + +def build_plug_list(plugins_dir: Path) -> list: + """构建插件列表,包含本地和在线插件信息 + + Args: + plugins_dir (Path): 插件目录路径 + + Returns: + list: 包含插件信息的字典列表,每个字典包含: + - name: 插件名称 + - desc: 插件描述 + - version: 插件版本 + - author: 插件作者 + - repo: 插件仓库地址 + - status: 插件状态 [已安装/需更新/未安装/未发布] + - local_path: 本地插件路径 + """ + # 获取本地插件信息 + result = [] + if os.path.exists(plugins_dir): + for plugin_name in os.listdir(plugins_dir): + plugin_dir = os.path.join(plugins_dir, plugin_name) + if not os.path.isdir(plugin_dir): + continue + + metadata = {} + # 优先从 metadata.yaml 读取 + yaml_path = os.path.join(plugin_dir, "metadata.yaml") + if os.path.exists(yaml_path): + try: + with open(yaml_path, "r", encoding="utf-8") as f: + metadata = yaml.safe_load(f) + except Exception as e: + click.echo(f"读取 {yaml_path} 失败: {e}", err=True) + continue + + # 如果没有 metadata.yaml 或信息不完整,从 Python 文件读取 + if not metadata or not all( + k in metadata for k in ["name", "desc", "version", "author", "repo"] + ): + # 检查 main.py 或与目录同名的 py 文件 + py_file = os.path.join(plugin_dir, "main.py") + if not os.path.exists(py_file): + py_file = os.path.join( + plugin_dir, f"{os.path.basename(plugin_dir)}.py" + ) + + if os.path.exists(py_file): + try: + with open(py_file, "r", encoding="utf-8") as f: + content = f.read() + register_match = re.search( + r'@register_star\s*\(\s*"([^"]+)"\s*,\s*"([^"]+)"\s*,\s*"([^"]+)"\s*,\s*"([^"]+)"(?:\s*,\s*"?([^")]+)"?)?\s*\)', + content, + ) + if register_match: + if "name" not in metadata: + metadata["name"] = register_match.group(1) + if "author" not in metadata: + metadata["author"] = register_match.group(2) + if "desc" not in metadata: + metadata["desc"] = register_match.group(3) + if "version" not in metadata: + metadata["version"] = str(register_match.group(4)) + if "repo" not in metadata and register_match.group(5): + metadata["repo"] = register_match.group(5) + except Exception as e: + click.echo(f"读取 {py_file} 失败: {e}", err=True) + + # 如果成功提取元数据,添加到结果列表 + if metadata: + result.append( + { + "name": str(metadata.get("name", "")), + "desc": str(metadata.get("desc", "")), + "version": str(metadata.get("version", "")), + "author": str(metadata.get("author", "")), + "repo": str(metadata.get("repo", "")), + "status": PluginStatus.INSTALLED, + "local_path": str(plugin_dir), + } + ) + + # 获取在线插件列表 + online_plugins = [] + try: + with httpx.Client() as client: + resp = client.get("https://api.soulter.top/astrbot/plugins") + resp.raise_for_status() + data = resp.json() + for plugin_id, plugin_info in data.items(): + online_plugins.append( + { + "name": str(plugin_id), + "desc": str(plugin_info.get("desc", "")), + "version": str(plugin_info.get("version", "")), + "author": str(plugin_info.get("author", "")), + "repo": str(plugin_info.get("repo", "")), + "status": PluginStatus.NOT_INSTALLED, + "local_path": None, + } + ) + except Exception as e: + click.echo(f"获取在线插件列表失败: {e}", err=True) + + # 与在线插件比对,更新状态 + online_plugin_names = {plugin["name"] for plugin in online_plugins} + for local_plugin in result: + if local_plugin["name"] in online_plugin_names: + # 查找对应的在线插件 + online_plugin = next( + p for p in online_plugins if p["name"] == local_plugin["name"] + ) + + if ( + VersionComparator.compare_version( + local_plugin["version"], online_plugin["version"] + ) + < 0 + ): + local_plugin["status"] = PluginStatus.NEED_UPDATE + else: + # 本地插件未在线上发布 + local_plugin["status"] = PluginStatus.NOT_PUBLISHED + + # 添加未安装的在线插件 + for online_plugin in online_plugins: + if not any(plugin["name"] == online_plugin["name"] for plugin in result): + result.append(online_plugin) + + return result + + +def manage_plugin( + plugin: dict, plugins_dir: Path, is_update: bool = False, proxy: str | None = None +) -> None: + """安装或更新插件 + + Args: + plugin (dict): 插件信息字典 + plugins_dir (Path): 插件目录 + is_update (bool, optional): 是否为更新操作. 默认为 False + proxy (str, optional): 代理服务器地址 + """ + plugin_name = plugin["name"] + repo_url = plugin["repo"] + + # 如果是更新且有本地路径,直接使用本地路径 + if is_update and plugin.get("local_path"): + target_path = Path(plugin["local_path"]) + else: + target_path = plugins_dir / plugin_name + + backup_path = Path(f"{target_path}_backup") if is_update else None + + # 检查插件是否存在 + if is_update and not target_path.exists(): + raise click.ClickException(f"插件 {plugin_name} 未安装,无法更新") + + # 备份现有插件 + if is_update and backup_path.exists(): + shutil.rmtree(backup_path) + if is_update: + shutil.move(target_path, backup_path) + + try: + click.echo( + f"正在从 {repo_url} {'更新' if is_update else '下载'}插件 {plugin_name}..." + ) + get_git_repo(repo_url, target_path, proxy) + + # 更新成功,删除备份 + if is_update and backup_path.exists(): + shutil.rmtree(backup_path) + click.echo(f"插件 {plugin_name} {'更新' if is_update else '安装'}成功") + except Exception as e: + if target_path.exists(): + shutil.rmtree(target_path, ignore_errors=True) + if is_update and backup_path.exists(): + shutil.move(backup_path, target_path) + raise click.ClickException( + f"{'更新' if is_update else '安装'}插件 {plugin_name} 时出错: {e}" + ) diff --git a/astrbot/cli/utils/version_comparator.py b/astrbot/cli/utils/version_comparator.py new file mode 100644 index 00000000..fecab885 --- /dev/null +++ b/astrbot/cli/utils/version_comparator.py @@ -0,0 +1,92 @@ +""" +拷贝自 astrbot.core.utils.version_comparator +""" + +import re + + +class VersionComparator: + @staticmethod + def compare_version(v1: str, v2: str) -> int: + """根据 Semver 语义版本规范来比较版本号的大小。支持不仅局限于 3 个数字的版本号,并处理预发布标签。 + + 参考: https://semver.org/lang/zh-CN/ + + 返回 1 表示 v1 > v2,返回 -1 表示 v1 < v2,返回 0 表示 v1 = v2。 + """ + v1 = v1.lower().replace("v", "") + v2 = v2.lower().replace("v", "") + + def split_version(version): + match = re.match( + r"^([0-9]+(?:\.[0-9]+)*)(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+(.+))?$", + version, + ) + if not match: + return [], None + major_minor_patch = match.group(1).split(".") + prerelease = match.group(2) + # buildmetadata = match.group(3) # 构建元数据在比较时忽略 + parts = [int(x) for x in major_minor_patch] + prerelease = VersionComparator._split_prerelease(prerelease) + return parts, prerelease + + v1_parts, v1_prerelease = split_version(v1) + v2_parts, v2_prerelease = split_version(v2) + + # 比较数字部分 + length = max(len(v1_parts), len(v2_parts)) + v1_parts.extend([0] * (length - len(v1_parts))) + v2_parts.extend([0] * (length - len(v2_parts))) + + for i in range(length): + if v1_parts[i] > v2_parts[i]: + return 1 + elif v1_parts[i] < v2_parts[i]: + return -1 + + # 比较预发布标签 + if v1_prerelease is None and v2_prerelease is not None: + return 1 # 没有预发布标签的版本高于有预发布标签的版本 + elif v1_prerelease is not None and v2_prerelease is None: + return -1 # 有预发布标签的版本低于没有预发布标签的版本 + elif v1_prerelease is not None and v2_prerelease is not None: + len_pre = max(len(v1_prerelease), len(v2_prerelease)) + for i in range(len_pre): + p1 = v1_prerelease[i] if i < len(v1_prerelease) else None + p2 = v2_prerelease[i] if i < len(v2_prerelease) else None + + if p1 is None and p2 is not None: + return -1 + elif p1 is not None and p2 is None: + return 1 + elif isinstance(p1, int) and isinstance(p2, str): + return -1 + elif isinstance(p1, str) and isinstance(p2, int): + return 1 + elif isinstance(p1, int) and isinstance(p2, int): + if p1 > p2: + return 1 + elif p1 < p2: + return -1 + elif isinstance(p1, str) and isinstance(p2, str): + if p1 > p2: + return 1 + elif p1 < p2: + return -1 + return 0 # 预发布标签完全相同 + + return 0 # 数字部分和预发布标签都相同 + + @staticmethod + def _split_prerelease(prerelease): + if not prerelease: + return None + parts = prerelease.split(".") + result = [] + for part in parts: + if part.isdigit(): + result.append(int(part)) + else: + result.append(part) + return result diff --git a/astrbot/core/star/star_manager.py b/astrbot/core/star/star_manager.py index 5aa70b23..ac35adbe 100644 --- a/astrbot/core/star/star_manager.py +++ b/astrbot/core/star/star_manager.py @@ -29,6 +29,12 @@ from astrbot.core.utils.astrbot_path import ( from .filter.permission import PermissionTypeFilter, PermissionType +try: + from watchfiles import awatch, PythonFilter +except ImportError: + if os.getenv("ASTROBOT_RELOAD", "0") == "1": + logger.warning("未安装 watchfiles,无法实现插件的热重载。") + class PluginManager: def __init__(self, context: Context, config: AstrBotConfig): @@ -52,6 +58,58 @@ class PluginManager: """插件配置 Schema 文件名""" self.failed_plugin_info = "" + if os.getenv("ASTROBOT_RELOAD", "0") == "1": + asyncio.create_task(self._watch_plugins_changes()) + + async def _watch_plugins_changes(self): + """监视插件文件变化""" + try: + async for changes in awatch( + self.plugin_store_path, + self.reserved_plugin_path, + watch_filter=PythonFilter(), + recursive=True, + ): + # 处理文件变化 + await self._handle_file_changes(changes) + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"插件热重载监视任务异常: {str(e)}") + logger.error(traceback.format_exc()) + + async def _handle_file_changes(self, changes): + """处理文件变化""" + logger.info(f"检测到文件变化: {changes}") + plugins_to_check = [] + + for star in star_registry: + if not star.activated: + continue + if star.root_dir_name is None: + continue + if star.reserved: + plugin_dir_path = os.path.join( + self.reserved_plugin_path, star.root_dir_name + ) + else: + plugin_dir_path = os.path.join( + self.plugin_store_path, star.root_dir_name + ) + plugins_to_check.append((plugin_dir_path, star.name)) + reloaded_plugins = set() + for change in changes: + _, file_path = change + for plugin_dir_path, plugin_name in plugins_to_check: + if ( + os.path.commonpath([plugin_dir_path]) + == os.path.commonpath([plugin_dir_path, file_path]) + and plugin_name not in reloaded_plugins + ): + logger.info(f"检测到插件 {plugin_name} 文件变化,正在重载...") + await self.reload(plugin_name) + reloaded_plugins.add(plugin_name) + break def _get_classes(self, arg: ModuleType): """获取指定模块(可以理解为一个 python 文件)下所有的类""" diff --git a/astrbot/dashboard/server.py b/astrbot/dashboard/server.py index 5d131080..5f0de72e 100644 --- a/astrbot/dashboard/server.py +++ b/astrbot/dashboard/server.py @@ -125,7 +125,10 @@ class AstrBotDashboard: def run(self): ip_addr = [] - port = self.core_lifecycle.astrbot_config["dashboard"].get("port", 6185) + if p := os.environ.get("DASHBOARD_PORT"): + port = p + else: + port = self.core_lifecycle.astrbot_config["dashboard"].get("port", 6185) host = self.core_lifecycle.astrbot_config["dashboard"].get("host", "0.0.0.0") logger.info(f"正在启动 WebUI, 监听地址: http://{host}:{port}")