♻️ refactor: improve agent execution architecture and shell environment handling

- Refactor AgentExecutionService process execution from Promise-based to async/await pattern
- Separate process spawning from event handler setup for better error handling
- Add dependency injection support for shell environment provider (better testability)
- Consolidate Cherry Studio bin path logic into shell-env utility
- Use typed IPC channels for consistent agent communication
- Improve error handling with proper async/await in agent execution flow
- Update test mocks to use new testable AgentExecutionService architecture
- Enhance cross-platform shell detection (zsh default for macOS, bash for Linux)
This commit is contained in:
Vaayne
2025-08-04 21:56:38 +08:00
parent e82aa2f061
commit dca0cf488b
5 changed files with 249 additions and 175 deletions
+1 -6
View File
@@ -815,12 +815,7 @@ class McpService {
private getLoginShellEnv = memoize(async (): Promise<Record<string, string>> => {
try {
const loginEnv = await getLoginShellEnvironment()
const pathSeparator = process.platform === 'win32' ? ';' : ':'
const cherryBinPath = path.join(os.homedir(), '.cherrystudio', 'bin')
loginEnv.PATH = `${loginEnv.PATH}${pathSeparator}${cherryBinPath}`
logger.debug('Successfully fetched login shell environment variables:')
return loginEnv
return await getLoginShellEnvironment()
} catch (error) {
logger.error('Failed to fetch login shell environment variables:', error as Error)
return {}
+168 -150
View File
@@ -1,5 +1,10 @@
import fs from 'node:fs'
import path from 'node:path'
import { BrowserWindow } from 'electron'
import { loggerService } from '@logger'
import { getDataPath, getResourcePath } from '@main/utils'
import { IpcChannel } from '@shared/IpcChannel'
import type {
AgentEntity,
CreateSessionLogInput,
@@ -10,10 +15,8 @@ import type {
SessionEntity
} from '@types'
import { ChildProcess, spawn } from 'child_process'
import { BrowserWindow } from 'electron'
import fs from 'fs'
import path from 'path'
import getLoginShellEnvironment from '../../utils/shell-env'
import AgentService from './AgentService'
const logger = loggerService.withContext('AgentExecutionService')
@@ -29,12 +32,14 @@ export class AgentExecutionService {
private agentService: AgentService
private readonly agentScriptPath: string
private runningProcesses: Map<string, ChildProcess> = new Map()
private getShellEnvironment: () => Promise<Record<string, string>>
private constructor() {
private constructor(getShellEnvironment?: () => Promise<Record<string, string>>) {
this.agentService = AgentService.getInstance()
// Agent.py path is relative to app root for security
// In development, use app root. In production, use app resources path
this.agentScriptPath = path.join(getResourcePath(), 'agents', 'claude_code_agent.py')
this.getShellEnvironment = getShellEnvironment || getLoginShellEnvironment
logger.info('initialized', { agentScriptPath: this.agentScriptPath })
}
@@ -45,6 +50,11 @@ export class AgentExecutionService {
return AgentExecutionService.instance
}
// For testing purposes - allows injection of shell environment provider
public static getTestInstance(getShellEnvironment: () => Promise<Record<string, string>>): AgentExecutionService {
return new AgentExecutionService(getShellEnvironment)
}
/**
* Validates that the agent.py script exists and is accessible
*/
@@ -207,11 +217,14 @@ export class AgentExecutionService {
hasExistingSession: !!existingClaudeSessionId
})
// Execute the command asynchronously (don't await completion, just startup)
this.executeAgentProcess(sessionId, executable, args, workingDirectory).catch(error => {
// Execute the command synchronously to spawn, then handle async parts
try {
await this.startAgentProcess(sessionId, executable, args, workingDirectory)
} catch (error) {
logger.error('Agent process execution failed:', error as Error, { sessionId })
this.agentService.updateSessionStatus(sessionId, 'failed').catch(() => {})
})
await this.agentService.updateSessionStatus(sessionId, 'failed')
return { success: false, error: error instanceof Error ? error.message : 'Unknown error during agent execution' }
}
return { success: true }
} catch (error) {
@@ -277,157 +290,162 @@ export class AgentExecutionService {
}
/**
* Execute the agent process and handle stdio streaming
* Start the agent process synchronously
*/
private async executeAgentProcess(
private async startAgentProcess(
sessionId: string,
executable: string,
args: string[],
workingDirectory: string
): Promise<void> {
return new Promise((resolve, reject) => {
try {
// Spawn the process
const process = spawn(executable, args, {
cwd: workingDirectory,
stdio: ['pipe', 'pipe', 'pipe'],
env: {
...globalThis.process.env,
// Add any necessary environment variables
PYTHONUNBUFFERED: '1' // Ensure Python output is not buffered
}
})
// Store the process for later management
this.runningProcesses.set(sessionId, process)
// Log execution start
const startContent: ExecutionStartContent = {
sessionId,
agentId: sessionId, // For now, using sessionId as agentId
command: `${executable} ${args.join(' ')}`,
workingDirectory
}
this.addSessionLog(sessionId, 'system', 'execution_start', startContent).catch((error) => {
logger.warn('Failed to log execution start:', error)
})
// Handle stdout
process.stdout?.on('data', (data: Buffer) => {
const output = data.toString()
logger.verbose('Agent stdout:', {
sessionId,
output: output.slice(0, 200) + (output.length > 200 ? '...' : '')
})
// Stream output to renderer processes via IPC
this.streamToRenderers('agent-output', {
sessionId,
type: 'stdout',
data: output,
timestamp: Date.now()
})
// Store in database
this.addSessionLog(sessionId, 'agent', 'output', { type: 'stdout', data: output }).catch((error) => {
logger.warn('Failed to log stdout:', error)
})
})
// Handle stderr
process.stderr?.on('data', (data: Buffer) => {
const output = data.toString()
logger.verbose('Agent stderr:', {
sessionId,
output: output.slice(0, 200) + (output.length > 200 ? '...' : '')
})
// Stream output to renderer processes via IPC
this.streamToRenderers('agent-output', {
sessionId,
type: 'stderr',
data: output,
timestamp: Date.now()
})
// Store in database
this.addSessionLog(sessionId, 'agent', 'output', { type: 'stderr', data: output }).catch((error) => {
logger.warn('Failed to log stderr:', error)
})
})
// Handle process exit
process.on('exit', async (code, signal) => {
this.runningProcesses.delete(sessionId)
const success = code === 0
const status = success ? 'completed' : 'failed'
logger.info('Agent process exited', { sessionId, code, signal, success })
// Log execution completion
const completeContent: ExecutionCompleteContent = {
sessionId,
success,
exitCode: code ?? undefined,
...(signal && { error: `Process terminated by signal: ${signal}` })
}
try {
await this.addSessionLog(sessionId, 'system', 'execution_complete', completeContent)
await this.agentService.updateSessionStatus(sessionId, status)
} catch (error) {
logger.error('Failed to log execution completion:', error as Error)
}
// Stream completion event
this.streamToRenderers('agent-complete', {
sessionId,
exitCode: code ?? -1,
success,
timestamp: Date.now()
})
resolve()
})
// Handle process errors
process.on('error', async (error) => {
this.runningProcesses.delete(sessionId)
logger.error('Agent process error:', error, { sessionId })
// Log execution error
const completeContent: ExecutionCompleteContent = {
sessionId,
success: false,
error: error.message
}
try {
await this.addSessionLog(sessionId, 'system', 'execution_complete', completeContent)
await this.agentService.updateSessionStatus(sessionId, 'failed')
} catch (logError) {
logger.error('Failed to log execution error:', logError as Error)
}
// Stream error event
this.streamToRenderers('agent-error', {
sessionId,
error: error.message,
timestamp: Date.now()
})
reject(error)
})
} catch (error) {
logger.error('Failed to spawn agent process:', error as Error, { sessionId })
reject(error)
const loginShellEnvironment = await this.getShellEnvironment()
// Spawn the process
const process = spawn(executable, args, {
cwd: workingDirectory,
stdio: ['pipe', 'pipe', 'pipe'],
env: {
...loginShellEnvironment,
PYTHONUNBUFFERED: '1'
}
})
// Store the process for later management
this.runningProcesses.set(sessionId, process)
// Set up async event handlers
this.setupProcessHandlers(sessionId, process)
}
/**
* Set up process event handlers (async)
*/
private setupProcessHandlers(sessionId: string, process: ChildProcess): void {
// Log execution start
const startContent: ExecutionStartContent = {
sessionId,
agentId: sessionId, // For now, using sessionId as agentId
command: `${process.spawnargs?.join(' ') || 'unknown'}`,
workingDirectory: process.spawnargs?.[0] || 'unknown'
}
this.addSessionLog(sessionId, 'system', IpcChannel.Agent_ExecutionOutput, startContent).catch((error) => {
logger.warn('Failed to log execution start:', error)
})
// Handle stdout
process.stdout?.on('data', (data: Buffer) => {
const output = data.toString()
logger.verbose('Agent stdout:', {
sessionId,
output: output.slice(0, 200) + (output.length > 200 ? '...' : '')
})
// Stream output to renderer processes via IPC
this.streamToRenderers(IpcChannel.Agent_ExecutionOutput, {
sessionId,
type: 'stdout',
data: output,
timestamp: Date.now()
})
// Store in database
this.addSessionLog(sessionId, 'agent', IpcChannel.Agent_ExecutionOutput, {
type: 'stdout',
data: output
}).catch((error) => {
logger.warn('Failed to log stdout:', error)
})
})
// Handle stderr
process.stderr?.on('data', (data: Buffer) => {
const output = data.toString()
logger.verbose('Agent stderr:', {
sessionId,
output: output.slice(0, 200) + (output.length > 200 ? '...' : '')
})
// Stream output to renderer processes via IPC
this.streamToRenderers(IpcChannel.Agent_ExecutionOutput, {
sessionId,
type: 'stderr',
data: output,
timestamp: Date.now()
})
// Store in database
this.addSessionLog(sessionId, 'agent', IpcChannel.Agent_ExecutionOutput, {
type: 'stderr',
data: output
}).catch((error) => {
logger.warn('Failed to log stderr:', error)
})
})
// Handle process exit
process.on('exit', async (code, signal) => {
this.runningProcesses.delete(sessionId)
const success = code === 0
const status = success ? 'completed' : 'failed'
logger.info('Agent process exited', { sessionId, code, signal, success })
// Log execution completion
const completeContent: ExecutionCompleteContent = {
sessionId,
success,
exitCode: code ?? undefined,
...(signal && { error: `Process terminated by signal: ${signal}` })
}
try {
await this.addSessionLog(sessionId, 'system', IpcChannel.Agent_ExecutionComplete, completeContent)
await this.agentService.updateSessionStatus(sessionId, status)
} catch (error) {
logger.error('Failed to log execution completion:', error as Error)
}
// Stream completion event
this.streamToRenderers(IpcChannel.Agent_ExecutionComplete, {
sessionId,
exitCode: code ?? -1,
success,
timestamp: Date.now()
})
})
// Handle process errors
process.on('error', async (error) => {
this.runningProcesses.delete(sessionId)
logger.error('Agent process error:', error, { sessionId })
// Log execution error
const completeContent: ExecutionCompleteContent = {
sessionId,
success: false,
error: error.message
}
try {
await this.addSessionLog(sessionId, 'system', IpcChannel.Agent_ExecutionComplete, completeContent)
await this.agentService.updateSessionStatus(sessionId, 'failed')
} catch (logError) {
logger.error('Failed to log execution error:', logError as Error)
}
// Stream error event
this.streamToRenderers(IpcChannel.Agent_ExecutionError, {
sessionId,
error: error.message,
timestamp: Date.now()
})
})
}
/**
* Add a session log entry
*/
@@ -480,8 +498,8 @@ export class AgentExecutionService {
*/
private streamToRenderers(channel: string, data: any): void {
try {
// Get all browser windows and send the data
const windows = BrowserWindow.getAllWindows()
windows.forEach((window) => {
if (!window.isDestroyed()) {
window.webContents.send(channel, data)
@@ -3,6 +3,12 @@ import { EventEmitter } from 'events'
import fs from 'fs'
import { beforeEach, describe, expect, it, vi } from 'vitest'
// Mock shell environment function
const mockGetLoginShellEnvironment = vi.fn(() => {
console.log('getLoginShellEnvironment mock called')
return Promise.resolve({ PATH: '/usr/bin:/bin', PYTHONUNBUFFERED: '1' })
})
import { AgentExecutionService } from '../AgentExecutionService'
// Mock child_process
@@ -27,6 +33,13 @@ vi.mock('fs', () => ({
}
}))
// Mock os
vi.mock('os', () => ({
default: {
homedir: vi.fn(() => '/test/home')
}
}))
// Mock electron
vi.mock('electron', () => ({
BrowserWindow: {
@@ -56,6 +69,7 @@ vi.mock('@logger', () => ({
}
}))
// Mock AgentService
const mockAgentService = {
getSessionById: vi.fn(),
@@ -110,12 +124,24 @@ describe('AgentExecutionService - Core Functionality', () => {
vi.mocked(fs.promises.stat).mockResolvedValue({ isFile: () => true } as any)
vi.mocked(fs.promises.mkdir).mockResolvedValue(undefined)
mockAgentService.getSessionById.mockResolvedValue({ success: true, data: mockSession })
mockAgentService.getAgentById.mockResolvedValue({ success: true, data: mockAgent })
mockAgentService.updateSessionStatus.mockResolvedValue({ success: true })
mockAgentService.addSessionLog.mockResolvedValue({ success: true })
mockAgentService.getSessionById.mockImplementation(() => {
console.log('getSessionById mock called')
return Promise.resolve({ success: true, data: mockSession })
})
mockAgentService.getAgentById.mockImplementation(() => {
console.log('getAgentById mock called')
return Promise.resolve({ success: true, data: mockAgent })
})
mockAgentService.updateSessionStatus.mockImplementation(() => {
console.log('updateSessionStatus mock called')
return Promise.resolve({ success: true })
})
mockAgentService.addSessionLog.mockImplementation(() => {
console.log('addSessionLog mock called')
return Promise.resolve({ success: true })
})
service = AgentExecutionService.getInstance()
service = AgentExecutionService.getTestInstance(mockGetLoginShellEnvironment)
})
describe('Basic Functionality', () => {
@@ -157,7 +183,7 @@ describe('AgentExecutionService - Core Functionality', () => {
const { spawn } = await import('child_process')
const result = await service.runAgent('session-1', 'Test prompt')
expect(result.success).toBe(true)
expect(spawn).toHaveBeenCalledWith('uv', expect.arrayContaining([
'run',
@@ -3,6 +3,11 @@ import { EventEmitter } from 'events'
import fs from 'fs'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
// Mock shell environment function
const mockGetLoginShellEnvironment = vi.fn(() => {
return Promise.resolve({ PATH: '/usr/bin:/bin', PYTHONUNBUFFERED: '1' })
})
import { AgentExecutionService } from '../AgentExecutionService'
// Mock child_process
@@ -33,6 +38,13 @@ vi.mock('fs', () => ({
}
}))
// Mock os
vi.mock('os', () => ({
default: {
homedir: vi.fn(() => '/test/home')
}
}))
// Create mock window
const mockWindow = {
isDestroyed: vi.fn(() => false),
@@ -41,7 +53,7 @@ const mockWindow = {
}
}
// Mock electron
// Mock electron for both import and require
vi.mock('electron', () => ({
BrowserWindow: {
getAllWindows: vi.fn(() => [mockWindow])
@@ -70,6 +82,7 @@ vi.mock('@logger', () => ({
}
}))
// Mock AgentService
const mockAgentService = {
getSessionById: vi.fn(),
@@ -94,9 +107,16 @@ describe('AgentExecutionService - Working Tests', () => {
// Reset mock process state
mockProcess.killed = false
// Remove listeners to prevent memory leaks in tests
mockProcess.removeAllListeners()
mockProcess.stdout.removeAllListeners()
mockProcess.stderr.removeAllListeners()
// Increase max listeners to prevent warnings
mockProcess.setMaxListeners(20)
mockProcess.stdout.setMaxListeners(20)
mockProcess.stderr.setMaxListeners(20)
// Create test data
mockAgent = {
@@ -135,7 +155,7 @@ describe('AgentExecutionService - Working Tests', () => {
mockAgentService.updateSessionStatus.mockResolvedValue({ success: true })
mockAgentService.addSessionLog.mockResolvedValue({ success: true })
service = AgentExecutionService.getInstance()
service = AgentExecutionService.getTestInstance(mockGetLoginShellEnvironment)
})
afterEach(() => {
@@ -285,7 +305,7 @@ describe('AgentExecutionService - Working Tests', () => {
it('should handle stdout data', () => {
mockProcess.stdout.emit('data', Buffer.from('Test stdout output'))
expect(mockWindow.webContents.send).toHaveBeenCalledWith('agent-output', {
expect(mockWindow.webContents.send).toHaveBeenCalledWith('agent:execution-output', {
sessionId: 'session-1',
type: 'stdout',
data: 'Test stdout output',
@@ -296,7 +316,7 @@ describe('AgentExecutionService - Working Tests', () => {
it('should handle stderr data', () => {
mockProcess.stderr.emit('data', Buffer.from('Test stderr output'))
expect(mockWindow.webContents.send).toHaveBeenCalledWith('agent-output', {
expect(mockWindow.webContents.send).toHaveBeenCalledWith('agent:execution-output', {
sessionId: 'session-1',
type: 'stderr',
data: 'Test stderr output',
@@ -311,7 +331,7 @@ describe('AgentExecutionService - Working Tests', () => {
await new Promise(resolve => setTimeout(resolve, 0))
expect(mockAgentService.updateSessionStatus).toHaveBeenCalledWith('session-1', 'completed')
expect(mockWindow.webContents.send).toHaveBeenCalledWith('agent-complete', {
expect(mockWindow.webContents.send).toHaveBeenCalledWith('agent:execution-complete', {
sessionId: 'session-1',
exitCode: 0,
success: true,
@@ -399,8 +419,9 @@ describe('AgentExecutionService - Working Tests', () => {
const result = await service.runAgent('session-1', 'Test prompt')
// runAgent returns success immediately, errors are handled asynchronously
expect(result.success).toBe(true)
// When spawn throws, runAgent should return failure
expect(result.success).toBe(false)
expect(result.error).toBe('Spawn error')
})
})
})
+20 -6
View File
@@ -1,6 +1,8 @@
import { loggerService } from '@logger'
import { isMac, isWin } from '@main/constant'
import { spawn } from 'child_process'
import os from 'os'
import path from 'path'
const logger = loggerService.withContext('ShellEnv')
@@ -20,9 +22,7 @@ function getLoginShellEnvironment(): Promise<Record<string, string>> {
let commandArgs
let shellCommandToGetEnv
const platform = os.platform()
if (platform === 'win32') {
if (isWin) {
// On Windows, 'cmd.exe' is the common shell.
// The 'set' command lists environment variables.
// We don't typically talk about "login shells" in the same way,
@@ -34,11 +34,21 @@ function getLoginShellEnvironment(): Promise<Record<string, string>> {
// For POSIX systems (Linux, macOS)
if (!shellPath) {
// Fallback if process.env.SHELL is not set (less common for interactive users)
// Defaulting to bash, but this might not be the user's actual login shell.
// A more robust solution might involve checking /etc/passwd or similar,
// but that's more complex and often requires higher privileges or native modules.
logger.warn("process.env.SHELL is not set. Defaulting to /bin/bash. This might not be the user's login shell.")
shellPath = '/bin/bash' // A common default
if (isMac) {
// macOS defaults to zsh since Catalina (10.15)
logger.warn(
"process.env.SHELL is not set. Defaulting to /bin/zsh for macOS. This might not be the user's login shell."
)
shellPath = '/bin/zsh'
} else {
// Other POSIX systems (Linux) default to bash
logger.warn(
"process.env.SHELL is not set. Defaulting to /bin/bash. This might not be the user's login shell."
)
shellPath = '/bin/bash'
}
}
// -l: Make it a login shell. This sources profile files like .profile, .bash_profile, .zprofile etc.
// -i: Make it interactive. Some shells or profile scripts behave differently.
@@ -113,6 +123,10 @@ function getLoginShellEnvironment(): Promise<Record<string, string>> {
}
env.PATH = env.Path || env.PATH || ''
// set cherry studio bin path
const pathSeparator = isWin ? ';' : ':'
const cherryBinPath = path.join(os.homedir(), '.cherrystudio', 'bin')
env.PATH = `${env.PATH}${pathSeparator}${cherryBinPath}`
resolve(env)
})