Fix reminder for 1h and 4h

This commit is contained in:
2025-09-17 15:28:38 +07:00
parent 2e953ddf77
commit 81d9b4527b

View File

@@ -6,18 +6,16 @@ using Managing.Common;
using Managing.Domain.Accounts; using Managing.Domain.Accounts;
using Managing.Domain.Candles; using Managing.Domain.Candles;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Orleans.Concurrency;
using Orleans.Streams; using Orleans.Streams;
using static Managing.Common.Enums; using static Managing.Common.Enums;
namespace Managing.Application.Grains; namespace Managing.Application.Grains;
/// <summary> /// <summary>
/// StatelessWorker grain for fetching price data from external APIs and publishing to Orleans streams. /// Grain for fetching price data from external APIs and publishing to Orleans streams.
/// This grain runs periodically and processes all exchange/ticker combinations for a specific timeframe. /// This grain runs periodically and processes all exchange/ticker combinations for a specific timeframe.
/// The timeframe is passed as the PrimaryKeyString to identify which timeframe this grain handles. /// The timeframe is passed as the PrimaryKeyString to identify which timeframe this grain handles.
/// </summary> /// </summary>
[StatelessWorker]
public class PriceFetcherGrain : Grain, IPriceFetcherGrain, IRemindable public class PriceFetcherGrain : Grain, IPriceFetcherGrain, IRemindable
{ {
private readonly ILogger<PriceFetcherGrain> _logger; private readonly ILogger<PriceFetcherGrain> _logger;
@@ -26,7 +24,6 @@ public class PriceFetcherGrain : Grain, IPriceFetcherGrain, IRemindable
private readonly IGrainFactory _grainFactory; private readonly IGrainFactory _grainFactory;
private string FetchPricesReminderName => $"Fetch{TargetTimeframe}PricesReminder"; private string FetchPricesReminderName => $"Fetch{TargetTimeframe}PricesReminder";
private IDisposable _timer;
// Predefined lists of trading parameters to fetch // Predefined lists of trading parameters to fetch
private static readonly TradingExchanges[] SupportedExchanges = private static readonly TradingExchanges[] SupportedExchanges =
@@ -64,13 +61,18 @@ public class PriceFetcherGrain : Grain, IPriceFetcherGrain, IRemindable
_logger.LogInformation("{0} activated for timeframe {1}", nameof(PriceFetcherGrain), TargetTimeframe); _logger.LogInformation("{0} activated for timeframe {1}", nameof(PriceFetcherGrain), TargetTimeframe);
// Register a reminder to enable timer if not existing // Register a reminder aligned to timeframe boundaries
var intervalMinutes = GrainHelpers.GetIntervalMinutes(TargetTimeframe); var intervalMinutes = GrainHelpers.GetIntervalMinutes(TargetTimeframe);
var now = DateTime.UtcNow;
var dueTime = CandleHelpers.GetDueTimeForTimeframe(TargetTimeframe, now);
await this.RegisterOrUpdateReminder( await this.RegisterOrUpdateReminder(
FetchPricesReminderName, FetchPricesReminderName,
TimeSpan.FromSeconds(10), dueTime,
TimeSpan.FromMinutes(intervalMinutes)); TimeSpan.FromMinutes(intervalMinutes));
// Optional immediate kick-off to avoid waiting until next boundary
_ = FetchAndPublishPricesAsync();
await base.OnActivateAsync(cancellationToken); await base.OnActivateAsync(cancellationToken);
} }
@@ -79,7 +81,6 @@ public class PriceFetcherGrain : Grain, IPriceFetcherGrain, IRemindable
_logger.LogInformation("{0} deactivating for timeframe {1}. Reason: {Reason}", _logger.LogInformation("{0} deactivating for timeframe {1}. Reason: {Reason}",
nameof(PriceFetcherGrain), TargetTimeframe, reason.Description); nameof(PriceFetcherGrain), TargetTimeframe, reason.Description);
StopAndDisposeTimer();
await base.OnDeactivateAsync(reason, cancellationToken); await base.OnDeactivateAsync(reason, cancellationToken);
} }
@@ -190,53 +191,12 @@ public class PriceFetcherGrain : Grain, IPriceFetcherGrain, IRemindable
} }
} }
/// <summary>
/// Starts the Orleans timer for periodic price fetching
/// </summary>
private void RegisterAndStartTimer()
{
if (_timer != null) return;
// Calculate the next execution time aligned to X-minute boundaries
var now = DateTime.UtcNow;
var dueTime = CandleHelpers.GetDueTimeForTimeframe(TargetTimeframe, now);
var period = TimeSpan.FromSeconds(CandleHelpers.GetBaseIntervalInSeconds(TargetTimeframe));
_logger.LogInformation("{0} next execution scheduled in {1} seconds and at {2:} UTC every {3}",
string.Join("-", nameof(PriceFetcherGrain), TargetTimeframe), dueTime, now.Add(dueTime), period);
_timer = this.RegisterGrainTimer(
async _ => await FetchAndPublishPricesAsync(),
new GrainTimerCreationOptions
{
Period = period,
DueTime = dueTime,
KeepAlive = true
});
_logger.LogInformation("{0} timer registered and started for timeframe {1}", nameof(PriceFetcherGrain),
TargetTimeframe);
}
private void StopAndDisposeTimer()
{
if (_timer != null)
{
_timer?.Dispose();
_timer = null;
_logger.LogInformation("{0} timer stopped and disposed for timeframe {1}", nameof(PriceFetcherGrain),
TargetTimeframe);
}
}
public Task ReceiveReminder(string reminderName, TickStatus status) public Task ReceiveReminder(string reminderName, TickStatus status)
{ {
if (reminderName == FetchPricesReminderName) if (reminderName == FetchPricesReminderName)
{ {
// Only enable timer if not existing anymore // Trigger fetch on each reminder tick
if (_timer == null) return FetchAndPublishPricesAsync();
{
RegisterAndStartTimer();
}
} }
return Task.CompletedTask; return Task.CompletedTask;