Update front and fix back
This commit is contained in:
@@ -18,10 +18,10 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
private readonly IAgentService _agentService;
|
||||
private readonly ITradingService _tradingService;
|
||||
private readonly ILogger<PlatformSummaryGrain> _logger;
|
||||
|
||||
|
||||
private const string _hourlySnapshotReminder = "HourlySnapshot";
|
||||
private const string _dailySnapshotReminder = "DailySnapshot";
|
||||
|
||||
|
||||
public PlatformSummaryGrain(
|
||||
[PersistentState("platform-summary-state", "platform-summary-store")]
|
||||
IPersistentState<PlatformSummaryGrainState> state,
|
||||
@@ -36,28 +36,28 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
_tradingService = tradingService;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
|
||||
public override async Task OnActivateAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_logger.LogInformation("Platform Summary Grain activated");
|
||||
|
||||
|
||||
// Set up reminders for periodic snapshots
|
||||
await this.RegisterOrUpdateReminder(_hourlySnapshotReminder,
|
||||
await this.RegisterOrUpdateReminder(_hourlySnapshotReminder,
|
||||
TimeSpan.FromHours(1), TimeSpan.FromHours(1));
|
||||
|
||||
|
||||
var now = DateTime.UtcNow;
|
||||
var nextMidnight = now.Date.AddDays(1);
|
||||
var timeUntilMidnight = nextMidnight - now;
|
||||
await this.RegisterOrUpdateReminder(_dailySnapshotReminder,
|
||||
await this.RegisterOrUpdateReminder(_dailySnapshotReminder,
|
||||
timeUntilMidnight, TimeSpan.FromDays(1));
|
||||
|
||||
|
||||
// Initial data load if state is empty
|
||||
if (_state.State.LastUpdated == default)
|
||||
{
|
||||
await RefreshDataAsync();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public async Task<PlatformSummaryViewModel> GetPlatformSummaryAsync()
|
||||
{
|
||||
// If data is stale or has pending changes, refresh it
|
||||
@@ -65,36 +65,36 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
{
|
||||
await RefreshDataAsync();
|
||||
}
|
||||
|
||||
|
||||
return MapToViewModel(_state.State);
|
||||
}
|
||||
|
||||
|
||||
public async Task RefreshDataAsync()
|
||||
{
|
||||
try
|
||||
{
|
||||
_logger.LogInformation("Refreshing platform summary data");
|
||||
|
||||
|
||||
// Get all data in parallel for better performance
|
||||
var agentsTask = _agentService.GetAllAgentSummaries();
|
||||
var strategiesTask = _botService.GetBotsAsync();
|
||||
|
||||
|
||||
await Task.WhenAll(agentsTask, strategiesTask);
|
||||
|
||||
|
||||
var agents = await agentsTask;
|
||||
var strategies = await strategiesTask;
|
||||
|
||||
|
||||
// Calculate totals
|
||||
var totalAgents = agents.Count();
|
||||
var totalActiveStrategies = strategies.Count(s => s.Status == BotStatus.Running);
|
||||
|
||||
|
||||
// Calculate volume and PnL from strategies
|
||||
var totalVolume = strategies.Sum(s => s.Volume);
|
||||
var totalPnL = strategies.Sum(s => s.Pnl);
|
||||
|
||||
|
||||
// Calculate real open interest and position count from actual positions
|
||||
var (totalOpenInterest, totalPositionCount) = await CalculatePositionMetricsAsync();
|
||||
|
||||
|
||||
// Update state
|
||||
_state.State.TotalAgents = totalAgents;
|
||||
_state.State.TotalActiveStrategies = totalActiveStrategies;
|
||||
@@ -104,15 +104,15 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
_state.State.TotalPositionCount = totalPositionCount;
|
||||
_state.State.LastUpdated = DateTime.UtcNow;
|
||||
_state.State.HasPendingChanges = false;
|
||||
|
||||
|
||||
// Update volume breakdown by asset
|
||||
await UpdateVolumeBreakdownAsync(strategies);
|
||||
|
||||
|
||||
// Update position count breakdown
|
||||
await UpdatePositionCountBreakdownAsync(strategies);
|
||||
|
||||
|
||||
await _state.WriteStateAsync();
|
||||
|
||||
|
||||
_logger.LogInformation("Platform summary data refreshed successfully");
|
||||
}
|
||||
catch (Exception ex)
|
||||
@@ -120,55 +120,56 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
_logger.LogError(ex, "Error refreshing platform summary data");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private async Task UpdateVolumeBreakdownAsync(IEnumerable<Bot> strategies)
|
||||
{
|
||||
_state.State.VolumeByAsset.Clear();
|
||||
|
||||
|
||||
// Group strategies by ticker and sum their volumes
|
||||
var volumeByAsset = strategies
|
||||
.Where(s => s.Volume > 0)
|
||||
.GroupBy(s => s.Ticker.ToString())
|
||||
.ToDictionary(g => g.Key, g => g.Sum(s => s.Volume));
|
||||
|
||||
|
||||
foreach (var kvp in volumeByAsset)
|
||||
{
|
||||
_state.State.VolumeByAsset[kvp.Key] = kvp.Value;
|
||||
}
|
||||
|
||||
_logger.LogDebug("Updated volume breakdown: {AssetCount} assets with total volume {TotalVolume}",
|
||||
|
||||
_logger.LogDebug("Updated volume breakdown: {AssetCount} assets with total volume {TotalVolume}",
|
||||
volumeByAsset.Count, volumeByAsset.Values.Sum());
|
||||
}
|
||||
|
||||
|
||||
private async Task UpdatePositionCountBreakdownAsync(IEnumerable<Bot> strategies)
|
||||
{
|
||||
_state.State.PositionCountByAsset.Clear();
|
||||
_state.State.PositionCountByDirection.Clear();
|
||||
|
||||
|
||||
// Use position counts directly from bot statistics
|
||||
var activeStrategies = strategies.Where(s => s.Status != BotStatus.Saved).ToList();
|
||||
|
||||
|
||||
if (activeStrategies.Any())
|
||||
{
|
||||
// Group by asset and sum position counts per asset
|
||||
var positionsByAsset = activeStrategies
|
||||
.GroupBy(s => s.Ticker.ToString())
|
||||
.ToDictionary(g => g.Key, g => g.Sum(b => b.LongPositionCount + b.ShortPositionCount));
|
||||
|
||||
|
||||
// Sum long and short position counts across all bots
|
||||
var totalLongPositions = activeStrategies.Sum(s => s.LongPositionCount);
|
||||
var totalShortPositions = activeStrategies.Sum(s => s.ShortPositionCount);
|
||||
|
||||
|
||||
// Update state
|
||||
foreach (var kvp in positionsByAsset)
|
||||
{
|
||||
_state.State.PositionCountByAsset[kvp.Key] = kvp.Value;
|
||||
}
|
||||
|
||||
|
||||
_state.State.PositionCountByDirection[TradeDirection.Long] = totalLongPositions;
|
||||
_state.State.PositionCountByDirection[TradeDirection.Short] = totalShortPositions;
|
||||
|
||||
_logger.LogDebug("Updated position breakdown from bot statistics: {AssetCount} assets, Long={LongPositions}, Short={ShortPositions}",
|
||||
|
||||
_logger.LogDebug(
|
||||
"Updated position breakdown from bot statistics: {AssetCount} assets, Long={LongPositions}, Short={ShortPositions}",
|
||||
positionsByAsset.Count, totalLongPositions, totalShortPositions);
|
||||
}
|
||||
else
|
||||
@@ -176,27 +177,27 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
_logger.LogDebug("No active strategies found for position breakdown");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private async Task<(decimal totalOpenInterest, int totalPositionCount)> CalculatePositionMetricsAsync()
|
||||
{
|
||||
try
|
||||
{
|
||||
// Get all open positions from all accounts
|
||||
var openPositions = await _tradingService.GetBrokerPositions(null);
|
||||
|
||||
|
||||
if (openPositions?.Any() == true)
|
||||
{
|
||||
var positionCount = openPositions.Count();
|
||||
|
||||
|
||||
// Calculate open interest as the sum of position notional values
|
||||
// Open interest = sum of (position size * price) for all open positions
|
||||
var openInterest = openPositions
|
||||
.Where(p => p.Open?.Price > 0 && p.Open?.Quantity > 0)
|
||||
.Sum(p => p.Open.Price * p.Open.Quantity);
|
||||
|
||||
_logger.LogDebug("Calculated position metrics: {PositionCount} positions, {OpenInterest} open interest",
|
||||
|
||||
_logger.LogDebug("Calculated position metrics: {PositionCount} positions, {OpenInterest} open interest",
|
||||
positionCount, openInterest);
|
||||
|
||||
|
||||
return (openInterest, positionCount);
|
||||
}
|
||||
else
|
||||
@@ -211,102 +212,104 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
return (0m, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public Task<decimal> GetTotalVolumeAsync()
|
||||
{
|
||||
return Task.FromResult(_state.State.TotalPlatformVolume);
|
||||
}
|
||||
|
||||
|
||||
public Task<decimal> GetTotalPnLAsync()
|
||||
{
|
||||
return Task.FromResult(_state.State.TotalPlatformPnL);
|
||||
}
|
||||
|
||||
public Task<decimal> GetTotalOpenInterestAsync()
|
||||
|
||||
public Task<decimal> GetTotalOpenInterest()
|
||||
{
|
||||
return Task.FromResult(_state.State.TotalOpenInterest);
|
||||
}
|
||||
|
||||
|
||||
public Task<int> GetTotalPositionCountAsync()
|
||||
{
|
||||
return Task.FromResult(_state.State.TotalPositionCount);
|
||||
}
|
||||
|
||||
|
||||
// Event handlers for immediate updates
|
||||
public async Task OnStrategyDeployedAsync(StrategyDeployedEvent evt)
|
||||
{
|
||||
_logger.LogInformation("Strategy deployed: {StrategyId} - {StrategyName}", evt.StrategyId, evt.StrategyName);
|
||||
|
||||
|
||||
_state.State.TotalActiveStrategies++;
|
||||
_state.State.HasPendingChanges = true;
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
|
||||
|
||||
public async Task OnStrategyStoppedAsync(StrategyStoppedEvent evt)
|
||||
{
|
||||
_logger.LogInformation("Strategy stopped: {StrategyId} - {StrategyName}", evt.StrategyId, evt.StrategyName);
|
||||
|
||||
|
||||
_state.State.TotalActiveStrategies--;
|
||||
_state.State.HasPendingChanges = true;
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
|
||||
|
||||
public async Task OnPositionOpenedAsync(PositionOpenedEvent evt)
|
||||
{
|
||||
_logger.LogInformation("Position opened: {PositionId} for {Ticker}", evt.PositionId, evt.Ticker);
|
||||
|
||||
|
||||
_state.State.TotalPositionCount++;
|
||||
_state.State.TotalOpenInterest += evt.NotionalValue;
|
||||
_state.State.HasPendingChanges = true;
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
|
||||
|
||||
public async Task OnPositionClosedAsync(PositionClosedEvent evt)
|
||||
{
|
||||
_logger.LogInformation("Position closed: {PositionId} for {Ticker} with PnL: {PnL}",
|
||||
_logger.LogInformation("Position closed: {PositionId} for {Ticker} with PnL: {PnL}",
|
||||
evt.PositionId, evt.Ticker, evt.RealizedPnL);
|
||||
|
||||
|
||||
_state.State.TotalPositionCount--;
|
||||
_state.State.TotalPlatformVolume += evt.Volume;
|
||||
_state.State.TotalPlatformPnL += evt.RealizedPnL;
|
||||
|
||||
|
||||
// Update volume by asset
|
||||
var asset = evt.Ticker;
|
||||
if (!_state.State.VolumeByAsset.ContainsKey(asset))
|
||||
{
|
||||
_state.State.VolumeByAsset[asset] = 0;
|
||||
}
|
||||
|
||||
_state.State.VolumeByAsset[asset] += evt.Volume;
|
||||
|
||||
|
||||
_state.State.HasPendingChanges = true;
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
|
||||
|
||||
public async Task OnTradeExecutedAsync(TradeExecutedEvent evt)
|
||||
{
|
||||
_logger.LogInformation("Trade executed: {TradeId} for {Ticker} with volume: {Volume}",
|
||||
_logger.LogInformation("Trade executed: {TradeId} for {Ticker} with volume: {Volume}",
|
||||
evt.TradeId, evt.Ticker, evt.Volume);
|
||||
|
||||
|
||||
_state.State.TotalPlatformVolume += evt.Volume;
|
||||
_state.State.TotalPlatformPnL += evt.PnL;
|
||||
|
||||
|
||||
// Update volume by asset
|
||||
var asset = evt.Ticker;
|
||||
if (!_state.State.VolumeByAsset.ContainsKey(asset))
|
||||
{
|
||||
_state.State.VolumeByAsset[asset] = 0;
|
||||
}
|
||||
|
||||
_state.State.VolumeByAsset[asset] += evt.Volume;
|
||||
|
||||
|
||||
_state.State.HasPendingChanges = true;
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
|
||||
|
||||
// Reminder handlers for periodic snapshots
|
||||
public async Task ReceiveReminder(string reminderName, TickStatus status)
|
||||
{
|
||||
_logger.LogInformation("Reminder received: {ReminderName}", reminderName);
|
||||
|
||||
|
||||
switch (reminderName)
|
||||
{
|
||||
case _hourlySnapshotReminder:
|
||||
@@ -317,11 +320,11 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private async Task TakeHourlySnapshotAsync()
|
||||
{
|
||||
_logger.LogInformation("Taking hourly snapshot");
|
||||
|
||||
|
||||
var snapshot = new HourlySnapshot
|
||||
{
|
||||
Timestamp = DateTime.UtcNow,
|
||||
@@ -332,20 +335,20 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
TotalOpenInterest = _state.State.TotalOpenInterest,
|
||||
TotalPositionCount = _state.State.TotalPositionCount
|
||||
};
|
||||
|
||||
|
||||
_state.State.HourlySnapshots.Add(snapshot);
|
||||
|
||||
|
||||
// Keep only last 24 hours
|
||||
var cutoff = DateTime.UtcNow.AddHours(-24);
|
||||
_state.State.HourlySnapshots.RemoveAll(s => s.Timestamp < cutoff);
|
||||
|
||||
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
|
||||
|
||||
private async Task TakeDailySnapshotAsync()
|
||||
{
|
||||
_logger.LogInformation("Taking daily snapshot");
|
||||
|
||||
|
||||
// Store 24-hour ago values for comparison
|
||||
_state.State.TotalAgents24hAgo = _state.State.TotalAgents;
|
||||
_state.State.TotalActiveStrategies24hAgo = _state.State.TotalActiveStrategies;
|
||||
@@ -353,7 +356,7 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
_state.State.TotalPlatformVolume24hAgo = _state.State.TotalPlatformVolume;
|
||||
_state.State.TotalOpenInterest24hAgo = _state.State.TotalOpenInterest;
|
||||
_state.State.TotalPositionCount24hAgo = _state.State.TotalPositionCount;
|
||||
|
||||
|
||||
// Add daily snapshot
|
||||
var dailySnapshot = new DailySnapshot
|
||||
{
|
||||
@@ -365,24 +368,24 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
TotalOpenInterest = _state.State.TotalOpenInterest,
|
||||
TotalPositionCount = _state.State.TotalPositionCount
|
||||
};
|
||||
|
||||
|
||||
_state.State.DailySnapshots.Add(dailySnapshot);
|
||||
|
||||
|
||||
// Keep only last 30 days
|
||||
var cutoff = DateTime.UtcNow.AddDays(-30);
|
||||
_state.State.DailySnapshots.RemoveAll(s => s.Date < cutoff);
|
||||
|
||||
|
||||
_state.State.LastSnapshot = DateTime.UtcNow;
|
||||
|
||||
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
|
||||
|
||||
private bool IsDataStale()
|
||||
{
|
||||
var timeSinceLastUpdate = DateTime.UtcNow - _state.State.LastUpdated;
|
||||
return timeSinceLastUpdate > TimeSpan.FromMinutes(5);
|
||||
}
|
||||
|
||||
|
||||
private PlatformSummaryViewModel MapToViewModel(PlatformSummaryGrainState state)
|
||||
{
|
||||
return new PlatformSummaryViewModel
|
||||
@@ -394,7 +397,7 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
TotalPlatformVolumeLast24h = state.TotalPlatformVolume - state.TotalPlatformVolume24hAgo,
|
||||
TotalOpenInterest = state.TotalOpenInterest,
|
||||
TotalPositionCount = state.TotalPositionCount,
|
||||
|
||||
|
||||
// 24-hour changes
|
||||
AgentsChange24h = state.TotalAgents - state.TotalAgents24hAgo,
|
||||
StrategiesChange24h = state.TotalActiveStrategies - state.TotalActiveStrategies24hAgo,
|
||||
@@ -402,15 +405,15 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
VolumeChange24h = state.TotalPlatformVolume - state.TotalPlatformVolume24hAgo,
|
||||
OpenInterestChange24h = state.TotalOpenInterest - state.TotalOpenInterest24hAgo,
|
||||
PositionCountChange24h = state.TotalPositionCount - state.TotalPositionCount24hAgo,
|
||||
|
||||
|
||||
// Breakdowns
|
||||
VolumeByAsset = state.VolumeByAsset,
|
||||
PositionCountByAsset = state.PositionCountByAsset,
|
||||
PositionCountByDirection = state.PositionCountByDirection,
|
||||
|
||||
VolumeByAsset = state.VolumeByAsset ?? new Dictionary<string, decimal>(),
|
||||
PositionCountByAsset = state.PositionCountByAsset ?? new Dictionary<string, int>(),
|
||||
PositionCountByDirection = state.PositionCountByDirection ?? new Dictionary<TradeDirection, int>(),
|
||||
|
||||
// Metadata
|
||||
LastUpdated = state.LastUpdated,
|
||||
Last24HourSnapshot = state.LastSnapshot
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user