diff --git a/src/Managing.Application.Abstractions/Grains/IPlatformSummaryGrain.cs b/src/Managing.Application.Abstractions/Grains/IPlatformSummaryGrain.cs index e3455df6..f34a2e2d 100644 --- a/src/Managing.Application.Abstractions/Grains/IPlatformSummaryGrain.cs +++ b/src/Managing.Application.Abstractions/Grains/IPlatformSummaryGrain.cs @@ -1,5 +1,6 @@ using Managing.Application.Abstractions.Models; using Orleans; +using Orleans.Concurrency; using static Managing.Common.Enums; namespace Managing.Application.Abstractions.Grains; @@ -39,6 +40,11 @@ public interface IPlatformSummaryGrain : IGrainWithStringKey /// Task GetTotalPositionCountAsync(); + /// + /// Gets the total platform fees + /// + Task GetTotalFeesAsync(); + /// /// Gets the daily volume history for the last 30 days for chart visualization /// @@ -48,9 +54,11 @@ public interface IPlatformSummaryGrain : IGrainWithStringKey /// /// Updates the active strategy count /// + [OneWay] Task UpdateActiveStrategyCountAsync(int newActiveCount); - Task OnPositionOpenedAsync(PositionOpenedEvent evt); + [OneWay] Task OnPositionClosedAsync(PositionClosedEvent evt); + [OneWay] Task OnTradeExecutedAsync(TradeExecutedEvent evt); } @@ -66,24 +74,6 @@ public abstract class PlatformMetricsEvent -/// -/// Event fired when a new position is opened -/// -[GenerateSerializer] -public class PositionOpenedEvent : PlatformMetricsEvent -{ - [Id(1)] - public Guid PositionId { get; set; } - - [Id(2)] - public Ticker Ticker { get; set; } - - [Id(3)] - public decimal Volume { get; set; } - - [Id(4)] - public TradeDirection Direction { get; set; } -} /// /// Event fired when a position is closed @@ -102,9 +92,6 @@ public class PositionClosedEvent : PlatformMetricsEvent [Id(4)] public decimal Volume { get; set; } - - [Id(5)] - public decimal InitialVolume { get; set; } } /// diff --git a/src/Managing.Application.Abstractions/Grains/PlatformSummaryGrainState.cs b/src/Managing.Application.Abstractions/Grains/PlatformSummaryGrainState.cs index 6a80a294..37a61869 100644 --- a/src/Managing.Application.Abstractions/Grains/PlatformSummaryGrainState.cs +++ b/src/Managing.Application.Abstractions/Grains/PlatformSummaryGrainState.cs @@ -37,6 +37,9 @@ public class PlatformSummaryGrainState [Id(8)] public int TotalPositionCount { get; set; } + [Id(20)] + public decimal TotalPlatformFees { get; set; } + // 24-hour ago values (for comparison) [Id(9)] public int TotalAgents24hAgo { get; set; } @@ -56,52 +59,25 @@ public class PlatformSummaryGrainState [Id(14)] public int TotalPositionCount24hAgo { get; set; } + [Id(21)] + public decimal TotalPlatformFees24hAgo { get; set; } + // Historical snapshots [Id(15)] - public List HourlySnapshots { get; set; } = new(); - - [Id(16)] public List DailySnapshots { get; set; } = new(); // Volume breakdown by asset - [Id(17)] + [Id(16)] public Dictionary VolumeByAsset { get; set; } = new(); // Position count breakdown - [Id(18)] + [Id(17)] public Dictionary PositionCountByAsset { get; set; } = new(); - [Id(19)] + [Id(18)] public Dictionary PositionCountByDirection { get; set; } = new(); } -/// -/// Hourly snapshot of platform metrics -/// -[GenerateSerializer] -public class HourlySnapshot -{ - [Id(0)] - public DateTime Timestamp { get; set; } - - [Id(1)] - public int TotalAgents { get; set; } - - [Id(2)] - public int TotalStrategies { get; set; } - - [Id(3)] - public decimal TotalVolume { get; set; } - - [Id(4)] - public decimal TotalPnL { get; set; } - - [Id(5)] - public decimal TotalOpenInterest { get; set; } - - [Id(6)] - public int TotalPositionCount { get; set; } -} /// /// Daily snapshot of platform metrics @@ -129,6 +105,9 @@ public class DailySnapshot [Id(6)] public int TotalPositionCount { get; set; } + + [Id(7)] + public decimal TotalFees { get; set; } } diff --git a/src/Managing.Application.Abstractions/Models/PlatformSummaryViewModel.cs b/src/Managing.Application.Abstractions/Models/PlatformSummaryViewModel.cs index ec1d472c..31befa05 100644 --- a/src/Managing.Application.Abstractions/Models/PlatformSummaryViewModel.cs +++ b/src/Managing.Application.Abstractions/Models/PlatformSummaryViewModel.cs @@ -51,6 +51,12 @@ public class PlatformSummaryViewModel [Id(6)] public required int TotalPositionCount { get; set; } + /// + /// Total platform-wide fees paid in USD + /// + [Id(19)] + public required decimal TotalPlatformFees { get; set; } + // 24-hour changes /// /// Change in agent count over the last 24 hours @@ -88,6 +94,12 @@ public class PlatformSummaryViewModel [Id(12)] public required int PositionCountChange24h { get; set; } + /// + /// Change in fees over the last 24 hours + /// + [Id(20)] + public required decimal FeesChange24h { get; set; } + // Breakdowns /// /// Volume breakdown by asset/ticker diff --git a/src/Managing.Application/Abstractions/Grains/IAgentGrain.cs b/src/Managing.Application/Abstractions/Grains/IAgentGrain.cs index c5bedcc4..12c39411 100644 --- a/src/Managing.Application/Abstractions/Grains/IAgentGrain.cs +++ b/src/Managing.Application/Abstractions/Grains/IAgentGrain.cs @@ -1,4 +1,5 @@ using Managing.Application.Abstractions.Models; +using Orleans.Concurrency; namespace Managing.Application.Abstractions.Grains { @@ -35,7 +36,8 @@ namespace Managing.Application.Abstractions.Grains /// /// Handles stream notifications for agent summary updates. /// - /// The update event from the stream + /// The update event from the stream] + [OneWay] Task OnAgentSummaryUpdateAsync(AgentSummaryUpdateEvent updateEvent); } } \ No newline at end of file diff --git a/src/Managing.Application/Bots/TradingBotBase.cs b/src/Managing.Application/Bots/TradingBotBase.cs index 355676db..46d0f598 100644 --- a/src/Managing.Application/Bots/TradingBotBase.cs +++ b/src/Managing.Application/Bots/TradingBotBase.cs @@ -96,8 +96,11 @@ public class TradingBotBase : ITradingBot await LogInformation(startupMessage); // Notify AgentGrain about bot startup - await NotifyAgentGrainAsync(AgentSummaryEventType.BotStarted, + await NotifyAgentAndPlatformGrainAsync(AgentSummaryEventType.BotStarted, $"Bot: {Config.Name}, Ticker: {Config.Ticker}"); + + // Notify platform summary about active strategy count change + await NotifyPlatformSummaryAboutStrategyCount(); break; case BotStatus.Running: @@ -372,81 +375,60 @@ public class TradingBotBase : ITradingBot { try { - Position position = null; - List positionsExchange = null; + Position internalPosition = null; + List brokerPositions = null; await ServiceScopeHelpers.WithScopedService(_scopeFactory, async tradingService => { - position = Config.IsForBacktest + internalPosition = Config.IsForBacktest ? positionForSignal : await tradingService.GetPositionByIdentifierAsync(positionForSignal.Identifier); if (Config.IsForBacktest) { - positionsExchange = new List { position }; + brokerPositions = new List { internalPosition }; } else { - await ServiceScopeHelpers.WithScopedService(_scopeFactory, + brokerPositions = await ServiceScopeHelpers.WithScopedService>(_scopeFactory, async exchangeService => { - positionsExchange = (await exchangeService.GetBrokerPositions(Account)).ToList(); + return [.. await exchangeService.GetBrokerPositions(Account)]; }); } }); if (!Config.IsForBacktest) { - var brokerPosition = positionsExchange.FirstOrDefault(p => p.Ticker == Config.Ticker); + var brokerPosition = brokerPositions.FirstOrDefault(p => p.Ticker == Config.Ticker); if (brokerPosition != null) { UpdatePositionPnl(positionForSignal.Identifier, brokerPosition.ProfitAndLoss.Realized); + internalPosition.ProfitAndLoss = brokerPosition.ProfitAndLoss; + internalPosition.Status = PositionStatus.Filled; - if (position.Status.Equals(PositionStatus.New)) + if (internalPosition.Status.Equals(PositionStatus.New)) { - await SetPositionStatus(position.SignalIdentifier, PositionStatus.Filled); + await SetPositionStatus(internalPosition.SignalIdentifier, PositionStatus.Filled); // Notify platform summary about the executed trade - try - { - await ServiceScopeHelpers.WithScopedService(_scopeFactory, - async grainFactory => - { - var platformGrain = - grainFactory.GetGrain("platform-summary"); - var tradeExecutedEvent = new TradeExecutedEvent - { - TradeId = position.Identifier, - Ticker = position.Ticker, - Volume = position.Open.Price * position.Open.Quantity * position.Open.Leverage - }; - - await platformGrain.OnTradeExecutedAsync(tradeExecutedEvent); - }); - } - catch (Exception ex) - { - Logger.LogWarning(ex, - "Failed to notify platform summary about trade execution for position {PositionId}", - position.Identifier); - } + await NotifyAgentAndPlatformGrainAsync(AgentSummaryEventType.PositionOpened, + $"Position found on broker: {internalPosition.Identifier}", internalPosition); } - - position = brokerPosition; } else { - if (!position.Status.Equals(PositionStatus.New)) + if (!internalPosition.Status.Equals(PositionStatus.New)) { - position.Status = PositionStatus.Filled; + internalPosition.Status = PositionStatus.Filled; } } } - if (position.Status == PositionStatus.New) + if (internalPosition.Status == PositionStatus.New) { - List orders = null; - await ServiceScopeHelpers.WithScopedService(_scopeFactory, - async exchangeService => { orders = await exchangeService.GetOpenOrders(Account, Config.Ticker); }); + var orders = await ServiceScopeHelpers.WithScopedService>(_scopeFactory, + async exchangeService => { return [.. await exchangeService.GetOpenOrders(Account, Config.Ticker)]; }); + if (orders.Any()) { if (orders.Count() >= 3) @@ -485,6 +467,38 @@ public class TradingBotBase : ITradingBot $"⏳ **Waiting for Orders**\nPosition has `{orders.Count()}` open orders\nElapsed: `{timeSinceRequest.TotalMinutes:F1}min`\nWaiting `{remainingMinutes:F1}min` more before canceling"); } } + else if (orders.Count() == 2 && Positions[internalPosition.Identifier].Status == PositionStatus.New) + { + // Check if position is already open on broker with 2 orders + await LogInformation( + $"🔍 **Checking Broker Position**\nPosition has exactly `{orders.Count()}` open orders\nChecking if position is already open on broker..."); + + Position brokerPosition = null; + await ServiceScopeHelpers.WithScopedService(_scopeFactory, async exchangeService => + { + var brokerPositions = await exchangeService.GetBrokerPositions(Account); + brokerPosition = brokerPositions.FirstOrDefault(p => p.Ticker == Config.Ticker); + }); + + if (brokerPosition != null) + { + await LogInformation( + $"✅ **Position Found on Broker**\nPosition is already open on broker\nUpdating position status to Filled"); + + UpdatePositionPnl(positionForSignal.Identifier, brokerPosition.ProfitAndLoss.Realized); + await SetPositionStatus(signal.Identifier, PositionStatus.Filled); + + + // Notify platform summary about the executed trade + await NotifyAgentAndPlatformGrainAsync(AgentSummaryEventType.PositionOpened, + $"Position found on broker with 2 orders: {internalPosition.Identifier}", internalPosition); + } + else + { + await LogInformation( + $"⏸️ **Position Pending**\nPosition still waiting to open\n`{orders.Count()}` open orders remaining"); + } + } else { await LogInformation( @@ -498,11 +512,11 @@ public class TradingBotBase : ITradingBot await HandleClosedPosition(positionForSignal); } } - else if (position.Status == PositionStatus.Finished || position.Status == PositionStatus.Flipped) + else if (internalPosition.Status == PositionStatus.Finished || internalPosition.Status == PositionStatus.Flipped) { await HandleClosedPosition(positionForSignal); } - else if (position.Status == PositionStatus.Filled || position.Status == PositionStatus.PartiallyFilled) + else if (internalPosition.Status == PositionStatus.Filled || internalPosition.Status == PositionStatus.PartiallyFilled) { Candle lastCandle = null; await ServiceScopeHelpers.WithScopedService(_scopeFactory, async exchangeService => @@ -598,7 +612,7 @@ public class TradingBotBase : ITradingBot } } } - else if (position.Status == PositionStatus.Rejected || position.Status == PositionStatus.Canceled) + else if (internalPosition.Status == PositionStatus.Rejected || internalPosition.Status == PositionStatus.Canceled) { await LogWarning($"Open position trade is rejected for signal {signal.Identifier}"); if (signal.Status == SignalStatus.PositionOpen) @@ -642,6 +656,20 @@ public class TradingBotBase : ITradingBot currentPrice, true); } } + + if (!Config.IsForBacktest){ + // Update position in database with broker data + await ServiceScopeHelpers.WithScopedService(_scopeFactory, async tradingService => + { + // Update the internal position with broker data + internalPosition.Status = PositionStatus.Filled; + internalPosition.ProfitAndLoss = internalPosition.ProfitAndLoss; + + // Save updated position to database + await tradingService.UpdatePositionAsync(internalPosition); + }); + + } } catch (Exception ex) { @@ -769,8 +797,11 @@ public class TradingBotBase : ITradingBot } // Notify AgentGrain about position opening - await NotifyAgentGrainAsync(AgentSummaryEventType.PositionOpened, - $"Signal: {signal.Identifier}"); + await NotifyAgentAndPlatformGrainAsync(AgentSummaryEventType.PositionOpened, + $"Signal: {signal.Identifier}", position); + + // Publish TradeExecutedEvent for the opening trade (this handles both position opening and trade execution) + await PublishTradeExecutedEventAsync(position.Open, position, signal.Identifier, 0); Logger.LogInformation($"Position requested"); return position; // Return the created position without adding to list @@ -893,7 +924,7 @@ public class TradingBotBase : ITradingBot { List positions = null; await ServiceScopeHelpers.WithScopedService(_scopeFactory, - async exchangeService => { positions = (await exchangeService.GetBrokerPositions(Account)).ToList(); }); + async exchangeService => { positions = [.. await exchangeService.GetBrokerPositions(Account)]; }); if (!positions.Any(p => p.Ticker == Config.Ticker)) { return true; @@ -905,7 +936,7 @@ public class TradingBotBase : ITradingBot await ServiceScopeHelpers.WithScopedService(_scopeFactory, async exchangeService => { - orders = (await exchangeService.GetOpenOrders(Account, Config.Ticker)).ToList(); + orders = [.. await exchangeService.GetOpenOrders(Account, Config.Ticker)]; }); var reason = $"Cannot open position. There is already a position open for {Config.Ticker} on the broker."; @@ -930,50 +961,6 @@ public class TradingBotBase : ITradingBot } } - - /// - /// Gets the actual closing date of a position by checking which trade (Stop Loss or Take Profit) was executed. - /// - /// The finished position - /// The date when the position was closed, or null if cannot be determined - private DateTime? GetPositionClosingDate(Position position) - { - if (!position.IsFinished()) - { - return null; - } - - // Check which trade actually closed the position - if (position.StopLoss?.Status == TradeStatus.Filled && position.StopLoss.Date != default) - { - return position.StopLoss.Date; - } - - if (position.TakeProfit1?.Status == TradeStatus.Filled && position.TakeProfit1.Date != default) - { - return position.TakeProfit1.Date; - } - - if (position.TakeProfit2?.Status == TradeStatus.Filled && position.TakeProfit2.Date != default) - { - return position.TakeProfit2.Date; - } - - // Fallback: if we can't determine the exact closing trade, use the latest date available - var availableDates = new List(); - - if (position.StopLoss?.Date != default) - availableDates.Add(position.StopLoss.Date); - - if (position.TakeProfit1?.Date != default) - availableDates.Add(position.TakeProfit1.Date); - - if (position.TakeProfit2?.Date != default) - availableDates.Add(position.TakeProfit2.Date); - - return availableDates.Any() ? availableDates.Max() : position.Open.Date; - } - public async Task CloseTrade(LightSignal signal, Position position, Trade tradeToClose, decimal lastPrice, bool tradeClosingPosition = false) { @@ -1217,15 +1204,22 @@ public class TradingBotBase : ITradingBot Config.BotTradingBalance += position.ProfitAndLoss.Realized; Logger.LogInformation( - $"💰 **Balance Updated**\nNew bot trading balance: `${Config.BotTradingBalance:F2}`"); + string.Format("💰 **Balance Updated**\nNew bot trading balance: `${0:F2}`", Config.BotTradingBalance)); } // Notify AgentGrain about position closing var pnlInfo = position.ProfitAndLoss?.Realized != null - ? $"PnL: {position.ProfitAndLoss.Realized:F2}" + ? string.Format("PnL: {0:F2}", position.ProfitAndLoss.Realized) : "PnL: Unknown"; - await NotifyAgentGrainAsync(AgentSummaryEventType.PositionClosed, - $"Signal: {position.SignalIdentifier}, {pnlInfo}"); + await NotifyAgentAndPlatformGrainAsync(AgentSummaryEventType.PositionClosed, + string.Format("Signal: {0}, {1}", position.SignalIdentifier, pnlInfo), position); + + // Publish TradeExecutedEvent for the closing trade + var closingTrade = GetClosingTrade(position); + if (closingTrade != null) + { + await PublishTradeExecutedEventAsync(closingTrade, position, position.SignalIdentifier, position.ProfitAndLoss?.Realized ?? 0); + } } else { @@ -1425,6 +1419,20 @@ public class TradingBotBase : ITradingBot Config.IsForWatchingOnly = !Config.IsForWatchingOnly; await LogInformation( $"🔄 **Watch Mode Toggle**\nBot: `{Config.Name}`\nWatch Only: `{(Config.IsForWatchingOnly ? "ON" : "OFF")}`"); + + // Notify platform summary about strategy count change + await NotifyPlatformSummaryAboutStrategyCount(); + } + + /// + /// Handles bot stopping and notifies platform summary + /// + public async Task StopBot() + { + await LogInformation($"🛑 **Bot Stopped**\nBot: `{Config.Name}`\nTicker: `{Config.Ticker}`"); + + // Notify platform summary about strategy count change + await NotifyPlatformSummaryAboutStrategyCount(); } public async Task LogInformation(string message) @@ -1909,41 +1917,183 @@ public class TradingBotBase : ITradingBot } /// - /// Sends a notification to the AgentGrain to trigger summary updates + /// Publishes a TradeExecutedEvent to the platform summary grain /// - /// The type of event (e.g., PositionOpened, PositionClosed) - /// Optional additional context data - private async Task NotifyAgentGrainAsync(AgentSummaryEventType eventType, string additionalData = null) + /// The trade that was executed + /// The position this trade belongs to + /// The signal identifier + /// The PnL for this trade + private async Task PublishTradeExecutedEventAsync(Trade trade, Position position, string signalIdentifier, decimal pnl) { - if (Config.IsForBacktest || Account?.User == null) + if (Config.IsForBacktest) { - return; // Skip notifications for backtest or when no user context + return; // Skip notifications for backtest } try { await ServiceScopeHelpers.WithScopedService(_scopeFactory, async grainFactory => { - var agentGrain = grainFactory.GetGrain(Account.User.Id); - - var updateEvent = new AgentSummaryUpdateEvent + var platformGrain = grainFactory.GetGrain("platform-summary"); + var tradeExecutedEvent = new TradeExecutedEvent { - UserId = Account.User.Id, - BotId = Identifier, - EventType = eventType, - Timestamp = DateTime.UtcNow, - AdditionalData = additionalData + TradeId = Guid.NewGuid(), // Generate new ID for the event + PositionId = position.Identifier, + StrategyId = position.InitiatorIdentifier, + Ticker = position.Ticker, + Volume = trade.Price * trade.Quantity * trade.Leverage, + PnL = pnl, + Fee = trade.Fee, + Direction = trade.Direction }; - - await agentGrain.OnAgentSummaryUpdateAsync(updateEvent); - - Logger.LogDebug("Sent agent notification: {EventType} for bot {BotId}", eventType, Identifier); + await platformGrain.OnTradeExecutedAsync(tradeExecutedEvent); + Logger.LogDebug("Published TradeExecutedEvent for trade {TradeId} in position {PositionId}", tradeExecutedEvent.TradeId, position.Identifier); }); } catch (Exception ex) { - Logger.LogError(ex, "Failed to send agent notification: {EventType} for bot {BotId}", eventType, - Identifier); + Logger.LogError(ex, "Failed to publish TradeExecutedEvent for position {PositionId}", position.Identifier); } } + + /// + /// Gets the trade that was used to close the position + /// + /// The position to check + /// The closing trade, or null if none found + private Trade GetClosingTrade(Position position) + { + // Check which trade was used to close the position + if (position.StopLoss?.Status == TradeStatus.Filled) + { + return position.StopLoss; + } + else if (position.TakeProfit1?.Status == TradeStatus.Filled) + { + return position.TakeProfit1; + } + else if (position.TakeProfit2?.Status == TradeStatus.Filled) + { + return position.TakeProfit2; + } + + // If no specific closing trade is found, create a synthetic one based on the position + // This handles cases where the position was closed manually or by the exchange + if (position.ProfitAndLoss?.Realized != null) + { + var closeDirection = position.OriginDirection == TradeDirection.Long ? TradeDirection.Short : TradeDirection.Long; + return new Trade( + DateTime.UtcNow, + closeDirection, + TradeStatus.Filled, + TradeType.StopMarket, + position.Ticker, + position.Open.Quantity, + position.Open.Price, // Use open price as approximation + position.Open.Leverage, + "synthetic-close", + "Position closed" + ); + } + + return null; + } + + /// + /// Notifies the platform summary grain about active strategy count changes + /// + private async Task NotifyPlatformSummaryAboutStrategyCount() + { + if (Config.IsForBacktest) + { + return; // Skip notifications for backtest + } + + try + { + await ServiceScopeHelpers.WithScopedService(_scopeFactory, async grainFactory => + { + var platformGrain = grainFactory.GetGrain("platform-summary"); + + // Get current active strategy count from the platform + var currentSummary = await platformGrain.GetPlatformSummaryAsync(); + var currentActiveCount = currentSummary.TotalActiveStrategies; + + // Update the count (this will trigger a refresh if needed) + await platformGrain.UpdateActiveStrategyCountAsync(currentActiveCount); + + Logger.LogDebug("Notified platform summary about strategy count: {Count}", currentActiveCount); + }); + } + catch (Exception ex) + { + Logger.LogError(ex, "Failed to notify platform summary about strategy count"); + } + } + + /// + /// Notifies both AgentGrain and PlatformSummaryGrain about bot events + /// + /// The type of event (e.g., PositionOpened, PositionClosed) + /// Optional additional context data + /// Optional position data for platform summary events + private async Task NotifyAgentAndPlatformGrainAsync(AgentSummaryEventType eventType, string additionalData = null, Position position = null) + { + if (Config.IsForBacktest) + { + return; // Skip notifications for backtest + } + + try + { + await ServiceScopeHelpers.WithScopedService(_scopeFactory, async grainFactory => + { + // Notify AgentGrain (user-specific metrics) + if (Account?.User != null) + { + var agentGrain = grainFactory.GetGrain(Account.User.Id); + + var updateEvent = new AgentSummaryUpdateEvent + { + UserId = Account.User.Id, + BotId = Identifier, + EventType = eventType, + Timestamp = DateTime.UtcNow, + AdditionalData = additionalData + }; + + await agentGrain.OnAgentSummaryUpdateAsync(updateEvent); + Logger.LogDebug("Sent agent notification: {EventType} for bot {BotId}", eventType, Identifier); + } + + // Notify PlatformSummaryGrain (platform-wide metrics) + var platformGrain = grainFactory.GetGrain("platform-summary"); + + switch (eventType) + { + case AgentSummaryEventType.PositionOpened when position != null: + // Position opening is now handled by TradeExecutedEvent in PublishTradeExecutedEventAsync + Logger.LogDebug("Position opened notification sent via TradeExecutedEvent for position {PositionId}", position.Identifier); + break; + + case AgentSummaryEventType.PositionClosed when position != null: + var positionClosedEvent = new PositionClosedEvent + { + PositionId = position.Identifier, + Ticker = position.Ticker, + RealizedPnL = position.ProfitAndLoss?.Realized ?? 0, + Volume = position.Open.Price * position.Open.Quantity * position.Open.Leverage, + }; + await platformGrain.OnPositionClosedAsync(positionClosedEvent); + Logger.LogDebug("Sent platform position closed notification for position {PositionId}", position.Identifier); + break; + } + }); + } + catch (Exception ex) + { + Logger.LogError(ex, "Failed to send notifications: {EventType} for bot {BotId}", eventType, Identifier); + } + } + } \ No newline at end of file diff --git a/src/Managing.Application/Grains/PlatformSummaryGrain.cs b/src/Managing.Application/Grains/PlatformSummaryGrain.cs index 147e5876..780ec88d 100644 --- a/src/Managing.Application/Grains/PlatformSummaryGrain.cs +++ b/src/Managing.Application/Grains/PlatformSummaryGrain.cs @@ -4,6 +4,7 @@ using Managing.Application.Abstractions.Models; using Managing.Application.Abstractions.Services; using Managing.Application.Orleans; using Managing.Domain.Bots; +using Managing.Domain.Candles; using Microsoft.Extensions.Logging; using static Managing.Common.Enums; @@ -22,7 +23,6 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable private readonly ITradingService _tradingService; private readonly ILogger _logger; - private const string _hourlySnapshotReminder = "HourlySnapshot"; private const string _dailySnapshotReminder = "DailySnapshot"; public PlatformSummaryGrain( @@ -44,15 +44,16 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable { _logger.LogInformation("Platform Summary Grain activated"); - // Set up reminders for periodic snapshots - await this.RegisterOrUpdateReminder(_hourlySnapshotReminder, - TimeSpan.FromHours(1), TimeSpan.FromHours(1)); - + // Set up reminder for daily snapshots using precise timing var now = DateTime.UtcNow; - var nextMidnight = now.Date.AddDays(1); - var timeUntilMidnight = nextMidnight - now; + + // Daily reminder - runs at midnight (00:00 UTC) + var nextDailyTime = CandleHelpers.GetNextExpectedCandleTime(Timeframe.OneDay, now); + var timeUntilNextDay = nextDailyTime - now; await this.RegisterOrUpdateReminder(_dailySnapshotReminder, - timeUntilMidnight, TimeSpan.FromDays(1)); + timeUntilNextDay, TimeSpan.FromDays(1).Add(TimeSpan.FromMinutes(3))); + + _logger.LogInformation("Daily reminder scheduled - Next daily: {NextDaily}", nextDailyTime); // Initial data load if state is empty if (_state.State.LastUpdated == default) @@ -232,6 +233,11 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable return Task.FromResult(_state.State.TotalPositionCount); } + public Task GetTotalFeesAsync() + { + return Task.FromResult(_state.State.TotalPlatformFees); + } + public Task> GetVolumeHistoryAsync() { var historyPoints = _state.State.DailySnapshots @@ -249,63 +255,135 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable // Event handlers for immediate updates public async Task UpdateActiveStrategyCountAsync(int newActiveCount) { - _logger.LogInformation("Updating active strategies count to: {NewActiveCount}", newActiveCount); + try + { + _logger.LogInformation("Updating active strategies count to: {NewActiveCount}", newActiveCount); - _state.State.TotalActiveStrategies = newActiveCount; - _state.State.HasPendingChanges = true; - await _state.WriteStateAsync(); + // Validate input + if (newActiveCount < 0) + { + _logger.LogWarning("Invalid active strategy count: {Count}, setting to 0", newActiveCount); + newActiveCount = 0; + } + + _state.State.TotalActiveStrategies = newActiveCount; + _state.State.HasPendingChanges = true; + await _state.WriteStateAsync(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error updating active strategy count to: {NewActiveCount}", newActiveCount); + } } - public async Task OnPositionOpenedAsync(PositionOpenedEvent evt) - { - _logger.LogInformation("Position opened: {PositionId} for {Ticker}", evt.PositionId, evt.Ticker); - - _state.State.TotalPositionCount++; - _state.State.HasPendingChanges = true; - await _state.WriteStateAsync(); - } public async Task OnPositionClosedAsync(PositionClosedEvent evt) { - _logger.LogInformation("Position closed: {PositionId} for {Ticker} with PnL: {PnL}", - evt.PositionId, evt.Ticker, evt.RealizedPnL); - - _state.State.TotalPositionCount--; - _state.State.TotalPlatformVolume += evt.Volume; - _state.State.TotalPlatformPnL += evt.RealizedPnL; - - // Update volume by asset - var asset = evt.Ticker; - if (!_state.State.VolumeByAsset.ContainsKey(asset)) + try { - _state.State.VolumeByAsset[asset] = 0; + _logger.LogInformation("Position closed: {PositionId} for {Ticker} with PnL: {PnL}", + evt.PositionId, evt.Ticker, evt.RealizedPnL); + + // Validate event data + if (evt == null || evt.PositionId == Guid.Empty || evt.Ticker == Ticker.Unknown) + { + _logger.LogWarning("Invalid PositionClosedEvent received: {Event}", evt); + return; + } + + // Ensure position count doesn't go negative + if (_state.State.TotalPositionCount > 0) + { + _state.State.TotalPositionCount--; + } + + _state.State.TotalPlatformVolume += evt.Volume; + _state.State.TotalPlatformPnL += evt.RealizedPnL; + + // Update volume by asset + var asset = evt.Ticker; + if (!_state.State.VolumeByAsset.ContainsKey(asset)) + { + _state.State.VolumeByAsset[asset] = 0; + } + + _state.State.VolumeByAsset[asset] += evt.Volume; + + // Update open interest (subtract the closed position's volume) + _state.State.TotalOpenInterest = Math.Max(0, _state.State.TotalOpenInterest - evt.Volume); + + _state.State.HasPendingChanges = true; + await _state.WriteStateAsync(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing PositionClosedEvent: {Event}", evt); } - - _state.State.VolumeByAsset[asset] += evt.Volume; - _state.State.TotalOpenInterest -= evt.InitialVolume; - - _state.State.HasPendingChanges = true; - await _state.WriteStateAsync(); } public async Task OnTradeExecutedAsync(TradeExecutedEvent evt) { - _logger.LogInformation("Trade executed: {TradeId} for {Ticker} with volume: {Volume}", - evt.TradeId, evt.Ticker, evt.Volume); - - _state.State.TotalPlatformVolume += evt.Volume; - - // Update volume by asset - var asset = evt.Ticker; - if (!_state.State.VolumeByAsset.ContainsKey(asset)) + try { - _state.State.VolumeByAsset[asset] = 0; + _logger.LogInformation("Trade executed: {TradeId} for {Ticker} with volume: {Volume}", + evt.TradeId, evt.Ticker, evt.Volume); + + // Validate event data + if (evt == null || evt.Ticker == Ticker.Unknown || evt.Volume <= 0) + { + _logger.LogWarning("Invalid TradeExecutedEvent received: {Event}", evt); + return; + } + + // Update platform volume + _state.State.TotalPlatformVolume += evt.Volume; + + // Update volume by asset + var asset = evt.Ticker; + if (!_state.State.VolumeByAsset.ContainsKey(asset)) + { + _state.State.VolumeByAsset[asset] = 0; + } + _state.State.VolumeByAsset[asset] += evt.Volume; + + // Update open interest and position count + // Since this is called only when position is fully open on broker, we always increase counts + _state.State.TotalPositionCount++; + _state.State.TotalOpenInterest += evt.Volume; + + // Update position count by asset + if (!_state.State.PositionCountByAsset.ContainsKey(asset)) + { + _state.State.PositionCountByAsset[asset] = 0; + } + _state.State.PositionCountByAsset[asset]++; + + // Update position count by direction + if (!_state.State.PositionCountByDirection.ContainsKey(evt.Direction)) + { + _state.State.PositionCountByDirection[evt.Direction] = 0; + } + _state.State.PositionCountByDirection[evt.Direction]++; + + // Update PnL if provided + if (evt.PnL != 0) + { + _state.State.TotalPlatformPnL += evt.PnL; + } + + // Update fees if provided + if (evt.Fee > 0) + { + _state.State.TotalPlatformFees += evt.Fee; + } + + _state.State.HasPendingChanges = true; + await _state.WriteStateAsync(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing TradeExecutedEvent: {Event}", evt); } - - _state.State.VolumeByAsset[asset] += evt.Volume; - - _state.State.HasPendingChanges = true; - await _state.WriteStateAsync(); } // Reminder handlers for periodic snapshots @@ -315,38 +393,12 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable switch (reminderName) { - case _hourlySnapshotReminder: - await TakeHourlySnapshotAsync(); - break; case _dailySnapshotReminder: await TakeDailySnapshotAsync(); break; } } - private async Task TakeHourlySnapshotAsync() - { - _logger.LogInformation("Taking hourly snapshot"); - - var snapshot = new HourlySnapshot - { - Timestamp = DateTime.UtcNow, - TotalAgents = _state.State.TotalAgents, - TotalStrategies = _state.State.TotalActiveStrategies, - TotalVolume = _state.State.TotalPlatformVolume, - TotalPnL = _state.State.TotalPlatformPnL, - TotalOpenInterest = _state.State.TotalOpenInterest, - TotalPositionCount = _state.State.TotalPositionCount - }; - - _state.State.HourlySnapshots.Add(snapshot); - - // Keep only last 24 hours - var cutoff = DateTime.UtcNow.AddHours(-24); - _state.State.HourlySnapshots.RemoveAll(s => s.Timestamp < cutoff); - - await _state.WriteStateAsync(); - } private async Task TakeDailySnapshotAsync() { @@ -359,6 +411,7 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable _state.State.TotalPlatformVolume24hAgo = _state.State.TotalPlatformVolume; _state.State.TotalOpenInterest24hAgo = _state.State.TotalOpenInterest; _state.State.TotalPositionCount24hAgo = _state.State.TotalPositionCount; + _state.State.TotalPlatformFees24hAgo = _state.State.TotalPlatformFees; // Add daily snapshot var dailySnapshot = new DailySnapshot @@ -369,7 +422,8 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable TotalVolume = _state.State.TotalPlatformVolume, TotalPnL = _state.State.TotalPlatformPnL, TotalOpenInterest = _state.State.TotalOpenInterest, - TotalPositionCount = _state.State.TotalPositionCount + TotalPositionCount = _state.State.TotalPositionCount, + TotalFees = _state.State.TotalPlatformFees }; _state.State.DailySnapshots.Add(dailySnapshot); @@ -410,6 +464,7 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable TotalPlatformVolumeLast24h = state.TotalPlatformVolume - state.TotalPlatformVolume24hAgo, TotalOpenInterest = state.TotalOpenInterest, TotalPositionCount = state.TotalPositionCount, + TotalPlatformFees = state.TotalPlatformFees, // 24-hour changes AgentsChange24h = state.TotalAgents - state.TotalAgents24hAgo, @@ -418,6 +473,7 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable VolumeChange24h = state.TotalPlatformVolume - state.TotalPlatformVolume24hAgo, OpenInterestChange24h = state.TotalOpenInterest - state.TotalOpenInterest24hAgo, PositionCountChange24h = state.TotalPositionCount - state.TotalPositionCount24hAgo, + FeesChange24h = state.TotalPlatformFees - state.TotalPlatformFees24hAgo, // Breakdowns VolumeByAsset = state.VolumeByAsset ?? new Dictionary(), diff --git a/src/Managing.Application/Trading/Handlers/ClosePositionCommandHandler.cs b/src/Managing.Application/Trading/Handlers/ClosePositionCommandHandler.cs index 7252dde2..8fe37953 100644 --- a/src/Managing.Application/Trading/Handlers/ClosePositionCommandHandler.cs +++ b/src/Managing.Application/Trading/Handlers/ClosePositionCommandHandler.cs @@ -1,8 +1,6 @@ using Managing.Application.Abstractions; -using Managing.Application.Abstractions.Grains; using Managing.Application.Abstractions.Services; using Managing.Application.Trading.Commands; -using Managing.Core; using Managing.Domain.Shared.Helpers; using Managing.Domain.Trades; using Microsoft.Extensions.DependencyInjection; @@ -72,36 +70,6 @@ public class ClosePositionCommandHandler( if (!request.IsForBacktest) await tradingService.UpdatePositionAsync(request.Position); - - // Notify platform summary about the closed position - try - { - await ServiceScopeHelpers.WithScopedService(scopeFactory, async grainFactory => - { - var platformGrain = grainFactory.GetGrain("platform-summary"); - if (platformGrain != null) - { - var positionClosedEvent = new PositionClosedEvent - { - PositionId = request.Position.Identifier, - Ticker = request.Position.Ticker, - RealizedPnL = request.Position.ProfitAndLoss?.Realized ?? 0, - Volume = closedPosition.Quantity * lastPrice * request.Position.Open.Leverage, - InitialVolume = request.Position.Open.Quantity * request.Position.Open.Price * - request.Position.Open.Leverage - }; - - await platformGrain.OnPositionClosedAsync(positionClosedEvent); - } - }); - } - catch (Exception ex) - { - SentrySdk.CaptureException(ex); - logger?.LogError(ex, - "Failed to notify platform summary about position closure for position {PositionId}", - request.Position.Identifier); - } } return request.Position; diff --git a/src/Managing.Application/Trading/Handlers/OpenPositionCommandHandler.cs b/src/Managing.Application/Trading/Handlers/OpenPositionCommandHandler.cs index ab62e026..8675ebc5 100644 --- a/src/Managing.Application/Trading/Handlers/OpenPositionCommandHandler.cs +++ b/src/Managing.Application/Trading/Handlers/OpenPositionCommandHandler.cs @@ -1,5 +1,4 @@ using Managing.Application.Abstractions; -using Managing.Application.Abstractions.Grains; using Managing.Application.Abstractions.Services; using Managing.Application.Trading.Commands; using Managing.Common; @@ -13,7 +12,7 @@ namespace Managing.Application.Trading.Handlers IExchangeService exchangeService, IAccountService accountService, ITradingService tradingService, - IGrainFactory? grainFactory = null) + IGrainFactory grainFactory = null) : ICommandHandler { public async Task Handle(OpenPositionRequest request) @@ -106,29 +105,6 @@ namespace Managing.Application.Trading.Handlers if (!request.IsForPaperTrading) { await tradingService.InsertPositionAsync(position); - - // Notify platform summary about the opened position - try - { - var platformGrain = grainFactory?.GetGrain("platform-summary"); - if (platformGrain != null) - { - var positionOpenedEvent = new PositionOpenedEvent - { - PositionId = position.Identifier, - Ticker = position.Ticker, - Volume = position.Open.Price * position.Open.Quantity * position.Open.Leverage, - Direction = position.OriginDirection - }; - - await platformGrain.OnPositionOpenedAsync(positionOpenedEvent); - } - } - catch (Exception) - { - // Log error but don't fail the position creation - // This is a non-critical notification - } } return position;