perf(QQ Official API): improve streaming message delivery reliability and proactive media sending #6131
Conversation
- fix(qqofficial): rewrite send_streaming for C2C vs non-C2C split
- fix(qqofficial): add _extract_response_message_id for safe id extraction
- fix(qqofficial): flush stream segment on tool-call break signal
- fix(qqofficial): downgrade rich-media to non-stream send in C2C
- fix(qqofficial): auto-append \n to final stream chunk (state=10)
- fix(qqofficial): propagate stream param to all _send_with_markdown_fallback call sites
- fix(qqofficial): retry on STREAM_MARKDOWN_NEWLINE_ERROR with newline fix
- fix(qqofficial): handle None/non-dict response in post_c2c_message gracefully
…_by_session QQ API rejects msg_id on proactive media (video/file, msg_type=7) messages sent via the tool-call path, returning "请求参数msg_id无效或越权" ("the msg_id request parameter is invalid or unauthorized"). The msg_id passive-reply credential is consumed by the first send and cannot be reused for subsequent media uploads in the same session. Remove msg_id from the payload after setting msg_type=7 for video and file sends, for both FRIEND_MESSAGE (C2C) and GROUP_MESSAGE paths.
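The fix described in this commit message reduces to a small payload mutation. The sketch below is illustrative only — the field names are taken from the text above, and the `"media"` value is a placeholder, not the adapter's real payload shape:

```python
# Hypothetical proactive media payload; the "media" content is illustrative.
payload = {
    "msg_type": 7,  # 7 marks a rich-media (video/file) send per the commit text
    "msg_id": "passive-reply-credential",
    "media": {"file_info": "..."},
}

# The passive-reply msg_id has already been consumed by the first send, so the
# API rejects it on subsequent media uploads; drop it for msg_type=7 payloads.
if payload.get("msg_type") == 7:
    payload.pop("msg_id", None)
```

`dict.pop(key, None)` is used so the removal is a no-op when `msg_id` is already absent.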
Hey - I've found 4 issues, and left some high level feedback:
- In `send_streaming`, consider using `asyncio.get_running_loop()` consistently instead of `asyncio.get_event_loop()` (which may behave differently in newer Python versions and under some loop policies).
- `post_c2c_message` is annotated to return `message.Message` but now returns `None` in several branches; updating the return type (and any callers that assume a non-None result) would avoid type confusion and potential runtime errors.
- In `_send_with_markdown_fallback`, `payload.copy()` is a shallow copy and `payload['markdown']` is sometimes a `MarkdownPayload` object, sometimes a `dict`; it may be safer to normalize this to a dict (e.g., via `model_dump()`/`dict()`) before mutation so the retry and fallback paths behave consistently.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `send_streaming`, consider using `asyncio.get_running_loop()` consistently instead of `asyncio.get_event_loop()` (which may behave differently in newer Python versions and under some loop policies).
- `post_c2c_message` is annotated to return `message.Message` but now returns `None` in several branches; updating the return type (and any callers that assume a non-None result) would avoid type confusion and potential runtime errors.
- In `_send_with_markdown_fallback`, `payload.copy()` is a shallow copy and `payload['markdown']` is sometimes a `MarkdownPayload` object, sometimes a `dict`; it may be safer to normalize this to a dict (e.g., via `model_dump()`/`dict()`) before mutation so the retry and fallback paths behave consistently.
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="122-134" />
<code_context>
- stream_payload["id"] = ret["id"]
- last_edit_time = asyncio.get_running_loop().time()
+ # 节流:按时间间隔发送中间分片
+ current_time = asyncio.get_event_loop().time()
+ if current_time - last_edit_time >= throttle_interval:
+ ret = cast(
+ message.Message,
+ await self._post_send(stream=stream_payload),
+ )
+ stream_payload["index"] += 1
+ ret_id = self._extract_response_message_id(ret)
+ if ret_id is not None:
+ stream_payload["id"] = ret_id
+ last_edit_time = asyncio.get_event_loop().time()
+ self.send_buffer = None # 清空已发送的分片,避免下次重复发送旧内容
</code_context>
<issue_to_address>
**suggestion:** Use `get_running_loop()` instead of `get_event_loop()` inside async context for future compatibility and consistency.
Since this is already an `async` method and earlier you used `asyncio.get_running_loop().time()`, using `get_event_loop()` here may behave differently under newer asyncio semantics and can raise deprecation warnings. Please switch both calls to `asyncio.get_running_loop().time()` for consistency and future compatibility.
```suggestion
# 节流:按时间间隔发送中间分片
current_time = asyncio.get_running_loop().time()
if current_time - last_edit_time >= throttle_interval:
ret = cast(
message.Message,
await self._post_send(stream=stream_payload),
)
stream_payload["index"] += 1
ret_id = self._extract_response_message_id(ret)
if ret_id is not None:
stream_payload["id"] = ret_id
last_edit_time = asyncio.get_running_loop().time()
self.send_buffer = None # 清空已发送的分片,避免下次重复发送旧内容
```
</issue_to_address>
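As a quick self-contained illustration of the recommended pattern (not the adapter's code): inside a coroutine, `asyncio.get_running_loop()` always returns the loop actually driving the coroutine, and its `time()` is a monotonic clock suitable for throttle intervals.

```python
import asyncio

async def main() -> float:
    # Inside a coroutine there is always a running loop, so this never falls
    # back to implicit loop creation the way get_event_loop() historically can.
    loop = asyncio.get_running_loop()
    t0 = loop.time()          # monotonic clock, good for throttling
    await asyncio.sleep(0.01)
    return loop.time() - t0   # elapsed time per the loop's own clock

elapsed = asyncio.run(main())
```

Note that `asyncio.get_running_loop()` raises `RuntimeError` when called outside a running event loop, which makes misuse fail loudly instead of silently creating a new loop.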
### Comment 2
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="99-108" />
<code_context>
+ # ---- C2C 流式场景 ----
+
+ # tool_call break 信号:工具开始执行,先把已有 buffer 以 state=10 结束当前流式段
+ if chain.type == "break":
+ if self.send_buffer:
+ stream_payload["state"] = 10
+ ret = await self._post_send(stream=stream_payload)
+ ret_id = self._extract_response_message_id(ret)
+ if ret_id is not None:
+ stream_payload["id"] = ret_id
+ # 重置 stream_payload,为下一段流式做准备
+ stream_payload = {
+ "state": 1,
+ "id": None,
+ "index": 0,
+ "reset": False,
+ }
+ last_edit_time = 0
+ continue
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Consider clearing `send_buffer` after sending the final chunk on `break` to avoid re-sending already-flushed content.
When `chain.type == "break"`, `_post_send(stream=stream_payload)` flushes the current buffered content and `stream_payload`/`last_edit_time` are reset, but `self.send_buffer` is not. On the next iteration, new content will be appended to a buffer that already contains the previously sent final chunk, so the next flush may resend that content. Unless `_post_send` explicitly clears `send_buffer` (it currently doesn’t), `self.send_buffer` should be cleared after `_post_send` in this branch to avoid duplicated output between segments.
</issue_to_address>
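The failure mode described above can be reduced to a tiny model (purely illustrative — the `Stream` class below is not the adapter's class): if the buffer is not cleared after the break-flush, the next flush resends the old segment.

```python
sent: list[str] = []

class Stream:
    """Minimal model of an accumulating send buffer."""
    def __init__(self) -> None:
        self.buffer: list[str] = []

    def append(self, chunk: str) -> None:
        self.buffer.append(chunk)

    def flush(self, clear: bool) -> None:
        # Sends everything currently buffered, mirroring _post_send.
        sent.append("".join(self.buffer))
        if clear:
            self.buffer = []

s = Stream()
s.append("segment-1")
s.flush(clear=False)   # break-flush WITHOUT clearing the buffer (the bug)
s.append("segment-2")
s.flush(clear=True)    # second flush resends "segment-1" along with new content
```

With `clear=True` on the break-flush, the second send would contain only `"segment-2"`, which is the behavior the review asks for.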
### Comment 3
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="408-412" />
<code_context>
+ if stream and self.STREAM_MARKDOWN_NEWLINE_ERROR in str(err):
+ retry_payload = payload.copy()
+
+ markdown_payload = retry_payload.get("markdown")
+ if isinstance(markdown_payload, dict):
+ md_content = cast(str, markdown_payload.get("content", "") or "")
+ if md_content and not md_content.endswith("\n"):
+ retry_payload["markdown"] = {"content": md_content + "\n"}
+
+ content = cast(str | None, retry_payload.get("content"))
</code_context>
<issue_to_address>
**issue (bug_risk):** The retry path only handles `markdown` when it is a `dict`, which may miss the `MarkdownPayload` case.
Since `_post_send` sets `payload["markdown"]` to a `MarkdownPayload` instance, the retry logic that only handles dict markdown will skip this case. That means the newline fix won’t apply and the retried request can still hit the same streaming error. Consider also handling `MarkdownPayload` here (e.g., detect the type, clone, and update its content) rather than only supporting the dict form.
</issue_to_address>
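One hedged way to address this (a sketch only — `FakeMarkdownPayload` below stands in for the real `MarkdownPayload`, whose exact API is not shown in this diff) is to normalize the markdown field to a plain dict before mutating it, trying `model_dump()` then `dict()`:

```python
def normalize_markdown(md):
    """Return a plain dict for either a dict or a model-like object."""
    if md is None or isinstance(md, dict):
        return md
    # Pydantic v2 exposes model_dump(); v1-style models expose dict().
    for attr in ("model_dump", "dict"):
        fn = getattr(md, attr, None)
        if callable(fn):
            return fn()
    # Last resort: pull the one field the retry path cares about.
    return {"content": getattr(md, "content", "")}

class FakeMarkdownPayload:
    """Stand-in for MarkdownPayload; assumes a v1-style dict() method."""
    def __init__(self, content: str) -> None:
        self.content = content
    def dict(self) -> dict:
        return {"content": self.content}

md = normalize_markdown(FakeMarkdownPayload("hello"))
if md["content"] and not md["content"].endswith("\n"):
    md["content"] += "\n"
```

After normalization, the newline fix applies uniformly regardless of which form `payload["markdown"]` arrived in.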
### Comment 4
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="72" />
<code_context>
await self._post_send()
async def send_streaming(self, generator, use_fallback: bool = False):
- """流式输出仅支持消息列表私聊"""
+ """流式输出仅支持消息列表私聊(C2C),其他消息源退化为普通发送"""
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting small helper methods for C2C streaming, markdown streaming retries, and C2C stream payload/response handling to keep the main functions high-level and easier to follow.
You can keep all current behavior but reduce local complexity by extracting a few focused helpers. These are small and mechanically safe to apply.
### 1. Split C2C streaming concerns in `send_streaming`
`send_streaming` currently mixes:
- non‑C2C accumulation
- C2C tool `break` handling
- throttling + send logic
- stream payload mutations (`state`, `id`, `index`, `reset`)
- buffer reset
You can keep the overall structure but move the low‑level details out of the loop:
```python
async def send_streaming(self, generator, use_fallback: bool = False):
await super().send_streaming(generator, use_fallback)
stream_payload = {"state": 1, "id": None, "index": 0, "reset": False}
last_edit_time = 0
throttle_interval = 1
ret = None
source = self.message_obj.raw_message
try:
async for chain in generator:
source = self.message_obj.raw_message
if not isinstance(source, botpy.message.C2CMessage):
self._append_to_buffer(chain)
continue
if chain.type == "break":
stream_payload, last_edit_time = await self._handle_c2c_break_segment(
stream_payload,
)
continue
self._append_to_buffer(chain)
ret, stream_payload, last_edit_time = await self._maybe_send_c2c_delta(
stream_payload,
last_edit_time,
throttle_interval,
)
if isinstance(source, botpy.message.C2CMessage):
await self._finalize_c2c_stream(stream_payload)
else:
await self._post_send()
except Exception:
logger.error("发送流式消息时出错", exc_info=True)
self.send_buffer = None
```
With small helpers:
```python
def _append_to_buffer(self, chain: MessageChain) -> None:
if not self.send_buffer:
self.send_buffer = chain
else:
self.send_buffer.chain.extend(chain.chain)
async def _handle_c2c_break_segment(
self,
stream_payload: dict,
) -> tuple[dict, float]:
if self.send_buffer:
stream_payload["state"] = 10
ret = await self._post_send(stream=stream_payload)
ret_id = self._extract_response_message_id(ret)
if ret_id is not None:
stream_payload["id"] = ret_id
# reset for next segment
return {"state": 1, "id": None, "index": 0, "reset": False}, 0.0
async def _maybe_send_c2c_delta(
self,
stream_payload: dict,
last_edit_time: float,
throttle_interval: float,
) -> tuple[Any, dict, float]:
current_time = asyncio.get_running_loop().time()
if current_time - last_edit_time < throttle_interval:
return None, stream_payload, last_edit_time
ret = await self._send_c2c_stream_chunk(stream_payload)
return ret, stream_payload, asyncio.get_running_loop().time()
async def _send_c2c_stream_chunk(self, stream_payload: dict):
ret = cast(message.Message, await self._post_send(stream=stream_payload))
stream_payload["index"] += 1
ret_id = self._extract_response_message_id(ret)
if ret_id is not None:
stream_payload["id"] = ret_id
self.send_buffer = None
return ret
async def _finalize_c2c_stream(self, stream_payload: dict) -> None:
stream_payload["state"] = 10
await self._post_send(stream=stream_payload)
```
This preserves behavior (including id/index handling and buffer clearing) but keeps the main loop high‑level and easier to read/test.
---
### 2. Isolate streaming newline error retry in `_send_with_markdown_fallback`
Right now `_send_with_markdown_fallback` interleaves:
- streaming newline error correction
- generic “markdown not allowed” fallback
You can separate the streaming‑specific part:
```python
async def _send_with_markdown_fallback(
self,
send_func,
payload: dict,
plain_text: str,
stream: dict | None = None,
):
try:
return await send_func(payload)
except botpy.errors.ServerError as err:
# 1. streaming newline correction
if stream:
retry = await self._retry_stream_newline_error(send_func, payload, err)
if retry is not None:
return retry
# 2. markdown not allowed fallback
if (
self.MARKDOWN_NOT_ALLOWED_ERROR not in str(err)
or not payload.get("markdown")
or not plain_text
):
raise
logger.warning("[QQOfficial] markdown 发送被拒绝,回退到 content 模式重试。")
fallback_payload = payload.copy()
fallback_payload.pop("markdown", None)
fallback_payload["content"] = plain_text
if fallback_payload.get("msg_type") == 2:
fallback_payload["msg_type"] = 0
if stream:
content = cast(str, fallback_payload.get("content") or "")
if content and not content.endswith("\n"):
fallback_payload["content"] = content + "\n"
return await send_func(fallback_payload)
```
Helper:
```python
async def _retry_stream_newline_error(
self,
send_func,
payload: dict,
err: Exception,
):
if self.STREAM_MARKDOWN_NEWLINE_ERROR not in str(err):
return None
retry_payload = payload.copy()
md = retry_payload.get("markdown")
if isinstance(md, dict):
md_content = cast(str, md.get("content", "") or "")
if md_content and not md_content.endswith("\n"):
retry_payload["markdown"] = {"content": md_content + "\n"}
content = cast(str | None, retry_payload.get("content"))
if content and not content.endswith("\n"):
retry_payload["content"] = content + "\n"
logger.warning(
"[QQOfficial] 流式 markdown 分片换行校验失败,已修正后重试一次。"
)
return await send_func(retry_payload)
```
This keeps the main function focused while maintaining all the retry semantics.
---
### 3. Sanitize C2C `stream` payload outside `post_c2c_message`
`post_c2c_message` now mixes HTTP call logic with protocol quirks (`stream.id` removal, `None`/non‑dict responses). You can isolate the stream cleanup and response normalization:
```python
async def post_c2c_message(
self,
openid: str,
content: str | None = None,
# ...
stream: dict | None = None,
) -> message.Message | None:
payload = locals()
payload.pop("self", None)
payload["stream"] = self._sanitize_stream_payload(stream)
route = Route("POST", "/v2/users/{openid}/messages", openid=openid)
result = await self.bot.api._http.request(route, json=payload)
result = self._normalize_c2c_response(result)
if result is None:
return None
return message.Message(**result)
```
Helpers:
```python
@staticmethod
def _sanitize_stream_payload(stream: dict | None) -> dict | None:
if stream is None:
return None
stream_data = dict(stream)
if stream_data.get("id") is None:
stream_data.pop("id", None)
return stream_data
@staticmethod
def _normalize_c2c_response(result: Any) -> dict | None:
if result is None:
logger.warning("[QQOfficial] post_c2c_message: API 返回 None,跳过本次发送")
return None
if not isinstance(result, dict):
logger.error(f"[QQOfficial] post_c2c_message: 响应不是 dict: {result}")
return None
return result
```
This keeps `post_c2c_message` focused on “prepare payload + call API + wrap response”, and moves protocol details into narrowly scoped helpers.
---
These extractions should cut cognitive load significantly without altering behavior or the added functionality.
</issue_to_address>
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances the stability and compliance of the QQ Official adapter, particularly for streaming messages and proactive media sending. It addresses several critical issues that led to crashes, incorrect message delivery, and API errors, ensuring a more reliable and robust communication experience within the platform. The changes streamline the message handling process, making it more resilient to various API response formats and user interaction patterns.
Code Review
This pull request significantly enhances QQ Official streaming message functionality, primarily within qqofficial_message_event.py. Key changes include refining the send_streaming method to support C2C messages with fallback for other types, introducing tool_call break signal handling for stream segments, and implementing a new _extract_response_message_id utility. The update also adds logic to downgrade streaming to normal sending for rich media, ensures final streaming markdown segments end with a newline as per QQ API requirements, and incorporates retry mechanisms for newline validation errors. Additionally, post_c2c_message now handles stream.id=None by removing it from the payload and improves API response error handling.

In qqofficial_platform_adapter.py, msg_id is explicitly removed from media message payloads to prevent the "越权" (unauthorized) error from the QQ API. A review comment highlights a regression in qqofficial_message_event.py, suggesting that asyncio.get_event_loop().time() should be reverted to asyncio.get_running_loop().time() for better explicitness and consistency with the previous implementation.
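The newline requirement on final streaming segments mentioned above can be sketched as follows. This is an assumed shape, not the adapter's actual function: per the diff in this PR, `state == 10` marks the final chunk of a stream.

```python
def finalize_stream_chunk(stream_payload: dict, content: str) -> str:
    # QQ's validation reportedly rejects a final (state=10) markdown chunk
    # that does not end with a newline, so append one defensively.
    if (
        stream_payload.get("state") == 10
        and content
        and not content.endswith("\n")
    ):
        content += "\n"
    return content

out = finalize_stream_chunk({"state": 10}, "final text")
```

Intermediate chunks (`state != 10`) and content that already ends with a newline pass through unchanged.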
```python
current_time = asyncio.get_event_loop().time()
if current_time - last_edit_time >= throttle_interval:
    ret = cast(
        message.Message,
        await self._post_send(stream=stream_payload),
    )
    stream_payload["index"] += 1
    ret_id = self._extract_response_message_id(ret)
    if ret_id is not None:
        stream_payload["id"] = ret_id
    last_edit_time = asyncio.get_event_loop().time()
```
It's recommended to use `asyncio.get_running_loop()` instead of `asyncio.get_event_loop()` inside coroutines (since Python 3.7). `get_running_loop()` is more explicit as it returns the currently running event loop and raises an error if none is running. The previous implementation was using `get_running_loop()`, so this change appears to be an unintentional regression.
```suggestion
current_time = asyncio.get_running_loop().time()
if current_time - last_edit_time >= throttle_interval:
    ret = cast(
        message.Message,
        await self._post_send(stream=stream_payload),
    )
    stream_payload["index"] += 1
    ret_id = self._extract_response_message_id(ret)
    if ret_id is not None:
        stream_payload["id"] = ret_id
    last_edit_time = asyncio.get_running_loop().time()
```
Force-pushed from 992a701 to 872aa7c
…_loop()

asyncio.get_event_loop() is deprecated since Python 3.10 and raises a DeprecationWarning (or errors) when called from inside a running coroutine without a current event loop set on the thread. Replace both call sites in the streaming throttle logic with asyncio.get_running_loop(), which is the correct API to use inside an already-running async context.

Co-Authored-By: Claude Sonnet <noreply@anthropic.com>
Fix multiple bugs in QQ Official adapter's streaming delivery and proactive media sending that caused C2C messages to crash, produce incorrect output, or return API errors when sending video/file via tool calls.
Streaming bugs (qqofficial_message_event.py):
causing empty or duplicated sends.
Proactive media bug (qqofficial_platform_adapter.py):
Modifications
astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py
astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py
passive-reply msg_id.
Screenshots or Test Results
Checklist
- I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in `requirements.txt` and `pyproject.toml`.

Summary generated by Sourcery

Improve QQ Official adapter streaming and proactive media sending so that C2C and group messages are more reliable and compliant with QQ API requirements.

Bug Fixes:
- Sanitize unexpected `stream.id` values (e.g. `null`) and force-append the required newline suffix to the final streaming chunk to satisfy QQ API validation.
- Gracefully handle unexpected or `None` QQ API responses in `post_c2c_message` instead of raising runtime errors.
- Remove `msg_id` from proactive media payloads so the QQ API accepts media messages without "permission"-related errors.

Enhancements: