Files
AstrBot/util/gplugin.py
Soulter 08e7d4d0c6 fix: 修复一部分超限的报错
perf: web search稳定性和精确度优化
2023-09-27 22:06:08 +08:00

193 lines
7.3 KiB
Python

import requests
import util.general_utils as gu
from bs4 import BeautifulSoup
import time
from util.func_call import (
FuncCall,
FuncCallJsonFormatError,
FuncNotFoundError
)
def tidy_text(text: str) -> str:
return text.strip().replace("\n", "").replace(" ", "").replace("\r", "")
def special_fetch_zhihu(link: str) -> str:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) \
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
response = requests.get(link, headers=headers)
response.encoding = "utf-8"
soup = BeautifulSoup(response.text, "html.parser")
if "zhuanlan.zhihu.com" in link:
r = soup.find(class_="Post-RichTextContainer")
else:
r = soup.find(class_="List-item").find(class_="RichContent-inner")
if r is None:
print("debug: zhihu none")
raise Exception("zhihu none")
return tidy_text(r.text)
def web_keyword_search_via_bing(keyword) -> str:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) \
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
url = "https://www.bing.com/search?q="+keyword
_cnt = 0
_detail_store = []
while _cnt < 5:
try:
response = requests.get(url, headers=headers)
response.encoding = "utf-8"
soup = BeautifulSoup(response.text, "html.parser")
res = []
ols = soup.find(id="b_results")
for i in ols.find_all("li", class_="b_algo"):
try:
title = i.find("h2").text
desc = i.find("p").text
link = i.find("h2").find("a").get("href")
res.append({
"title": title,
"desc": desc,
"link": link,
})
if len(res) >= 5: # 限制5条
break
if len(_detail_store) >= 3:
continue
# 爬取前两条的网页内容
if "zhihu.com" in link:
try:
_detail_store.append(special_fetch_zhihu(link)[100:800])
except BaseException as e:
print(f"zhihu parse err: {str(e)}")
else:
try:
_detail_store.append(fetch_website_content(link)[100:1000])
except BaseException as e:
print(f"fetch_website_content err: {str(e)}")
except Exception as e:
print(f"bing parse err: {str(e)}")
if len(res) == 0:
break
if len(_detail_store) > 0:
ret = f"{str(res)} \n具体网页内容: {str(_detail_store)}"
else:
ret = f"{str(res)}"
return str(ret)
except Exception as e:
print(f"bing fetch err: {str(e)}")
_cnt += 1
time.sleep(1)
print("fail to fetch bing info, using sougou.")
return web_keyword_search_via_sougou(keyword)
def web_keyword_search_via_sougou(keyword) -> str:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) \
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
}
url = f"https://sogou.com/web?query={keyword}"
response = requests.get(url, headers=headers)
response.encoding = "utf-8"
soup = BeautifulSoup(response.text, "html.parser")
res = []
results = soup.find("div", class_="results")
for i in results.find_all("div", class_="vrwrap"):
try:
title = tidy_text(i.find("h3").text)
link = tidy_text(i.find("h3").find("a").get("href"))
if link.startswith("/link?url="):
link = "https://www.sogou.com" + link
res.append({
"title": title,
"link": link,
})
if len(res) >= 5: # 限制5条
break
except Exception as e:
gu.log(f"sougou parse err: {str(e)}", tag="web_keyword_search_via_sougou", level=gu.LEVEL_ERROR)
# 爬取网页内容
_detail_store = []
for i in res:
if _detail_store >= 3:
break
try:
_detail_store.append(fetch_website_content(i["link"])[100:1000])
except BaseException as e:
print(f"fetch_website_content err: {str(e)}")
ret = f"{str(res)}"
if len(_detail_store) > 0:
ret += f"\n网页内容: {str(_detail_store)}"
return ret
def fetch_website_content(url):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) \
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
response = requests.get(url, headers=headers)
response.encoding = "utf-8"
soup = BeautifulSoup(response.text, "html.parser")
res = soup.text
res = res.replace("\n", "").replace(" ", " ").replace("\r", "").replace("\t", "")
with open(f"temp_{time.time()}.html", "w", encoding="utf-8") as f:
f.write(res)
return res
def web_search(question, provider, session_id):
new_func_call = FuncCall(provider)
new_func_call.add_func("web_keyword_search_via_bing", [{
"type": "string",
"name": "keyword",
"brief": "必应搜索的关键词(分词,尽量保留所有信息)"
}],
"在必应搜索引擎上搜索给定的关键词,并且返回第一页的搜索结果列表(标题,简介和链接)",
web_keyword_search_via_bing
)
new_func_call.add_func("fetch_website_content", [{
"type": "string",
"name": "url",
"brief": "网址"
}],
"获取网址的内容",
fetch_website_content
)
func_definition1 = new_func_call.func_dump()
question1 = f"{question} \n(只能调用一个函数。)"
try:
res1, has_func = new_func_call.func_call(question1, func_definition1, is_task=False, is_summary=False)
except BaseException as e:
res = provider.text_chat(question) + "\n(网页搜索失败, 此为默认回复)"
return res
has_func = True
if has_func:
provider.forget(session_id)
question3 = f"""请你回答`{question}`问题。\n以下是相关材料,请直接拿此材料针对问题进行总结回答,再给参考链接, 参考链接首末有空格。不要提到任何函数调用的信息。```\n{res1}\n```\n"""
print(question3)
_c = 0
while _c < 5:
try:
print('text chat')
res3 = provider.text_chat(question3)
break
except Exception as e:
print(e)
_c += 1
if _c == 5:
raise e
if "The message you submitted was too long" in str(e):
res2 = res2[:int(len(res2) / 2)]
time.sleep(3)
question3 = f"""请回答`{question}`问题。\n以下是相关材料,请直接拿此材料针对问题进行回答,再给参考链接, 参考链接首末有空格。```\n{res1}\n{res2}\n```\n"""
return res3
else:
return res1