Improve Platform stats
This commit is contained in:
@@ -4,6 +4,7 @@ using Managing.Application.Abstractions.Models;
|
||||
using Managing.Application.Abstractions.Services;
|
||||
using Managing.Application.Orleans;
|
||||
using Managing.Domain.Bots;
|
||||
using Managing.Domain.Candles;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using static Managing.Common.Enums;
|
||||
|
||||
@@ -22,7 +23,6 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
private readonly ITradingService _tradingService;
|
||||
private readonly ILogger<PlatformSummaryGrain> _logger;
|
||||
|
||||
private const string _hourlySnapshotReminder = "HourlySnapshot";
|
||||
private const string _dailySnapshotReminder = "DailySnapshot";
|
||||
|
||||
public PlatformSummaryGrain(
|
||||
@@ -44,15 +44,16 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
{
|
||||
_logger.LogInformation("Platform Summary Grain activated");
|
||||
|
||||
// Set up reminders for periodic snapshots
|
||||
await this.RegisterOrUpdateReminder(_hourlySnapshotReminder,
|
||||
TimeSpan.FromHours(1), TimeSpan.FromHours(1));
|
||||
|
||||
// Set up reminder for daily snapshots using precise timing
|
||||
var now = DateTime.UtcNow;
|
||||
var nextMidnight = now.Date.AddDays(1);
|
||||
var timeUntilMidnight = nextMidnight - now;
|
||||
|
||||
// Daily reminder - runs at midnight (00:00 UTC)
|
||||
var nextDailyTime = CandleHelpers.GetNextExpectedCandleTime(Timeframe.OneDay, now);
|
||||
var timeUntilNextDay = nextDailyTime - now;
|
||||
await this.RegisterOrUpdateReminder(_dailySnapshotReminder,
|
||||
timeUntilMidnight, TimeSpan.FromDays(1));
|
||||
timeUntilNextDay, TimeSpan.FromDays(1).Add(TimeSpan.FromMinutes(3)));
|
||||
|
||||
_logger.LogInformation("Daily reminder scheduled - Next daily: {NextDaily}", nextDailyTime);
|
||||
|
||||
// Initial data load if state is empty
|
||||
if (_state.State.LastUpdated == default)
|
||||
@@ -232,6 +233,11 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
return Task.FromResult(_state.State.TotalPositionCount);
|
||||
}
|
||||
|
||||
public Task<decimal> GetTotalFeesAsync()
|
||||
{
|
||||
return Task.FromResult(_state.State.TotalPlatformFees);
|
||||
}
|
||||
|
||||
public Task<List<VolumeHistoryPoint>> GetVolumeHistoryAsync()
|
||||
{
|
||||
var historyPoints = _state.State.DailySnapshots
|
||||
@@ -249,63 +255,135 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
// Event handlers for immediate updates
|
||||
public async Task UpdateActiveStrategyCountAsync(int newActiveCount)
|
||||
{
|
||||
_logger.LogInformation("Updating active strategies count to: {NewActiveCount}", newActiveCount);
|
||||
try
|
||||
{
|
||||
_logger.LogInformation("Updating active strategies count to: {NewActiveCount}", newActiveCount);
|
||||
|
||||
_state.State.TotalActiveStrategies = newActiveCount;
|
||||
_state.State.HasPendingChanges = true;
|
||||
await _state.WriteStateAsync();
|
||||
// Validate input
|
||||
if (newActiveCount < 0)
|
||||
{
|
||||
_logger.LogWarning("Invalid active strategy count: {Count}, setting to 0", newActiveCount);
|
||||
newActiveCount = 0;
|
||||
}
|
||||
|
||||
_state.State.TotalActiveStrategies = newActiveCount;
|
||||
_state.State.HasPendingChanges = true;
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error updating active strategy count to: {NewActiveCount}", newActiveCount);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task OnPositionOpenedAsync(PositionOpenedEvent evt)
|
||||
{
|
||||
_logger.LogInformation("Position opened: {PositionId} for {Ticker}", evt.PositionId, evt.Ticker);
|
||||
|
||||
_state.State.TotalPositionCount++;
|
||||
_state.State.HasPendingChanges = true;
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
|
||||
public async Task OnPositionClosedAsync(PositionClosedEvent evt)
|
||||
{
|
||||
_logger.LogInformation("Position closed: {PositionId} for {Ticker} with PnL: {PnL}",
|
||||
evt.PositionId, evt.Ticker, evt.RealizedPnL);
|
||||
|
||||
_state.State.TotalPositionCount--;
|
||||
_state.State.TotalPlatformVolume += evt.Volume;
|
||||
_state.State.TotalPlatformPnL += evt.RealizedPnL;
|
||||
|
||||
// Update volume by asset
|
||||
var asset = evt.Ticker;
|
||||
if (!_state.State.VolumeByAsset.ContainsKey(asset))
|
||||
try
|
||||
{
|
||||
_state.State.VolumeByAsset[asset] = 0;
|
||||
_logger.LogInformation("Position closed: {PositionId} for {Ticker} with PnL: {PnL}",
|
||||
evt.PositionId, evt.Ticker, evt.RealizedPnL);
|
||||
|
||||
// Validate event data
|
||||
if (evt == null || evt.PositionId == Guid.Empty || evt.Ticker == Ticker.Unknown)
|
||||
{
|
||||
_logger.LogWarning("Invalid PositionClosedEvent received: {Event}", evt);
|
||||
return;
|
||||
}
|
||||
|
||||
// Ensure position count doesn't go negative
|
||||
if (_state.State.TotalPositionCount > 0)
|
||||
{
|
||||
_state.State.TotalPositionCount--;
|
||||
}
|
||||
|
||||
_state.State.TotalPlatformVolume += evt.Volume;
|
||||
_state.State.TotalPlatformPnL += evt.RealizedPnL;
|
||||
|
||||
// Update volume by asset
|
||||
var asset = evt.Ticker;
|
||||
if (!_state.State.VolumeByAsset.ContainsKey(asset))
|
||||
{
|
||||
_state.State.VolumeByAsset[asset] = 0;
|
||||
}
|
||||
|
||||
_state.State.VolumeByAsset[asset] += evt.Volume;
|
||||
|
||||
// Update open interest (subtract the closed position's volume)
|
||||
_state.State.TotalOpenInterest = Math.Max(0, _state.State.TotalOpenInterest - evt.Volume);
|
||||
|
||||
_state.State.HasPendingChanges = true;
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error processing PositionClosedEvent: {Event}", evt);
|
||||
}
|
||||
|
||||
_state.State.VolumeByAsset[asset] += evt.Volume;
|
||||
_state.State.TotalOpenInterest -= evt.InitialVolume;
|
||||
|
||||
_state.State.HasPendingChanges = true;
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
|
||||
public async Task OnTradeExecutedAsync(TradeExecutedEvent evt)
|
||||
{
|
||||
_logger.LogInformation("Trade executed: {TradeId} for {Ticker} with volume: {Volume}",
|
||||
evt.TradeId, evt.Ticker, evt.Volume);
|
||||
|
||||
_state.State.TotalPlatformVolume += evt.Volume;
|
||||
|
||||
// Update volume by asset
|
||||
var asset = evt.Ticker;
|
||||
if (!_state.State.VolumeByAsset.ContainsKey(asset))
|
||||
try
|
||||
{
|
||||
_state.State.VolumeByAsset[asset] = 0;
|
||||
_logger.LogInformation("Trade executed: {TradeId} for {Ticker} with volume: {Volume}",
|
||||
evt.TradeId, evt.Ticker, evt.Volume);
|
||||
|
||||
// Validate event data
|
||||
if (evt == null || evt.Ticker == Ticker.Unknown || evt.Volume <= 0)
|
||||
{
|
||||
_logger.LogWarning("Invalid TradeExecutedEvent received: {Event}", evt);
|
||||
return;
|
||||
}
|
||||
|
||||
// Update platform volume
|
||||
_state.State.TotalPlatformVolume += evt.Volume;
|
||||
|
||||
// Update volume by asset
|
||||
var asset = evt.Ticker;
|
||||
if (!_state.State.VolumeByAsset.ContainsKey(asset))
|
||||
{
|
||||
_state.State.VolumeByAsset[asset] = 0;
|
||||
}
|
||||
_state.State.VolumeByAsset[asset] += evt.Volume;
|
||||
|
||||
// Update open interest and position count
|
||||
// Since this is called only when position is fully open on broker, we always increase counts
|
||||
_state.State.TotalPositionCount++;
|
||||
_state.State.TotalOpenInterest += evt.Volume;
|
||||
|
||||
// Update position count by asset
|
||||
if (!_state.State.PositionCountByAsset.ContainsKey(asset))
|
||||
{
|
||||
_state.State.PositionCountByAsset[asset] = 0;
|
||||
}
|
||||
_state.State.PositionCountByAsset[asset]++;
|
||||
|
||||
// Update position count by direction
|
||||
if (!_state.State.PositionCountByDirection.ContainsKey(evt.Direction))
|
||||
{
|
||||
_state.State.PositionCountByDirection[evt.Direction] = 0;
|
||||
}
|
||||
_state.State.PositionCountByDirection[evt.Direction]++;
|
||||
|
||||
// Update PnL if provided
|
||||
if (evt.PnL != 0)
|
||||
{
|
||||
_state.State.TotalPlatformPnL += evt.PnL;
|
||||
}
|
||||
|
||||
// Update fees if provided
|
||||
if (evt.Fee > 0)
|
||||
{
|
||||
_state.State.TotalPlatformFees += evt.Fee;
|
||||
}
|
||||
|
||||
_state.State.HasPendingChanges = true;
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error processing TradeExecutedEvent: {Event}", evt);
|
||||
}
|
||||
|
||||
_state.State.VolumeByAsset[asset] += evt.Volume;
|
||||
|
||||
_state.State.HasPendingChanges = true;
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
|
||||
// Reminder handlers for periodic snapshots
|
||||
@@ -315,38 +393,12 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
|
||||
switch (reminderName)
|
||||
{
|
||||
case _hourlySnapshotReminder:
|
||||
await TakeHourlySnapshotAsync();
|
||||
break;
|
||||
case _dailySnapshotReminder:
|
||||
await TakeDailySnapshotAsync();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task TakeHourlySnapshotAsync()
|
||||
{
|
||||
_logger.LogInformation("Taking hourly snapshot");
|
||||
|
||||
var snapshot = new HourlySnapshot
|
||||
{
|
||||
Timestamp = DateTime.UtcNow,
|
||||
TotalAgents = _state.State.TotalAgents,
|
||||
TotalStrategies = _state.State.TotalActiveStrategies,
|
||||
TotalVolume = _state.State.TotalPlatformVolume,
|
||||
TotalPnL = _state.State.TotalPlatformPnL,
|
||||
TotalOpenInterest = _state.State.TotalOpenInterest,
|
||||
TotalPositionCount = _state.State.TotalPositionCount
|
||||
};
|
||||
|
||||
_state.State.HourlySnapshots.Add(snapshot);
|
||||
|
||||
// Keep only last 24 hours
|
||||
var cutoff = DateTime.UtcNow.AddHours(-24);
|
||||
_state.State.HourlySnapshots.RemoveAll(s => s.Timestamp < cutoff);
|
||||
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
|
||||
private async Task TakeDailySnapshotAsync()
|
||||
{
|
||||
@@ -359,6 +411,7 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
_state.State.TotalPlatformVolume24hAgo = _state.State.TotalPlatformVolume;
|
||||
_state.State.TotalOpenInterest24hAgo = _state.State.TotalOpenInterest;
|
||||
_state.State.TotalPositionCount24hAgo = _state.State.TotalPositionCount;
|
||||
_state.State.TotalPlatformFees24hAgo = _state.State.TotalPlatformFees;
|
||||
|
||||
// Add daily snapshot
|
||||
var dailySnapshot = new DailySnapshot
|
||||
@@ -369,7 +422,8 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
TotalVolume = _state.State.TotalPlatformVolume,
|
||||
TotalPnL = _state.State.TotalPlatformPnL,
|
||||
TotalOpenInterest = _state.State.TotalOpenInterest,
|
||||
TotalPositionCount = _state.State.TotalPositionCount
|
||||
TotalPositionCount = _state.State.TotalPositionCount,
|
||||
TotalFees = _state.State.TotalPlatformFees
|
||||
};
|
||||
|
||||
_state.State.DailySnapshots.Add(dailySnapshot);
|
||||
@@ -410,6 +464,7 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
TotalPlatformVolumeLast24h = state.TotalPlatformVolume - state.TotalPlatformVolume24hAgo,
|
||||
TotalOpenInterest = state.TotalOpenInterest,
|
||||
TotalPositionCount = state.TotalPositionCount,
|
||||
TotalPlatformFees = state.TotalPlatformFees,
|
||||
|
||||
// 24-hour changes
|
||||
AgentsChange24h = state.TotalAgents - state.TotalAgents24hAgo,
|
||||
@@ -418,6 +473,7 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
||||
VolumeChange24h = state.TotalPlatformVolume - state.TotalPlatformVolume24hAgo,
|
||||
OpenInterestChange24h = state.TotalOpenInterest - state.TotalOpenInterest24hAgo,
|
||||
PositionCountChange24h = state.TotalPositionCount - state.TotalPositionCount24hAgo,
|
||||
FeesChange24h = state.TotalPlatformFees - state.TotalPlatformFees24hAgo,
|
||||
|
||||
// Breakdowns
|
||||
VolumeByAsset = state.VolumeByAsset ?? new Dictionary<Ticker, decimal>(),
|
||||
|
||||
Reference in New Issue
Block a user