using Managing.Application.Abstractions.Grains; using Managing.Application.Abstractions.Repositories; using Managing.Domain.Candles; using Microsoft.Extensions.Logging; using Orleans.Streams; using static Managing.Common.Enums; namespace Managing.Application.Grains; /// /// Grain for managing in-memory historical candle data with Orleans state persistence. /// Subscribes to price streams and maintains a rolling window of 500 candles. /// public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver { private readonly IPersistentState _state; private readonly ILogger _logger; private readonly ICandleRepository _candleRepository; private const int MaxCandleCount = 500; private IAsyncStream _priceStream; private StreamSubscriptionHandle _streamSubscription; public CandleStoreGrain( [PersistentState("candle-store-state", "candle-store")] IPersistentState state, ILogger logger, ICandleRepository candleRepository) { _state = state; _logger = logger; _candleRepository = candleRepository; } public override async Task OnActivateAsync(CancellationToken cancellationToken) { try { var grainKey = this.GetPrimaryKeyString(); _logger.LogInformation("CandleStoreGrain activated for key: {GrainKey}", grainKey); // Parse the grain key to extract exchange, ticker, and timeframe var (exchange, ticker, timeframe) = CandleHelpers.ParseCandleStoreGrainKey(grainKey); await LoadInitialCandlesAsync(exchange, ticker, timeframe); // Subscribe to the price stream await SubscribeToPriceStreamAsync(grainKey); await base.OnActivateAsync(cancellationToken); } catch (Exception ex) { _logger.LogError(ex, "Error during CandleStoreGrain activation for key: {GrainKey}", this.GetPrimaryKeyString()); // Ensure state is initialized even if there's an error if (_state.State.Candles == null) { _state.State.Candles = new List(); await _state.WriteStateAsync(); } throw; // Re-throw to let Orleans handle the activation failure } } public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken) { // Unsubscribe from the stream with proper error handling if (_streamSubscription != null) { try { await _streamSubscription.UnsubscribeAsync(); _logger.LogDebug("Successfully unsubscribed from stream for grain {GrainKey}", this.GetPrimaryKeyString()); } catch (Exception ex) { // Log the error but don't throw - this is common during shutdown when // the pub-sub rendezvous grain may already be deactivated _logger.LogWarning(ex, "Failed to unsubscribe from stream during deactivation for grain {GrainKey}. This is normal during shutdown.", this.GetPrimaryKeyString()); } finally { _streamSubscription = null; } } await base.OnDeactivateAsync(reason, cancellationToken); } public Task> GetCandlesAsync() { try { // Ensure state is initialized if (_state.State.Candles == null) { _logger.LogWarning("State not initialized for grain {GrainKey}, returning empty list", this.GetPrimaryKeyString()); return Task.FromResult(new HashSet()); } return Task.FromResult(_state.State.Candles.ToHashSet()); } catch (Exception ex) { _logger.LogError(ex, "Error retrieving candles for grain {GrainKey}", this.GetPrimaryKeyString()); return Task.FromResult(new HashSet()); } } // Stream observer implementation public async Task OnNextAsync(Candle candle, StreamSequenceToken token = null) { try { _logger.LogDebug("Received new candle for {GrainKey} at {Date}", this.GetPrimaryKeyString(), candle.Date); // Initialize state if needed if (_state.State.Candles == null) { _state.State.Candles = new List(); } // Add the new candle _state.State.Candles.Add(candle); // Maintain rolling window of 500 candles if (_state.State.Candles.Count > MaxCandleCount) { // Sort by date and keep the most recent 500 _state.State.Candles = _state.State.Candles .OrderBy(c => c.Date) .TakeLast(MaxCandleCount) .ToList(); } // Persist the updated state await _state.WriteStateAsync(); _logger.LogTrace("Updated candle store for {GrainKey}, total candles: {Count}", this.GetPrimaryKeyString(), _state.State.Candles.Count); } catch (Exception ex) { _logger.LogError(ex, "Error processing new candle for grain {GrainKey}", this.GetPrimaryKeyString()); } } public Task OnCompletedAsync() { _logger.LogInformation("Stream completed for grain {GrainKey}", this.GetPrimaryKeyString()); return Task.CompletedTask; } public Task OnErrorAsync(Exception ex) { _logger.LogError(ex, "Stream error for grain {GrainKey}", this.GetPrimaryKeyString()); return Task.CompletedTask; } private async Task LoadInitialCandlesAsync(TradingExchanges exchange, Ticker ticker, Timeframe timeframe) { try { _logger.LogInformation("Loading initial candles for {Exchange}-{Ticker}-{Timeframe}", exchange, ticker, timeframe); // Ensure state is initialized first if (_state.State.Candles == null) { _state.State.Candles = new List(); } // Load the last 500 candles from the database var endDate = DateTime.UtcNow; var startDate = CandleHelpers.GetBotPreloadSinceFromTimeframe(timeframe); var candles = await _candleRepository.GetCandles(exchange, ticker, timeframe, startDate, endDate, MaxCandleCount); if (candles?.Any() == true) { _state.State.Candles = candles .OrderBy(c => c.Date) .TakeLast(MaxCandleCount) .ToList(); await _state.WriteStateAsync(); _logger.LogInformation("Loaded {Count} initial candles for {Exchange}-{Ticker}-{Timeframe}", _state.State.Candles.Count, exchange, ticker, timeframe); } else { _state.State.Candles = new List(); await _state.WriteStateAsync(); _logger.LogWarning("No initial candles found for {Exchange}-{Ticker}-{Timeframe}", exchange, ticker, timeframe); } } catch (Exception ex) { _logger.LogError(ex, "Error loading initial candles for {Exchange}-{Ticker}-{Timeframe}", exchange, ticker, timeframe); // Initialize empty state on error _state.State.Candles = new List(); await _state.WriteStateAsync(); } } private async Task SubscribeToPriceStreamAsync(string streamKey) { try { var streamProvider = this.GetStreamProvider("ManagingStreamProvider"); _priceStream = streamProvider.GetStream("Candles", streamKey); _streamSubscription = await _priceStream.SubscribeAsync(this); _logger.LogInformation("Subscribed to price stream for {StreamKey}", streamKey); } catch (Exception ex) { _logger.LogError(ex, "Error subscribing to price stream for {StreamKey}", streamKey); } } public Task GetLastCandle() { try { // Ensure state is initialized if (_state.State.Candles == null || _state.State.Candles.Count == 0) { _logger.LogDebug("No candles available for grain {GrainKey}", this.GetPrimaryKeyString()); return Task.FromResult(null); } return Task.FromResult(_state.State.Candles.LastOrDefault()); } catch (Exception ex) { _logger.LogError(ex, "Error retrieving last candle for grain {GrainKey}", this.GetPrimaryKeyString()); return Task.FromResult(null); } } } /// /// State object for CandleStoreGrain containing the rolling window of candles /// [GenerateSerializer] public class CandleStoreGrainState { [Id(0)] public List Candles { get; set; } = new(); }