f88031b0c9
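* Refactor: use SQLModel (SQLAlchemy + Pydantic) as the ORM framework and switch to async SQLite operations (#2294)
* stage
* stage
* refactor: use SQLAlchemy as the ORM framework, switch to async SQLite operations
  - use SQLModel as the ORM (based on SQLAlchemy and Pydantic)
  - add Persona, Preference, PlatformMessageHistory tables
* fix: conversation
* fix: remove redundant explicit session.commit, and fix some type errors
* fix: conversation context issue
* chore: remove comments
* chore: remove exclude_content param
* Fix: context could get mixed up (shared) when multiple instances of the same message platform are deployed (#2298)
* perf: update astrbot event session format, using platform id to ensure uniqueness fixes: #1000
* fix: update the MessageSession class to use platform_id as the unique identifier, and adjust related methods for consistency
* fix: update the MessageSession docs to clarify how platform_id is assigned, and adjust the return types of get_platform and get_platform_inst
* Improve: introduce a new persona management model and refactor the function tool manager (#2305)
* feat: add persona management
* refactor: refactor the function tool manager, introduce ToolSet, and let Persona bind Tools
* feat: update Persona tool selection logic to support switching between all tools and specified tools
* feat: update the return types of the persona methods in BaseDatabase to allow returning None
* fix: platform id
* feat: add support to sync mcp servers from ModelScope (#2313)
* fix: fix whitespace issue in access tokens
* chore: remove MCP marketplace logic (#2314)
* chore: remove MCP marketplace routes
* Refactor: rework config file management to support more flexible, session-level (umo part based) config file isolation (#2328)
* refactor: rework config file management to support more flexible, umo part based config file isolation
* Refactor: rework the config frontend page and add several config options (#2331)
* refactor: rework the config frontend page and add several config options
* feat: flesh out the multi-config-file structure
* perf: system config entry point
* fix: normal config item list not displayed
* fix: fix context reference issue in axios requests
* chore: remove status checking in chat page
* fix: fix stages being reused across different pipelines, and persona-related issues
* Feature: add image captioning provider config and support user-defined model modality capabilities (#2422)
* feat: add image captioning provider config and support user-defined model modality capabilities
* fix: fix inconsistent parameters of the conversation management methods in LLMRequestSubStage and simplify the calls
* Feature: speed up WebSearch page crawling and support Tavily as a search engine (#2427)
* feat: speed up websearch; support Tavily as a search engine
* fix: improve log format; fix index and content display issues in search result handling
* feat: add conversation selection state management and improve default conversation loading
* feat: support importing web page data into the knowledge base by parsing URLs (#2280)
* feat: add a manual file upload button to the webchat page (images only for now)
* fix: clear value after upload so the change event fires and the same image can be uploaded repeatedly
* perf: clear image preview thumbnails after a message is sent on the webchat page, consistent with text message behavior
* perf: reset the file input value to an empty string for better browser compatibility
* feat: webchat file upload button supports selecting multiple files
* fix: revoke blob URLs to prevent memory leaks
* perf: parallelize image fetching in sendMessage
* feat: finish the UI for the fetch-from-URL part
* feat: add component for the import-from-URL feature
* fix: improve import result handling; add file naming for overall and topic summaries
* perf: add default values to the URL import options
* perf: hide unused dropdown options when some URL import settings are disabled
* feat: add an upload prerequisite hint to the import-URL-to-knowledge-base feature
* feat: update import feature messages and add upload status notifications
* fix: improve error handling for URL-to-knowledge-base import
* feat: merge the knowledge base's file upload and URL tabs
* feat: remove components for the import-URL-to-knowledge-base feature
---------
Co-authored-by: Soulter <905617992@qq.com>
* feat: add conditional display logic to better manage plugin config item visibility (#2433)
* Feature: support configuring the default knowledge base on the WebUI config file page (#2437)
* feat: support configuring a default knowledge base
* chore: clean code
* refactor: rework Function Tool management and introduce initial Multi Agent and Agent Handoff mechanisms (#2454)
* stage
* refactor: rework Function Tool management and introduce the multi agent handoff mechanism
  - Updated `star_request.py` to use the global `call_handler` instead of context-specific calls.
  - Modified `entities.py` to remove the dependency on `FunctionToolManager` and streamline the function tool handling.
  - Refactored `func_tool_manager.py` to simplify the `FunctionTool` class and its methods, removing deprecated code and enhancing clarity.
  - Adjusted `provider.py` to align with the new function tool structure, removing unnecessary type unions.
  - Enhanced `star_handler.py` to support agent registration and tool association, introducing `RegisteringAgent` for better encapsulation.
  - Updated `star_manager.py` to handle tool registration for agents, ensuring proper binding of handlers.
  - Revised `main.py` in the web searcher package to utilize the new agent registration system for web search tools.
* chore: websearch
* perf: reduce nesting
* chore: remove unused mcp import
* feat: add WebUI migration assistant and related migration methods (#2477)
* fix: fix some issues when migrating conversations
* feat: add a tool-use model capability option
* feat: add update checking and updating for the knowledge base plugin
* perf: reorder the WebUI sidebar
* refactor: rework the SharedPreference class and replace JSON storage with database storage (#2482)
* perf: use run_coroutine_threadsafe
Co-authored-by: Raven95676 <raven95676@gmail.com>
* Feature: support configuring a rerank model (vLLM API format) for score tasks (#2496)
* feat: support adding a rerank model (vLLM API format) for score tasks
* fix: update rerank API base URL to use localhost
* feat: knowledge base supports configuring a rerank model
* fix: remove debug print statement for reranked results in FaissVecDB
* fix: remove hint text from the knowledge base
* Feature: support configuring the set of available plugins in the config file (#2505)
* feat: add an available-plugin-set config option
* remove: legacy platform availability config; this is now implemented via multiple config files.
* feat: apply the config file's plugin availability settings
* perf: hoist if from if
* feat: the llm_tool decorator's return value may now be the mcp library's tool result type (mcp.type.CallToolResult) (#2507)
* fix: add type definition for migrationDialog and ensure open method exists before calling
* chore: update project version to 4.0.0
* feat: random load balancing across multiple t2i services (#2529)
* fix: bugfixes
* Improve: extend customization of config file scope down to session granularity (#2532)
* feat: extend customization of config file scope
* perf: conflict detection
* refactor: simplify config form validation and improve conflict message clarity
* chore: clean code
* feat: plugin config supports multiple shortcut magic config items
* chore: fix history being reset when the webchat title is auto-updated
* bugfixes
* feat: add custom T2I template editor (#2581)
* perf: add option to clear provider selection in ProviderSelector component
* 📦 release: bump version to v4.0.0-beta.1
* chore: delete uv.lock
---------
Co-authored-by: RC-CHN <67079377+RC-CHN@users.noreply.github.com>
Co-authored-by: Raven95676 <raven95676@gmail.com>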
543 lines
20 KiB
Python
import asyncio
import typing as T
import threading
from datetime import datetime, timedelta
from astrbot.core.db import BaseDatabase
from astrbot.core.db.po import (
    ConversationV2,
    PlatformStat,
    PlatformMessageHistory,
    Attachment,
    Persona,
    Preference,
    Stats as DeprecatedStats,
    Platform as DeprecatedPlatformStat,
    SQLModel,
)

from sqlalchemy import select, update, delete, text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql import func

NOT_GIVEN = T.TypeVar("NOT_GIVEN")


class SQLiteDatabase(BaseDatabase):
    def __init__(self, db_path: str) -> None:
        self.db_path = db_path
        self.DATABASE_URL = f"sqlite+aiosqlite:///{db_path}"
        self.inited = False
        super().__init__()

    async def initialize(self) -> None:
        """Initialize the database by creating tables if they do not exist."""
        async with self.engine.begin() as conn:
            await conn.run_sync(SQLModel.metadata.create_all)
            await conn.commit()

    # ====
    # Platform Statistics
    # ====

    async def insert_platform_stats(
        self,
        platform_id: str,
        platform_type: str,
        count: int = 1,
        timestamp: T.Optional[datetime] = None,
    ) -> None:
        """Insert a platform statistic record, or add `count` to the existing
        row with the same timestamp, platform ID, and platform type.

        When `timestamp` is omitted, the current time truncated to the hour is used.
        """
        async with self.get_db() as session:
            session: AsyncSession
            async with session.begin():
                if timestamp is None:
                    timestamp = datetime.now().replace(
                        minute=0, second=0, microsecond=0
                    )
                current_hour = timestamp
                await session.execute(
                    text("""
                    INSERT INTO platform_stats (timestamp, platform_id, platform_type, count)
                    VALUES (:timestamp, :platform_id, :platform_type, :count)
                    ON CONFLICT(timestamp, platform_id, platform_type) DO UPDATE SET
                        count = platform_stats.count + EXCLUDED.count
                    """),
                    {
                        "timestamp": current_hour,
                        "platform_id": platform_id,
                        "platform_type": platform_type,
                        "count": count,
                    },
                )

    async def count_platform_stats(self) -> int:
        """Count the number of platform statistics records."""
        async with self.get_db() as session:
            session: AsyncSession
            result = await session.execute(
                select(func.count(PlatformStat.platform_id)).select_from(PlatformStat)
            )
            count = result.scalar_one_or_none()
            return count if count is not None else 0

    async def get_platform_stats(self, offset_sec: int = 86400) -> T.List[PlatformStat]:
        """Get platform statistics within the specified offset in seconds and group by platform_id."""
        async with self.get_db() as session:
            session: AsyncSession
            now = datetime.now()
            start_time = now - timedelta(seconds=offset_sec)
            # GROUP BY must come before ORDER BY; the reverse order is a SQL syntax error.
            result = await session.execute(
                text("""
                SELECT * FROM platform_stats
                WHERE timestamp >= :start_time
                GROUP BY platform_id
                ORDER BY timestamp DESC
                """),
                {"start_time": start_time},
            )
            # NOTE: scalars() yields only the first column of each row of this textual SELECT.
            return result.scalars().all()

    # ====
    # Conversation Management
    # ====

    async def get_conversations(self, user_id=None, platform_id=None):
        """Get conversations, optionally filtered by user ID and platform ID, newest first."""
        async with self.get_db() as session:
            session: AsyncSession
            query = select(ConversationV2)

            if user_id:
                query = query.where(ConversationV2.user_id == user_id)
            if platform_id:
                query = query.where(ConversationV2.platform_id == platform_id)
            # Newest conversations first
            query = query.order_by(ConversationV2.created_at.desc())
            result = await session.execute(query)

            return result.scalars().all()

    async def get_conversation_by_id(self, cid):
        """Get a conversation by its conversation ID, or None if it does not exist."""
        async with self.get_db() as session:
            session: AsyncSession
            query = select(ConversationV2).where(ConversationV2.conversation_id == cid)
            result = await session.execute(query)
            return result.scalar_one_or_none()

    async def get_all_conversations(self, page=1, page_size=20):
        """Get one page of all conversations, newest first. Pages are 1-based."""
        async with self.get_db() as session:
            session: AsyncSession
            offset = (page - 1) * page_size
            result = await session.execute(
                select(ConversationV2)
                .order_by(ConversationV2.created_at.desc())
                .offset(offset)
                .limit(page_size)
            )
            return result.scalars().all()

    async def get_filtered_conversations(
        self,
        page=1,
        page_size=20,
        platform_ids=None,
        search_query="",
        **kwargs,
    ):
        """Get one page of conversations filtered by platform IDs and a
        case-insensitive title substring, plus the total number of matches."""
        async with self.get_db() as session:
            session: AsyncSession
            # Build the base query with filters
            base_query = select(ConversationV2)

            if platform_ids:
                base_query = base_query.where(
                    ConversationV2.platform_id.in_(platform_ids)
                )
            if search_query:
                base_query = base_query.where(
                    ConversationV2.title.ilike(f"%{search_query}%")
                )

            # Get total count matching the filters
            count_query = select(func.count()).select_from(base_query.subquery())
            total_count = await session.execute(count_query)
            total = total_count.scalar_one()

            # Get paginated results
            offset = (page - 1) * page_size
            result_query = (
                base_query.order_by(ConversationV2.created_at.desc())
                .offset(offset)
                .limit(page_size)
            )
            result = await session.execute(result_query)
            conversations = result.scalars().all()

            return conversations, total

    async def create_conversation(
        self,
        user_id,
        platform_id,
        content=None,
        title=None,
        persona_id=None,
        cid=None,
        created_at=None,
        updated_at=None,
    ):
        """Create a conversation. `cid`, `created_at`, and `updated_at` are only
        passed to the model when given, so its defaults apply otherwise."""
        kwargs = {}
        if cid:
            kwargs["conversation_id"] = cid
        if created_at:
            kwargs["created_at"] = created_at
        if updated_at:
            kwargs["updated_at"] = updated_at
        async with self.get_db() as session:
            session: AsyncSession
            async with session.begin():
                new_conversation = ConversationV2(
                    user_id=user_id,
                    content=content or [],
                    platform_id=platform_id,
                    title=title,
                    persona_id=persona_id,
                    **kwargs,
                )
                session.add(new_conversation)
                return new_conversation

    async def update_conversation(self, cid, title=None, persona_id=None, content=None):
        """Update the given fields of a conversation and return the updated row.

        Returns None without touching the database when no field is given.
        """
        async with self.get_db() as session:
            session: AsyncSession
            async with session.begin():
                query = update(ConversationV2).where(
                    ConversationV2.conversation_id == cid
                )
                values = {}
                if title is not None:
                    values["title"] = title
                if persona_id is not None:
                    values["persona_id"] = persona_id
                if content is not None:
                    values["content"] = content
                if not values:
                    return
                query = query.values(**values)
                await session.execute(query)
        return await self.get_conversation_by_id(cid)

    async def delete_conversation(self, cid):
        """Delete a conversation by its conversation ID."""
        async with self.get_db() as session:
            session: AsyncSession
            async with session.begin():
                await session.execute(
                    delete(ConversationV2).where(ConversationV2.conversation_id == cid)
                )

    async def insert_platform_message_history(
        self,
        platform_id,
        user_id,
        content,
        sender_id=None,
        sender_name=None,
    ):
        """Insert a new platform message history record."""
        async with self.get_db() as session:
            session: AsyncSession
            async with session.begin():
                new_history = PlatformMessageHistory(
                    platform_id=platform_id,
                    user_id=user_id,
                    content=content,
                    sender_id=sender_id,
                    sender_name=sender_name,
                )
                session.add(new_history)
                return new_history

    async def delete_platform_message_offset(
        self, platform_id, user_id, offset_sec=86400
    ):
        """Delete platform message history records older than the specified offset."""
        async with self.get_db() as session:
            session: AsyncSession
            async with session.begin():
                now = datetime.now()
                cutoff_time = now - timedelta(seconds=offset_sec)
                await session.execute(
                    delete(PlatformMessageHistory).where(
                        PlatformMessageHistory.platform_id == platform_id,
                        PlatformMessageHistory.user_id == user_id,
                        PlatformMessageHistory.created_at < cutoff_time,
                    )
                )

    async def get_platform_message_history(
        self, platform_id, user_id, page=1, page_size=20
    ):
        """Get platform message history records."""
        async with self.get_db() as session:
            session: AsyncSession
            offset = (page - 1) * page_size
            query = (
                select(PlatformMessageHistory)
                .where(
                    PlatformMessageHistory.platform_id == platform_id,
                    PlatformMessageHistory.user_id == user_id,
                )
                .order_by(PlatformMessageHistory.created_at.desc())
            )
            result = await session.execute(query.offset(offset).limit(page_size))
            return result.scalars().all()

    async def insert_attachment(self, path, type, mime_type):
        """Insert a new attachment record."""
        async with self.get_db() as session:
            session: AsyncSession
            async with session.begin():
                new_attachment = Attachment(
                    path=path,
                    type=type,
                    mime_type=mime_type,
                )
                session.add(new_attachment)
                return new_attachment

    async def get_attachment_by_id(self, attachment_id):
        """Get an attachment by its ID."""
        async with self.get_db() as session:
            session: AsyncSession
            query = select(Attachment).where(Attachment.id == attachment_id)
            result = await session.execute(query)
            return result.scalar_one_or_none()

    async def insert_persona(
        self, persona_id, system_prompt, begin_dialogs=None, tools=None
    ):
        """Insert a new persona record."""
        async with self.get_db() as session:
            session: AsyncSession
            async with session.begin():
                new_persona = Persona(
                    persona_id=persona_id,
                    system_prompt=system_prompt,
                    begin_dialogs=begin_dialogs or [],
                    tools=tools,
                )
                session.add(new_persona)
                return new_persona

    async def get_persona_by_id(self, persona_id):
        """Get a persona by its ID."""
        async with self.get_db() as session:
            session: AsyncSession
            query = select(Persona).where(Persona.persona_id == persona_id)
            result = await session.execute(query)
            return result.scalar_one_or_none()

    async def get_personas(self):
        """Get all personas."""
        async with self.get_db() as session:
            session: AsyncSession
            query = select(Persona)
            result = await session.execute(query)
            return result.scalars().all()

    async def update_persona(
        self, persona_id, system_prompt=None, begin_dialogs=None, tools=NOT_GIVEN
    ):
        """Update a persona's system prompt, begin dialogs, or tools.

        `tools` uses the NOT_GIVEN sentinel so that None can be stored explicitly.
        Returns None without touching the database when no field is given.
        """
        async with self.get_db() as session:
            session: AsyncSession
            async with session.begin():
                query = update(Persona).where(Persona.persona_id == persona_id)
                values = {}
                if system_prompt is not None:
                    values["system_prompt"] = system_prompt
                if begin_dialogs is not None:
                    values["begin_dialogs"] = begin_dialogs
                if tools is not NOT_GIVEN:
                    values["tools"] = tools
                if not values:
                    return
                query = query.values(**values)
                await session.execute(query)
        return await self.get_persona_by_id(persona_id)

    async def delete_persona(self, persona_id):
        """Delete a persona by its ID."""
        async with self.get_db() as session:
            session: AsyncSession
            async with session.begin():
                await session.execute(
                    delete(Persona).where(Persona.persona_id == persona_id)
                )

    async def insert_preference_or_update(self, scope, scope_id, key, value):
        """Insert a new preference record or update if it exists."""
        async with self.get_db() as session:
            session: AsyncSession
            async with session.begin():
                query = select(Preference).where(
                    Preference.scope == scope,
                    Preference.scope_id == scope_id,
                    Preference.key == key,
                )
                result = await session.execute(query)
                existing_preference = result.scalar_one_or_none()
                if existing_preference:
                    existing_preference.value = value
                else:
                    new_preference = Preference(
                        scope=scope, scope_id=scope_id, key=key, value=value
                    )
                    session.add(new_preference)
                return existing_preference or new_preference

    async def get_preference(self, scope, scope_id, key):
        """Get a preference by scope, scope ID, and key."""
        async with self.get_db() as session:
            session: AsyncSession
            query = select(Preference).where(
                Preference.scope == scope,
                Preference.scope_id == scope_id,
                Preference.key == key,
            )
            result = await session.execute(query)
            return result.scalar_one_or_none()

    async def get_preferences(self, scope, scope_id=None, key=None):
        """Get all preferences in a scope, optionally filtered by scope ID and key."""
        async with self.get_db() as session:
            session: AsyncSession
            query = select(Preference).where(Preference.scope == scope)
            if scope_id is not None:
                query = query.where(Preference.scope_id == scope_id)
            if key is not None:
                query = query.where(Preference.key == key)
            result = await session.execute(query)
            return result.scalars().all()

    async def remove_preference(self, scope, scope_id, key):
        """Remove a preference by scope, scope ID, and key."""
        async with self.get_db() as session:
            session: AsyncSession
            # session.begin() commits on successful exit, so no explicit commit is needed.
            async with session.begin():
                await session.execute(
                    delete(Preference).where(
                        Preference.scope == scope,
                        Preference.scope_id == scope_id,
                        Preference.key == key,
                    )
                )

    async def clear_preferences(self, scope, scope_id):
        """Clear all preferences for a specific scope ID."""
        async with self.get_db() as session:
            session: AsyncSession
            # session.begin() commits on successful exit, so no explicit commit is needed.
            async with session.begin():
                await session.execute(
                    delete(Preference).where(
                        Preference.scope == scope, Preference.scope_id == scope_id
                    )
                )

    # ====
    # Deprecated Methods
    # ====

    def get_base_stats(self, offset_sec=86400):
        """Get base statistics within the specified offset in seconds."""

        async def _inner():
            async with self.get_db() as session:
                session: AsyncSession
                now = datetime.now()
                start_time = now - timedelta(seconds=offset_sec)
                result = await session.execute(
                    select(PlatformStat).where(PlatformStat.timestamp >= start_time)
                )
                all_datas = result.scalars().all()
                deprecated_stats = DeprecatedStats()
                for data in all_datas:
                    deprecated_stats.platform.append(
                        DeprecatedPlatformStat(
                            name=data.platform_id,
                            count=data.count,
                            timestamp=data.timestamp.timestamp(),
                        )
                    )
                return deprecated_stats

        result = None

        # Run the coroutine on its own event loop in a worker thread and block until it finishes.
        def runner():
            nonlocal result
            result = asyncio.run(_inner())

        t = threading.Thread(target=runner)
        t.start()
        t.join()
        return result

    def get_total_message_count(self):
        """Get the total message count from platform statistics."""

        async def _inner():
            async with self.get_db() as session:
                session: AsyncSession
                result = await session.execute(
                    select(func.sum(PlatformStat.count)).select_from(PlatformStat)
                )
                total_count = result.scalar_one_or_none()
                return total_count if total_count is not None else 0

        result = None

        def runner():
            nonlocal result
            result = asyncio.run(_inner())

        t = threading.Thread(target=runner)
        t.start()
        t.join()
        return result

    def get_grouped_base_stats(self, offset_sec=86400):
        """Get message counts summed per platform_id within the specified offset in seconds."""

        async def _inner():
            async with self.get_db() as session:
                session: AsyncSession
                now = datetime.now()
                start_time = now - timedelta(seconds=offset_sec)
                result = await session.execute(
                    select(PlatformStat.platform_id, func.sum(PlatformStat.count))
                    .where(PlatformStat.timestamp >= start_time)
                    .group_by(PlatformStat.platform_id)
                )
                grouped_stats = result.all()
                deprecated_stats = DeprecatedStats()
                for platform_id, count in grouped_stats:
                    deprecated_stats.platform.append(
                        DeprecatedPlatformStat(
                            name=platform_id,
                            count=count,
                            timestamp=start_time.timestamp(),
                        )
                    )
                return deprecated_stats

        result = None

        def runner():
            nonlocal result
            result = asyncio.run(_inner())

        t = threading.Thread(target=runner)
        t.start()
        t.join()
        return result
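
The three deprecated synchronous methods share one pattern: wrap the query in a coroutine, run it with `asyncio.run` on a separate thread, and block until that thread finishes. The sketch below isolates that pattern as a standalone helper. The names `run_coro_in_thread` and `fetch_total` are hypothetical and do not exist in the file above. Re-raising exceptions in the caller is also an addition in this sketch; the original wrappers simply return `None` if the coroutine fails.

```python
import asyncio
import threading
import typing as T


def run_coro_in_thread(
    coro_factory: T.Callable[[], T.Coroutine[T.Any, T.Any, T.Any]],
) -> T.Any:
    """Run a coroutine to completion on a fresh event loop in a worker thread.

    Hypothetical helper for illustration; it blocks the calling thread.
    """
    outcome: T.Dict[str, T.Any] = {}

    def runner() -> None:
        try:
            # asyncio.run creates and closes a new event loop in this thread.
            outcome["value"] = asyncio.run(coro_factory())
        except Exception as exc:
            # Capture the error so it can be re-raised in the caller.
            outcome["error"] = exc

    t = threading.Thread(target=runner)
    t.start()
    t.join()
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]


async def fetch_total() -> int:
    """Stand-in for an async database query (hypothetical)."""
    await asyncio.sleep(0)
    return sum([3, 4, 5])


if __name__ == "__main__":
    print(run_coro_in_thread(fetch_total))
```

Because the helper joins the worker thread, calling it from code that is already running inside an event loop stalls that loop until the coroutine finishes. That cost is one reason to prefer the async methods over the deprecated synchronous ones.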