using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Models;
using Managing.Application.Abstractions.Services;
using Managing.Application.Orleans;
using Managing.Domain.Bots;
using Managing.Domain.Candles;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;

namespace Managing.Application.Grains;

/// <summary>
/// Grain for managing platform-wide summary metrics with real-time updates and periodic snapshots.
/// Uses custom trading placement with load balancing and built-in fallback.
/// </summary>
/// <remarks>
/// The persisted state is updated in two ways: a full recomputation from the agent, bot and
/// trading services (<see cref="RefreshDataAsync"/>), and incremental adjustments driven by
/// position events. Incremental adjustments set <c>HasPendingChanges</c>, which causes the next
/// <see cref="GetPlatformSummaryAsync"/> call to run a full recomputation.
/// </remarks>
[TradingPlacement] // Use custom trading placement with load balancing
public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
{
    /// <summary>Name of the reminder that triggers the daily snapshot.</summary>
    private const string DailySnapshotReminderName = "DailySnapshot";

    /// <summary>Number of days of daily snapshots kept in state.</summary>
    private const int SnapshotRetentionDays = 30;

    /// <summary>Age after which the cached summary is recomputed on read.</summary>
    private static readonly TimeSpan StaleAfter = TimeSpan.FromMinutes(5);

    /// <summary>Period passed to the reminder registration (one day plus three minutes).</summary>
    private static readonly TimeSpan DailyReminderPeriod = TimeSpan.FromDays(1).Add(TimeSpan.FromMinutes(3));

    private readonly IPersistentState<PlatformSummaryGrainState> _state;
    private readonly IBotService _botService;
    private readonly IAgentService _agentService;
    private readonly ITradingService _tradingService;
    private readonly ILogger<PlatformSummaryGrain> _logger;

    /// <summary>
    /// Creates the grain with its persistent state and the services used to recompute the summary.
    /// </summary>
    public PlatformSummaryGrain(
        [PersistentState("platform-summary-state", "platform-summary-store")]
        IPersistentState<PlatformSummaryGrainState> state,
        IBotService botService,
        IAgentService agentService,
        ITradingService tradingService,
        ILogger<PlatformSummaryGrain> logger)
    {
        _state = state;
        _botService = botService;
        _agentService = agentService;
        _tradingService = tradingService;
        _logger = logger;
    }

    /// <summary>
    /// Registers (or updates) the daily snapshot reminder and performs an initial data refresh.
    /// </summary>
    public override async Task OnActivateAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Platform Summary Grain activated");

        // First tick is aligned on the next expected daily candle time computed from "now".
        var now = DateTime.UtcNow;
        var nextDailyTime = CandleHelpers.GetNextExpectedCandleTime(Timeframe.OneDay, now);
        var timeUntilNextDay = nextDailyTime - now;

        // Reminder registration rejects negative due times; clamp so a boundary value
        // returned by the helper cannot fail the activation.
        if (timeUntilNextDay < TimeSpan.Zero)
        {
            timeUntilNextDay = TimeSpan.Zero;
        }

        await this.RegisterOrUpdateReminder(DailySnapshotReminderName, timeUntilNextDay, DailyReminderPeriod);

        _logger.LogInformation("Daily reminder scheduled - Next daily: {NextDaily}", nextDailyTime);

        await RefreshDataAsync();
    }

    /// <summary>
    /// Returns the platform summary, recomputing it first when the cached data is older than
    /// <see cref="StaleAfter"/> or when incremental changes are pending.
    /// </summary>
    public async Task<PlatformSummaryViewModel> GetPlatformSummaryAsync()
    {
        if (IsDataStale() || _state.State.HasPendingChanges)
        {
            await RefreshDataAsync();
        }

        return MapToViewModel(_state.State);
    }

    /// <summary>
    /// Recomputes all aggregates from the agent, bot and trading services and persists the state.
    /// Failures are logged and not rethrown, so callers keep working with the previous state.
    /// </summary>
    public async Task RefreshDataAsync()
    {
        try
        {
            _logger.LogInformation("Refreshing platform summary data");

            var agents = await _agentService.GetAllAgentSummaries();

            // Materialize once: the bot sequence is enumerated several times below.
            var strategies = (await _botService.GetBotsAsync()).ToList();

            // Open interest and position count come from stored positions, not from bot statistics.
            var (totalOpenInterest, totalPositionCount) = await CalculatePositionMetricsAsync();

            var summary = _state.State;
            summary.TotalAgents = agents.Count();
            summary.TotalActiveStrategies = strategies.Count(s => s.Status == BotStatus.Running);
            summary.TotalPlatformVolume = strategies.Sum(s => s.Volume);
            summary.TotalPlatformPnL = strategies.Sum(s => s.Pnl);
            summary.OpenInterest = totalOpenInterest;
            summary.TotalPositionCount = totalPositionCount;

            UpdateVolumeBreakdown(strategies);
            UpdatePositionCountBreakdown(strategies);

            // Mark the data as fresh only after every aggregate has been recomputed.
            summary.LastUpdated = DateTime.UtcNow;
            summary.HasPendingChanges = false;

            await _state.WriteStateAsync();

            _logger.LogInformation("Platform summary data refreshed successfully");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error refreshing platform summary data");
        }
    }

    /// <summary>
    /// Rebuilds the per-asset volume breakdown from bots that have a positive volume.
    /// </summary>
    private void UpdateVolumeBreakdown(IEnumerable<Bot> strategies)
    {
        var volumeByAsset = _state.State.VolumeByAsset;
        volumeByAsset.Clear();

        foreach (var assetGroup in strategies.Where(s => s.Volume > 0).GroupBy(s => s.Ticker))
        {
            volumeByAsset[assetGroup.Key] = assetGroup.Sum(s => s.Volume);
        }

        _logger.LogDebug("Updated volume breakdown: {AssetCount} assets with total volume {TotalVolume}",
            volumeByAsset.Count, volumeByAsset.Values.Sum());
    }

    /// <summary>
    /// Rebuilds the position count breakdowns (per asset and per direction) from the position
    /// counters carried by every bot whose status is not <see cref="BotStatus.Saved"/>.
    /// </summary>
    private void UpdatePositionCountBreakdown(IEnumerable<Bot> strategies)
    {
        var countByAsset = _state.State.PositionCountByAsset;
        var countByDirection = _state.State.PositionCountByDirection;
        countByAsset.Clear();
        countByDirection.Clear();

        var activeStrategies = strategies.Where(s => s.Status != BotStatus.Saved).ToList();
        if (activeStrategies.Count == 0)
        {
            _logger.LogDebug("No active strategies found for position breakdown");
            return;
        }

        foreach (var assetGroup in activeStrategies.GroupBy(s => s.Ticker))
        {
            countByAsset[assetGroup.Key] = assetGroup.Sum(b => b.LongPositionCount + b.ShortPositionCount);
        }

        var totalLongPositions = activeStrategies.Sum(s => s.LongPositionCount);
        var totalShortPositions = activeStrategies.Sum(s => s.ShortPositionCount);
        countByDirection[TradeDirection.Long] = totalLongPositions;
        countByDirection[TradeDirection.Short] = totalShortPositions;

        _logger.LogDebug(
            "Updated position breakdown from bot statistics: {AssetCount} assets, Long={LongPositions}, Short={ShortPositions}",
            countByAsset.Count, totalLongPositions, totalShortPositions);
    }

    /// <summary>
    /// Computes the leveraged open interest of unfinished positions and the number of stored positions.
    /// </summary>
    /// <returns>
    /// The open interest (sum of price * quantity * leverage over positions that are not finished)
    /// and the count of all positions returned by the trading service. Returns <c>(0, 0)</c> when
    /// the lookup fails or yields nothing.
    /// </returns>
    private async Task<(decimal totalOpenInterest, int totalPositionCount)> CalculatePositionMetricsAsync()
    {
        try
        {
            // Positions are read from the database rather than from the exchange.
            // Materialize once and tolerate a null result instead of dereferencing it.
            var allPositions = (await _tradingService.GetAllDatabasePositionsAsync())?.ToList();
            if (allPositions == null || allPositions.Count == 0)
            {
                _logger.LogDebug("No open positions found for metrics calculation");
                return (0m, 0);
            }

            // The position count deliberately covers every stored position, finished or not;
            // only the open interest is restricted to unfinished positions.
            var positionCount = allPositions.Count;
            var openPositions = allPositions.Where(p => !p.IsFinished()).ToList();
            if (openPositions.Count == 0)
            {
                _logger.LogDebug("No open positions found for metrics calculation");
                return (0m, positionCount);
            }

            var openInterest = openPositions.Sum(p => (p.Open.Price * p.Open.Quantity) * p.Open.Leverage);

            _logger.LogDebug(
                "Calculated position metrics: {PositionCount} positions, {OpenInterest} leveraged open interest",
                positionCount, openInterest);

            return (openInterest, positionCount);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Failed to calculate position metrics, returning zero values");
            return (0m, 0);
        }
    }

    /// <summary>Returns the total platform volume currently held in state.</summary>
    public Task<decimal> GetTotalVolumeAsync()
    {
        return Task.FromResult(_state.State.TotalPlatformVolume);
    }

    /// <summary>Returns the total platform PnL currently held in state.</summary>
    public Task<decimal> GetTotalPnLAsync()
    {
        return Task.FromResult(_state.State.TotalPlatformPnL);
    }

    /// <summary>Returns the open interest currently held in state.</summary>
    public Task<decimal> GetTotalOpenInterest()
    {
        return Task.FromResult(_state.State.OpenInterest);
    }

    /// <summary>Returns the total position count currently held in state.</summary>
    public Task<int> GetTotalPositionCountAsync()
    {
        return Task.FromResult(_state.State.TotalPositionCount);
    }

    /// <summary>Returns the total platform fees currently held in state.</summary>
    public Task<decimal> GetTotalFeesAsync()
    {
        return Task.FromResult(_state.State.TotalPlatformFees);
    }

    /// <summary>Returns one volume point per stored daily snapshot, ordered by date.</summary>
    public Task<List<VolumeHistoryPoint>> GetVolumeHistoryAsync()
    {
        return Task.FromResult(BuildVolumeHistory(_state.State));
    }

    /// <summary>
    /// Overrides the active strategy count and persists it. Negative values are replaced by 0.
    /// Failures are logged and not rethrown.
    /// </summary>
    public async Task UpdateActiveStrategyCountAsync(int newActiveCount)
    {
        try
        {
            _logger.LogInformation("Updating active strategies count to: {NewActiveCount}", newActiveCount);

            if (newActiveCount < 0)
            {
                _logger.LogWarning("Invalid active strategy count: {Count}, setting to 0", newActiveCount);
                newActiveCount = 0;
            }

            _state.State.TotalActiveStrategies = newActiveCount;
            _state.State.HasPendingChanges = true;
            await _state.WriteStateAsync();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error updating active strategy count to: {NewActiveCount}", newActiveCount);
        }
    }

    /// <summary>
    /// Applies a closed position to the running totals (volume, PnL, per-asset volume, open interest)
    /// and persists the state. Invalid events are logged and ignored; failures are logged and not rethrown.
    /// </summary>
    public async Task OnPositionClosedAsync(PositionClosedEvent evt)
    {
        try
        {
            // Validate before touching any member of the event: a null event must be rejected
            // with a warning, not surface as a NullReferenceException.
            if (evt == null || evt.PositionIdentifier == Guid.Empty || evt.Ticker == Ticker.Unknown)
            {
                _logger.LogWarning("Invalid PositionClosedEvent received: {Event}", evt);
                return;
            }

            _logger.LogInformation("Position closed: {PositionId} for {Ticker} with PnL: {PnL}",
                evt.PositionIdentifier, evt.Ticker, evt.RealizedPnL);

            var summary = _state.State;
            summary.TotalPlatformVolume += evt.Volume;
            summary.TotalPlatformPnL += evt.RealizedPnL;

            // A missing key leaves the out value at its default (0), which is the desired seed.
            summary.VolumeByAsset.TryGetValue(evt.Ticker, out var assetVolume);
            summary.VolumeByAsset[evt.Ticker] = assetVolume + evt.Volume;

            // Open interest shrinks by the closed position's volume but never goes below zero.
            summary.OpenInterest = Math.Max(0, summary.OpenInterest - evt.Volume);

            summary.HasPendingChanges = true;
            await _state.WriteStateAsync();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing PositionClosedEvent: {Event}", evt);
        }
    }

    /// <summary>
    /// Applies an opened position to the running totals (volume, per-asset volume, open interest,
    /// position counts, fees) and persists the state. Invalid events are logged and ignored;
    /// failures are logged and not rethrown.
    /// </summary>
    public async Task OnPositionOpenAsync(PositionOpenEvent evt)
    {
        try
        {
            // Validate before touching any member of the event (see OnPositionClosedAsync).
            if (evt == null || evt.Ticker == Ticker.Unknown || evt.Volume <= 0)
            {
                _logger.LogWarning("Invalid PositionOpenEvent received: {Event}", evt);
                return;
            }

            _logger.LogInformation("Position opened: {PositionIdentifier} for {Ticker} with volume: {Volume}",
                evt.PositionIdentifier, evt.Ticker, evt.Volume);

            var summary = _state.State;
            summary.TotalPlatformVolume += evt.Volume;

            summary.VolumeByAsset.TryGetValue(evt.Ticker, out var assetVolume);
            summary.VolumeByAsset[evt.Ticker] = assetVolume + evt.Volume;

            // Counts are always increased here; the original author notes this handler is only
            // invoked once the position is fully open on the broker.
            summary.TotalPositionCount++;
            summary.OpenInterest += evt.Volume;

            summary.PositionCountByAsset.TryGetValue(evt.Ticker, out var assetPositions);
            summary.PositionCountByAsset[evt.Ticker] = assetPositions + 1;

            summary.PositionCountByDirection.TryGetValue(evt.Direction, out var directionPositions);
            summary.PositionCountByDirection[evt.Direction] = directionPositions + 1;

            // Fees are optional on the event; only positive values are accumulated.
            if (evt.Fee > 0)
            {
                summary.TotalPlatformFees += evt.Fee;
            }

            summary.HasPendingChanges = true;
            await _state.WriteStateAsync();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing PositionOpenEvent: {Event}", evt);
        }
    }

    /// <summary>
    /// Reminder callback: takes the daily snapshot when the daily snapshot reminder fires.
    /// Unknown reminder names are logged and ignored.
    /// </summary>
    public async Task ReceiveReminder(string reminderName, TickStatus status)
    {
        _logger.LogInformation("Reminder received: {ReminderName}", reminderName);

        switch (reminderName)
        {
            case DailySnapshotReminderName:
                await TakeDailySnapshotAsync();
                break;
            default:
                _logger.LogWarning("Unknown reminder ignored: {ReminderName}", reminderName);
                break;
        }
    }

    /// <summary>
    /// Copies the current totals into the "24h ago" fields, stores a snapshot for today's UTC date
    /// (replacing an existing one for the same date), prunes snapshots older than
    /// <see cref="SnapshotRetentionDays"/> days and persists the state.
    /// </summary>
    private async Task TakeDailySnapshotAsync()
    {
        _logger.LogInformation("Taking daily snapshot");

        // Single timestamp so the snapshot date, the retention cutoff and LastSnapshot agree.
        var now = DateTime.UtcNow;
        var today = now.Date;
        var summary = _state.State;

        // Baseline used to compute the 24-hour deltas exposed by the view model.
        summary.TotalAgents24hAgo = summary.TotalAgents;
        summary.TotalActiveStrategies24hAgo = summary.TotalActiveStrategies;
        summary.TotalPlatformPnL24hAgo = summary.TotalPlatformPnL;
        summary.TotalPlatformVolume24hAgo = summary.TotalPlatformVolume;
        summary.TotalOpenInterest24hAgo = summary.OpenInterest;
        summary.TotalPositionCount24hAgo = summary.TotalPositionCount;
        summary.TotalPlatformFees24hAgo = summary.TotalPlatformFees;

        var dailySnapshot = new DailySnapshot
        {
            Date = today,
            TotalAgents = summary.TotalAgents,
            TotalStrategies = summary.TotalActiveStrategies,
            TotalVolume = summary.TotalPlatformVolume,
            TotalPnL = summary.TotalPlatformPnL,
            TotalOpenInterest = summary.OpenInterest,
            TotalPositionCount = summary.TotalPositionCount,
            TotalFees = summary.TotalPlatformFees
        };

        // If the reminder fires more than once on the same UTC date, keep only the latest
        // snapshot for that date so the history has one point per day.
        summary.DailySnapshots.RemoveAll(s => s.Date.Date == today);
        summary.DailySnapshots.Add(dailySnapshot);

        var cutoff = now.AddDays(-SnapshotRetentionDays);
        summary.DailySnapshots.RemoveAll(s => s.Date < cutoff);

        summary.LastSnapshot = now;

        await _state.WriteStateAsync();
    }

    /// <summary>Returns true when the last refresh is older than <see cref="StaleAfter"/>.</summary>
    private bool IsDataStale()
    {
        return DateTime.UtcNow - _state.State.LastUpdated > StaleAfter;
    }

    /// <summary>Projects the stored daily snapshots to volume points ordered by date.</summary>
    private static List<VolumeHistoryPoint> BuildVolumeHistory(PlatformSummaryGrainState state)
    {
        return state.DailySnapshots
            .OrderBy(s => s.Date)
            .Select(s => new VolumeHistoryPoint
            {
                Date = s.Date,
                Volume = s.TotalVolume
            })
            .ToList();
    }

    /// <summary>
    /// Maps the grain state to the view model, deriving the 24-hour deltas from the values
    /// captured by the last daily snapshot.
    /// </summary>
    private PlatformSummaryViewModel MapToViewModel(PlatformSummaryGrainState state)
    {
        return new PlatformSummaryViewModel
        {
            TotalAgents = state.TotalAgents,
            TotalActiveStrategies = state.TotalActiveStrategies,
            TotalPlatformPnL = state.TotalPlatformPnL,
            TotalPlatformVolume = state.TotalPlatformVolume,
            TotalPlatformVolumeLast24h = state.TotalPlatformVolume - state.TotalPlatformVolume24hAgo,
            TotalOpenInterest = state.OpenInterest,
            TotalPositionCount = state.TotalPositionCount,
            TotalPlatformFees = state.TotalPlatformFees,

            // 24-hour changes
            AgentsChange24h = state.TotalAgents - state.TotalAgents24hAgo,
            StrategiesChange24h = state.TotalActiveStrategies - state.TotalActiveStrategies24hAgo,
            PnLChange24h = state.TotalPlatformPnL - state.TotalPlatformPnL24hAgo,
            VolumeChange24h = state.TotalPlatformVolume - state.TotalPlatformVolume24hAgo,
            OpenInterestChange24h = state.OpenInterest - state.TotalOpenInterest24hAgo,
            PositionCountChange24h = state.TotalPositionCount - state.TotalPositionCount24hAgo,
            FeesChange24h = state.TotalPlatformFees - state.TotalPlatformFees24hAgo,

            // Breakdowns (an empty dictionary of the state's own type when the state has none)
            VolumeByAsset = state.VolumeByAsset ?? new(),
            PositionCountByAsset = state.PositionCountByAsset ?? new(),
            PositionCountByDirection = state.PositionCountByDirection ?? new(),

            // Volume history for charting, built from the retained daily snapshots
            VolumeHistory = BuildVolumeHistory(state),

            // Metadata
            LastUpdated = state.LastUpdated,
            Last24HourSnapshot = state.LastSnapshot
        };
    }
}