Compare commits

...

108 Commits

Author SHA1 Message Date
Soulter
19e3390083 Update README.md 2023-06-13 17:04:29 +08:00
Soulter
3015b90e12 bugfixes 2023-06-13 11:59:16 +08:00
Soulter
aa419f3ef9 perf: 去帮助中心部分指令显示 2023-06-13 11:54:44 +08:00
Soulter
954236c284 fix: 修复markdown宽度计算异常的问题 2023-06-13 11:54:20 +08:00
Soulter
72d6b3886b perf: markdown render 增大 fontsize 2023-06-13 11:44:34 +08:00
Soulter
a95046ecaf Merge branch 'master' of https://github.com/Soulter/QQChannelChatGPT 2023-06-13 10:09:33 +08:00
Soulter
ccdb11575b remove chore 2023-06-13 10:09:28 +08:00
Soulter
7e68b2f2be Update requirements.txt 2023-06-13 10:05:57 +08:00
Soulter
39efab1081 perf: enhanced markdown image render regex 2023-06-12 18:41:04 +08:00
Soulter
cc6707c8ce Merge branch 'master' of https://github.com/Soulter/QQChannelChatGPT 2023-06-12 18:26:16 +08:00
Soulter
09080adf84 perf: markdown渲染器支持渲染图片 2023-06-12 18:26:11 +08:00
Soulter
4cc72030c0 Update README.md 2023-06-12 08:32:05 +08:00
Soulter
a395902184 Update README.md 2023-06-12 08:30:58 +08:00
Soulter
5156f0584a Update README.md 2023-06-12 08:30:03 +08:00
Soulter
be171fe0d7 Update README.md 2023-06-12 08:14:55 +08:00
Soulter
ad4bf5e654 perf: update command add "update latest r" 2023-06-11 09:53:49 +08:00
Soulter
da7429ad62 perf: add markdown minheight 2023-06-11 09:51:53 +08:00
Soulter
b5f20ee282 chore: change fonts 2023-06-11 09:16:16 +08:00
Soulter
a9023d6f3a perf: 支持markdown渲染 2023-06-10 13:10:32 +00:00
Soulter
628b661a18 fix markdown 2023-06-10 13:05:22 +00:00
Soulter
638fe466f8 perf markdown 2023-06-10 12:51:34 +00:00
Soulter
a90adcf15c chore: change some markdown parameters 2023-06-10 12:24:37 +00:00
Soulter
7896066db6 perf: markdown perf 2023-06-10 12:22:32 +00:00
Soulter
b1314bcc31 perf: \t -> 4 blanks 2023-06-10 12:13:50 +00:00
Soulter
b1ecc929f2 perf: markdown render perf 2023-06-10 12:10:20 +00:00
Soulter
3aad42a886 perf: markdown render perf 2023-06-10 12:08:32 +00:00
Soulter
b6e87d3d31 perf: markdown render perf 2023-06-10 10:54:26 +00:00
Soulter
461eb4b9c7 perf: markdown render perf 2023-06-10 10:48:36 +00:00
Soulter
a89e92d5cc perf: markdown render perf 2023-06-10 10:33:14 +00:00
Soulter
6e69e88e91 perf: markdown render perf 2023-06-10 10:30:17 +00:00
Soulter
ae732c1dac perf: markdown render perf 2023-06-10 10:03:03 +00:00
Soulter
8e4a72c97b perf: markdown render perf 2023-06-10 10:02:23 +00:00
Soulter
bf0d82fe67 perf: markdown render perf 2023-06-10 10:01:23 +00:00
Soulter
987383f957 perf: markdown render perf 2023-06-10 10:00:22 +00:00
Soulter
c2cacf3281 perf: markdown render perf 2023-06-10 09:56:58 +00:00
Soulter
72878477dc perf: qq pic mode support markdown 2023-06-10 09:47:02 +00:00
Soulter
ad0d14420a feat: markdown render support 2023-06-10 09:39:37 +00:00
Soulter
5a7c60c81e fix: markdown render support 2023-06-10 09:38:19 +00:00
Soulter
6011840d1f feat: markdown render support 2023-06-10 09:32:49 +00:00
Soulter
9a2dffe299 feat: markdown render support 2023-06-10 09:27:02 +00:00
Soulter
e6770d2b12 Update README.md 2023-06-09 00:19:27 +08:00
Soulter
255db6ee57 Update README.md 2023-06-08 23:58:44 +08:00
Soulter
aa9ff99557 perf: better help 2023-06-06 12:31:14 +00:00
Soulter
5f024e9f30 fix: bugfixes 2023-06-06 12:28:55 +00:00
Soulter
cbdc7b7ce4 perf: better help 2023-06-06 12:23:48 +00:00
Soulter
5f636ca061 perf: improve text2img 2023-06-06 11:57:52 +00:00
Soulter
9fa3651170 perf: change word2img factors 2023-06-06 11:45:32 +00:00
Soulter
bba66788c3 fix: bugfixes 2023-06-06 11:41:26 +00:00
Soulter
200f3cce00 fix: bugfixes 2023-06-06 11:34:52 +00:00
Soulter
938490b739 fix: bugfixes 2023-06-06 11:31:08 +00:00
Soulter
e77e7b050a feat: QQ message plain texts to pic support #108 2023-06-06 11:21:55 +00:00
Soulter
bd2dbe5b63 feat:转发消息支持非文本类型 2023-06-03 14:21:47 +08:00
Soulter
c684d9cb4a fix: 修复某些插件调用send可能发生的错误 2023-06-03 10:49:12 +08:00
Soulter
7a39a9d45e feat: nick指令仅管理者能用 2023-06-01 22:09:51 +08:00
Soulter
2a3bb068db feat: bing支持自定义代理地址 2023-05-31 21:17:47 +08:00
Soulter
1aa4384ca3 perf: 优化日志输出长度限制 2023-05-31 20:31:11 +08:00
Soulter
3b26b7b26c feat: 将CmdConfig的一些方法改为静态方法 2023-05-31 10:25:39 +08:00
Soulter
3b097d662b perf: 增加支持查看新版配置文件的管理员指令newconfig 2023-05-31 10:18:08 +08:00
Soulter
c3acb3e77f feat: 支持修改入群欢迎 2023-05-31 10:07:15 +08:00
Soulter
55d58d30a8 fix: 修复手滑造成的启动报错 2023-05-29 16:40:02 +08:00
Soulter
020a8ace9f feat: 支持自定义qq回复折叠阈值
perf: 优化新版配置文件加载流程
2023-05-29 16:37:11 +08:00
Soulter
15f56ffc01 feat: 长文本支持折叠发送 #104 2023-05-29 01:10:37 +08:00
Soulter
3724659b32 perf: improve stater 2023-05-24 18:24:18 +08:00
Soulter
df77152581 chore: 更新说明 2023-05-23 23:11:24 +08:00
Soulter
339ea5f12a feat: 支持更多本地预设指令的图片化 2023-05-23 11:01:56 +08:00
Soulter
36f96ccc97 feat: 文字转图片的图片过期处理逻辑 2023-05-23 10:58:07 +08:00
Soulter
190e0a4971 feat: 支持文字转图片 2023-05-23 10:41:12 +08:00
Soulter
72638fac68 fix: 修复QQ频道@不回的问题 2023-05-23 07:58:03 +08:00
Soulter
807d19e381 fix: 修复gocq群聊时@无反应的问题 2023-05-22 20:54:31 +08:00
Soulter
10870172b4 fix: 修复私聊不回的问题 2023-05-22 20:17:53 +08:00
Soulter
1f7d3eccf9 fix: blank nick 2023-05-22 19:42:37 +08:00
Soulter
5fc58123bb fix: 修复频率限制消息识别的问题 2023-05-22 19:31:24 +08:00
Soulter
c84c9f4aaa fix: 修复gocq_loop 2023-05-22 18:47:33 +08:00
Soulter
cabe66fc0a perf: 优化gocq平台消息处理逻辑 2023-05-22 18:46:01 +08:00
Soulter
9f1315b06d perf: 优化gocq平台消息处理逻辑 2023-05-22 18:42:23 +08:00
Soulter
6f27f59730 fix: 修复GOCQ频道at报错的问题 2023-05-22 18:25:58 +08:00
Soulter
17815e7fe3 fix: 优化切换到未启动的模型报错的问题 2023-05-22 18:23:16 +08:00
Soulter
596ae80fea perf: 优化模型识别提示 2023-05-22 18:22:34 +08:00
Soulter
be2dc6ba70 feat: 指令操作不再需要在消息前加前缀
perf: 改善性能
2023-05-22 18:10:22 +08:00
Soulter
e5aa8c8270 fix: 修复群内欢迎 2023-05-21 11:12:50 +08:00
Soulter
7c5ac41c55 chore: 删除不必要的log日志 2023-05-21 11:02:00 +08:00
Soulter
c6cf1153c1 fix: 修复Windows下删除插件报错拒绝访问的问题;
修复权限组异常的问题
2023-05-21 11:00:59 +08:00
Soulter
a68338b651 perf: 优化bing报错提示 2023-05-21 10:23:50 +08:00
Soulter
bab46e912e fix: 修复默认昵称失效的问题;
修复启动时跳过管理者qq设置的问题
2023-05-21 10:18:51 +08:00
Soulter
4b158a1c89 feat: GOCQ适配QQ频道 2023-05-20 15:30:07 +08:00
Soulter
6894900e46 fix: 修复画画指令得到的图片风格像油画的问题 2023-05-20 14:27:02 +08:00
Soulter
2e11d6e007 perf: log perf 2023-05-18 22:21:29 +08:00
Soulter
348381be15 Merge branch 'master' of https://github.com/Soulter/QQChannelChatGPT 2023-05-18 22:15:07 +08:00
Soulter
9024c28e70 perf: fix some logs 2023-05-18 22:15:01 +08:00
Soulter
ae1702901b Update README.md 2023-05-18 08:34:41 +08:00
Soulter
c1c0df85e6 Update README.md 2023-05-17 20:36:54 +08:00
Soulter
f3c6d9c02b fix: draw command 2023-05-16 15:06:39 +08:00
Soulter
811a885411 fix: draw command 2023-05-16 15:04:55 +08:00
Soulter
b4ec28b71c Merge branch 'master' of https://github.com/Soulter/QQChannelChatGPT 2023-05-16 11:57:04 +08:00
Soulter
cdf4a5321b perf: 1.逆向ChatGPT库支持消息等待,不会回复忙碌。
2. 优化模型加载流程
2023-05-16 11:56:59 +08:00
Soulter
d83f155f80 Update README.md 2023-05-15 20:54:19 +08:00
Soulter
4c402ed5bd perf: 优化插件鉴别 2023-05-15 20:43:42 +08:00
Soulter
ec5aff8d0b fix: update helloworld default plugin 2023-05-15 20:14:14 +08:00
Soulter
eae0d6c422 fix: 修复一些奇怪的地方 2023-05-15 20:09:01 +08:00
Soulter
9c284b84b1 perf: 升级插件协议簇 2023-05-15 20:03:17 +08:00
Soulter
9f36e5ae05 perf: 在连接到go-cqhttp之前添加连接检测 2023-05-15 18:33:07 +08:00
Soulter
7caa380e54 perf: 优化控制台输出的长度限制 2023-05-14 21:58:45 +08:00
Soulter
41d81bb60e perf: 简化控制台字数 2023-05-14 20:54:56 +08:00
Soulter
454a74f4e1 perf: 颜色日志-优化控制台显示 2023-05-14 20:51:39 +08:00
Soulter
c5bdad02e5 fix: 修复ChatGPT逆向库回答报错的问题 2023-05-14 20:39:15 +08:00
Soulter
f46de3d518 perf: 颜色日志-美化控制台显示 2023-05-14 20:38:28 +08:00
Soulter
a3e21bea1a perf: 删除不必要的控制台信息显示 2023-05-14 19:54:47 +08:00
Soulter
d7e4707d5d perf: 简化控制台输出信息 2023-05-14 19:43:12 +08:00
20 changed files with 1212 additions and 349 deletions

View File

@@ -2,40 +2,37 @@
<img src="https://socialify.git.ci/Soulter/QQChannelChatGPT/image?description=1&forks=1&issues=1&language=1&name=1&owner=1&pattern=Circuit%20Board&stargazers=1&theme=Light" alt="QQChannelChatGPT" width="600" height="300" /> <img src="https://socialify.git.ci/Soulter/QQChannelChatGPT/image?description=1&forks=1&issues=1&language=1&name=1&owner=1&pattern=Circuit%20Board&stargazers=1&theme=Light" alt="QQChannelChatGPT" width="600" height="300" />
[![Language](https://img.shields.io/badge/language-python-green.svg?style=plastic)](https://www.python.org/) <!-- [![Language](https://img.shields.io/badge/language-python-green.svg?style=plastic)](https://www.python.org/)
[![License](https://img.shields.io/badge/license-AGPL3-orange.svg?style=plastic)](https://github.com/Soulter/QQChannelChatGPT/blob/master/LICENSE) [![License](https://img.shields.io/badge/license-AGPL3-orange.svg?style=plastic)](https://github.com/Soulter/QQChannelChatGPT/blob/master/LICENSE)
![Python](https://img.shields.io/badge/python-3.9+-blue) ![Python](https://img.shields.io/badge/python-3.9+-blue) -->
_✨在QQ和QQ频道上使用ChatGPT、NewBing等语言模型,稳定,一次部署,同时使用✨_ 基于go-cq和官方QQ频道SDK的机器人项目。支持ChatGPT、NewBing等大模型。一次部署,同时使用
_✨教程https://github.com/Soulter/QQChannelChatGPT/wiki ✨_ 部署文档https://github.com/Soulter/QQChannelChatGPT/wiki
_✨插件开发教程https://github.com/Soulter/QQChannelChatGPT/wiki/%E5%9B%9B%E3%80%81%E5%BC%80%E5%8F%91%E6%8F%92%E4%BB%B6 ✨_ 插件文档https://github.com/Soulter/QQChannelChatGPT/wiki/%E5%9B%9B%E3%80%81%E5%BC%80%E5%8F%91%E6%8F%92%E4%BB%B6
_✨欢迎体验😊(频道名: GPT机器人 | 频道号: x42d56aki2) | QQ群号322154837✨_ 欢迎体验 | **QQ群号322154837**
<!-- <img src="https://user-images.githubusercontent.com/37870767/230417115-9dd3c9d5-6b6b-4928-8fe3-82f559208aab.JPG" width="300"></img> --> <!-- <img src="https://user-images.githubusercontent.com/37870767/230417115-9dd3c9d5-6b6b-4928-8fe3-82f559208aab.JPG" width="300"></img> -->
</div> </div>
## 功能: ## 🧩功能:
近期新功能: 近期新功能:
- 支持插件https://github.com/Soulter/QQChannelChatGPT/wiki/%E5%9B%9B%E3%80%81%E5%BC%80%E5%8F%91%E6%8F%92%E4%BB%B6 - Markdown渲染支持回复消息支持图片。
- 支持一键切换语言模型(使用/bing /revgpt /gpt分别可以切换newbing、逆向ChatGPT、官方ChatGPT模型 - 支持插件。https://github.com/Soulter/QQChannelChatGPT/wiki/%E5%9B%9B%E3%80%81%E5%BC%80%E5%8F%91%E6%8F%92%E4%BB%B6
- 热更新 - 热更新
- 接入QQ支持在QQ上和QQ频道上同时聊天https://github.com/Soulter/QQChannelChatGPT/issues/82 - 接入QQ支持在QQ上和QQ频道上同时聊天https://github.com/Soulter/QQChannelChatGPT/issues/82
- 更强大的Windows启动器,环境配置自动搞定。链接https://github.com/Soulter/QQChatGPTLauncher/releases/latest - Windows启动器。链接https://github.com/Soulter/QQChatGPTLauncher/releases/latest
支持的AI语言模型请在`configs/config.yaml`下配置): 支持的AI语言模型请在`configs/config.yaml`下配置):
- 逆向ChatGPT - 逆向ChatGPT
- 官方ChatGPT AI - 官方ChatGPT API
- 文心一言(即将支持) - Bing
- NewBing - ...
- Bard (即将支持) <!--
部署此项目的教程链接https://github.com/Soulter/QQChannelChatGPT/wiki
### 基本功能 ### 基本功能
<details> <details>
<summary>✅ 回复符合上下文</summary> <summary>✅ 回复符合上下文</summary>
@@ -90,22 +87,30 @@ _✨欢迎体验😊频道名: GPT机器人 | 频道号: x42d56aki2) | QQ群
- QQ频道机器人框架为QQ官方开源的框架稳定。 - QQ频道机器人框架为QQ官方开源的框架稳定。
</details> </details> -->
> 关于tokentoken就相当于是AI中的单词数但是不等于单词数`text-davinci-003`模型中最大可以支持`4097`个token。在发送信息时这个机器人会将用户的历史聊天记录打包发送给ChatGPT因此`token`也会相应的累加为了保证聊天的上下文的逻辑性就有了缓存token。 <!-- > 关于tokentoken就相当于是AI中的单词数但是不等于单词数`text-davinci-003`模型中最大可以支持`4097`个token。在发送信息时这个机器人会将用户的历史聊天记录打包发送给ChatGPT因此`token`也会相应的累加为了保证聊天的上下文的逻辑性就有了缓存token。 -->
### 🛠️ 插件支持 ### 🛠️ 插件支持
本项目支持接入插件。 本项目支持接入插件。
> 使用`plugin i 插件GitHub链接`即可安装。
插件开发教程https://github.com/Soulter/QQChannelChatGPT/wiki/%E5%9B%9B%E3%80%81%E5%BC%80%E5%8F%91%E6%8F%92%E4%BB%B6 插件开发教程https://github.com/Soulter/QQChannelChatGPT/wiki/%E5%9B%9B%E3%80%81%E5%BC%80%E5%8F%91%E6%8F%92%E4%BB%B6
部分好用的插件: 部分好用的插件:
`HuggingChat`: https://github.com/Soulter/HuggingChatForQQBot - `HuggingChat`: https://github.com/Soulter/HuggingChatForQQBot | HuggingChat模型接入
- `GoodPlugins`: https://github.com/Soulter/goodplugins | 随机动漫图片、搜番、喜报生成器等等
### 指令功能 - `sysstat`: https://github.com/Soulter/sysstatqcbot | 查看系统状态
- `BiliMonitor`: https://github.com/Soulter/BiliMonitor | 订阅B站动态
<!--
### 指令
#### OpenAI官方API #### OpenAI官方API
在频道内需要先`@`机器人之后再输入指令在QQ中暂时需要在消息前加上`ai `,不需要@ 在频道内需要先`@`机器人之后再输入指令在QQ中暂时需要在消息前加上`ai `,不需要@
@@ -132,15 +137,14 @@ _✨欢迎体验😊频道名: GPT机器人 | 频道号: x42d56aki2) | QQ群
- `/gpt` 切换为OpenAI官方API - `/gpt` 切换为OpenAI官方API
- `/bing` 切换为bing - `/bing` 切换为bing
* 切换模型指令支持临时回复。如`/bing 你好`将会临时使用一次bing模型 * 切换模型指令支持临时回复。如`/bing 你好`将会临时使用一次bing模型 -->
## 📰使用方法: ## 📰使用方法:
**详细部署教程链接**https://soulter.top/posts/qpdg.html 使用文档https://github.com/Soulter/QQChannelChatGPT/wiki
**Windows用户推荐Windows一键安装请前往Release下载最新版本Beta**
有报错请先看issue解决不了再在频道内反馈。
**Windows用户可以使用启动器一键安装请前往Release下载最新版本Beta**
<!--
### 安装第三方库 ### 安装第三方库
```shell ```shell
@@ -148,16 +152,14 @@ pip install -r requirements.txt
``` ```
> ⚠Python版本应>=3.9 > ⚠Python版本应>=3.9
### 配置 ### 配置
**详细部署教程链接**https://github.com/Soulter/QQChannelChatGPT/wiki **详细部署教程链接**https://github.com/Soulter/QQChannelChatGPT/wiki
### 启动 ### 启动
- 启动main.py - 启动main.py -->
## 🙇‍感谢
## 感谢
本项目使用了一下项目: 本项目使用了一下项目:
[ChatGPT by acheong08](https://github.com/acheong08/ChatGPT) [ChatGPT by acheong08](https://github.com/acheong08/ChatGPT)
@@ -168,7 +170,12 @@ pip install -r requirements.txt
[nakuru-project by Lxns-Network](https://github.com/Lxns-Network/nakuru-project) [nakuru-project by Lxns-Network](https://github.com/Lxns-Network/nakuru-project)
<!-- ## 👀部分演示截图
帮助中心(`help`指令)
![)F%2VQA`O)`4BHTXZ653(~9](https://github.com/Soulter/QQChannelChatGPT/assets/37870767/57eaa8c6-6962-4940-823c-2e26b5206cf5)
-->
## ⚙配置文件说明: ## ⚙配置文件说明:
```yaml ```yaml
# 如果你不知道怎么部署请查看https://github.com/Soulter/QQChannelChatGPT/wiki # 如果你不知道怎么部署请查看https://github.com/Soulter/QQChannelChatGPT/wiki

View File

@@ -4,24 +4,28 @@ from nakuru import (
FriendMessage FriendMessage
) )
from botpy.message import Message, DirectMessage from botpy.message import Message, DirectMessage
from model.platform.qq import QQ
import time
import threading
class HelloWorldPlugin: class HelloWorldPlugin:
""" """
初始化函数, 可以选择直接pass 初始化函数, 可以选择直接pass
""" """
def __init__(self) -> None: def __init__(self) -> None:
self.myThread = None # 线程对象如果要使用线程需要在此处定义。在run处定义会被释放掉
print("这是HelloWorld测试插件, 发送 helloworld 即可触发此插件。") print("这是HelloWorld测试插件, 发送 helloworld 即可触发此插件。")
""" """
入口函数,机器人会调用此函数。 入口函数,机器人会调用此函数。
参数规范: message: 消息文本; role: 身份; platform: 消息平台; message_obj: 消息对象 参数规范: message: 消息文本; role: 身份; platform: 消息平台; message_obj: 消息对象; qq_platform: QQ平台对象可以通过调用qq_platform.send()直接发送消息。详见Helloworld插件示例
参数详情: role为admin或者member; platform为qqchan或者gocq; message_obj为nakuru的GroupMessage对象或者FriendMessage对象或者频道的Message, DirectMessage对象。 参数详情: role为admin或者member; platform为qqchan或者gocq; message_obj为nakuru的GroupMessage对象或者FriendMessage对象或者频道的Message, DirectMessage对象。
返回规范: bool: 是否hit到此插件(所有的消息均会调用每一个载入的插件, 如果没有hit到, 则应返回False) 返回规范: bool: 是否hit到此插件(所有的消息均会调用每一个载入的插件, 如果没有hit到, 则应返回False)
Tuple: None或者长度为3的元组。当没有hit到时, 返回None. hit到时, 第1个参数为指令是否调用成功, 第2个参数为返回的消息文本或者gocq的消息链列表, 第3个参数为指令名称 Tuple: None或者长度为3的元组。当没有hit到时, 返回None. hit到时, 第1个参数为指令是否调用成功, 第2个参数为返回的消息文本或者gocq的消息链列表, 第3个参数为指令名称
例子:做一个名为"yuanshen"的插件;当接收到消息为“原神 可莉”, 如果不想要处理此消息则返回False, None如果想要处理但是执行失败了返回True, tuple([False, "请求失败啦~", "yuanshen"]) 例子:做一个名为"yuanshen"的插件;当接收到消息为“原神 可莉”, 如果不想要处理此消息则返回False, None如果想要处理但是执行失败了返回True, tuple([False, "请求失败啦~", "yuanshen"])
执行成功了返回True, tuple([True, "结果文本", "yuanshen"]) 执行成功了返回True, tuple([True, "结果文本", "yuanshen"])
""" """
def run(self, message: str, role: str, platform: str, message_obj): def run(self, message: str, role: str, platform: str, message_obj, qq_platform: QQ):
if platform == "gocq": if platform == "gocq":
""" """
@@ -30,6 +34,11 @@ class HelloWorldPlugin:
img_url = "https://gchat.qpic.cn/gchatpic_new/905617992/720871955-2246763964-C6EE1A52CC668EC982453065C4FA8747/0?term=2&amp;is_origin=0" img_url = "https://gchat.qpic.cn/gchatpic_new/905617992/720871955-2246763964-C6EE1A52CC668EC982453065C4FA8747/0?term=2&amp;is_origin=0"
if message == "helloworld": if message == "helloworld":
return True, tuple([True, [Plain("Hello World!!"), Image.fromURL(url=img_url)], "helloworld"]) return True, tuple([True, [Plain("Hello World!!"), Image.fromURL(url=img_url)], "helloworld"])
elif message == "hiloop":
if self.myThread is None:
self.myThread = threading.Thread(target=self.helloworldThread, args=(message_obj, qq_platform))
self.myThread.start()
return True, tuple([True, [Plain("A lot of Helloworlds!!"), Image.fromURL(url=img_url)], "helloworld"])
else: else:
return False, None return False, None
elif platform == "qqchan": elif platform == "qqchan":
@@ -59,6 +68,11 @@ class HelloWorldPlugin:
"author": "Soulter" "author": "Soulter"
} }
def helloworldThread(self, meseage_obj, qq_platform: QQ):
while True:
qq_platform.send(meseage_obj, [Plain("Hello World!!")]) # 第一个参数可以是message_obj, 也可以是qq群号
time.sleep(3) # 睡眠3秒。 用while True一定要记得sleep不然会卡死
# 热知识:检测消息开头指令,使用以下方法 # 热知识:检测消息开头指令,使用以下方法
# if message.startswith("原神"): # if message.startswith("原神"):

View File

@@ -18,15 +18,21 @@ from nakuru import (
CQHTTP, CQHTTP,
GroupMessage, GroupMessage,
GroupMemberIncrease, GroupMemberIncrease,
FriendMessage FriendMessage,
GuildMessage
) )
from nakuru.entities.components import Plain,At from nakuru.entities.components import Plain,At
from model.command.command import Command from model.command.command import Command
from model.command.command_rev_chatgpt import CommandRevChatGPT
from model.command.command_rev_edgegpt import CommandRevEdgeGPT
from model.command.command_openai_official import CommandOpenAIOfficial
from util import general_utils as gu
from util.cmd_config import CmdConfig as cc
# QQBotClient实例 # QQBotClient实例
client = '' client = ''
# ChatGPT实例
global chatgpt
# 缓存的会话 # 缓存的会话
session_dict = {} session_dict = {}
# 最大缓存token在配置里改 configs/config.yaml # 最大缓存token在配置里改 configs/config.yaml
@@ -41,7 +47,7 @@ stat_file = ''
uniqueSession = False uniqueSession = False
# 日志记录 # 日志记录
logf = open('log.log', 'a+', encoding='utf-8') # logf = open('log.log', 'a+', encoding='utf-8')
# 是否上传日志,仅上传频道数量等数量的统计信息 # 是否上传日志,仅上传频道数量等数量的统计信息
is_upload_log = True is_upload_log = True
@@ -72,8 +78,10 @@ REV_EDGEGPT = 'rev_edgegpt'
provider = None provider = None
chosen_provider = None chosen_provider = None
# 逆向库对象 # 语言模型对象
rev_chatgpt = None rev_chatgpt = None
rev_edgegpt = None
chatgpt = None
# gpt配置信息 # gpt配置信息
gpt_config = {} gpt_config = {}
# 百度内容审核实例 # 百度内容审核实例
@@ -91,34 +99,31 @@ qqchan_loop = None
# QQ机器人 # QQ机器人
gocq_bot = None gocq_bot = None
PLATFORM_GOCQ = 'gocq' PLATFORM_GOCQ = 'gocq'
gocq_app = None gocq_app = CQHTTP(
host="127.0.0.1",
port=6700,
http_port=5700,
)
admin_qq = "123456" admin_qq = "123456"
gocq_loop = None gocq_loop = None
nick_qq = "ai" nick_qq = None
bing_cache_loop = None bing_cache_loop = None
# 插件 # 插件
cached_plugins = {} cached_plugins = {}
# 统计
cnt_total = 0
cnt_valid = 0
def gocq_runner(): # 新版配置文件
global gocq_app cc.init_attributes(["qq_forward_threshold"], 200)
ok = False cc.init_attributes(["qq_welcome"], "欢迎加入本群!\n欢迎给https://github.com/Soulter/QQChannelChatGPT项目一个Star😊~\n输入help查看帮助~\n")
while not ok: cc.init_attributes(["bing_proxy"], "")
try: cc.init_attributes(["qq_pic_mode"], False)
gocq_app = CQHTTP( # cc.init_attributes(["qq_forward_mode"], False)
host="127.0.0.1",
port=6700,
http_port=5700,
)
ok = True
except BaseException as e:
print("[System-err] 连接到go-cqhttp异常, 5秒后重试。"+str(e))
threading.Thread(target=gocq_runner, daemon=True).start()
def new_sub_thread(func, args=()): def new_sub_thread(func, args=()):
thread = threading.Thread(target=func, args=args, daemon=True) thread = threading.Thread(target=func, args=args, daemon=True)
@@ -148,38 +153,31 @@ def toggle_count(at: bool, message):
# 上传统计信息并检查更新 # 上传统计信息并检查更新
def upload(): def upload():
global object_id global object_id
global version global version, cnt_valid, cnt_total
while True: while True:
addr = '' addr = ''
addr_ip = ''
try: try:
# 用户唯一性标识
addr = requests.get('http://myip.ipip.net', timeout=5).text addr = requests.get('http://myip.ipip.net', timeout=5).text
except BaseException: addr_ip = re.findall(r'\d+.\d+.\d+.\d+', addr)[0]
except BaseException as e:
print(e)
pass pass
try: try:
ts = str(time.time()) o = {"cnt_total": cnt_total,"admin": admin_qq,"addr": addr,}
guild_count, guild_msg_count, guild_direct_msg_count, session_count = get_stat() o_j = json.dumps(o)
headers = { res = {"version": version, "count": cnt_valid, "ip": addr_ip, "others": o_j}
'X-LC-Id': 'UqfXTWW15nB7iMT0OHvYrDFb-gzGzoHsz', resp = requests.post('https://api.soulter.top/upload', data=json.dumps(res), timeout=5)
'X-LC-Key': 'QAZ1rQLY1ZufHrZlpuUiNff7', # print(resp.text)
'Content-Type': 'application/json' if resp.status_code == 200:
} ok = resp.json()
key_stat = chatgpt.get_key_stat() if ok['status'] == 'ok':
d = {"data": {'version': version, "guild_count": guild_count, "guild_msg_count": guild_msg_count, "guild_direct_msg_count": guild_direct_msg_count, "session_count": session_count, 'addr': addr, 'key_stat':key_stat}} cnt_valid = 0
d = json.dumps(d).encode("utf-8") cnt_total = 0
res = requests.put(f'https://uqfxtww1.lc-cn-n1-shared.com/1.1/classes/bot_record/{object_id}', headers = headers, data = d)
if json.loads(res.text)['code'] == 1:
print("[System] New User.")
res = requests.post(f'https://uqfxtww1.lc-cn-n1-shared.com/1.1/classes/bot_record', headers = headers, data = d)
object_id = json.loads(res.text)['objectId']
object_id_file = open(abs_path+"configs/object_id", 'w+', encoding='utf-8')
object_id_file.write(str(object_id))
object_id_file.flush()
object_id_file.close()
except BaseException as e: except BaseException as e:
print(e)
pass pass
# 每隔2小时上传一次 time.sleep(60*10)
time.sleep(60*60*2)
''' '''
初始化机器人 初始化机器人
@@ -194,40 +192,43 @@ def initBot(cfg, prov):
reply_prefix = cfg['reply_prefix'] reply_prefix = cfg['reply_prefix']
# 语言模型提供商 # 语言模型提供商
print("--------------------加载语言模型--------------------") gu.log("--------加载语言模型--------", gu.LEVEL_INFO, fg=gu.FG_COLORS['yellow'])
if REV_CHATGPT in prov: if REV_CHATGPT in prov:
print("- 逆向ChatGPT库 -") gu.log("- 逆向ChatGPT库 -", gu.LEVEL_INFO)
if cfg['rev_ChatGPT']['enable']: if cfg['rev_ChatGPT']['enable']:
if 'account' in cfg['rev_ChatGPT']: if 'account' in cfg['rev_ChatGPT']:
from model.provider.provider_rev_chatgpt import ProviderRevChatGPT from model.provider.provider_rev_chatgpt import ProviderRevChatGPT
from model.command.command_rev_chatgpt import CommandRevChatGPT
rev_chatgpt = ProviderRevChatGPT(cfg['rev_ChatGPT']) rev_chatgpt = ProviderRevChatGPT(cfg['rev_ChatGPT'])
command_rev_chatgpt = CommandRevChatGPT(cfg['rev_ChatGPT'])
chosen_provider = REV_CHATGPT chosen_provider = REV_CHATGPT
else: else:
input("[System-err] 请退出本程序, 然后在配置文件中填写rev_ChatGPT相关配置") input("[System-err] 请退出本程序, 然后在配置文件中填写rev_ChatGPT相关配置")
if REV_EDGEGPT in prov: if REV_EDGEGPT in prov:
print("- New Bing -") gu.log("- New Bing -", gu.LEVEL_INFO)
if not os.path.exists('./cookies.json'): if not os.path.exists('./cookies.json'):
input("[System-err] 导入Bing模型时发生错误, 没有找到cookies文件或者cookies文件放置位置错误。windows启动器启动的用户请把cookies.json文件放到和启动器相同的目录下。\n如何获取请看https://github.com/Soulter/QQChannelChatGPT仓库介绍。") input("[System-err] 导入Bing模型时发生错误, 没有找到cookies文件或者cookies文件放置位置错误。windows启动器启动的用户请把cookies.json文件放到和启动器相同的目录下。\n如何获取请看https://github.com/Soulter/QQChannelChatGPT仓库介绍。")
else: else:
if cfg['rev_edgegpt']['enable']: if cfg['rev_edgegpt']['enable']:
from model.provider.provider_rev_edgegpt import ProviderRevEdgeGPT try:
from model.command.command_rev_edgegpt import CommandRevEdgeGPT from model.provider.provider_rev_edgegpt import ProviderRevEdgeGPT
rev_edgegpt = ProviderRevEdgeGPT() rev_edgegpt = ProviderRevEdgeGPT()
command_rev_edgegpt = CommandRevEdgeGPT(rev_edgegpt) chosen_provider = REV_EDGEGPT
chosen_provider = REV_EDGEGPT except BaseException as e:
gu.log("加载Bing模型时发生错误, 请检查1. cookies文件是否正确放置 2. 是否设置了代理(梯子)。", gu.LEVEL_ERROR, max_len=60)
if OPENAI_OFFICIAL in prov: if OPENAI_OFFICIAL in prov:
print("- OpenAI ChatGPT官方API -") gu.log("- OpenAI官方 -", gu.LEVEL_INFO)
if cfg['openai']['key'] is not None: if cfg['openai']['key'] is not None:
from model.provider.provider_openai_official import ProviderOpenAIOfficial from model.provider.provider_openai_official import ProviderOpenAIOfficial
from model.command.command_openai_official import CommandOpenAIOfficial
chatgpt = ProviderOpenAIOfficial(cfg['openai']) chatgpt = ProviderOpenAIOfficial(cfg['openai'])
command_openai_official = CommandOpenAIOfficial(chatgpt)
chosen_provider = OPENAI_OFFICIAL chosen_provider = OPENAI_OFFICIAL
print("--------------------加载个性化配置--------------------") command_rev_edgegpt = CommandRevEdgeGPT(rev_edgegpt)
command_rev_chatgpt = CommandRevChatGPT(rev_chatgpt)
command_openai_official = CommandOpenAIOfficial(chatgpt)
gu.log("--------加载个性化配置--------", gu.LEVEL_INFO, fg=gu.FG_COLORS['yellow'])
# 得到关键词 # 得到关键词
if os.path.exists("keyword.json"): if os.path.exists("keyword.json"):
with open("keyword.json", 'r', encoding='utf-8') as f: with open("keyword.json", 'r', encoding='utf-8') as f:
@@ -244,32 +245,20 @@ def initBot(cfg, prov):
if 'baidu_aip' in cfg and 'enable' in cfg['baidu_aip'] and cfg['baidu_aip']['enable']: if 'baidu_aip' in cfg and 'enable' in cfg['baidu_aip'] and cfg['baidu_aip']['enable']:
try: try:
baidu_judge = BaiduJudge(cfg['baidu_aip']) baidu_judge = BaiduJudge(cfg['baidu_aip'])
print("[System] 百度内容审核初始化成功") gu.log("百度内容审核初始化成功", gu.LEVEL_INFO)
except BaseException as e: except BaseException as e:
input("[System] 百度内容审核初始化失败: " + str(e)) gu.log("百度内容审核初始化失败", gu.LEVEL_ERROR)
exit()
# 统计上传 threading.Thread(target=upload, daemon=True).start()
if is_upload_log:
# 读取object_id
global object_id
if not os.path.exists(abs_path+"configs/object_id"):
with open(abs_path+"configs/object_id", 'w', encoding='utf-8') as f:
f.write("")
object_id_file = open(abs_path+"configs/object_id", 'r', encoding='utf-8')
object_id = object_id_file.read()
object_id_file.close()
# 创建上传定时器线程
threading.Thread(target=upload, daemon=True).start()
# 得到私聊模式配置 # 得到私聊模式配置
if 'direct_message_mode' in cfg: if 'direct_message_mode' in cfg:
direct_message_mode = cfg['direct_message_mode'] direct_message_mode = cfg['direct_message_mode']
print("[System] 私聊功能: "+str(direct_message_mode)) gu.log("私聊功能: "+str(direct_message_mode), gu.LEVEL_INFO)
# 得到发言频率配置 # 得到发言频率配置
if 'limit' in cfg: if 'limit' in cfg:
print('[System] 发言频率配置: '+str(cfg['limit'])) gu.log("发言频率配置: "+str(cfg['limit']), gu.LEVEL_INFO)
if 'count' in cfg['limit']: if 'count' in cfg['limit']:
frequency_count = cfg['limit']['count'] frequency_count = cfg['limit']['count']
if 'time' in cfg['limit']: if 'time' in cfg['limit']:
@@ -277,75 +266,81 @@ def initBot(cfg, prov):
# 得到公告配置 # 得到公告配置
if 'notice' in cfg: if 'notice' in cfg:
print('[System] 公告配置: '+cfg['notice']) gu.log("公告配置: "+cfg['notice'], gu.LEVEL_INFO)
announcement += cfg['notice'] announcement += cfg['notice']
try: try:
if 'uniqueSessionMode' in cfg and cfg['uniqueSessionMode']: if 'uniqueSessionMode' in cfg and cfg['uniqueSessionMode']:
uniqueSession = True uniqueSession = True
else: else:
uniqueSession = False uniqueSession = False
print("[System] 独立会话: " + str(uniqueSession)) gu.log("独立会话: "+str(uniqueSession), gu.LEVEL_INFO)
if 'dump_history_interval' in cfg: if 'dump_history_interval' in cfg:
print("[System] 历史记录转储时间周期: " + cfg['dump_history_interval'] + "分钟") gu.log("历史记录保存间隔: "+str(cfg['dump_history_interval']), gu.LEVEL_INFO)
except BaseException: except BaseException:
pass pass
print(f"[System] QQ开放平台AppID: {cfg['qqbot']['appid']} 令牌: {cfg['qqbot']['token']}")
print("\n[System] 如果有任何问题, 请在 https://github.com/Soulter/QQChannelChatGPT 上提交issue说明问题或者添加QQ905617992") gu.log(f"QQ开放平台AppID: {cfg['qqbot']['appid']} 令牌: {cfg['qqbot']['token']}")
print("[System] 请给 https://github.com/Soulter/QQChannelChatGPT 点个star!")
if chosen_provider is None: if chosen_provider is None:
print("[System-Warning] 检测到没有启动任何一个语言模型。请至少在配置文件中启用一个语言模型。") gu.log("检测到没有启动任何一个语言模型。请至少在配置文件中启用一个语言模型。", gu.LEVEL_CRITICAL)
# 得到指令设置(cmd_config.json)
if os.path.exists("cmd_config.json"):
with open("cmd_config.json", 'r', encoding='utf-8') as f:
cmd_config = json.load(f)
# QQ机器人昵称
if 'nick_qq' in cmd_config:
global nick_qq
nick_qq = cmd_config['nick_qq']
global nick_qq
nick_qq = cc.get('nick_qq', nick_qq)
thread_inst = None thread_inst = None
print("--------------------加载插件--------------------") gu.log("--------加载插件--------", gu.LEVEL_INFO, fg=gu.FG_COLORS['yellow'])
# 加载插件 # 加载插件
_command = Command(None) _command = Command(None)
ok, err = _command.plugin_reload(cached_plugins) ok, err = _command.plugin_reload(cached_plugins)
if ok: if ok:
print("加载插件完成") gu.log("加载插件完成", gu.LEVEL_INFO)
else: else:
print(err) gu.log(err, gu.LEVEL_ERROR)
print("--------------------加载平台--------------------") gu.log("--------加载平台--------", gu.LEVEL_INFO, fg=gu.FG_COLORS['yellow'])
# GOCQ # GOCQ
global gocq_bot
if 'gocqbot' in cfg and cfg['gocqbot']['enable']: if 'gocqbot' in cfg and cfg['gocqbot']['enable']:
print("- 启用QQ机器人 -") gu.log("- 启用QQ机器人 -", gu.LEVEL_INFO)
if os.path.exists("cmd_config.json"):
with open("cmd_config.json", 'r', encoding='utf-8') as f: global admin_qq, admin_qqchan
cmd_config = json.load(f) admin_qq = cc.get('admin_qq', None)
global admin_qq admin_qqchan = cc.get('admin_qqchan', None)
if "admin_qq" in cmd_config: if admin_qq == None:
admin_qq = cmd_config['admin_qq'] gu.log("未设置管理者QQ号(管理者才能使用update/plugin等指令)", gu.LEVEL_WARNING)
print("[System] 管理者QQ号: " + admin_qq) admin_qq = input("请输入管理者QQ号(必须设置): ")
else: gu.log("管理者QQ号设置为: " + admin_qq, gu.LEVEL_INFO, fg=gu.FG_COLORS['yellow'])
admin_qq = input("[System] 请输入管理者QQ号(管理者QQ号才能使用update/plugin等指令): ") cc.put('admin_qq', admin_qq)
print("[System] 管理者QQ号设置为: " + admin_qq) if admin_qqchan == None:
cmd_config['admin_qq'] = admin_qq gu.log("未设置管理者QQ频道用户号(管理者才能使用update/plugin等指令)", gu.LEVEL_WARNING)
with open("cmd_config.json", 'w', encoding='utf-8') as f: admin_qqchan = input("请输入管理者频道用户号(不是QQ号, 可以先回车跳过然后在频道发送指令!myid获取): ")
json.dump(cmd_config, f, indent=4) if admin_qqchan == "":
f.flush() gu.log("跳过设置管理者频道用户号", gu.LEVEL_INFO, fg=gu.FG_COLORS['yellow'])
global gocq_app, gocq_bot, gocq_loop else:
gocq_bot = QQ() gu.log("管理者频道用户号设置为: " + admin_qqchan, gu.LEVEL_INFO, fg=gu.FG_COLORS['yellow'])
cc.put('admin_qqchan', admin_qqchan)
gu.log("管理者QQ: " + admin_qq, gu.LEVEL_INFO)
gu.log("管理者频道用户号: " + admin_qqchan, gu.LEVEL_INFO)
global gocq_app, gocq_loop
gocq_loop = asyncio.new_event_loop() gocq_loop = asyncio.new_event_loop()
gocq_bot = QQ(True, cc, gocq_loop)
thread_inst = threading.Thread(target=run_gocq_bot, args=(gocq_loop, gocq_bot, gocq_app), daemon=False) thread_inst = threading.Thread(target=run_gocq_bot, args=(gocq_loop, gocq_bot, gocq_app), daemon=False)
thread_inst.start() thread_inst.start()
else:
gocq_bot = QQ(False)
gu.log("机器人部署教程: https://github.com/Soulter/QQChannelChatGPT/wiki/", gu.LEVEL_INFO, fg=gu.FG_COLORS['yellow'])
gu.log("如果有任何问题, 请在 https://github.com/Soulter/QQChannelChatGPT 上提交issue说明问题", gu.LEVEL_INFO, fg=gu.FG_COLORS['yellow'])
gu.log("请给 https://github.com/Soulter/QQChannelChatGPT 点个star!", gu.LEVEL_INFO, fg=gu.FG_COLORS['yellow'])
# QQ频道 # QQ频道
if 'qqbot' in cfg and cfg['qqbot']['enable']: if 'qqbot' in cfg and cfg['qqbot']['enable']:
print("- 启用QQ频道机器人 -") gu.log("- 启用QQ频道机器人(旧版) -", gu.LEVEL_INFO)
global qqchannel_bot, qqchan_loop global qqchannel_bot, qqchan_loop
qqchannel_bot = QQChan() qqchannel_bot = QQChan()
qqchan_loop = asyncio.new_event_loop() qqchan_loop = asyncio.new_event_loop()
@@ -354,7 +349,7 @@ def initBot(cfg, prov):
# thread.join() # thread.join()
if thread_inst == None: if thread_inst == None:
input("[System-Error] 没有启用任何机器人,程序退出") input("[System-Error] 没有启用/成功启用任何机器人,程序退出")
exit() exit()
thread_inst.join() thread_inst.join()
@@ -367,10 +362,23 @@ def run_qqchan_bot(cfg, loop, qqchannel_bot):
try: try:
qqchannel_bot.run_bot(client, cfg['qqbot']['appid'], cfg['qqbot']['token']) qqchannel_bot.run_bot(client, cfg['qqbot']['appid'], cfg['qqbot']['token'])
except BaseException as e: except BaseException as e:
input(f"\n[System-Error] 启动QQ频道机器人时出现错误原因如下{e}\n可能是没有填写QQBOT appid和token请在config中完善你的appid和token\n配置教程https://soulter.top/posts/qpdg.html\n") gu.log("启动QQ频道机器人时出现错误, 原因如下: " + str(e), gu.LEVEL_CRITICAL, tag="QQ频道")
gu.log(r"【提醒】有可能你想启动的是gocq, 并不是这个旧版的QQ频道SDK, 如果是这样, 请修改配置文件QQChannelChatGPT/config.yaml详情请看https://github.com/Soulter/QQChannelChatGPT/wiki/%E4%BA%8C%E3%80%81%E9%A1%B9%E7%9B%AE%E9%85%8D%E7%BD%AE%E6%96%87%E4%BB%B6%E9%85%8D%E7%BD%AE。" + str(e), gu.LEVEL_CRITICAL, tag="QQ频道")
# gu.log("如果你使用了go-cqhttp, 则可以忽略上面的报错。" + str(e), gu.LEVEL_CRITICAL, tag="QQ频道")
# input(f"\n[System-Error] 启动QQ频道机器人时出现错误原因如下{e}\n可能是没有填写QQBOT appid和token请在config中完善你的appid和token\n配置教程https://soulter.top/posts/qpdg.html\n")
def run_gocq_bot(loop, gocq_bot, gocq_app): def run_gocq_bot(loop, gocq_bot, gocq_app):
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
gu.log("正在检查本地GO-CQHTTP连接...端口5700, 6700", tag="QQ")
while True:
if not gu.port_checker(5700) or not gu.port_checker(6700):
gu.log("与GO-CQHTTP通信失败, 请检查GO-CQHTTP是否启动并正确配置。5秒后自动重试。", gu.LEVEL_CRITICAL, tag="QQ")
time.sleep(5)
else:
gu.log("检查完毕,未发现问题。", tag="QQ")
break
global gocq_client global gocq_client
gocq_client = gocqClient() gocq_client = gocqClient()
try: try:
@@ -407,7 +415,11 @@ def save_provider_preference(chosen_provider):
''' '''
通用回复方法 通用回复方法
''' '''
def send_message(platform, message, res, msg_ref = None, image = None, gocq_loop = None, qqchannel_bot = None, gocq_bot = None): def send_message(platform, message, res, msg_ref = None, image = None, gocq_loop = None, qqchannel_bot = None, gocq_bot = None, image_mode=False):
# imagemode:
# For GOCQ: when image_mode is true, ALL plain texts in res will change into a new pic
global cnt_valid
cnt_valid += 1
if platform == PLATFORM_QQCHAN: if platform == PLATFORM_QQCHAN:
if image != None: if image != None:
qqchannel_bot.send_qq_msg(message, str(res), image_mode=True, msg_ref=msg_ref) qqchannel_bot.send_qq_msg(message, str(res), image_mode=True, msg_ref=msg_ref)
@@ -415,9 +427,10 @@ def send_message(platform, message, res, msg_ref = None, image = None, gocq_loop
qqchannel_bot.send_qq_msg(message, str(res), msg_ref=msg_ref) qqchannel_bot.send_qq_msg(message, str(res), msg_ref=msg_ref)
if platform == PLATFORM_GOCQ: if platform == PLATFORM_GOCQ:
if image != None: if image != None:
asyncio.run_coroutine_threadsafe(gocq_bot.send_qq_msg(message, image, image_mode=True), gocq_loop).result() # image is a url string
asyncio.run_coroutine_threadsafe(gocq_bot.send_qq_msg(message, [Plain(text="好的,我根据你的需要为你生成了一张图片😊"),Image.fromURL(image)], False), gocq_loop).result()
else: else:
asyncio.run_coroutine_threadsafe(gocq_bot.send_qq_msg(message, res, False, ), gocq_loop).result() asyncio.run_coroutine_threadsafe(gocq_bot.send_qq_msg(message, res, image_mode), gocq_loop).result()
def oper_msg(message, def oper_msg(message,
@@ -434,36 +447,30 @@ def oper_msg(message,
session_id = '' session_id = ''
user_id = '' user_id = ''
user_name = '' user_name = ''
global chosen_provider, reply_prefix, keywords, qqchannel_bot, gocq_bot, gocq_loop, bing_cache_loop global chosen_provider, reply_prefix, keywords, qqchannel_bot, gocq_bot, gocq_loop, bing_cache_loop, qqchan_loop
role = "member" # 角色 role = "member" # 角色
hit = False # 是否命中指令 hit = False # 是否命中指令
command_result = () # 调用指令返回的结果 command_result = () # 调用指令返回的结果
global admin_qq, cached_plugins global admin_qq, admin_qqchan, cached_plugins, gocq_bot, nick_qq
global cnt_total
cnt_total += 1
with_tag = False # 是否带有昵称
# 将nick_qq(昵称)统一转换为tuple
if nick_qq == None:
nick_qq = ("ai","!","")
if isinstance(nick_qq, str):
nick_qq = (nick_qq,)
if isinstance(nick_qq, list):
nick_qq = tuple(nick_qq)
if platform == PLATFORM_QQCHAN: if platform == PLATFORM_QQCHAN:
print("[QQCHAN-BOT] 接收到消息:"+ str(message.content)) with_tag = True
gu.log(f"收到消息:{message.content}", gu.LEVEL_INFO, tag="QQ频道")
user_id = message.author.id user_id = message.author.id
user_name = message.author.username user_name = message.author.username
global qqchan_loop
if platform == PLATFORM_GOCQ:
if isinstance(message.message[0], Plain):
print("[GOCQ-BOT] 接收到消息:"+ str(message.message[0].text))
elif isinstance(message.message[0], At):
print("[GOCQ-BOT] 接收到消息:"+ str(message.message[1].text))
user_id = message.user_id
user_name = message.user_id
global gocq_loop
if chosen_provider is None:
send_message(platform, message, f"没有启动任何一个语言模型。请至少在配置文件中启用一个语言模型。", msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
# 检查发言频率
if not check_frequency(user_id):
send_message(platform, message, f'你的发言超过频率限制(╯▔皿▔)╯。\n管理员设置{frequency_time}秒内只能提问{frequency_count}次。', msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
return
if platform == PLATFORM_QQCHAN:
if group: if group:
# 频道内 # 频道内
# 过滤@ # 过滤@
@@ -478,7 +485,7 @@ def oper_msg(message,
session_id = message.channel_id session_id = message.channel_id
# 得到身份 # 得到身份
if "2" in message.member.roles or "4" in message.member.roles or "5" in message.member.roles: if "2" in message.member.roles or "4" in message.member.roles or "5" in message.member.roles:
print("[QQCHAN-BOT] 检测到管理员身份") # gu.log(f"检测到管理员身份", gu.LEVEL_INFO, tag="QQ频道")
role = "admin" role = "admin"
else: else:
role = "member" role = "member"
@@ -488,28 +495,64 @@ def oper_msg(message,
session_id = user_id session_id = user_id
if platform == PLATFORM_GOCQ: if platform == PLATFORM_GOCQ:
_len = 0
for i in message.message:
if isinstance(i, Plain):
qq_msg += str(i.text).strip()
if isinstance(i, At):
# @机器人
if message.type == "GuildMessage":
if i.qq == message.self_tiny_id:
with_tag = True
if message.type == "FriendMessage":
if i.qq == message.self_id:
with_tag = True
if message.type == "GroupMessage":
if i.qq == message.self_id:
with_tag = True
for i in nick_qq:
if i != '' and qq_msg.startswith(i):
_len = len(i)
with_tag = True
break
qq_msg = qq_msg[_len:].strip()
gu.log(f"收到消息:{qq_msg}", gu.LEVEL_INFO, tag="QQ")
user_id = message.user_id
if group: if group:
if isinstance(message.message[0], Plain): # 适配GO-CQHTTP的频道功能
qq_msg = str(message.message[0].text).strip() if message.type == "GuildMessage":
elif isinstance(message.message[0], At): session_id = message.channel_id
qq_msg = str(message.message[1].text).strip()
else: else:
return session_id = message.group_id
session_id = message.group_id
else: else:
qq_msg = message.message[0].text with_tag = True
# qq_msg = message.message[0].text
session_id = message.user_id session_id = message.user_id
role = "member" role = "member"
if str(message.sender.user_id) == admin_qq:
print("[GOCQ-BOT] 检测到管理员身份") if message.type == "GuildMessage":
sender_id = str(message.sender.tiny_id)
else:
sender_id = str(message.sender.user_id)
if sender_id == admin_qq or sender_id == admin_qqchan:
# gu.log("检测到管理员身份", gu.LEVEL_INFO, tag="GOCQ")
role = "admin" role = "admin"
if qq_msg == "": if qq_msg == "":
send_message(platform, message, f"Hi~", msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot) send_message(platform, message, f"Hi~", msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
return return
logf.write("[QQBOT] "+ qq_msg+'\n') if with_tag:
logf.flush() # 检查发言频率
if not check_frequency(user_id):
send_message(platform, message, f'你的发言超过频率限制(╯▔皿▔)╯。\n管理员设置{frequency_time}秒内只能提问{frequency_count}次。', msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
return
# logf.write("[GOCQBOT] "+ qq_msg+'\n')
# logf.flush()
# 关键词回复 # 关键词回复
for k in keywords: for k in keywords:
@@ -557,27 +600,39 @@ def oper_msg(message,
chatgpt_res = "" chatgpt_res = ""
if chosen_provider == OPENAI_OFFICIAL: if chosen_provider == OPENAI_OFFICIAL:
hit, command_result = command_openai_official.check_command(qq_msg, session_id, user_name, role, platform=platform, message_obj=message, cached_plugins=cached_plugins) hit, command_result = command_openai_official.check_command(qq_msg, session_id, user_name, role, platform=platform, message_obj=message, cached_plugins=cached_plugins, qq_platform=gocq_bot)
# hit: 是否触发了指令 # hit: 是否触发了指令
if not hit: if not hit:
if not with_tag:
return
if chatgpt == None:
send_message(platform, message, f"管理员未启动OpenAI模型或初始化时失败。", msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
return
# 请求ChatGPT获得结果 # 请求ChatGPT获得结果
try: try:
chatgpt_res = chatgpt.text_chat(qq_msg, session_id) chatgpt_res = chatgpt.text_chat(qq_msg, session_id)
if OPENAI_OFFICIAL in reply_prefix: if OPENAI_OFFICIAL in reply_prefix:
chatgpt_res = reply_prefix[OPENAI_OFFICIAL] + chatgpt_res chatgpt_res = reply_prefix[OPENAI_OFFICIAL] + chatgpt_res
except (BaseException) as e: except (BaseException) as e:
print("[System-Err] OpenAI API请求错误, 原因: "+str(e)) gu.log("OpenAI API请求错误, 原因: "+str(e), gu.LEVEL_ERROR)
send_message(platform, message, f"OpenAI API错误, 原因: {str(e)}", msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot) send_message(platform, message, f"OpenAI API错误, 原因: {str(e)}", msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
elif chosen_provider == REV_CHATGPT: elif chosen_provider == REV_CHATGPT:
hit, command_result = command_rev_chatgpt.check_command(qq_msg, role, platform=platform, message=message, cached_plugins=cached_plugins) hit, command_result = command_rev_chatgpt.check_command(qq_msg, role, platform=platform, message_obj=message, cached_plugins=cached_plugins, qq_platform=gocq_bot)
if not hit: if not hit:
if not with_tag:
return
if rev_chatgpt == None:
send_message(platform, message, f"管理员未启动此模型或者此模型初始化时失败。", msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
return
try: try:
while rev_chatgpt.is_all_busy():
time.sleep(1)
chatgpt_res = str(rev_chatgpt.text_chat(qq_msg)) chatgpt_res = str(rev_chatgpt.text_chat(qq_msg))
if REV_CHATGPT in reply_prefix: if REV_CHATGPT in reply_prefix:
chatgpt_res = reply_prefix[REV_CHATGPT] + chatgpt_res chatgpt_res = reply_prefix[REV_CHATGPT] + chatgpt_res
except BaseException as e: except BaseException as e:
print("[System-Err] RevChatGPT请求错误, 原因: "+str(e)) gu.log("逆向ChatGPT请求错误, 原因: "+str(e), gu.LEVEL_ERROR)
send_message(platform, message, f"RevChatGPT错误, 原因: \n{str(e)}", msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot) send_message(platform, message, f"RevChatGPT错误, 原因: \n{str(e)}", msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
elif chosen_provider == REV_EDGEGPT: elif chosen_provider == REV_EDGEGPT:
@@ -586,9 +641,14 @@ def oper_msg(message,
bing_cache_loop = gocq_loop bing_cache_loop = gocq_loop
elif platform == PLATFORM_QQCHAN: elif platform == PLATFORM_QQCHAN:
bing_cache_loop = qqchan_loop bing_cache_loop = qqchan_loop
hit, command_result = command_rev_edgegpt.check_command(qq_msg, bing_cache_loop, role, platform=platform, message_obj=message, cached_plugins=cached_plugins) hit, command_result = command_rev_edgegpt.check_command(qq_msg, bing_cache_loop, role, platform=platform, message_obj=message, cached_plugins=cached_plugins, qq_platform=gocq_bot)
if not hit: if not hit:
try: try:
if not with_tag:
return
if rev_edgegpt == None:
send_message(platform, message, f"管理员未启动此模型或者此模型初始化时失败。", msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
return
while rev_edgegpt.is_busy(): while rev_edgegpt.is_busy():
time.sleep(1) time.sleep(1)
@@ -605,7 +665,7 @@ def oper_msg(message,
if REV_EDGEGPT in reply_prefix: if REV_EDGEGPT in reply_prefix:
chatgpt_res = reply_prefix[REV_EDGEGPT] + chatgpt_res chatgpt_res = reply_prefix[REV_EDGEGPT] + chatgpt_res
except BaseException as e: except BaseException as e:
print("[System-Err] Rev NewBing API错误。原因如下:\n"+str(e)) gu.log("NewBing请求错误, 原因: "+str(e), gu.LEVEL_ERROR)
send_message(platform, message, f"Rev NewBing API错误。原因如下\n{str(e)} \n前往官方频道反馈~", msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot) send_message(platform, message, f"Rev NewBing API错误。原因如下\n{str(e)} \n前往官方频道反馈~", msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
# 切换回原来的语言模型 # 切换回原来的语言模型
@@ -622,17 +682,18 @@ def oper_msg(message,
with open("keyword.json", "r", encoding="utf-8") as f: with open("keyword.json", "r", encoding="utf-8") as f:
keywords = json.load(f) keywords = json.load(f)
# QQ昵称 # 昵称
if command == "nick": if command == "nick":
with open("cmd_config.json", "r", encoding="utf-8") as f: nick_qq = cc.get("nick_qq", nick_qq)
global nick_qq
nick_qq = json.load(f)["nick_qq"]
if command_result[0]: if command_result[0]:
# 是否是画图指令 # 是否是画图指令
if isinstance(command_result, list) and len(command_result) == 3 and command_result[2] == 'draw': if isinstance(command_result[1], list) and len(command_result) == 3 and command_result[2] == 'draw':
for i in command_result[1]: if chatgpt != None:
send_message(platform, message, i, msg_ref=msg_ref, image=i, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot) for i in command_result[1]:
send_message(platform, message, i, msg_ref=msg_ref, image=i, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
else:
send_message(platform, message, "画图指令需要启用OpenAI官方模型.", msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
else: else:
try: try:
send_message(platform, message, command_result[1], msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot) send_message(platform, message, command_result[1], msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
@@ -648,8 +709,8 @@ def oper_msg(message,
return return
# 记录日志 # 记录日志
logf.write(f"{reply_prefix} {str(chatgpt_res)}\n") # logf.write(f"{reply_prefix} {str(chatgpt_res)}\n")
logf.flush() # logf.flush()
# 敏感过滤 # 敏感过滤
# 过滤不合适的词 # 过滤不合适的词
@@ -664,9 +725,15 @@ def oper_msg(message,
# 发送qq信息 # 发送qq信息
try: try:
send_message(platform, message, chatgpt_res, msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot) if platform==PLATFORM_GOCQ:
if cc.get("qq_pic_mode", False):
send_message(platform, message, chatgpt_res, image_mode=True, msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
else:
send_message(platform, message, chatgpt_res, msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
else:
send_message(platform, message, chatgpt_res, msg_ref=msg_ref, gocq_loop=gocq_loop, qqchannel_bot=qqchannel_bot, gocq_bot=gocq_bot)
except BaseException as e: except BaseException as e:
print("回复消息错误: \n"+str(e)) gu.log("回复消息错误: \n"+str(e), gu.LEVEL_ERROR)
''' '''
获取统计信息 获取统计信息
@@ -716,23 +783,10 @@ class gocqClient():
# 收到群聊消息 # 收到群聊消息
@gocq_app.receiver("GroupMessage") @gocq_app.receiver("GroupMessage")
async def _(app: CQHTTP, source: GroupMessage): async def _(app: CQHTTP, source: GroupMessage):
global nick_qq # gu.log(str(source), gu.LEVEL_INFO, max_len=9999)
# 将nick_qq转换为元组
if nick_qq == None:
nick_qq = ("ai",)
if isinstance(nick_qq, str):
nick_qq = (nick_qq,)
if isinstance(nick_qq, list):
nick_qq = tuple(nick_qq)
if isinstance(source.message[0], Plain): if isinstance(source.message[0], Plain):
if source.message[0].text.startswith(nick_qq): new_sub_thread(oper_msg, (source, True, None, PLATFORM_GOCQ))
_len = 0
for i in nick_qq:
if source.message[0].text.startswith(i):
_len = len(i)
source.message[0].text = source.message[0].text[_len:].strip()
new_sub_thread(oper_msg, (source, True, None, PLATFORM_GOCQ))
if isinstance(source.message[0], At): if isinstance(source.message[0], At):
if source.message[0].qq == source.self_id: if source.message[0].qq == source.self_id:
new_sub_thread(oper_msg, (source, True, None, PLATFORM_GOCQ)) new_sub_thread(oper_msg, (source, True, None, PLATFORM_GOCQ))
@@ -748,7 +802,25 @@ class gocqClient():
@gocq_app.receiver("GroupMemberIncrease") @gocq_app.receiver("GroupMemberIncrease")
async def _(app: CQHTTP, source: GroupMemberIncrease): async def _(app: CQHTTP, source: GroupMemberIncrease):
global nick_qq global nick_qq, cc
await app.sendGroupMessage(source.group_id, [ await app.sendGroupMessage(source.group_id, [
Plain(text=f"欢迎加入本群!\n欢迎给https://github.com/Soulter/QQChannelChatGPT项目一个Star😊~\n@我输入help查看帮助~\n我叫{nick_qq}, 你也可以以【{nick_qq}+问题】的格式来提醒我并问我问题哦~\n") Plain(text=cc.get("qq_welcome", "欢迎新人~")),
]) ])
@gocq_app.receiver("GuildMessage")
async def _(app: CQHTTP, source: GuildMessage):
# gu.log(str(source), gu.LEVEL_INFO, max_len=9999)
if isinstance(source.message[0], Plain):
# if source.message[0].text.startswith(nick_qq):
# _len = 0
# for i in nick_qq:
# if source.message[0].text.startswith(i):
# _len = len(i)
# source.message[0].text = source.message[0].text[_len:].strip()
new_sub_thread(oper_msg, (source, True, None, PLATFORM_GOCQ))
if isinstance(source.message[0], At):
if source.message[0].qq == source.self_tiny_id:
new_sub_thread(oper_msg, (source, True, None, PLATFORM_GOCQ))
else:
return

18
main.py
View File

@@ -1,11 +1,10 @@
import threading
import asyncio
import os, sys import os, sys
from pip._internal import main as pipmain from pip._internal import main as pipmain
import util.general_utils as gu
abs_path = os.path.dirname(os.path.realpath(sys.argv[0])) + '/' abs_path = os.path.dirname(os.path.realpath(sys.argv[0])) + '/'
def main(loop, event): def main():
try: try:
import cores.qqbot.core as qqBot import cores.qqbot.core as qqBot
import yaml import yaml
@@ -13,7 +12,7 @@ def main(loop, event):
cfg = yaml.safe_load(ymlfile) cfg = yaml.safe_load(ymlfile)
except BaseException as e: except BaseException as e:
print(e) print(e)
input("yaml库未导入或者配置文件格式错误,请退出程序重试。") input("第三方依赖库未完全安装完毕,请退出程序重试。")
exit() exit()
if 'http_proxy' in cfg: if 'http_proxy' in cfg:
@@ -22,7 +21,11 @@ def main(loop, event):
os.environ['HTTPS_PROXY'] = cfg['https_proxy'] os.environ['HTTPS_PROXY'] = cfg['https_proxy']
provider = privider_chooser(cfg) provider = privider_chooser(cfg)
print('[System] 当前语言模型提供商: ' + str(provider)) if len(provider) == 0:
gu.log("未开启任何语言模型, 请在configs/config.yaml下选择开启相应语言模型。", gu.LEVEL_CRITICAL)
input("按任意键退出...")
exit()
print('[System] 开启的语言模型: ' + str(provider))
# 执行Bot # 执行Bot
qqBot.initBot(cfg, provider) qqBot.initBot(cfg, provider)
@@ -116,7 +119,4 @@ if __name__ == "__main__":
except BaseException as e: except BaseException as e:
print(e) print(e)
print(f"[System-err] Replit Web保活服务启动失败:{str(e)}") print(f"[System-err] Replit Web保活服务启动失败:{str(e)}")
main()
bot_event = threading.Event()
loop = asyncio.get_event_loop()
main(loop, bot_event)

View File

@@ -9,6 +9,15 @@ import json
import util.plugin_util as putil import util.plugin_util as putil
import shutil import shutil
import importlib import importlib
from util import general_utils as gu
from util.cmd_config import CmdConfig as cc
from model.platform.qq import QQ
import stat
from nakuru.entities.components import (
Plain,
Image
)
from PIL import Image as PILImage
PLATFORM_QQCHAN = 'qqchan' PLATFORM_QQCHAN = 'qqchan'
PLATFORM_GOCQ = 'gocq' PLATFORM_GOCQ = 'gocq'
@@ -32,25 +41,50 @@ class Command:
except BaseException as e: except BaseException as e:
raise e raise e
def check_command(self, message, role, platform, message_obj, cached_plugins: dict): def check_command(self, message, role, platform, message_obj, cached_plugins: dict, qq_platform: QQ):
# 插件 # 插件
for k, v in cached_plugins.items(): for k, v in cached_plugins.items():
try: try:
hit, res = v["clsobj"].run(message, role, platform, message_obj) hit, res = v["clsobj"].run(message, role, platform, message_obj, qq_platform)
if hit: if hit:
return True, res return True, res
except BaseException as e: except BaseException as e:
print(f"[Debug] {k}插件加载出现问题,原因: {str(e)}\n已安装插件: {cached_plugins.keys}\n如果你没有相关装插件的想法, 请直接忽略此报错, 不影响其他功能的运行。") gu.log(f"{k}插件加载出现问题,原因: {str(e)}\n已安装插件: {cached_plugins.keys}\n如果你没有相关装插件的想法, 请直接忽略此报错, 不影响其他功能的运行。", level=gu.LEVEL_WARNING)
if self.command_start_with(message, "nick"): if self.command_start_with(message, "nick"):
return True, self.set_nick(message, platform) return True, self.set_nick(message, platform, role)
if self.command_start_with(message, "plugin"): if self.command_start_with(message, "plugin"):
return True, self.plugin_oper(message, role, cached_plugins) return True, self.plugin_oper(message, role, cached_plugins, platform)
if self.command_start_with(message, "myid"):
return True, self.get_my_id(message_obj, platform)
if self.command_start_with(message, "nconf") or self.command_start_with(message, "newconf"):
return True, self.get_new_conf(message, role, platform)
return False, None return False, None
def get_my_id(self, message_obj, platform):
if platform == "gocq":
if message_obj.type == "GuildMessage":
return True, f"你的频道id是{str(message_obj.sender.tiny_id)}", "plugin"
else:
return True, f"你的QQ是{str(message_obj.sender.user_id)}", "plugin"
def get_new_conf(self, message, role, platform):
if role != "admin":
return False, f"你的身份组{role}没有权限使用此指令。", "newconf"
if platform == gu.PLATFORM_GOCQ:
l = message.split(" ")
if len(l) <= 1:
obj = cc.get_all()
p = gu.create_text_image("【cmd_config.json】", json.dumps(obj, indent=4, ensure_ascii=False))
return True, [Image.fromFileSystem(p)], "newconf"
return False, f"Not support or not implemented.", "newconf"
def plugin_reload(self, cached_plugins: dict, target: str = None, all: bool = False): def plugin_reload(self, cached_plugins: dict, target: str = None, all: bool = False):
plugins = self.get_plugin_modules() plugins = self.get_plugin_modules()
fail_rec = "" fail_rec = ""
@@ -80,6 +114,7 @@ class Command:
"info": info "info": info
} }
except BaseException as e: except BaseException as e:
raise e
fail_rec += f"加载{p}插件出现问题,原因{str(e)}\n" fail_rec += f"加载{p}插件出现问题,原因{str(e)}\n"
if fail_rec == "": if fail_rec == "":
return True, None return True, None
@@ -91,9 +126,12 @@ class Command:
''' '''
插件指令 插件指令
''' '''
def plugin_oper(self, message: str, role: str, cached_plugins: dict): def plugin_oper(self, message: str, role: str, cached_plugins: dict, platform: str):
l = message.split(" ") l = message.split(" ")
if len(l) < 2: if len(l) < 2:
if platform == gu.PLATFORM_GOCQ:
p = gu.create_text_image("【插件指令面板】", "安装插件: \nplugin i 插件Github地址\n卸载插件: \nplugin i 插件名 \n重载插件: \nplugin reload\n查看插件列表:\nplugin l\n更新插件: plugin u 插件名\n")
return True, [Image.fromFileSystem(p)], "plugin"
return True, "\n=====插件指令面板=====\n安装插件: \nplugin i 插件Github地址\n卸载插件: \nplugin i 插件名 \n重载插件: \nplugin reload\n查看插件列表:\nplugin l\n更新插件: plugin u 插件名\n===============", "plugin" return True, "\n=====插件指令面板=====\n安装插件: \nplugin i 插件Github地址\n卸载插件: \nplugin i 插件名 \n重载插件: \nplugin reload\n查看插件列表:\nplugin l\n更新插件: plugin u 插件名\n===============", "plugin"
else: else:
ppath = "" ppath = ""
@@ -138,7 +176,8 @@ class Command:
return False, f"你的身份组{role}没有权限删除插件", "plugin" return False, f"你的身份组{role}没有权限删除插件", "plugin"
try: try:
# 删除文件夹 # 删除文件夹
shutil.rmtree(os.path.join(ppath, l[2])) # shutil.rmtree(os.path.join(ppath, l[2]))
self.remove_dir(os.path.join(ppath, l[2]))
if l[2] in cached_plugins: if l[2] in cached_plugins:
del cached_plugins[l[2]] del cached_plugins[l[2]]
return True, "插件卸载成功~", "plugin" return True, "插件卸载成功~", "plugin"
@@ -160,6 +199,9 @@ class Command:
elif l[1] == "l": elif l[1] == "l":
try: try:
plugin_list_info = "\n".join([f"{k}: \n名称: {v['info']['name']}\n简介: {v['info']['desc']}\n版本: {v['info']['version']}\n作者: {v['info']['author']}\n" for k, v in cached_plugins.items()]) plugin_list_info = "\n".join([f"{k}: \n名称: {v['info']['name']}\n简介: {v['info']['desc']}\n版本: {v['info']['version']}\n作者: {v['info']['author']}\n" for k, v in cached_plugins.items()])
if platform == gu.PLATFORM_GOCQ:
p = gu.create_text_image("【已激活插件列表】", plugin_list_info + "\n使用plugin v 插件名 查看插件帮助\n")
return True, [Image.fromFileSystem(p)], "plugin"
return True, "\n=====已激活插件列表=====\n" + plugin_list_info + "\n使用plugin v 插件名 查看插件帮助\n=================", "plugin" return True, "\n=====已激活插件列表=====\n" + plugin_list_info + "\n使用plugin v 插件名 查看插件帮助\n=================", "plugin"
except BaseException as e: except BaseException as e:
return False, f"获取插件列表失败,原因: {str(e)}", "plugin" return False, f"获取插件列表失败,原因: {str(e)}", "plugin"
@@ -167,6 +209,9 @@ class Command:
try: try:
if l[2] in cached_plugins: if l[2] in cached_plugins:
info = cached_plugins[l[2]]["info"] info = cached_plugins[l[2]]["info"]
if platform == gu.PLATFORM_GOCQ:
p = gu.create_text_image(f"【插件信息】", f"名称: {info['name']}\n{info['desc']}\n版本: {info['version']}\n作者: {info['author']}\n\n帮助:\n{info['help']}")
return True, [Image.fromFileSystem(p)], "plugin"
res = f"\n=====插件信息=====\n名称: {info['name']}\n{info['desc']}\n版本: {info['version']}作者: {info['author']}\n\n帮助:\n{info['help']}" res = f"\n=====插件信息=====\n名称: {info['name']}\n{info['desc']}\n版本: {info['version']}作者: {info['author']}\n\n帮助:\n{info['help']}"
return True, res, "plugin" return True, res, "plugin"
else: else:
@@ -193,10 +238,24 @@ class Command:
return True, "cached_plugins: \n" + str(cached_plugins), "plugin" return True, "cached_plugins: \n" + str(cached_plugins), "plugin"
def remove_dir(self, file_path):
while 1:
if not os.path.exists(file_path):
break
try:
shutil.rmtree(file_path)
except PermissionError as e:
err_file_path = str(e).split("\'", 2)[1]
if os.path.exists(err_file_path):
os.chmod(err_file_path, stat.S_IWUSR)
''' '''
nick: 存储机器人的昵称 nick: 存储机器人的昵称
''' '''
def set_nick(self, message: str, platform: str): def set_nick(self, message: str, platform: str, role: str = "member"):
if role != "admin":
return True, "你无权使用该指令 :P", "nick"
if platform == PLATFORM_GOCQ: if platform == PLATFORM_GOCQ:
l = message.split(" ") l = message.split(" ")
if len(l) == 1: if len(l) == 1:
@@ -232,25 +291,38 @@ class Command:
"update r": "重启机器人", "update r": "重启机器人",
"reset": "重置会话", "reset": "重置会话",
"nick": "设置机器人昵称", "nick": "设置机器人昵称",
"plugin": "插件安装、卸载和重载",
"/bing": "切换到bing模型", "/bing": "切换到bing模型",
"/gpt": "切换到OpenAI ChatGPT API", "/gpt": "切换到OpenAI ChatGPT API",
"/revgpt": "切换到网页版ChatGPT", "/revgpt": "切换到网页版ChatGPT",
"/bing 问题": "临时使用一次bing模型进行会话",
"/gpt 问题": "临时使用一次OpenAI ChatGPT API进行会话",
"/revgpt 问题": "临时使用一次网页版ChatGPT进行会话",
"plugin": "插件安装、卸载和重载"
} }
def help_messager(self, commands: dict): def help_messager(self, commands: dict, platform: str, cached_plugins: dict = None):
try: try:
resp = requests.get("https://soulter.top/channelbot/notice.json").text resp = requests.get("https://soulter.top/channelbot/notice.json").text
notice = json.loads(resp)["notice"] notice = json.loads(resp)["notice"]
except BaseException as e: except BaseException as e:
notice = "" notice = ""
msg = "Github项目名QQChannelChatGPT, 有问题提交issue, 欢迎Star\n指令列表\n" msg = "# Help Center\n## 指令列表\n"
# msg = "Github项目名QQChannelChatGPT, 有问题提交issue, 欢迎Star\n【指令列表】\n"
for key, value in commands.items(): for key, value in commands.items():
msg += key + ": " + value + "\n" msg += f"`{key}` - {value}\n"
# plugins
if cached_plugins != None:
plugin_list_info = "\n".join([f"`{k}` {v['info']['name']}\n{v['info']['desc']}\n" for k, v in cached_plugins.items()])
if plugin_list_info.strip() != "":
msg += "\n## 插件列表\n> 使用plugin v 插件名 查看插件帮助\n"
msg += plugin_list_info
msg += notice msg += notice
if platform == gu.PLATFORM_GOCQ:
try:
# p = gu.create_text_image("【Help Center】", msg)
p = gu.create_markdown_image(msg)
return [Image.fromFileSystem(p)]
except BaseException as e:
gu.log(str(e))
return msg
return msg return msg
# 接受可变参数 # 接受可变参数
@@ -272,7 +344,6 @@ class Command:
del_mode = False del_mode = False
if l[1] == "d": if l[1] == "d":
print("删除关键词: "+l[2])
del_mode = True del_mode = True
try: try:
@@ -291,7 +362,6 @@ class Command:
return False, "该关键词不存在", "keyword" return False, "该关键词不存在", "keyword"
keyword = {l[1]: l[2]} keyword = {l[1]: l[2]}
with open("keyword.json", "w", encoding="utf-8") as f: with open("keyword.json", "w", encoding="utf-8") as f:
print("设置指令: "+l[1]+" -> "+l[2])
json.dump(keyword, f, ensure_ascii=False, indent=4) json.dump(keyword, f, ensure_ascii=False, indent=4)
f.flush() f.flush()
if del_mode: if del_mode:
@@ -338,6 +408,10 @@ class Command:
pash_tag = "QQChannelChatGPT"+os.sep pash_tag = "QQChannelChatGPT"+os.sep
repo.remotes.origin.pull() repo.remotes.origin.pull()
if len(l) == 3 and l[2] == "r":
py = sys.executable
os.execl(py, py, *sys.argv)
return True, "更新成功~是否重启输入update r重启重启指令不返回任何确认信息", "update" return True, "更新成功~是否重启输入update r重启重启指令不返回任何确认信息", "update"
except BaseException as e: except BaseException as e:

View File

@@ -1,6 +1,9 @@
from model.command.command import Command from model.command.command import Command
from model.provider.provider_openai_official import ProviderOpenAIOfficial from model.provider.provider_openai_official import ProviderOpenAIOfficial
from cores.qqbot.personality import personalities from cores.qqbot.personality import personalities
from model.platform.qq import QQ
from util import general_utils as gu
class CommandOpenAIOfficial(Command): class CommandOpenAIOfficial(Command):
def __init__(self, provider: ProviderOpenAIOfficial): def __init__(self, provider: ProviderOpenAIOfficial):
@@ -14,8 +17,10 @@ class CommandOpenAIOfficial(Command):
role: str, role: str,
platform: str, platform: str,
message_obj, message_obj,
cached_plugins: dict): cached_plugins: dict,
hit, res = super().check_command(message, role, platform, message_obj=message_obj, cached_plugins=cached_plugins) qq_platform: QQ):
self.platform = platform
hit, res = super().check_command(message, role, platform, message_obj=message_obj, cached_plugins=cached_plugins, qq_platform=qq_platform)
if hit: if hit:
return True, res return True, res
if self.command_start_with(message, "reset", "重置"): if self.command_start_with(message, "reset", "重置"):
@@ -31,7 +36,7 @@ class CommandOpenAIOfficial(Command):
elif self.command_start_with(message, "count"): elif self.command_start_with(message, "count"):
return True, self.count() return True, self.count()
elif self.command_start_with(message, "help", "帮助"): elif self.command_start_with(message, "help", "帮助"):
return True, self.help() return True, self.help(cached_plugins)
elif self.command_start_with(message, "unset"): elif self.command_start_with(message, "unset"):
return True, self.unset(session_id) return True, self.unset(session_id)
elif self.command_start_with(message, "set"): elif self.command_start_with(message, "set"):
@@ -50,7 +55,7 @@ class CommandOpenAIOfficial(Command):
return False, None return False, None
def help(self): def help(self, cached_plugins):
commands = super().general_commands() commands = super().general_commands()
commands[''] = '画画' commands[''] = '画画'
commands['key'] = '添加OpenAI key' commands['key'] = '添加OpenAI key'
@@ -58,14 +63,18 @@ class CommandOpenAIOfficial(Command):
commands['gpt'] = '查看gpt配置信息' commands['gpt'] = '查看gpt配置信息'
commands['status'] = '查看key使用状态' commands['status'] = '查看key使用状态'
commands['token'] = '查看本轮会话token' commands['token'] = '查看本轮会话token'
return True, super().help_messager(commands), "help" return True, super().help_messager(commands, self.platform, cached_plugins), "help"
def reset(self, session_id: str): def reset(self, session_id: str):
if self.provider is None:
return False, "未启动OpenAI ChatGPT语言模型.", "reset"
self.provider.forget(session_id) self.provider.forget(session_id)
return True, "重置成功", "reset" return True, "重置成功", "reset"
def his(self, message: str, session_id: str, name: str): def his(self, message: str, session_id: str, name: str):
if self.provider is None:
return False, "未启动OpenAI ChatGPT语言模型.", "his"
#分页每页5条 #分页每页5条
msg = '' msg = ''
size_per_page = 3 size_per_page = 3
@@ -82,12 +91,18 @@ class CommandOpenAIOfficial(Command):
return True, f"历史记录如下:\n{p}\n{page}页 | 共{max_page}\n*输入/his 2跳转到第2页", "his" return True, f"历史记录如下:\n{p}\n{page}页 | 共{max_page}\n*输入/his 2跳转到第2页", "his"
def token(self, session_id: str): def token(self, session_id: str):
if self.provider is None:
return False, "未启动OpenAI ChatGPT语言模型.", "token"
return True, f"会话的token数: {self.provider.get_user_usage_tokens(self.provider.session_dict[session_id])}\n系统最大缓存token数: {self.provider.max_tokens}", "token" return True, f"会话的token数: {self.provider.get_user_usage_tokens(self.provider.session_dict[session_id])}\n系统最大缓存token数: {self.provider.max_tokens}", "token"
def gpt(self): def gpt(self):
if self.provider is None:
return False, "未启动OpenAI ChatGPT语言模型.", "gpt"
return True, f"OpenAI GPT配置:\n {self.provider.chatGPT_configs}", "gpt" return True, f"OpenAI GPT配置:\n {self.provider.chatGPT_configs}", "gpt"
def status(self): def status(self):
if self.provider is None:
return False, "未启动OpenAI ChatGPT语言模型.", "status"
chatgpt_cfg_str = "" chatgpt_cfg_str = ""
key_stat = self.provider.get_key_stat() key_stat = self.provider.get_key_stat()
index = 1 index = 1
@@ -108,10 +123,14 @@ class CommandOpenAIOfficial(Command):
return True, f"⭐使用情况({str(gg_count)}个已用):\n{chatgpt_cfg_str}⏰全频道已用{total}tokens", "status" return True, f"⭐使用情况({str(gg_count)}个已用):\n{chatgpt_cfg_str}⏰全频道已用{total}tokens", "status"
def count(self): def count(self):
if self.provider is None:
return False, "未启动OpenAI ChatGPT语言模型.", "reset"
guild_count, guild_msg_count, guild_direct_msg_count, session_count = self.provider.get_stat() guild_count, guild_msg_count, guild_direct_msg_count, session_count = self.provider.get_stat()
return True, f"当前会话数: {len(self.provider.session_dict)}\n共有频道数: {guild_count} \n共有消息数: {guild_msg_count}\n私信数: {guild_direct_msg_count}\n历史会话数: {session_count}", "count" return True, f"当前会话数: {len(self.provider.session_dict)}\n共有频道数: {guild_count} \n共有消息数: {guild_msg_count}\n私信数: {guild_direct_msg_count}\n历史会话数: {session_count}", "count"
def key(self, message: str, user_name: str): def key(self, message: str, user_name: str):
if self.provider is None:
return False, "未启动OpenAI ChatGPT语言模型.", "reset"
l = message.split(" ") l = message.split(" ")
if len(l) == 1: if len(l) == 1:
msg = "感谢您赞助keykey为官方API使用请以以下格式赞助:\n/key xxxxx" msg = "感谢您赞助keykey为官方API使用请以以下格式赞助:\n/key xxxxx"
@@ -124,11 +143,15 @@ class CommandOpenAIOfficial(Command):
return True, "该Key被验证为无效。也许是输入错误了或者重试。", "key" return True, "该Key被验证为无效。也许是输入错误了或者重试。", "key"
def unset(self, session_id: str): def unset(self, session_id: str):
if self.provider is None:
return False, "未启动OpenAI ChatGPT语言模型.", "unset"
self.provider.now_personality = {} self.provider.now_personality = {}
self.provider.forget(session_id) self.provider.forget(session_id)
return True, "已清除人格并重置历史记录。", "unset" return True, "已清除人格并重置历史记录。", "unset"
def set(self, message: str, session_id: str): def set(self, message: str, session_id: str):
if self.provider is None:
return False, "未启动OpenAI ChatGPT语言模型.", "set"
l = message.split(" ") l = message.split(" ")
if len(l) == 1: if len(l) == 1:
return True, f"【由Github项目QQChannelChatGPT支持】\n\n【人格文本由PlexPt开源项目awesome-chatgpt-pr \ return True, f"【由Github项目QQChannelChatGPT支持】\n\n【人格文本由PlexPt开源项目awesome-chatgpt-pr \
@@ -187,6 +210,12 @@ class CommandOpenAIOfficial(Command):
return True, f"自定义人格已设置。 \n人格信息: {ps}", "set" return True, f"自定义人格已设置。 \n人格信息: {ps}", "set"
def draw(self, message): def draw(self, message):
if self.provider is None:
return False, "未启动OpenAI ChatGPT语言模型.", "draw"
if message.startswith("/画"):
message = message[2:]
elif message.startswith(""):
message = message[1:]
try: try:
# 画图模式传回3个参数 # 画图模式传回3个参数
img_url = self.provider.image_chat(message) img_url = self.provider.image_chat(message)

View File

@@ -1,5 +1,6 @@
from model.command.command import Command from model.command.command import Command
from model.provider.provider_rev_chatgpt import ProviderRevChatGPT from model.provider.provider_rev_chatgpt import ProviderRevChatGPT
from model.platform.qq import QQ
class CommandRevChatGPT(Command): class CommandRevChatGPT(Command):
def __init__(self, provider: ProviderRevChatGPT): def __init__(self, provider: ProviderRevChatGPT):
@@ -11,12 +12,14 @@ class CommandRevChatGPT(Command):
role: str, role: str,
platform: str, platform: str,
message_obj, message_obj,
cached_plugins: dict): cached_plugins: dict,
hit, res = super().check_command(message, role, platform, message_obj=message_obj, cached_plugins=cached_plugins) qq_platform: QQ):
self.platform = platform
hit, res = super().check_command(message, role, platform, message_obj=message_obj, cached_plugins=cached_plugins, qq_platform=qq_platform)
if hit: if hit:
return True, res return True, res
if self.command_start_with(message, "help", "帮助"): if self.command_start_with(message, "help", "帮助"):
return True, self.help() return True, self.help(cached_plugins)
elif self.command_start_with(message, "reset"): elif self.command_start_with(message, "reset"):
return True, self.reset() return True, self.reset()
elif self.command_start_with(message, "update"): elif self.command_start_with(message, "update"):
@@ -31,5 +34,6 @@ class CommandRevChatGPT(Command):
def reset(self): def reset(self):
return False, "此功能暂未开放", "reset" return False, "此功能暂未开放", "reset"
def help(self):
return True, super().help_messager(super().general_commands()), "help" def help(self, cached_plugins: dict):
return True, super().help_messager(super().general_commands(), self.platform, cached_plugins), "help"

View File

@@ -1,6 +1,8 @@
from model.command.command import Command from model.command.command import Command
from model.provider.provider_rev_edgegpt import ProviderRevEdgeGPT from model.provider.provider_rev_edgegpt import ProviderRevEdgeGPT
import asyncio import asyncio
from model.platform.qq import QQ
class CommandRevEdgeGPT(Command): class CommandRevEdgeGPT(Command):
def __init__(self, provider: ProviderRevEdgeGPT): def __init__(self, provider: ProviderRevEdgeGPT):
self.provider = provider self.provider = provider
@@ -13,14 +15,16 @@ class CommandRevEdgeGPT(Command):
role: str, role: str,
platform: str, platform: str,
message_obj, message_obj,
cached_plugins: dict): cached_plugins: dict,
hit, res = super().check_command(message, role, platform, message_obj=message_obj, cached_plugins=cached_plugins) qq_platform: QQ):
self.platform = platform
hit, res = super().check_command(message, role, platform, message_obj=message_obj, cached_plugins=cached_plugins, qq_platform=qq_platform)
if hit: if hit:
return True, res return True, res
if self.command_start_with(message, "reset"): if self.command_start_with(message, "reset"):
return True, self.reset(loop) return True, self.reset(loop)
elif self.command_start_with(message, "help"): elif self.command_start_with(message, "help"):
return True, self.help() return True, self.help(cached_plugins)
elif self.command_start_with(message, "update"): elif self.command_start_with(message, "update"):
return True, self.update(message, role) return True, self.update(message, role)
elif self.command_start_with(message, "keyword"): elif self.command_start_with(message, "keyword"):
@@ -31,6 +35,8 @@ class CommandRevEdgeGPT(Command):
return False, None return False, None
def reset(self, loop): def reset(self, loop):
if self.provider is None:
return False, "未启动Bing语言模型.", "reset"
res = asyncio.run_coroutine_threadsafe(self.provider.forget(), loop).result() res = asyncio.run_coroutine_threadsafe(self.provider.forget(), loop).result()
print(res) print(res)
if res: if res:
@@ -38,6 +44,6 @@ class CommandRevEdgeGPT(Command):
else: else:
return res, "重置失败", "reset" return res, "重置失败", "reset"
def help(self): def help(self, cached_plugins: dict):
return True, super().help_messager(super().general_commands()), "help" return True, super().help_messager(super().general_commands(), self.platform, cached_plugins), "help"

View File

@@ -1,44 +1,143 @@
from nakuru.entities.components import Plain, At, Image from nakuru.entities.components import Plain, At, Image, Node
from util import general_utils as gu
from util.cmd_config import CmdConfig
import asyncio
from nakuru import (
CQHTTP,
GuildMessage
)
import time
class FakeSource:
def __init__(self, type, group_id):
self.type = type
self.group_id = group_id
class QQ: class QQ:
def __init__(self, is_start: bool, cc: CmdConfig = None, gocq_loop = None) -> None:
self.is_start = is_start
self.gocq_loop = gocq_loop
self.cc = cc
def run_bot(self, gocq): def run_bot(self, gocq):
self.client = gocq self.client: CQHTTP = gocq
self.client.run() self.client.run()
def get_msg_loop(self):
return self.gocq_loop
async def send_qq_msg(self, async def send_qq_msg(self,
source, source,
res, res,
image_mode: bool = False): image_mode: bool = False):
"""
res可以是一个数组也就是gocq的消息链.
"""
# print(res) if not self.is_start:
print("[System-Info] 回复QQ消息中..."+str(res)) raise Exception("管理员未启动GOCQ平台")
"""
res可以是一个数组, 也就是gocq的消息链。
插件开发者请使用send方法, 可以不用直接调用这个方法。
"""
gu.log("回复GOCQ消息: "+str(res), level=gu.LEVEL_INFO, tag="GOCQ", max_len=300)
if isinstance(source, int):
source = FakeSource("GroupMessage", source)
# str convert to CQ Message Chain
if isinstance(res, str):
res_str = res
res = []
if source.type == "GroupMessage":
res.append(At(qq=source.user_id))
res.append(Plain(text=res_str))
# if image mode, put all Plain texts into a new picture.
if image_mode and isinstance(res, list):
plains = []
news = []
for i in res:
if isinstance(i, Plain):
plains.append(i.text)
else:
news.append(i)
p = gu.create_markdown_image("".join(plains))
news.append(Image.fromFileSystem(p))
res = news
# 回复消息链
if isinstance(res, list) and len(res) > 0: if isinstance(res, list) and len(res) > 0:
await self.client.sendGroupMessage(source.group_id, res) if source.type == "GuildMessage":
return await self.client.sendGuildChannelMessage(source.guild_id, source.channel_id, res)
# 通过消息链处理 return
if not image_mode:
if source.type == "GroupMessage":
await self.client.sendGroupMessage(source.group_id, [
At(qq=source.user_id),
Plain(text=res)
])
elif source.type == "FriendMessage": elif source.type == "FriendMessage":
await self.client.sendFriendMessage(source.user_id, [ await self.client.sendFriendMessage(source.user_id, res)
Plain(text=res) return
]) elif source.type == "GroupMessage":
else: # 过长时forward发送
if source.type == "GroupMessage": plain_text_len = 0
await self.client.sendGroupMessage(source.group_id, [ image_num = 0
At(qq=source.user_id), for i in res:
Plain(text="好的,我根据你的需要为你生成了一张图片😊"), if isinstance(i, Plain):
Image.fromURL(url=res) plain_text_len += len(i.text)
]) elif isinstance(i, Image):
elif source.type == "FriendMessage": image_num += 1
await self.client.sendFriendMessage(source.user_id, [ if plain_text_len > self.cc.get('qq_forward_threshold', 200):
Plain(text="好的,我根据你的需要为你生成了一张图片😊"), # 删除At
Image.fromURL(url=res) for i in res:
]) if isinstance(i, At):
res.remove(i)
node = Node(res)
# node.content = res
node.uin = source.self_id
node.name = f"To {source.sender.nickname}:"
node.time = int(time.time())
print(node)
nodes=[node]
await self.client.sendGroupForwardMessage(source.group_id, nodes)
return
await self.client.sendGroupMessage(source.group_id, res)
return
def send(self,
to,
res,
):
'''
提供给插件的发送QQ消息接口, 不用在外部await。
参数说明第一个参数可以是消息对象也可以是QQ群号。第二个参数是消息内容消息内容可以是消息链列表也可以是纯文字信息
'''
try:
asyncio.run_coroutine_threadsafe(self.send_qq_msg(to, res), self.gocq_loop).result()
except BaseException as e:
raise e
def send_guild(self,
message_obj,
res,
):
'''
提供给插件的发送GOCQ QQ频道消息接口, 不用在外部await。
参数说明:第一个参数必须是消息对象, 第二个参数是消息内容(消息内容可以是消息链列表,也可以是纯文字信息)。
'''
try:
asyncio.run_coroutine_threadsafe(self.send_qq_msg(message_obj, res), self.gocq_loop).result()
except BaseException as e:
raise e
def create_text_image(title: str, text: str, max_width=30, font_size=20):
'''
文本转图片。
title: 标题
text: 文本内容
max_width: 文本宽度最大值默认30
font_size: 字体大小默认20
返回:文件路径
'''
try:
img = gu.word2img(title, text, max_width, font_size)
p = gu.save_temp_img(img)
return p
except Exception as e:
raise e

View File

@@ -6,7 +6,7 @@ import re
import asyncio import asyncio
import requests import requests
from cores.qqbot.personality import personalities from cores.qqbot.personality import personalities
from util import general_utils as gu
class QQChan(): class QQChan():
@@ -16,7 +16,8 @@ class QQChan():
self.client.run(appid=appid, token=token) self.client.run(appid=appid, token=token)
def send_qq_msg(self, message, res, image_mode=False, msg_ref = None): def send_qq_msg(self, message, res, image_mode=False, msg_ref = None):
print("[System-Info] 回复QQ频道消息中..."+str(res)) gu.log("回复QQ频道消息: "+str(res), level=gu.LEVEL_INFO, tag="QQ频道", max_len=30)
if not image_mode: if not image_mode:
try: try:
if msg_ref is not None: if msg_ref is not None:

View File

@@ -6,6 +6,7 @@ import sys
from cores.database.conn import dbConn from cores.database.conn import dbConn
from model.provider.provider import Provider from model.provider.provider import Provider
import threading import threading
from util import general_utils as gu
abs_path = os.path.dirname(os.path.realpath(sys.argv[0])) + '/' abs_path = os.path.dirname(os.path.realpath(sys.argv[0])) + '/'
key_record_path = abs_path+'chatgpt_key_record' key_record_path = abs_path+'chatgpt_key_record'
@@ -16,7 +17,7 @@ class ProviderOpenAIOfficial(Provider):
if 'api_base' in cfg and cfg['api_base'] != 'none' and cfg['api_base'] != '': if 'api_base' in cfg and cfg['api_base'] != 'none' and cfg['api_base'] != '':
openai.api_base = cfg['api_base'] openai.api_base = cfg['api_base']
if cfg['key'] != '' and cfg['key'] != None: if cfg['key'] != '' and cfg['key'] != None:
print("[System] 读取ChatGPT Key成功") gu.log("读取ChatGPT Key成功")
self.key_list = cfg['key'] self.key_list = cfg['key']
else: else:
input("[System] 请先去完善ChatGPT的Key。详情请前往https://beta.openai.com/account/api-keys") input("[System] 请先去完善ChatGPT的Key。详情请前往https://beta.openai.com/account/api-keys")
@@ -25,7 +26,7 @@ class ProviderOpenAIOfficial(Provider):
self.init_key_record() self.init_key_record()
self.chatGPT_configs = cfg['chatGPTConfigs'] self.chatGPT_configs = cfg['chatGPTConfigs']
print(f'[System] 加载ChatGPTConfigs: {self.chatGPT_configs}') gu.log(f'加载ChatGPTConfigs: {self.chatGPT_configs}')
self.openai_configs = cfg self.openai_configs = cfg
# 会话缓存 # 会话缓存
self.session_dict = {} self.session_dict = {}
@@ -39,9 +40,10 @@ class ProviderOpenAIOfficial(Provider):
db1 = dbConn() db1 = dbConn()
for session in db1.get_all_session(): for session in db1.get_all_session():
self.session_dict[session[0]] = json.loads(session[1])['data'] self.session_dict[session[0]] = json.loads(session[1])['data']
print("[System] 历史记录读取成功喵") gu.log("历史记录读取成功喵")
except BaseException as e: except BaseException as e:
print("[System] 历史记录读取失败: " + str(e)) gu.log("历史记录读取失败", level=gu.LEVEL_ERROR)
# 读取统计信息 # 读取统计信息
if not os.path.exists(abs_path+"configs/stat"): if not os.path.exists(abs_path+"configs/stat"):
@@ -110,6 +112,7 @@ class ProviderOpenAIOfficial(Provider):
cache_data_list, new_record, req = self.wrap(prompt, session_id) cache_data_list, new_record, req = self.wrap(prompt, session_id)
retry = 0 retry = 0
response = None response = None
err = ''
while retry < 5: while retry < 5:
try: try:
response = openai.ChatCompletion.create( response = openai.ChatCompletion.create(
@@ -118,9 +121,8 @@ class ProviderOpenAIOfficial(Provider):
) )
break break
except Exception as e: except Exception as e:
print(e)
if 'You exceeded' in str(e) or 'Billing hard limit has been reached' in str(e) or 'No API key provided' in str(e) or 'Incorrect API key provided' in str(e): if 'You exceeded' in str(e) or 'Billing hard limit has been reached' in str(e) or 'No API key provided' in str(e) or 'Incorrect API key provided' in str(e):
print("[System] 当前Key已超额或者不正常,正在切换") gu.log("当前Key已超额或常, 正在切换", level=gu.LEVEL_WARNING)
self.key_stat[openai.api_key]['exceed'] = True self.key_stat[openai.api_key]['exceed'] = True
self.save_key_record() self.save_key_record()
@@ -130,17 +132,21 @@ class ProviderOpenAIOfficial(Provider):
raise e raise e
else: else:
break break
if 'maximum context length' in str(e): elif 'maximum context length' in str(e):
print("token超限, 清空对应缓存") gu.log("token超限, 清空对应缓存")
self.session_dict[session_id] = [] self.session_dict[session_id] = []
cache_data_list, new_record, req = self.wrap(prompt, session_id) cache_data_list, new_record, req = self.wrap(prompt, session_id)
else:
gu.log(str(e), level=gu.LEVEL_ERROR)
err = str(e)
retry+=1 retry+=1
if retry >= 5: if retry >= 5:
raise BaseException("连接超时") gu.log(r"如果报错, 且您的机器在中国大陆内, 请确保您的电脑已经设置好代理软件(梯子), 并在配置文件设置了系统代理地址。详见https://github.com/Soulter/QQChannelChatGPT/wiki/%E4%BA%8C%E3%80%81%E9%A1%B9%E7%9B%AE%E9%85%8D%E7%BD%AE%E6%96%87%E4%BB%B6%E9%85%8D%E7%BD%AE", max_len=999)
raise BaseException("连接出错: "+str(err))
self.key_stat[openai.api_key]['used'] += response['usage']['total_tokens'] self.key_stat[openai.api_key]['used'] += response['usage']['total_tokens']
self.save_key_record() self.save_key_record()
print("[ChatGPT] "+str(response["choices"][0]["message"]["content"])) # print("[ChatGPT] "+str(response["choices"][0]["message"]["content"]))
chatgpt_res = str(response["choices"][0]["message"]["content"]).strip() chatgpt_res = str(response["choices"][0]["message"]["content"]).strip()
current_usage_tokens = response['usage']['total_tokens'] current_usage_tokens = response['usage']['total_tokens']
@@ -192,13 +198,12 @@ class ProviderOpenAIOfficial(Provider):
image_url = [] image_url = []
for i in range(img_num): for i in range(img_num):
image_url.append(response['data'][i]['url']) image_url.append(response['data'][i]['url'])
print(image_url)
break break
except Exception as e: except Exception as e:
print(e) gu.log(str(e), level=gu.LEVEL_ERROR)
if 'You exceeded' in str(e) or 'Billing hard limit has been reached' in str( if 'You exceeded' in str(e) or 'Billing hard limit has been reached' in str(
e) or 'No API key provided' in str(e) or 'Incorrect API key provided' in str(e): e) or 'No API key provided' in str(e) or 'Incorrect API key provided' in str(e):
print("[System] 当前Key已超额或者不正常,正在切换") gu.log("当前Key已超额或者不正常, 正在切换", level=gu.LEVEL_WARNING)
self.key_stat[openai.api_key]['exceed'] = True self.key_stat[openai.api_key]['exceed'] = True
self.save_key_record() self.save_key_record()
@@ -305,7 +310,7 @@ class ProviderOpenAIOfficial(Provider):
if not self.key_stat[key]['exceed']: if not self.key_stat[key]['exceed']:
is_all_exceed = False is_all_exceed = False
openai.api_key = key openai.api_key = key
print(f"[System] 切换到Key: {key}, 已使用token: {self.key_stat[key]['used']}") gu.log(f"切换到Key: {key}, 已使用token: {self.key_stat[key]['used']}", level=gu.LEVEL_INFO)
if len(req) > 0: if len(req) > 0:
try: try:
response = openai.ChatCompletion.create( response = openai.ChatCompletion.create(
@@ -314,20 +319,21 @@ class ProviderOpenAIOfficial(Provider):
) )
return response, True return response, True
except Exception as e: except Exception as e:
print(e)
if 'You exceeded' in str(e): if 'You exceeded' in str(e):
print("[System] 当前Key已超额,正在切换") gu.log("当前Key已超额, 正在切换")
self.key_stat[openai.api_key]['exceed'] = True self.key_stat[openai.api_key]['exceed'] = True
self.save_key_record() self.save_key_record()
time.sleep(1) time.sleep(1)
continue continue
else:
gu.log(str(e), level=gu.LEVEL_ERROR)
else: else:
return True return True
if is_all_exceed: if is_all_exceed:
print("[System] 所有Key已超额") gu.log("所有Key已超额", level=gu.LEVEL_CRITICAL)
return None, False return None, False
else: else:
print("[System] 在切换key时程序异常。") gu.log("在切换key时程序异常。", level=gu.LEVEL_ERROR)
return None, False return None, False
def getConfigs(self): def getConfigs(self):
@@ -375,7 +381,7 @@ class ProviderOpenAIOfficial(Provider):
try: try:
self.key_stat = json.load(keyfile) self.key_stat = json.load(keyfile)
except Exception as e: except Exception as e:
print(e) gu.log(str(e), level=gu.LEVEL_ERROR)
self.key_stat = {} self.key_stat = {}
finally: finally:
for key in self.key_list: for key in self.key_list:

View File

@@ -1,13 +1,16 @@
from revChatGPT.V1 import Chatbot from revChatGPT.V1 import Chatbot
from revChatGPT import typings from revChatGPT import typings
from model.provider.provider import Provider from model.provider.provider import Provider
from util import general_utils as gu
class ProviderRevChatGPT(Provider): class ProviderRevChatGPT(Provider):
def __init__(self, config): def __init__(self, config):
self.rev_chatgpt = [] self.rev_chatgpt = []
for i in range(0, len(config['account'])): for i in range(0, len(config['account'])):
try: try:
print(f"[System] 创建rev_ChatGPT负载{str(i)}中...") gu.log(f"创建rev_ChatGPT负载{str(i)}中...", level=gu.LEVEL_INFO, tag="RevChatGPT")
if 'password' in config['account'][i]: if 'password' in config['account'][i]:
config['account'][i]['password'] = str(config['account'][i]['password']) config['account'][i]['password'] = str(config['account'][i]['password'])
revstat = { revstat = {
@@ -16,7 +19,7 @@ class ProviderRevChatGPT(Provider):
} }
self.rev_chatgpt.append(revstat) self.rev_chatgpt.append(revstat)
except BaseException as e: except BaseException as e:
print(f"[System] 创建rev_ChatGPT负载失败: {str(e)}") gu.log(f"创建rev_ChatGPT负载{str(i)}失败: {str(e)}", level=gu.LEVEL_ERROR, tag="RevChatGPT")
def forget(self) -> bool: def forget(self) -> bool:
return False return False
@@ -42,21 +45,22 @@ class ProviderRevChatGPT(Provider):
raise e raise e
err_count += 1 err_count += 1
print(f"[RevChatGPT] 请求出现问题: {str(e)} | 正在重试: {str(err_count)}") gu.log(f"请求出现问题: {str(e)} | 正在重试: {str(err_count)}", level=gu.LEVEL_WARNING, tag="RevChatGPT")
if err_count >= retry_count: if err_count >= retry_count:
raise e raise e
except BaseException as e: except BaseException as e:
err_count += 1 err_count += 1
print(f"[RevChatGPT] 请求出现问题: {str(e)} | 正在重试: {str(err_count)}") gu.log(f"请求出现问题: {str(e)} | 正在重试: {str(err_count)}", level=gu.LEVEL_WARNING, tag="RevChatGPT")
if err_count >= retry_count: if err_count >= retry_count:
raise e raise e
if resp == '':
resp = "RevChatGPT出现故障."
print("[RevChatGPT] "+str(resp)) # print("[RevChatGPT] "+str(resp))
return resp return resp
def text_chat(self, prompt): def text_chat(self, prompt) -> str:
res = '' res = ''
print("[Debug] "+str(self.rev_chatgpt))
err_msg = '' err_msg = ''
cursor = 0 cursor = 0
for revstat in self.rev_chatgpt: for revstat in self.rev_chatgpt:
@@ -70,7 +74,7 @@ class ProviderRevChatGPT(Provider):
# todo: 细化错误管理 # todo: 细化错误管理
except BaseException as e: except BaseException as e:
revstat['busy'] = False revstat['busy'] = False
print(f"请求出现问题: {str(e)}") gu.log(f"请求出现问题: {str(e)}", level=gu.LEVEL_WARNING, tag="RevChatGPT")
err_msg += f"账号{cursor} - 错误原因: {str(e)}" err_msg += f"账号{cursor} - 错误原因: {str(e)}"
continue continue
else: else:
@@ -78,3 +82,9 @@ class ProviderRevChatGPT(Provider):
continue continue
res = f'回复失败。错误跟踪:{err_msg}' res = f'回复失败。错误跟踪:{err_msg}'
return res return res
def is_all_busy(self) -> bool:
for revstat in self.rev_chatgpt:
if not revstat['busy']:
return False
return True

View File

@@ -2,6 +2,9 @@ from model.provider.provider import Provider
from EdgeGPT import Chatbot, ConversationStyle from EdgeGPT import Chatbot, ConversationStyle
import json import json
import os import os
from util import general_utils as gu
from util.cmd_config import CmdConfig as cc
class ProviderRevEdgeGPT(Provider): class ProviderRevEdgeGPT(Provider):
def __init__(self): def __init__(self):
@@ -9,7 +12,10 @@ class ProviderRevEdgeGPT(Provider):
self.wait_stack = [] self.wait_stack = []
with open('./cookies.json', 'r') as f: with open('./cookies.json', 'r') as f:
cookies = json.load(f) cookies = json.load(f)
self.bot = Chatbot(cookies=cookies) proxy = cc.get("bing_proxy", None)
if proxy == "":
proxy = None
self.bot = Chatbot(cookies=cookies, proxy = proxy)
def is_busy(self): def is_busy(self):
return self.busy return self.busy
@@ -73,18 +79,18 @@ class ProviderRevEdgeGPT(Provider):
await self.forget() await self.forget()
err_count += 1 err_count += 1
continue continue
reply_msg += f"\n{throttling['numUserMessagesInConversation']}/{throttling['maxNumUserMessagesInConversation']}" reply_msg += f"\n[{throttling['numUserMessagesInConversation']}/{throttling['maxNumUserMessagesInConversation']}]"
break break
except BaseException as e: except BaseException as e:
# raise e gu.log(str(e), level=gu.LEVEL_WARNING, tag="RevEdgeGPT")
print(e)
err_count += 1 err_count += 1
if err_count >= retry_count: if err_count >= retry_count:
gu.log(r"如果报错, 且您的机器在中国大陆内, 请确保您的电脑已经设置好代理软件(梯子), 并在配置文件设置了系统代理地址。详见https://github.com/Soulter/QQChannelChatGPT/wiki/%E4%BA%8C%E3%80%81%E9%A1%B9%E7%9B%AE%E9%85%8D%E7%BD%AE%E6%96%87%E4%BB%B6%E9%85%8D%E7%BD%AE", max_len=999)
self.busy = False self.busy = False
raise e raise e
print("[RevEdgeGPT] 请求出现了一些问题, 正在重试。次数"+str(err_count)) gu.log("请求出现了一些问题, 正在重试。次数"+str(err_count), level=gu.LEVEL_WARNING, tag="RevEdgeGPT")
self.busy = False self.busy = False
print("[RevEdgeGPT] "+str(reply_msg)) # print("[RevEdgeGPT] "+str(reply_msg))
return reply_msg, 1 return reply_msg, 1

View File

@@ -7,4 +7,4 @@ EdgeGPT~=0.1.22.1
chardet~=5.1.0 chardet~=5.1.0
Pillow~=9.4.0 Pillow~=9.4.0
GitPython~=3.1.31 GitPython~=3.1.31
nakuru-project-idk~=0.0.2 nakuru-project

BIN
resources/fonts/simhei.ttf Normal file

Binary file not shown.

BIN
resources/fonts/syst.otf Normal file

Binary file not shown.

53
util/cmd_config.py Normal file
View File

@@ -0,0 +1,53 @@
import os
import json
cpath = "cmd_config.json"
def check_exist():
if not os.path.exists(cpath):
with open(cpath, "w", encoding="utf-8") as f:
json.dump({}, f, indent=4)
f.flush()
class CmdConfig():
@staticmethod
def get(key, default=None):
check_exist()
with open(cpath, "r", encoding="utf-8") as f:
d = json.load(f)
if key in d:
return d[key]
else:
return default
@staticmethod
def get_all():
check_exist()
with open(cpath, "r", encoding="utf-8") as f:
return json.load(f)
@staticmethod
def put(key, value):
check_exist()
with open(cpath, "r", encoding="utf-8") as f:
d = json.load(f)
d[key] = value
with open(cpath, "w", encoding="utf-8") as f:
json.dump(d, f, indent=4)
f.flush()
@staticmethod
def init_attributes(keys: list, init_val = ""):
check_exist()
with open(cpath, "r", encoding="utf-8") as f:
d = json.load(f)
_tag = False
for k in keys:
if k not in d:
d[k] = init_val
_tag = True
if _tag:
with open(cpath, "w", encoding="utf-8") as f:
json.dump(d, f, indent=4)
f.flush()

493
util/general_utils.py Normal file
View File

@@ -0,0 +1,493 @@
import datetime
import time
import socket
from PIL import Image, ImageDraw, ImageFont
import os
import re
import requests
PLATFORM_GOCQ = 'gocq'
PLATFORM_QQCHAN = 'qqchan'
FG_COLORS = {
"black": "30",
"red": "31",
"green": "32",
"yellow": "33",
"blue": "34",
"purple": "35",
"cyan": "36",
"white": "37",
"default": "39",
}
BG_COLORS = {
"black": "40",
"red": "41",
"green": "42",
"yellow": "43",
"blue": "44",
"purple": "45",
"cyan": "46",
"white": "47",
"default": "49",
}
LEVEL_INFO = "INFO"
LEVEL_WARNING = "WARNING"
LEVEL_ERROR = "ERROR"
LEVEL_CRITICAL = "CRITICAL"
level_colors = {
"INFO": "green",
"WARNING": "yellow",
"ERROR": "red",
"CRITICAL": "purple",
}
def log(
msg: str,
level: str = "INFO",
tag: str = "System",
fg: str = None,
bg: str = None,
max_len: int = 100):
"""
日志记录函数
"""
if len(msg) > max_len:
msg = msg[:max_len] + "..."
now = datetime.datetime.now().strftime("%m-%d %H:%M:%S")
pre = f"[{now}] [{level}] [{tag}]: {msg}"
if level == "INFO":
if fg is None:
fg = FG_COLORS["green"]
if bg is None:
bg = BG_COLORS["default"]
elif level == "WARNING":
if fg is None:
fg = FG_COLORS["yellow"]
if bg is None:
bg = BG_COLORS["default"]
elif level == "ERROR":
if fg is None:
fg = FG_COLORS["red"]
if bg is None:
bg = BG_COLORS["default"]
elif level == "CRITICAL":
if fg is None:
fg = FG_COLORS["purple"]
if bg is None:
bg = BG_COLORS["default"]
print(f"\033[{fg};{bg}m{pre}\033[0m")
def port_checker(port: int, host: str = "localhost"):
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.settimeout(1)
try:
sk.connect((host, port))
sk.close()
return True
except Exception:
sk.close()
return False
def word2img(title: str, text: str, max_width=30, font_size=20):
if os.path.exists("resources/fonts/syst.otf"):
font_path = "resources/fonts/syst.otf"
elif os.path.exists("QQChannelChatGPT/resources/fonts/syst.otf"):
font_path = "QQChannelChatGPT/resources/fonts/syst.otf"
elif os.path.exists("C:/Windows/Fonts/simhei.ttf"):
font_path = "C:/Windows/Fonts/simhei.ttf"
elif os.path.exists("/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc"):
font_path = "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc"
else:
raise Exception("找不到字体文件")
width_factor = 1.0
height_factor = 1.5
# 格式化文本宽度最大为30
lines = text.split('\n')
i = 0
length = len(lines)
for l in lines:
if len(l) > max_width:
# lines[i] = l[:max_width] + '\n' + l[max_width:]
# for
cp = l
for ii in range(len(l)):
if ii % max_width == 0:
cp = cp[:ii] + '\n' + cp[ii:]
length += 1
lines[i] = cp
i += 1
text = '\n'.join(lines)
width = int(max_width * font_size * width_factor)
height = int(length * font_size * height_factor)
image = Image.new('RGB', (width, height), (255, 255, 255))
draw = ImageDraw.Draw(image)
text_font = ImageFont.truetype(font_path, font_size)
title_font = ImageFont.truetype(font_path, font_size + 5)
# 标题居中
title_width, title_height = title_font.getsize(title)
draw.text(((width - title_width) / 2, 10), title, fill=(0, 0, 0), font=title_font)
# 文本不居中
draw.text((10, title_height+20), text, fill=(0, 0, 0), font=text_font)
return image
def render_markdown(markdown_text, image_width=800, image_height=600, font_size=26, font_color=(0, 0, 0), bg_color=(255, 255, 255)):
HEADER_MARGIN = 20
HEADER_FONT_STANDARD_SIZE = 42
QUOTE_LEFT_LINE_MARGIN = 10
QUOTE_FONT_LINE_MARGIN = 6 # 引用文字距离左边线的距离和上下的距离
QUOTE_LEFT_LINE_HEIGHT = font_size + QUOTE_FONT_LINE_MARGIN * 2
QUOTE_LEFT_LINE_WIDTH = 5
QUOTE_LEFT_LINE_COLOR = (180, 180, 180)
QUOTE_FONT_SIZE = font_size
QUOTE_FONT_COLOR = (180, 180, 180)
# QUOTE_BG_COLOR = (255, 255, 255)
CODE_BLOCK_MARGIN = 10
CODE_BLOCK_FONT_SIZE = font_size
CODE_BLOCK_FONT_COLOR = (255, 255, 255)
CODE_BLOCK_BG_COLOR = (240, 240, 240)
CODE_BLOCK_CODES_MARGIN_VERTICAL = 5 # 代码块和代码之间的距离
CODE_BLOCK_CODES_MARGIN_HORIZONTAL = 5 # 代码块和代码之间的距离
CODE_BLOCK_TEXT_MARGIN = 4 # 代码和代码之间的距离
INLINE_CODE_MARGIN = 8
INLINE_CODE_FONT_SIZE = font_size
INLINE_CODE_FONT_COLOR = font_color
INLINE_CODE_FONT_MARGIN = 4
INLINE_CODE_BG_COLOR = (230, 230, 230)
INLINE_CODE_BG_HEIGHT = INLINE_CODE_FONT_SIZE + INLINE_CODE_FONT_MARGIN * 2
LIST_MARGIN = 8
LIST_FONT_SIZE = font_size
LIST_FONT_COLOR = font_color
TEXT_LINE_MARGIN = 8
IMAGE_MARGIN = 15
# 用于匹配图片的正则表达式
IMAGE_REGEX = r"!\s*\[.*?\]\s*\((.*?)\)"
if os.path.exists("resources/fonts/syst.otf"):
font_path = "resources/fonts/syst.otf"
elif os.path.exists("QQChannelChatGPT/resources/fonts/syst.otf"):
font_path = "QQChannelChatGPT/resources/fonts/syst.otf"
elif os.path.exists("C:/Windows/Fonts/simhei.ttf"):
font_path = "C:/Windows/Fonts/simhei.ttf"
elif os.path.exists("/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc"):
font_path = "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc"
else:
raise Exception("找不到字体文件")
# backup
if os.path.exists("resources/fonts/simhei.ttf"):
font_path1 = "resources/fonts/simhei.ttf"
elif os.path.exists("QQChannelChatGPT/resources/fonts/simhei.ttf"):
font_path1 = "QQChannelChatGPT/resources/fonts/simhei.ttf"
else:
font_path1 = font_path
# 加载字体
font = ImageFont.truetype(font_path, font_size)
images: Image = {}
# pre_process, get height of each line
pre_lines = markdown_text.split('\n')
height = 0
pre_in_code = False
i = -1
_pre_lines = []
for line in pre_lines:
i += 1
# 处理图片
if re.search(IMAGE_REGEX, line):
try:
image_url = re.findall(IMAGE_REGEX, line)[0]
print(image_url)
image_res = Image.open(requests.get(image_url, stream=True, timeout=5).raw)
images[i] = image_res
# 最大不得超过image_width的50%
img_height = image_res.size[1]
if image_res.size[0] > image_width*0.5:
image_res = image_res.resize((int(image_width*0.5), int(image_res.size[1]*image_width*0.5/image_res.size[0])))
img_height = image_res.size[1]
height += img_height + IMAGE_MARGIN*2
line = re.sub(IMAGE_REGEX, "", line)
except Exception as e:
print(e)
line = re.sub(IMAGE_REGEX, "\n[加载失败的图片]\n", line)
continue
line.replace("\t", " ")
if font.getsize(line)[0] > image_width:
cp = line
_width = 0
_word_cnt = 0
for ii in range(len(line)):
# 检测是否是中文
_width += font.getsize(line[ii])[0]
_word_cnt+=1
if _width > image_width:
_pre_lines.append(cp[:_word_cnt])
cp = cp[_word_cnt:]
_word_cnt=0
_width=0
_pre_lines.append(cp)
else:
_pre_lines.append(line)
pre_lines = _pre_lines
i=-1
for line in pre_lines:
if line == "":
height += TEXT_LINE_MARGIN
continue
i += 1
line = line.strip()
if pre_in_code and not line.startswith("```"):
height += font_size + CODE_BLOCK_TEXT_MARGIN
# pre_codes.append(line)
continue
if line.startswith("#"):
header_level = line.count("#")
height += HEADER_FONT_STANDARD_SIZE + HEADER_MARGIN*2 - header_level * 4
elif line.startswith("-"):
height += font_size+LIST_MARGIN*2
elif line.startswith(">"):
height += font_size+QUOTE_LEFT_LINE_MARGIN*2
elif line.startswith("```"):
if pre_in_code:
pre_in_code = False
# pre_codes = []
height += CODE_BLOCK_MARGIN
else:
pre_in_code = True
height += CODE_BLOCK_MARGIN
elif re.search(r"`(.*?)`", line):
height += font_size+INLINE_CODE_FONT_MARGIN*2+INLINE_CODE_MARGIN*2
else:
height += font_size + TEXT_LINE_MARGIN*2
markdown_text = '\n'.join(pre_lines)
print("Pre process done, height: ", height)
image_height = height
if image_height < 100:
image_height = 100
image_width += 20
# 创建空白图像
image = Image.new('RGB', (image_width, image_height), bg_color)
draw = ImageDraw.Draw(image)
# # get all the emojis unicode in the markdown text
# unicode_text = markdown_text.encode('unicode_escape').decode()
# # print(unicode_text)
# unicode_emojis = re.findall(r'\\U\w{8}', unicode_text)
# emoji_base_url = "https://abs.twimg.com/emoji/v1/72x72/{unicode_emoji}.png"
# 设置初始位置
x, y = 10, 10
# 解析Markdown文本
lines = markdown_text.split("\n")
# lines = pre_lines
in_code_block = False
code_block_start_y = 0
code_block_codes = []
index = -1
for line in lines:
index += 1
if in_code_block and not line.startswith("```"):
code_block_codes.append(line)
y += font_size + CODE_BLOCK_TEXT_MARGIN
continue
line = line.strip()
if line.startswith("#"):
# unicode_emojis = re.findall(r'\\U0001\w{4}', line)
# for unicode_emoji in unicode_emojis:
# line = line.replace(unicode_emoji, "")
# unicode_emoji = ""
# if len(unicode_emojis) > 0:
# unicode_emoji = unicode_emojis[0]
# 处理标题
header_level = line.count("#")
line = line.strip("#").strip()
font_size_header = HEADER_FONT_STANDARD_SIZE - header_level * 4
# if unicode_emoji != "":
# emoji_url = emoji_base_url.format(unicode_emoji=unicode_emoji[-5:])
# emoji = Image.open(requests.get(emoji_url, stream=True).raw)
# emoji = emoji.resize((font_size, font_size))
# image.paste(emoji, (x, y))
# x += font_size
font = ImageFont.truetype(font_path, font_size_header)
y += HEADER_MARGIN # 上边距
# 字间距
draw.text((x, y), line, font=font, fill=font_color)
draw.line((x, y + font_size_header + 8, image_width - 10, y + font_size_header + 8), fill=(230, 230, 230), width=3)
y += font_size_header + HEADER_MARGIN
elif line.startswith(">"):
# 处理引用
quote_text = line.strip(">")
# quote_width = image_width - 20 # 引用框的宽度为图像宽度减去左右边距
# quote_height = font_size + 10 # 引用框的高度为字体大小加上上下边距
# quote_box = (x, y, x + quote_width, y + quote_height)
# draw.rounded_rectangle(quote_box, radius=5, fill=(230, 230, 230), width=2) # 使用灰色填充矩形框作为引用背景
y+=QUOTE_LEFT_LINE_MARGIN
draw.line((x, y, x, y + QUOTE_LEFT_LINE_HEIGHT), fill=QUOTE_LEFT_LINE_COLOR, width=QUOTE_LEFT_LINE_WIDTH)
font = ImageFont.truetype(font_path, QUOTE_FONT_SIZE)
draw.text((x + QUOTE_FONT_LINE_MARGIN, y + QUOTE_FONT_LINE_MARGIN), quote_text, font=font, fill=QUOTE_FONT_COLOR)
y += font_size + QUOTE_LEFT_LINE_HEIGHT + QUOTE_LEFT_LINE_MARGIN
elif line.startswith("-"):
# 处理列表
list_text = line.strip("-").strip()
font = ImageFont.truetype(font_path, LIST_FONT_SIZE)
y += LIST_MARGIN
draw.text((x, y), " · " + list_text, font=font, fill=LIST_FONT_COLOR)
y += font_size + LIST_MARGIN
elif line.startswith("```"):
if not in_code_block:
code_block_start_y = y+CODE_BLOCK_MARGIN
in_code_block = True
else:
# print(code_block_codes)
in_code_block = False
codes = "\n".join(code_block_codes)
code_block_codes = []
draw.rounded_rectangle((x, code_block_start_y, image_width - 10, y+CODE_BLOCK_CODES_MARGIN_VERTICAL + CODE_BLOCK_TEXT_MARGIN), radius=5, fill=CODE_BLOCK_BG_COLOR, width=2)
font = ImageFont.truetype(font_path1, CODE_BLOCK_FONT_SIZE)
draw.text((x + CODE_BLOCK_CODES_MARGIN_HORIZONTAL, code_block_start_y + CODE_BLOCK_CODES_MARGIN_VERTICAL), codes, font=font, fill=font_color)
y += CODE_BLOCK_CODES_MARGIN_VERTICAL + CODE_BLOCK_MARGIN
# y += font_size+10
elif re.search(r"`(.*?)`", line):
y += INLINE_CODE_MARGIN # 上边距
# 处理行内代码
code_regex = r"`(.*?)`"
parts_inline = re.findall(code_regex, line)
# print(parts_inline)
parts = re.split(code_regex, line)
# print(parts)
for part in parts:
# the judge has a tiny bug.
# when line is like "hi`hi`". all the parts will be in parts_inline.
if part in parts_inline:
font = ImageFont.truetype(font_path, INLINE_CODE_FONT_SIZE)
code_text = part.strip("`")
code_width = font.getsize(code_text)[0] + INLINE_CODE_FONT_MARGIN*2
x += INLINE_CODE_MARGIN
code_box = (x, y, x + code_width, y + INLINE_CODE_BG_HEIGHT)
draw.rounded_rectangle(code_box, radius=5, fill=INLINE_CODE_BG_COLOR, width=2) # 使用灰色填充矩形框作为引用背景
draw.text((x+INLINE_CODE_FONT_MARGIN, y), code_text, font=font, fill=font_color)
x += code_width+INLINE_CODE_MARGIN-INLINE_CODE_FONT_MARGIN
else:
font = ImageFont.truetype(font_path, font_size)
draw.text((x, y), part, font=font, fill=font_color)
x += font.getsize(part)[0]
y += font_size + INLINE_CODE_MARGIN
x = 10
else:
# 处理普通文本
if line == "":
y += TEXT_LINE_MARGIN
else:
font = ImageFont.truetype(font_path, font_size)
draw.text((x, y), line, font=font, fill=font_color)
y += font_size + TEXT_LINE_MARGIN*2
# 图片特殊处理
if index in images:
image_res = images[index]
# 最大不得超过image_width的50%
if image_res.size[0] > image_width*0.5:
image_res = image_res.resize((int(image_width*0.5), int(image_res.size[1]*image_width*0.5/image_res.size[0])))
image.paste(image_res, (IMAGE_MARGIN, y))
y += image_res.size[1] + IMAGE_MARGIN*2
return image
def save_temp_img(img: Image) -> str:
if not os.path.exists("temp"):
os.makedirs("temp")
# 获得文件创建时间清除超过1小时的
try:
for f in os.listdir("temp"):
path = os.path.join("temp", f)
if os.path.isfile(path):
ctime = os.path.getctime(path)
if time.time() - ctime > 3600:
os.remove(path)
except Exception as e:
log(f"清除临时文件失败: {e}", level=LEVEL_WARNING, tag="GeneralUtils")
# 获得时间戳
timestamp = int(time.time())
p = f"temp/{timestamp}.png"
img.save(p)
return p
def create_text_image(title: str, text: str, max_width=30, font_size=20):
'''
文本转图片。
title: 标题
text: 文本内容
max_width: 文本宽度最大值默认30
font_size: 字体大小默认20
返回:文件路径
'''
try:
img = word2img(title, text, max_width, font_size)
p = save_temp_img(img)
return p
except Exception as e:
raise e
def create_markdown_image(text: str):
'''
markdown文本转图片。
返回:文件路径
'''
try:
img = render_markdown(text)
p = save_temp_img(img)
return p
except Exception as e:
raise e
def test_markdown():
# 示例使用
markdown_text = """# Help Center
! [] (https://soulter.top/helpme.jpg)
"""
image = render_markdown(markdown_text)
image.show()
test_markdown()

View File

@@ -1,11 +0,0 @@
import logging
from logging.handlers import RotatingFileHandler
import colorlog
logger = logging.getLogger("QQChannelChatGPT")
logger.setLevel(logging.DEBUG)
handler = colorlog.StreamHandler()
fmt = "%(log_color)s[%(name)s] %(message)s"
handler.setFormatter(colorlog.ColoredFormatter(
fmt))
logger.addHandler(handler)

View File

@@ -7,7 +7,7 @@ def get_classes(p_name, arg):
clsmembers = inspect.getmembers(arg, inspect.isclass) clsmembers = inspect.getmembers(arg, inspect.isclass)
for (name, _) in clsmembers: for (name, _) in clsmembers:
# print(name, p_name) # print(name, p_name)
if p_name.lower() == name.lower().replace("plugin", ""): if p_name.lower() == name.lower()[:-6]:
classes.append(name) classes.append(name)
break break
return classes return classes