Compare commits

...

6 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
18e33d5505 Add comprehensive documentation for n8n provider
Co-authored-by: Soulter <37870767+Soulter@users.noreply.github.com>
2025-11-09 18:11:40 +00:00
copilot-swe-agent[bot]
a63762b054 Update README files to include n8n provider
Co-authored-by: Soulter <37870767+Soulter@users.noreply.github.com>
2025-11-09 18:08:04 +00:00
copilot-swe-agent[bot]
510fbdbb22 Add n8n provider to dashboard configuration
Co-authored-by: Soulter <37870767+Soulter@users.noreply.github.com>
2025-11-09 18:05:38 +00:00
copilot-swe-agent[bot]
d6cf205bc3 Fix lazy session initialization and add comprehensive tests
Co-authored-by: Soulter <37870767+Soulter@users.noreply.github.com>
2025-11-09 18:02:51 +00:00
copilot-swe-agent[bot]
a0e96ae975 Add n8n provider implementation
Co-authored-by: Soulter <37870767+Soulter@users.noreply.github.com>
2025-11-09 18:00:37 +00:00
copilot-swe-agent[bot]
f1b26546fd Initial plan 2025-11-09 17:54:29 +00:00
9 changed files with 883 additions and 0 deletions

View File

@@ -168,6 +168,7 @@ uv run main.py
- Dify - Dify
- 阿里云百炼应用 - 阿里云百炼应用
- Coze - Coze
- n8n
**语音转文本服务** **语音转文本服务**

View File

@@ -83,6 +83,8 @@ See docs: [Source Code Deployment](https://astrbot.app/deploy/astrbot/cli.html)
| Claude API | ✔ | Text Generation | | | Claude API | ✔ | Text Generation | |
| Google Gemini API | ✔ | Text Generation | | | Google Gemini API | ✔ | Text Generation | |
| Dify | ✔ | LLMOps | | | Dify | ✔ | LLMOps | |
| Coze | ✔ | LLMOps | |
| n8n | ✔ | LLMOps/Workflow | Webhook-based workflow automation |
| DashScope (Alibaba Cloud) | ✔ | LLMOps | | | DashScope (Alibaba Cloud) | ✔ | LLMOps | |
| Ollama | ✔ | Model Loader | Local deployment for open-source LLMs (DeepSeek, Llama, etc.) | | Ollama | ✔ | Model Loader | Local deployment for open-source LLMs (DeepSeek, Llama, etc.) |
| LM Studio | ✔ | Model Loader | Local deployment for open-source LLMs (DeepSeek, Llama, etc.) | | LM Studio | ✔ | Model Loader | Local deployment for open-source LLMs (DeepSeek, Llama, etc.) |

View File

@@ -249,6 +249,8 @@ class ProviderManager:
from .sources.dify_source import ProviderDify as ProviderDify from .sources.dify_source import ProviderDify as ProviderDify
case "coze": case "coze":
from .sources.coze_source import ProviderCoze as ProviderCoze from .sources.coze_source import ProviderCoze as ProviderCoze
case "n8n":
from .sources.n8n_source import ProviderN8n as ProviderN8n
case "dashscope": case "dashscope":
from .sources.dashscope_source import ( from .sources.dashscope_source import (
ProviderDashscope as ProviderDashscope, ProviderDashscope as ProviderDashscope,

View File

@@ -0,0 +1,341 @@
from collections.abc import AsyncGenerator
import astrbot.core.message.components as Comp
from astrbot.core import logger, sp
from astrbot.core.message.message_event_result import MessageChain
from astrbot.core.utils.n8n_api_client import N8nAPIClient
from .. import Provider
from ..entities import LLMResponse
from ..register import register_provider_adapter
@register_provider_adapter("n8n", "n8n 工作流适配器。")
class ProviderN8n(Provider):
def __init__(
self,
provider_config,
provider_settings,
default_persona=None,
) -> None:
super().__init__(
provider_config,
provider_settings,
default_persona,
)
self.webhook_url = provider_config.get("n8n_webhook_url", "")
if not self.webhook_url:
raise Exception("n8n Webhook URL 不能为空。")
self.auth_header = provider_config.get("n8n_auth_header", "")
self.auth_value = provider_config.get("n8n_auth_value", "")
self.http_method = provider_config.get("n8n_http_method", "POST").upper()
if self.http_method not in ["GET", "POST"]:
raise Exception("n8n HTTP 方法必须是 GET 或 POST。")
self.model_name = "n8n"
self.output_key = provider_config.get("n8n_output_key", "output")
self.input_key = provider_config.get("n8n_input_key", "input")
self.session_id_key = provider_config.get("n8n_session_id_key", "sessionId")
self.image_urls_key = provider_config.get("n8n_image_urls_key", "imageUrls")
self.streaming = provider_config.get("n8n_streaming", False)
if isinstance(self.streaming, str):
self.streaming = self.streaming.lower() in ["true", "1", "yes"]
self.variables: dict = provider_config.get("variables", {})
self.timeout = provider_config.get("timeout", 120)
if isinstance(self.timeout, str):
self.timeout = int(self.timeout)
self.api_client = N8nAPIClient(
webhook_url=self.webhook_url,
auth_header=self.auth_header if self.auth_header else None,
auth_value=self.auth_value if self.auth_value else None,
)
async def text_chat(
self,
prompt: str,
session_id=None,
image_urls=None,
func_tool=None,
contexts=None,
system_prompt=None,
tool_calls_result=None,
model=None,
**kwargs,
) -> LLMResponse:
if image_urls is None:
image_urls = []
session_id = session_id or kwargs.get("user") or "unknown"
# Build the payload
payload = self.variables.copy()
# Add session variables
session_var = await sp.session_get(session_id, "session_variables", default={})
payload.update(session_var)
# Add the main input
payload[self.input_key] = prompt
# Add session ID
payload[self.session_id_key] = session_id
# Add system prompt if provided
if system_prompt:
payload["system_prompt"] = system_prompt
# Add image URLs if provided
if image_urls:
payload[self.image_urls_key] = image_urls
try:
if self.streaming:
# Use streaming execution
result_text = ""
async for chunk in self.api_client.execute_workflow(
data=payload,
method=self.http_method,
streaming=True,
timeout=self.timeout,
):
logger.debug(f"n8n streaming chunk: {chunk}")
if isinstance(chunk, dict):
# Try to extract text from various possible keys
text = (
chunk.get("output", "")
or chunk.get("text", "")
or chunk.get("data", "")
)
if text:
result_text += str(text)
else:
result_text += str(chunk)
result = {"output": result_text}
else:
# Non-streaming execution
result = await self.api_client.execute_workflow(
data=payload,
method=self.http_method,
streaming=False,
timeout=self.timeout,
)
logger.debug(f"n8n response: {result}")
except Exception as e:
logger.error(f"n8n 请求失败:{e!s}")
return LLMResponse(role="err", completion_text=f"n8n 请求失败:{e!s}")
if not result:
logger.warning("n8n 请求结果为空,请查看 Debug 日志。")
return LLMResponse(role="assistant", completion_text="")
chain = await self.parse_n8n_result(result)
return LLMResponse(role="assistant", result_chain=chain)
async def text_chat_stream(
self,
prompt,
session_id=None,
image_urls=None,
func_tool=None,
contexts=None,
system_prompt=None,
tool_calls_result=None,
model=None,
**kwargs,
) -> AsyncGenerator[LLMResponse, None]:
if not self.streaming:
# Simulate streaming by calling text_chat
llm_response = await self.text_chat(
prompt=prompt,
session_id=session_id,
image_urls=image_urls,
func_tool=func_tool,
contexts=contexts,
system_prompt=system_prompt,
tool_calls_result=tool_calls_result,
model=model,
**kwargs,
)
llm_response.is_chunk = True
yield llm_response
llm_response.is_chunk = False
yield llm_response
else:
# True streaming
if image_urls is None:
image_urls = []
session_id = session_id or kwargs.get("user") or "unknown"
# Build the payload
payload = self.variables.copy()
session_var = await sp.session_get(
session_id,
"session_variables",
default={},
)
payload.update(session_var)
payload[self.input_key] = prompt
payload[self.session_id_key] = session_id
if system_prompt:
payload["system_prompt"] = system_prompt
if image_urls:
payload[self.image_urls_key] = image_urls
try:
accumulated_text = ""
async for chunk in self.api_client.execute_workflow(
data=payload,
method=self.http_method,
streaming=True,
timeout=self.timeout,
):
logger.debug(f"n8n streaming chunk: {chunk}")
if isinstance(chunk, dict):
text = (
chunk.get("output", "")
or chunk.get("text", "")
or chunk.get("data", "")
)
if text:
accumulated_text += str(text)
yield LLMResponse(
role="assistant",
completion_text=str(text),
is_chunk=True,
)
else:
accumulated_text += str(chunk)
yield LLMResponse(
role="assistant",
completion_text=str(chunk),
is_chunk=True,
)
# Send final response
if accumulated_text:
chain = MessageChain(chain=[Comp.Plain(accumulated_text)])
yield LLMResponse(
role="assistant",
result_chain=chain,
is_chunk=False,
)
except Exception as e:
logger.error(f"n8n 流式请求失败:{e!s}")
yield LLMResponse(
role="err",
completion_text=f"n8n 流式请求失败:{e!s}",
is_chunk=False,
)
async def parse_n8n_result(self, result: dict | str) -> MessageChain:
"""Parse n8n workflow result into MessageChain"""
if isinstance(result, str):
return MessageChain(chain=[Comp.Plain(result)])
# Extract output from result
output = result.get(self.output_key)
if output is None:
# Try common alternative keys
output = (
result.get("data")
or result.get("result")
or result.get("response")
or result.get("text")
)
if output is None:
# If still no output, use the entire result
output = result
chains = []
if isinstance(output, str):
# Simple text output
chains.append(Comp.Plain(output))
elif isinstance(output, list):
# Handle array output
for item in output:
if isinstance(item, dict):
# Check if it's a file/media object
if "type" in item:
comp = await self._parse_media_item(item)
if comp:
chains.append(comp)
else:
chains.append(Comp.Plain(str(item)))
else:
chains.append(Comp.Plain(str(item)))
else:
chains.append(Comp.Plain(str(item)))
elif isinstance(output, dict):
# Handle object output
# Check if it's a media object
if "type" in output:
comp = await self._parse_media_item(output)
if comp:
chains.append(comp)
else:
chains.append(Comp.Plain(str(output)))
else:
chains.append(Comp.Plain(str(output)))
else:
chains.append(Comp.Plain(str(output)))
if not chains:
chains.append(Comp.Plain(""))
return MessageChain(chain=chains)
async def _parse_media_item(self, item: dict):
"""Parse media item from n8n response"""
item_type = item.get("type", "").lower()
url = item.get("url") or item.get("file_url") or item.get("path")
if not url:
return None
match item_type:
case "image":
return Comp.Image(file=url, url=url)
case "audio":
# Download audio file if needed
return Comp.File(name=item.get("name", "audio"), file=url)
case "video":
return Comp.Video(file=url)
case "file":
return Comp.File(name=item.get("name", "file"), file=url)
case _:
return None
async def forget(self, session_id):
"""Clear session context (n8n is stateless, so this is a no-op)"""
return True
async def get_current_key(self):
"""Get current API key/auth value"""
return self.auth_value or ""
async def set_key(self, key):
"""Set API key/auth value"""
raise Exception("n8n 适配器不支持设置 API Key。")
async def get_models(self):
"""Get available models"""
return [self.get_model()]
async def get_human_readable_context(self, session_id, page, page_size):
"""Get human-readable context (not supported for n8n)"""
raise Exception("暂不支持获得 n8n 的历史消息记录。")
async def terminate(self):
"""Clean up resources"""
await self.api_client.close()

View File

@@ -0,0 +1,149 @@
import codecs
import json
from collections.abc import AsyncGenerator
from typing import Any
from aiohttp import ClientSession
from astrbot.core import logger
async def _stream_sse(resp) -> AsyncGenerator[dict, None]:
    """Stream Server-Sent Events (SSE) from an aiohttp response.

    Yields one parsed JSON object per SSE event.  Fixes over the naive
    version: CRLF newlines are normalized to LF (the SSE spec permits CRLF,
    under which the ``"\\n\\n"`` event separator would never match), the
    ``data:`` prefix is stripped from each *stripped* line so leading
    whitespace cannot corrupt the slice, and multiple ``data:`` lines within
    one event are joined per the SSE specification.

    Args:
        resp: aiohttp response whose body is a ``text/event-stream``.

    Yields:
        dict: the JSON payload of each event; invalid JSON is logged and
        dropped.
    """
    decoder = codecs.getincrementaldecoder("utf-8")()
    buffer = ""

    def _parse_event(block: str):
        # Collect every "data:" line of the event; other fields are ignored.
        payloads = []
        for line in block.strip().splitlines():
            line = line.strip()
            if line.startswith("data:"):
                payloads.append(line[5:].strip())
        if not payloads:
            return None
        raw = "\n".join(payloads)
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            logger.warning(f"Drop invalid n8n json data: {raw}")
            return None

    async for chunk in resp.content.iter_chunked(8192):
        # Normalize CRLF after appending so a CR/LF pair split across two
        # chunks is still collapsed once the LF arrives.
        buffer = (buffer + decoder.decode(chunk)).replace("\r\n", "\n")
        while "\n\n" in buffer:
            block, buffer = buffer.split("\n\n", 1)
            event = _parse_event(block)
            if event is not None:
                yield event
    # Flush bytes still held by the incremental decoder plus any final event
    # that was not terminated by a blank line.
    buffer = (buffer + decoder.decode(b"", final=True)).replace("\r\n", "\n")
    event = _parse_event(buffer)
    if event is not None:
        yield event
class N8nAPIClient:
    """n8n API client for webhook-based workflow execution.

    Wraps a lazily-created aiohttp ``ClientSession`` and executes a workflow
    through its webhook URL, supporting GET/POST, optional auth headers, and
    both JSON and streaming (SSE / newline-delimited JSON) responses.
    """

    def __init__(
        self,
        webhook_url: str,
        auth_header: str | None = None,
        auth_value: str | None = None,
    ):
        """Store connection settings; no network activity happens here.

        Args:
            webhook_url: full webhook URL of the n8n workflow.
            auth_header: optional auth header name; sent only when both
                ``auth_header`` and ``auth_value`` are given.
            auth_value: optional auth header value.
        """
        self.webhook_url = webhook_url
        # Created lazily so the client can be constructed outside a running
        # event loop (ClientSession must be created inside one).
        self.session = None
        self.headers = {}
        if auth_header and auth_value:
            self.headers[auth_header] = auth_value

    def _get_session(self) -> ClientSession:
        """Lazily create and return the ClientSession."""
        if self.session is None:
            self.session = ClientSession(trust_env=True)
        return self.session

    def _request(self, method: str, data: dict[str, Any], timeout: float):
        """Build (without awaiting) the aiohttp request context manager.

        GET sends ``data`` as query parameters; anything else is a JSON POST
        (the provider only permits GET/POST).
        """
        session = self._get_session()
        if method.upper() == "GET":
            return session.get(
                self.webhook_url,
                params=data,
                headers=self.headers,
                timeout=timeout,
            )
        return session.post(
            self.webhook_url,
            json=data,
            headers=self.headers,
            timeout=timeout,
        )

    def execute_workflow(
        self,
        data: dict[str, Any],
        method: str = "POST",
        streaming: bool = False,
        timeout: float = 120,
    ) -> Any:
        """Execute n8n workflow via webhook.

        Args:
            data: Data to send to the webhook
            method: HTTP method (GET or POST)
            streaming: Whether to expect streaming response
            timeout: Request timeout in seconds

        Returns:
            An awaitable yielding the parsed JSON result when
            ``streaming=False``, or an async generator of dict chunks when
            ``streaming=True``.

        Deliberately a plain ``def``: callers can use either
        ``await client.execute_workflow(...)`` or
        ``async for chunk in client.execute_workflow(..., streaming=True)``.
        The previous ``async def`` version returned the streaming generator
        from inside ``async with``, which closed the HTTP response before it
        could be consumed — and a coroutine cannot drive ``async for`` at all.
        """
        logger.debug(f"n8n workflow execution: {data}")
        if streaming:
            return self._execute_streaming(data, method, timeout)
        return self._execute_non_streaming(data, method, timeout)

    async def _execute_non_streaming(
        self,
        data: dict[str, Any],
        method: str,
        timeout: float,
    ) -> dict[str, Any]:
        """Issue the request and return the parsed JSON body."""
        async with self._request(method, data, timeout) as resp:
            if resp.status != 200:
                text = await resp.text()
                raise Exception(
                    f"n8n workflow execution failed: {resp.status}. {text}",
                )
            return await resp.json()

    async def _execute_streaming(
        self,
        data: dict[str, Any],
        method: str,
        timeout: float,
    ) -> AsyncGenerator[dict[str, Any], None]:
        """Issue the request and yield streamed chunks.

        The response stays open for the whole lifetime of this generator.
        """
        async with self._request(method, data, timeout) as resp:
            if resp.status != 200:
                text = await resp.text()
                raise Exception(
                    f"n8n workflow execution failed: {resp.status}. {text}",
                )
            async for event in self._handle_streaming_response(resp):
                yield event

    async def _handle_streaming_response(
        self,
        resp,
    ) -> AsyncGenerator[dict[str, Any], None]:
        """Handle streaming response from n8n workflow.

        SSE bodies are parsed event-by-event; otherwise each line is tried as
        JSON and yielded as ``{"text": line}`` when it is not valid JSON.
        """
        content_type = resp.headers.get("Content-Type", "")
        if "text/event-stream" in content_type:
            # SSE response
            async for event in _stream_sse(resp):
                yield event
        else:
            # Newline-delimited streaming response
            decoder = codecs.getincrementaldecoder("utf-8")()
            buffer = ""
            async for chunk in resp.content.iter_chunked(8192):
                buffer += decoder.decode(chunk)
                # Try to parse each complete line as JSON
                lines = buffer.split("\n")
                buffer = lines[-1]  # Keep incomplete line in buffer
                for line in lines[:-1]:
                    line = line.strip()
                    if line:
                        try:
                            yield json.loads(line)
                        except json.JSONDecodeError:
                            # If not JSON, yield as text
                            yield {"text": line}
            # Flush remaining buffer
            buffer += decoder.decode(b"", final=True)
            if buffer.strip():
                try:
                    yield json.loads(buffer)
                except json.JSONDecodeError:
                    yield {"text": buffer}

    async def close(self):
        """Close the HTTP session and drop the reference so a later call can
        lazily create a fresh one."""
        if self.session:
            await self.session.close()
            self.session = None

View File

@@ -23,6 +23,7 @@ export function getProviderIcon(type) {
'ppio': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/ppio.svg', 'ppio': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/ppio.svg',
'dify': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/dify-color.svg', 'dify': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/dify-color.svg',
"coze": "https://registry.npmmirror.com/@lobehub/icons-static-svg/1.66.0/files/icons/coze.svg", "coze": "https://registry.npmmirror.com/@lobehub/icons-static-svg/1.66.0/files/icons/coze.svg",
'n8n': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/n8n.svg',
'dashscope': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/alibabacloud-color.svg', 'dashscope': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/alibabacloud-color.svg',
'fastgpt': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/fastgpt-color.svg', 'fastgpt': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/fastgpt-color.svg',
'lm_studio': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/lmstudio.svg', 'lm_studio': 'https://registry.npmmirror.com/@lobehub/icons-static-svg/latest/files/icons/lmstudio.svg',

View File

@@ -291,6 +291,7 @@ export default {
"zhipu_chat_completion": "chat_completion", "zhipu_chat_completion": "chat_completion",
"dify": "chat_completion", "dify": "chat_completion",
"coze": "chat_completion", "coze": "chat_completion",
"n8n": "chat_completion",
"dashscope": "chat_completion", "dashscope": "chat_completion",
"openai_whisper_api": "speech_to_text", "openai_whisper_api": "speech_to_text",
"openai_whisper_selfhost": "speech_to_text", "openai_whisper_selfhost": "speech_to_text",

View File

@@ -0,0 +1,181 @@
# n8n Provider Implementation
## Overview
This document describes the n8n provider implementation for AstrBot, which enables users to integrate n8n workflow automation with AstrBot's chatbot capabilities.
## Architecture
The n8n provider consists of two main components:
### 1. N8nAPIClient (`astrbot/core/utils/n8n_api_client.py`)
A lightweight HTTP client that handles webhook communication with n8n workflows.
**Key Features:**
- Supports both GET and POST HTTP methods
- Handles streaming and non-streaming responses
- Custom authentication header support
- Lazy ClientSession initialization to avoid event loop issues
- Server-Sent Events (SSE) support for streaming responses
### 2. ProviderN8n (`astrbot/core/provider/sources/n8n_source.py`)
The main provider adapter that implements the AstrBot Provider interface.
**Key Features:**
- Webhook-based workflow execution
- Session management with customizable session ID keys
- Multimodal support (text and images)
- Configurable input/output key mappings for flexibility
- Streaming and non-streaming response modes
- Automatic response parsing into MessageChain format
## Configuration
The n8n provider accepts the following configuration parameters:
| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| `n8n_webhook_url` | string | Yes | - | Webhook URL for the n8n workflow |
| `n8n_http_method` | string | No | "POST" | HTTP method (GET or POST) |
| `n8n_auth_header` | string | No | "" | Authentication header name |
| `n8n_auth_value` | string | No | "" | Authentication value |
| `n8n_output_key` | string | No | "output" | Key to extract workflow output |
| `n8n_input_key` | string | No | "input" | Key to send user input |
| `n8n_session_id_key` | string | No | "sessionId" | Key for session ID |
| `n8n_image_urls_key` | string | No | "imageUrls" | Key for image URLs |
| `n8n_streaming` | boolean | No | false | Enable streaming responses |
| `timeout` | integer | No | 120 | Request timeout in seconds |
| `variables` | object | No | {} | Additional variables to send |
### Example Configuration
```json
{
"type": "n8n",
"id": "my_n8n_workflow",
"enable": true,
"n8n_webhook_url": "https://your-n8n-instance.com/webhook/abc123",
"n8n_http_method": "POST",
"n8n_auth_header": "Authorization",
"n8n_auth_value": "Bearer your-token-here",
"n8n_output_key": "result",
"n8n_input_key": "query",
"timeout": 60,
"variables": {
"language": "zh-CN",
"model": "gpt-4"
}
}
```
## n8n Workflow Design
To use the n8n provider effectively, your n8n workflow should:
1. **Accept webhook input** with the following structure:
```json
{
"input": "user message text",
"sessionId": "unique-session-identifier",
"imageUrls": ["http://example.com/image.jpg"],
"system_prompt": "optional system prompt",
"custom_variable": "custom value"
}
```
2. **Return output** in one of these formats:
- Simple string: `"response text"`
- Object with output key: `{"output": "response text"}`
- Object with alternative keys: `{"data": "response text"}`, `{"result": "response text"}`, etc.
- Array of items (will be concatenated)
3. **For streaming responses** (if enabled):
- Send chunks as Server-Sent Events (SSE)
- Each chunk should contain `output`, `text`, or `data` field
## Response Parsing
The provider intelligently parses n8n workflow responses:
1. **String responses**: Directly converted to plain text
2. **Object responses**: Extracts the configured output key or falls back to common keys
3. **Array responses**: Processes each item, supporting media objects
4. **Media objects**: Supports images, videos, audio, and files with type detection
Example media object:
```json
{
"type": "image",
"url": "https://example.com/image.jpg"
}
```
## Testing
The implementation includes comprehensive test coverage (13 tests):
- Provider registration and initialization
- Configuration validation
- Method implementations (get_models, forget, terminate, etc.)
- Response parsing for various formats
- Error handling
Run tests with:
```bash
uv run pytest tests/test_n8n_provider.py -v
```
## Integration Points
### Dashboard Integration
- Added to provider type mapping in `dashboard/src/views/ProviderPage.vue`
- Icon support in `dashboard/src/utils/providerUtils.js`
### Provider Manager
- Registered in `astrbot/core/provider/manager.py` for dynamic loading
### Documentation
- Added to README.md (Chinese)
- Added to README_en.md (English)
## Use Cases
The n8n provider enables powerful integration scenarios:
1. **Custom AI Workflows**: Build complex multi-step AI workflows in n8n
2. **External API Integration**: Connect to third-party services through n8n
3. **Data Processing**: Process user input through custom data pipelines
4. **Conditional Logic**: Implement complex branching logic in n8n
5. **Multi-Model Orchestration**: Combine multiple AI models in a single workflow
## Limitations
- n8n workflows are stateless by design; session management must be handled in the workflow
- No built-in context history retrieval (implement in your n8n workflow if needed)
- File uploads are sent as URLs (not binary data)
## Security Considerations
- Use HTTPS for webhook URLs in production
- Implement authentication using custom headers
- Validate and sanitize all inputs in your n8n workflow
- Consider rate limiting at the n8n level
- CodeQL static analysis of this implementation reported no security vulnerabilities (re-run after any change)
## Future Enhancements
Potential improvements for future versions:
- Support for file binary uploads
- Context history integration
- Advanced error handling and retry logic
- Webhook signature validation
- Support for n8n Cloud and self-hosted instances with different authentication methods
## References
- [n8n Official Documentation](https://docs.n8n.io/)
- [n8n Webhook Node Documentation](https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.webhook/)
- [AstrBot Provider Documentation](https://astrbot.app/)

205
tests/test_n8n_provider.py Normal file
View File

@@ -0,0 +1,205 @@
"""Test n8n provider implementation"""
import pytest
from astrbot.core.provider.sources.n8n_source import ProviderN8n
class TestN8nProvider:
"""Test suite for n8n provider"""
def test_provider_registration(self):
"""Test that n8n provider is properly registered"""
from astrbot.core.provider.register import provider_cls_map
assert "n8n" in provider_cls_map
assert provider_cls_map["n8n"].type == "n8n"
assert provider_cls_map["n8n"].cls_type == ProviderN8n
def test_provider_initialization_missing_url(self):
"""Test that provider raises error when webhook URL is missing"""
config = {
"type": "n8n",
"id": "test_n8n",
"enable": True,
}
with pytest.raises(Exception, match="n8n Webhook URL 不能为空"):
ProviderN8n(config, {}, None)
def test_provider_initialization_invalid_method(self):
"""Test that provider raises error for invalid HTTP method"""
config = {
"type": "n8n",
"id": "test_n8n",
"enable": True,
"n8n_webhook_url": "https://example.com/webhook",
"n8n_http_method": "PUT",
}
with pytest.raises(Exception, match="n8n HTTP 方法必须是 GET 或 POST"):
ProviderN8n(config, {}, None)
def test_provider_initialization_success(self):
"""Test successful provider initialization"""
config = {
"type": "n8n",
"id": "test_n8n",
"enable": True,
"n8n_webhook_url": "https://example.com/webhook",
"n8n_http_method": "POST",
"n8n_auth_header": "Authorization",
"n8n_auth_value": "Bearer test_token",
"n8n_output_key": "result",
"n8n_input_key": "query",
"timeout": 60,
}
provider = ProviderN8n(config, {}, None)
assert provider.webhook_url == "https://example.com/webhook"
assert provider.http_method == "POST"
assert provider.auth_header == "Authorization"
assert provider.auth_value == "Bearer test_token"
assert provider.output_key == "result"
assert provider.input_key == "query"
assert provider.timeout == 60
assert provider.model_name == "n8n"
def test_provider_default_values(self):
"""Test provider initialization with default values"""
config = {
"type": "n8n",
"id": "test_n8n",
"enable": True,
"n8n_webhook_url": "https://example.com/webhook",
}
provider = ProviderN8n(config, {}, None)
assert provider.http_method == "POST"
assert provider.auth_header == ""
assert provider.auth_value == ""
assert provider.output_key == "output"
assert provider.input_key == "input"
assert provider.session_id_key == "sessionId"
assert provider.image_urls_key == "imageUrls"
assert provider.streaming is False
assert provider.timeout == 120
@pytest.mark.asyncio
async def test_get_models(self):
"""Test get_models method"""
config = {
"type": "n8n",
"id": "test_n8n",
"enable": True,
"n8n_webhook_url": "https://example.com/webhook",
}
provider = ProviderN8n(config, {}, None)
models = await provider.get_models()
assert models == ["n8n"]
@pytest.mark.asyncio
async def test_get_current_key(self):
"""Test get_current_key method"""
config = {
"type": "n8n",
"id": "test_n8n",
"enable": True,
"n8n_webhook_url": "https://example.com/webhook",
"n8n_auth_value": "test_auth_value",
}
provider = ProviderN8n(config, {}, None)
key = await provider.get_current_key()
assert key == "test_auth_value"
@pytest.mark.asyncio
async def test_set_key_raises_exception(self):
"""Test that set_key raises an exception"""
config = {
"type": "n8n",
"id": "test_n8n",
"enable": True,
"n8n_webhook_url": "https://example.com/webhook",
}
provider = ProviderN8n(config, {}, None)
with pytest.raises(Exception, match="n8n 适配器不支持设置 API Key"):
await provider.set_key("new_key")
@pytest.mark.asyncio
async def test_forget(self):
"""Test forget method"""
config = {
"type": "n8n",
"id": "test_n8n",
"enable": True,
"n8n_webhook_url": "https://example.com/webhook",
}
provider = ProviderN8n(config, {}, None)
result = await provider.forget("test_session")
assert result is True
@pytest.mark.asyncio
async def test_terminate(self):
"""Test terminate method"""
config = {
"type": "n8n",
"id": "test_n8n",
"enable": True,
"n8n_webhook_url": "https://example.com/webhook",
}
provider = ProviderN8n(config, {}, None)
# Should not raise an exception
await provider.terminate()
@pytest.mark.asyncio
async def test_parse_n8n_result_string(self):
"""Test parsing string result"""
config = {
"type": "n8n",
"id": "test_n8n",
"enable": True,
"n8n_webhook_url": "https://example.com/webhook",
}
provider = ProviderN8n(config, {}, None)
result = await provider.parse_n8n_result("Hello, world!")
assert len(result.chain) == 1
assert result.chain[0].type == "Plain"
assert result.chain[0].text == "Hello, world!"
@pytest.mark.asyncio
async def test_parse_n8n_result_dict_with_output(self):
"""Test parsing dict result with output key"""
config = {
"type": "n8n",
"id": "test_n8n",
"enable": True,
"n8n_webhook_url": "https://example.com/webhook",
"n8n_output_key": "result",
}
provider = ProviderN8n(config, {}, None)
result = await provider.parse_n8n_result({"result": "Test response"})
assert len(result.chain) == 1
assert result.chain[0].type == "Plain"
assert result.chain[0].text == "Test response"
@pytest.mark.asyncio
async def test_parse_n8n_result_dict_without_output_key(self):
"""Test parsing dict result without configured output key"""
config = {
"type": "n8n",
"id": "test_n8n",
"enable": True,
"n8n_webhook_url": "https://example.com/webhook",
"n8n_output_key": "custom_output",
}
provider = ProviderN8n(config, {}, None)
# Should fall back to common keys like 'data', 'result', etc.
result = await provider.parse_n8n_result({"data": "Fallback response"})
assert len(result.chain) == 1
assert result.chain[0].type == "Plain"
assert result.chain[0].text == "Fallback response"