// C# source file (340 lines, 12 KiB)
using Managing.Application.Abstractions.Grains;
|
|
using Managing.Application.Abstractions.Repositories;
|
|
using Managing.Domain.Candles;
|
|
using Microsoft.Extensions.Logging;
|
|
using Orleans.Streams;
|
|
using static Managing.Common.Enums;
|
|
|
|
namespace Managing.Application.Grains;
|
|
|
|
/// <summary>
/// Grain for managing in-memory historical candle data with Orleans state persistence.
/// Subscribes to price streams and maintains a rolling window of 500 candles.
/// </summary>
/// <remarks>
/// The grain key is parsed by <c>CandleHelpers.ParseCandleStoreGrainKey</c> into an exchange,
/// ticker and timeframe; the same key is used as the stream key in the "Candles" namespace.
/// </remarks>
public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
{
    private readonly IPersistentState<CandleStoreGrainState> _state;
    private readonly ILogger<CandleStoreGrain> _logger;
    private readonly ICandleRepository _candleRepository;

    /// <summary>Maximum number of candles kept in the rolling window.</summary>
    private const int MaxCandleCount = 500;

    private IAsyncStream<Candle> _priceStream;
    private StreamSubscriptionHandle<Candle> _streamSubscription;

    /// <summary>
    /// Creates the grain with its persistent state, logger and candle repository.
    /// </summary>
    /// <param name="state">Persistent state named "candle-store-state" in the "candle-store" storage.</param>
    /// <param name="logger">Logger used for all diagnostics of this grain.</param>
    /// <param name="candleRepository">Repository the candle window is loaded from.</param>
    public CandleStoreGrain(
        [PersistentState("candle-store-state", "candle-store")]
        IPersistentState<CandleStoreGrainState> state,
        ILogger<CandleStoreGrain> logger,
        ICandleRepository candleRepository)
    {
        _state = state;
        _logger = logger;
        _candleRepository = candleRepository;
    }

    /// <summary>
    /// Loads the candle window from the repository and attaches to the price stream.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the base activation.</param>
    /// <remarks>
    /// Any exception is logged and re-thrown so the activation fails; before re-throwing,
    /// a null candle list is replaced by an empty one.
    /// </remarks>
    public override async Task OnActivateAsync(CancellationToken cancellationToken)
    {
        try
        {
            var grainKey = this.GetPrimaryKeyString();
            _logger.LogInformation("CandleStoreGrain activated for key: {GrainKey}", grainKey);

            // Parse the grain key to extract exchange, ticker, and timeframe
            var (exchange, ticker, timeframe) = CandleHelpers.ParseCandleStoreGrainKey(grainKey);

            await LoadInitialCandlesAsync(exchange, ticker, timeframe);

            // Subscribe to the price stream
            await SubscribeToPriceStreamAsync(grainKey);

            await base.OnActivateAsync(cancellationToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error during CandleStoreGrain activation for key: {GrainKey}",
                this.GetPrimaryKeyString());

            // Ensure state is initialized even if there's an error
            if (_state.State.Candles == null)
            {
                _state.State.Candles = new List<Candle>();

                // Guarded write: a storage failure here must not replace the exception
                // that is re-thrown below.
                await TryWriteStateAsync();
            }

            throw; // Re-throw to let Orleans handle the activation failure
        }
    }

    /// <summary>
    /// Detaches from the price stream (best effort) before the grain deactivates.
    /// </summary>
    /// <param name="reason">Deactivation reason, forwarded to the base implementation.</param>
    /// <param name="cancellationToken">Token forwarded to the base implementation.</param>
    public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
    {
        // Unsubscribe from the stream with proper error handling
        if (_streamSubscription != null)
        {
            try
            {
                await _streamSubscription.UnsubscribeAsync();
                _logger.LogDebug("Successfully unsubscribed from stream for grain {GrainKey}", this.GetPrimaryKeyString());
            }
            catch (Exception ex)
            {
                // Log the error but don't throw - this is common during shutdown when
                // the pub-sub rendezvous grain may already be deactivated
                _logger.LogWarning(ex, "Failed to unsubscribe from stream during deactivation for grain {GrainKey}. This is normal during shutdown.",
                    this.GetPrimaryKeyString());
            }
            finally
            {
                _streamSubscription = null;
            }
        }

        await base.OnDeactivateAsync(reason, cancellationToken);
    }

    /// <summary>
    /// Returns the stored candles as a set.
    /// </summary>
    /// <returns>
    /// A new <see cref="HashSet{T}"/> built from the stored candles; an empty set when the
    /// state is not initialized or an error occurs.
    /// </returns>
    public Task<HashSet<Candle>> GetCandlesAsync()
    {
        try
        {
            var stored = _state.State.Candles;

            // Ensure state is initialized
            if (stored == null)
            {
                _logger.LogWarning("State not initialized for grain {GrainKey}, returning empty list",
                    this.GetPrimaryKeyString());
                return Task.FromResult(new HashSet<Candle>());
            }

            return Task.FromResult(stored.ToHashSet());
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error retrieving candles for grain {GrainKey}", this.GetPrimaryKeyString());
            return Task.FromResult(new HashSet<Candle>());
        }
    }

    /// <summary>
    /// Stream observer callback: appends the received candle to the rolling window and persists it.
    /// </summary>
    /// <param name="candle">Candle received from the price stream.</param>
    /// <param name="token">Stream sequence token; not used by this grain.</param>
    /// <remarks>
    /// When <see cref="HasGapInCandleData"/> reports a gap, the window is reloaded from the
    /// repository and the received candle itself is not appended. Exceptions are logged and
    /// swallowed.
    /// </remarks>
    public async Task OnNextAsync(Candle candle, StreamSequenceToken token = null)
    {
        try
        {
            _logger.LogDebug("Received new candle for {GrainKey} at {Date}",
                this.GetPrimaryKeyString(), candle.Date);

            // Initialize state if needed
            _state.State.Candles ??= new List<Candle>();

            // Check for gaps in the candle data
            if (HasGapInCandleData(candle))
            {
                await LoadInitialCandlesAsync(candle.Exchange, candle.Ticker, candle.Timeframe);
                return;
            }

            // Add the new candle
            _state.State.Candles.Add(candle);

            // Maintain rolling window of 500 candles
            if (_state.State.Candles.Count > MaxCandleCount)
            {
                // Sort by date and keep the most recent 500
                _state.State.Candles = _state.State.Candles
                    .OrderBy(c => c.Date)
                    .TakeLast(MaxCandleCount)
                    .ToList();
            }

            // Persist the updated state
            await _state.WriteStateAsync();

            _logger.LogTrace("Updated candle store for {GrainKey}, total candles: {Count}",
                this.GetPrimaryKeyString(), _state.State.Candles.Count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing new candle for grain {GrainKey}", this.GetPrimaryKeyString());
        }
    }

    /// <summary>
    /// Stream observer callback invoked when the stream completes; only logs.
    /// </summary>
    public Task OnCompletedAsync()
    {
        _logger.LogInformation("Stream completed for grain {GrainKey}", this.GetPrimaryKeyString());
        return Task.CompletedTask;
    }

    /// <summary>
    /// Stream observer callback invoked on a stream error; only logs.
    /// </summary>
    /// <param name="ex">The error reported by the stream.</param>
    public Task OnErrorAsync(Exception ex)
    {
        _logger.LogError(ex, "Stream error for grain {GrainKey}", this.GetPrimaryKeyString());
        return Task.CompletedTask;
    }

    /// <summary>
    /// Checks if there's a gap in the candle data by comparing the time interval
    /// between the new candle and the last candle in the store.
    /// </summary>
    /// <param name="newCandle">The new candle received from the stream</param>
    /// <returns>
    /// True if the interval differs from the timeframe's base interval by more than one second;
    /// false otherwise, when the store is empty, or when the check itself fails.
    /// </returns>
    private bool HasGapInCandleData(Candle newCandle)
    {
        try
        {
            // If no candles exist, no gap to check
            if (_state.State.Candles == null || _state.State.Candles.Count == 0)
            {
                return false;
            }

            // Newest stored candle; a single pass instead of sorting the whole list
            var lastCandle = _state.State.Candles.MaxBy(c => c.Date);
            if (lastCandle == null)
            {
                return false;
            }

            // Calculate the expected time interval for this timeframe
            var expectedInterval = TimeSpan.FromSeconds(CandleHelpers.GetBaseIntervalInSeconds(newCandle.Timeframe));

            // Calculate the actual time difference between the new candle and the last candle
            var actualInterval = newCandle.Date - lastCandle.Date;

            // 1 second tolerance for minor timing differences
            var tolerance = TimeSpan.FromSeconds(1);
            var isGap = (actualInterval - expectedInterval).Duration() > tolerance;

            if (isGap)
            {
                _logger.LogWarning("Gap detected: Expected interval {ExpectedInterval}, actual interval {ActualInterval} for {GrainKey}",
                    expectedInterval, actualInterval, this.GetPrimaryKeyString());
            }

            return isGap;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error checking for gaps in candle data for grain {GrainKey}", this.GetPrimaryKeyString());
            // In case of error, assume no gap to avoid unnecessary refreshes
            return false;
        }
    }

    /// <summary>
    /// Replaces the stored window with the most recent candles from the repository and persists it.
    /// </summary>
    /// <param name="exchange">Exchange the candles belong to.</param>
    /// <param name="ticker">Ticker the candles belong to.</param>
    /// <param name="timeframe">Timeframe of the candles.</param>
    /// <remarks>
    /// Best effort: failures are logged, never thrown. When nothing is found or loading fails,
    /// the stored window is reset to an empty list.
    /// </remarks>
    private async Task LoadInitialCandlesAsync(TradingExchanges exchange, Ticker ticker, Timeframe timeframe)
    {
        try
        {
            _logger.LogInformation("Loading initial candles for {Exchange}-{Ticker}-{Timeframe}",
                exchange, ticker, timeframe);

            // Ensure state is initialized first
            _state.State.Candles ??= new List<Candle>();

            // Load the last 500 candles from the database
            var endDate = DateTime.UtcNow;
            var startDate = CandleHelpers.GetBotPreloadSinceFromTimeframe(timeframe);

            var candles =
                await _candleRepository.GetCandles(exchange, ticker, timeframe, startDate, endDate, MaxCandleCount);

            if (candles?.Any() == true)
            {
                _state.State.Candles = candles
                    .OrderBy(c => c.Date)
                    .TakeLast(MaxCandleCount)
                    .ToList();

                await _state.WriteStateAsync();

                _logger.LogInformation("Loaded {Count} initial candles for {Exchange}-{Ticker}-{Timeframe}",
                    _state.State.Candles.Count, exchange, ticker, timeframe);
            }
            else
            {
                _state.State.Candles = new List<Candle>();
                await _state.WriteStateAsync();

                _logger.LogWarning("No initial candles found for {Exchange}-{Ticker}-{Timeframe}",
                    exchange, ticker, timeframe);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error loading initial candles for {Exchange}-{Ticker}-{Timeframe}",
                exchange, ticker, timeframe);

            // Initialize empty state on error. The write is guarded because the storage
            // write may be the very operation that failed, and this method must not throw.
            _state.State.Candles = new List<Candle>();
            await TryWriteStateAsync();
        }
    }

    /// <summary>
    /// Attaches this grain as observer of the "Candles" stream identified by <paramref name="streamKey"/>.
    /// </summary>
    /// <param name="streamKey">Stream key; the caller passes the grain key.</param>
    /// <remarks>
    /// Best effort: failures are logged, never thrown. An existing subscription handle is
    /// resumed instead of creating an additional one, and surplus handles are removed.
    /// </remarks>
    private async Task SubscribeToPriceStreamAsync(string streamKey)
    {
        try
        {
            var streamProvider = this.GetStreamProvider("ManagingStreamProvider");
            _priceStream = streamProvider.GetStream<Candle>("Candles", streamKey);

            // Unsubscribing in OnDeactivateAsync is best effort, so an earlier activation may
            // have left a subscription behind. Subscribing again on top of it would deliver
            // every candle twice, and the duplicate is then seen as a gap by HasGapInCandleData.
            var existingHandles = await _priceStream.GetAllSubscriptionHandles();

            if (existingHandles.Count == 0)
            {
                _streamSubscription = await _priceStream.SubscribeAsync(this);
            }
            else
            {
                _streamSubscription = await existingHandles[0].ResumeAsync(this);

                // Drop any additional handles accumulated by earlier activations
                for (var i = 1; i < existingHandles.Count; i++)
                {
                    try
                    {
                        await existingHandles[i].UnsubscribeAsync();
                    }
                    catch (Exception ex)
                    {
                        _logger.LogWarning(ex, "Failed to remove surplus stream subscription for {StreamKey}", streamKey);
                    }
                }
            }

            _logger.LogInformation("Subscribed to price stream for {StreamKey}", streamKey);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error subscribing to price stream for {StreamKey}", streamKey);
        }
    }

    /// <summary>
    /// Persists the grain state, logging a warning instead of throwing when the write fails.
    /// Intended for error-handling paths that must not raise a second exception.
    /// </summary>
    /// <returns>True when the state was written; false when the write failed.</returns>
    private async Task<bool> TryWriteStateAsync()
    {
        try
        {
            await _state.WriteStateAsync();
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Failed to persist candle store state for grain {GrainKey}",
                this.GetPrimaryKeyString());
            return false;
        }
    }

    /// <summary>
    /// Returns the most recent candles, ordered by date ascending.
    /// </summary>
    /// <param name="count">Number of candles to return; values of zero or less are treated as 1.</param>
    /// <returns>
    /// Up to <paramref name="count"/> latest candles; an empty list when no candles are stored
    /// or an error occurs.
    /// </returns>
    public Task<List<Candle>> GetLastCandle(int count = 1)
    {
        try
        {
            // Ensure state is initialized
            if (_state.State.Candles == null || _state.State.Candles.Count == 0)
            {
                _logger.LogDebug("No candles available for grain {GrainKey}", this.GetPrimaryKeyString());
                return Task.FromResult(new List<Candle>());
            }

            // Validate count parameter
            if (count <= 0)
            {
                _logger.LogWarning("Invalid count parameter {Count} for grain {GrainKey}, using default value 1",
                    count, this.GetPrimaryKeyString());
                count = 1;
            }

            // Get the last X candles, ordered by date
            var lastCandles = _state.State.Candles
                .OrderBy(c => c.Date)
                .TakeLast(count)
                .ToList();

            _logger.LogDebug("Retrieved {Count} latest candles for grain {GrainKey}",
                lastCandles.Count, this.GetPrimaryKeyString());

            return Task.FromResult(lastCandles);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error retrieving last {Count} candles for grain {GrainKey}",
                count, this.GetPrimaryKeyString());
            return Task.FromResult(new List<Candle>());
        }
    }
}
|
|
|
|
/// <summary>
/// Persisted state of <c>CandleStoreGrain</c>: the list of candles that makes up its rolling window.
/// </summary>
[GenerateSerializer]
public class CandleStoreGrainState
{
    /// <summary>Stored candles; starts as an empty list.</summary>
    [Id(0)]
    public List<Candle> Candles { get; set; } = new List<Candle>();
}