
perf(QQ Official API): improve streaming message delivery reliability and proactive media sending #6131

Merged
Soulter merged 3 commits into AstrBotDevs:master from 2ndelement:feat/qqofficial-streaming
Mar 12, 2026
Conversation

@2ndelement 2ndelement (Contributor) commented Mar 12, 2026

Fix multiple bugs in the QQ Official adapter's streaming delivery and proactive media sending that caused C2C messages to crash, produce incorrect output, or return API errors when sending video/file via tool calls.

Streaming bugs (qqofficial_message_event.py):

  1. ret["id"] assumed the API always returns a dict — this crashes when botpy returns a Message object, silently breaking the entire streaming session.
  2. Non-C2C messages (group chats) incorrectly entered the streaming loop but were never flushed; the final return await super().send_streaming(generator, ...) re-consumed an already-exhausted generator, causing empty or duplicated sends.
  3. Intermediate stream chunks were not cleared from send_buffer after each send, causing content to accumulate and repeat across chunks.
  4. post_c2c_message raised RuntimeError on None / non-dict API responses (e.g. rate-limit or server errors), crashing the adapter.
  5. The first stream chunk sent stream.id = null to the QQ API, which rejects null id fields.
  6. The final chunk (state=10) was missing the mandatory \n suffix required by the QQ API, triggering STREAM_MARKDOWN_NEWLINE_ERROR.

Proactive media bug (qqofficial_platform_adapter.py):

  1. When sending video/file via tool calls, msg_id (the passive-reply credential) was included in the media payload (msg_type=7). The QQ API rejects msg_id on proactive media messages, returning 请求参数msg_id无效或越权 ("request parameter msg_id is invalid or unauthorized").

Modifications

astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py

  • Rewrote send_streaming: non-C2C messages now accumulate in send_buffer and dispatch via a single normal _post_send(), never entering the streaming protocol.
  • Removed the broken return await super().send_streaming(generator, ...) tail call; moved super() call to function entry to correctly set _has_send_oper.
  • Clear send_buffer after each intermediate chunk is sent to prevent content accumulation.
  • Added tool-call break signal handling — flushes the current stream segment with state=10 before the tool executes, then resets stream state for the next segment.
  • Added _extract_response_message_id() static helper to safely extract id from either a dict or Message object, replacing fragile ret["id"] direct access.
  • Rich-media payloads (image/audio) in C2C now automatically downgrade to non-stream mode to avoid platform-side stream validation errors.
  • Final chunk (state=10) automatically appends \n to plain_text as required by QQ API.
  • Added stream parameter to _send_with_markdown_fallback; retries with \n-appended payload on STREAM_MARKDOWN_NEWLINE_ERROR; fallback content also gets \n in streaming context.
  • post_c2c_message: filters stream.id = None before serialization; returns None gracefully instead of raising RuntimeError on unexpected API responses.

astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py

  • In _send_by_session_common, remove msg_id from payload after setting msg_type=7 for video and file sends, for both FRIEND_MESSAGE (C2C) and GROUP_MESSAGE paths. Proactive media messages do not require a
    passive-reply msg_id.
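The payload fix amounts to a small transformation before the send. A sketch for illustration (`prepare_proactive_media_payload` is a hypothetical name, not the adapter's actual helper):

```python
def prepare_proactive_media_payload(payload: dict) -> dict:
    """Mark a payload as rich media (msg_type=7) and drop msg_id,
    which the QQ API rejects on proactive media messages with
    请求参数msg_id无效或越权."""
    out = dict(payload)  # copy so the shared base payload is not mutated
    out["msg_type"] = 7
    out.pop("msg_id", None)  # proactive media must not carry a reply credential
    return out
```

Copying before mutating matters here because the same base payload is reused for both the FRIEND_MESSAGE (C2C) and GROUP_MESSAGE paths.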
  • This is NOT a breaking change.

Screenshots or Test Results

2026-03-12 22-04-29
2026-03-12 22-09-43
2026-03-12 22-23-09


Checklist

  • 😊 If there are new features added in the PR, I have discussed them with the authors through issues/emails, etc.
  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 My changes do not introduce malicious code.


Summary by Sourcery

Improve QQ Official adapter streaming delivery and proactive media sending to make C2C and group messages more reliable and compliant with QQ API requirements.

Bug Fixes:

  • Prevent crashes and incorrect behavior in C2C streaming when the QQ SDK returns Message objects or non-dict responses.
  • Ensure non-C2C messages no longer enter the streaming loop and are sent once as normal messages without duplication or empty sends.
  • Clear streaming buffers after each sent chunk to avoid repeated or accumulated content.
  • Handle tool-call break signals by correctly closing and resetting streaming segments before tool execution.
  • Avoid sending invalid stream.id values (such as null) and enforce required newline suffixes on final streaming chunks to satisfy QQ API validation.
  • Gracefully handle unexpected or None QQ API responses in post_c2c_message instead of raising runtime errors.
  • Remove msg_id from proactive video/file tool-call payloads so the QQ API accepts media messages without permission errors.

Enhancements:

  • Unify extraction of response message IDs from either dicts or Message objects via a helper to make streaming id handling more robust.
  • Downgrade C2C streaming with rich media to non-stream mode to avoid platform-side validation issues.
  • Extend markdown fallback logic to support streaming context, including automatic newline correction and safer markdown-to-content downgrade for final chunks.

* fix(qqofficial): rewrite send_streaming for C2C vs non-C2C split

* fix(qqofficial): add _extract_response_message_id for safe id extraction

* fix(qqofficial): flush stream segment on tool-call break signal

* fix(qqofficial): downgrade rich-media to non-stream send in C2C

* fix(qqofficial): auto-append \n to final stream chunk (state=10)

* fix(qqofficial): propagate stream param to all _send_with_markdown_fallback call sites

* fix(qqofficial): retry on STREAM_MARKDOWN_NEWLINE_ERROR with newline fix

* fix(qqofficial): handle None/non-dict response in post_c2c_message gracefully
…_by_session

QQ API rejects msg_id on proactive media (video/file, msg_type=7) messages
sent via the tool-call path, returning "请求参数msg_id无效或越权". The
msg_id passive-reply credential is consumed by the first send and cannot be
reused for subsequent media uploads in the same session.

Remove msg_id from the payload after setting msg_type=7 for video and file
sends, for both FRIEND_MESSAGE (C2C) and GROUP_MESSAGE paths.
@auto-assign auto-assign bot requested review from Fridemn and advent259141 March 12, 2026 14:52
@dosubot dosubot bot added size:L This PR changes 100-499 lines, ignoring generated files. area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. labels Mar 12, 2026
Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 4 issues, and left some high-level feedback:

  • In send_streaming, consider using asyncio.get_running_loop() consistently instead of asyncio.get_event_loop() (the two may behave differently in newer Python versions and under some event loop policies).
  • post_c2c_message is annotated to return message.Message but now returns None in several branches; updating the return type (and any callers that assume a non-None result) would avoid type confusion and potential runtime errors.
  • In _send_with_markdown_fallback, payload.copy() is a shallow copy, and payload['markdown'] is sometimes a MarkdownPayload object and sometimes a dict; normalizing it to a dict before mutation (e.g. via model_dump()/dict()) may be safer so the retry and fallback paths behave consistently.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `send_streaming`, consider using `asyncio.get_running_loop()` consistently instead of `asyncio.get_event_loop()` (which may behave differently in newer Python versions and under some loop policies).
- `post_c2c_message` is annotated to return `message.Message` but now returns `None` in several branches; updating the return type (and any callers that assume a non-None result) would avoid type confusion and potential runtime errors.
- In `_send_with_markdown_fallback`, `payload.copy()` is a shallow copy and `payload['markdown']` is sometimes a `MarkdownPayload` object, sometimes a `dict`; it may be safer to normalize this to a dict (e.g., via `model_dump()`/`dict()`) before mutation so the retry and fallback paths behave consistently.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="122-134" />
<code_context>
-                        stream_payload["id"] = ret["id"]
-                        last_edit_time = asyncio.get_running_loop().time()
+                # 节流:按时间间隔发送中间分片
+                current_time = asyncio.get_event_loop().time()
+                if current_time - last_edit_time >= throttle_interval:
+                    ret = cast(
+                        message.Message,
+                        await self._post_send(stream=stream_payload),
+                    )
+                    stream_payload["index"] += 1
+                    ret_id = self._extract_response_message_id(ret)
+                    if ret_id is not None:
+                        stream_payload["id"] = ret_id
+                    last_edit_time = asyncio.get_event_loop().time()
+                    self.send_buffer = None  # 清空已发送的分片,避免下次重复发送旧内容

</code_context>
<issue_to_address>
**suggestion:** Use `get_running_loop()` instead of `get_event_loop()` inside async context for future compatibility and consistency.

Since this is already an `async` method and earlier you used `asyncio.get_running_loop().time()`, using `get_event_loop()` here may behave differently under newer asyncio semantics and can raise deprecation warnings. Please switch both calls to `asyncio.get_running_loop().time()` for consistency and future compatibility.

```suggestion
                # 节流:按时间间隔发送中间分片
                current_time = asyncio.get_running_loop().time()
                if current_time - last_edit_time >= throttle_interval:
                    ret = cast(
                        message.Message,
                        await self._post_send(stream=stream_payload),
                    )
                    stream_payload["index"] += 1
                    ret_id = self._extract_response_message_id(ret)
                    if ret_id is not None:
                        stream_payload["id"] = ret_id
                    last_edit_time = asyncio.get_running_loop().time()
                    self.send_buffer = None  # 清空已发送的分片,避免下次重复发送旧内容
```
</issue_to_address>

### Comment 2
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="99-108" />
<code_context>
+                # ---- C2C 流式场景 ----
+
+                # tool_call break 信号:工具开始执行,先把已有 buffer 以 state=10 结束当前流式段
+                if chain.type == "break":
+                    if self.send_buffer:
+                        stream_payload["state"] = 10
+                        ret = await self._post_send(stream=stream_payload)
+                        ret_id = self._extract_response_message_id(ret)
+                        if ret_id is not None:
+                            stream_payload["id"] = ret_id
+                    # 重置 stream_payload,为下一段流式做准备
+                    stream_payload = {
+                        "state": 1,
+                        "id": None,
+                        "index": 0,
+                        "reset": False,
+                    }
+                    last_edit_time = 0
+                    continue
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Consider clearing `send_buffer` after sending the final chunk on `break` to avoid re-sending already-flushed content.

When `chain.type == "break"`, `_post_send(stream=stream_payload)` flushes the current buffered content and `stream_payload`/`last_edit_time` are reset, but `self.send_buffer` is not. On the next iteration, new content will be appended to a buffer that already contains the previously sent final chunk, so the next flush may resend that content. Unless `_post_send` explicitly clears `send_buffer` (it currently doesn’t), `self.send_buffer` should be cleared after `_post_send` in this branch to avoid duplicated output between segments.
</issue_to_address>

### Comment 3
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="408-412" />
<code_context>
+            if stream and self.STREAM_MARKDOWN_NEWLINE_ERROR in str(err):
+                retry_payload = payload.copy()
+
+                markdown_payload = retry_payload.get("markdown")
+                if isinstance(markdown_payload, dict):
+                    md_content = cast(str, markdown_payload.get("content", "") or "")
+                    if md_content and not md_content.endswith("\n"):
+                        retry_payload["markdown"] = {"content": md_content + "\n"}
+
+                content = cast(str | None, retry_payload.get("content"))
</code_context>
<issue_to_address>
**issue (bug_risk):** The retry path only handles `markdown` when it is a `dict`, which may miss the `MarkdownPayload` case.

Since `_post_send` sets `payload["markdown"]` to a `MarkdownPayload` instance, the retry logic that only handles dict markdown will skip this case. That means the newline fix won’t apply and the retried request can still hit the same streaming error. Consider also handling `MarkdownPayload` here (e.g., detect the type, clone, and update its content) rather than only supporting the dict form.
</issue_to_address>

### Comment 4
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="72" />
<code_context>
         await self._post_send()

     async def send_streaming(self, generator, use_fallback: bool = False):
-        """流式输出仅支持消息列表私聊"""
+        """流式输出仅支持消息列表私聊(C2C),其他消息源退化为普通发送"""
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting small helper methods for C2C streaming, markdown streaming retries, and C2C stream payload/response handling to keep the main functions high-level and easier to follow.

You can keep all current behavior but reduce local complexity by extracting a few focused helpers. These are small and mechanically safe to apply.

### 1. Split C2C streaming concerns in `send_streaming`

`send_streaming` currently mixes:

- non‑C2C accumulation
- C2C tool `break` handling
- throttling + send logic
- stream payload mutations (`state`, `id`, `index`, `reset`)
- buffer reset

You can keep the overall structure but move the low‑level details out of the loop:

```python
async def send_streaming(self, generator, use_fallback: bool = False):
    await super().send_streaming(generator, use_fallback)
    stream_payload = {"state": 1, "id": None, "index": 0, "reset": False}
    last_edit_time = 0
    throttle_interval = 1
    ret = None
    source = self.message_obj.raw_message

    try:
        async for chain in generator:
            source = self.message_obj.raw_message

            if not isinstance(source, botpy.message.C2CMessage):
                self._append_to_buffer(chain)
                continue

            if chain.type == "break":
                stream_payload, last_edit_time = await self._handle_c2c_break_segment(
                    stream_payload,
                )
                continue

            self._append_to_buffer(chain)
            ret, stream_payload, last_edit_time = await self._maybe_send_c2c_delta(
                stream_payload,
                last_edit_time,
                throttle_interval,
            )

        if isinstance(source, botpy.message.C2CMessage):
            await self._finalize_c2c_stream(stream_payload)
        else:
            await self._post_send()
    except Exception:
        logger.error("发送流式消息时出错", exc_info=True)
        self.send_buffer = None
```

With small helpers:

```python
def _append_to_buffer(self, chain: MessageChain) -> None:
    if not self.send_buffer:
        self.send_buffer = chain
    else:
        self.send_buffer.chain.extend(chain.chain)

async def _handle_c2c_break_segment(
    self,
    stream_payload: dict,
) -> tuple[dict, float]:
    if self.send_buffer:
        stream_payload["state"] = 10
        ret = await self._post_send(stream=stream_payload)
        ret_id = self._extract_response_message_id(ret)
        if ret_id is not None:
            stream_payload["id"] = ret_id

    # reset for next segment
    return {"state": 1, "id": None, "index": 0, "reset": False}, 0.0

async def _maybe_send_c2c_delta(
    self,
    stream_payload: dict,
    last_edit_time: float,
    throttle_interval: float,
) -> tuple[Any, dict, float]:
    current_time = asyncio.get_running_loop().time()
    if current_time - last_edit_time < throttle_interval:
        return None, stream_payload, last_edit_time

    ret = await self._send_c2c_stream_chunk(stream_payload)
    return ret, stream_payload, asyncio.get_running_loop().time()

async def _send_c2c_stream_chunk(self, stream_payload: dict):
    ret = cast(message.Message, await self._post_send(stream=stream_payload))
    stream_payload["index"] += 1
    ret_id = self._extract_response_message_id(ret)
    if ret_id is not None:
        stream_payload["id"] = ret_id
    self.send_buffer = None
    return ret

async def _finalize_c2c_stream(self, stream_payload: dict) -> None:
    stream_payload["state"] = 10
    await self._post_send(stream=stream_payload)
```

This preserves behavior (including id/index handling and buffer clearing) but keeps the main loop high‑level and easier to read/test.

---

### 2. Isolate streaming newline error retry in `_send_with_markdown_fallback`

Right now `_send_with_markdown_fallback` interleaves:

- streaming newline error correction
- generic “markdown not allowed” fallback

You can separate the streaming‑specific part:

```python
async def _send_with_markdown_fallback(
    self,
    send_func,
    payload: dict,
    plain_text: str,
    stream: dict | None = None,
):
    try:
        return await send_func(payload)
    except botpy.errors.ServerError as err:
        # 1. streaming newline correction
        if stream:
            retry = await self._retry_stream_newline_error(send_func, payload, err)
            if retry is not None:
                return retry

        # 2. markdown not allowed fallback
        if (
            self.MARKDOWN_NOT_ALLOWED_ERROR not in str(err)
            or not payload.get("markdown")
            or not plain_text
        ):
            raise

        logger.warning("[QQOfficial] markdown 发送被拒绝,回退到 content 模式重试。")
        fallback_payload = payload.copy()
        fallback_payload.pop("markdown", None)
        fallback_payload["content"] = plain_text
        if fallback_payload.get("msg_type") == 2:
            fallback_payload["msg_type"] = 0
        if stream:
            content = cast(str, fallback_payload.get("content") or "")
            if content and not content.endswith("\n"):
                fallback_payload["content"] = content + "\n"
        return await send_func(fallback_payload)
```

Helper:

```python
async def _retry_stream_newline_error(
    self,
    send_func,
    payload: dict,
    err: Exception,
):
    if self.STREAM_MARKDOWN_NEWLINE_ERROR not in str(err):
        return None

    retry_payload = payload.copy()

    md = retry_payload.get("markdown")
    if isinstance(md, dict):
        md_content = cast(str, md.get("content", "") or "")
        if md_content and not md_content.endswith("\n"):
            retry_payload["markdown"] = {"content": md_content + "\n"}

    content = cast(str | None, retry_payload.get("content"))
    if content and not content.endswith("\n"):
        retry_payload["content"] = content + "\n"

    logger.warning(
        "[QQOfficial] 流式 markdown 分片换行校验失败,已修正后重试一次。"
    )
    return await send_func(retry_payload)
```

This keeps the main function focused while maintaining all the retry semantics.

---

### 3. Sanitize C2C `stream` payload outside `post_c2c_message`

`post_c2c_message` now mixes HTTP call logic with protocol quirks (`stream.id` removal, `None`/non‑dict responses). You can isolate the stream cleanup and response normalization:

```python
async def post_c2c_message(
    self,
    openid: str,
    content: str | None = None,
    # ...
    stream: dict | None = None,
) -> message.Message | None:
    payload = locals()
    payload.pop("self", None)
    payload["stream"] = self._sanitize_stream_payload(stream)

    route = Route("POST", "/v2/users/{openid}/messages", openid=openid)
    result = await self.bot.api._http.request(route, json=payload)

    result = self._normalize_c2c_response(result)
    if result is None:
        return None
    return message.Message(**result)
```

Helpers:

```python
@staticmethod
def _sanitize_stream_payload(stream: dict | None) -> dict | None:
    if stream is None:
        return None
    stream_data = dict(stream)
    if stream_data.get("id") is None:
        stream_data.pop("id", None)
    return stream_data

@staticmethod
def _normalize_c2c_response(result: Any) -> dict | None:
    if result is None:
        logger.warning("[QQOfficial] post_c2c_message: API 返回 None,跳过本次发送")
        return None
    if not isinstance(result, dict):
        logger.error(f"[QQOfficial] post_c2c_message: 响应不是 dict: {result}")
        return None
    return result
```

This keeps `post_c2c_message` focused on “prepare payload + call API + wrap response”, and moves protocol details into narrowly scoped helpers.

---

These extractions should cut cognitive load significantly without altering behavior or the added functionality.
</issue_to_address>

Original comment in English

Hey - I've found 4 issues, and left some high level feedback:

  • In send_streaming, consider using asyncio.get_running_loop() consistently instead of asyncio.get_event_loop() (which may behave differently in newer Python versions and under some loop policies).
  • post_c2c_message is annotated to return message.Message but now returns None in several branches; updating the return type (and any callers that assume a non-None result) would avoid type confusion and potential runtime errors.
  • In _send_with_markdown_fallback, payload.copy() is a shallow copy and payload['markdown'] is sometimes a MarkdownPayload object, sometimes a dict; it may be safer to normalize this to a dict (e.g., via model_dump()/dict()) before mutation so the retry and fallback paths behave consistently.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `send_streaming`, consider using `asyncio.get_running_loop()` consistently instead of `asyncio.get_event_loop()` (which may behave differently in newer Python versions and under some loop policies).
- `post_c2c_message` is annotated to return `message.Message` but now returns `None` in several branches; updating the return type (and any callers that assume a non-None result) would avoid type confusion and potential runtime errors.
- In `_send_with_markdown_fallback`, `payload.copy()` is a shallow copy and `payload['markdown']` is sometimes a `MarkdownPayload` object, sometimes a `dict`; it may be safer to normalize this to a dict (e.g., via `model_dump()`/`dict()`) before mutation so the retry and fallback paths behave consistently.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="122-134" />
<code_context>
-                        stream_payload["id"] = ret["id"]
-                        last_edit_time = asyncio.get_running_loop().time()
+                # 节流:按时间间隔发送中间分片
+                current_time = asyncio.get_event_loop().time()
+                if current_time - last_edit_time >= throttle_interval:
+                    ret = cast(
+                        message.Message,
+                        await self._post_send(stream=stream_payload),
+                    )
+                    stream_payload["index"] += 1
+                    ret_id = self._extract_response_message_id(ret)
+                    if ret_id is not None:
+                        stream_payload["id"] = ret_id
+                    last_edit_time = asyncio.get_event_loop().time()
+                    self.send_buffer = None  # 清空已发送的分片,避免下次重复发送旧内容

</code_context>
<issue_to_address>
**suggestion:** Use `get_running_loop()` instead of `get_event_loop()` inside async context for future compatibility and consistency.

Since this is already an `async` method and earlier you used `asyncio.get_running_loop().time()`, using `get_event_loop()` here may behave differently under newer asyncio semantics and can raise deprecation warnings. Please switch both calls to `asyncio.get_running_loop().time()` for consistency and future compatibility.

```suggestion
                # 节流:按时间间隔发送中间分片
                current_time = asyncio.get_running_loop().time()
                if current_time - last_edit_time >= throttle_interval:
                    ret = cast(
                        message.Message,
                        await self._post_send(stream=stream_payload),
                    )
                    stream_payload["index"] += 1
                    ret_id = self._extract_response_message_id(ret)
                    if ret_id is not None:
                        stream_payload["id"] = ret_id
                    last_edit_time = asyncio.get_running_loop().time()
                    self.send_buffer = None  # 清空已发送的分片,避免下次重复发送旧内容
```
</issue_to_address>

### Comment 2
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="99-108" />
<code_context>
+                # ---- C2C 流式场景 ----
+
+                # tool_call break 信号:工具开始执行,先把已有 buffer 以 state=10 结束当前流式段
+                if chain.type == "break":
+                    if self.send_buffer:
+                        stream_payload["state"] = 10
+                        ret = await self._post_send(stream=stream_payload)
+                        ret_id = self._extract_response_message_id(ret)
+                        if ret_id is not None:
+                            stream_payload["id"] = ret_id
+                    # 重置 stream_payload,为下一段流式做准备
+                    stream_payload = {
+                        "state": 1,
+                        "id": None,
+                        "index": 0,
+                        "reset": False,
+                    }
+                    last_edit_time = 0
+                    continue
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Consider clearing `send_buffer` after sending the final chunk on `break` to avoid re-sending already-flushed content.

When `chain.type == "break"`, `_post_send(stream=stream_payload)` flushes the current buffered content and `stream_payload`/`last_edit_time` are reset, but `self.send_buffer` is not. On the next iteration, new content will be appended to a buffer that already contains the previously sent final chunk, so the next flush may resend that content. Unless `_post_send` explicitly clears `send_buffer` (it currently doesn’t), `self.send_buffer` should be cleared after `_post_send` in this branch to avoid duplicated output between segments.
</issue_to_address>

### Comment 3
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="408-412" />
<code_context>
+            if stream and self.STREAM_MARKDOWN_NEWLINE_ERROR in str(err):
+                retry_payload = payload.copy()
+
+                markdown_payload = retry_payload.get("markdown")
+                if isinstance(markdown_payload, dict):
+                    md_content = cast(str, markdown_payload.get("content", "") or "")
+                    if md_content and not md_content.endswith("\n"):
+                        retry_payload["markdown"] = {"content": md_content + "\n"}
+
+                content = cast(str | None, retry_payload.get("content"))
</code_context>
<issue_to_address>
**issue (bug_risk):** The retry path only handles `markdown` when it is a `dict`, which may miss the `MarkdownPayload` case.

Since `_post_send` sets `payload["markdown"]` to a `MarkdownPayload` instance, the retry logic that only handles dict markdown will skip this case. That means the newline fix won’t apply and the retried request can still hit the same streaming error. Consider also handling `MarkdownPayload` here (e.g., detect the type, clone, and update its content) rather than only supporting the dict form.
</issue_to_address>

### Comment 4
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="72" />
<code_context>
         await self._post_send()

     async def send_streaming(self, generator, use_fallback: bool = False):
-        """流式输出仅支持消息列表私聊"""
+        """流式输出仅支持消息列表私聊(C2C),其他消息源退化为普通发送"""
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting small helper methods for C2C streaming, markdown streaming retries, and C2C stream payload/response handling to keep the main functions high-level and easier to follow.

You can keep all current behavior but reduce local complexity by extracting a few focused helpers. These are small and mechanically safe to apply.

### 1. Split C2C streaming concerns in `send_streaming`

`send_streaming` currently mixes:

- non‑C2C accumulation
- C2C tool `break` handling
- throttling + send logic
- stream payload mutations (`state`, `id`, `index`, `reset`)
- buffer reset

You can keep the overall structure but move the low‑level details out of the loop:

```python
async def send_streaming(self, generator, use_fallback: bool = False):
    await super().send_streaming(generator, use_fallback)
    stream_payload = {"state": 1, "id": None, "index": 0, "reset": False}
    last_edit_time = 0
    throttle_interval = 1
    ret = None
    source = self.message_obj.raw_message

    try:
        async for chain in generator:
            source = self.message_obj.raw_message

            if not isinstance(source, botpy.message.C2CMessage):
                self._append_to_buffer(chain)
                continue

            if chain.type == "break":
                stream_payload, last_edit_time = await self._handle_c2c_break_segment(
                    stream_payload,
                )
                continue

            self._append_to_buffer(chain)
            ret, stream_payload, last_edit_time = await self._maybe_send_c2c_delta(
                stream_payload,
                last_edit_time,
                throttle_interval,
            )

        if isinstance(source, botpy.message.C2CMessage):
            await self._finalize_c2c_stream(stream_payload)
        else:
            await self._post_send()
    except Exception:
        logger.error("发送流式消息时出错", exc_info=True)
        self.send_buffer = None
```

With small helpers:

```python
def _append_to_buffer(self, chain: MessageChain) -> None:
    if not self.send_buffer:
        self.send_buffer = chain
    else:
        self.send_buffer.chain.extend(chain.chain)

async def _handle_c2c_break_segment(
    self,
    stream_payload: dict,
) -> tuple[dict, float]:
    if self.send_buffer:
        stream_payload["state"] = 10
        await self._post_send(stream=stream_payload)
        # the flushed segment must not be re-sent by the next chunk
        self.send_buffer = None

    # reset stream state and throttle timer for the next segment
    return {"state": 1, "id": None, "index": 0, "reset": False}, 0.0

async def _maybe_send_c2c_delta(
    self,
    stream_payload: dict,
    last_edit_time: float,
    throttle_interval: float,
) -> tuple[Any, dict, float]:
    current_time = asyncio.get_running_loop().time()
    if current_time - last_edit_time < throttle_interval:
        return None, stream_payload, last_edit_time

    ret = await self._send_c2c_stream_chunk(stream_payload)
    return ret, stream_payload, asyncio.get_running_loop().time()

async def _send_c2c_stream_chunk(self, stream_payload: dict):
    ret = cast(message.Message, await self._post_send(stream=stream_payload))
    stream_payload["index"] += 1
    ret_id = self._extract_response_message_id(ret)
    if ret_id is not None:
        stream_payload["id"] = ret_id
    self.send_buffer = None
    return ret

async def _finalize_c2c_stream(self, stream_payload: dict) -> None:
    stream_payload["state"] = 10
    await self._post_send(stream=stream_payload)
```

This preserves behavior (including id/index handling and buffer clearing) but keeps the main loop high‑level and easier to read/test.

---

### 2. Isolate streaming newline error retry in `_send_with_markdown_fallback`

Right now `_send_with_markdown_fallback` interleaves:

- streaming newline error correction
- generic “markdown not allowed” fallback

You can separate the streaming‑specific part:

```python
async def _send_with_markdown_fallback(
    self,
    send_func,
    payload: dict,
    plain_text: str,
    stream: dict | None = None,
):
    try:
        return await send_func(payload)
    except botpy.errors.ServerError as err:
        # 1. streaming newline correction
        if stream:
            retry = await self._retry_stream_newline_error(send_func, payload, err)
            if retry is not None:
                return retry

        # 2. markdown not allowed fallback
        if (
            self.MARKDOWN_NOT_ALLOWED_ERROR not in str(err)
            or not payload.get("markdown")
            or not plain_text
        ):
            raise

        logger.warning("[QQOfficial] markdown 发送被拒绝,回退到 content 模式重试。")
        fallback_payload = payload.copy()
        fallback_payload.pop("markdown", None)
        fallback_payload["content"] = plain_text
        if fallback_payload.get("msg_type") == 2:
            fallback_payload["msg_type"] = 0
        if stream:
            content = cast(str, fallback_payload.get("content") or "")
            if content and not content.endswith("\n"):
                fallback_payload["content"] = content + "\n"
        return await send_func(fallback_payload)
```

Helper:

```python
async def _retry_stream_newline_error(
    self,
    send_func,
    payload: dict,
    err: Exception,
):
    if self.STREAM_MARKDOWN_NEWLINE_ERROR not in str(err):
        return None

    retry_payload = payload.copy()

    md = retry_payload.get("markdown")
    if isinstance(md, dict):
        md_content = cast(str, md.get("content", "") or "")
        if md_content and not md_content.endswith("\n"):
            retry_payload["markdown"] = {"content": md_content + "\n"}

    content = cast(str | None, retry_payload.get("content"))
    if content and not content.endswith("\n"):
        retry_payload["content"] = content + "\n"

    logger.warning(
        "[QQOfficial] 流式 markdown 分片换行校验失败,已修正后重试一次。"
    )
    return await send_func(retry_payload)
```

This keeps the main function focused while maintaining all the retry semantics.

---

### 3. Sanitize C2C `stream` payload outside `post_c2c_message`

`post_c2c_message` now mixes HTTP call logic with protocol quirks (`stream.id` removal, `None`/non‑dict responses). You can isolate the stream cleanup and response normalization:

```python
async def post_c2c_message(
    self,
    openid: str,
    content: str | None = None,
    # ...
    stream: dict | None = None,
) -> message.Message | None:
    payload = locals()
    payload.pop("self", None)
    payload["stream"] = self._sanitize_stream_payload(stream)

    route = Route("POST", "/v2/users/{openid}/messages", openid=openid)
    result = await self.bot.api._http.request(route, json=payload)

    result = self._normalize_c2c_response(result)
    if result is None:
        return None
    return message.Message(**result)
```

Helpers:

```python
@staticmethod
def _sanitize_stream_payload(stream: dict | None) -> dict | None:
    if stream is None:
        return None
    stream_data = dict(stream)
    if stream_data.get("id") is None:
        stream_data.pop("id", None)
    return stream_data

@staticmethod
def _normalize_c2c_response(result: Any) -> dict | None:
    if result is None:
        logger.warning("[QQOfficial] post_c2c_message: API 返回 None,跳过本次发送")
        return None
    if not isinstance(result, dict):
        logger.error(f"[QQOfficial] post_c2c_message: 响应不是 dict: {result}")
        return None
    return result
```

This keeps `post_c2c_message` focused on “prepare payload + call API + wrap response”, and moves protocol details into narrowly scoped helpers.

---

These extractions should cut cognitive load significantly without altering behavior or the added functionality.
</issue_to_address>


@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the stability and compliance of the QQ Official adapter, particularly for streaming messages and proactive media sending. It addresses several critical issues that led to crashes, incorrect message delivery, and API errors, ensuring a more reliable and robust communication experience within the platform. The changes streamline the message handling process, making it more resilient to various API response formats and user interaction patterns.

Highlights

  • Streaming Delivery Reliability: Fixed multiple bugs in the QQ Official adapter's streaming delivery for C2C messages, addressing issues like crashes from unexpected API responses, incorrect handling of non-C2C messages in the streaming loop, content accumulation, and missing newline suffixes required by the QQ API.
  • Proactive Media Sending Correction: Resolved a bug where msg_id was incorrectly included in proactive media payloads (video/file) sent via tool calls, which caused API rejections from the QQ API.
  • Enhanced Streaming Logic: Implemented robust handling for C2C streaming, including downgrading rich media to non-stream mode, clearing send buffers, handling tool-call break signals, and a new helper to safely extract message IDs from diverse API responses.
  • API Compliance: Ensured strict compliance with QQ API requirements by automatically appending newlines to final streaming chunks and filtering stream.id=None before serialization, preventing validation errors.
Changelog
  • astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py
    • Rewrote send_streaming to correctly handle non-C2C messages by accumulating them for a single normal send, preventing them from entering the streaming protocol.
    • Removed the problematic tail call to super().send_streaming and moved the super() call to the function entry for proper _has_send_oper setting.
    • Implemented clearing of the send_buffer after each intermediate chunk is sent to prevent content accumulation and repetition.
    • Added handling for tool-call break signals, flushing the current stream segment with state=10 before tool execution and resetting the stream state.
    • Introduced _extract_response_message_id static helper to safely extract message IDs from both dictionary and Message objects.
    • Configured rich-media payloads (image/audio) in C2C to automatically downgrade to non-stream mode to avoid platform-side validation errors.
    • Ensured the final stream chunk (state=10) automatically appends a newline character to plain_text as required by the QQ API.
    • Extended _send_with_markdown_fallback to include a stream parameter, enabling retry logic with newline-appended payloads on STREAM_MARKDOWN_NEWLINE_ERROR and ensuring fallback content also gets a newline in streaming contexts.
    • Modified post_c2c_message to filter stream.id = None before serialization and to gracefully return None instead of raising RuntimeError on unexpected API responses.
  • astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py
    • Removed msg_id from the payload after setting msg_type=7 for video and file sends within _send_by_session_common, for both FRIEND_MESSAGE (C2C) and GROUP_MESSAGE paths, to prevent QQ API rejections for proactive media messages.
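The adapter-side fix amounts to never attaching the passive-reply credential to a proactive media payload. A minimal sketch under assumed payload keys (`build_proactive_media_payload` and `file_uuid` are hypothetical illustrations, not the PR's actual code):

```python
def build_proactive_media_payload(media_file_info: dict) -> dict:
    """Build a rich-media (msg_type=7) payload for a proactive send.

    Deliberately omits msg_id: the QQ API rejects it on proactive media
    messages with "请求参数msg_id无效或越权" (invalid or unauthorized msg_id).
    """
    return {"msg_type": 7, "media": media_file_info}
```

In the PR itself the equivalent effect is achieved by popping `msg_id` from the payload after setting `msg_type=7`.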
Activity
  • No human activity (comments, reviews, or progress updates) has been recorded for this pull request yet.

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request significantly enhances QQ Official streaming message functionality, primarily within qqofficial_message_event.py. Key changes include refining the send_streaming method to support C2C messages with a fallback for other message types, introducing tool_call break signal handling for stream segments, and implementing a new _extract_response_message_id utility. The update also adds logic to downgrade streaming to normal sending for rich media, ensures final streaming markdown segments end with a newline as the QQ API requires, and incorporates retry mechanisms for newline validation errors. Additionally, post_c2c_message now handles stream.id=None by removing it from the payload and improves API response error handling. In qqofficial_platform_adapter.py, msg_id is explicitly removed from media message payloads to prevent the QQ API's '越权' (permission-exceeded) error. A review comment highlights a regression in qqofficial_message_event.py, suggesting that asyncio.get_event_loop().time() should be reverted to asyncio.get_running_loop().time() for better explicitness and consistency with the previous implementation.

Comment on lines +123 to +133

```python
current_time = asyncio.get_event_loop().time()
if current_time - last_edit_time >= throttle_interval:
    ret = cast(
        message.Message,
        await self._post_send(stream=stream_payload),
    )
    stream_payload["index"] += 1
    ret_id = self._extract_response_message_id(ret)
    if ret_id is not None:
        stream_payload["id"] = ret_id
    last_edit_time = asyncio.get_event_loop().time()
```

Severity: medium

It's recommended to use `asyncio.get_running_loop()` instead of `asyncio.get_event_loop()` inside coroutines (since Python 3.7). `get_running_loop()` is more explicit: it returns the currently running event loop and raises an error if none is running. The previous implementation used `get_running_loop()`, so this change appears to be an unintentional regression.

Suggested change:

```python
current_time = asyncio.get_running_loop().time()
if current_time - last_edit_time >= throttle_interval:
    ret = cast(
        message.Message,
        await self._post_send(stream=stream_payload),
    )
    stream_payload["index"] += 1
    ret_id = self._extract_response_message_id(ret)
    if ret_id is not None:
        stream_payload["id"] = ret_id
    last_edit_time = asyncio.get_running_loop().time()
```
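As a standalone illustration of the throttle pattern under review (not the adapter code itself), driving the timer with `asyncio.get_running_loop().time()` looks like this:

```python
import asyncio


async def throttled_sends(chunks: list[str], interval: float) -> list[str]:
    """Deliver at most one chunk per `interval` seconds.

    In the real adapter, chunks that arrive inside the throttle window
    accumulate in send_buffer instead of being dropped.
    """
    loop = asyncio.get_running_loop()  # requires a running event loop
    last_edit_time = 0.0
    sent: list[str] = []
    for chunk in chunks:
        now = loop.time()
        if now - last_edit_time >= interval:
            sent.append(chunk)
            last_edit_time = loop.time()
        await asyncio.sleep(0)  # yield to the event loop between chunks
    return sent


sent = asyncio.run(throttled_sends([f"c{i}" for i in range(50)], 0.5))
```

Because `loop.time()` is a monotonic clock, the first chunk always passes the check, and subsequent chunks are gated by the interval.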

@2ndelement 2ndelement force-pushed the feat/qqofficial-streaming branch from 992a701 to 872aa7c Compare March 12, 2026 15:07
@2ndelement 2ndelement changed the title Feat/qqofficial streaming fix: qqofficial streaming Mar 12, 2026
…_loop()

asyncio.get_event_loop() is deprecated since Python 3.10 and raises a
DeprecationWarning (or errors) when called from inside a running coroutine
without a current event loop set on the thread. Replace both call-sites
in the streaming throttle logic with asyncio.get_running_loop(), which is
the correct API to use inside an already-running async context.

Co-Authored-By: Claude Sonnet <noreply@anthropic.com>
@Soulter Soulter changed the title fix: qqofficial streaming perf: improve streaming message delivery reliability and proactive media sending Mar 12, 2026
@Soulter Soulter changed the title perf: improve streaming message delivery reliability and proactive media sending perf(QQ Official API): improve streaming message delivery reliability and proactive media sending Mar 12, 2026
@dosubot dosubot bot added the lgtm This PR has been approved by a maintainer label Mar 12, 2026
@Soulter Soulter merged commit e6c1164 into AstrBotDevs:master Mar 12, 2026
5 of 6 checks passed

Labels

area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. lgtm This PR has been approved by a maintainer size:L This PR changes 100-499 lines, ignoring generated files.

2 participants