diff --git a/src/Managing.Application.Abstractions/Models/AgentSummaryUpdateEvent.cs b/src/Managing.Application.Abstractions/Models/AgentSummaryUpdateEvent.cs
new file mode 100644
index 00000000..3f5f1acc
--- /dev/null
+++ b/src/Managing.Application.Abstractions/Models/AgentSummaryUpdateEvent.cs
@@ -0,0 +1,26 @@
+using Orleans;
+using static Managing.Common.Enums;
+
+namespace Managing.Application.Abstractions.Models;
+
+///
+/// Event sent to AgentGrain to trigger summary updates
+///
+[GenerateSerializer]
+public class AgentSummaryUpdateEvent
+{
+ [Id(0)]
+ public int UserId { get; set; }
+
+ [Id(1)]
+ public Guid BotId { get; set; }
+
+ [Id(2)]
+ public AgentSummaryEventType EventType { get; set; }
+
+ [Id(3)]
+ public DateTime Timestamp { get; set; } = DateTime.UtcNow;
+
+ [Id(4)]
+ public string? AdditionalData { get; set; } // Optional additional context
+}
diff --git a/src/Managing.Application.Tests/AgentGrainStreamTests.cs b/src/Managing.Application.Tests/AgentGrainStreamTests.cs
new file mode 100644
index 00000000..18e428ca
--- /dev/null
+++ b/src/Managing.Application.Tests/AgentGrainStreamTests.cs
@@ -0,0 +1,142 @@
+using Managing.Application.Abstractions;
+using Managing.Application.Abstractions.Models;
+using Managing.Application.Abstractions.Services;
+using Managing.Application.Bots.Grains;
+using Managing.Application.Bots.Models;
+using Managing.Domain.Statistics;
+using Microsoft.Extensions.Logging;
+using Moq;
+using Xunit;
+
+namespace Managing.Application.Tests;
+
+public class AgentGrainTests
+{
+ private readonly Mock> _mockState;
+ private readonly Mock> _mockLogger;
+ private readonly Mock _mockBotService;
+ private readonly Mock _mockAgentService;
+ private readonly Mock _mockExchangeService;
+ private readonly Mock _mockUserService;
+ private readonly Mock _mockAccountService;
+ private readonly Mock _mockTradingService;
+
+ public AgentGrainTests()
+ {
+ _mockState = new Mock>();
+ _mockLogger = new Mock>();
+ _mockBotService = new Mock();
+ _mockAgentService = new Mock();
+ _mockExchangeService = new Mock();
+ _mockUserService = new Mock();
+ _mockAccountService = new Mock();
+ _mockTradingService = new Mock();
+
+ // Setup default state
+ _mockState.Setup(x => x.State).Returns(new AgentGrainState
+ {
+ AgentName = "TestAgent",
+ BotIds = new HashSet { Guid.NewGuid() }
+ });
+ }
+
+ [Fact]
+ public async Task OnAgentSummaryUpdateAsync_WithValidBotId_ShouldCallUpdateSummary()
+ {
+ // Arrange
+ var agentGrain = CreateAgentGrain();
+ var botId = _mockState.Object.State.BotIds.First();
+ var updateEvent = new AgentSummaryUpdateEvent
+ {
+ UserId = 1,
+ BotId = botId,
+ EventType = "PositionOpened",
+ Timestamp = DateTime.UtcNow
+ };
+
+ // Setup mocks
+ _mockBotService.Setup(x => x.GetBotsByIdsAsync(It.IsAny>()))
+ .ReturnsAsync(new List());
+ _mockAgentService.Setup(x => x.SaveOrUpdateAgentSummary(It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ // Act
+ await agentGrain.OnAgentSummaryUpdateAsync(updateEvent);
+
+ // Assert
+ _mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny()), Times.Once);
+ }
+
+ [Fact]
+ public async Task OnAgentSummaryUpdateAsync_WithInvalidBotId_ShouldNotCallUpdateSummary()
+ {
+ // Arrange
+ var agentGrain = CreateAgentGrain();
+ var updateEvent = new AgentSummaryUpdateEvent
+ {
+ UserId = 1,
+ BotId = Guid.NewGuid(), // Different bot ID
+ EventType = "PositionOpened",
+ Timestamp = DateTime.UtcNow
+ };
+
+ // Act
+ await agentGrain.OnAgentSummaryUpdateAsync(updateEvent);
+
+ // Assert
+ _mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny()), Times.Never);
+ }
+
+ [Fact]
+ public async Task RegisterBotAsync_ShouldUpdateSummary()
+ {
+ // Arrange
+ var agentGrain = CreateAgentGrain();
+ var newBotId = Guid.NewGuid();
+
+ // Setup mocks
+ _mockBotService.Setup(x => x.GetBotsByIdsAsync(It.IsAny>()))
+ .ReturnsAsync(new List());
+ _mockAgentService.Setup(x => x.SaveOrUpdateAgentSummary(It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ // Act
+ await agentGrain.RegisterBotAsync(newBotId);
+
+ // Assert
+ _mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny()), Times.Once);
+ }
+
+ [Fact]
+ public async Task UnregisterBotAsync_ShouldUpdateSummary()
+ {
+ // Arrange
+ var agentGrain = CreateAgentGrain();
+ var botId = _mockState.Object.State.BotIds.First();
+
+ // Setup mocks
+ _mockBotService.Setup(x => x.GetBotsByIdsAsync(It.IsAny>()))
+ .ReturnsAsync(new List());
+ _mockAgentService.Setup(x => x.SaveOrUpdateAgentSummary(It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ // Act
+ await agentGrain.UnregisterBotAsync(botId);
+
+ // Assert
+ _mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny()), Times.Once);
+ }
+
+ private AgentGrain CreateAgentGrain()
+ {
+ return new AgentGrain(
+ _mockState.Object,
+ _mockLogger.Object,
+ _mockBotService.Object,
+ _mockAgentService.Object,
+ _mockExchangeService.Object,
+ _mockUserService.Object,
+ _mockAccountService.Object,
+ _mockTradingService.Object);
+ }
+}
diff --git a/src/Managing.Application.Tests/AgentGrainTests.cs b/src/Managing.Application.Tests/AgentGrainTests.cs
new file mode 100644
index 00000000..7f524b80
--- /dev/null
+++ b/src/Managing.Application.Tests/AgentGrainTests.cs
@@ -0,0 +1,143 @@
+using Managing.Application.Abstractions;
+using Managing.Application.Abstractions.Models;
+using Managing.Application.Abstractions.Services;
+using Managing.Application.Bots.Grains;
+using Managing.Application.Bots.Models;
+using Managing.Domain.Statistics;
+using Microsoft.Extensions.Logging;
+using Moq;
+using Xunit;
+using static Managing.Common.Enums;
+
+namespace Managing.Application.Tests;
+
+public class AgentGrainTests
+{
+ private readonly Mock> _mockState;
+ private readonly Mock> _mockLogger;
+ private readonly Mock _mockBotService;
+ private readonly Mock _mockAgentService;
+ private readonly Mock _mockExchangeService;
+ private readonly Mock _mockUserService;
+ private readonly Mock _mockAccountService;
+ private readonly Mock _mockTradingService;
+
+ public AgentGrainTests()
+ {
+ _mockState = new Mock>();
+ _mockLogger = new Mock>();
+ _mockBotService = new Mock();
+ _mockAgentService = new Mock();
+ _mockExchangeService = new Mock();
+ _mockUserService = new Mock();
+ _mockAccountService = new Mock();
+ _mockTradingService = new Mock();
+
+ // Setup default state
+ _mockState.Setup(x => x.State).Returns(new AgentGrainState
+ {
+ AgentName = "TestAgent",
+ BotIds = new HashSet { Guid.NewGuid() }
+ });
+ }
+
+ [Fact]
+ public async Task OnAgentSummaryUpdateAsync_WithValidBotId_ShouldCallUpdateSummary()
+ {
+ // Arrange
+ var agentGrain = CreateAgentGrain();
+ var botId = _mockState.Object.State.BotIds.First();
+ var updateEvent = new AgentSummaryUpdateEvent
+ {
+ UserId = 1,
+ BotId = botId,
+ EventType = AgentSummaryEventType.PositionOpened,
+ Timestamp = DateTime.UtcNow
+ };
+
+ // Setup mocks
+ _mockBotService.Setup(x => x.GetBotsByIdsAsync(It.IsAny>()))
+ .ReturnsAsync(new List());
+ _mockAgentService.Setup(x => x.SaveOrUpdateAgentSummary(It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ // Act
+ await agentGrain.OnAgentSummaryUpdateAsync(updateEvent);
+
+ // Assert
+ _mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny()), Times.Once);
+ }
+
+ [Fact]
+ public async Task OnAgentSummaryUpdateAsync_WithInvalidBotId_ShouldNotCallUpdateSummary()
+ {
+ // Arrange
+ var agentGrain = CreateAgentGrain();
+ var updateEvent = new AgentSummaryUpdateEvent
+ {
+ UserId = 1,
+ BotId = Guid.NewGuid(), // Different bot ID
+ EventType = AgentSummaryEventType.PositionOpened,
+ Timestamp = DateTime.UtcNow
+ };
+
+ // Act
+ await agentGrain.OnAgentSummaryUpdateAsync(updateEvent);
+
+ // Assert
+ _mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny()), Times.Never);
+ }
+
+ [Fact]
+ public async Task RegisterBotAsync_ShouldUpdateSummary()
+ {
+ // Arrange
+ var agentGrain = CreateAgentGrain();
+ var newBotId = Guid.NewGuid();
+
+ // Setup mocks
+ _mockBotService.Setup(x => x.GetBotsByIdsAsync(It.IsAny>()))
+ .ReturnsAsync(new List());
+ _mockAgentService.Setup(x => x.SaveOrUpdateAgentSummary(It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ // Act
+ await agentGrain.RegisterBotAsync(newBotId);
+
+ // Assert
+ _mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny()), Times.Once);
+ }
+
+ [Fact]
+ public async Task UnregisterBotAsync_ShouldUpdateSummary()
+ {
+ // Arrange
+ var agentGrain = CreateAgentGrain();
+ var botId = _mockState.Object.State.BotIds.First();
+
+ // Setup mocks
+ _mockBotService.Setup(x => x.GetBotsByIdsAsync(It.IsAny>()))
+ .ReturnsAsync(new List());
+ _mockAgentService.Setup(x => x.SaveOrUpdateAgentSummary(It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ // Act
+ await agentGrain.UnregisterBotAsync(botId);
+
+ // Assert
+ _mockAgentService.Verify(x => x.SaveOrUpdateAgentSummary(It.IsAny()), Times.Once);
+ }
+
+ private AgentGrain CreateAgentGrain()
+ {
+ return new AgentGrain(
+ _mockState.Object,
+ _mockLogger.Object,
+ _mockBotService.Object,
+ _mockAgentService.Object,
+ _mockExchangeService.Object,
+ _mockUserService.Object,
+ _mockAccountService.Object,
+ _mockTradingService.Object);
+ }
+}
diff --git a/src/Managing.Application/Abstractions/Grains/IAgentGrain.cs b/src/Managing.Application/Abstractions/Grains/IAgentGrain.cs
index b71ef86e..6f9d1b06 100644
--- a/src/Managing.Application/Abstractions/Grains/IAgentGrain.cs
+++ b/src/Managing.Application/Abstractions/Grains/IAgentGrain.cs
@@ -1,3 +1,5 @@
+using Managing.Application.Abstractions.Models;
+
namespace Managing.Application.Abstractions.Grains
{
public interface IAgentGrain : IGrainWithIntegerKey
@@ -23,5 +25,11 @@ namespace Managing.Application.Abstractions.Grains
/// Unregisters a bot from this agent.
///
Task UnregisterBotAsync(Guid botId);
+
+ ///
+ /// Handles stream notifications for agent summary updates.
+ ///
+ /// The update event from the stream
+ Task OnAgentSummaryUpdateAsync(AgentSummaryUpdateEvent updateEvent);
}
}
\ No newline at end of file
diff --git a/src/Managing.Application/Bots/Grains/AgentGrain.cs b/src/Managing.Application/Bots/Grains/AgentGrain.cs
index b80d6ecb..4dd2dc1d 100644
--- a/src/Managing.Application/Bots/Grains/AgentGrain.cs
+++ b/src/Managing.Application/Bots/Grains/AgentGrain.cs
@@ -1,15 +1,15 @@
using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
+using Managing.Application.Abstractions.Models;
using Managing.Application.Abstractions.Services;
using Managing.Application.Bots.Models;
-using Managing.Application.Shared;
using Managing.Domain.Statistics;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Bots.Grains;
-public class AgentGrain : Grain, IAgentGrain, IRemindable
+public class AgentGrain : Grain, IAgentGrain
{
private readonly IPersistentState _state;
private readonly ILogger _logger;
@@ -19,7 +19,6 @@ public class AgentGrain : Grain, IAgentGrain, IRemindable
private readonly IUserService _userService;
private readonly IAccountService _accountService;
private readonly ITradingService _tradingService;
- private const string _updateSummaryReminderName = "UpdateAgentSummary";
public AgentGrain(
[PersistentState("agent-state", "agent-store")]
@@ -54,42 +53,25 @@ public class AgentGrain : Grain, IAgentGrain, IRemindable
await _state.WriteStateAsync();
await UpdateSummary();
_logger.LogInformation("Agent {UserId} initialized with name {AgentName}", userId, agentName);
- await RegisterReminderAsync();
}
- private async Task RegisterReminderAsync()
+ public async Task OnAgentSummaryUpdateAsync(AgentSummaryUpdateEvent updateEvent)
{
try
{
- var options = GrainHelpers.GetDynamicRandomizedTimerOptions(TimeSpan.FromMinutes(2), 200);
-
- // Register a reminder that fires every 5 minutes
- await this.RegisterOrUpdateReminder(_updateSummaryReminderName, options.dueTime,
- options.period);
- _logger.LogInformation("Reminder registered for agent {UserId} to update summary every 5 minutes",
- this.GetPrimaryKeyLong());
+ _logger.LogInformation("Received agent summary update event for user {UserId}, event type: {EventType}",
+ this.GetPrimaryKeyLong(), updateEvent.EventType);
+
+ // Only update summary if the event is for this agent's bots
+ if (_state.State.BotIds.Contains(updateEvent.BotId))
+ {
+ await UpdateSummary();
+ }
}
catch (Exception ex)
{
- _logger.LogError(ex, "Failed to register reminder for agent {UserId}", this.GetPrimaryKeyLong());
- }
- }
-
- public async Task ReceiveReminder(string reminderName, TickStatus status)
- {
- if (reminderName == _updateSummaryReminderName)
- {
- try
- {
- _logger.LogInformation("Reminder triggered for agent {UserId} to update summary",
- this.GetPrimaryKeyLong());
- await UpdateSummary();
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "Error updating agent summary from reminder for user {UserId}",
- this.GetPrimaryKeyLong());
- }
+ _logger.LogError(ex, "Error processing agent summary update event for user {UserId}",
+ this.GetPrimaryKeyLong());
}
}
@@ -203,6 +185,9 @@ public class AgentGrain : Grain, IAgentGrain, IRemindable
{
await _state.WriteStateAsync();
_logger.LogInformation("Bot {BotId} registered to Agent {UserId}", botId, this.GetPrimaryKeyLong());
+
+ // Update summary after registering bot
+ await UpdateSummary();
}
}
@@ -212,6 +197,9 @@ public class AgentGrain : Grain, IAgentGrain, IRemindable
{
await _state.WriteStateAsync();
_logger.LogInformation("Bot {BotId} unregistered from Agent {UserId}", botId, this.GetPrimaryKeyLong());
+
+ // Update summary after unregistering bot
+ await UpdateSummary();
}
}
}
\ No newline at end of file
diff --git a/src/Managing.Application/Bots/TradingBotBase.cs b/src/Managing.Application/Bots/TradingBotBase.cs
index fdce0cce..60bcb018 100644
--- a/src/Managing.Application/Bots/TradingBotBase.cs
+++ b/src/Managing.Application/Bots/TradingBotBase.cs
@@ -1,5 +1,6 @@
using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
+using Managing.Application.Abstractions.Models;
using Managing.Application.Abstractions.Services;
using Managing.Application.Trading.Commands;
using Managing.Application.Trading.Handlers;
@@ -93,6 +94,9 @@ public class TradingBotBase : ITradingBot
$"📢 I'll notify you when signals are triggered.";
await LogInformation(startupMessage);
+
+ // Notify AgentGrain about bot startup
+ await NotifyAgentGrainAsync(AgentSummaryEventType.BotStarted, $"Bot: {Config.Name}, Ticker: {Config.Ticker}");
break;
case BotStatus.Running:
@@ -191,7 +195,7 @@ public class TradingBotBase : ITradingBot
}
}
- public async Task UpdateSignals(HashSet? candles = null)
+ public async Task UpdateSignals(HashSet candles = null)
{
// If position open and not flipped, do not update signals
if (!Config.FlipPosition && Positions.Any(p => !p.Value.IsFinished())) return;
@@ -765,6 +769,9 @@ public class TradingBotBase : ITradingBot
async messengerService => { await messengerService.SendPosition(position); });
}
+ // Notify AgentGrain about position opening
+ await NotifyAgentGrainAsync(AgentSummaryEventType.PositionOpened, $"Signal: {signal.Identifier}");
+
Logger.LogInformation($"Position requested");
return position; // Return the created position without adding to list
}
@@ -1195,6 +1202,10 @@ public class TradingBotBase : ITradingBot
Logger.LogInformation(
$"💰 **Balance Updated**\nNew bot trading balance: `${Config.BotTradingBalance:F2}`");
}
+
+ // Notify AgentGrain about position closing
+ var pnlInfo = position.ProfitAndLoss?.Realized != null ? $"PnL: {position.ProfitAndLoss.Realized:F2}" : "PnL: Unknown";
+ await NotifyAgentGrainAsync(AgentSummaryEventType.PositionClosed, $"Signal: {position.SignalIdentifier}, {pnlInfo}");
}
else
{
@@ -1875,4 +1886,42 @@ public class TradingBotBase : ITradingBot
return isInCooldown;
}
+
+ ///
+ /// Sends a notification to the AgentGrain to trigger summary updates
+ ///
+ /// The type of event (e.g., PositionOpened, PositionClosed)
+ /// Optional additional context data
+ private async Task NotifyAgentGrainAsync(AgentSummaryEventType eventType, string additionalData = null)
+ {
+ if (Config.IsForBacktest || Account?.User == null)
+ {
+ return; // Skip notifications for backtest or when no user context
+ }
+
+ try
+ {
+ await ServiceScopeHelpers.WithScopedService(_scopeFactory, async grainFactory =>
+ {
+ var agentGrain = grainFactory.GetGrain(Account.User.Id);
+
+ var updateEvent = new AgentSummaryUpdateEvent
+ {
+ UserId = Account.User.Id,
+ BotId = Identifier,
+ EventType = eventType,
+ Timestamp = DateTime.UtcNow,
+ AdditionalData = additionalData
+ };
+
+ await agentGrain.OnAgentSummaryUpdateAsync(updateEvent);
+
+ Logger.LogDebug("Sent agent notification: {EventType} for bot {BotId}", eventType, Identifier);
+ });
+ }
+ catch (Exception ex)
+ {
+ Logger.LogError(ex, "Failed to send agent notification: {EventType} for bot {BotId}", eventType, Identifier);
+ }
+ }
}
\ No newline at end of file
diff --git a/src/Managing.Application/Grains/CandleStoreGrain.cs b/src/Managing.Application/Grains/CandleStoreGrain.cs
index f914ca96..2949020e 100644
--- a/src/Managing.Application/Grains/CandleStoreGrain.cs
+++ b/src/Managing.Application/Grains/CandleStoreGrain.cs
@@ -206,7 +206,7 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver
try
{
var streamProvider = this.GetStreamProvider("ManagingStreamProvider");
- _priceStream = streamProvider.GetStream(streamKey);
+ _priceStream = streamProvider.GetStream("Candles", streamKey);
_streamSubscription = await _priceStream.SubscribeAsync(this);
diff --git a/src/Managing.Application/Grains/PriceFetcherGrain.cs b/src/Managing.Application/Grains/PriceFetcherGrain.cs
index a1af9523..3522eb1c 100644
--- a/src/Managing.Application/Grains/PriceFetcherGrain.cs
+++ b/src/Managing.Application/Grains/PriceFetcherGrain.cs
@@ -201,8 +201,8 @@ public class PriceFetcherGrain : Grain, IPriceFetcherGrain, IRemindable
var now = DateTime.UtcNow;
var dueTime = CandleHelpers.GetDueTimeForTimeframe(TargetTimeframe, now);
var period = TimeSpan.FromSeconds(CandleHelpers.GetBaseIntervalInSeconds(TargetTimeframe));
- _logger.LogInformation("{0} next execution scheduled in {1} seconds and at {2:} UTC every {3} seconds",
- nameof(PriceFetcherGrain), dueTime, now.Add(dueTime), period);
+ _logger.LogInformation("{0} next execution scheduled in {1} seconds and at {2:} UTC every {3}",
+ string.Join("-", nameof(PriceFetcherGrain), TargetTimeframe), dueTime, now.Add(dueTime), period);
_timer = this.RegisterGrainTimer(
async _ => await FetchAndPublishPricesAsync(),
diff --git a/src/Managing.Application/ManageBot/BotService.cs b/src/Managing.Application/ManageBot/BotService.cs
index 70cdb723..1a7e185f 100644
--- a/src/Managing.Application/ManageBot/BotService.cs
+++ b/src/Managing.Application/ManageBot/BotService.cs
@@ -313,7 +313,7 @@ namespace Managing.Application.ManageBot
}
else
{
- _tradingBotLogger.LogInformation("Creating new bot statistics for bot {BotId}",
+ _tradingBotLogger.LogInformation("Update bot statistics for bot {BotId}",
bot.Identifier);
// Update existing bot
await repo.UpdateBot(bot);
diff --git a/src/Managing.Common/Enums.cs b/src/Managing.Common/Enums.cs
index 88d8ade8..1561671e 100644
--- a/src/Managing.Common/Enums.cs
+++ b/src/Managing.Common/Enums.cs
@@ -486,4 +486,14 @@ public static class Enums
CreatedAt,
UpdatedAt
}
+
+ ///
+ /// Event types for agent summary updates
+ ///
+ public enum AgentSummaryEventType
+ {
+ BotStarted,
+ PositionOpened,
+ PositionClosed,
+ }
}
\ No newline at end of file