From dca0cf488bf57d72af7b5d0c0d9204276b2a9679 Mon Sep 17 00:00:00 2001 From: Vaayne Date: Mon, 4 Aug 2025 21:56:38 +0800 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor:=20improve=20agen?= =?UTF-8?q?t=20execution=20architecture=20and=20shell=20environment=20hand?= =?UTF-8?q?ling?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Refactor AgentExecutionService process execution from Promise-based to async/await pattern - Separate process spawning from event handler setup for better error handling - Add dependency injection support for shell environment provider (better testability) - Consolidate Cherry Studio bin path logic into shell-env utility - Use typed IPC channels for consistent agent communication - Improve error handling with proper async/await in agent execution flow - Update test mocks to use new testable AgentExecutionService architecture - Enhance cross-platform shell detection (zsh default for macOS, bash for Linux) --- src/main/services/MCPService.ts | 7 +- .../services/agent/AgentExecutionService.ts | 318 +++++++++--------- .../AgentExecutionService.simple.test.ts | 38 ++- .../AgentExecutionService.working.test.ts | 35 +- src/main/utils/shell-env.ts | 26 +- 5 files changed, 249 insertions(+), 175 deletions(-) diff --git a/src/main/services/MCPService.ts b/src/main/services/MCPService.ts index 0f44dc301..997c14c57 100644 --- a/src/main/services/MCPService.ts +++ b/src/main/services/MCPService.ts @@ -815,12 +815,7 @@ class McpService { private getLoginShellEnv = memoize(async (): Promise> => { try { - const loginEnv = await getLoginShellEnvironment() - const pathSeparator = process.platform === 'win32' ? 
';' : ':' - const cherryBinPath = path.join(os.homedir(), '.cherrystudio', 'bin') - loginEnv.PATH = `${loginEnv.PATH}${pathSeparator}${cherryBinPath}` - logger.debug('Successfully fetched login shell environment variables:') - return loginEnv + return await getLoginShellEnvironment() } catch (error) { logger.error('Failed to fetch login shell environment variables:', error as Error) return {} diff --git a/src/main/services/agent/AgentExecutionService.ts b/src/main/services/agent/AgentExecutionService.ts index e8ee6ee02..808e7bb34 100644 --- a/src/main/services/agent/AgentExecutionService.ts +++ b/src/main/services/agent/AgentExecutionService.ts @@ -1,5 +1,10 @@ +import fs from 'node:fs' +import path from 'node:path' + +import { BrowserWindow } from 'electron' import { loggerService } from '@logger' import { getDataPath, getResourcePath } from '@main/utils' +import { IpcChannel } from '@shared/IpcChannel' import type { AgentEntity, CreateSessionLogInput, @@ -10,10 +15,8 @@ import type { SessionEntity } from '@types' import { ChildProcess, spawn } from 'child_process' -import { BrowserWindow } from 'electron' -import fs from 'fs' -import path from 'path' +import getLoginShellEnvironment from '../../utils/shell-env' import AgentService from './AgentService' const logger = loggerService.withContext('AgentExecutionService') @@ -29,12 +32,14 @@ export class AgentExecutionService { private agentService: AgentService private readonly agentScriptPath: string private runningProcesses: Map = new Map() + private getShellEnvironment: () => Promise> - private constructor() { + private constructor(getShellEnvironment?: () => Promise>) { this.agentService = AgentService.getInstance() // Agent.py path is relative to app root for security // In development, use app root. 
In production, use app resources path this.agentScriptPath = path.join(getResourcePath(), 'agents', 'claude_code_agent.py') + this.getShellEnvironment = getShellEnvironment || getLoginShellEnvironment logger.info('initialized', { agentScriptPath: this.agentScriptPath }) } @@ -45,6 +50,11 @@ export class AgentExecutionService { return AgentExecutionService.instance } + // For testing purposes - allows injection of shell environment provider + public static getTestInstance(getShellEnvironment: () => Promise>): AgentExecutionService { + return new AgentExecutionService(getShellEnvironment) + } + /** * Validates that the agent.py script exists and is accessible */ @@ -207,11 +217,14 @@ export class AgentExecutionService { hasExistingSession: !!existingClaudeSessionId }) - // Execute the command asynchronously (don't await completion, just startup) - this.executeAgentProcess(sessionId, executable, args, workingDirectory).catch(error => { + // Await process startup only (shell environment lookup + spawn); output and exit are handled later by event handlers + try { + await this.startAgentProcess(sessionId, executable, args, workingDirectory) + } catch (error) { logger.error('Agent process execution failed:', error as Error, { sessionId }) - this.agentService.updateSessionStatus(sessionId, 'failed').catch(() => {}) - }) + await this.agentService.updateSessionStatus(sessionId, 'failed') + return { success: false, error: error instanceof Error ? 
error.message : 'Unknown error during agent execution' } + } return { success: true } } catch (error) { @@ -277,157 +290,162 @@ export class AgentExecutionService { } /** - * Execute the agent process and handle stdio streaming + * Start the agent process: resolve the shell environment, spawn the child, and attach event handlers (does not wait for the process to exit) */ - private async executeAgentProcess( + private async startAgentProcess( sessionId: string, executable: string, args: string[], workingDirectory: string ): Promise { - return new Promise((resolve, reject) => { - try { - // Spawn the process - const process = spawn(executable, args, { - cwd: workingDirectory, - stdio: ['pipe', 'pipe', 'pipe'], - env: { - ...globalThis.process.env, - // Add any necessary environment variables - PYTHONUNBUFFERED: '1' // Ensure Python output is not buffered - } - }) - - // Store the process for later management - this.runningProcesses.set(sessionId, process) - - // Log execution start - const startContent: ExecutionStartContent = { - sessionId, - agentId: sessionId, // For now, using sessionId as agentId - command: `${executable} ${args.join(' ')}`, - workingDirectory - } - - this.addSessionLog(sessionId, 'system', 'execution_start', startContent).catch((error) => { - logger.warn('Failed to log execution start:', error) - }) - - // Handle stdout - process.stdout?.on('data', (data: Buffer) => { - const output = data.toString() - logger.verbose('Agent stdout:', { - sessionId, - output: output.slice(0, 200) + (output.length > 200 ? '...' 
: '') - }) - - // Stream output to renderer processes via IPC - this.streamToRenderers('agent-output', { - sessionId, - type: 'stdout', - data: output, - timestamp: Date.now() - }) - - // Store in database - this.addSessionLog(sessionId, 'agent', 'output', { type: 'stdout', data: output }).catch((error) => { - logger.warn('Failed to log stdout:', error) - }) - }) - - // Handle stderr - process.stderr?.on('data', (data: Buffer) => { - const output = data.toString() - logger.verbose('Agent stderr:', { - sessionId, - output: output.slice(0, 200) + (output.length > 200 ? '...' : '') - }) - - // Stream output to renderer processes via IPC - this.streamToRenderers('agent-output', { - sessionId, - type: 'stderr', - data: output, - timestamp: Date.now() - }) - - // Store in database - this.addSessionLog(sessionId, 'agent', 'output', { type: 'stderr', data: output }).catch((error) => { - logger.warn('Failed to log stderr:', error) - }) - }) - - // Handle process exit - process.on('exit', async (code, signal) => { - this.runningProcesses.delete(sessionId) - - const success = code === 0 - const status = success ? 'completed' : 'failed' - - logger.info('Agent process exited', { sessionId, code, signal, success }) - - // Log execution completion - const completeContent: ExecutionCompleteContent = { - sessionId, - success, - exitCode: code ?? undefined, - ...(signal && { error: `Process terminated by signal: ${signal}` }) - } - - try { - await this.addSessionLog(sessionId, 'system', 'execution_complete', completeContent) - await this.agentService.updateSessionStatus(sessionId, status) - } catch (error) { - logger.error('Failed to log execution completion:', error as Error) - } - - // Stream completion event - this.streamToRenderers('agent-complete', { - sessionId, - exitCode: code ?? 
-1, - success, - timestamp: Date.now() - }) - - resolve() - }) - - // Handle process errors - process.on('error', async (error) => { - this.runningProcesses.delete(sessionId) - - logger.error('Agent process error:', error, { sessionId }) - - // Log execution error - const completeContent: ExecutionCompleteContent = { - sessionId, - success: false, - error: error.message - } - - try { - await this.addSessionLog(sessionId, 'system', 'execution_complete', completeContent) - await this.agentService.updateSessionStatus(sessionId, 'failed') - } catch (logError) { - logger.error('Failed to log execution error:', logError as Error) - } - - // Stream error event - this.streamToRenderers('agent-error', { - sessionId, - error: error.message, - timestamp: Date.now() - }) - - reject(error) - }) - } catch (error) { - logger.error('Failed to spawn agent process:', error as Error, { sessionId }) - reject(error) + const loginShellEnvironment = await this.getShellEnvironment() + + // Spawn the process + const process = spawn(executable, args, { + cwd: workingDirectory, + stdio: ['pipe', 'pipe', 'pipe'], + env: { + ...loginShellEnvironment, + PYTHONUNBUFFERED: '1' } }) + + // Store the process for later management + this.runningProcesses.set(sessionId, process) + + // Set up async event handlers + this.setupProcessHandlers(sessionId, process) } + /** + * Set up process event handlers (async) + */ + private setupProcessHandlers(sessionId: string, process: ChildProcess): void { + // Log execution start + const startContent: ExecutionStartContent = { + sessionId, + agentId: sessionId, // For now, using sessionId as agentId + command: `${process.spawnargs?.join(' ') || 'unknown'}`, + workingDirectory: process.spawnargs?.[0] || 'unknown' + } + + this.addSessionLog(sessionId, 'system', IpcChannel.Agent_ExecutionOutput, startContent).catch((error) => { + logger.warn('Failed to log execution start:', error) + }) + + // Handle stdout + process.stdout?.on('data', (data: Buffer) => { + const 
output = data.toString() + logger.verbose('Agent stdout:', { + sessionId, + output: output.slice(0, 200) + (output.length > 200 ? '...' : '') + }) + + // Stream output to renderer processes via IPC + this.streamToRenderers(IpcChannel.Agent_ExecutionOutput, { + sessionId, + type: 'stdout', + data: output, + timestamp: Date.now() + }) + + // Store in database + this.addSessionLog(sessionId, 'agent', IpcChannel.Agent_ExecutionOutput, { + type: 'stdout', + data: output + }).catch((error) => { + logger.warn('Failed to log stdout:', error) + }) + }) + + // Handle stderr + process.stderr?.on('data', (data: Buffer) => { + const output = data.toString() + logger.verbose('Agent stderr:', { + sessionId, + output: output.slice(0, 200) + (output.length > 200 ? '...' : '') + }) + + // Stream output to renderer processes via IPC + this.streamToRenderers(IpcChannel.Agent_ExecutionOutput, { + sessionId, + type: 'stderr', + data: output, + timestamp: Date.now() + }) + + // Store in database + this.addSessionLog(sessionId, 'agent', IpcChannel.Agent_ExecutionOutput, { + type: 'stderr', + data: output + }).catch((error) => { + logger.warn('Failed to log stderr:', error) + }) + }) + + // Handle process exit + process.on('exit', async (code, signal) => { + this.runningProcesses.delete(sessionId) + + const success = code === 0 + const status = success ? 'completed' : 'failed' + + logger.info('Agent process exited', { sessionId, code, signal, success }) + + // Log execution completion + const completeContent: ExecutionCompleteContent = { + sessionId, + success, + exitCode: code ?? 
undefined, + ...(signal && { error: `Process terminated by signal: ${signal}` }) + } + + try { + await this.addSessionLog(sessionId, 'system', IpcChannel.Agent_ExecutionComplete, completeContent) + await this.agentService.updateSessionStatus(sessionId, status) + } catch (error) { + logger.error('Failed to log execution completion:', error as Error) + } + + // Stream completion event + this.streamToRenderers(IpcChannel.Agent_ExecutionComplete, { + sessionId, + exitCode: code ?? -1, + success, + timestamp: Date.now() + }) + }) + + // Handle process errors + process.on('error', async (error) => { + this.runningProcesses.delete(sessionId) + + logger.error('Agent process error:', error, { sessionId }) + + // Log execution error + const completeContent: ExecutionCompleteContent = { + sessionId, + success: false, + error: error.message + } + + try { + await this.addSessionLog(sessionId, 'system', IpcChannel.Agent_ExecutionComplete, completeContent) + await this.agentService.updateSessionStatus(sessionId, 'failed') + } catch (logError) { + logger.error('Failed to log execution error:', logError as Error) + } + + // Stream error event + this.streamToRenderers(IpcChannel.Agent_ExecutionError, { + sessionId, + error: error.message, + timestamp: Date.now() + }) + }) + } + + /** * Add a session log entry */ @@ -480,8 +498,8 @@ export class AgentExecutionService { */ private streamToRenderers(channel: string, data: any): void { try { - // Get all browser windows and send the data const windows = BrowserWindow.getAllWindows() + windows.forEach((window) => { if (!window.isDestroyed()) { window.webContents.send(channel, data) diff --git a/src/main/services/agent/__tests__/AgentExecutionService.simple.test.ts b/src/main/services/agent/__tests__/AgentExecutionService.simple.test.ts index a1bba9622..6c0fd50a6 100644 --- a/src/main/services/agent/__tests__/AgentExecutionService.simple.test.ts +++ b/src/main/services/agent/__tests__/AgentExecutionService.simple.test.ts @@ -3,6 +3,12 @@ 
import { EventEmitter } from 'events' import fs from 'fs' import { beforeEach, describe, expect, it, vi } from 'vitest' +// Mock shell environment function +const mockGetLoginShellEnvironment = vi.fn(() => { + console.log('getLoginShellEnvironment mock called') + return Promise.resolve({ PATH: '/usr/bin:/bin', PYTHONUNBUFFERED: '1' }) +}) + import { AgentExecutionService } from '../AgentExecutionService' // Mock child_process @@ -27,6 +33,13 @@ vi.mock('fs', () => ({ } })) +// Mock os +vi.mock('os', () => ({ + default: { + homedir: vi.fn(() => '/test/home') + } +})) + // Mock electron vi.mock('electron', () => ({ BrowserWindow: { @@ -56,6 +69,7 @@ vi.mock('@logger', () => ({ } })) + // Mock AgentService const mockAgentService = { getSessionById: vi.fn(), @@ -110,12 +124,24 @@ describe('AgentExecutionService - Core Functionality', () => { vi.mocked(fs.promises.stat).mockResolvedValue({ isFile: () => true } as any) vi.mocked(fs.promises.mkdir).mockResolvedValue(undefined) - mockAgentService.getSessionById.mockResolvedValue({ success: true, data: mockSession }) - mockAgentService.getAgentById.mockResolvedValue({ success: true, data: mockAgent }) - mockAgentService.updateSessionStatus.mockResolvedValue({ success: true }) - mockAgentService.addSessionLog.mockResolvedValue({ success: true }) + mockAgentService.getSessionById.mockImplementation(() => { + console.log('getSessionById mock called') + return Promise.resolve({ success: true, data: mockSession }) + }) + mockAgentService.getAgentById.mockImplementation(() => { + console.log('getAgentById mock called') + return Promise.resolve({ success: true, data: mockAgent }) + }) + mockAgentService.updateSessionStatus.mockImplementation(() => { + console.log('updateSessionStatus mock called') + return Promise.resolve({ success: true }) + }) + mockAgentService.addSessionLog.mockImplementation(() => { + console.log('addSessionLog mock called') + return Promise.resolve({ success: true }) + }) - service = 
AgentExecutionService.getInstance() + service = AgentExecutionService.getTestInstance(mockGetLoginShellEnvironment) }) describe('Basic Functionality', () => { @@ -157,7 +183,7 @@ describe('AgentExecutionService - Core Functionality', () => { const { spawn } = await import('child_process') const result = await service.runAgent('session-1', 'Test prompt') - + expect(result.success).toBe(true) expect(spawn).toHaveBeenCalledWith('uv', expect.arrayContaining([ 'run', diff --git a/src/main/services/agent/__tests__/AgentExecutionService.working.test.ts b/src/main/services/agent/__tests__/AgentExecutionService.working.test.ts index d6db6faa7..f0c28587f 100644 --- a/src/main/services/agent/__tests__/AgentExecutionService.working.test.ts +++ b/src/main/services/agent/__tests__/AgentExecutionService.working.test.ts @@ -3,6 +3,11 @@ import { EventEmitter } from 'events' import fs from 'fs' import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +// Mock shell environment function +const mockGetLoginShellEnvironment = vi.fn(() => { + return Promise.resolve({ PATH: '/usr/bin:/bin', PYTHONUNBUFFERED: '1' }) +}) + import { AgentExecutionService } from '../AgentExecutionService' // Mock child_process @@ -33,6 +38,13 @@ vi.mock('fs', () => ({ } })) +// Mock os +vi.mock('os', () => ({ + default: { + homedir: vi.fn(() => '/test/home') + } +})) + // Create mock window const mockWindow = { isDestroyed: vi.fn(() => false), @@ -41,7 +53,7 @@ const mockWindow = { } } -// Mock electron +// Mock electron for both import and require vi.mock('electron', () => ({ BrowserWindow: { getAllWindows: vi.fn(() => [mockWindow]) @@ -70,6 +82,7 @@ vi.mock('@logger', () => ({ } })) + // Mock AgentService const mockAgentService = { getSessionById: vi.fn(), @@ -94,9 +107,16 @@ describe('AgentExecutionService - Working Tests', () => { // Reset mock process state mockProcess.killed = false + // Remove listeners to prevent memory leaks in tests mockProcess.removeAllListeners() 
mockProcess.stdout.removeAllListeners() mockProcess.stderr.removeAllListeners() + + // Increase max listeners to prevent warnings + mockProcess.setMaxListeners(20) + mockProcess.stdout.setMaxListeners(20) + mockProcess.stderr.setMaxListeners(20) + // Create test data mockAgent = { @@ -135,7 +155,7 @@ describe('AgentExecutionService - Working Tests', () => { mockAgentService.updateSessionStatus.mockResolvedValue({ success: true }) mockAgentService.addSessionLog.mockResolvedValue({ success: true }) - service = AgentExecutionService.getInstance() + service = AgentExecutionService.getTestInstance(mockGetLoginShellEnvironment) }) afterEach(() => { @@ -285,7 +305,7 @@ describe('AgentExecutionService - Working Tests', () => { it('should handle stdout data', () => { mockProcess.stdout.emit('data', Buffer.from('Test stdout output')) - expect(mockWindow.webContents.send).toHaveBeenCalledWith('agent-output', { + expect(mockWindow.webContents.send).toHaveBeenCalledWith('agent:execution-output', { sessionId: 'session-1', type: 'stdout', data: 'Test stdout output', @@ -296,7 +316,7 @@ describe('AgentExecutionService - Working Tests', () => { it('should handle stderr data', () => { mockProcess.stderr.emit('data', Buffer.from('Test stderr output')) - expect(mockWindow.webContents.send).toHaveBeenCalledWith('agent-output', { + expect(mockWindow.webContents.send).toHaveBeenCalledWith('agent:execution-output', { sessionId: 'session-1', type: 'stderr', data: 'Test stderr output', @@ -311,7 +331,7 @@ describe('AgentExecutionService - Working Tests', () => { await new Promise(resolve => setTimeout(resolve, 0)) expect(mockAgentService.updateSessionStatus).toHaveBeenCalledWith('session-1', 'completed') - expect(mockWindow.webContents.send).toHaveBeenCalledWith('agent-complete', { + expect(mockWindow.webContents.send).toHaveBeenCalledWith('agent:execution-complete', { sessionId: 'session-1', exitCode: 0, success: true, @@ -399,8 +419,9 @@ describe('AgentExecutionService - Working Tests', 
() => { const result = await service.runAgent('session-1', 'Test prompt') - // runAgent returns success immediately, errors are handled asynchronously - expect(result.success).toBe(true) + // When spawn throws, runAgent should return failure + expect(result.success).toBe(false) + expect(result.error).toBe('Spawn error') }) }) }) \ No newline at end of file diff --git a/src/main/utils/shell-env.ts b/src/main/utils/shell-env.ts index 831cb76b6..17cdf3398 100644 --- a/src/main/utils/shell-env.ts +++ b/src/main/utils/shell-env.ts @@ -1,6 +1,8 @@ import { loggerService } from '@logger' +import { isMac, isWin } from '@main/constant' import { spawn } from 'child_process' import os from 'os' +import path from 'path' const logger = loggerService.withContext('ShellEnv') @@ -20,9 +22,7 @@ function getLoginShellEnvironment(): Promise> { let commandArgs let shellCommandToGetEnv - const platform = os.platform() - - if (platform === 'win32') { + if (isWin) { // On Windows, 'cmd.exe' is the common shell. // The 'set' command lists environment variables. // We don't typically talk about "login shells" in the same way, @@ -34,11 +34,21 @@ function getLoginShellEnvironment(): Promise> { // For POSIX systems (Linux, macOS) if (!shellPath) { // Fallback if process.env.SHELL is not set (less common for interactive users) - // Defaulting to bash, but this might not be the user's actual login shell. // A more robust solution might involve checking /etc/passwd or similar, // but that's more complex and often requires higher privileges or native modules. - logger.warn("process.env.SHELL is not set. Defaulting to /bin/bash. This might not be the user's login shell.") - shellPath = '/bin/bash' // A common default + if (isMac) { + // macOS defaults to zsh since Catalina (10.15) + logger.warn( + "process.env.SHELL is not set. Defaulting to /bin/zsh for macOS. This might not be the user's login shell." 
+ ) + shellPath = '/bin/zsh' + } else { + // Other POSIX systems (Linux) default to bash + logger.warn( + "process.env.SHELL is not set. Defaulting to /bin/bash. This might not be the user's login shell." + ) + shellPath = '/bin/bash' + } } // -l: Make it a login shell. This sources profile files like .profile, .bash_profile, .zprofile etc. // -i: Make it interactive. Some shells or profile scripts behave differently. @@ -113,6 +123,10 @@ function getLoginShellEnvironment(): Promise> { } env.PATH = env.Path || env.PATH || '' + // set cherry studio bin path + const pathSeparator = isWin ? ';' : ':' + const cherryBinPath = path.join(os.homedir(), '.cherrystudio', 'bin') + env.PATH = `${env.PATH}${pathSeparator}${cherryBinPath}` resolve(env) })