fix plug to the store
This commit is contained in:
@@ -34,22 +34,38 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
|
||||
|
||||
public override async Task OnActivateAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var grainKey = this.GetPrimaryKeyString();
|
||||
_logger.LogInformation("CandleStoreGrain activated for key: {GrainKey}", grainKey);
|
||||
|
||||
// Parse the grain key to extract exchange, ticker, and timeframe
|
||||
var (exchange, ticker, timeframe) = CandleHelpers.ParseCandleStoreGrainKey(grainKey);
|
||||
|
||||
// Initialize state if empty
|
||||
if (_state.State.Candles == null || _state.State.Candles.Count == 0)
|
||||
try
|
||||
{
|
||||
await LoadInitialCandlesAsync(exchange, ticker, timeframe);
|
||||
var grainKey = this.GetPrimaryKeyString();
|
||||
_logger.LogInformation("CandleStoreGrain activated for key: {GrainKey}", grainKey);
|
||||
|
||||
// Parse the grain key to extract exchange, ticker, and timeframe
|
||||
var (exchange, ticker, timeframe) = CandleHelpers.ParseCandleStoreGrainKey(grainKey);
|
||||
|
||||
// Initialize state if empty
|
||||
if (_state.State.Candles == null || _state.State.Candles.Count == 0)
|
||||
{
|
||||
await LoadInitialCandlesAsync(exchange, ticker, timeframe);
|
||||
}
|
||||
|
||||
// Subscribe to the price stream
|
||||
await SubscribeToPriceStreamAsync(grainKey);
|
||||
|
||||
await base.OnActivateAsync(cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error during CandleStoreGrain activation for key: {GrainKey}", this.GetPrimaryKeyString());
|
||||
|
||||
// Ensure state is initialized even if there's an error
|
||||
if (_state.State.Candles == null)
|
||||
{
|
||||
_state.State.Candles = new List<Candle>();
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
|
||||
throw; // Re-throw to let Orleans handle the activation failure
|
||||
}
|
||||
|
||||
// Subscribe to the price stream
|
||||
await SubscribeToPriceStreamAsync(grainKey);
|
||||
|
||||
await base.OnActivateAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
|
||||
@@ -68,7 +84,14 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
|
||||
{
|
||||
try
|
||||
{
|
||||
return Task.FromResult(_state.State.Candles?.ToList() ?? new List<Candle>());
|
||||
// Ensure state is initialized
|
||||
if (_state.State.Candles == null)
|
||||
{
|
||||
_logger.LogWarning("State not initialized for grain {GrainKey}, returning empty list", this.GetPrimaryKeyString());
|
||||
return Task.FromResult(new List<Candle>());
|
||||
}
|
||||
|
||||
return Task.FromResult(_state.State.Candles.ToList());
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -135,14 +158,17 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
|
||||
_logger.LogInformation("Loading initial candles for {Exchange}-{Ticker}-{Timeframe}",
|
||||
exchange, ticker, timeframe);
|
||||
|
||||
// Ensure state is initialized first
|
||||
if (_state.State.Candles == null)
|
||||
{
|
||||
_state.State.Candles = new List<Candle>();
|
||||
}
|
||||
|
||||
// Load the last 500 candles from the database
|
||||
var endDate = DateTime.UtcNow;
|
||||
var startDate =
|
||||
CandleHelpers
|
||||
.GetBotPreloadSinceFromTimeframe(timeframe); // Look back 30 days to ensure we get enough data
|
||||
var startDate = CandleHelpers.GetBotPreloadSinceFromTimeframe(timeframe);
|
||||
|
||||
var candles =
|
||||
await _candleRepository.GetCandles(exchange, ticker, timeframe, startDate, endDate, MaxCandleCount);
|
||||
var candles = await _candleRepository.GetCandles(exchange, ticker, timeframe, startDate, endDate, MaxCandleCount);
|
||||
|
||||
if (candles?.Any() == true)
|
||||
{
|
||||
@@ -195,7 +221,22 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
|
||||
|
||||
public Task<Candle> GetLastCandle()
|
||||
{
|
||||
return Task.FromResult(_state.State.Candles?.LastOrDefault());
|
||||
try
|
||||
{
|
||||
// Ensure state is initialized
|
||||
if (_state.State.Candles == null || _state.State.Candles.Count == 0)
|
||||
{
|
||||
_logger.LogDebug("No candles available for grain {GrainKey}", this.GetPrimaryKeyString());
|
||||
return Task.FromResult<Candle>(null);
|
||||
}
|
||||
|
||||
return Task.FromResult(_state.State.Candles.LastOrDefault());
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error retrieving last candle for grain {GrainKey}", this.GetPrimaryKeyString());
|
||||
return Task.FromResult<Candle>(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -106,12 +106,12 @@ public class PriceFetcher15MinGrain : Grain, IPriceFetcher15MinGrain, IRemindabl
|
||||
|
||||
// Get the last candle date from database
|
||||
var existingCandles = await _candleRepository.GetCandles(exchange, ticker, timeframe,
|
||||
DateTime.UtcNow.AddDays(-30), 1);
|
||||
DateTime.UtcNow.AddDays(-30), DateTime.UtcNow.AddDays(1), 20);
|
||||
|
||||
var isFirstCall = !existingCandles.Any();
|
||||
|
||||
var startDate = !isFirstCall
|
||||
? existingCandles.Max(c => c.Date).AddMinutes(GetTimeframeMinutes(timeframe))
|
||||
? existingCandles.Last().Date
|
||||
: new DateTime(2017, 1, 1);
|
||||
|
||||
// Fetch new candles from external API
|
||||
@@ -125,7 +125,9 @@ public class PriceFetcher15MinGrain : Grain, IPriceFetcher15MinGrain, IRemindabl
|
||||
newCandles.Count, streamKey);
|
||||
|
||||
// Process all new candles
|
||||
var processedCandles = newCandles.OrderBy(c => c.Date).ToList();
|
||||
var processedCandles = newCandles.OrderBy(c => c.Date)
|
||||
.Where(c => c.Date <= DateTime.UtcNow) // Avoid duplicates
|
||||
.ToList();
|
||||
|
||||
foreach (var candle in processedCandles)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user