Move workers

This commit is contained in:
2025-08-05 17:53:19 +07:00
parent 7d92031059
commit 3d3f71ac7a
26 changed files with 81 additions and 118 deletions

View File

@@ -46,7 +46,7 @@ public class BalanceTrackingWorker : BaseWorker<BalanceTrackingWorker>
_logger.LogInformation("Starting balance tracking...");
// Get all active bots
var bots = await _mediator.Send(new GetBotsByStatusCommand(BotStatus.Up));
var bots = await _mediator.Send(new GetBotsByStatusCommand(BotStatus.Up));
var botCount = bots.Count();
if (botCount == 0)

View File

@@ -0,0 +1,80 @@
using Managing.Application.Abstractions.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Workers;
public abstract class BaseWorker<T> : BackgroundService where T : class
{
private readonly IServiceProvider _serviceProvider;
private readonly WorkerType _workerType;
protected readonly ILogger<T> _logger;
protected readonly TimeSpan _delay;
private int _executionCount;
protected BaseWorker(
WorkerType workerType,
ILogger<T> logger,
TimeSpan timeSpan,
IServiceProvider serviceProvider)
{
_workerType = workerType;
_logger = logger;
_delay = timeSpan == TimeSpan.Zero ? TimeSpan.FromMinutes(1) : timeSpan;
_serviceProvider = serviceProvider;
_executionCount = 0;
}
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
try
{
_logger.LogInformation($"[{_workerType}] Starting");
using (var scope = _serviceProvider.CreateScope())
{
var workerService = scope.ServiceProvider.GetRequiredService<IWorkerService>();
var worker = await workerService.GetWorker(_workerType);
if (worker == null)
{
await workerService.InsertWorker(_workerType, _delay);
}
else
{
_logger.LogInformation(
$"[{_workerType}] Last run : {worker.LastRunTime} - Execution Count : {worker.ExecutionCount}");
_executionCount = worker.ExecutionCount;
}
}
cancellationToken.Register(() => _logger.LogInformation($"[{_workerType}] Stopping"));
while (!cancellationToken.IsCancellationRequested)
{
using (var scope = _serviceProvider.CreateScope())
{
var workerService = scope.ServiceProvider.GetRequiredService<IWorkerService>();
var worker = await workerService.GetWorker(_workerType);
await Run(cancellationToken);
_executionCount++;
await workerService.UpdateWorker(_workerType, _executionCount);
}
_logger.LogInformation($"[{_workerType}] Run ok. Next run at : {DateTime.UtcNow.Add(_delay)}");
await Task.Delay(_delay);
}
_logger.LogInformation($"[{_workerType}] Stopped");
}
catch (Exception ex)
{
SentrySdk.CaptureException(ex);
_logger.LogError($"Error : {ex.Message}");
}
}
protected abstract Task Run(CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,342 @@
using System.Text.Json;
using Managing.Application.Abstractions.Services;
using Managing.Domain.Backtests;
using Managing.Domain.Bots;
using Managing.Domain.MoneyManagements;
using Managing.Domain.Scenarios;
using Managing.Domain.Strategies;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Workers;
/// <summary>
/// Worker for processing bundle backtest requests
/// </summary>
public class BundleBacktestWorker : BaseWorker<BundleBacktestWorker>
{
private readonly IServiceProvider _serviceProvider;
private readonly IMessengerService _messengerService;
private static readonly WorkerType _workerType = WorkerType.BundleBacktest;
public BundleBacktestWorker(
IServiceProvider serviceProvider,
IMessengerService messengerService,
ILogger<BundleBacktestWorker> logger) : base(
_workerType,
logger,
TimeSpan.FromMinutes(1),
serviceProvider)
{
_serviceProvider = serviceProvider;
_messengerService = messengerService;
}
protected override async Task Run(CancellationToken cancellationToken)
{
var maxDegreeOfParallelism = 3;
using var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
var processingTasks = new List<Task>();
try
{
// Create a new service scope to get fresh instances of services with scoped DbContext
using var scope = _serviceProvider.CreateScope();
var backtester = scope.ServiceProvider.GetRequiredService<IBacktester>();
// Get pending bundle backtest requests
var pendingRequests =
await backtester.GetBundleBacktestRequestsByStatusAsync(BundleBacktestRequestStatus.Pending);
foreach (var bundleRequest in pendingRequests)
{
if (cancellationToken.IsCancellationRequested)
break;
await semaphore.WaitAsync(cancellationToken);
var task = Task.Run(async () =>
{
try
{
await ProcessBundleRequest(bundleRequest, cancellationToken);
}
finally
{
semaphore.Release();
}
}, cancellationToken);
processingTasks.Add(task);
}
await Task.WhenAll(processingTasks);
await RetryUnfinishedBacktestsInFailedBundles(backtester, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in BundleBacktestWorker");
throw;
}
}
private async Task ProcessBundleRequest(BundleBacktestRequest bundleRequest, CancellationToken cancellationToken)
{
// Create a new service scope for this task to avoid DbContext concurrency issues
using var scope = _serviceProvider.CreateScope();
var backtester = scope.ServiceProvider.GetRequiredService<IBacktester>();
try
{
_logger.LogInformation("Starting to process bundle backtest request {RequestId}", bundleRequest.RequestId);
// Update status to running
bundleRequest.Status = BundleBacktestRequestStatus.Running;
await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
// Deserialize the backtest requests as strongly-typed objects
var backtestRequests =
JsonSerializer.Deserialize<List<RunBacktestRequest>>(
bundleRequest.BacktestRequestsJson);
if (backtestRequests == null)
{
throw new InvalidOperationException("Failed to deserialize backtest requests");
}
// Process each backtest request
for (int i = 0; i < backtestRequests.Count; i++)
{
if (cancellationToken.IsCancellationRequested)
break;
try
{
var runBacktestRequest = backtestRequests[i];
// Update current backtest being processed
bundleRequest.CurrentBacktest = $"Backtest {i + 1} of {backtestRequests.Count}";
await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
// Run the backtest directly with the strongly-typed request
var backtestId = await RunSingleBacktest(backtester, runBacktestRequest, bundleRequest, i,
cancellationToken);
if (!string.IsNullOrEmpty(backtestId))
{
bundleRequest.Results.Add(backtestId);
}
// Update progress
bundleRequest.CompletedBacktests++;
await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
_logger.LogInformation("Completed backtest {Index} for bundle request {RequestId}",
i + 1, bundleRequest.RequestId);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing backtest {Index} for bundle request {RequestId}",
i + 1, bundleRequest.RequestId);
bundleRequest.FailedBacktests++;
await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
}
}
// Update final status and send notifications
if (bundleRequest.FailedBacktests == 0)
{
bundleRequest.Status = BundleBacktestRequestStatus.Completed;
// Send Telegram message to the user's channelId
await NotifyUser(bundleRequest);
}
else if (bundleRequest.CompletedBacktests == 0)
{
bundleRequest.Status = BundleBacktestRequestStatus.Failed;
bundleRequest.ErrorMessage = "All backtests failed";
}
else
{
bundleRequest.Status = BundleBacktestRequestStatus.Completed;
bundleRequest.ErrorMessage = $"{bundleRequest.FailedBacktests} backtests failed";
// Send Telegram message to the user's channelId even with partial failures
await NotifyUser(bundleRequest);
}
bundleRequest.CompletedAt = DateTime.UtcNow;
bundleRequest.CurrentBacktest = null;
await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
_logger.LogInformation("Completed processing bundle backtest request {RequestId} with status {Status}",
bundleRequest.RequestId, bundleRequest.Status);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing bundle backtest request {RequestId}", bundleRequest.RequestId);
bundleRequest.Status = BundleBacktestRequestStatus.Failed;
bundleRequest.ErrorMessage = ex.Message;
bundleRequest.CompletedAt = DateTime.UtcNow;
await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
}
}
private async Task NotifyUser(BundleBacktestRequest bundleRequest)
{
if (bundleRequest.User?.TelegramChannel != null)
{
var message =
$"⚠️ Bundle backtest '{bundleRequest.Name}' (ID: {bundleRequest.RequestId}) completed with {bundleRequest.FailedBacktests} failed backtests.";
await _messengerService.SendMessage(message, bundleRequest.User.TelegramChannel);
}
}
// Change RunSingleBacktest to accept RunBacktestRequest directly
private async Task<string> RunSingleBacktest(IBacktester backtester, RunBacktestRequest runBacktestRequest,
BundleBacktestRequest bundleRequest,
int index, CancellationToken cancellationToken)
{
if (runBacktestRequest == null || runBacktestRequest.Config == null)
{
_logger.LogError("Invalid RunBacktestRequest in bundle (null config)");
return string.Empty;
}
// Map MoneyManagement
MoneyManagement moneyManagement = null;
if (!string.IsNullOrEmpty(runBacktestRequest.Config.MoneyManagementName))
{
// In worker context, we cannot resolve by name (no user/db), so skip or set null
// Optionally, log a warning
_logger.LogWarning("MoneyManagementName provided but cannot resolve in worker context: {Name}",
(string)runBacktestRequest.Config.MoneyManagementName);
}
else if (runBacktestRequest.Config.MoneyManagement != null)
{
var mmReq = runBacktestRequest.Config.MoneyManagement;
moneyManagement = new MoneyManagement
{
Name = mmReq.Name,
Timeframe = mmReq.Timeframe,
StopLoss = mmReq.StopLoss,
TakeProfit = mmReq.TakeProfit,
Leverage = mmReq.Leverage
};
moneyManagement.FormatPercentage();
}
// Map Scenario
LightScenario scenario = null;
if (runBacktestRequest.Config.Scenario != null)
{
var sReq = runBacktestRequest.Config.Scenario;
scenario = new LightScenario(sReq.Name, sReq.LoopbackPeriod)
{
Indicators = sReq.Indicators?.Select(i => new LightIndicator(i.Name, i.Type)
{
SignalType = i.SignalType,
MinimumHistory = i.MinimumHistory,
Period = i.Period,
FastPeriods = i.FastPeriods,
SlowPeriods = i.SlowPeriods,
SignalPeriods = i.SignalPeriods,
Multiplier = i.Multiplier,
SmoothPeriods = i.SmoothPeriods,
StochPeriods = i.StochPeriods,
CyclePeriods = i.CyclePeriods
}).ToList() ?? new List<LightIndicator>()
};
}
// Map TradingBotConfig
var backtestConfig = new TradingBotConfig
{
AccountName = runBacktestRequest.Config.AccountName,
MoneyManagement = moneyManagement,
Ticker = runBacktestRequest.Config.Ticker,
ScenarioName = runBacktestRequest.Config.ScenarioName,
Scenario = scenario,
Timeframe = runBacktestRequest.Config.Timeframe,
IsForWatchingOnly = runBacktestRequest.Config.IsForWatchingOnly,
BotTradingBalance = runBacktestRequest.Config.BotTradingBalance,
IsForBacktest = true,
CooldownPeriod = runBacktestRequest.Config.CooldownPeriod,
MaxLossStreak = runBacktestRequest.Config.MaxLossStreak,
MaxPositionTimeHours = runBacktestRequest.Config.MaxPositionTimeHours,
FlipOnlyWhenInProfit = runBacktestRequest.Config.FlipOnlyWhenInProfit,
FlipPosition = runBacktestRequest.Config.FlipPosition,
Name = $"{bundleRequest.Name} #{index + 1}",
CloseEarlyWhenProfitable = runBacktestRequest.Config.CloseEarlyWhenProfitable,
UseSynthApi = runBacktestRequest.Config.UseSynthApi,
UseForPositionSizing = runBacktestRequest.Config.UseForPositionSizing,
UseForSignalFiltering = runBacktestRequest.Config.UseForSignalFiltering,
UseForDynamicStopLoss = runBacktestRequest.Config.UseForDynamicStopLoss
};
// Run the backtest (no user context)
var result = await backtester.RunTradingBotBacktest(
backtestConfig,
runBacktestRequest.StartDate,
runBacktestRequest.EndDate,
bundleRequest.User, // No user context in worker
true,
runBacktestRequest.WithCandles,
bundleRequest.RequestId // Use bundleRequestId as requestId for traceability
);
_logger.LogInformation("Processed backtest for bundle request {RequestId}", bundleRequest.RequestId);
// Assume the backtest is created and you have its ID (e.g., backtest.Id)
// Return the backtest ID
return result.Id;
}
private async Task RetryUnfinishedBacktestsInFailedBundles(IBacktester backtester,
CancellationToken cancellationToken)
{
var failedBundles = await backtester.GetBundleBacktestRequestsByStatusAsync(BundleBacktestRequestStatus.Failed);
foreach (var failedBundle in failedBundles)
{
if (cancellationToken.IsCancellationRequested)
break;
// Use Results property to determine which backtests need to be retried
var succeededIds = new HashSet<string>(failedBundle.Results ?? new List<string>());
// Deserialize the original requests
var originalRequests =
JsonSerializer
.Deserialize<List<RunBacktestRequest>>(failedBundle.BacktestRequestsJson);
if (originalRequests == null) continue;
for (int i = failedBundle.CompletedBacktests; i < originalRequests.Count; i++)
{
var expectedId = /* logic to compute expected backtest id for this request */ string.Empty;
// If this backtest was not run or did not succeed, re-run it
if (!succeededIds.Contains(expectedId))
{
var backtestId = await RunSingleBacktest(backtester, originalRequests[i], failedBundle, i,
cancellationToken);
if (!string.IsNullOrEmpty(backtestId))
{
failedBundle.Results?.Add(backtestId);
failedBundle.CompletedBacktests++;
await backtester.UpdateBundleBacktestRequestAsync(failedBundle);
}
}
}
// If all backtests succeeded, update the bundle status
if (failedBundle.CompletedBacktests == originalRequests.Count)
{
failedBundle.Status = BundleBacktestRequestStatus.Completed;
failedBundle.ErrorMessage = null; // Clear any previous error
failedBundle.CompletedAt = DateTime.UtcNow;
await backtester.UpdateBundleBacktestRequestAsync(failedBundle);
// Notify user about successful retry
await NotifyUser(failedBundle);
}
else
{
_logger.LogWarning("Bundle {RequestId} still has unfinished backtests after retry",
failedBundle.RequestId);
}
}
}
}

View File

@@ -0,0 +1,28 @@
using Managing.Application.Abstractions.Services;
using Managing.Common;
using Microsoft.Extensions.Logging;
namespace Managing.Application.Workers;
public class FundingRatesWatcher : BaseWorker<FundingRatesWatcher>
{
private readonly IStatisticService _statisticService;
public FundingRatesWatcher(
ILogger<FundingRatesWatcher> logger,
IServiceProvider serviceProvider,
IStatisticService statisticService) : base(
Enums.WorkerType.FundingRatesWatcher,
logger,
TimeSpan.FromMinutes(30),
serviceProvider
)
{
_statisticService = statisticService;
}
protected override async Task Run(CancellationToken cancellationToken)
{
await _statisticService.UpdateFundingRates();
}
}

View File

@@ -0,0 +1,106 @@
using Managing.Application.Abstractions.Services;
using Managing.Core;
using Managing.Domain.Backtests;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Workers;
/// <summary>
/// Worker that processes genetic algorithm requests
/// </summary>
public class GeneticAlgorithmWorker : BaseWorker<GeneticAlgorithmWorker>
{
private readonly IServiceScopeFactory _scopeFactory;
public GeneticAlgorithmWorker(
ILogger<GeneticAlgorithmWorker> logger,
IServiceProvider serviceProvider,
IServiceScopeFactory scopeFactory)
: base(WorkerType.GeneticAlgorithm, logger, TimeSpan.FromMinutes(5), serviceProvider)
{
_scopeFactory = scopeFactory;
}
protected override async Task Run(CancellationToken cancellationToken)
{
try
{
_logger.LogInformation("[GeneticAlgorithm] Starting genetic algorithm processing");
// TODO: Implement method to get pending genetic requests
// For now, we'll create a placeholder for the genetic algorithm logic
await ProcessPendingGeneticRequests(cancellationToken);
_logger.LogInformation("[GeneticAlgorithm] Completed genetic algorithm processing");
}
catch (Exception ex)
{
_logger.LogError(ex, "[GeneticAlgorithm] Error during genetic algorithm processing");
throw;
}
}
private async Task ProcessPendingGeneticRequests(CancellationToken cancellationToken)
{
try
{
// Get pending genetic requests from the repository using scoped service
var pendingRequests = await ServiceScopeHelpers.WithScopedService<IGeneticService, IEnumerable<GeneticRequest>>(_scopeFactory,
async geneticService => await geneticService.GetGeneticRequestsAsync(GeneticRequestStatus.Pending));
if (!pendingRequests.Any())
{
_logger.LogInformation("[GeneticAlgorithm] No pending genetic requests found");
return;
}
_logger.LogInformation("[GeneticAlgorithm] Found {Count} pending genetic requests", pendingRequests.Count());
foreach (var request in pendingRequests)
{
try
{
_logger.LogInformation("[GeneticAlgorithm] Processing request {RequestId}", request.RequestId);
// Update status to Running using scoped service
request.Status = GeneticRequestStatus.Running;
await ServiceScopeHelpers.WithScopedService<IGeneticService>(_scopeFactory,
async geneticService => await geneticService.UpdateGeneticRequestAsync(request));
// Run genetic algorithm using the service
var results = await ServiceScopeHelpers.WithScopedServices<IGeneticService, IBacktester, GeneticAlgorithmResult>(_scopeFactory,
async (geneticService, backtester) => await geneticService.RunGeneticAlgorithm(request, cancellationToken));
// Update request with results using scoped service
request.Status = GeneticRequestStatus.Completed;
request.CompletedAt = DateTime.UtcNow;
request.BestFitness = results.BestFitness;
request.BestIndividual = results.BestIndividual;
request.ProgressInfo = results.ProgressInfo;
await ServiceScopeHelpers.WithScopedService<IGeneticService>(_scopeFactory,
async geneticService => await geneticService.UpdateGeneticRequestAsync(request));
_logger.LogInformation("[GeneticAlgorithm] Successfully completed request {RequestId}", request.RequestId);
}
catch (Exception ex)
{
request.Status = GeneticRequestStatus.Failed;
request.ErrorMessage = ex.Message;
request.CompletedAt = DateTime.UtcNow;
await ServiceScopeHelpers.WithScopedService<IGeneticService>(_scopeFactory,
async geneticService => await geneticService.UpdateGeneticRequestAsync(request));
_logger.LogError(ex, "[GeneticAlgorithm] Error processing request {RequestId}", request.RequestId);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "[GeneticAlgorithm] Error retrieving pending genetic requests");
throw;
}
}
}

View File

@@ -0,0 +1,29 @@
using Managing.Application.Abstractions.Services;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Workers;
public class LeaderboardWorker : BaseWorker<LeaderboardWorker>
{
private readonly IStatisticService _statisticService;
private static readonly WorkerType _workerType = WorkerType.LeaderboardWorker;
public LeaderboardWorker(
ILogger<LeaderboardWorker> logger,
IServiceProvider serviceProvider,
IStatisticService statisticService) : base(
_workerType,
logger,
TimeSpan.FromHours(24),
serviceProvider
)
{
_statisticService = statisticService;
}
protected override async Task Run(CancellationToken cancellationToken)
{
await _statisticService.UpdateLeaderboard();
}
}

View File

@@ -0,0 +1,66 @@
using Managing.Application.Abstractions.Services;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Workers;
public abstract class PricesBaseWorker<T> : BaseWorker<T> where T : class
{
private readonly IPricesService _pricesService;
private readonly IStatisticService _statisticService;
private readonly Timeframe _timeframe;
public PricesBaseWorker(
ILogger<T> logger,
IPricesService pricesService,
IServiceProvider serviceProvider,
IStatisticService statisticService,
TimeSpan delay,
WorkerType workerType,
Timeframe timeframe) : base(
workerType,
logger,
delay,
serviceProvider
)
{
_pricesService = pricesService;
_statisticService = statisticService;
_timeframe = timeframe;
}
private List<Ticker> _eligibleTickers = new List<Ticker>
{
Ticker.BTC,
Ticker.ETH,
Ticker.BNB,
Ticker.DOGE,
Ticker.ADA,
Ticker.SOL,
Ticker.XRP,
Ticker.LINK,
Ticker.RENDER,
Ticker.SUI,
Ticker.GMX,
Ticker.ARB,
Ticker.PEPE,
Ticker.PENDLE,
Ticker.AAVE,
Ticker.HYPE
};
protected override async Task Run(CancellationToken cancellationToken)
{
var tickers = await _statisticService.GetTickers();
var filteredTickers = tickers
.Where(t => _eligibleTickers.Contains(t))
.ToList();
// Filter with the eligible tickers
foreach (var ticker in filteredTickers)
{
await _pricesService.UpdatePrice(TradingExchanges.Evm, ticker, _timeframe);
}
}
}

View File

@@ -0,0 +1,24 @@
using Managing.Application.Abstractions.Services;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Workers;
public class PricesFifteenMinutesWorker : PricesBaseWorker<PricesFifteenMinutesWorker>
{
public PricesFifteenMinutesWorker(
ILogger<PricesFifteenMinutesWorker> logger,
IPricesService pricesService,
IServiceProvider serviceProvider,
IStatisticService statisticService) : base(
logger,
pricesService,
serviceProvider,
statisticService,
TimeSpan.FromMinutes(1),
WorkerType.PriceFifteenMinutes,
Timeframe.FifteenMinutes
)
{
}
}

View File

@@ -0,0 +1,24 @@
using Managing.Application.Abstractions.Services;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Workers;
public class PricesFiveMinutesWorker : PricesBaseWorker<PricesFiveMinutesWorker>
{
public PricesFiveMinutesWorker(
ILogger<PricesFiveMinutesWorker> logger,
IPricesService pricesService,
IServiceProvider serviceProvider,
IStatisticService statisticService) : base(
logger,
pricesService,
serviceProvider,
statisticService,
TimeSpan.FromMinutes(1),
WorkerType.PriceFiveMinutes,
Timeframe.FiveMinutes
)
{
}
}

View File

@@ -0,0 +1,24 @@
using Managing.Application.Abstractions.Services;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Workers;
public class PricesFourHoursWorker : PricesBaseWorker<PricesFourHoursWorker>
{
public PricesFourHoursWorker(
ILogger<PricesFourHoursWorker> logger,
IPricesService pricesService,
IServiceProvider serviceProvider,
IStatisticService statisticService) : base(
logger,
pricesService,
serviceProvider,
statisticService,
TimeSpan.FromHours(2),
WorkerType.PriceFourHour,
Timeframe.FourHour
)
{
}
}

View File

@@ -0,0 +1,24 @@
using Managing.Application.Abstractions.Services;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Workers;
public class PricesOneDayWorker : PricesBaseWorker<PricesOneDayWorker>
{
public PricesOneDayWorker(
ILogger<PricesOneDayWorker> logger,
IPricesService pricesService,
IServiceProvider serviceProvider,
IStatisticService statisticService) : base(
logger,
pricesService,
serviceProvider,
statisticService,
TimeSpan.FromHours(12),
WorkerType.PriceOneDay,
Timeframe.OneDay
)
{
}
}

View File

@@ -0,0 +1,23 @@
using Managing.Application.Abstractions.Services;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Workers;
public class PricesOneHourWorker : PricesBaseWorker<PricesOneHourWorker>
{
public PricesOneHourWorker(
ILogger<PricesOneHourWorker> logger,
IPricesService pricesService,
IServiceProvider serviceProvider,
IStatisticService statisticService) : base(
logger,
pricesService,
serviceProvider,
statisticService,
TimeSpan.FromMinutes(30),
WorkerType.PriceOneHour,
Timeframe.OneHour)
{
}
}

View File

@@ -0,0 +1,35 @@
using Managing.Application.Abstractions.Services;
using Managing.Common;
using Microsoft.Extensions.Logging;
namespace Managing.Application.Workers;
public class SpotlightWorker : BaseWorker<SpotlightWorker>
{
private readonly IStatisticService _statisticService;
public SpotlightWorker(
ILogger<SpotlightWorker> logger,
IServiceProvider serviceProvider,
IStatisticService statisticService) : base(
Enums.WorkerType.Spotlight,
logger,
TimeSpan.FromMinutes(5),
serviceProvider)
{
_statisticService = statisticService;
}
protected override async Task Run(CancellationToken cancellationToken)
{
try
{
await _statisticService.UpdateSpotlight();
}
catch (Exception ex)
{
_logger.LogError("Enable to update spotlight", ex);
throw;
}
}
}

View File

@@ -0,0 +1,43 @@
using Managing.Application.Abstractions.Services;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Workers;
/// <summary>
/// Represents a worker that watches traders and performs actions based on trading activities.
/// Inherits from <see cref="BaseWorker{TWorker}"/> where TWorker is <see cref="TraderWatcher"/>.
/// </summary>
public class TraderWatcher : BaseWorker<TraderWatcher>
{
private readonly ITradingService _tradingService;
private static readonly WorkerType _workerType = WorkerType.TraderWatcher;
/// <summary>
/// Initializes a new instance of the <see cref="TraderWatcher"/> class.
/// </summary>
/// <param name="logger">The logger to be used by the worker.</param>
/// <param name="tradingService">The trading service to monitor trading activities.</param>
/// <param name="workerService">The worker service to manage worker lifecycle.</param>
public TraderWatcher(
ILogger<TraderWatcher> logger,
IServiceProvider serviceProvider,
ITradingService tradingService) : base(
_workerType,
logger,
TimeSpan.FromSeconds(120),
serviceProvider
)
{
_tradingService = tradingService;
}
/// <summary>
/// Executes the worker's task to watch traders. This method is called periodically based on the worker's configured interval.
/// </summary>
/// <param name="cancellationToken">A token to observe while waiting for the task to complete.</param>
protected override async Task Run(CancellationToken cancellationToken)
{
await _tradingService.WatchTrader();
}
}

View File

@@ -0,0 +1,69 @@
using Managing.Application.Abstractions.Repositories;
using Managing.Application.Abstractions.Services;
using Managing.Common;
using Managing.Domain.Workers;
namespace Managing.Application.Workers;
public class WorkerService : IWorkerService
{
private readonly IWorkerRepository _workerRepository;
public WorkerService(IWorkerRepository workerRepository)
{
_workerRepository = workerRepository;
}
public async Task<Worker> GetWorker(Enums.WorkerType workerType)
{
return await _workerRepository.GetWorkerAsync(workerType);
}
public async Task InsertWorker(Enums.WorkerType workerType, TimeSpan delay)
{
var worker = new Worker()
{
WorkerType = workerType,
StartTime = DateTime.UtcNow,
LastRunTime = null,
ExecutionCount = 0,
Delay = delay
};
await _workerRepository.InsertWorker(worker);
}
public async Task UpdateWorker(Enums.WorkerType workerType, int executionCount)
{
await _workerRepository.UpdateWorker(workerType, executionCount);
}
public async Task DisableWorker(Enums.WorkerType workerType)
{
await _workerRepository.DisableWorker(workerType);
}
public async Task EnableWorker(Enums.WorkerType workerType)
{
await _workerRepository.EnableWorker(workerType);
}
public async Task<IEnumerable<Worker>> GetWorkers()
{
return await _workerRepository.GetWorkers();
}
public async Task<bool> ToggleWorker(Enums.WorkerType workerType)
{
var worker = await GetWorker(workerType);
if (worker.IsActive)
{
_ = _workerRepository.DisableWorker(workerType);
return false;
}
else
{
_ = _workerRepository.EnableWorker(workerType);
return true;
}
}
}