Compare commits
6 Commits
refactor/w
...
copilot/ad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
18e33d5505 | ||
|
|
a63762b054 | ||
|
|
510fbdbb22 | ||
|
|
d6cf205bc3 | ||
|
|
a0e96ae975 | ||
|
|
f1b26546fd |
@@ -83,6 +83,8 @@ See docs: [Source Code Deployment](https://astrbot.app/deploy/astrbot/cli.html)
|
||||
| Claude API | ✔ | Text Generation | |
|
||||
| Google Gemini API | ✔ | Text Generation | |
|
||||
| Dify | ✔ | LLMOps | |
|
||||
| Coze | ✔ | LLMOps | |
|
||||
| n8n | ✔ | LLMOps/Workflow | Webhook-based workflow automation |
|
||||
| DashScope (Alibaba Cloud) | ✔ | LLMOps | |
|
||||
| Ollama | ✔ | Model Loader | Local deployment for open-source LLMs (DeepSeek, Llama, etc.) |
|
||||
| LM Studio | ✔ | Model Loader | Local deployment for open-source LLMs (DeepSeek, Llama, etc.) |
|
||||
|
||||
@@ -249,6 +249,8 @@ class ProviderManager:
|
||||
from .sources.dify_source import ProviderDify as ProviderDify
|
||||
case "coze":
|
||||
from .sources.coze_source import ProviderCoze as ProviderCoze
|
||||
case "n8n":
|
||||
from .sources.n8n_source import ProviderN8n as ProviderN8n
|
||||
case "dashscope":
|
||||
from .sources.dashscope_source import (
|
||||
ProviderDashscope as ProviderDashscope,
|
||||
|
||||
341
astrbot/core/provider/sources/n8n_source.py
Normal file
341
astrbot/core/provider/sources/n8n_source.py
Normal file
@@ -0,0 +1,341 @@
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
import astrbot.core.message.components as Comp
|
||||
from astrbot.core import logger, sp
|
||||
from astrbot.core.message.message_event_result import MessageChain
|
||||
from astrbot.core.utils.n8n_api_client import N8nAPIClient
|
||||
|
||||
from .. import Provider
|
||||
from ..entities import LLMResponse
|
||||
from ..register import register_provider_adapter
|
||||
|
||||
|
||||
@register_provider_adapter("n8n", "n8n 工作流适配器。")
|
||||
class ProviderN8n(Provider):
|
||||
def __init__(
|
||||
self,
|
||||
provider_config,
|
||||
provider_settings,
|
||||
default_persona=None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
provider_config,
|
||||
provider_settings,
|
||||
default_persona,
|
||||
)
|
||||
self.webhook_url = provider_config.get("n8n_webhook_url", "")
|
||||
if not self.webhook_url:
|
||||
raise Exception("n8n Webhook URL 不能为空。")
|
||||
|
||||
self.auth_header = provider_config.get("n8n_auth_header", "")
|
||||
self.auth_value = provider_config.get("n8n_auth_value", "")
|
||||
self.http_method = provider_config.get("n8n_http_method", "POST").upper()
|
||||
if self.http_method not in ["GET", "POST"]:
|
||||
raise Exception("n8n HTTP 方法必须是 GET 或 POST。")
|
||||
|
||||
self.model_name = "n8n"
|
||||
self.output_key = provider_config.get("n8n_output_key", "output")
|
||||
self.input_key = provider_config.get("n8n_input_key", "input")
|
||||
self.session_id_key = provider_config.get("n8n_session_id_key", "sessionId")
|
||||
self.image_urls_key = provider_config.get("n8n_image_urls_key", "imageUrls")
|
||||
|
||||
self.streaming = provider_config.get("n8n_streaming", False)
|
||||
if isinstance(self.streaming, str):
|
||||
self.streaming = self.streaming.lower() in ["true", "1", "yes"]
|
||||
|
||||
self.variables: dict = provider_config.get("variables", {})
|
||||
self.timeout = provider_config.get("timeout", 120)
|
||||
if isinstance(self.timeout, str):
|
||||
self.timeout = int(self.timeout)
|
||||
|
||||
self.api_client = N8nAPIClient(
|
||||
webhook_url=self.webhook_url,
|
||||
auth_header=self.auth_header if self.auth_header else None,
|
||||
auth_value=self.auth_value if self.auth_value else None,
|
||||
)
|
||||
|
||||
async def text_chat(
|
||||
self,
|
||||
prompt: str,
|
||||
session_id=None,
|
||||
image_urls=None,
|
||||
func_tool=None,
|
||||
contexts=None,
|
||||
system_prompt=None,
|
||||
tool_calls_result=None,
|
||||
model=None,
|
||||
**kwargs,
|
||||
) -> LLMResponse:
|
||||
if image_urls is None:
|
||||
image_urls = []
|
||||
|
||||
session_id = session_id or kwargs.get("user") or "unknown"
|
||||
|
||||
# Build the payload
|
||||
payload = self.variables.copy()
|
||||
|
||||
# Add session variables
|
||||
session_var = await sp.session_get(session_id, "session_variables", default={})
|
||||
payload.update(session_var)
|
||||
|
||||
# Add the main input
|
||||
payload[self.input_key] = prompt
|
||||
|
||||
# Add session ID
|
||||
payload[self.session_id_key] = session_id
|
||||
|
||||
# Add system prompt if provided
|
||||
if system_prompt:
|
||||
payload["system_prompt"] = system_prompt
|
||||
|
||||
# Add image URLs if provided
|
||||
if image_urls:
|
||||
payload[self.image_urls_key] = image_urls
|
||||
|
||||
try:
|
||||
if self.streaming:
|
||||
# Use streaming execution
|
||||
result_text = ""
|
||||
async for chunk in self.api_client.execute_workflow(
|
||||
data=payload,
|
||||
method=self.http_method,
|
||||
streaming=True,
|
||||
timeout=self.timeout,
|
||||
):
|
||||
logger.debug(f"n8n streaming chunk: {chunk}")
|
||||
if isinstance(chunk, dict):
|
||||
# Try to extract text from various possible keys
|
||||
text = (
|
||||
chunk.get("output", "")
|
||||
or chunk.get("text", "")
|
||||
or chunk.get("data", "")
|
||||
)
|
||||
if text:
|
||||
result_text += str(text)
|
||||
else:
|
||||
result_text += str(chunk)
|
||||
|
||||
result = {"output": result_text}
|
||||
else:
|
||||
# Non-streaming execution
|
||||
result = await self.api_client.execute_workflow(
|
||||
data=payload,
|
||||
method=self.http_method,
|
||||
streaming=False,
|
||||
timeout=self.timeout,
|
||||
)
|
||||
logger.debug(f"n8n response: {result}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"n8n 请求失败:{e!s}")
|
||||
return LLMResponse(role="err", completion_text=f"n8n 请求失败:{e!s}")
|
||||
|
||||
if not result:
|
||||
logger.warning("n8n 请求结果为空,请查看 Debug 日志。")
|
||||
return LLMResponse(role="assistant", completion_text="")
|
||||
|
||||
chain = await self.parse_n8n_result(result)
|
||||
return LLMResponse(role="assistant", result_chain=chain)
|
||||
|
||||
async def text_chat_stream(
|
||||
self,
|
||||
prompt,
|
||||
session_id=None,
|
||||
image_urls=None,
|
||||
func_tool=None,
|
||||
contexts=None,
|
||||
system_prompt=None,
|
||||
tool_calls_result=None,
|
||||
model=None,
|
||||
**kwargs,
|
||||
) -> AsyncGenerator[LLMResponse, None]:
|
||||
if not self.streaming:
|
||||
# Simulate streaming by calling text_chat
|
||||
llm_response = await self.text_chat(
|
||||
prompt=prompt,
|
||||
session_id=session_id,
|
||||
image_urls=image_urls,
|
||||
func_tool=func_tool,
|
||||
contexts=contexts,
|
||||
system_prompt=system_prompt,
|
||||
tool_calls_result=tool_calls_result,
|
||||
model=model,
|
||||
**kwargs,
|
||||
)
|
||||
llm_response.is_chunk = True
|
||||
yield llm_response
|
||||
llm_response.is_chunk = False
|
||||
yield llm_response
|
||||
else:
|
||||
# True streaming
|
||||
if image_urls is None:
|
||||
image_urls = []
|
||||
|
||||
session_id = session_id or kwargs.get("user") or "unknown"
|
||||
|
||||
# Build the payload
|
||||
payload = self.variables.copy()
|
||||
session_var = await sp.session_get(
|
||||
session_id,
|
||||
"session_variables",
|
||||
default={},
|
||||
)
|
||||
payload.update(session_var)
|
||||
payload[self.input_key] = prompt
|
||||
payload[self.session_id_key] = session_id
|
||||
|
||||
if system_prompt:
|
||||
payload["system_prompt"] = system_prompt
|
||||
if image_urls:
|
||||
payload[self.image_urls_key] = image_urls
|
||||
|
||||
try:
|
||||
accumulated_text = ""
|
||||
async for chunk in self.api_client.execute_workflow(
|
||||
data=payload,
|
||||
method=self.http_method,
|
||||
streaming=True,
|
||||
timeout=self.timeout,
|
||||
):
|
||||
logger.debug(f"n8n streaming chunk: {chunk}")
|
||||
if isinstance(chunk, dict):
|
||||
text = (
|
||||
chunk.get("output", "")
|
||||
or chunk.get("text", "")
|
||||
or chunk.get("data", "")
|
||||
)
|
||||
if text:
|
||||
accumulated_text += str(text)
|
||||
yield LLMResponse(
|
||||
role="assistant",
|
||||
completion_text=str(text),
|
||||
is_chunk=True,
|
||||
)
|
||||
else:
|
||||
accumulated_text += str(chunk)
|
||||
yield LLMResponse(
|
||||
role="assistant",
|
||||
completion_text=str(chunk),
|
||||
is_chunk=True,
|
||||
)
|
||||
|
||||
# Send final response
|
||||
if accumulated_text:
|
||||
chain = MessageChain(chain=[Comp.Plain(accumulated_text)])
|
||||
yield LLMResponse(
|
||||
role="assistant",
|
||||
result_chain=chain,
|
||||
is_chunk=False,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"n8n 流式请求失败:{e!s}")
|
||||
yield LLMResponse(
|
||||
role="err",
|
||||
completion_text=f"n8n 流式请求失败:{e!s}",
|
||||
is_chunk=False,
|
||||
)
|
||||
|
||||
async def parse_n8n_result(self, result: dict | str) -> MessageChain:
|
||||
"""Parse n8n workflow result into MessageChain"""
|
||||
if isinstance(result, str):
|
||||
return MessageChain(chain=[Comp.Plain(result)])
|
||||
|
||||
# Extract output from result
|
||||
output = result.get(self.output_key)
|
||||
if output is None:
|
||||
# Try common alternative keys
|
||||
output = (
|
||||
result.get("data")
|
||||
or result.get("result")
|
||||
or result.get("response")
|
||||
or result.get("text")
|
||||
)
|
||||
|
||||
if output is None:
|
||||
# If still no output, use the entire result
|
||||
output = result
|
||||
|
||||
chains = []
|
||||
|
||||
if isinstance(output, str):
|
||||
# Simple text output
|
||||
chains.append(Comp.Plain(output))
|
||||
elif isinstance(output, list):
|
||||
# Handle array output
|
||||
for item in output:
|
||||
if isinstance(item, dict):
|
||||
# Check if it's a file/media object
|
||||
if "type" in item:
|
||||
comp = await self._parse_media_item(item)
|
||||
if comp:
|
||||
chains.append(comp)
|
||||
else:
|
||||
chains.append(Comp.Plain(str(item)))
|
||||
else:
|
||||
chains.append(Comp.Plain(str(item)))
|
||||
else:
|
||||
chains.append(Comp.Plain(str(item)))
|
||||
elif isinstance(output, dict):
|
||||
# Handle object output
|
||||
# Check if it's a media object
|
||||
if "type" in output:
|
||||
comp = await self._parse_media_item(output)
|
||||
if comp:
|
||||
chains.append(comp)
|
||||
else:
|
||||
chains.append(Comp.Plain(str(output)))
|
||||
else:
|
||||
chains.append(Comp.Plain(str(output)))
|
||||
else:
|
||||
chains.append(Comp.Plain(str(output)))
|
||||
|
||||
if not chains:
|
||||
chains.append(Comp.Plain(""))
|
||||
|
||||
return MessageChain(chain=chains)
|
||||
|
||||
async def _parse_media_item(self, item: dict):
|
||||
"""Parse media item from n8n response"""
|
||||
item_type = item.get("type", "").lower()
|
||||
url = item.get("url") or item.get("file_url") or item.get("path")
|
||||
|
||||
if not url:
|
||||
return None
|
||||
|
||||
match item_type:
|
||||
case "image":
|
||||
return Comp.Image(file=url, url=url)
|
||||
case "audio":
|
||||
# Download audio file if needed
|
||||
return Comp.File(name=item.get("name", "audio"), file=url)
|
||||
case "video":
|
||||
return Comp.Video(file=url)
|
||||
case "file":
|
||||
return Comp.File(name=item.get("name", "file"), file=url)
|
||||
case _:
|
||||
return None
|
||||
|
||||
async def forget(self, session_id):
|
||||
"""Clear session context (n8n is stateless, so this is a no-op)"""
|
||||
return True
|
||||
|
||||
async def get_current_key(self):
|
||||
"""Get current API key/auth value"""
|
||||
return self.auth_value or ""
|
||||
|
||||
async def set_key(self, key):
|
||||
"""Set API key/auth value"""
|
||||
raise Exception("n8n 适配器不支持设置 API Key。")
|
||||
|
||||
async def get_models(self):
|
||||
"""Get available models"""
|
||||
return [self.get_model()]
|
||||
|
||||
async def get_human_readable_context(self, session_id, page, page_size):
|
||||
"""Get human-readable context (not supported for n8n)"""
|
||||
raise Exception("暂不支持获得 n8n 的历史消息记录。")
|
||||
|
||||
async def terminate(self):
|
||||
"""Clean up resources"""
|
||||
await self.api_client.close()
|
||||
149
astrbot/core/utils/n8n_api_client.py
Normal file
149
astrbot/core/utils/n8n_api_client.py
Normal file
@@ -0,0 +1,149 @@
|
||||
import codecs
|
||||
import json
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Any
|
||||
|
||||
from aiohttp import ClientSession
|
||||
|
||||
from astrbot.core import logger
|
||||
|
||||
|
||||
async def _stream_sse(resp) -> AsyncGenerator[dict, None]:
|
||||
"""Stream Server-Sent Events (SSE) response"""
|
||||
decoder = codecs.getincrementaldecoder("utf-8")()
|
||||
buffer = ""
|
||||
async for chunk in resp.content.iter_chunked(8192):
|
||||
buffer += decoder.decode(chunk)
|
||||
while "\n\n" in buffer:
|
||||
block, buffer = buffer.split("\n\n", 1)
|
||||
if block.strip().startswith("data:"):
|
||||
try:
|
||||
yield json.loads(block[5:])
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Drop invalid n8n json data: {block[5:]}")
|
||||
continue
|
||||
# flush any remaining text
|
||||
buffer += decoder.decode(b"", final=True)
|
||||
if buffer.strip().startswith("data:"):
|
||||
try:
|
||||
yield json.loads(buffer[5:])
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Drop invalid n8n json data: {buffer[5:]}")
|
||||
|
||||
|
||||
class N8nAPIClient:
|
||||
"""n8n API Client for webhook-based workflow execution"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
webhook_url: str,
|
||||
auth_header: str | None = None,
|
||||
auth_value: str | None = None,
|
||||
):
|
||||
self.webhook_url = webhook_url
|
||||
self.session = None
|
||||
self.headers = {}
|
||||
if auth_header and auth_value:
|
||||
self.headers[auth_header] = auth_value
|
||||
|
||||
def _get_session(self) -> ClientSession:
|
||||
"""Lazily create and return the ClientSession"""
|
||||
if self.session is None:
|
||||
self.session = ClientSession(trust_env=True)
|
||||
return self.session
|
||||
|
||||
async def execute_workflow(
|
||||
self,
|
||||
data: dict[str, Any],
|
||||
method: str = "POST",
|
||||
streaming: bool = False,
|
||||
timeout: float = 120,
|
||||
) -> dict[str, Any] | AsyncGenerator[dict[str, Any], None]:
|
||||
"""Execute n8n workflow via webhook
|
||||
|
||||
Args:
|
||||
data: Data to send to the webhook
|
||||
method: HTTP method (GET or POST)
|
||||
streaming: Whether to expect streaming response
|
||||
timeout: Request timeout in seconds
|
||||
|
||||
Returns:
|
||||
Workflow execution result or async generator for streaming responses
|
||||
|
||||
"""
|
||||
logger.debug(f"n8n workflow execution: {data}")
|
||||
|
||||
session = self._get_session()
|
||||
|
||||
if method.upper() == "GET":
|
||||
async with session.get(
|
||||
self.webhook_url,
|
||||
params=data,
|
||||
headers=self.headers,
|
||||
timeout=timeout,
|
||||
) as resp:
|
||||
if resp.status != 200:
|
||||
text = await resp.text()
|
||||
raise Exception(
|
||||
f"n8n workflow execution failed: {resp.status}. {text}",
|
||||
)
|
||||
if streaming:
|
||||
return self._handle_streaming_response(resp)
|
||||
return await resp.json()
|
||||
# POST method
|
||||
async with session.post(
|
||||
self.webhook_url,
|
||||
json=data,
|
||||
headers=self.headers,
|
||||
timeout=timeout,
|
||||
) as resp:
|
||||
if resp.status != 200:
|
||||
text = await resp.text()
|
||||
raise Exception(
|
||||
f"n8n workflow execution failed: {resp.status}. {text}",
|
||||
)
|
||||
if streaming:
|
||||
return self._handle_streaming_response(resp)
|
||||
return await resp.json()
|
||||
|
||||
async def _handle_streaming_response(
|
||||
self,
|
||||
resp,
|
||||
) -> AsyncGenerator[dict[str, Any], None]:
|
||||
"""Handle streaming response from n8n workflow"""
|
||||
content_type = resp.headers.get("Content-Type", "")
|
||||
if "text/event-stream" in content_type:
|
||||
# SSE response
|
||||
async for event in _stream_sse(resp):
|
||||
yield event
|
||||
else:
|
||||
# Regular streaming response
|
||||
decoder = codecs.getincrementaldecoder("utf-8")()
|
||||
buffer = ""
|
||||
async for chunk in resp.content.iter_chunked(8192):
|
||||
buffer += decoder.decode(chunk)
|
||||
# Try to parse each line as JSON
|
||||
lines = buffer.split("\n")
|
||||
buffer = lines[-1] # Keep incomplete line in buffer
|
||||
for line in lines[:-1]:
|
||||
line = line.strip()
|
||||
if line:
|
||||
try:
|
||||
yield json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
# If not JSON, yield as text
|
||||
yield {"text": line}
|
||||
|
||||
# Flush remaining buffer
|
||||
buffer += decoder.decode(b"", final=True)
|
||||
if buffer.strip():
|
||||
try:
|
||||
yield json.loads(buffer)
|
||||
except json.JSONDecodeError:
|
||||
yield {"text": buffer}
|
||||
|
||||
async def close(self):
|
||||
"""Close the HTTP session"""
|
||||
if self.session:
|
||||
await self.session.close()
|
||||
self.session = None
|
||||
@@ -23,6 +23,7 @@ export function getProviderIcon(type) {
|
||||
'ppio': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/ppio.svg',
|
||||
'dify': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/dify-color.svg',
|
||||
"coze": "https://registry.npmmirror.com/@lobehub/icons-static-svg/1.66.0/files/icons/coze.svg",
|
||||
'n8n': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/n8n.svg',
|
||||
'dashscope': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/alibabacloud-color.svg',
|
||||
'fastgpt': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/fastgpt-color.svg',
|
||||
'lm_studio': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/lmstudio.svg',
|
||||
|
||||
@@ -291,6 +291,7 @@ export default {
|
||||
"zhipu_chat_completion": "chat_completion",
|
||||
"dify": "chat_completion",
|
||||
"coze": "chat_completion",
|
||||
"n8n": "chat_completion",
|
||||
"dashscope": "chat_completion",
|
||||
"openai_whisper_api": "speech_to_text",
|
||||
"openai_whisper_selfhost": "speech_to_text",
|
||||
|
||||
181
docs/n8n_provider_implementation.md
Normal file
181
docs/n8n_provider_implementation.md
Normal file
@@ -0,0 +1,181 @@
|
||||
# n8n Provider Implementation
|
||||
|
||||
## Overview
|
||||
|
||||
This document describes the n8n provider implementation for AstrBot, which enables users to integrate n8n workflow automation with AstrBot's chatbot capabilities.
|
||||
|
||||
## Architecture
|
||||
|
||||
The n8n provider consists of two main components:
|
||||
|
||||
### 1. N8nAPIClient (`astrbot/core/utils/n8n_api_client.py`)
|
||||
|
||||
A lightweight HTTP client that handles webhook communication with n8n workflows.
|
||||
|
||||
**Key Features:**
|
||||
- Supports both GET and POST HTTP methods
|
||||
- Handles streaming and non-streaming responses
|
||||
- Custom authentication header support
|
||||
- Lazy ClientSession initialization to avoid event loop issues
|
||||
- Server-Sent Events (SSE) support for streaming responses
|
||||
|
||||
### 2. ProviderN8n (`astrbot/core/provider/sources/n8n_source.py`)
|
||||
|
||||
The main provider adapter that implements the AstrBot Provider interface.
|
||||
|
||||
**Key Features:**
|
||||
- Webhook-based workflow execution
|
||||
- Session management with customizable session ID keys
|
||||
- Multimodal support (text and images)
|
||||
- Configurable input/output key mappings for flexibility
|
||||
- Streaming and non-streaming response modes
|
||||
- Automatic response parsing into MessageChain format
|
||||
|
||||
## Configuration
|
||||
|
||||
The n8n provider accepts the following configuration parameters:
|
||||
|
||||
| Parameter | Type | Required | Default | Description |
|
||||
|-----------|------|----------|---------|-------------|
|
||||
| `n8n_webhook_url` | string | Yes | - | Webhook URL for the n8n workflow |
|
||||
| `n8n_http_method` | string | No | "POST" | HTTP method (GET or POST) |
|
||||
| `n8n_auth_header` | string | No | "" | Authentication header name |
|
||||
| `n8n_auth_value` | string | No | "" | Authentication value |
|
||||
| `n8n_output_key` | string | No | "output" | Key to extract workflow output |
|
||||
| `n8n_input_key` | string | No | "input" | Key to send user input |
|
||||
| `n8n_session_id_key` | string | No | "sessionId" | Key for session ID |
|
||||
| `n8n_image_urls_key` | string | No | "imageUrls" | Key for image URLs |
|
||||
| `n8n_streaming` | boolean | No | false | Enable streaming responses |
|
||||
| `timeout` | integer | No | 120 | Request timeout in seconds |
|
||||
| `variables` | object | No | {} | Additional variables to send |
|
||||
|
||||
### Example Configuration
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "n8n",
|
||||
"id": "my_n8n_workflow",
|
||||
"enable": true,
|
||||
"n8n_webhook_url": "https://your-n8n-instance.com/webhook/abc123",
|
||||
"n8n_http_method": "POST",
|
||||
"n8n_auth_header": "Authorization",
|
||||
"n8n_auth_value": "Bearer your-token-here",
|
||||
"n8n_output_key": "result",
|
||||
"n8n_input_key": "query",
|
||||
"timeout": 60,
|
||||
"variables": {
|
||||
"language": "zh-CN",
|
||||
"model": "gpt-4"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## n8n Workflow Design
|
||||
|
||||
To use the n8n provider effectively, your n8n workflow should:
|
||||
|
||||
1. **Accept webhook input** with the following structure:
|
||||
```json
|
||||
{
|
||||
"input": "user message text",
|
||||
"sessionId": "unique-session-identifier",
|
||||
"imageUrls": ["http://example.com/image.jpg"],
|
||||
"system_prompt": "optional system prompt",
|
||||
"custom_variable": "custom value"
|
||||
}
|
||||
```
|
||||
|
||||
2. **Return output** in one of these formats:
|
||||
- Simple string: `"response text"`
|
||||
- Object with output key: `{"output": "response text"}`
|
||||
- Object with alternative keys: `{"data": "response text"}`, `{"result": "response text"}`, etc.
|
||||
- Array of items (will be concatenated)
|
||||
|
||||
3. **For streaming responses** (if enabled):
|
||||
- Send chunks as Server-Sent Events (SSE)
|
||||
- Each chunk should contain `output`, `text`, or `data` field
|
||||
|
||||
## Response Parsing
|
||||
|
||||
The provider intelligently parses n8n workflow responses:
|
||||
|
||||
1. **String responses**: Directly converted to plain text
|
||||
2. **Object responses**: Extracts the configured output key or falls back to common keys
|
||||
3. **Array responses**: Processes each item, supporting media objects
|
||||
4. **Media objects**: Supports images, videos, audio, and files with type detection
|
||||
|
||||
Example media object:
|
||||
```json
|
||||
{
|
||||
"type": "image",
|
||||
"url": "https://example.com/image.jpg"
|
||||
}
|
||||
```
|
||||
|
||||
## Testing
|
||||
|
||||
The implementation includes comprehensive test coverage (13 tests):
|
||||
|
||||
- Provider registration and initialization
|
||||
- Configuration validation
|
||||
- Method implementations (get_models, forget, terminate, etc.)
|
||||
- Response parsing for various formats
|
||||
- Error handling
|
||||
|
||||
Run tests with:
|
||||
```bash
|
||||
uv run pytest tests/test_n8n_provider.py -v
|
||||
```
|
||||
|
||||
## Integration Points
|
||||
|
||||
### Dashboard Integration
|
||||
- Added to provider type mapping in `dashboard/src/views/ProviderPage.vue`
|
||||
- Icon support in `dashboard/src/utils/providerUtils.js`
|
||||
|
||||
### Provider Manager
|
||||
- Registered in `astrbot/core/provider/manager.py` for dynamic loading
|
||||
|
||||
### Documentation
|
||||
- Added to README.md (Chinese)
|
||||
- Added to README_en.md (English)
|
||||
|
||||
## Use Cases
|
||||
|
||||
The n8n provider enables powerful integration scenarios:
|
||||
|
||||
1. **Custom AI Workflows**: Build complex multi-step AI workflows in n8n
|
||||
2. **External API Integration**: Connect to third-party services through n8n
|
||||
3. **Data Processing**: Process user input through custom data pipelines
|
||||
4. **Conditional Logic**: Implement complex branching logic in n8n
|
||||
5. **Multi-Model Orchestration**: Combine multiple AI models in a single workflow
|
||||
|
||||
## Limitations
|
||||
|
||||
- n8n workflows are stateless by design; session management must be handled in the workflow
|
||||
- No built-in context history retrieval (implement in your n8n workflow if needed)
|
||||
- File uploads are sent as URLs (not binary data)
|
||||
|
||||
## Security Considerations
|
||||
|
||||
- Use HTTPS for webhook URLs in production
|
||||
- Implement authentication using custom headers
|
||||
- Validate and sanitize all inputs in your n8n workflow
|
||||
- Consider rate limiting at the n8n level
|
||||
- No security vulnerabilities found by CodeQL analysis
|
||||
|
||||
## Future Enhancements
|
||||
|
||||
Potential improvements for future versions:
|
||||
|
||||
- Support for file binary uploads
|
||||
- Context history integration
|
||||
- Advanced error handling and retry logic
|
||||
- Webhook signature validation
|
||||
- Support for n8n Cloud and self-hosted instances with different authentication methods
|
||||
|
||||
## References
|
||||
|
||||
- [n8n Official Documentation](https://docs.n8n.io/)
|
||||
- [n8n Webhook Node Documentation](https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.webhook/)
|
||||
- [AstrBot Provider Documentation](https://astrbot.app/)
|
||||
205
tests/test_n8n_provider.py
Normal file
205
tests/test_n8n_provider.py
Normal file
@@ -0,0 +1,205 @@
|
||||
"""Test n8n provider implementation"""
|
||||
|
||||
import pytest
|
||||
|
||||
from astrbot.core.provider.sources.n8n_source import ProviderN8n
|
||||
|
||||
|
||||
class TestN8nProvider:
|
||||
"""Test suite for n8n provider"""
|
||||
|
||||
def test_provider_registration(self):
|
||||
"""Test that n8n provider is properly registered"""
|
||||
from astrbot.core.provider.register import provider_cls_map
|
||||
|
||||
assert "n8n" in provider_cls_map
|
||||
assert provider_cls_map["n8n"].type == "n8n"
|
||||
assert provider_cls_map["n8n"].cls_type == ProviderN8n
|
||||
|
||||
def test_provider_initialization_missing_url(self):
|
||||
"""Test that provider raises error when webhook URL is missing"""
|
||||
config = {
|
||||
"type": "n8n",
|
||||
"id": "test_n8n",
|
||||
"enable": True,
|
||||
}
|
||||
with pytest.raises(Exception, match="n8n Webhook URL 不能为空"):
|
||||
ProviderN8n(config, {}, None)
|
||||
|
||||
def test_provider_initialization_invalid_method(self):
|
||||
"""Test that provider raises error for invalid HTTP method"""
|
||||
config = {
|
||||
"type": "n8n",
|
||||
"id": "test_n8n",
|
||||
"enable": True,
|
||||
"n8n_webhook_url": "https://example.com/webhook",
|
||||
"n8n_http_method": "PUT",
|
||||
}
|
||||
with pytest.raises(Exception, match="n8n HTTP 方法必须是 GET 或 POST"):
|
||||
ProviderN8n(config, {}, None)
|
||||
|
||||
def test_provider_initialization_success(self):
|
||||
"""Test successful provider initialization"""
|
||||
config = {
|
||||
"type": "n8n",
|
||||
"id": "test_n8n",
|
||||
"enable": True,
|
||||
"n8n_webhook_url": "https://example.com/webhook",
|
||||
"n8n_http_method": "POST",
|
||||
"n8n_auth_header": "Authorization",
|
||||
"n8n_auth_value": "Bearer test_token",
|
||||
"n8n_output_key": "result",
|
||||
"n8n_input_key": "query",
|
||||
"timeout": 60,
|
||||
}
|
||||
provider = ProviderN8n(config, {}, None)
|
||||
|
||||
assert provider.webhook_url == "https://example.com/webhook"
|
||||
assert provider.http_method == "POST"
|
||||
assert provider.auth_header == "Authorization"
|
||||
assert provider.auth_value == "Bearer test_token"
|
||||
assert provider.output_key == "result"
|
||||
assert provider.input_key == "query"
|
||||
assert provider.timeout == 60
|
||||
assert provider.model_name == "n8n"
|
||||
|
||||
def test_provider_default_values(self):
|
||||
"""Test provider initialization with default values"""
|
||||
config = {
|
||||
"type": "n8n",
|
||||
"id": "test_n8n",
|
||||
"enable": True,
|
||||
"n8n_webhook_url": "https://example.com/webhook",
|
||||
}
|
||||
provider = ProviderN8n(config, {}, None)
|
||||
|
||||
assert provider.http_method == "POST"
|
||||
assert provider.auth_header == ""
|
||||
assert provider.auth_value == ""
|
||||
assert provider.output_key == "output"
|
||||
assert provider.input_key == "input"
|
||||
assert provider.session_id_key == "sessionId"
|
||||
assert provider.image_urls_key == "imageUrls"
|
||||
assert provider.streaming is False
|
||||
assert provider.timeout == 120
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_models(self):
|
||||
"""Test get_models method"""
|
||||
config = {
|
||||
"type": "n8n",
|
||||
"id": "test_n8n",
|
||||
"enable": True,
|
||||
"n8n_webhook_url": "https://example.com/webhook",
|
||||
}
|
||||
provider = ProviderN8n(config, {}, None)
|
||||
models = await provider.get_models()
|
||||
|
||||
assert models == ["n8n"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_current_key(self):
|
||||
"""Test get_current_key method"""
|
||||
config = {
|
||||
"type": "n8n",
|
||||
"id": "test_n8n",
|
||||
"enable": True,
|
||||
"n8n_webhook_url": "https://example.com/webhook",
|
||||
"n8n_auth_value": "test_auth_value",
|
||||
}
|
||||
provider = ProviderN8n(config, {}, None)
|
||||
key = await provider.get_current_key()
|
||||
|
||||
assert key == "test_auth_value"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_key_raises_exception(self):
|
||||
"""Test that set_key raises an exception"""
|
||||
config = {
|
||||
"type": "n8n",
|
||||
"id": "test_n8n",
|
||||
"enable": True,
|
||||
"n8n_webhook_url": "https://example.com/webhook",
|
||||
}
|
||||
provider = ProviderN8n(config, {}, None)
|
||||
|
||||
with pytest.raises(Exception, match="n8n 适配器不支持设置 API Key"):
|
||||
await provider.set_key("new_key")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_forget(self):
|
||||
"""Test forget method"""
|
||||
config = {
|
||||
"type": "n8n",
|
||||
"id": "test_n8n",
|
||||
"enable": True,
|
||||
"n8n_webhook_url": "https://example.com/webhook",
|
||||
}
|
||||
provider = ProviderN8n(config, {}, None)
|
||||
result = await provider.forget("test_session")
|
||||
|
||||
assert result is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_terminate(self):
|
||||
"""Test terminate method"""
|
||||
config = {
|
||||
"type": "n8n",
|
||||
"id": "test_n8n",
|
||||
"enable": True,
|
||||
"n8n_webhook_url": "https://example.com/webhook",
|
||||
}
|
||||
provider = ProviderN8n(config, {}, None)
|
||||
# Should not raise an exception
|
||||
await provider.terminate()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_n8n_result_string(self):
|
||||
"""Test parsing string result"""
|
||||
config = {
|
||||
"type": "n8n",
|
||||
"id": "test_n8n",
|
||||
"enable": True,
|
||||
"n8n_webhook_url": "https://example.com/webhook",
|
||||
}
|
||||
provider = ProviderN8n(config, {}, None)
|
||||
result = await provider.parse_n8n_result("Hello, world!")
|
||||
|
||||
assert len(result.chain) == 1
|
||||
assert result.chain[0].type == "Plain"
|
||||
assert result.chain[0].text == "Hello, world!"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_n8n_result_dict_with_output(self):
|
||||
"""Test parsing dict result with output key"""
|
||||
config = {
|
||||
"type": "n8n",
|
||||
"id": "test_n8n",
|
||||
"enable": True,
|
||||
"n8n_webhook_url": "https://example.com/webhook",
|
||||
"n8n_output_key": "result",
|
||||
}
|
||||
provider = ProviderN8n(config, {}, None)
|
||||
result = await provider.parse_n8n_result({"result": "Test response"})
|
||||
|
||||
assert len(result.chain) == 1
|
||||
assert result.chain[0].type == "Plain"
|
||||
assert result.chain[0].text == "Test response"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_n8n_result_dict_without_output_key(self):
|
||||
"""Test parsing dict result without configured output key"""
|
||||
config = {
|
||||
"type": "n8n",
|
||||
"id": "test_n8n",
|
||||
"enable": True,
|
||||
"n8n_webhook_url": "https://example.com/webhook",
|
||||
"n8n_output_key": "custom_output",
|
||||
}
|
||||
provider = ProviderN8n(config, {}, None)
|
||||
# Should fall back to common keys like 'data', 'result', etc.
|
||||
result = await provider.parse_n8n_result({"data": "Fallback response"})
|
||||
|
||||
assert len(result.chain) == 1
|
||||
assert result.chain[0].type == "Plain"
|
||||
assert result.chain[0].text == "Fallback response"
|
||||
Reference in New Issue
Block a user