feat: Implement Claude Code service with streaming support and tool integration

- Added `aisdk-stream-protocel.md` to document text and data stream protocols.
- Created `ClaudeCodeService` for invoking and streaming responses from the Claude Code CLI.
- Introduced built-in tools for Claude Code, including Bash, Edit, and WebFetch.
- Developed transformation functions to convert Claude Code messages to AI SDK format.
- Enhanced OCR utility with delayed loading of the Sharp module.
- Updated agent types and session message structures to accommodate new features.
- Modified API tests to reflect changes in session creation and message streaming.
- Upgraded `uuid` package to version 13.0.0 for improved UUID generation.
This commit is contained in:
Vaayne
2025-09-16 15:12:03 +08:00
parent a8e2df6bed
commit 58dbb514e0
24 changed files with 1717 additions and 700 deletions
@@ -24,7 +24,7 @@ const verifyAgentAndSession = async (agentId: string, sessionId: string) => {
return session
}
export const createMessage = async (req: Request, res: Response): Promise<Response> => {
export const createMessageStream = async (req: Request, res: Response): Promise<void> => {
try {
const { agentId, sessionId } = req.params
@@ -32,286 +32,155 @@ export const createMessage = async (req: Request, res: Response): Promise<Respon
const messageData = { ...req.body, session_id: sessionId }
session.external_session_id
logger.info(`Creating streaming message for session: ${sessionId}`)
logger.debug('Streaming message data:', messageData)
logger.info(`Creating new message for session: ${sessionId}`)
logger.debug('Message data:', messageData)
// Set SSE headers
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.setHeader('Access-Control-Allow-Origin', '*')
res.setHeader('Access-Control-Allow-Headers', 'Cache-Control')
const message = await sessionMessageService.createSessionMessage(messageData)
// Send initial connection event
res.write('data: {"type":"connected"}\n\n')
logger.info(`Message created successfully: ${message.id}`)
return res.status(201).json(message)
} catch (error: any) {
if (error.status) {
return res.status(error.status).json({
error: {
message: error.message,
type: 'not_found',
code: error.code
const messageStream = sessionMessageService.createSessionMessageStream(session, messageData)
// Track if the response has ended to prevent further writes
let responseEnded = false
// Handle client disconnect
req.on('close', () => {
logger.info(`Client disconnected from streaming message for session: ${sessionId}`)
responseEnded = true
messageStream.removeAllListeners()
})
// Handle stream events
messageStream.on('data', (event: any) => {
if (responseEnded) return
try {
switch (event.type) {
case 'chunk':
// Format UIMessageChunk as SSE event following AI SDK protocol
res.write(`data: ${JSON.stringify(event.chunk)}\n\n`)
break
case 'error': {
// Send error as AI SDK error chunk
const errorChunk = {
type: 'error',
errorText: event.error?.message || 'Stream processing error'
}
res.write(`data: ${JSON.stringify(errorChunk)}\n\n`)
logger.error(`Streaming message error for session: ${sessionId}:`, event.error)
responseEnded = true
res.write('data: [DONE]\n\n')
res.end()
break
}
case 'complete':
// Send completion marker following AI SDK protocol
logger.info(`Streaming message completed for session: ${sessionId}`)
responseEnded = true
res.write('data: [DONE]\n\n')
res.end()
break
default:
// Handle other event types as generic data
res.write(`data: ${JSON.stringify(event)}\n\n`)
break
}
} catch (writeError) {
logger.error('Error writing to SSE stream:', { error: writeError })
if (!responseEnded) {
responseEnded = true
res.end()
}
})
}
logger.error('Error creating message:', error)
return res.status(500).json({
error: {
message: 'Failed to create message',
type: 'internal_error',
code: 'message_creation_failed'
}
})
}
}
export const createBulkMessages = async (req: Request, res: Response): Promise<Response> => {
try {
const { agentId, sessionId } = req.params
// Handle stream errors
messageStream.on('error', (error: Error) => {
if (responseEnded) return
await verifyAgentAndSession(agentId, sessionId)
const messagesData = req.body.map((msg: any) => ({ ...msg, session_id: sessionId }))
logger.info(`Creating ${messagesData.length} messages for session: ${sessionId}`)
logger.debug('Messages data:', messagesData)
const messages = await sessionMessageService.bulkCreateSessionMessages(messagesData)
logger.info(`${messages.length} messages created successfully for session: ${sessionId}`)
return res.status(201).json(messages)
} catch (error: any) {
if (error.status) {
return res.status(error.status).json({
error: {
message: error.message,
type: 'not_found',
code: error.code
}
})
}
logger.error('Error creating bulk messages:', error)
return res.status(500).json({
error: {
message: 'Failed to create messages',
type: 'internal_error',
code: 'bulk_message_creation_failed'
logger.error(`Stream error for session: ${sessionId}:`, { error })
try {
res.write(
`data: ${JSON.stringify({
type: 'error',
error: {
message: error.message || 'Stream processing error',
type: 'stream_error',
code: 'stream_processing_failed'
}
})}\n\n`
)
} catch (writeError) {
logger.error('Error writing error to SSE stream:', { error: writeError })
}
responseEnded = true
res.end()
})
}
}
export const listMessages = async (req: Request, res: Response): Promise<Response> => {
try {
const { agentId, sessionId } = req.params
// Set a timeout to prevent hanging indefinitely
const timeout = setTimeout(
() => {
if (!responseEnded) {
logger.error(`Streaming message timeout for session: ${sessionId}`)
try {
res.write(
`data: ${JSON.stringify({
type: 'error',
error: {
message: 'Stream timeout',
type: 'timeout_error',
code: 'stream_timeout'
}
})}\n\n`
)
} catch (writeError) {
logger.error('Error writing timeout to SSE stream:', { error: writeError })
}
responseEnded = true
res.end()
}
},
5 * 60 * 1000
) // 5 minutes timeout
await verifyAgentAndSession(agentId, sessionId)
const limit = req.query.limit ? parseInt(req.query.limit as string) : 50
const offset = req.query.offset ? parseInt(req.query.offset as string) : 0
logger.info(`Listing messages for session: ${sessionId} with limit=${limit}, offset=${offset}`)
const result = await sessionMessageService.listSessionMessages(sessionId, { limit, offset })
logger.info(`Retrieved ${result.messages.length} messages (total: ${result.total}) for session: ${sessionId}`)
return res.json({
data: result.messages,
total: result.total,
limit,
offset
})
// Clear timeout when response ends
res.on('close', () => clearTimeout(timeout))
res.on('finish', () => clearTimeout(timeout))
} catch (error: any) {
if (error.status) {
return res.status(error.status).json({
error: {
message: error.message,
type: 'not_found',
code: error.code
}
})
logger.error('Error in streaming message handler:', error)
// Send error as SSE if possible
if (!res.headersSent) {
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
}
logger.error('Error listing messages:', error)
return res.status(500).json({
error: {
message: 'Failed to list messages',
type: 'internal_error',
code: 'message_list_failed'
try {
const errorResponse = {
type: 'error',
error: {
message: error.status ? error.message : 'Failed to create streaming message',
type: error.status ? 'not_found' : 'internal_error',
code: error.status ? error.code : 'stream_creation_failed'
}
}
})
}
}
export const getMessage = async (req: Request, res: Response): Promise<Response> => {
try {
const { agentId, sessionId, messageId } = req.params
await verifyAgentAndSession(agentId, sessionId)
logger.info(`Getting message: ${messageId} for session: ${sessionId}`)
const message = await sessionMessageService.getSessionMessage(parseInt(messageId))
if (!message) {
logger.warn(`Message not found: ${messageId}`)
return res.status(404).json({
error: {
message: 'Message not found',
type: 'not_found',
code: 'message_not_found'
}
})
}
// Verify message belongs to the session
if (message.session_id !== sessionId) {
logger.warn(`Message ${messageId} does not belong to session ${sessionId}`)
return res.status(404).json({
error: {
message: 'Message not found for this session',
type: 'not_found',
code: 'message_not_found'
}
})
}
logger.info(`Message retrieved successfully: ${messageId}`)
return res.json(message)
} catch (error: any) {
if (error.status) {
return res.status(error.status).json({
error: {
message: error.message,
type: 'not_found',
code: error.code
}
})
}
logger.error('Error getting message:', error)
return res.status(500).json({
error: {
message: 'Failed to get message',
type: 'internal_error',
code: 'message_get_failed'
}
})
}
}
export const updateMessage = async (req: Request, res: Response): Promise<Response> => {
try {
const { agentId, sessionId, messageId } = req.params
await verifyAgentAndSession(agentId, sessionId)
logger.info(`Updating message: ${messageId} for session: ${sessionId}`)
logger.debug('Update data:', req.body)
// First check if message exists and belongs to session
const existingMessage = await sessionMessageService.getSessionMessage(parseInt(messageId))
if (!existingMessage || existingMessage.session_id !== sessionId) {
logger.warn(`Message ${messageId} not found for session ${sessionId}`)
return res.status(404).json({
error: {
message: 'Message not found for this session',
type: 'not_found',
code: 'message_not_found'
}
})
}
const message = await sessionMessageService.updateSessionMessage(parseInt(messageId), req.body)
if (!message) {
logger.warn(`Message not found for update: ${messageId}`)
return res.status(404).json({
error: {
message: 'Message not found',
type: 'not_found',
code: 'message_not_found'
}
})
}
logger.info(`Message updated successfully: ${messageId}`)
return res.json(message)
} catch (error: any) {
if (error.status) {
return res.status(error.status).json({
error: {
message: error.message,
type: 'not_found',
code: error.code
}
})
}
logger.error('Error updating message:', error)
return res.status(500).json({
error: {
message: 'Failed to update message',
type: 'internal_error',
code: 'message_update_failed'
}
})
}
}
export const deleteMessage = async (req: Request, res: Response): Promise<Response> => {
try {
const { agentId, sessionId, messageId } = req.params
await verifyAgentAndSession(agentId, sessionId)
logger.info(`Deleting message: ${messageId} for session: ${sessionId}`)
// First check if message exists and belongs to session
const existingMessage = await sessionMessageService.getSessionMessage(parseInt(messageId))
if (!existingMessage || existingMessage.session_id !== sessionId) {
logger.warn(`Message ${messageId} not found for session ${sessionId}`)
return res.status(404).json({
error: {
message: 'Message not found for this session',
type: 'not_found',
code: 'message_not_found'
}
})
}
const deleted = await sessionMessageService.deleteSessionMessage(parseInt(messageId))
if (!deleted) {
logger.warn(`Message not found for deletion: ${messageId}`)
return res.status(404).json({
error: {
message: 'Message not found',
type: 'not_found',
code: 'message_not_found'
}
})
}
logger.info(`Message deleted successfully: ${messageId}`)
return res.status(204).send()
} catch (error: any) {
if (error.status) {
return res.status(error.status).json({
error: {
message: error.message,
type: 'not_found',
code: error.code
}
})
}
logger.error('Error deleting message:', error)
return res.status(500).json({
error: {
message: 'Failed to delete message',
type: 'internal_error',
code: 'message_delete_failed'
}
})
res.write(`data: ${JSON.stringify(errorResponse)}\n\n`)
} catch (writeError) {
logger.error('Error writing initial error to SSE stream:', { error: writeError })
}
res.end()
}
}
+1 -16
View File
@@ -6,13 +6,10 @@ import {
validateAgent,
validateAgentId,
validateAgentUpdate,
validateBulkSessionMessages,
validateMessageId,
validatePagination,
validateSession,
validateSessionId,
validateSessionMessage,
validateSessionMessageUpdate,
validateSessionUpdate
} from './validators'
@@ -191,19 +188,7 @@ const createMessagesRouter = (): express.Router => {
const messagesRouter = express.Router({ mergeParams: true })
// Message CRUD routes (nested under agent/session)
messagesRouter.post('/', validateSessionMessage, handleValidationErrors, messageHandlers.createMessage)
messagesRouter.post('/bulk', validateBulkSessionMessages, handleValidationErrors, messageHandlers.createBulkMessages)
messagesRouter.get('/', validatePagination, handleValidationErrors, messageHandlers.listMessages)
messagesRouter.get('/:messageId', validateMessageId, handleValidationErrors, messageHandlers.getMessage)
messagesRouter.put(
'/:messageId',
validateMessageId,
validateSessionMessageUpdate,
handleValidationErrors,
messageHandlers.updateMessage
)
messagesRouter.delete('/:messageId', validateMessageId, handleValidationErrors, messageHandlers.deleteMessage)
messagesRouter.post('/', validateSessionMessage, handleValidationErrors, messageHandlers.createMessageStream)
return messagesRouter
}
@@ -1,24 +1,6 @@
import { body, param } from 'express-validator'
import { body } from 'express-validator'
export const validateSessionMessage = [
body('role').notEmpty().isIn(['user', 'agent', 'system', 'tool']).withMessage('Valid role is required'),
body('type').notEmpty().isString().withMessage('Type is required'),
body('content').notEmpty().isObject().withMessage('Content must be a valid object')
]
export const validateSessionMessageUpdate = [
body('content').optional().isObject().withMessage('Content must be a valid object')
]
export const validateBulkSessionMessages = [
body().isArray().withMessage('Request body must be an array'),
body('*.parent_id').optional().isInt({ min: 1 }).withMessage('Parent ID must be a positive integer'),
body('*.role').notEmpty().isIn(['user', 'agent', 'system', 'tool']).withMessage('Valid role is required'),
body('*.type').notEmpty().isString().withMessage('Type is required'),
body('*.content').notEmpty().isObject().withMessage('Content must be a valid object'),
body('*.metadata').optional().isObject().withMessage('Metadata must be a valid object')
]
export const validateMessageId = [
param('messageId').isInt({ min: 1 }).withMessage('Message ID must be a positive integer')
body('content').notEmpty().isString().withMessage('Content must be a valid string')
]