From 86e056389d9876d1c78cb0720fb6f132bb38a529 Mon Sep 17 00:00:00 2001
From: cryptooda
Date: Tue, 6 Jan 2026 18:18:06 +0700
Subject: [PATCH] Implement streaming chat functionality in LlmController

- Added a new ChatStream endpoint to handle real-time chat interactions with
  LLMs, providing streaming progress updates.
- Introduced the LlmProgressUpdate class to encapsulate the various types of
  progress updates emitted during chat processing, including iteration starts,
  tool calls, and final responses.
- Enhanced error handling and user authentication checks within the streaming
  process so failures are reported as progress updates rather than unhandled
  exceptions.
- Refactored tool execution logic to safely handle tool calls and provide
  detailed feedback on execution status and results.
---
 src/Managing.Api/Controllers/LlmController.cs |  324 ++++++++++++++++++
 .../Services/ILlmService.cs                   |   56 +++
 .../LLM/Providers/GeminiProvider.cs           |    2 +-
 .../src/generated/ManagingApiTypes.ts         |   14 +
 .../src/generated/ManagingApi.ts              |   53 +++
 .../src/generated/ManagingApiTypes.ts         |   14 +
 6 files changed, 462 insertions(+), 1 deletion(-)

diff --git a/src/Managing.Api/Controllers/LlmController.cs b/src/Managing.Api/Controllers/LlmController.cs
index f6f483af..576eefc0 100644
--- a/src/Managing.Api/Controllers/LlmController.cs
+++ b/src/Managing.Api/Controllers/LlmController.cs
@@ -36,6 +36,305 @@ public class LlmController : BaseController
         _cache = cache;
     }
 
+    /// <summary>
+    /// Sends a chat message to an LLM with streaming progress updates (Server-Sent Events).
+    /// Provides real-time updates about iterations, tool calls, and progress similar to Cursor/Claude.
+    /// </summary>
+    /// <param name="request">The chat request with messages and optional provider/API key</param>
+    /// <returns>Stream of progress updates ending with final response</returns>
+    [HttpPost]
+    [Route("ChatStream")]
+    [Produces("text/event-stream")]
+    public async IAsyncEnumerable<LlmProgressUpdate> ChatStream([FromBody] LlmChatRequest request)
+    {
+        if (request == null)
+        {
+            yield return new LlmProgressUpdate
+            {
+                Type = "error",
+                Message = "Chat request is required",
+                Error = "Chat request is required"
+            };
+            yield break;
+        }
+
+        if (request.Messages == null || !request.Messages.Any())
+        {
+            yield return new LlmProgressUpdate
+            {
+                Type = "error",
+                Message = "At least one message is required",
+                Error = "At least one message is required"
+            };
+            yield break;
+        }
+
+        User? user = null;
+        try
+        {
+            user = await GetUser();
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Error getting user for streaming chat request");
+        }
+
+        if (user == null)
+        {
+            yield return new LlmProgressUpdate
+            {
+                Type = "error",
+                Message = "Error authenticating user",
+                Error = "Unable to authenticate user"
+            };
+            yield break;
+        }
+
+        await foreach (var update in ChatStreamInternal(request, user))
+        {
+            yield return update;
+        }
+    }
+
+    private async IAsyncEnumerable<LlmProgressUpdate> ChatStreamInternal(LlmChatRequest request, User user)
+    {
+        yield return new LlmProgressUpdate
+        {
+            Type = "thinking",
+            Message = "Initializing conversation and loading available tools..."
+        };
+
+        // Get available MCP tools (with caching for 5 minutes)
+        var availableTools = await _cache.GetOrCreateAsync("mcp_tools", async entry =>
+        {
+            entry.AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(5);
+            return (await _mcpService.GetAvailableToolsAsync()).ToList();
+        });
+        request.Tools = availableTools;
+
+        yield return new LlmProgressUpdate
+        {
+            Type = "thinking",
+            Message = $"Loaded {availableTools.Count} available tools. Preparing system context..."
+        };
+
+        // Add or prepend system message to ensure LLM knows it can respond directly
+        var existingSystemMessages = request.Messages.Where(m => m.Role == "system").ToList();
+        foreach (var msg in existingSystemMessages)
+        {
+            request.Messages.Remove(msg);
+        }
+
+        // Add explicit system message with domain expertise and tool guidance
+        var systemMessage = new LlmMessage
+        {
+            Role = "system",
+            Content = BuildSystemMessage()
+        };
+        request.Messages.Insert(0, systemMessage);
+
+        // Proactively inject backtest details fetching if user is asking for analysis
+        await InjectBacktestDetailsFetchingIfNeeded(request, user);
+
+        // Add helpful context extraction message if backtest ID was found
+        AddBacktestContextGuidance(request);
+
+        // Iterative tool calling: keep looping until we get a final answer without tool calls
+        int maxIterations = DetermineMaxIterations(request);
+        int iteration = 0;
+        LlmChatResponse? finalResponse = null;
+        const int DelayBetweenIterationsMs = 500;
+
+        yield return new LlmProgressUpdate
+        {
+            Type = "thinking",
+            Message = $"Starting analysis (up to {maxIterations} iterations may be needed)..."
+        };
+
+        while (iteration < maxIterations)
+        {
+            iteration++;
+
+            yield return new LlmProgressUpdate
+            {
+                Type = "iteration_start",
+                Message = $"Iteration {iteration}/{maxIterations}: Analyzing your request and determining next steps...",
+                Iteration = iteration,
+                MaxIterations = maxIterations
+            };
+
+            _logger.LogInformation("LLM chat iteration {Iteration}/{MaxIterations} for user {UserId}",
+                iteration, maxIterations, user.Id);
+
+            // Add delay between iterations to avoid rapid bursts and rate limiting
+            if (iteration > 1)
+            {
+                yield return new LlmProgressUpdate
+                {
+                    Type = "thinking",
+                    Message = "Waiting briefly to respect rate limits...",
+                    Iteration = iteration,
+                    MaxIterations = maxIterations
+                };
+                await Task.Delay(DelayBetweenIterationsMs);
+            }
+
+            // Trim context if conversation is getting too long
+            TrimConversationContext(request);
+
+            // Send chat request to LLM
+            yield return new LlmProgressUpdate
+            {
+                Type = "thinking",
+                Message = $"Iteration {iteration}: Sending request to LLM...",
+                Iteration = iteration,
+                MaxIterations = maxIterations
+            };
+
+            var response = await _llmService.ChatAsync(user, request);
+
+            // If LLM doesn't want to call tools, we have our final answer
+            if (!response.RequiresToolExecution || response.ToolCalls == null || !response.ToolCalls.Any())
+            {
+                finalResponse = response;
+                _logger.LogInformation("LLM provided final answer after {Iteration} iteration(s) for user {UserId}",
+                    iteration, user.Id);
+
+                yield return new LlmProgressUpdate
+                {
+                    Type = "thinking",
+                    Message = "Received final response. Preparing answer...",
+                    Iteration = iteration,
+                    MaxIterations = maxIterations
+                };
+
+                break;
+            }
+
+            // LLM wants to call tools - execute them
+            _logger.LogInformation("LLM requested {Count} tool calls in iteration {Iteration} for user {UserId}",
+                response.ToolCalls.Count, iteration, user.Id);
+
+            yield return new LlmProgressUpdate
+            {
+                Type = "thinking",
+                Message = $"Iteration {iteration}: LLM requested {response.ToolCalls.Count} tool call(s). Executing tools...",
Executing tools...", + Iteration = iteration, + MaxIterations = maxIterations + }; + + // Execute tool calls sequentially to allow progress updates + var toolResults = new List(); + foreach (var toolCall in response.ToolCalls) + { + yield return new LlmProgressUpdate + { + Type = "tool_call", + Message = $"Calling tool: {toolCall.Name}", + Iteration = iteration, + MaxIterations = maxIterations, + ToolName = toolCall.Name, + ToolArguments = toolCall.Arguments + }; + + var (success, result, error) = await ExecuteToolSafely(user, toolCall.Name, toolCall.Arguments, toolCall.Id, iteration, maxIterations); + + if (success && result != null) + { + _logger.LogInformation("Successfully executed tool {ToolName} in iteration {Iteration} for user {UserId}", + toolCall.Name, iteration, user.Id); + + yield return new LlmProgressUpdate + { + Type = "tool_result", + Message = $"Tool {toolCall.Name} completed successfully", + Iteration = iteration, + MaxIterations = maxIterations, + ToolName = toolCall.Name + }; + + toolResults.Add(new LlmMessage + { + Role = "tool", + Content = JsonSerializer.Serialize(result), + ToolCallId = toolCall.Id + }); + } + else + { + _logger.LogError("Error executing tool {ToolName} in iteration {Iteration} for user {UserId}: {Error}", + toolCall.Name, iteration, user.Id, error); + + yield return new LlmProgressUpdate + { + Type = "tool_result", + Message = $"Tool {toolCall.Name} encountered an error: {error}", + Iteration = iteration, + MaxIterations = maxIterations, + ToolName = toolCall.Name, + Error = error + }; + + toolResults.Add(new LlmMessage + { + Role = "tool", + Content = $"Error executing tool: {error}", + ToolCallId = toolCall.Id + }); + } + } + + yield return new LlmProgressUpdate + { + Type = "thinking", + Message = $"Iteration {iteration}: All tools completed. Analyzing results...", + Iteration = iteration, + MaxIterations = maxIterations + }; + + // Add assistant message with tool calls to conversation history + request.Messages.Add(new LlmMessage + { + Role = "assistant", + Content = response.Content, + ToolCalls = response.ToolCalls + }); + + // Add tool results to conversation history + request.Messages.AddRange(toolResults); + + // Continue loop to get LLM's response to the tool results + } + + // If we hit max iterations, return the last response (even if it has tool calls) + if (finalResponse == null) + { + _logger.LogWarning("Reached max iterations ({MaxIterations}) for user {UserId}. Returning last response.", + maxIterations, user.Id); + + yield return new LlmProgressUpdate + { + Type = "thinking", + Message = "Reached maximum iterations. Getting final response...", + Iteration = maxIterations, + MaxIterations = maxIterations + }; + + finalResponse = await _llmService.ChatAsync(user, request); + } + + // Send final response + yield return new LlmProgressUpdate + { + Type = "final_response", + Message = "Analysis complete!", + Response = finalResponse, + Iteration = iteration, + MaxIterations = maxIterations + }; + } + /// /// Sends a chat message to an LLM with automatic provider selection and MCP tool calling support. /// Supports both auto mode (backend selects provider) and BYOK (user provides API key). @@ -334,6 +633,31 @@ public class LlmController : BaseController """; } + /// + /// Executes a tool safely and returns a result tuple (success, result, error) + /// This avoids try-catch around yield statements + /// + private async Task<(bool success, object? result, string? 
+        User user,
+        string toolName,
+        Dictionary<string, object> arguments,
+        string toolCallId,
+        int iteration,
+        int maxIterations)
+    {
+        try
+        {
+            var result = await _mcpService.ExecuteToolAsync(user, toolName, arguments);
+            return (true, result, null);
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Error executing tool {ToolName} in iteration {Iteration} for user {UserId}",
+                toolName, iteration, user.Id);
+            return (false, null, ex.Message);
+        }
+    }
+
     /// <summary>
     /// Trims conversation context to prevent token overflow while preserving important context
     /// </summary>
diff --git a/src/Managing.Application.Abstractions/Services/ILlmService.cs b/src/Managing.Application.Abstractions/Services/ILlmService.cs
index f32e6ea9..97cacd43 100644
--- a/src/Managing.Application.Abstractions/Services/ILlmService.cs
+++ b/src/Managing.Application.Abstractions/Services/ILlmService.cs
@@ -80,6 +80,62 @@ public class LlmUsage
     public int TotalTokens { get; set; }
 }
 
+/// <summary>
+/// Progress update for streaming LLM responses
+/// </summary>
+public class LlmProgressUpdate
+{
+    /// <summary>
+    /// Type of progress update
+    /// </summary>
+    public string Type { get; set; } = string.Empty; // "iteration_start", "thinking", "tool_call", "tool_result", "final_response", "error"
+
+    /// <summary>
+    /// Human-readable message about what's happening
+    /// </summary>
+    public string Message { get; set; } = string.Empty;
+
+    /// <summary>
+    /// Current iteration number (if applicable)
+    /// </summary>
+    public int? Iteration { get; set; }
+
+    /// <summary>
+    /// Maximum iterations (if applicable)
+    /// </summary>
+    public int? MaxIterations { get; set; }
+
+    /// <summary>
+    /// Tool name being called (if applicable)
+    /// </summary>
+    public string? ToolName { get; set; }
+
+    /// <summary>
+    /// Tool arguments (if applicable)
+    /// </summary>
+    public Dictionary<string, object>? ToolArguments { get; set; }
+
+    /// <summary>
+    /// Partial response content (if applicable)
+    /// </summary>
+    public string? Content { get; set; }
+
+    /// <summary>
+    /// Final response (only sent when Type is "final_response")
+    /// </summary>
+    public LlmChatResponse? Response { get; set; }
+
+    /// <summary>
+    /// Error message (only sent when Type is "error")
+    /// </summary>
+    public string? Error { get; set; }
+
+    /// <summary>
+    /// Timestamp of the update
+    /// </summary>
+    public DateTime Timestamp { get; set; } = DateTime.UtcNow;
+}
+
 /// <summary>
 /// Configuration for an LLM provider
 /// </summary>
diff --git a/src/Managing.Application/LLM/Providers/GeminiProvider.cs b/src/Managing.Application/LLM/Providers/GeminiProvider.cs
index 00c39952..a3e911a8 100644
--- a/src/Managing.Application/LLM/Providers/GeminiProvider.cs
+++ b/src/Managing.Application/LLM/Providers/GeminiProvider.cs
@@ -67,7 +67,7 @@ public class GeminiProvider : ILlmProvider
                     return delay;
                 },
-                onRetry: (outcome, timespan, retryCount, context) =>
+                onRetryAsync: async (outcome, timespan, retryCount, context) =>
                 {
                     var exception = outcome.Exception;
                     var response = outcome.Result;
diff --git a/src/Managing.Web3Proxy/src/generated/ManagingApiTypes.ts b/src/Managing.Web3Proxy/src/generated/ManagingApiTypes.ts
index 8a4b9e38..d7c51156 100644
--- a/src/Managing.Web3Proxy/src/generated/ManagingApiTypes.ts
+++ b/src/Managing.Web3Proxy/src/generated/ManagingApiTypes.ts
@@ -1278,6 +1278,7 @@ export interface UserStrategyDetailsViewModel {
     walletBalances?: { [key: string]: number; } | null;
     ticker: Ticker;
     masterAgentName?: string | null;
+    botTradingBalance: number;
 }
 
 export interface PositionViewModel {
@@ -1443,6 +1444,19 @@ export interface JobStatusTypeSummary {
     count?: number;
 }
 
+export interface LlmProgressUpdate {
+    type?: string;
+    message?: string;
+    iteration?: number | null;
+    maxIterations?: number | null;
+    toolName?: string | null;
+    toolArguments?: { [key: string]: any; } | null;
+    content?: string | null;
+    response?: LlmChatResponse | null;
+    error?: string | null;
+    timestamp?: Date;
+}
+
 export interface LlmChatResponse {
     content?: string;
     provider?: string;
diff --git a/src/Managing.WebApp/src/generated/ManagingApi.ts b/src/Managing.WebApp/src/generated/ManagingApi.ts
index 9d5a2f86..a6035a36 100644
--- a/src/Managing.WebApp/src/generated/ManagingApi.ts
+++ b/src/Managing.WebApp/src/generated/ManagingApi.ts
@@ -2910,6 +2910,45 @@ export class LlmClient extends AuthorizedApiBase {
         this.baseUrl = baseUrl ?? "http://localhost:5000";
     }
 
+    llm_ChatStream(request: LlmChatRequest): Promise<LlmProgressUpdate[]> {
+        let url_ = this.baseUrl + "/Llm/ChatStream";
+        url_ = url_.replace(/[?&]$/, "");
+
+        const content_ = JSON.stringify(request);
+
+        let options_: RequestInit = {
+            body: content_,
+            method: "POST",
+            headers: {
+                "Content-Type": "application/json",
+                "Accept": "application/json"
+            }
+        };
+
+        return this.transformOptions(options_).then(transformedOptions_ => {
+            return this.http.fetch(url_, transformedOptions_);
+        }).then((_response: Response) => {
+            return this.processLlm_ChatStream(_response);
+        });
+    }
+
+    protected processLlm_ChatStream(response: Response): Promise<LlmProgressUpdate[]> {
+        const status = response.status;
+        let _headers: any = {}; if (response.headers && response.headers.forEach) { response.headers.forEach((v: any, k: any) => _headers[k] = v); };
+        if (status === 200) {
+            return response.text().then((_responseText) => {
+            let result200: any = null;
+            result200 = _responseText === "" ? null : JSON.parse(_responseText, this.jsonParseReviver) as LlmProgressUpdate[];
+            return result200;
+            });
+        } else if (status !== 200 && status !== 204) {
+            return response.text().then((_responseText) => {
+            return throwException("An unexpected server error occurred.", status, _responseText, _headers);
+            });
+        }
+        return Promise.resolve<LlmProgressUpdate[]>(null as any);
+    }
+
     llm_Chat(request: LlmChatRequest): Promise<LlmChatResponse> {
         let url_ = this.baseUrl + "/Llm/Chat";
         url_ = url_.replace(/[?&]$/, "");
@@ -6067,6 +6106,7 @@ export interface UserStrategyDetailsViewModel {
     walletBalances?: { [key: string]: number; } | null;
     ticker: Ticker;
     masterAgentName?: string | null;
+    botTradingBalance: number;
 }
 
 export interface PositionViewModel {
@@ -6232,6 +6272,19 @@ export interface JobStatusTypeSummary {
     count?: number;
 }
 
+export interface LlmProgressUpdate {
+    type?: string;
+    message?: string;
+    iteration?: number | null;
+    maxIterations?: number | null;
+    toolName?: string | null;
+    toolArguments?: { [key: string]: any; } | null;
+    content?: string | null;
+    response?: LlmChatResponse | null;
+    error?: string | null;
+    timestamp?: Date;
+}
+
 export interface LlmChatResponse {
     content?: string;
     provider?: string;
diff --git a/src/Managing.WebApp/src/generated/ManagingApiTypes.ts b/src/Managing.WebApp/src/generated/ManagingApiTypes.ts
index 8a4b9e38..d7c51156 100644
--- a/src/Managing.WebApp/src/generated/ManagingApiTypes.ts
+++ b/src/Managing.WebApp/src/generated/ManagingApiTypes.ts
@@ -1278,6 +1278,7 @@ export interface UserStrategyDetailsViewModel {
     walletBalances?: { [key: string]: number; } | null;
     ticker: Ticker;
     masterAgentName?: string | null;
+    botTradingBalance: number;
 }
 
 export interface PositionViewModel {
@@ -1443,6 +1444,19 @@ export interface JobStatusTypeSummary {
     count?: number;
 }
 
+export interface LlmProgressUpdate {
+    type?: string;
+    message?: string;
+    iteration?: number | null;
+    maxIterations?: number | null;
+    toolName?: string | null;
+    toolArguments?: { [key: string]: any; } | null;
+    content?: string | null;
+    response?: LlmChatResponse | null;
+    error?: string | null;
+    timestamp?: Date;
+}
+
 export interface LlmChatResponse {
     content?: string;
     provider?: string;
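
Note on consuming the stream (illustrative, not part of the patch): the generated llm_ChatStream client above resolves a single Promise<LlmProgressUpdate[]> only after the whole response body has arrived, so it will not surface incremental progress. A UI that wants Cursor-style live updates has to read the response body itself. The sketch below is one possible consumer, not code from this patch: it assumes the server writes one JSON-serialized LlmProgressUpdate per line (newline-delimited JSON) and that a bearer token is at hand; readChatStream, the import path, and the render* callbacks in the usage comment are hypothetical names. As written, ASP.NET Core typically serializes an IAsyncEnumerable<T> action result as a single streamed JSON array rather than SSE frames, so the server-side framing (or an SSE formatter matching the declared text/event-stream content type) may need to be aligned with whatever the client parses.

// Sketch only: incremental consumption of /Llm/ChatStream.
// Assumes one JSON-serialized LlmProgressUpdate per line (NDJSON) and a bearer token;
// adjust the parsing if the server keeps the default JSON-array framing.
// The import path below is assumed, not taken from this patch.
import type { LlmChatRequest, LlmProgressUpdate } from "./generated/ManagingApiTypes";

export async function* readChatStream(
    baseUrl: string,
    request: LlmChatRequest,
    accessToken: string
): AsyncGenerator<LlmProgressUpdate> {
    const response = await fetch(`${baseUrl}/Llm/ChatStream`, {
        method: "POST",
        headers: {
            "Content-Type": "application/json",
            "Authorization": `Bearer ${accessToken}`,
        },
        body: JSON.stringify(request),
    });
    if (!response.ok || !response.body) {
        throw new Error(`ChatStream failed with status ${response.status}`);
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffered = "";

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffered += decoder.decode(value, { stream: true });

        // Each complete line is treated as one progress update.
        // Note: timestamp arrives as an ISO string unless a reviver converts it.
        let newlineIndex = buffered.indexOf("\n");
        while (newlineIndex >= 0) {
            const line = buffered.slice(0, newlineIndex).trim();
            buffered = buffered.slice(newlineIndex + 1);
            if (line) {
                yield JSON.parse(line) as LlmProgressUpdate;
            }
            newlineIndex = buffered.indexOf("\n");
        }
    }
    if (buffered.trim()) {
        yield JSON.parse(buffered.trim()) as LlmProgressUpdate;
    }
}

// Usage sketch: surface progress until the final response arrives.
// for await (const update of readChatStream(baseUrl, chatRequest, token)) {
//     if (update.type === "final_response") { renderAnswer(update.response); }
//     else if (update.type === "error") { renderError(update.error); }
//     else { renderProgress(update.message); }
// }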