Add platform grain

This commit is contained in:
2025-08-14 19:44:33 +07:00
parent 345d76e06f
commit 4a45d6c970
13 changed files with 2324 additions and 59 deletions

View File

@@ -633,6 +633,12 @@ public class LiveTradingBotGrain : Grain, ILiveTradingBotGrain, IRemindable
.Sum(p => p.Open.Quantity * p.Open.Price);
var roi = totalInvestment > 0 ? (pnl / totalInvestment) * 100 : 0;
// Calculate long and short position counts
var longPositionCount = _tradingBot.Positions.Values
.Count(p => p.OriginDirection == TradeDirection.Long);
var shortPositionCount = _tradingBot.Positions.Values
.Count(p => p.OriginDirection == TradeDirection.Short);
// Create complete Bot object with all statistics
bot = new Bot
{
@@ -648,7 +654,9 @@ public class LiveTradingBotGrain : Grain, ILiveTradingBotGrain, IRemindable
Pnl = pnl,
Roi = roi,
Volume = volume,
Fees = fees
Fees = fees,
LongPositionCount = longPositionCount,
ShortPositionCount = shortPositionCount
};
}
@@ -659,8 +667,8 @@ public class LiveTradingBotGrain : Grain, ILiveTradingBotGrain, IRemindable
if (success)
{
_logger.LogDebug(
"Successfully saved bot statistics for bot {BotId}: Wins={Wins}, Losses={Losses}, PnL={PnL}, ROI={ROI}%, Volume={Volume}, Fees={Fees}",
_state.State.Identifier, bot.TradeWins, bot.TradeLosses, bot.Pnl, bot.Roi, bot.Volume, bot.Fees);
"Successfully saved bot statistics for bot {BotId}: Wins={Wins}, Losses={Losses}, PnL={PnL}, ROI={ROI}%, Volume={Volume}, Fees={Fees}, Long={LongPositions}, Short={ShortPositions}",
_state.State.Identifier, bot.TradeWins, bot.TradeLosses, bot.Pnl, bot.Roi, bot.Volume, bot.Fees, bot.LongPositionCount, bot.ShortPositionCount);
}
else
{

View File

@@ -0,0 +1,416 @@
using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Models;
using Managing.Application.Abstractions.Services;
using Managing.Domain.Bots;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Grains;
/// <summary>
/// Grain for managing platform-wide summary metrics with real-time updates and periodic snapshots
/// </summary>
public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
{
private readonly IPersistentState<PlatformSummaryGrainState> _state;
private readonly IBotService _botService;
private readonly IAgentService _agentService;
private readonly ITradingService _tradingService;
private readonly ILogger<PlatformSummaryGrain> _logger;
private const string _hourlySnapshotReminder = "HourlySnapshot";
private const string _dailySnapshotReminder = "DailySnapshot";
public PlatformSummaryGrain(
[PersistentState("platform-summary-state", "platform-summary-store")]
IPersistentState<PlatformSummaryGrainState> state,
IBotService botService,
IAgentService agentService,
ITradingService tradingService,
ILogger<PlatformSummaryGrain> logger)
{
_state = state;
_botService = botService;
_agentService = agentService;
_tradingService = tradingService;
_logger = logger;
}
public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Platform Summary Grain activated");
// Set up reminders for periodic snapshots
await this.RegisterOrUpdateReminder(_hourlySnapshotReminder,
TimeSpan.FromHours(1), TimeSpan.FromHours(1));
var now = DateTime.UtcNow;
var nextMidnight = now.Date.AddDays(1);
var timeUntilMidnight = nextMidnight - now;
await this.RegisterOrUpdateReminder(_dailySnapshotReminder,
timeUntilMidnight, TimeSpan.FromDays(1));
// Initial data load if state is empty
if (_state.State.LastUpdated == default)
{
await RefreshDataAsync();
}
}
public async Task<PlatformSummaryViewModel> GetPlatformSummaryAsync()
{
// If data is stale or has pending changes, refresh it
if (IsDataStale() || _state.State.HasPendingChanges)
{
await RefreshDataAsync();
}
return MapToViewModel(_state.State);
}
public async Task RefreshDataAsync()
{
try
{
_logger.LogInformation("Refreshing platform summary data");
// Get all data in parallel for better performance
var agentsTask = _agentService.GetAllAgentSummaries();
var strategiesTask = _botService.GetBotsAsync();
await Task.WhenAll(agentsTask, strategiesTask);
var agents = await agentsTask;
var strategies = await strategiesTask;
// Calculate totals
var totalAgents = agents.Count();
var totalActiveStrategies = strategies.Count(s => s.Status == BotStatus.Running);
// Calculate volume and PnL from strategies
var totalVolume = strategies.Sum(s => s.Volume);
var totalPnL = strategies.Sum(s => s.Pnl);
// Calculate real open interest and position count from actual positions
var (totalOpenInterest, totalPositionCount) = await CalculatePositionMetricsAsync();
// Update state
_state.State.TotalAgents = totalAgents;
_state.State.TotalActiveStrategies = totalActiveStrategies;
_state.State.TotalPlatformVolume = totalVolume;
_state.State.TotalPlatformPnL = totalPnL;
_state.State.TotalOpenInterest = totalOpenInterest;
_state.State.TotalPositionCount = totalPositionCount;
_state.State.LastUpdated = DateTime.UtcNow;
_state.State.HasPendingChanges = false;
// Update volume breakdown by asset
await UpdateVolumeBreakdownAsync(strategies);
// Update position count breakdown
await UpdatePositionCountBreakdownAsync(strategies);
await _state.WriteStateAsync();
_logger.LogInformation("Platform summary data refreshed successfully");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error refreshing platform summary data");
}
}
private async Task UpdateVolumeBreakdownAsync(IEnumerable<Bot> strategies)
{
_state.State.VolumeByAsset.Clear();
// Group strategies by ticker and sum their volumes
var volumeByAsset = strategies
.Where(s => s.Volume > 0)
.GroupBy(s => s.Ticker.ToString())
.ToDictionary(g => g.Key, g => g.Sum(s => s.Volume));
foreach (var kvp in volumeByAsset)
{
_state.State.VolumeByAsset[kvp.Key] = kvp.Value;
}
_logger.LogDebug("Updated volume breakdown: {AssetCount} assets with total volume {TotalVolume}",
volumeByAsset.Count, volumeByAsset.Values.Sum());
}
private async Task UpdatePositionCountBreakdownAsync(IEnumerable<Bot> strategies)
{
_state.State.PositionCountByAsset.Clear();
_state.State.PositionCountByDirection.Clear();
// Use position counts directly from bot statistics
var activeStrategies = strategies.Where(s => s.Status != BotStatus.Saved).ToList();
if (activeStrategies.Any())
{
// Group by asset and sum position counts per asset
var positionsByAsset = activeStrategies
.GroupBy(s => s.Ticker.ToString())
.ToDictionary(g => g.Key, g => g.Sum(b => b.LongPositionCount + b.ShortPositionCount));
// Sum long and short position counts across all bots
var totalLongPositions = activeStrategies.Sum(s => s.LongPositionCount);
var totalShortPositions = activeStrategies.Sum(s => s.ShortPositionCount);
// Update state
foreach (var kvp in positionsByAsset)
{
_state.State.PositionCountByAsset[kvp.Key] = kvp.Value;
}
_state.State.PositionCountByDirection[TradeDirection.Long] = totalLongPositions;
_state.State.PositionCountByDirection[TradeDirection.Short] = totalShortPositions;
_logger.LogDebug("Updated position breakdown from bot statistics: {AssetCount} assets, Long={LongPositions}, Short={ShortPositions}",
positionsByAsset.Count, totalLongPositions, totalShortPositions);
}
else
{
_logger.LogDebug("No active strategies found for position breakdown");
}
}
private async Task<(decimal totalOpenInterest, int totalPositionCount)> CalculatePositionMetricsAsync()
{
try
{
// Get all open positions from all accounts
var openPositions = await _tradingService.GetBrokerPositions(null);
if (openPositions?.Any() == true)
{
var positionCount = openPositions.Count();
// Calculate open interest as the sum of position notional values
// Open interest = sum of (position size * price) for all open positions
var openInterest = openPositions
.Where(p => p.Open?.Price > 0 && p.Open?.Quantity > 0)
.Sum(p => p.Open.Price * p.Open.Quantity);
_logger.LogDebug("Calculated position metrics: {PositionCount} positions, {OpenInterest} open interest",
positionCount, openInterest);
return (openInterest, positionCount);
}
else
{
_logger.LogDebug("No open positions found for metrics calculation");
return (0m, 0);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to calculate position metrics, returning zero values");
return (0m, 0);
}
}
public Task<decimal> GetTotalVolumeAsync()
{
return Task.FromResult(_state.State.TotalPlatformVolume);
}
public Task<decimal> GetTotalPnLAsync()
{
return Task.FromResult(_state.State.TotalPlatformPnL);
}
public Task<decimal> GetTotalOpenInterestAsync()
{
return Task.FromResult(_state.State.TotalOpenInterest);
}
public Task<int> GetTotalPositionCountAsync()
{
return Task.FromResult(_state.State.TotalPositionCount);
}
// Event handlers for immediate updates
public async Task OnStrategyDeployedAsync(StrategyDeployedEvent evt)
{
_logger.LogInformation("Strategy deployed: {StrategyId} - {StrategyName}", evt.StrategyId, evt.StrategyName);
_state.State.TotalActiveStrategies++;
_state.State.HasPendingChanges = true;
await _state.WriteStateAsync();
}
public async Task OnStrategyStoppedAsync(StrategyStoppedEvent evt)
{
_logger.LogInformation("Strategy stopped: {StrategyId} - {StrategyName}", evt.StrategyId, evt.StrategyName);
_state.State.TotalActiveStrategies--;
_state.State.HasPendingChanges = true;
await _state.WriteStateAsync();
}
public async Task OnPositionOpenedAsync(PositionOpenedEvent evt)
{
_logger.LogInformation("Position opened: {PositionId} for {Ticker}", evt.PositionId, evt.Ticker);
_state.State.TotalPositionCount++;
_state.State.TotalOpenInterest += evt.NotionalValue;
_state.State.HasPendingChanges = true;
await _state.WriteStateAsync();
}
public async Task OnPositionClosedAsync(PositionClosedEvent evt)
{
_logger.LogInformation("Position closed: {PositionId} for {Ticker} with PnL: {PnL}",
evt.PositionId, evt.Ticker, evt.RealizedPnL);
_state.State.TotalPositionCount--;
_state.State.TotalPlatformVolume += evt.Volume;
_state.State.TotalPlatformPnL += evt.RealizedPnL;
// Update volume by asset
var asset = evt.Ticker;
if (!_state.State.VolumeByAsset.ContainsKey(asset))
{
_state.State.VolumeByAsset[asset] = 0;
}
_state.State.VolumeByAsset[asset] += evt.Volume;
_state.State.HasPendingChanges = true;
await _state.WriteStateAsync();
}
public async Task OnTradeExecutedAsync(TradeExecutedEvent evt)
{
_logger.LogInformation("Trade executed: {TradeId} for {Ticker} with volume: {Volume}",
evt.TradeId, evt.Ticker, evt.Volume);
_state.State.TotalPlatformVolume += evt.Volume;
_state.State.TotalPlatformPnL += evt.PnL;
// Update volume by asset
var asset = evt.Ticker;
if (!_state.State.VolumeByAsset.ContainsKey(asset))
{
_state.State.VolumeByAsset[asset] = 0;
}
_state.State.VolumeByAsset[asset] += evt.Volume;
_state.State.HasPendingChanges = true;
await _state.WriteStateAsync();
}
// Reminder handlers for periodic snapshots
public async Task ReceiveReminder(string reminderName, TickStatus status)
{
_logger.LogInformation("Reminder received: {ReminderName}", reminderName);
switch (reminderName)
{
case _hourlySnapshotReminder:
await TakeHourlySnapshotAsync();
break;
case _dailySnapshotReminder:
await TakeDailySnapshotAsync();
break;
}
}
private async Task TakeHourlySnapshotAsync()
{
_logger.LogInformation("Taking hourly snapshot");
var snapshot = new HourlySnapshot
{
Timestamp = DateTime.UtcNow,
TotalAgents = _state.State.TotalAgents,
TotalStrategies = _state.State.TotalActiveStrategies,
TotalVolume = _state.State.TotalPlatformVolume,
TotalPnL = _state.State.TotalPlatformPnL,
TotalOpenInterest = _state.State.TotalOpenInterest,
TotalPositionCount = _state.State.TotalPositionCount
};
_state.State.HourlySnapshots.Add(snapshot);
// Keep only last 24 hours
var cutoff = DateTime.UtcNow.AddHours(-24);
_state.State.HourlySnapshots.RemoveAll(s => s.Timestamp < cutoff);
await _state.WriteStateAsync();
}
private async Task TakeDailySnapshotAsync()
{
_logger.LogInformation("Taking daily snapshot");
// Store 24-hour ago values for comparison
_state.State.TotalAgents24hAgo = _state.State.TotalAgents;
_state.State.TotalActiveStrategies24hAgo = _state.State.TotalActiveStrategies;
_state.State.TotalPlatformPnL24hAgo = _state.State.TotalPlatformPnL;
_state.State.TotalPlatformVolume24hAgo = _state.State.TotalPlatformVolume;
_state.State.TotalOpenInterest24hAgo = _state.State.TotalOpenInterest;
_state.State.TotalPositionCount24hAgo = _state.State.TotalPositionCount;
// Add daily snapshot
var dailySnapshot = new DailySnapshot
{
Date = DateTime.UtcNow.Date,
TotalAgents = _state.State.TotalAgents,
TotalStrategies = _state.State.TotalActiveStrategies,
TotalVolume = _state.State.TotalPlatformVolume,
TotalPnL = _state.State.TotalPlatformPnL,
TotalOpenInterest = _state.State.TotalOpenInterest,
TotalPositionCount = _state.State.TotalPositionCount
};
_state.State.DailySnapshots.Add(dailySnapshot);
// Keep only last 30 days
var cutoff = DateTime.UtcNow.AddDays(-30);
_state.State.DailySnapshots.RemoveAll(s => s.Date < cutoff);
_state.State.LastSnapshot = DateTime.UtcNow;
await _state.WriteStateAsync();
}
private bool IsDataStale()
{
var timeSinceLastUpdate = DateTime.UtcNow - _state.State.LastUpdated;
return timeSinceLastUpdate > TimeSpan.FromMinutes(5);
}
private PlatformSummaryViewModel MapToViewModel(PlatformSummaryGrainState state)
{
return new PlatformSummaryViewModel
{
TotalAgents = state.TotalAgents,
TotalActiveStrategies = state.TotalActiveStrategies,
TotalPlatformPnL = state.TotalPlatformPnL,
TotalPlatformVolume = state.TotalPlatformVolume,
TotalPlatformVolumeLast24h = state.TotalPlatformVolume - state.TotalPlatformVolume24hAgo,
TotalOpenInterest = state.TotalOpenInterest,
TotalPositionCount = state.TotalPositionCount,
// 24-hour changes
AgentsChange24h = state.TotalAgents - state.TotalAgents24hAgo,
StrategiesChange24h = state.TotalActiveStrategies - state.TotalActiveStrategies24hAgo,
PnLChange24h = state.TotalPlatformPnL - state.TotalPlatformPnL24hAgo,
VolumeChange24h = state.TotalPlatformVolume - state.TotalPlatformVolume24hAgo,
OpenInterestChange24h = state.TotalOpenInterest - state.TotalOpenInterest24hAgo,
PositionCountChange24h = state.TotalPositionCount - state.TotalPositionCount24hAgo,
// Breakdowns
VolumeByAsset = state.VolumeByAsset,
PositionCountByAsset = state.PositionCountByAsset,
PositionCountByDirection = state.PositionCountByDirection,
// Metadata
LastUpdated = state.LastUpdated,
Last24HourSnapshot = state.LastSnapshot
};
}
}