Remove expected candle and fix update candles

This commit is contained in:
2025-09-17 22:04:53 +07:00
parent 841bb20800
commit 0cb7672a01
3 changed files with 63 additions and 15 deletions

View File

@@ -126,6 +126,13 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
_state.State.Candles = new List<Candle>();
}
// Check for gaps in the candle data
if (HasGapInCandleData(candle))
{
await LoadInitialCandlesAsync(candle.Exchange, candle.Ticker, candle.Timeframe);
return;
}
// Add the new candle
_state.State.Candles.Add(candle);
@@ -163,6 +170,58 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
return Task.CompletedTask;
}
/// <summary>
/// Checks if there's a gap in the candle data by comparing the time interval
/// between the new candle and the last candle in the store.
/// </summary>
/// <param name="newCandle">The new candle received from the stream</param>
/// <returns>True if a gap is detected, false otherwise</returns>
private bool HasGapInCandleData(Candle newCandle)
{
try
{
// If no candles exist, no gap to check
if (_state.State.Candles == null || _state.State.Candles.Count == 0)
{
return false;
}
// Get the last candle from the store
var lastCandle = _state.State.Candles
.OrderBy(c => c.Date)
.LastOrDefault();
if (lastCandle == null)
{
return false;
}
// Calculate the expected time interval for this timeframe
var expectedInterval = TimeSpan.FromSeconds(CandleHelpers.GetBaseIntervalInSeconds(newCandle.Timeframe));
// Calculate the actual time difference between the new candle and the last candle
var actualInterval = newCandle.Date - lastCandle.Date;
// Check if the actual interval matches the expected interval (with a small tolerance)
var tolerance = TimeSpan.FromSeconds(1); // 1 second tolerance for minor timing differences
var isGap = Math.Abs((actualInterval - expectedInterval).TotalSeconds) > tolerance.TotalSeconds;
if (isGap)
{
_logger.LogWarning("Gap detected: Expected interval {ExpectedInterval}, actual interval {ActualInterval} for {GrainKey}",
expectedInterval, actualInterval, this.GetPrimaryKeyString());
}
return isGap;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error checking for gaps in candle data for grain {GrainKey}", this.GetPrimaryKeyString());
// In case of error, assume no gap to avoid unnecessary refreshes
return false;
}
}
private async Task LoadInitialCandlesAsync(TradingExchanges exchange, Ticker ticker, Timeframe timeframe)
{
try

View File

@@ -1,4 +1,3 @@
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Repositories;
using Managing.Application.Abstractions.Services;
using Managing.Application.Shared;
@@ -131,15 +130,6 @@ public class PriceFetcherGrain : Grain, IPriceFetcherGrain, IRemindable
var isFirstCall = !existingCandles.Any();
// Check if the next expected candle is available yet
var nextExpectedCandleTime = CandleHelpers.GetNextExpectedCandleTime(timeframe);
if (nextExpectedCandleTime > DateTime.UtcNow)
{
_logger.LogDebug("Next candle for {Exchange}-{Ticker}-{Timeframe} not available yet. Expected at {NextCandleTime}, current time: {CurrentTime}",
exchange, ticker, timeframe, nextExpectedCandleTime, DateTime.UtcNow);
return; // Skip this fetch as the new candle won't be available
}
var startDate = !isFirstCall
? existingCandles.Last().Date
: new DateTime(2017, 1, 1);
@@ -172,10 +162,10 @@ public class PriceFetcherGrain : Grain, IPriceFetcherGrain, IRemindable
{
await _candleRepository.InsertCandles(processedCandles).ConfigureAwait(false);
_logger.LogDebug(
"[{Ticker}][{Exchange}][{Timeframe}] Inserted {CandleCount} candles for {StreamKey}",
_logger.LogInformation(
"[{Ticker}][{Exchange}][{Timeframe}] Inserted {CandleCount} new candles",
ticker, exchange, timeframe,
processedCandles.Count, streamKey);
processedCandles.Count);
}
var streamProvider = this.GetStreamProvider("ManagingStreamProvider");
@@ -201,7 +191,6 @@ public class PriceFetcherGrain : Grain, IPriceFetcherGrain, IRemindable
{
if (reminderName == FetchPricesReminderName)
{
// Trigger fetch on each reminder tick
return FetchAndPublishPricesAsync();
}

View File

@@ -28,7 +28,7 @@ public class PriceFetcherInitializer : IHostedService
foreach (var timeframe in timeframes)
{
var grain = _grainFactory.GetGrain<IPriceFetcherGrain>(timeframe.ToString());
Console.WriteLine("GrainId for {0}: {1}", timeframe, grain.GetGrainId());
Console.WriteLine("Init grain for {0}: {1}", timeframe, grain.GetGrainId());
// Actually call a method on the grain to activate it
// This will trigger OnActivateAsync and register the reminder