From 83ed78a1fa86b1d386070bd0d25591b12dd09177 Mon Sep 17 00:00:00 2001 From: cryptooda Date: Mon, 21 Jul 2025 19:54:04 +0700 Subject: [PATCH] Add signalr --- .../Controllers/BacktestController.cs | 45 ++++- .../Models/Requests/LightBacktestResponse.cs | 25 +++ .../BundleBacktestWorker.cs | 2 +- .../Backtesting/Backtester.cs | 17 +- src/Managing.Application/Hubs/BacktestHub.cs | 16 +- .../Workers/NotifyBundleBacktestWorker.cs | 73 ++++++++ src/Managing.Bootstrap/ApiBootstrap.cs | 5 + src/Managing.Common/Enums.cs | 3 +- .../src/generated/ManagingApi.ts | 84 +++++++++ .../pages/backtestPage/BundleRequestModal.tsx | 163 ++++++++++++++++++ .../backtestPage/bundleRequestsTable.tsx | 18 +- 11 files changed, 441 insertions(+), 10 deletions(-) create mode 100644 src/Managing.Application/Workers/NotifyBundleBacktestWorker.cs create mode 100644 src/Managing.WebApp/src/pages/backtestPage/BundleRequestModal.tsx diff --git a/src/Managing.Api/Controllers/BacktestController.cs b/src/Managing.Api/Controllers/BacktestController.cs index 93edd9a..5a932d2 100644 --- a/src/Managing.Api/Controllers/BacktestController.cs +++ b/src/Managing.Api/Controllers/BacktestController.cs @@ -27,7 +27,7 @@ namespace Managing.Api.Controllers; [Produces("application/json")] public class BacktestController : BaseController { - private readonly IHubContext _hubContext; + private readonly IHubContext _hubContext; private readonly IBacktester _backtester; private readonly IScenarioService _scenarioService; private readonly IAccountService _accountService; @@ -45,7 +45,7 @@ public class BacktestController : BaseController /// The service for genetic algorithm operations. /// The repository for backtest operations. public BacktestController( - IHubContext hubContext, + IHubContext hubContext, IBacktester backtester, IScenarioService scenarioService, IAccountService accountService, @@ -537,6 +537,47 @@ public class BacktestController : BaseController }); } + /// + /// Subscribes the client to real-time updates for a bundle backtest request via SignalR. + /// The client will receive LightBacktestResponse objects as new backtests are generated. + /// + /// The bundle request ID to subscribe to. + [HttpPost] + [Route("Bundle/Subscribe")] // POST /Backtest/Bundle/Subscribe + public async Task SubscribeToBundle([FromQuery] string requestId) + { + if (string.IsNullOrWhiteSpace(requestId)) + return BadRequest("RequestId is required"); + + // Get the connection ID from the SignalR context (assume it's passed via header or query) + var connectionId = HttpContext.Request.Headers["X-SignalR-ConnectionId"].ToString(); + if (string.IsNullOrEmpty(connectionId)) + return BadRequest("SignalR connection ID is required in X-SignalR-ConnectionId header"); + + // Add the connection to the SignalR group for this bundle + await _hubContext.Groups.AddToGroupAsync(connectionId, $"bundle-{requestId}"); + return Ok(new { Subscribed = true, RequestId = requestId }); + } + + /// + /// Unsubscribes the client from real-time updates for a bundle backtest request via SignalR. + /// + /// The bundle request ID to unsubscribe from. + [HttpPost] + [Route("Bundle/Unsubscribe")] // POST /Backtest/Bundle/Unsubscribe + public async Task UnsubscribeFromBundle([FromQuery] string requestId) + { + if (string.IsNullOrWhiteSpace(requestId)) + return BadRequest("RequestId is required"); + + var connectionId = HttpContext.Request.Headers["X-SignalR-ConnectionId"].ToString(); + if (string.IsNullOrEmpty(connectionId)) + return BadRequest("SignalR connection ID is required in X-SignalR-ConnectionId header"); + + await _hubContext.Groups.RemoveFromGroupAsync(connectionId, $"bundle-{requestId}"); + return Ok(new { Unsubscribed = true, RequestId = requestId }); + } + /// /// Runs a genetic algorithm optimization with the specified configuration. /// This endpoint saves the genetic request to the database and returns the request ID. diff --git a/src/Managing.Api/Models/Requests/LightBacktestResponse.cs b/src/Managing.Api/Models/Requests/LightBacktestResponse.cs index e819a0e..1f3d126 100644 --- a/src/Managing.Api/Models/Requests/LightBacktestResponse.cs +++ b/src/Managing.Api/Models/Requests/LightBacktestResponse.cs @@ -1,4 +1,5 @@ using System.ComponentModel.DataAnnotations; +using Managing.Domain.Backtests; using Managing.Domain.Bots; namespace Managing.Api.Models.Requests; @@ -19,4 +20,28 @@ public class LightBacktestResponse [Required] public double? SharpeRatio { get; set; } [Required] public double Score { get; set; } [Required] public string ScoreMessage { get; set; } = string.Empty; +} + +public static class LightBacktestResponseMapper +{ + public static LightBacktestResponse MapFromDomain(Backtest b) + { + if (b == null) return null; + return new LightBacktestResponse + { + Id = b.Id, + Config = b.Config, + FinalPnl = b.FinalPnl, + WinRate = b.WinRate, + GrowthPercentage = b.GrowthPercentage, + HodlPercentage = b.HodlPercentage, + StartDate = b.StartDate, + EndDate = b.EndDate, + MaxDrawdown = b.Statistics?.MaxDrawdown, + Fees = b.Fees, + SharpeRatio = (double?)b.Statistics?.SharpeRatio, + Score = b.Score, + ScoreMessage = b.ScoreMessage + }; + } } \ No newline at end of file diff --git a/src/Managing.Application.Workers/BundleBacktestWorker.cs b/src/Managing.Application.Workers/BundleBacktestWorker.cs index 5306c26..59733bb 100644 --- a/src/Managing.Application.Workers/BundleBacktestWorker.cs +++ b/src/Managing.Application.Workers/BundleBacktestWorker.cs @@ -231,7 +231,7 @@ public class BundleBacktestWorker : BaseWorker backtestConfig, runBacktestRequest.StartDate, runBacktestRequest.EndDate, - null, // No user context in worker + bundleRequest.User, // No user context in worker runBacktestRequest.Save, runBacktestRequest.WithCandles, bundleRequest.RequestId // Use bundleRequestId as requestId for traceability diff --git a/src/Managing.Application/Backtesting/Backtester.cs b/src/Managing.Application/Backtesting/Backtester.cs index daaf256..603b07c 100644 --- a/src/Managing.Application/Backtesting/Backtester.cs +++ b/src/Managing.Application/Backtesting/Backtester.cs @@ -2,6 +2,7 @@ using Managing.Application.Abstractions.Repositories; using Managing.Application.Abstractions.Services; using Managing.Application.Bots; +using Managing.Application.Hubs; using Managing.Core.FixedSizedQueue; using Managing.Domain.Accounts; using Managing.Domain.Backtests; @@ -13,8 +14,10 @@ using Managing.Domain.Strategies; using Managing.Domain.Strategies.Base; using Managing.Domain.Users; using Managing.Domain.Workflows; +using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Logging; using static Managing.Common.Enums; +using LightBacktestResponse = Managing.Domain.Backtests.LightBacktest; // Use the domain model for notification namespace Managing.Application.Backtesting { @@ -28,6 +31,7 @@ namespace Managing.Application.Backtesting private readonly IAccountService _accountService; private readonly IMessengerService _messengerService; private readonly IKaigenService _kaigenService; + private readonly IHubContext _hubContext; public Backtester( IExchangeService exchangeService, @@ -37,7 +41,8 @@ namespace Managing.Application.Backtesting IScenarioService scenarioService, IAccountService accountService, IMessengerService messengerService, - IKaigenService kaigenService) + IKaigenService kaigenService, + IHubContext hubContext) { _exchangeService = exchangeService; _botFactory = botFactory; @@ -47,6 +52,7 @@ namespace Managing.Application.Backtesting _accountService = accountService; _messengerService = messengerService; _kaigenService = kaigenService; + _hubContext = hubContext; } public Backtest RunSimpleBotBacktest(Workflow workflow, bool save = false) @@ -604,5 +610,14 @@ namespace Managing.Application.Backtesting { return _backtestRepository.GetPendingBundleBacktestRequests(); } + + /// + /// Sends a LightBacktestResponse to all SignalR subscribers of a bundle request. + /// + public async Task SendBundleBacktestUpdateAsync(string requestId, LightBacktestResponse response) + { + if (string.IsNullOrWhiteSpace(requestId) || response == null) return; + await _hubContext.Clients.Group($"bundle-{requestId}").SendAsync("BundleBacktestUpdate", response); + } } } \ No newline at end of file diff --git a/src/Managing.Application/Hubs/BacktestHub.cs b/src/Managing.Application/Hubs/BacktestHub.cs index 0b29eb1..b96f54e 100644 --- a/src/Managing.Application/Hubs/BacktestHub.cs +++ b/src/Managing.Application/Hubs/BacktestHub.cs @@ -4,12 +4,20 @@ namespace Managing.Application.Hubs; public class BacktestHub : Hub { - public async override Task OnConnectedAsync() + public override async Task OnConnectedAsync() { await base.OnConnectedAsync(); - await Clients.Caller.SendAsync("Message", $"Connected successfully on backtest hub. ConnectionId : {Context.ConnectionId}"); + await Clients.Caller.SendAsync("Message", "Connected to BacktestHub!"); } - public async Task SubscribeBots() => - await Clients.All.SendAsync("BacktestsSubscription", "Successfully subscribed"); + public async Task SubscribeToBundle(string requestId) + { + if (!string.IsNullOrWhiteSpace(requestId)) + { + await Groups.AddToGroupAsync(Context.ConnectionId, $"bundle-{requestId}"); + await Clients.Caller.SendAsync("SubscribedToBundle", requestId); + } + } + + public string GetConnectionId() => Context.ConnectionId; } diff --git a/src/Managing.Application/Workers/NotifyBundleBacktestWorker.cs b/src/Managing.Application/Workers/NotifyBundleBacktestWorker.cs new file mode 100644 index 0000000..dd0bb82 --- /dev/null +++ b/src/Managing.Application/Workers/NotifyBundleBacktestWorker.cs @@ -0,0 +1,73 @@ +using System.Collections.Concurrent; +using Managing.Application.Abstractions.Services; +using Managing.Application.Hubs; +using Managing.Application.Workers.Abstractions; +using Managing.Domain.Backtests; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging; +using static Managing.Common.Enums; + +namespace Managing.Application.Workers; + +public class NotifyBundleBacktestWorker : BaseWorker +{ + private readonly IBacktester _backtester; + private readonly IHubContext _hubContext; + private readonly ConcurrentDictionary> _sentBacktestIds = new(); + + public NotifyBundleBacktestWorker( + IBacktester backtester, + IHubContext hubContext, + ILogger logger, + IWorkerService workerService) + : base(WorkerType.NotifyBundleBacktest, logger, TimeSpan.FromMinutes(1), workerService) + { + _backtester = backtester; + _hubContext = hubContext; + } + + protected override async Task Run(CancellationToken stoppingToken) + { + try + { + // Fetch all running bundle requests + var runningBundles = _backtester.GetPendingBundleBacktestRequests() + .Where(b => b.Status == BundleBacktestRequestStatus.Running) + .ToList(); + + foreach (var bundle in runningBundles) + { + var requestId = bundle.RequestId; + if (string.IsNullOrEmpty(requestId)) continue; + + // Fetch all backtests for this bundle + var (backtests, _) = _backtester.GetBacktestsByRequestIdPaginated(requestId, 1, 100); + if (!_sentBacktestIds.ContainsKey(requestId)) + _sentBacktestIds[requestId] = new HashSet(); + + foreach (var backtest in backtests) + { + if (_sentBacktestIds[requestId].Contains(backtest.Id)) continue; + + // If backtest is already LightBacktest, send directly + var lightResponse = backtest as LightBacktest; + if (lightResponse != null) + { + await _hubContext.Clients.Group($"bundle-{requestId}").SendAsync("BundleBacktestUpdate", lightResponse, stoppingToken); + _sentBacktestIds[requestId].Add(backtest.Id); + } + } + + // If the bundle is now completed, flush the sent IDs for this requestId + if (bundle.Status == BundleBacktestRequestStatus.Completed && _sentBacktestIds.ContainsKey(requestId)) + { + _sentBacktestIds.TryRemove(requestId, out _); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error in NotifyBundleBacktestWorker"); + } + } +} \ No newline at end of file diff --git a/src/Managing.Bootstrap/ApiBootstrap.cs b/src/Managing.Bootstrap/ApiBootstrap.cs index 2b480ea..23f763c 100644 --- a/src/Managing.Bootstrap/ApiBootstrap.cs +++ b/src/Managing.Bootstrap/ApiBootstrap.cs @@ -171,6 +171,11 @@ public static class ApiBootstrap services.AddHostedService(); } + if (configuration.GetValue("WorkerNotifyBundleBacktest", false)) + { + services.AddHostedService(); + } + return services; } diff --git a/src/Managing.Common/Enums.cs b/src/Managing.Common/Enums.cs index dbe08ba..d9b4a59 100644 --- a/src/Managing.Common/Enums.cs +++ b/src/Managing.Common/Enums.cs @@ -384,7 +384,8 @@ public static class Enums FundingRatesWatcher, BalanceTracking, GeneticAlgorithm, - BundleBacktest + BundleBacktest, + NotifyBundleBacktest } public enum WorkflowUsage diff --git a/src/Managing.WebApp/src/generated/ManagingApi.ts b/src/Managing.WebApp/src/generated/ManagingApi.ts index bc0faf0..8d12caf 100644 --- a/src/Managing.WebApp/src/generated/ManagingApi.ts +++ b/src/Managing.WebApp/src/generated/ManagingApi.ts @@ -871,6 +871,90 @@ export class BacktestClient extends AuthorizedApiBase { return Promise.resolve(null as any); } + backtest_SubscribeToBundle(requestId: string | null | undefined): Promise { + let url_ = this.baseUrl + "/Backtest/Bundle/Subscribe?"; + if (requestId !== undefined && requestId !== null) + url_ += "requestId=" + encodeURIComponent("" + requestId) + "&"; + url_ = url_.replace(/[?&]$/, ""); + + let options_: RequestInit = { + method: "POST", + headers: { + "Accept": "application/octet-stream" + } + }; + + return this.transformOptions(options_).then(transformedOptions_ => { + return this.http.fetch(url_, transformedOptions_); + }).then((_response: Response) => { + return this.processBacktest_SubscribeToBundle(_response); + }); + } + + protected processBacktest_SubscribeToBundle(response: Response): Promise { + const status = response.status; + let _headers: any = {}; if (response.headers && response.headers.forEach) { response.headers.forEach((v: any, k: any) => _headers[k] = v); }; + if (status === 200 || status === 206) { + const contentDisposition = response.headers ? response.headers.get("content-disposition") : undefined; + let fileNameMatch = contentDisposition ? /filename\*=(?:(\\?['"])(.*?)\1|(?:[^\s]+'.*?')?([^;\n]*))/g.exec(contentDisposition) : undefined; + let fileName = fileNameMatch && fileNameMatch.length > 1 ? fileNameMatch[3] || fileNameMatch[2] : undefined; + if (fileName) { + fileName = decodeURIComponent(fileName); + } else { + fileNameMatch = contentDisposition ? /filename="?([^"]*?)"?(;|$)/g.exec(contentDisposition) : undefined; + fileName = fileNameMatch && fileNameMatch.length > 1 ? fileNameMatch[1] : undefined; + } + return response.blob().then(blob => { return { fileName: fileName, data: blob, status: status, headers: _headers }; }); + } else if (status !== 200 && status !== 204) { + return response.text().then((_responseText) => { + return throwException("An unexpected server error occurred.", status, _responseText, _headers); + }); + } + return Promise.resolve(null as any); + } + + backtest_UnsubscribeFromBundle(requestId: string | null | undefined): Promise { + let url_ = this.baseUrl + "/Backtest/Bundle/Unsubscribe?"; + if (requestId !== undefined && requestId !== null) + url_ += "requestId=" + encodeURIComponent("" + requestId) + "&"; + url_ = url_.replace(/[?&]$/, ""); + + let options_: RequestInit = { + method: "POST", + headers: { + "Accept": "application/octet-stream" + } + }; + + return this.transformOptions(options_).then(transformedOptions_ => { + return this.http.fetch(url_, transformedOptions_); + }).then((_response: Response) => { + return this.processBacktest_UnsubscribeFromBundle(_response); + }); + } + + protected processBacktest_UnsubscribeFromBundle(response: Response): Promise { + const status = response.status; + let _headers: any = {}; if (response.headers && response.headers.forEach) { response.headers.forEach((v: any, k: any) => _headers[k] = v); }; + if (status === 200 || status === 206) { + const contentDisposition = response.headers ? response.headers.get("content-disposition") : undefined; + let fileNameMatch = contentDisposition ? /filename\*=(?:(\\?['"])(.*?)\1|(?:[^\s]+'.*?')?([^;\n]*))/g.exec(contentDisposition) : undefined; + let fileName = fileNameMatch && fileNameMatch.length > 1 ? fileNameMatch[3] || fileNameMatch[2] : undefined; + if (fileName) { + fileName = decodeURIComponent(fileName); + } else { + fileNameMatch = contentDisposition ? /filename="?([^"]*?)"?(;|$)/g.exec(contentDisposition) : undefined; + fileName = fileNameMatch && fileNameMatch.length > 1 ? fileNameMatch[1] : undefined; + } + return response.blob().then(blob => { return { fileName: fileName, data: blob, status: status, headers: _headers }; }); + } else if (status !== 200 && status !== 204) { + return response.text().then((_responseText) => { + return throwException("An unexpected server error occurred.", status, _responseText, _headers); + }); + } + return Promise.resolve(null as any); + } + backtest_RunGenetic(request: RunGeneticRequest): Promise { let url_ = this.baseUrl + "/Backtest/Genetic"; url_ = url_.replace(/[?&]$/, ""); diff --git a/src/Managing.WebApp/src/pages/backtestPage/BundleRequestModal.tsx b/src/Managing.WebApp/src/pages/backtestPage/BundleRequestModal.tsx new file mode 100644 index 0000000..007cb72 --- /dev/null +++ b/src/Managing.WebApp/src/pages/backtestPage/BundleRequestModal.tsx @@ -0,0 +1,163 @@ +import React, {useEffect, useRef, useState} from 'react'; +import {BundleBacktestRequest, LightBacktestResponse} from '../../generated/ManagingApiTypes'; +import {BacktestClient} from '../../generated/ManagingApi'; +import useApiUrlStore from '../../app/store/apiStore'; +import Toast from '../../components/mollecules/Toast/Toast'; +import {useQuery} from '@tanstack/react-query'; +import * as signalR from '@microsoft/signalr'; +import AuthorizedApiBase from '../../generated/AuthorizedApiBase'; + +interface BundleRequestModalProps { + open: boolean; + onClose: () => void; + bundle: BundleBacktestRequest | null; +} + +const BundleRequestModal: React.FC = ({ open, onClose, bundle }) => { + const { apiUrl } = useApiUrlStore(); + const [backtests, setBacktests] = useState([]); + const signalRRef = useRef(null); + + const { + data: queryBacktests, + isLoading, + error: queryError, + refetch + } = useQuery({ + queryKey: ['bundle-backtests', bundle?.requestId], + queryFn: async () => { + if (!open || !bundle) return []; + const client = new BacktestClient({} as any, apiUrl); + const res = await client.backtest_GetBacktestsByRequestId(bundle.requestId); + if (!res) return []; + return res.map((b: any) => ({ + id: b.id, + config: b.config, + finalPnl: b.finalPnl, + winRate: b.winRate, + growthPercentage: b.growthPercentage, + hodlPercentage: b.hodlPercentage, + startDate: b.startDate, + endDate: b.endDate, + maxDrawdown: b.maxDrawdown ?? null, + fees: b.fees, + sharpeRatio: b.sharpeRatio ?? null, + score: b.score ?? 0, + scoreMessage: b.scoreMessage ?? '', + })); + }, + enabled: !!open && !!bundle, + refetchOnWindowFocus: false, + }); + useEffect(() => { + if (queryBacktests) setBacktests(queryBacktests); + }, [queryBacktests]); + + // SignalR live updates + useEffect(() => { + if (!open || !bundle) return; + if (bundle.status !== 'Pending' && bundle.status !== 'Running') return; + let connection: any = null; + let connectionId: string = ''; + let unsubscribed = false; + (async () => { + try { + connection = new signalR.HubConnectionBuilder() + .withUrl(`${apiUrl.replace(/\/$/, '')}/backtestHub`) + .withAutomaticReconnect() + .build(); + await connection.start(); + connectionId = connection.connectionId; + // Subscribe to bundle updates + const authBase = new AuthorizedApiBase({} as any); + let fetchOptions: any = { + method: 'POST', + headers: { 'X-SignalR-ConnectionId': connectionId }, + }; + fetchOptions = await authBase.transformOptions(fetchOptions); + await fetch(`${apiUrl}/backtest/Bundle/Subscribe?requestId=${bundle.requestId}`, fetchOptions); + connection.on('BundleBacktestUpdate', (result: LightBacktestResponse) => { + setBacktests((prev) => { + if (prev.some((b) => b.id === result.id)) return prev; + return [...prev, result]; + }); + }); + signalRRef.current = connection; + } catch (e: any) { + new Toast('Failed to subscribe to live updates', false); + } + })(); + return () => { + unsubscribed = true; + if (connection && connectionId) { + (async () => { + const authBase = new AuthorizedApiBase({} as any); + let fetchOptions: any = { + method: 'POST', + headers: { 'X-SignalR-ConnectionId': connectionId }, + }; + fetchOptions = await authBase.transformOptions(fetchOptions); + await fetch(`${apiUrl}/backtest/Bundle/Unsubscribe?requestId=${bundle.requestId}`, fetchOptions); + })(); + } + if (signalRRef.current) { + signalRRef.current.stop(); + signalRRef.current = null; + } + }; + }, [open, bundle, apiUrl]); + + if (!open || !bundle) return null; + + return ( +
+
+

Bundle: {bundle.name}

+
+
Request ID: {bundle.requestId}
+
Status: {bundle.status}
+
Created: {bundle.createdAt ? new Date(bundle.createdAt).toLocaleString() : '-'}
+
Completed: {bundle.completedAt ? new Date(bundle.completedAt).toLocaleString() : '-'}
+
+
Backtest Results
+ {isLoading ? ( +
Loading backtests...
+ ) : queryError ? ( +
{(queryError as any)?.message || 'Failed to fetch backtests'}
+ ) : ( +
+ + + + + + + + + + + + + {backtests.map((b) => ( + + + + + + + + + ))} + +
IDFinal PnLWin RateGrowth %StartEnd
{b.id}{b.finalPnl}{b.winRate}{b.growthPercentage}{b.startDate ? new Date(b.startDate).toLocaleString() : '-'}{b.endDate ? new Date(b.endDate).toLocaleString() : '-'}
+
+ )} +
+ +
+
+
+ ); +}; + +export default BundleRequestModal; \ No newline at end of file diff --git a/src/Managing.WebApp/src/pages/backtestPage/bundleRequestsTable.tsx b/src/Managing.WebApp/src/pages/backtestPage/bundleRequestsTable.tsx index 7491d8a..46dcee7 100644 --- a/src/Managing.WebApp/src/pages/backtestPage/bundleRequestsTable.tsx +++ b/src/Managing.WebApp/src/pages/backtestPage/bundleRequestsTable.tsx @@ -4,6 +4,7 @@ import useApiUrlStore from '../../app/store/apiStore'; import Table from '../../components/mollecules/Table/Table'; import {BundleBacktestRequest} from '../../generated/ManagingApiTypes'; import Toast from '../../components/mollecules/Toast/Toast'; +import BundleRequestModal from './BundleRequestModal'; const BundleRequestsTable = () => { const { apiUrl } = useApiUrlStore(); @@ -11,6 +12,8 @@ const BundleRequestsTable = () => { const [loading, setLoading] = useState(false); const [error, setError] = useState(null); const [deletingId, setDeletingId] = useState(null); + const [modalOpen, setModalOpen] = useState(false); + const [selectedBundle, setSelectedBundle] = useState(null); const fetchData = () => { setLoading(true); @@ -119,7 +122,15 @@ const BundleRequestsTable = () => { disableSortBy: true, Cell: ({ row }: any) => (
- +