Add event

This commit is contained in:
2025-08-15 01:23:39 +07:00
parent 2622da05e6
commit e6c3ec139a
8 changed files with 169 additions and 44 deletions

View File

@@ -71,12 +71,6 @@ public class StrategyDeployedEvent : PlatformMetricsEvent
[Id(3)] [Id(3)]
public string StrategyName { get; set; } = string.Empty; public string StrategyName { get; set; } = string.Empty;
[Id(4)]
public decimal InitialVolume { get; set; }
[Id(5)]
public decimal InitialPnL { get; set; }
} }
/// <summary> /// <summary>
@@ -105,18 +99,12 @@ public class PositionOpenedEvent : PlatformMetricsEvent
public Guid PositionId { get; set; } public Guid PositionId { get; set; }
[Id(2)] [Id(2)]
public Guid StrategyId { get; set; } public Ticker Ticker { get; set; }
[Id(3)] [Id(3)]
public string Ticker { get; set; } = string.Empty; public decimal Volume { get; set; }
[Id(4)] [Id(4)]
public decimal Size { get; set; }
[Id(5)]
public decimal NotionalValue { get; set; }
[Id(6)]
public TradeDirection Direction { get; set; } public TradeDirection Direction { get; set; }
} }
@@ -130,16 +118,16 @@ public class PositionClosedEvent : PlatformMetricsEvent
public Guid PositionId { get; set; } public Guid PositionId { get; set; }
[Id(2)] [Id(2)]
public Guid StrategyId { get; set; } public Ticker Ticker { get; set; }
[Id(3)] [Id(3)]
public string Ticker { get; set; } = string.Empty;
[Id(4)]
public decimal RealizedPnL { get; set; } public decimal RealizedPnL { get; set; }
[Id(5)] [Id(4)]
public decimal Volume { get; set; } public decimal Volume { get; set; }
[Id(5)]
public decimal InitialVolume { get; set; }
} }
/// <summary> /// <summary>
@@ -158,7 +146,7 @@ public class TradeExecutedEvent : PlatformMetricsEvent
public Guid StrategyId { get; set; } public Guid StrategyId { get; set; }
[Id(4)] [Id(4)]
public string Ticker { get; set; } = string.Empty; public Ticker Ticker { get; set; }
[Id(5)] [Id(5)]
public decimal Volume { get; set; } public decimal Volume { get; set; }

View File

@@ -65,11 +65,11 @@ public class PlatformSummaryGrainState
// Volume breakdown by asset // Volume breakdown by asset
[Id(17)] [Id(17)]
public Dictionary<string, decimal> VolumeByAsset { get; set; } = new(); public Dictionary<Ticker, decimal> VolumeByAsset { get; set; } = new();
// Position count breakdown // Position count breakdown
[Id(18)] [Id(18)]
public Dictionary<string, int> PositionCountByAsset { get; set; } = new(); public Dictionary<Ticker, int> PositionCountByAsset { get; set; } = new();
[Id(19)] [Id(19)]
public Dictionary<TradeDirection, int> PositionCountByDirection { get; set; } = new(); public Dictionary<TradeDirection, int> PositionCountByDirection { get; set; } = new();

View File

@@ -93,13 +93,13 @@ public class PlatformSummaryViewModel
/// Volume breakdown by asset/ticker /// Volume breakdown by asset/ticker
/// </summary> /// </summary>
[Id(13)] [Id(13)]
public required Dictionary<string, decimal> VolumeByAsset { get; set; } public required Dictionary<Ticker, decimal> VolumeByAsset { get; set; }
/// <summary> /// <summary>
/// Position count breakdown by asset/ticker /// Position count breakdown by asset/ticker
/// </summary> /// </summary>
[Id(14)] [Id(14)]
public required Dictionary<string, int> PositionCountByAsset { get; set; } public required Dictionary<Ticker, int> PositionCountByAsset { get; set; }
/// <summary> /// <summary>
/// Position count breakdown by direction (Long/Short) /// Position count breakdown by direction (Long/Short)

View File

@@ -1,3 +1,4 @@
using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains; using Managing.Application.Abstractions.Grains;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using static Managing.Common.Enums; using static Managing.Common.Enums;
@@ -13,14 +14,17 @@ public class LiveBotRegistryGrain : Grain, ILiveBotRegistryGrain
{ {
private readonly IPersistentState<BotRegistryState> _state; private readonly IPersistentState<BotRegistryState> _state;
private readonly ILogger<LiveBotRegistryGrain> _logger; private readonly ILogger<LiveBotRegistryGrain> _logger;
private readonly IBotService _botService;
public LiveBotRegistryGrain( public LiveBotRegistryGrain(
[PersistentState("bot-registry", "registry-store")] [PersistentState("bot-registry", "registry-store")]
IPersistentState<BotRegistryState> state, IPersistentState<BotRegistryState> state,
ILogger<LiveBotRegistryGrain> logger) ILogger<LiveBotRegistryGrain> logger,
IBotService botService)
{ {
_state = state; _state = state;
_logger = logger; _logger = logger;
_botService = botService;
} }
public override async Task OnActivateAsync(CancellationToken cancellationToken) public override async Task OnActivateAsync(CancellationToken cancellationToken)
@@ -60,6 +64,9 @@ public class LiveBotRegistryGrain : Grain, ILiveBotRegistryGrain
_logger.LogInformation( _logger.LogInformation(
"Bot {Identifier} registered successfully for user {UserId}. Total bots: {TotalBots}, Active bots: {ActiveBots}", "Bot {Identifier} registered successfully for user {UserId}. Total bots: {TotalBots}, Active bots: {ActiveBots}",
identifier, userId, _state.State.TotalBotsCount, _state.State.ActiveBotsCount); identifier, userId, _state.State.TotalBotsCount, _state.State.ActiveBotsCount);
// Notify platform summary grain about strategy deployment
await NotifyStrategyDeployedAsync(identifier, userId);
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -94,6 +101,9 @@ public class LiveBotRegistryGrain : Grain, ILiveBotRegistryGrain
_logger.LogInformation( _logger.LogInformation(
"Bot {Identifier} unregistered successfully from user {UserId}. Total bots: {TotalBots}", "Bot {Identifier} unregistered successfully from user {UserId}. Total bots: {TotalBots}",
identifier, entryToRemove.UserId, _state.State.TotalBotsCount); identifier, entryToRemove.UserId, _state.State.TotalBotsCount);
// Notify platform summary grain about strategy stopped
await NotifyStrategyStoppedAsync(identifier, entryToRemove.UserId);
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -176,4 +186,66 @@ public class LiveBotRegistryGrain : Grain, ILiveBotRegistryGrain
return Task.FromResult(entry.Status); return Task.FromResult(entry.Status);
} }
private async Task NotifyStrategyDeployedAsync(Guid identifier, int userId)
{
try
{
// Get bot details for the event
var bot = await _botService.GetBotByIdentifier(identifier);
if (bot != null)
{
var platformGrain = GrainFactory.GetGrain<IPlatformSummaryGrain>("platform-summary");
var deployedEvent = new StrategyDeployedEvent
{
StrategyId = identifier,
AgentName = bot.User.AgentName,
StrategyName = bot.Name
};
await platformGrain.OnStrategyDeployedAsync(deployedEvent);
_logger.LogDebug("Notified platform summary about strategy deployment: {StrategyName}", bot.Name);
}
else
{
_logger.LogWarning("Could not find bot {Identifier} to notify platform summary", identifier);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to notify platform summary about strategy deployment for bot {Identifier}", identifier);
}
}
private async Task NotifyStrategyStoppedAsync(Guid identifier, int userId)
{
try
{
// Get bot details for the event
var bot = await _botService.GetBotByIdentifier(identifier);
if (bot != null)
{
var platformGrain = GrainFactory.GetGrain<IPlatformSummaryGrain>("platform-summary");
var stoppedEvent = new StrategyStoppedEvent
{
StrategyId = identifier,
AgentName = bot.User?.Name ?? $"User-{userId}",
StrategyName = bot.Name
};
await platformGrain.OnStrategyStoppedAsync(stoppedEvent);
_logger.LogDebug("Notified platform summary about strategy stopped: {StrategyName}", bot.Name);
}
else
{
_logger.LogWarning("Could not find bot {Identifier} to notify platform summary", identifier);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to notify platform summary about strategy stopped for bot {Identifier}", identifier);
}
}
} }

View File

@@ -377,6 +377,27 @@ public class TradingBotBase : ITradingBot
if (position.Status.Equals(PositionStatus.New)) if (position.Status.Equals(PositionStatus.New))
{ {
await SetPositionStatus(position.SignalIdentifier, PositionStatus.Filled); await SetPositionStatus(position.SignalIdentifier, PositionStatus.Filled);
// Notify platform summary about the executed trade
try
{
await ServiceScopeHelpers.WithScopedService<IGrainFactory>(_scopeFactory, async grainFactory =>
{
var platformGrain = grainFactory.GetGrain<IPlatformSummaryGrain>("platform-summary");
var tradeExecutedEvent = new TradeExecutedEvent
{
TradeId = position.Identifier,
Ticker = position.Ticker,
Volume = position.Open.Price * position.Open.Quantity * position.Open.Leverage
};
await platformGrain.OnTradeExecutedAsync(tradeExecutedEvent);
});
}
catch (Exception ex)
{
Logger.LogWarning(ex, "Failed to notify platform summary about trade execution for position {PositionId}", position.Identifier);
}
} }
position = brokerPosition; position = brokerPosition;

View File

@@ -100,10 +100,10 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
_state.State.HasPendingChanges = false; _state.State.HasPendingChanges = false;
// Update volume breakdown by asset // Update volume breakdown by asset
await UpdateVolumeBreakdownAsync(strategies); UpdateVolumeBreakdown(strategies);
// Update position count breakdown // Update position count breakdown
await UpdatePositionCountBreakdownAsync(strategies); UpdatePositionCountBreakdown(strategies);
await _state.WriteStateAsync(); await _state.WriteStateAsync();
@@ -115,14 +115,14 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
} }
} }
private async Task UpdateVolumeBreakdownAsync(IEnumerable<Bot> strategies) private void UpdateVolumeBreakdown(IEnumerable<Bot> strategies)
{ {
_state.State.VolumeByAsset.Clear(); _state.State.VolumeByAsset.Clear();
// Group strategies by ticker and sum their volumes // Group strategies by ticker and sum their volumes
var volumeByAsset = strategies var volumeByAsset = strategies
.Where(s => s.Volume > 0) .Where(s => s.Volume > 0)
.GroupBy(s => s.Ticker.ToString()) .GroupBy(s => s.Ticker)
.ToDictionary(g => g.Key, g => g.Sum(s => s.Volume)); .ToDictionary(g => g.Key, g => g.Sum(s => s.Volume));
foreach (var kvp in volumeByAsset) foreach (var kvp in volumeByAsset)
@@ -134,7 +134,7 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
volumeByAsset.Count, volumeByAsset.Values.Sum()); volumeByAsset.Count, volumeByAsset.Values.Sum());
} }
private async Task UpdatePositionCountBreakdownAsync(IEnumerable<Bot> strategies) private void UpdatePositionCountBreakdown(IEnumerable<Bot> strategies)
{ {
_state.State.PositionCountByAsset.Clear(); _state.State.PositionCountByAsset.Clear();
_state.State.PositionCountByDirection.Clear(); _state.State.PositionCountByDirection.Clear();
@@ -146,7 +146,7 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
{ {
// Group by asset and sum position counts per asset // Group by asset and sum position counts per asset
var positionsByAsset = activeStrategies var positionsByAsset = activeStrategies
.GroupBy(s => s.Ticker.ToString()) .GroupBy(s => s.Ticker)
.ToDictionary(g => g.Key, g => g.Sum(b => b.LongPositionCount + b.ShortPositionCount)); .ToDictionary(g => g.Key, g => g.Sum(b => b.LongPositionCount + b.ShortPositionCount));
// Sum long and short position counts across all bots // Sum long and short position counts across all bots
@@ -252,7 +252,6 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
_logger.LogInformation("Position opened: {PositionId} for {Ticker}", evt.PositionId, evt.Ticker); _logger.LogInformation("Position opened: {PositionId} for {Ticker}", evt.PositionId, evt.Ticker);
_state.State.TotalPositionCount++; _state.State.TotalPositionCount++;
_state.State.TotalOpenInterest += evt.NotionalValue;
_state.State.HasPendingChanges = true; _state.State.HasPendingChanges = true;
await _state.WriteStateAsync(); await _state.WriteStateAsync();
} }
@@ -274,6 +273,7 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
} }
_state.State.VolumeByAsset[asset] += evt.Volume; _state.State.VolumeByAsset[asset] += evt.Volume;
_state.State.TotalOpenInterest -= evt.InitialVolume;
_state.State.HasPendingChanges = true; _state.State.HasPendingChanges = true;
await _state.WriteStateAsync(); await _state.WriteStateAsync();
@@ -285,7 +285,6 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
evt.TradeId, evt.Ticker, evt.Volume); evt.TradeId, evt.Ticker, evt.Volume);
_state.State.TotalPlatformVolume += evt.Volume; _state.State.TotalPlatformVolume += evt.Volume;
_state.State.TotalPlatformPnL += evt.PnL;
// Update volume by asset // Update volume by asset
var asset = evt.Ticker; var asset = evt.Ticker;
@@ -402,8 +401,8 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
PositionCountChange24h = state.TotalPositionCount - state.TotalPositionCount24hAgo, PositionCountChange24h = state.TotalPositionCount - state.TotalPositionCount24hAgo,
// Breakdowns // Breakdowns
VolumeByAsset = state.VolumeByAsset ?? new Dictionary<string, decimal>(), VolumeByAsset = state.VolumeByAsset ?? new Dictionary<Ticker, decimal>(),
PositionCountByAsset = state.PositionCountByAsset ?? new Dictionary<string, int>(), PositionCountByAsset = state.PositionCountByAsset ?? new Dictionary<Ticker, int>(),
PositionCountByDirection = state.PositionCountByDirection ?? new Dictionary<TradeDirection, int>(), PositionCountByDirection = state.PositionCountByDirection ?? new Dictionary<TradeDirection, int>(),
// Metadata // Metadata

View File

@@ -1,4 +1,5 @@
using Managing.Application.Abstractions; using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Services; using Managing.Application.Abstractions.Services;
using Managing.Application.Trading.Commands; using Managing.Application.Trading.Commands;
using Managing.Domain.Shared.Helpers; using Managing.Domain.Shared.Helpers;
@@ -12,6 +13,7 @@ public class ClosePositionCommandHandler(
IExchangeService exchangeService, IExchangeService exchangeService,
IAccountService accountService, IAccountService accountService,
ITradingService tradingService, ITradingService tradingService,
IGrainFactory? grainFactory = null,
ILogger<ClosePositionCommandHandler> logger = null) ILogger<ClosePositionCommandHandler> logger = null)
: ICommandHandler<ClosePositionCommand, Position> : ICommandHandler<ClosePositionCommand, Position>
{ {
@@ -68,6 +70,30 @@ public class ClosePositionCommandHandler(
if (!request.IsForBacktest) if (!request.IsForBacktest)
await tradingService.UpdatePositionAsync(request.Position); await tradingService.UpdatePositionAsync(request.Position);
// Notify platform summary about the closed position
try
{
var platformGrain = grainFactory?.GetGrain<IPlatformSummaryGrain>("platform-summary");
if (platformGrain != null)
{
var positionClosedEvent = new PositionClosedEvent
{
PositionId = request.Position.Identifier,
Ticker = request.Position.Ticker,
RealizedPnL = request.Position.ProfitAndLoss?.Realized ?? 0,
Volume = closedPosition.Quantity * lastPrice * request.Position.Open.Leverage,
InitialVolume = request.Position.Open.Quantity * request.Position.Open.Price * request.Position.Open.Leverage
};
await platformGrain.OnPositionClosedAsync(positionClosedEvent);
}
}
catch (Exception ex)
{
SentrySdk.CaptureException(ex);
logger?.LogError(ex, "Failed to notify platform summary about position closure for position {PositionId}", request.Position.Identifier);
}
} }
return request.Position; return request.Position;
@@ -77,6 +103,7 @@ public class ClosePositionCommandHandler(
logger?.LogError(ex, "Error closing position: {Message} \n Stacktrace : {StackTrace}", ex.Message, logger?.LogError(ex, "Error closing position: {Message} \n Stacktrace : {StackTrace}", ex.Message,
ex.StackTrace); ex.StackTrace);
SentrySdk.CaptureException(ex);
throw; throw;
} }
} }

View File

@@ -1,4 +1,5 @@
using Managing.Application.Abstractions; using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Services; using Managing.Application.Abstractions.Services;
using Managing.Application.Trading.Commands; using Managing.Application.Trading.Commands;
using Managing.Common; using Managing.Common;
@@ -11,20 +12,14 @@ namespace Managing.Application.Trading.Handlers
public class OpenPositionCommandHandler( public class OpenPositionCommandHandler(
IExchangeService exchangeService, IExchangeService exchangeService,
IAccountService accountService, IAccountService accountService,
ITradingService tradingService) ITradingService tradingService,
IGrainFactory? grainFactory = null)
: ICommandHandler<OpenPositionRequest, Position> : ICommandHandler<OpenPositionRequest, Position>
{ {
public async Task<Position> Handle(OpenPositionRequest request) public async Task<Position> Handle(OpenPositionRequest request)
{ {
var account = await accountService.GetAccount(request.AccountName, hideSecrets: false, getBalance: false); var account = await accountService.GetAccount(request.AccountName, hideSecrets: false, getBalance: false);
if (!request.IsForPaperTrading)
{
// var cancelOrderResult = await exchangeService.CancelOrder(account, request.Ticker);
// if (!cancelOrderResult)
// {
// throw new Exception($"Not able to close all orders for {request.Ticker}");
// }
}
var initiator = request.IsForPaperTrading ? PositionInitiator.PaperTrading : request.Initiator; var initiator = request.IsForPaperTrading ? PositionInitiator.PaperTrading : request.Initiator;
var position = new Position(Guid.NewGuid(), request.AccountName, request.Direction, var position = new Position(Guid.NewGuid(), request.AccountName, request.Direction,
@@ -108,6 +103,29 @@ namespace Managing.Application.Trading.Handlers
if (!request.IsForPaperTrading) if (!request.IsForPaperTrading)
{ {
await tradingService.InsertPositionAsync(position); await tradingService.InsertPositionAsync(position);
// Notify platform summary about the opened position
try
{
var platformGrain = grainFactory?.GetGrain<IPlatformSummaryGrain>("platform-summary");
if (platformGrain != null)
{
var positionOpenedEvent = new PositionOpenedEvent
{
PositionId = position.Identifier,
Ticker = position.Ticker,
Volume = position.Open.Price * position.Open.Quantity * position.Open.Leverage,
Direction = position.OriginDirection
};
await platformGrain.OnPositionOpenedAsync(positionOpenedEvent);
}
}
catch (Exception)
{
// Log error but don't fail the position creation
// This is a non-critical notification
}
} }
return position; return position;