Refactor LiveBotRegistryGrain and PlatformSummaryGrain to improve active bot tracking
- Introduced CalculateActiveBotsCount method in LiveBotRegistryGrain to streamline active bot count calculations. - Updated logging to reflect active bot counts accurately during registration and unregistration. - Added historical tracking of strategy activation/deactivation events in PlatformSummaryGrain, including a new StrategyEvent class and related logic to manage event history. - Enhanced CalculateActiveStrategiesForDate method to compute active strategies based on historical events.
This commit is contained in:
@@ -45,6 +45,29 @@ public class PlatformSummaryGrainState
|
|||||||
|
|
||||||
// Flag to track if volume has been updated by events (not from bot strategies)
|
// Flag to track if volume has been updated by events (not from bot strategies)
|
||||||
[Id(15)] public bool VolumeUpdatedByEvents { get; set; }
|
[Id(15)] public bool VolumeUpdatedByEvents { get; set; }
|
||||||
|
|
||||||
|
// Historical strategy activation/deactivation events
|
||||||
|
[Id(16)] public List<StrategyEvent> StrategyEvents { get; set; } = new();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Strategy activation/deactivation event
|
||||||
|
/// </summary>
|
||||||
|
[GenerateSerializer]
|
||||||
|
public class StrategyEvent
|
||||||
|
{
|
||||||
|
[Id(0)] public DateTime Timestamp { get; set; }
|
||||||
|
[Id(1)] public StrategyEventType EventType { get; set; }
|
||||||
|
[Id(2)] public int NetChange { get; set; } // +1 for activation, -1 for deactivation
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Type of strategy event
|
||||||
|
/// </summary>
|
||||||
|
public enum StrategyEventType
|
||||||
|
{
|
||||||
|
Activation,
|
||||||
|
Deactivation
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@@ -33,6 +33,8 @@ public class LiveBotRegistryGrain : Grain, ILiveBotRegistryGrain
|
|||||||
_state.State.TotalBotsCount);
|
_state.State.TotalBotsCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int CalculateActiveBotsCount() => _state.State.Bots.Values.Count(b => b.Status == BotStatus.Running);
|
||||||
|
|
||||||
public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
|
public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("LiveBotRegistryGrain deactivating. Reason: {Reason}. Total bots: {TotalBots}",
|
_logger.LogInformation("LiveBotRegistryGrain deactivating. Reason: {Reason}. Total bots: {TotalBots}",
|
||||||
@@ -53,16 +55,15 @@ public class LiveBotRegistryGrain : Grain, ILiveBotRegistryGrain
|
|||||||
var entry = new BotRegistryEntry(identifier, userId);
|
var entry = new BotRegistryEntry(identifier, userId);
|
||||||
_state.State.Bots[identifier] = entry;
|
_state.State.Bots[identifier] = entry;
|
||||||
|
|
||||||
// O(1) FIX: Increment the counters
|
// Increment the counters
|
||||||
_state.State.TotalBotsCount++;
|
_state.State.TotalBotsCount++;
|
||||||
_state.State.ActiveBotsCount++;
|
|
||||||
_state.State.LastUpdated = DateTime.UtcNow;
|
_state.State.LastUpdated = DateTime.UtcNow;
|
||||||
|
|
||||||
await _state.WriteStateAsync();
|
await _state.WriteStateAsync();
|
||||||
|
|
||||||
_logger.LogInformation(
|
_logger.LogInformation(
|
||||||
"Bot {Identifier} registered successfully for user {UserId}. Total bots: {TotalBots}, Active bots: {ActiveBots}",
|
"Bot {Identifier} registered successfully for user {UserId}. Total bots: {TotalBots}, Active bots: {ActiveBots}",
|
||||||
identifier, userId, _state.State.TotalBotsCount, _state.State.ActiveBotsCount);
|
identifier, userId, _state.State.TotalBotsCount, CalculateActiveBotsCount());
|
||||||
|
|
||||||
// Notify platform summary grain about strategy count change
|
// Notify platform summary grain about strategy count change
|
||||||
await NotifyPlatformSummaryAsync();
|
await NotifyPlatformSummaryAsync();
|
||||||
@@ -86,20 +87,15 @@ public class LiveBotRegistryGrain : Grain, ILiveBotRegistryGrain
|
|||||||
|
|
||||||
_state.State.Bots.Remove(identifier);
|
_state.State.Bots.Remove(identifier);
|
||||||
|
|
||||||
// O(1) FIX: Decrement the counters based on the removed entry's status
|
// Decrement the counters
|
||||||
_state.State.TotalBotsCount--;
|
_state.State.TotalBotsCount--;
|
||||||
if (entryToRemove.Status == BotStatus.Running)
|
|
||||||
{
|
|
||||||
_state.State.ActiveBotsCount--;
|
|
||||||
}
|
|
||||||
|
|
||||||
_state.State.LastUpdated = DateTime.UtcNow;
|
_state.State.LastUpdated = DateTime.UtcNow;
|
||||||
|
|
||||||
await _state.WriteStateAsync();
|
await _state.WriteStateAsync();
|
||||||
|
|
||||||
_logger.LogInformation(
|
_logger.LogInformation(
|
||||||
"Bot {Identifier} unregistered successfully from user {UserId}. Total bots: {TotalBots}",
|
"Bot {Identifier} unregistered successfully from user {UserId}. Total bots: {TotalBots}, Active bots: {ActiveBots}",
|
||||||
identifier, entryToRemove.UserId, _state.State.TotalBotsCount);
|
identifier, entryToRemove.UserId, _state.State.TotalBotsCount, CalculateActiveBotsCount());
|
||||||
|
|
||||||
// Notify platform summary grain about strategy count change
|
// Notify platform summary grain about strategy count change
|
||||||
await NotifyPlatformSummaryAsync();
|
await NotifyPlatformSummaryAsync();
|
||||||
@@ -148,16 +144,6 @@ public class LiveBotRegistryGrain : Grain, ILiveBotRegistryGrain
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// O(1) FIX: Conditionally adjust the counter
|
|
||||||
if (newStatus == BotStatus.Running && previousStatus != BotStatus.Running)
|
|
||||||
{
|
|
||||||
_state.State.ActiveBotsCount++;
|
|
||||||
}
|
|
||||||
else if (newStatus != BotStatus.Running && previousStatus == BotStatus.Running)
|
|
||||||
{
|
|
||||||
_state.State.ActiveBotsCount--;
|
|
||||||
}
|
|
||||||
|
|
||||||
entry.Status = newStatus;
|
entry.Status = newStatus;
|
||||||
entry.LastStatusUpdate = DateTime.UtcNow;
|
entry.LastStatusUpdate = DateTime.UtcNow;
|
||||||
_state.State.LastUpdated = DateTime.UtcNow;
|
_state.State.LastUpdated = DateTime.UtcNow;
|
||||||
@@ -166,7 +152,7 @@ public class LiveBotRegistryGrain : Grain, ILiveBotRegistryGrain
|
|||||||
|
|
||||||
_logger.LogInformation(
|
_logger.LogInformation(
|
||||||
"Bot {Identifier} status updated from {PreviousStatus} to {NewStatus}. Active bots: {ActiveBots}",
|
"Bot {Identifier} status updated from {PreviousStatus} to {NewStatus}. Active bots: {ActiveBots}",
|
||||||
identifier, previousStatus, newStatus, _state.State.ActiveBotsCount);
|
identifier, previousStatus, newStatus, CalculateActiveBotsCount());
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@@ -190,11 +176,12 @@ public class LiveBotRegistryGrain : Grain, ILiveBotRegistryGrain
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
var activeCount = CalculateActiveBotsCount();
|
||||||
var platformGrain = GrainFactory.GetGrain<IPlatformSummaryGrain>("platform-summary");
|
var platformGrain = GrainFactory.GetGrain<IPlatformSummaryGrain>("platform-summary");
|
||||||
await platformGrain.UpdateActiveStrategyCountAsync(_state.State.ActiveBotsCount);
|
await platformGrain.UpdateActiveStrategyCountAsync(activeCount);
|
||||||
|
|
||||||
_logger.LogDebug("Notified platform summary about active strategy count change. New count: {ActiveCount}",
|
_logger.LogDebug("Notified platform summary about active strategy count change. New count: {ActiveCount}",
|
||||||
_state.State.ActiveBotsCount);
|
activeCount);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -195,6 +195,32 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
|||||||
newActiveCount = 0;
|
newActiveCount = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var previousCount = _state.State.TotalActiveStrategies;
|
||||||
|
var change = newActiveCount - previousCount;
|
||||||
|
|
||||||
|
// Record the strategy event if there was a change
|
||||||
|
if (change != 0)
|
||||||
|
{
|
||||||
|
var eventType = change > 0 ? StrategyEventType.Activation : StrategyEventType.Deactivation;
|
||||||
|
var strategyEvent = new StrategyEvent
|
||||||
|
{
|
||||||
|
Timestamp = DateTime.UtcNow,
|
||||||
|
EventType = eventType,
|
||||||
|
NetChange = change
|
||||||
|
};
|
||||||
|
|
||||||
|
_state.State.StrategyEvents.Add(strategyEvent);
|
||||||
|
|
||||||
|
// Keep only last 1000 events to prevent unbounded growth
|
||||||
|
if (_state.State.StrategyEvents.Count > 1000)
|
||||||
|
{
|
||||||
|
_state.State.StrategyEvents.RemoveRange(0, _state.State.StrategyEvents.Count - 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
_logger.LogInformation("Recorded strategy {EventType} event: {Change} strategies at {Timestamp}",
|
||||||
|
eventType, change, strategyEvent.Timestamp);
|
||||||
|
}
|
||||||
|
|
||||||
_state.State.TotalActiveStrategies = newActiveCount;
|
_state.State.TotalActiveStrategies = newActiveCount;
|
||||||
await _state.WriteStateAsync();
|
await _state.WriteStateAsync();
|
||||||
}
|
}
|
||||||
@@ -280,7 +306,7 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
|||||||
{
|
{
|
||||||
Date = today,
|
Date = today,
|
||||||
TotalAgents = _state.State.TotalAgents,
|
TotalAgents = _state.State.TotalAgents,
|
||||||
TotalStrategies = _state.State.TotalActiveStrategies,
|
TotalStrategies = CalculateActiveStrategiesForDate(today),
|
||||||
TotalVolume = _state.State.TotalPlatformVolume,
|
TotalVolume = _state.State.TotalPlatformVolume,
|
||||||
TotalPnL = _state.State.TotalPlatformPnL,
|
TotalPnL = _state.State.TotalPlatformPnL,
|
||||||
NetPnL = _state.State.NetPnL,
|
NetPnL = _state.State.NetPnL,
|
||||||
@@ -321,6 +347,49 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
|||||||
return timeSinceLastUpdate > TimeSpan.FromMinutes(5);
|
return timeSinceLastUpdate > TimeSpan.FromMinutes(5);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Calculates the number of active strategies on a specific date by replaying historical events
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="targetDate">The date to calculate active strategies for</param>
|
||||||
|
/// <returns>The number of active strategies on the target date</returns>
|
||||||
|
private int CalculateActiveStrategiesForDate(DateTime targetDate)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Start with 0 active strategies
|
||||||
|
var activeCount = 0;
|
||||||
|
|
||||||
|
// Replay all strategy events up to and including the target date
|
||||||
|
var relevantEvents = _state.State.StrategyEvents
|
||||||
|
.Where(e => e.Timestamp.Date <= targetDate.Date)
|
||||||
|
.OrderBy(e => e.Timestamp)
|
||||||
|
.ToList();
|
||||||
|
|
||||||
|
foreach (var strategyEvent in relevantEvents)
|
||||||
|
{
|
||||||
|
activeCount += strategyEvent.NetChange;
|
||||||
|
|
||||||
|
// Ensure we don't go below 0 (defensive programming)
|
||||||
|
if (activeCount < 0)
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Active strategy count went negative ({Count}) after event {EventType} at {Timestamp}, resetting to 0",
|
||||||
|
activeCount, strategyEvent.EventType, strategyEvent.Timestamp);
|
||||||
|
activeCount = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_logger.LogDebug("Calculated {ActiveCount} active strategies for date {TargetDate} based on {EventCount} historical events",
|
||||||
|
activeCount, targetDate.Date, relevantEvents.Count);
|
||||||
|
|
||||||
|
return activeCount;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Error calculating active strategies for date {TargetDate}", targetDate);
|
||||||
|
return 0; // Return 0 as fallback
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Fixes the first snapshot date to ensure it's the day before the first position
|
/// Fixes the first snapshot date to ensure it's the day before the first position
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@@ -519,9 +588,9 @@ public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
|
|||||||
// Use TradingBox to calculate metrics for filtered positions
|
// Use TradingBox to calculate metrics for filtered positions
|
||||||
var metrics = TradingBox.CalculatePlatformSummaryMetrics(filteredPositions);
|
var metrics = TradingBox.CalculatePlatformSummaryMetrics(filteredPositions);
|
||||||
|
|
||||||
// Get current agent and strategy counts (these are current state, not historical)
|
// Get historical agent and strategy counts for the target date
|
||||||
var totalAgents = await _agentService.GetTotalAgentCount();
|
var totalAgents = await _agentService.GetTotalAgentCount();
|
||||||
var totalStrategies = _state.State.TotalActiveStrategies;
|
var totalStrategies = CalculateActiveStrategiesForDate(targetDate);
|
||||||
|
|
||||||
_logger.LogInformation(
|
_logger.LogInformation(
|
||||||
"Calculated CUMULATIVE snapshot for {TargetDate}: Volume={TotalVolume}, OpenInterest={OpenInterest}, PositionCount={TotalPositionCount}, Fees={Fees}, PnL={PnL}",
|
"Calculated CUMULATIVE snapshot for {TargetDate}: Volume={TotalVolume}, OpenInterest={OpenInterest}, PositionCount={TotalPositionCount}, Fees={Fees}, PnL={PnL}",
|
||||||
|
|||||||
Reference in New Issue
Block a user