Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,4 @@ GenieData/
.opencode/
.kilocode/
.worktrees/
docs/plans/

68 changes: 59 additions & 9 deletions astrbot/core/star/star_manager.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
"""插件的重载、启停、安装、卸载等操作。"""

import asyncio
import contextlib
import functools
import inspect
import json
import logging
import os
import sys
import tempfile
import traceback
from types import ModuleType

Expand All @@ -29,12 +31,12 @@
get_astrbot_config_path,
get_astrbot_path,
get_astrbot_plugin_path,
get_astrbot_temp_path,
)
from astrbot.core.utils.io import remove_dir
from astrbot.core.utils.metrics import Metric
from astrbot.core.utils.requirements_utils import (
RequirementsPrecheckFailed,
find_missing_requirements_or_raise,
plan_missing_requirements_install,
)

from . import StarMetadata
Expand Down Expand Up @@ -74,30 +76,78 @@ def __init__(
self.error = error


@contextlib.contextmanager
def _temporary_filtered_requirements_file(
*,
install_lines: tuple[str, ...],
):
filtered_requirements_path: str | None = None
temp_dir = get_astrbot_temp_path()

try:
os.makedirs(temp_dir, exist_ok=True)
with tempfile.NamedTemporaryFile(
mode="w",
suffix="_plugin_requirements.txt",
delete=False,
dir=temp_dir,
encoding="utf-8",
) as filtered_requirements_file:
filtered_requirements_file.write("\n".join(install_lines) + "\n")
filtered_requirements_path = filtered_requirements_file.name

yield filtered_requirements_path
finally:
if filtered_requirements_path and os.path.exists(filtered_requirements_path):
try:
os.remove(filtered_requirements_path)
except OSError as exc:
logger.warning(
"删除临时插件依赖文件失败:%s(路径:%s)",
exc,
filtered_requirements_path,
)


async def _install_requirements_with_precheck(
*,
plugin_label: str,
requirements_path: str,
) -> None:
try:
missing = find_missing_requirements_or_raise(requirements_path)
except RequirementsPrecheckFailed:
install_plan = plan_missing_requirements_install(requirements_path)

if install_plan is None:
logger.info(
f"正在安装插件 {plugin_label} 的依赖库(预检查失败,回退到完整安装): "
f"正在安装插件 {plugin_label} 的依赖库(缺失依赖预检查不可裁剪,回退到完整安装): "
f"{requirements_path}"
)
await pip_installer.install(requirements_path=requirements_path)
return

if not missing:
if not install_plan.missing_names:
logger.info(f"插件 {plugin_label} 的依赖已满足,跳过安装。")
return

if not install_plan.install_lines:
fallback_reason = install_plan.fallback_reason or "unknown reason"
logger.info(
"检测到插件 %s 缺失依赖,但无法安全裁剪 requirements,回退到完整安装: %s (%s)",
plugin_label,
requirements_path,
fallback_reason,
)
await pip_installer.install(requirements_path=requirements_path)
return

logger.info(
f"检测到插件 {plugin_label} 缺失依赖,正在按 requirements.txt 安装: "
f"{requirements_path} -> {sorted(missing)}"
f"{requirements_path} -> {sorted(install_plan.missing_names)}"
)
await pip_installer.install(requirements_path=requirements_path)

with _temporary_filtered_requirements_file(
install_lines=install_plan.install_lines,
) as filtered_requirements_path:
await pip_installer.install(requirements_path=filtered_requirements_path)


class PluginManager:
Expand Down
84 changes: 81 additions & 3 deletions astrbot/core/utils/requirements_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import re
import shlex
import sys
from collections.abc import Iterable, Iterator
from collections.abc import Iterable, Iterator, Sequence
from dataclasses import dataclass

from packaging.requirements import InvalidRequirement, Requirement
Expand All @@ -29,6 +29,13 @@ class ParsedPackageInput:
requirement_names: frozenset[str]


@dataclass(frozen=True)
class MissingRequirementsPlan:
missing_names: frozenset[str]
install_lines: tuple[str, ...]
fallback_reason: str | None = None


def canonicalize_distribution_name(name: str) -> str:
return re.sub(r"[-_.]+", "-", name).strip("-").lower()

Expand Down Expand Up @@ -364,8 +371,8 @@ def _load_requirement_lines_for_precheck(
None,
)
if fallback_line is not None:
logger.warning(
"预检查缺失依赖失败,将回退到完整安装: unresolved direct reference in %s: %s",
logger.info(
"缺失依赖预检查发现无法安全裁剪的 option/direct-reference 行,将回退到完整安装: %s (%s)",
requirements_path,
fallback_line,
)
Expand All @@ -381,6 +388,13 @@ def find_missing_requirements(requirements_path: str) -> set[str] | None:
if not can_precheck or requirement_lines is None:
return None

return find_missing_requirements_from_lines(requirement_lines)


def find_missing_requirements_from_lines(
requirement_lines: Sequence[str],
) -> set[str] | None:

required = list(iter_requirements(lines=requirement_lines))
if not required:
return set()
Expand All @@ -401,6 +415,70 @@ def find_missing_requirements(requirements_path: str) -> set[str] | None:
return missing


def build_missing_requirements_install_lines(
requirements_path: str,
requirement_lines: Sequence[str],
missing_names: set[str] | frozenset[str],
) -> tuple[str, ...] | None:
wanted_names = set(missing_names)
install_lines: list[str] = []
for line in requirement_lines:
parsed = _parse_requirement_line(line)
if parsed is None:
if looks_like_direct_reference(line) or line.startswith(("-", "--")):
logger.debug(
"缺失依赖行筛选回退到完整安装:requirements 中包含无法安全裁剪的 option/direct-reference 行: %s (%s)",
requirements_path,
line,
)
return None
continue

name, _specifier = parsed
if name in wanted_names:
install_lines.append(line)

return tuple(install_lines)


def plan_missing_requirements_install(
requirements_path: str,
) -> MissingRequirementsPlan | None:
can_precheck, requirement_lines = _load_requirement_lines_for_precheck(
requirements_path
)
if not can_precheck or requirement_lines is None:
return None

missing = find_missing_requirements_from_lines(requirement_lines)
if missing is None:
return None

install_lines = build_missing_requirements_install_lines(
requirements_path,
requirement_lines,
missing,
)
if install_lines is None:
return None
if missing and not install_lines:
logger.warning(
"预检查缺失依赖成功,但无法映射到可安装 requirement 行,将回退到完整安装: %s -> %s",
requirements_path,
sorted(missing),
)
return MissingRequirementsPlan(
missing_names=frozenset(missing),
install_lines=(),
fallback_reason="unmapped missing requirement names",
)

return MissingRequirementsPlan(
missing_names=frozenset(missing),
install_lines=install_lines,
)


def find_missing_requirements_or_raise(requirements_path: str) -> set[str]:
missing = find_missing_requirements(requirements_path)
if missing is None:
Expand Down
Loading