Clean and update event

This commit is contained in:
2025-09-27 22:20:12 +07:00
parent 6d91c75ec2
commit d432549d26
13 changed files with 255 additions and 239 deletions

View File

@@ -1,6 +1,6 @@
#nullable enable
using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Models;
using Managing.Application.Abstractions.Services;
using Managing.Application.Bots.Models;
using Managing.Application.Orleans;
@@ -91,40 +91,107 @@ public class AgentGrain : Grain, IAgentGrain
_logger.LogInformation("Agent {UserId} updated with name {AgentName}", this.GetPrimaryKeyLong(), agentName);
}
public async Task OnAgentSummaryUpdateAsync(AgentSummaryUpdateEvent updateEvent)
public async Task OnPositionOpenedAsync(PositionOpenEvent evt)
{
try
{
_logger.LogInformation("Received agent summary update event for user {UserId}, event type: {EventType}",
this.GetPrimaryKeyLong(), updateEvent.EventType);
_logger.LogInformation("Position opened event received for user {UserId}, position: {PositionId}",
this.GetPrimaryKeyLong(), evt.PositionIdentifier);
// Only update summary if the event is for this agent's bots
if (_state.State.BotIds.Contains(updateEvent.BotId))
{
await UpdateSummary();
}
// Update event-driven metrics
_state.State.TotalVolume += evt.Volume;
_state.State.TotalFees += evt.Fee;
_state.State.LastSummaryUpdate = DateTime.UtcNow;
await _state.WriteStateAsync();
// Update the agent summary with the new data
await UpdateSummary();
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing agent summary update event for user {UserId}",
_logger.LogError(ex, "Error processing position opened event for user {UserId}",
this.GetPrimaryKeyLong());
}
}
public async Task UpdateSummary()
public async Task OnPositionClosedAsync(PositionClosedEvent evt)
{
try
{
// Get all bots for this agent
var bots = await _botService.GetBotsByIdsAsync(_state.State.BotIds);
_logger.LogInformation("Position closed event received for user {UserId}, position: {PositionId}, PnL: {PnL}",
this.GetPrimaryKeyLong(), evt.PositionIdentifier, evt.RealizedPnL);
// Calculate aggregated statistics from bot data
var totalPnL = bots.Sum(b => b.Pnl);
var totalWins = bots.Sum(b => b.TradeWins);
var totalLosses = bots.Sum(b => b.TradeLosses);
// Update event-driven metrics
_state.State.TotalPnL += evt.RealizedPnL;
_state.State.TotalVolume += evt.Volume;
// Update wins/losses count
if (evt.RealizedPnL > 0)
{
_state.State.Wins++;
}
else if (evt.RealizedPnL < 0)
{
_state.State.Losses++;
}
_state.State.LastSummaryUpdate = DateTime.UtcNow;
await _state.WriteStateAsync();
// Update the agent summary with the new data
await UpdateSummary();
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing position closed event for user {UserId}",
this.GetPrimaryKeyLong());
}
}
public async Task OnPositionUpdatedAsync(PositionUpdatedEvent evt)
{
try
{
_logger.LogInformation("Position updated event received for user {UserId}, position: {PositionId}",
this.GetPrimaryKeyLong(), evt.PositionIdentifier);
// Update event-driven metrics for PnL changes
// Note: This is for real-time PnL updates, not cumulative like closed positions
_state.State.LastSummaryUpdate = DateTime.UtcNow;
await _state.WriteStateAsync();
// Update the agent summary with the new data
await UpdateSummary();
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing position updated event for user {UserId}",
this.GetPrimaryKeyLong());
}
}
/// <summary>
/// Updates the agent summary by recalculating from position data (used for initialization or manual refresh)
/// </summary>
private async Task UpdateSummary()
{
try
{
// Get all positions for this agent's bots as initiator
var positions = await _tradingService.GetPositionsByInitiatorIdentifiersAsync(_state.State.BotIds);
// Calculate aggregated statistics from position data
var totalPnL = positions.Sum(p => p.ProfitAndLoss?.Realized ?? 0);
var totalVolume = positions.Sum(p => p.Open.Price * p.Open.Quantity);
var totalFees = positions.Sum(p => p.CalculateTotalFees());
// Calculate wins/losses from position PnL
var totalWins = positions.Count(p => (p.ProfitAndLoss?.Realized ?? 0) > 0);
var totalLosses = positions.Count(p => (p.ProfitAndLoss?.Realized ?? 0) < 0);
// Calculate ROI based on total volume traded with proper division by zero handling
var totalVolume = bots.Sum(b => b.Volume);
decimal totalROI;
if (totalVolume > 0)
@@ -148,11 +215,11 @@ public class AgentGrain : Grain, IAgentGrain
totalROI = 0;
}
// Calculate Runtime based on the farthest date from bot startup times
// Calculate Runtime based on the earliest position date
DateTime? runtime = null;
if (bots.Any())
if (positions.Any())
{
runtime = bots.Max(b => b.StartupTime);
runtime = positions.Min(p => p.Date);
}
// Calculate total balance (USDC + open positions value)
@@ -192,6 +259,10 @@ public class AgentGrain : Grain, IAgentGrain
totalBalance = 0; // Set to 0 if calculation fails
}
// Get active strategies count from bot data (this is still needed for running bots)
var bots = await _botService.GetBotsByIdsAsync(_state.State.BotIds);
var activeStrategiesCount = bots.Count(b => b.Status == BotStatus.Running);
var summary = new AgentSummary
{
UserId = (int)this.GetPrimaryKeyLong(),
@@ -201,13 +272,16 @@ public class AgentGrain : Grain, IAgentGrain
Losses = totalLosses,
TotalROI = totalROI,
Runtime = runtime,
ActiveStrategiesCount = bots.Count(b => b.Status == BotStatus.Running),
ActiveStrategiesCount = activeStrategiesCount,
TotalVolume = totalVolume,
TotalBalance = totalBalance,
};
// Save summary to database
await _agentService.SaveOrUpdateAgentSummary(summary);
_logger.LogDebug("Updated agent summary from position data for user {UserId}: PnL={PnL}, Volume={Volume}, Wins={Wins}, Losses={Losses}",
this.GetPrimaryKeyLong(), totalPnL, totalVolume, totalWins, totalLosses);
}
catch (Exception ex)
{
@@ -219,6 +293,10 @@ public class AgentGrain : Grain, IAgentGrain
{
if (_state.State.BotIds.Add(botId))
{
// Update active strategies count
_state.State.ActiveStrategiesCount = _state.State.BotIds.Count;
_state.State.LastSummaryUpdate = DateTime.UtcNow;
await _state.WriteStateAsync();
_logger.LogInformation("Bot {BotId} registered to Agent {UserId}", botId, this.GetPrimaryKeyLong());
@@ -231,6 +309,10 @@ public class AgentGrain : Grain, IAgentGrain
{
if (_state.State.BotIds.Remove(botId))
{
// Update active strategies count
_state.State.ActiveStrategiesCount = _state.State.BotIds.Count;
_state.State.LastSummaryUpdate = DateTime.UtcNow;
await _state.WriteStateAsync();
_logger.LogInformation("Bot {BotId} unregistered from Agent {UserId}", botId, this.GetPrimaryKeyLong());

View File

@@ -1,4 +1,3 @@
using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Orleans;
using Microsoft.Extensions.Logging;
@@ -17,17 +16,14 @@ public class LiveBotRegistryGrain : Grain, ILiveBotRegistryGrain
{
private readonly IPersistentState<BotRegistryState> _state;
private readonly ILogger<LiveBotRegistryGrain> _logger;
private readonly IBotService _botService;
public LiveBotRegistryGrain(
[PersistentState("bot-registry", "registry-store")]
IPersistentState<BotRegistryState> state,
ILogger<LiveBotRegistryGrain> logger,
IBotService botService)
ILogger<LiveBotRegistryGrain> logger)
{
_state = state;
_logger = logger;
_botService = botService;
}
public override async Task OnActivateAsync(CancellationToken cancellationToken)
@@ -196,8 +192,8 @@ public class LiveBotRegistryGrain : Grain, ILiveBotRegistryGrain
{
var platformGrain = GrainFactory.GetGrain<IPlatformSummaryGrain>("platform-summary");
await platformGrain.UpdateActiveStrategyCountAsync(_state.State.ActiveBotsCount);
_logger.LogDebug("Notified platform summary about active strategy count change. New count: {ActiveCount}",
_logger.LogDebug("Notified platform summary about active strategy count change. New count: {ActiveCount}",
_state.State.ActiveBotsCount);
}
catch (Exception ex)

View File

@@ -24,6 +24,49 @@ namespace Managing.Application.Bots.Models
/// </summary>
[Id(4)]
public CachedBalanceData? CachedBalanceData { get; set; } = null;
// Event-driven metrics for real-time updates
/// <summary>
/// Total PnL calculated from position events
/// </summary>
[Id(5)]
public decimal TotalPnL { get; set; } = 0;
/// <summary>
/// Total volume calculated from position events
/// </summary>
[Id(6)]
public decimal TotalVolume { get; set; } = 0;
/// <summary>
/// Total wins count from closed positions
/// </summary>
[Id(7)]
public int Wins { get; set; } = 0;
/// <summary>
/// Total losses count from closed positions
/// </summary>
[Id(8)]
public int Losses { get; set; } = 0;
/// <summary>
/// Total fees paid from position events
/// </summary>
[Id(9)]
public decimal TotalFees { get; set; } = 0;
/// <summary>
/// Active strategies count (updated by bot registration/unregistration)
/// </summary>
[Id(10)]
public int ActiveStrategiesCount { get; set; } = 0;
/// <summary>
/// Last time the summary was updated
/// </summary>
[Id(11)]
public DateTime LastSummaryUpdate { get; set; } = DateTime.UtcNow;
}
/// <summary>

View File

@@ -1,6 +1,5 @@
using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Models;
using Managing.Application.Abstractions.Services;
using Managing.Application.Trading.Commands;
using Managing.Application.Trading.Handlers;
@@ -95,10 +94,6 @@ public class TradingBotBase : ITradingBot
$"📢 I'll notify you when signals are triggered.";
await LogInformation(startupMessage);
// Notify AgentGrain about bot startup
await NotifyAgentAndPlatformGrainAsync(AgentSummaryEventType.BotStarted,
$"Bot: {Config.Name}, Ticker: {Config.Ticker}");
break;
case BotStatus.Running:
@@ -393,36 +388,29 @@ public class TradingBotBase : ITradingBot
}
});
NotificationEventType eventType = NotificationEventType.PositionUpdated;
if (!Config.IsForBacktest)
{
var brokerPosition = brokerPositions.FirstOrDefault(p => p.Ticker == Config.Ticker);
if (brokerPosition != null)
{
// Calculate net PnL after fees for broker position
var previousPositionStatus = internalPosition.Status;
// Position found on the broker, means the position is filled
var brokerNetPnL = brokerPosition.GetNetPnL();
UpdatePositionPnl(positionForSignal.Identifier, brokerNetPnL);
internalPosition.ProfitAndLoss = new ProfitAndLoss { Realized = brokerNetPnL };
internalPosition.Status = PositionStatus.Filled;
await SetPositionStatus(internalPosition.SignalIdentifier, PositionStatus.Filled);
// Update Open trade status when position is found on broker
if (internalPosition.Open != null)
internalPosition.Open.SetStatus(TradeStatus.Filled);
positionForSignal.Open.SetStatus(TradeStatus.Filled);
eventType = NotificationEventType.PositionOpened;
await UpdatePositionDatabase(internalPosition);
if (previousPositionStatus != PositionStatus.Filled && internalPosition.Status == PositionStatus.Filled)
{
internalPosition.Open.SetStatus(TradeStatus.Filled);
}
// Also update the position in the bot's positions dictionary
if (positionForSignal.Open != null)
{
positionForSignal.Open.SetStatus(TradeStatus.Filled);
}
if (internalPosition.Status.Equals(PositionStatus.New))
{
await SetPositionStatus(internalPosition.SignalIdentifier, PositionStatus.Filled);
// Notify platform summary about the executed trade
await NotifyAgentAndPlatformGrainAsync(AgentSummaryEventType.PositionOpened,
$"Position found on broker: {internalPosition.Identifier}", internalPosition);
await NotifyAgentAndPlatformGrainAsync(NotificationEventType.PositionUpdated, internalPosition);
}
}
else
@@ -431,18 +419,6 @@ public class TradingBotBase : ITradingBot
if (internalPosition.Status.Equals(PositionStatus.Filled))
{
internalPosition.Status = PositionStatus.Finished;
// Update Open trade status when position becomes Finished
if (internalPosition.Open != null)
{
internalPosition.Open.SetStatus(TradeStatus.Filled);
}
// Also update the position in the bot's positions dictionary
if (positionForSignal.Open != null)
{
positionForSignal.Open.SetStatus(TradeStatus.Filled);
}
}
}
}
@@ -496,6 +472,7 @@ public class TradingBotBase : ITradingBot
}
else if (ordersCount == 2)
{
// TODO: This should never happen, but just in case
// Check if position is already open on broker with 2 orders
await LogInformation(
$"🔍 **Checking Broker Position**\nPosition has exactly `{orders.Count()}` open orders\nChecking if position is already open on broker...");
@@ -532,8 +509,7 @@ public class TradingBotBase : ITradingBot
await SetPositionStatus(signal.Identifier, PositionStatus.Filled);
// Notify platform summary about the executed trade
await NotifyAgentAndPlatformGrainAsync(AgentSummaryEventType.PositionOpened,
$"Position found on broker with 2 orders: {internalPosition.Identifier}",
await NotifyAgentAndPlatformGrainAsync(NotificationEventType.PositionOpened,
internalPosition);
}
else
@@ -560,8 +536,7 @@ public class TradingBotBase : ITradingBot
{
await HandleClosedPosition(positionForSignal);
}
else if (internalPosition.Status == PositionStatus.Filled ||
internalPosition.Status == PositionStatus.PartiallyFilled)
else if (internalPosition.Status == PositionStatus.Filled)
{
Candle lastCandle = null;
await ServiceScopeHelpers.WithScopedService<IExchangeService>(_scopeFactory, async exchangeService =>
@@ -600,6 +575,7 @@ public class TradingBotBase : ITradingBot
}
}
// For backtest and to make sure position is closed based on SL and TP
if (positionForSignal.OriginDirection == TradeDirection.Long)
{
if (positionForSignal.StopLoss.Price >= lastCandle.Low)
@@ -779,36 +755,7 @@ public class TradingBotBase : ITradingBot
}
}
if (!Config.IsForBacktest)
{
// Update position in database with broker data
await ServiceScopeHelpers.WithScopedService<ITradingService>(_scopeFactory, async tradingService =>
{
// Update the internal position with broker data
internalPosition.Status = PositionStatus.Filled;
// Apply fees to the internal position PnL before saving
if (internalPosition.ProfitAndLoss != null)
{
var totalFees = internalPosition.CalculateTotalFees();
internalPosition.ProfitAndLoss.Realized = internalPosition.ProfitAndLoss.Realized - totalFees;
}
// Update Open trade status when position is updated to Filled
if (internalPosition.Open != null)
{
internalPosition.Open.SetStatus(TradeStatus.Filled);
}
// Also update the position in the bot's positions dictionary
if (positionForSignal.Open != null)
{
positionForSignal.Open.SetStatus(TradeStatus.Filled);
}
// Save updated position to database
await tradingService.UpdatePositionAsync(internalPosition);
});
}
}
catch (Exception ex)
{
@@ -818,6 +765,14 @@ public class TradingBotBase : ITradingBot
}
}
private async Task UpdatePositionDatabase(Position position)
{
await ServiceScopeHelpers.WithScopedService<ITradingService>(_scopeFactory, async tradingService =>
{
await tradingService.UpdatePositionAsync(position);
});
}
private async Task<Position> OpenPosition(LightSignal signal)
{
Logger.LogInformation($"Opening position for {signal.Identifier}");
@@ -936,8 +891,8 @@ public class TradingBotBase : ITradingBot
}
// Notify AgentGrain about position opening
await NotifyAgentAndPlatformGrainAsync(AgentSummaryEventType.PositionOpened,
$"Signal: {signal.Identifier}", position);
await NotifyAgentAndPlatformGrainAsync(NotificationEventType.PositionOpened,
position);
Logger.LogInformation($"Position requested");
return position; // Return the created position without adding to list
@@ -1418,13 +1373,6 @@ public class TradingBotBase : ITradingBot
string.Format("💰 **Balance Updated**\nNew bot trading balance: `${0:F2}`",
Config.BotTradingBalance));
}
// Notify AgentGrain about position closing
var pnlInfo = position.ProfitAndLoss?.Realized != null
? string.Format("PnL: {0:F2}", position.ProfitAndLoss.Realized)
: "PnL: Unknown";
await NotifyAgentAndPlatformGrainAsync(AgentSummaryEventType.PositionClosed,
string.Format("Signal: {0}, {1}", position.SignalIdentifier, pnlInfo), position);
}
else
{
@@ -1555,7 +1503,7 @@ public class TradingBotBase : ITradingBot
public decimal GetProfitAndLoss()
{
// Calculate net PnL after deducting fees for each position
var netPnl = Positions.Values.Where(p => p.ProfitAndLoss != null && p.IsFinished())
var netPnl = Positions.Values.Where(p => p.ProfitAndLoss != null)
.Sum(p => p.GetNetPnL());
return netPnl;
}
@@ -2120,13 +2068,12 @@ public class TradingBotBase : ITradingBot
}
/// <summary>
/// Notifies both AgentGrain and PlatformSummaryGrain about bot events
/// Notifies both AgentGrain and PlatformSummaryGrain about bot events using unified event data
/// </summary>
/// <param name="eventType">The type of event (e.g., PositionOpened, PositionClosed)</param>
/// <param name="additionalData">Optional additional context data</param>
/// <param name="eventType">The type of event (e.g., PositionOpened, PositionClosed, PositionUpdated)</param>
/// <param name="position">Optional position data for platform summary events</param>
private async Task NotifyAgentAndPlatformGrainAsync(AgentSummaryEventType eventType, string additionalData = null,
Position position = null)
private async Task NotifyAgentAndPlatformGrainAsync(NotificationEventType eventType,
Position position)
{
if (Config.IsForBacktest)
{
@@ -2137,33 +2084,13 @@ public class TradingBotBase : ITradingBot
{
await ServiceScopeHelpers.WithScopedService<IGrainFactory>(_scopeFactory, async grainFactory =>
{
// Notify AgentGrain (user-specific metrics)
if (Account?.User != null)
{
var agentGrain = grainFactory.GetGrain<IAgentGrain>(Account.User.Id);
var updateEvent = new AgentSummaryUpdateEvent
{
BotId = Identifier,
EventType = eventType,
Timestamp = DateTime.UtcNow,
AdditionalData = additionalData
};
await agentGrain.OnAgentSummaryUpdateAsync(updateEvent);
Logger.LogDebug("Sent agent notification: {EventType} for bot {BotId}", eventType, Identifier);
}
// Notify PlatformSummaryGrain (platform-wide metrics)
var agentGrain = grainFactory.GetGrain<IAgentGrain>(Account.User.Id);
var platformGrain = grainFactory.GetGrain<IPlatformSummaryGrain>("platform-summary");
// Create unified event objects based on event type
switch (eventType)
{
case AgentSummaryEventType.PositionOpened when position != null:
// Position opening is now handled by TradeExecutedEvent in PublishTradeExecutedEventAsync
Logger.LogDebug(
"Position opened notification sent via TradeExecutedEvent for position {PositionId}",
position.Identifier);
case NotificationEventType.PositionOpened:
var positionOpenEvent = new PositionOpenEvent
{
PositionIdentifier = position.Identifier,
@@ -2172,10 +2099,15 @@ public class TradingBotBase : ITradingBot
Fee = position.GasFees + position.UiFees,
Direction = position.OriginDirection
};
await agentGrain.OnPositionOpenedAsync(positionOpenEvent);
await platformGrain.OnPositionOpenAsync(positionOpenEvent);
Logger.LogDebug("Sent position opened event to both grains for position {PositionId}",
position.Identifier);
break;
case AgentSummaryEventType.PositionClosed when position != null:
case NotificationEventType.PositionClosed:
var positionClosedEvent = new PositionClosedEvent
{
PositionIdentifier = position.Identifier,
@@ -2183,8 +2115,23 @@ public class TradingBotBase : ITradingBot
RealizedPnL = position.ProfitAndLoss?.Realized ?? 0,
Volume = position.Open.Price * position.Open.Quantity * position.Open.Leverage,
};
await agentGrain.OnPositionClosedAsync(positionClosedEvent);
await platformGrain.OnPositionClosedAsync(positionClosedEvent);
Logger.LogDebug("Sent platform position closed notification for position {PositionId}",
Logger.LogDebug("Sent position closed event to both grains for position {PositionId}",
position.Identifier);
break;
case NotificationEventType.PositionUpdated:
var positionUpdatedEvent = new PositionUpdatedEvent
{
PositionIdentifier = position.Identifier,
};
await agentGrain.OnPositionUpdatedAsync(positionUpdatedEvent);
Logger.LogDebug("Sent position updated event to both grains for position {PositionId}",
position.Identifier);
break;
}