refactor: message storage format of webchat

This commit is contained in:
Soulter
2025-11-29 21:01:10 +08:00
parent df4412aa80
commit c98dea7e4b
3 changed files with 243 additions and 46 deletions

View File

@@ -10,7 +10,7 @@ class PlatformMessageHistoryManager:
self,
platform_id: str,
user_id: str,
content: list[dict], # TODO: parse from message chain
content: dict, # TODO: parse from message chain
sender_id: str | None = None,
sender_name: str | None = None,
):

View File

@@ -44,6 +44,7 @@ class ChatRoute(Route):
self.update_session_display_name,
),
"/chat/get_file": ("GET", self.get_file),
"/chat/get_attachment": ("GET", self.get_attachment),
"/chat/post_image": ("POST", self.post_image),
"/chat/post_file": ("POST", self.post_file),
}
@@ -85,6 +86,26 @@ class ChatRoute(Route):
except (FileNotFoundError, OSError):
return Response().error("File access error").__dict__
async def get_attachment(self):
    """Serve the stored file for the ``attachment_id`` query parameter.

    Returns:
        A ``QuartResponse`` carrying the file bytes and the attachment's
        recorded MIME type, or an error payload dict when the parameter is
        missing, no attachment record is found, or the file cannot be read.
    """
    attachment_id = request.args.get("attachment_id")
    if not attachment_id:
        return Response().error("Missing key: attachment_id").__dict__

    try:
        record = await self.db.get_attachment_by_id(attachment_id)
        if not record:
            return Response().error("Attachment not found").__dict__

        # Resolve symlinks so the file that is opened is the real target.
        resolved_path = os.path.realpath(record.path)
        with open(resolved_path, "rb") as fh:
            payload = fh.read()
        return QuartResponse(payload, mimetype=record.mime_type)
    except (FileNotFoundError, OSError):
        return Response().error("File access error").__dict__
async def post_image(self):
post_data = await request.files
if "file" not in post_data:
@@ -113,6 +134,93 @@ class ChatRoute(Route):
return Response().ok(data={"filename": filename}).__dict__
def _get_image_mime_type(self, filename: str) -> str:
"""根据文件扩展名获取图片 MIME 类型"""
ext = os.path.splitext(filename)[1].lower()
mime_types = {
".png": "image/png",
".gif": "image/gif",
".webp": "image/webp",
}
return mime_types.get(ext, "image/jpeg")
async def _create_image_attachment(self, filename: str) -> dict | None:
"""创建图片 attachment 并返回消息部分"""
file_path = os.path.join(self.imgs_dir, os.path.basename(filename))
if not os.path.exists(file_path):
return None
attachment = await self.db.insert_attachment(
path=file_path,
type="image",
mime_type=self._get_image_mime_type(filename),
)
if attachment:
return {"type": "image", "attachment_id": attachment.attachment_id}
return None
async def _create_audio_attachment(self, filename: str) -> dict | None:
"""创建音频 attachment 并返回消息部分"""
file_path = os.path.join(self.imgs_dir, os.path.basename(filename))
if not os.path.exists(file_path):
return None
attachment = await self.db.insert_attachment(
path=file_path,
type="record",
mime_type="audio/wav",
)
if attachment:
return {"type": "record", "attachment_id": attachment.attachment_id}
return None
async def _build_user_message_parts(
self, message: str, image_urls: list | None, audio_url: str | None
) -> list:
"""构建用户消息的部分列表"""
parts = []
if message:
parts.append({"type": "plain", "text": message})
if image_urls:
for img_filename in image_urls:
part = await self._create_image_attachment(img_filename)
if part:
parts.append(part)
if audio_url:
part = await self._create_audio_attachment(audio_url)
if part:
parts.append(part)
return parts
async def _save_bot_message(
self,
webchat_conv_id: str,
text: str,
media_parts: list,
reasoning: str,
):
"""保存 bot 消息到历史记录"""
bot_message_parts = []
if text:
bot_message_parts.append({"type": "plain", "text": text})
bot_message_parts.extend(media_parts)
new_his = {"type": "bot", "message": bot_message_parts}
if reasoning:
new_his["reasoning"] = reasoning
await self.platform_history_mgr.insert(
platform_id="webchat",
user_id=webchat_conv_id,
content=new_his,
sender_id="bot",
sender_name="bot",
)
async def chat(self):
username = g.get("username", "guest")
@@ -126,13 +234,12 @@ class ChatRoute(Route):
)
message = post_data["message"]
# conversation_id = post_data["conversation_id"]
session_id = post_data.get("session_id", post_data.get("conversation_id"))
image_url = post_data.get("image_url")
audio_url = post_data.get("audio_url")
selected_provider = post_data.get("selected_provider")
selected_model = post_data.get("selected_model")
enable_streaming = post_data.get("enable_streaming", True) # 默认为 True
enable_streaming = post_data.get("enable_streaming", True)
if not message and not image_url and not audio_url:
return (
@@ -143,27 +250,26 @@ class ChatRoute(Route):
if not session_id:
return Response().error("session_id is empty").__dict__
# 追加用户消息
webchat_conv_id = session_id
# 获取会话特定的队列
back_queue = webchat_queue_mgr.get_or_create_back_queue(webchat_conv_id)
new_his = {"type": "user", "message": message}
if image_url:
new_his["image_url"] = image_url
if audio_url:
new_his["audio_url"] = audio_url
# 构建并保存用户消息
message_parts = await self._build_user_message_parts(
message, image_url, audio_url
)
await self.platform_history_mgr.insert(
platform_id="webchat",
user_id=webchat_conv_id,
content=new_his,
content={"type": "user", "message": message_parts},
sender_id=username,
sender_name=username,
)
async def stream():
client_disconnected = False
accumulated_parts = []
accumulated_text = ""
accumulated_reasoning = ""
try:
async with track_conversation(self.running_convs, webchat_conv_id):
@@ -182,16 +288,17 @@ class ChatRoute(Route):
continue
result_text = result["data"]
type = result.get("type")
msg_type = result.get("type")
streaming = result.get("streaming", False)
# 发送 SSE 数据
try:
if not client_disconnected:
yield f"data: {json.dumps(result, ensure_ascii=False)}\n\n"
except Exception as e:
if not client_disconnected:
logger.debug(
f"[WebChat] 用户 {username} 断开聊天长连接。 {e}",
f"[WebChat] 用户 {username} 断开聊天长连接。 {e}"
)
client_disconnected = True
@@ -202,24 +309,43 @@ class ChatRoute(Route):
logger.debug(f"[WebChat] 用户 {username} 断开聊天长连接。")
client_disconnected = True
if type == "end":
# 累积消息部分
if msg_type == "plain":
chain_type = result.get("chain_type", "normal")
if chain_type == "reasoning":
accumulated_reasoning += result_text
else:
accumulated_text += result_text
elif msg_type == "image":
filename = result_text.replace("[IMAGE]", "")
part = await self._create_image_attachment(filename)
if part:
accumulated_parts.append(part)
elif msg_type == "record":
filename = result_text.replace("[RECORD]", "")
part = await self._create_audio_attachment(filename)
if part:
accumulated_parts.append(part)
# 消息结束处理
if msg_type == "end":
break
elif (
(streaming and type == "complete")
(streaming and msg_type == "complete")
or not streaming
or type == "break"
or msg_type == "break"
):
# 追加机器人消息
new_his = {"type": "bot", "message": result_text}
if "reasoning" in result:
new_his["reasoning"] = result["reasoning"]
await self.platform_history_mgr.insert(
platform_id="webchat",
user_id=webchat_conv_id,
content=new_his,
sender_id="bot",
sender_name="bot",
await self._save_bot_message(
webchat_conv_id,
accumulated_text,
accumulated_parts,
accumulated_reasoning,
)
# 重置累积变量 (对于 break 后的下一段消息)
if msg_type == "break":
accumulated_parts = []
accumulated_text = ""
accumulated_reasoning = ""
except BaseException as e:
logger.exception(f"WebChat stream unexpected error: {e}", exc_info=True)
@@ -231,7 +357,7 @@ class ChatRoute(Route):
webchat_conv_id,
{
"message": message,
"image_url": image_url, # list
"image_url": image_url,
"audio_url": audio_url,
"selected_provider": selected_provider,
"selected_model": selected_model,
@@ -249,7 +375,7 @@ class ChatRoute(Route):
"Connection": "keep-alive",
},
)
response.timeout = None # fix SSE auto disconnect issue
response.timeout = None # fix SSE auto disconnect issue # pyright: ignore[reportAttributeAccessIssue]
return response
async def delete_webchat_session(self):

View File

@@ -2,9 +2,16 @@ import { ref, reactive, type Ref } from 'vue';
import axios from 'axios';
import { useToast } from '@/utils/toast';
// Type definition for one part of a new-format message.
// A message in the new format is an array of these parts (see `MessagePart[]`);
// `type` selects which of the optional fields is expected to be set.
export interface MessagePart {
type: 'plain' | 'image' | 'record' | 'file' | 'video';
text?: string; // for plain
attachment_id?: string; // for image, record, file, video
}
export interface MessageContent {
type: string;
message: string;
message: string | MessagePart[]; // 支持旧格式(string)和新格式(MessagePart[])
reasoning?: string;
image_url?: string[];
audio_url?: string;
@@ -29,6 +36,7 @@ export function useMessages(
const isToastedRunningInfo = ref(false);
const activeSSECount = ref(0);
const enableStreaming = ref(true);
const attachmentCache = new Map<string, string>(); // attachment_id -> blob URL
// 从 localStorage 读取流式响应开关状态
const savedStreamingState = localStorage.getItem('enableStreaming');
@@ -41,6 +49,59 @@ export function useMessages(
localStorage.setItem('enableStreaming', JSON.stringify(enableStreaming.value));
}
// Fetch an attachment by id and return a blob URL for it.
// Successful results are kept in attachmentCache so each attachment is
// downloaded at most once; on failure the error is logged and '' is returned.
async function getAttachment(attachmentId: string): Promise<string> {
  const cached = attachmentCache.get(attachmentId);
  if (cached !== undefined) {
    return cached;
  }
  try {
    // Pass the id via `params` so axios URL-encodes it instead of splicing
    // the raw value into the query string.
    const response = await axios.get('/api/chat/get_attachment', {
      params: { attachment_id: attachmentId },
      responseType: 'blob'
    });
    const blobUrl = URL.createObjectURL(response.data);
    attachmentCache.set(attachmentId, blobUrl);
    return blobUrl;
  } catch (err) {
    console.error('Failed to get attachment:', attachmentId, err);
    return '';
  }
}
// Convert a new-format message (array of parts) in place into the legacy
// structure used for display. String messages (old format) are left untouched.
async function parseMessageContent(content: any): Promise<void> {
  const parts = content.message;
  if (!Array.isArray(parts)) {
    // Old format: nothing to convert here.
    return;
  }

  const texts: string[] = [];
  const images: string[] = [];
  let audio: string | undefined;

  for (const part of parts as MessagePart[]) {
    switch (part.type) {
      case 'plain':
        if (part.text) texts.push(part.text);
        break;
      case 'image':
        if (part.attachment_id) {
          const url = await getAttachment(part.attachment_id);
          if (url) images.push(url);
        }
        break;
      case 'record':
        if (part.attachment_id) {
          audio = await getAttachment(part.attachment_id);
        }
        break;
      default:
        // file and video parts can be handled here later
        break;
    }
  }

  // Write the result back using the legacy field names.
  content.message = texts.join('\n');
  const imageList = images.length > 0 ? images : undefined;
  if (content.type === 'user') {
    content.image_url = imageList;
    content.audio_url = audio;
  } else {
    content.embedded_images = imageList;
    content.embedded_audio = audio;
  }
}
async function getSessionMessages(sessionId: string, router: any) {
if (!sessionId) return;
@@ -65,6 +126,11 @@ export function useMessages(
for (let i = 0; i < history.length; i++) {
let content = history[i].content;
// 首先尝试解析新格式消息
await parseMessageContent(content);
// 以下是旧格式的兼容处理 (message 是字符串的情况)
if (typeof content.message === 'string') {
if (content.message?.startsWith('[IMAGE]')) {
let img = content.message.replace('[IMAGE]', '');
const imageUrl = await getMediaFile(img);
@@ -81,14 +147,19 @@ export function useMessages(
content.embedded_audio = audioUrl;
content.message = '';
}
}
// 旧格式中的 image_url 和 audio_url 字段处理
if (content.image_url && content.image_url.length > 0) {
for (let j = 0; j < content.image_url.length; j++) {
// 检查是否已经是 blob URL (新格式解析后的结果)
if (!content.image_url[j].startsWith('blob:')) {
content.image_url[j] = await getMediaFile(content.image_url[j]);
}
}
}
if (content.audio_url) {
if (content.audio_url && !content.audio_url.startsWith('blob:')) {
content.audio_url = await getMediaFile(content.audio_url);
}
}