Reduce Agent Summary call

This commit is contained in:
2025-09-15 00:19:21 +07:00
parent 37d57a1bb8
commit b0d2dcc6b9
10 changed files with 402 additions and 36 deletions

View File

@@ -0,0 +1,26 @@
using Orleans;
using static Managing.Common.Enums;
namespace Managing.Application.Abstractions.Models;
/// <summary>
/// Event sent to AgentGrain to trigger summary updates
/// </summary>
[GenerateSerializer]
public class AgentSummaryUpdateEvent
{
[Id(0)]
public int UserId { get; set; }
[Id(1)]
public Guid BotId { get; set; }
[Id(2)]
public AgentSummaryEventType EventType { get; set; }
[Id(3)]
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
[Id(4)]
public string? AdditionalData { get; set; } // Optional additional context
}

View File

@@ -0,0 +1,142 @@
using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Models;
using Managing.Application.Abstractions.Services;
using Managing.Application.Bots.Grains;
using Managing.Application.Bots.Models;
using Managing.Domain.Statistics;
using Microsoft.Extensions.Logging;
using Moq;
using Xunit;
namespace Managing.Application.Tests;
public class AgentGrainTests
{
private readonly Mock<IPersistentState<AgentGrainState>> _mockState;
private readonly Mock<ILogger<AgentGrain>> _mockLogger;
private readonly Mock<IBotService> _mockBotService;
private readonly Mock<IAgentService> _mockAgentService;
private readonly Mock<IExchangeService> _mockExchangeService;
private readonly Mock<IUserService> _mockUserService;
private readonly Mock<IAccountService> _mockAccountService;
private readonly Mock<ITradingService> _mockTradingService;
public AgentGrainTests()
{
_mockState = new Mock<IPersistentState<AgentGrainState>>();
_mockLogger = new Mock<ILogger<AgentGrain>>();
_mockBotService = new Mock<IBotService>();
_mockAgentService = new Mock<IAgentService>();
_mockExchangeService = new Mock<IExchangeService>();
_mockUserService = new Mock<IUserService>();
_mockAccountService = new Mock<IAccountService>();
_mockTradingService = new Mock<ITradingService>();
// Setup default state
_mockState.Setup(x => x.State).Returns(new AgentGrainState
{
AgentName = "TestAgent",
BotIds = new HashSet<Guid> { Guid.NewGuid() }
});
}
[Fact]
public async Task OnAgentSummaryUpdateAsync_WithValidBotId_ShouldCallUpdateSummary()
{
// Arrange
var agentGrain = CreateAgentGrain();
var botId = _mockState.Object.State.BotIds.First();
var updateEvent = new AgentSummaryUpdateEvent
{
UserId = 1,
BotId = botId,
EventType = "PositionOpened",
Timestamp = DateTime.UtcNow
};
// Setup mocks
_mockBotService.Setup(x => x.GetBotsByIdsAsync(It.IsAny<HashSet<Guid>>()))
.ReturnsAsync(new List<Bot>());
_mockAgentService.Setup(x => x.SaveOrUpdateAgentSummary(It.IsAny<AgentSummary>()))
.Returns(Task.CompletedTask);
// Act
await agentGrain.OnAgentSummaryUpdateAsync(updateEvent);
// Assert
_mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny<AgentSummary>()), Times.Once);
}
[Fact]
public async Task OnAgentSummaryUpdateAsync_WithInvalidBotId_ShouldNotCallUpdateSummary()
{
// Arrange
var agentGrain = CreateAgentGrain();
var updateEvent = new AgentSummaryUpdateEvent
{
UserId = 1,
BotId = Guid.NewGuid(), // Different bot ID
EventType = "PositionOpened",
Timestamp = DateTime.UtcNow
};
// Act
await agentGrain.OnAgentSummaryUpdateAsync(updateEvent);
// Assert
_mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny<AgentSummary>()), Times.Never);
}
[Fact]
public async Task RegisterBotAsync_ShouldUpdateSummary()
{
// Arrange
var agentGrain = CreateAgentGrain();
var newBotId = Guid.NewGuid();
// Setup mocks
_mockBotService.Setup(x => x.GetBotsByIdsAsync(It.IsAny<HashSet<Guid>>()))
.ReturnsAsync(new List<Bot>());
_mockAgentService.Setup(x => x.SaveOrUpdateAgentSummary(It.IsAny<AgentSummary>()))
.Returns(Task.CompletedTask);
// Act
await agentGrain.RegisterBotAsync(newBotId);
// Assert
_mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny<AgentSummary>()), Times.Once);
}
[Fact]
public async Task UnregisterBotAsync_ShouldUpdateSummary()
{
// Arrange
var agentGrain = CreateAgentGrain();
var botId = _mockState.Object.State.BotIds.First();
// Setup mocks
_mockBotService.Setup(x => x.GetBotsByIdsAsync(It.IsAny<HashSet<Guid>>()))
.ReturnsAsync(new List<Bot>());
_mockAgentService.Setup(x => x.SaveOrUpdateAgentSummary(It.IsAny<AgentSummary>()))
.Returns(Task.CompletedTask);
// Act
await agentGrain.UnregisterBotAsync(botId);
// Assert
_mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny<AgentSummary>()), Times.Once);
}
private AgentGrain CreateAgentGrain()
{
return new AgentGrain(
_mockState.Object,
_mockLogger.Object,
_mockBotService.Object,
_mockAgentService.Object,
_mockExchangeService.Object,
_mockUserService.Object,
_mockAccountService.Object,
_mockTradingService.Object);
}
}

View File

@@ -0,0 +1,143 @@
using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Models;
using Managing.Application.Abstractions.Services;
using Managing.Application.Bots.Grains;
using Managing.Application.Bots.Models;
using Managing.Domain.Statistics;
using Microsoft.Extensions.Logging;
using Moq;
using Xunit;
using static Managing.Common.Enums;
namespace Managing.Application.Tests;
public class AgentGrainTests
{
private readonly Mock<IPersistentState<AgentGrainState>> _mockState;
private readonly Mock<ILogger<AgentGrain>> _mockLogger;
private readonly Mock<IBotService> _mockBotService;
private readonly Mock<IAgentService> _mockAgentService;
private readonly Mock<IExchangeService> _mockExchangeService;
private readonly Mock<IUserService> _mockUserService;
private readonly Mock<IAccountService> _mockAccountService;
private readonly Mock<ITradingService> _mockTradingService;
public AgentGrainTests()
{
_mockState = new Mock<IPersistentState<AgentGrainState>>();
_mockLogger = new Mock<ILogger<AgentGrain>>();
_mockBotService = new Mock<IBotService>();
_mockAgentService = new Mock<IAgentService>();
_mockExchangeService = new Mock<IExchangeService>();
_mockUserService = new Mock<IUserService>();
_mockAccountService = new Mock<IAccountService>();
_mockTradingService = new Mock<ITradingService>();
// Setup default state
_mockState.Setup(x => x.State).Returns(new AgentGrainState
{
AgentName = "TestAgent",
BotIds = new HashSet<Guid> { Guid.NewGuid() }
});
}
[Fact]
public async Task OnAgentSummaryUpdateAsync_WithValidBotId_ShouldCallUpdateSummary()
{
// Arrange
var agentGrain = CreateAgentGrain();
var botId = _mockState.Object.State.BotIds.First();
var updateEvent = new AgentSummaryUpdateEvent
{
UserId = 1,
BotId = botId,
EventType = AgentSummaryEventType.PositionOpened,
Timestamp = DateTime.UtcNow
};
// Setup mocks
_mockBotService.Setup(x => x.GetBotsByIdsAsync(It.IsAny<HashSet<Guid>>()))
.ReturnsAsync(new List<Bot>());
_mockAgentService.Setup(x => x.SaveOrUpdateAgentSummary(It.IsAny<AgentSummary>()))
.Returns(Task.CompletedTask);
// Act
await agentGrain.OnAgentSummaryUpdateAsync(updateEvent);
// Assert
_mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny<AgentSummary>()), Times.Once);
}
[Fact]
public async Task OnAgentSummaryUpdateAsync_WithInvalidBotId_ShouldNotCallUpdateSummary()
{
// Arrange
var agentGrain = CreateAgentGrain();
var updateEvent = new AgentSummaryUpdateEvent
{
UserId = 1,
BotId = Guid.NewGuid(), // Different bot ID
EventType = AgentSummaryEventType.PositionOpened,
Timestamp = DateTime.UtcNow
};
// Act
await agentGrain.OnAgentSummaryUpdateAsync(updateEvent);
// Assert
_mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny<AgentSummary>()), Times.Never);
}
[Fact]
public async Task RegisterBotAsync_ShouldUpdateSummary()
{
// Arrange
var agentGrain = CreateAgentGrain();
var newBotId = Guid.NewGuid();
// Setup mocks
_mockBotService.Setup(x => x.GetBotsByIdsAsync(It.IsAny<HashSet<Guid>>()))
.ReturnsAsync(new List<Bot>());
_mockAgentService.Setup(x => x.SaveOrUpdateAgentSummary(It.IsAny<AgentSummary>()))
.Returns(Task.CompletedTask);
// Act
await agentGrain.RegisterBotAsync(newBotId);
// Assert
_mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny<AgentSummary>()), Times.Once);
}
[Fact]
public async Task UnregisterBotAsync_ShouldUpdateSummary()
{
// Arrange
var agentGrain = CreateAgentGrain();
var botId = _mockState.Object.State.BotIds.First();
// Setup mocks
_mockBotService.Setup(x => x.GetBotsByIdsAsync(It.IsAny<HashSet<Guid>>()))
.ReturnsAsync(new List<Bot>());
_mockAgentService.Setup(x => x.SaveOrUpdateAgentSummary(It.IsAny<AgentSummary>()))
.Returns(Task.CompletedTask);
// Act
await agentGrain.UnregisterBotAsync(botId);
// Assert
_mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny<AgentSummary>()), Times.Once);
}
private AgentGrain CreateAgentGrain()
{
return new AgentGrain(
_mockState.Object,
_mockLogger.Object,
_mockBotService.Object,
_mockAgentService.Object,
_mockExchangeService.Object,
_mockUserService.Object,
_mockAccountService.Object,
_mockTradingService.Object);
}
}

View File

@@ -1,3 +1,5 @@
using Managing.Application.Abstractions.Models;
namespace Managing.Application.Abstractions.Grains
{
public interface IAgentGrain : IGrainWithIntegerKey
@@ -23,5 +25,11 @@ namespace Managing.Application.Abstractions.Grains
/// Unregisters a bot from this agent.
/// </summary>
Task UnregisterBotAsync(Guid botId);
/// <summary>
/// Handles stream notifications for agent summary updates.
/// </summary>
/// <param name="updateEvent">The update event from the stream</param>
Task OnAgentSummaryUpdateAsync(AgentSummaryUpdateEvent updateEvent);
}
}

View File

@@ -1,15 +1,15 @@
using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Models;
using Managing.Application.Abstractions.Services;
using Managing.Application.Bots.Models;
using Managing.Application.Shared;
using Managing.Domain.Statistics;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Bots.Grains;
public class AgentGrain : Grain, IAgentGrain, IRemindable
public class AgentGrain : Grain, IAgentGrain
{
private readonly IPersistentState<AgentGrainState> _state;
private readonly ILogger<AgentGrain> _logger;
@@ -19,7 +19,6 @@ public class AgentGrain : Grain, IAgentGrain, IRemindable
private readonly IUserService _userService;
private readonly IAccountService _accountService;
private readonly ITradingService _tradingService;
private const string _updateSummaryReminderName = "UpdateAgentSummary";
public AgentGrain(
[PersistentState("agent-state", "agent-store")]
@@ -54,44 +53,27 @@ public class AgentGrain : Grain, IAgentGrain, IRemindable
await _state.WriteStateAsync();
await UpdateSummary();
_logger.LogInformation("Agent {UserId} initialized with name {AgentName}", userId, agentName);
await RegisterReminderAsync();
}
private async Task RegisterReminderAsync()
public async Task OnAgentSummaryUpdateAsync(AgentSummaryUpdateEvent updateEvent)
{
try
{
var options = GrainHelpers.GetDynamicRandomizedTimerOptions(TimeSpan.FromMinutes(2), 200);
_logger.LogInformation("Received agent summary update event for user {UserId}, event type: {EventType}",
this.GetPrimaryKeyLong(), updateEvent.EventType);
// Register a reminder that fires every 5 minutes
await this.RegisterOrUpdateReminder(_updateSummaryReminderName, options.dueTime,
options.period);
_logger.LogInformation("Reminder registered for agent {UserId} to update summary every 5 minutes",
this.GetPrimaryKeyLong());
}
catch (Exception ex)
// Only update summary if the event is for this agent's bots
if (_state.State.BotIds.Contains(updateEvent.BotId))
{
_logger.LogError(ex, "Failed to register reminder for agent {UserId}", this.GetPrimaryKeyLong());
}
}
public async Task ReceiveReminder(string reminderName, TickStatus status)
{
if (reminderName == _updateSummaryReminderName)
{
try
{
_logger.LogInformation("Reminder triggered for agent {UserId} to update summary",
this.GetPrimaryKeyLong());
await UpdateSummary();
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error updating agent summary from reminder for user {UserId}",
_logger.LogError(ex, "Error processing agent summary update event for user {UserId}",
this.GetPrimaryKeyLong());
}
}
}
public async Task UpdateSummary()
{
@@ -203,6 +185,9 @@ public class AgentGrain : Grain, IAgentGrain, IRemindable
{
await _state.WriteStateAsync();
_logger.LogInformation("Bot {BotId} registered to Agent {UserId}", botId, this.GetPrimaryKeyLong());
// Update summary after registering bot
await UpdateSummary();
}
}
@@ -212,6 +197,9 @@ public class AgentGrain : Grain, IAgentGrain, IRemindable
{
await _state.WriteStateAsync();
_logger.LogInformation("Bot {BotId} unregistered from Agent {UserId}", botId, this.GetPrimaryKeyLong());
// Update summary after unregistering bot
await UpdateSummary();
}
}
}

View File

@@ -1,5 +1,6 @@
using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Models;
using Managing.Application.Abstractions.Services;
using Managing.Application.Trading.Commands;
using Managing.Application.Trading.Handlers;
@@ -93,6 +94,9 @@ public class TradingBotBase : ITradingBot
$"📢 I'll notify you when signals are triggered.";
await LogInformation(startupMessage);
// Notify AgentGrain about bot startup
await NotifyAgentGrainAsync(AgentSummaryEventType.BotStarted, $"Bot: {Config.Name}, Ticker: {Config.Ticker}");
break;
case BotStatus.Running:
@@ -191,7 +195,7 @@ public class TradingBotBase : ITradingBot
}
}
public async Task UpdateSignals(HashSet<Candle>? candles = null)
public async Task UpdateSignals(HashSet<Candle> candles = null)
{
// If position open and not flipped, do not update signals
if (!Config.FlipPosition && Positions.Any(p => !p.Value.IsFinished())) return;
@@ -765,6 +769,9 @@ public class TradingBotBase : ITradingBot
async messengerService => { await messengerService.SendPosition(position); });
}
// Notify AgentGrain about position opening
await NotifyAgentGrainAsync(AgentSummaryEventType.PositionOpened, $"Signal: {signal.Identifier}");
Logger.LogInformation($"Position requested");
return position; // Return the created position without adding to list
}
@@ -1195,6 +1202,10 @@ public class TradingBotBase : ITradingBot
Logger.LogInformation(
$"💰 **Balance Updated**\nNew bot trading balance: `${Config.BotTradingBalance:F2}`");
}
// Notify AgentGrain about position closing
var pnlInfo = position.ProfitAndLoss?.Realized != null ? $"PnL: {position.ProfitAndLoss.Realized:F2}" : "PnL: Unknown";
await NotifyAgentGrainAsync(AgentSummaryEventType.PositionClosed, $"Signal: {position.SignalIdentifier}, {pnlInfo}");
}
else
{
@@ -1875,4 +1886,42 @@ public class TradingBotBase : ITradingBot
return isInCooldown;
}
/// <summary>
/// Sends a notification to the AgentGrain to trigger summary updates
/// </summary>
/// <param name="eventType">The type of event (e.g., PositionOpened, PositionClosed)</param>
/// <param name="additionalData">Optional additional context data</param>
private async Task NotifyAgentGrainAsync(AgentSummaryEventType eventType, string additionalData = null)
{
if (Config.IsForBacktest || Account?.User == null)
{
return; // Skip notifications for backtest or when no user context
}
try
{
await ServiceScopeHelpers.WithScopedService<IGrainFactory>(_scopeFactory, async grainFactory =>
{
var agentGrain = grainFactory.GetGrain<IAgentGrain>(Account.User.Id);
var updateEvent = new AgentSummaryUpdateEvent
{
UserId = Account.User.Id,
BotId = Identifier,
EventType = eventType,
Timestamp = DateTime.UtcNow,
AdditionalData = additionalData
};
await agentGrain.OnAgentSummaryUpdateAsync(updateEvent);
Logger.LogDebug("Sent agent notification: {EventType} for bot {BotId}", eventType, Identifier);
});
}
catch (Exception ex)
{
Logger.LogError(ex, "Failed to send agent notification: {EventType} for bot {BotId}", eventType, Identifier);
}
}
}

View File

@@ -206,7 +206,7 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
try
{
var streamProvider = this.GetStreamProvider("ManagingStreamProvider");
_priceStream = streamProvider.GetStream<Candle>(streamKey);
_priceStream = streamProvider.GetStream<Candle>("Candles", streamKey);
_streamSubscription = await _priceStream.SubscribeAsync(this);

View File

@@ -201,8 +201,8 @@ public class PriceFetcherGrain : Grain, IPriceFetcherGrain, IRemindable
var now = DateTime.UtcNow;
var dueTime = CandleHelpers.GetDueTimeForTimeframe(TargetTimeframe, now);
var period = TimeSpan.FromSeconds(CandleHelpers.GetBaseIntervalInSeconds(TargetTimeframe));
_logger.LogInformation("{0} next execution scheduled in {1} seconds and at {2:} UTC every {3} seconds",
nameof(PriceFetcherGrain), dueTime, now.Add(dueTime), period);
_logger.LogInformation("{0} next execution scheduled in {1} seconds and at {2:} UTC every {3}",
string.Join("-", nameof(PriceFetcherGrain), TargetTimeframe), dueTime, now.Add(dueTime), period);
_timer = this.RegisterGrainTimer(
async _ => await FetchAndPublishPricesAsync(),

View File

@@ -313,7 +313,7 @@ namespace Managing.Application.ManageBot
}
else
{
_tradingBotLogger.LogInformation("Creating new bot statistics for bot {BotId}",
_tradingBotLogger.LogInformation("Update bot statistics for bot {BotId}",
bot.Identifier);
// Update existing bot
await repo.UpdateBot(bot);

View File

@@ -486,4 +486,14 @@ public static class Enums
CreatedAt,
UpdatedAt
}
/// <summary>
/// Event types for agent summary updates
/// </summary>
public enum AgentSummaryEventType
{
BotStarted,
PositionOpened,
PositionClosed,
}
}