Add signalr

This commit is contained in:
2025-07-21 19:54:04 +07:00
parent a32e9c33a8
commit 83ed78a1fa
11 changed files with 441 additions and 10 deletions

View File

@@ -0,0 +1,73 @@
using System.Collections.Concurrent;
using Managing.Application.Abstractions.Services;
using Managing.Application.Hubs;
using Managing.Application.Workers.Abstractions;
using Managing.Domain.Backtests;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Workers;
public class NotifyBundleBacktestWorker : BaseWorker<NotifyBundleBacktestWorker>
{
private readonly IBacktester _backtester;
private readonly IHubContext<BacktestHub> _hubContext;
private readonly ConcurrentDictionary<string, HashSet<string>> _sentBacktestIds = new();
public NotifyBundleBacktestWorker(
IBacktester backtester,
IHubContext<BacktestHub> hubContext,
ILogger<NotifyBundleBacktestWorker> logger,
IWorkerService workerService)
: base(WorkerType.NotifyBundleBacktest, logger, TimeSpan.FromMinutes(1), workerService)
{
_backtester = backtester;
_hubContext = hubContext;
}
protected override async Task Run(CancellationToken stoppingToken)
{
try
{
// Fetch all running bundle requests
var runningBundles = _backtester.GetPendingBundleBacktestRequests()
.Where(b => b.Status == BundleBacktestRequestStatus.Running)
.ToList();
foreach (var bundle in runningBundles)
{
var requestId = bundle.RequestId;
if (string.IsNullOrEmpty(requestId)) continue;
// Fetch all backtests for this bundle
var (backtests, _) = _backtester.GetBacktestsByRequestIdPaginated(requestId, 1, 100);
if (!_sentBacktestIds.ContainsKey(requestId))
_sentBacktestIds[requestId] = new HashSet<string>();
foreach (var backtest in backtests)
{
if (_sentBacktestIds[requestId].Contains(backtest.Id)) continue;
// If backtest is already LightBacktest, send directly
var lightResponse = backtest as LightBacktest;
if (lightResponse != null)
{
await _hubContext.Clients.Group($"bundle-{requestId}").SendAsync("BundleBacktestUpdate", lightResponse, stoppingToken);
_sentBacktestIds[requestId].Add(backtest.Id);
}
}
// If the bundle is now completed, flush the sent IDs for this requestId
if (bundle.Status == BundleBacktestRequestStatus.Completed && _sentBacktestIds.ContainsKey(requestId))
{
_sentBacktestIds.TryRemove(requestId, out _);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in NotifyBundleBacktestWorker");
}
}
}