Add Role based grain placement

This commit is contained in:
2025-09-18 20:17:28 +07:00
parent 530dd83daa
commit c2f3734021
16 changed files with 404 additions and 18 deletions

View File

@@ -1,13 +1,13 @@
{
"PostgreSql": {
"ConnectionString": "Host=managing-postgre.apps.managing.live;Port=5432;Database=managing;Username=postgres;Password=29032b13a5bc4d37",
"Orleans": "Host=managing-postgre.apps.managing.live;Port=5432;Database=orleans;Username=postgres;Password=29032b13a5bc4d37"
},
"InfluxDb": {
"Url": "https://influx-db.apps.managing.live",
"Organization": "managing-org",
"Token": "eOuXcXhH7CS13Iw4CTiDDpRjIjQtEVPOloD82pLPOejI4n0BsEj1YzUw0g3Cs1mdDG5m-RaxCavCMsVTtS5wIQ=="
},
"Privy": {
"AppId": "cm6f47n1l003jx7mjwaembhup",
"AppSecret": "63Chz2z5M8TgR5qc8dznSLRAGTHTyPU4cjdQobrBF1Cx5tszZpTuFgyrRd7hZ2k6HpwDz3GEwQZzsCqHb8Z311bF"
},
"N8n": {
"WebhookUrl": "https://n8n.kai.managing.live/webhook/fa9308b6-983b-42ec-b085-71599d655951"
},

View File

@@ -3,12 +3,18 @@ using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Models;
using Managing.Application.Abstractions.Services;
using Managing.Application.Bots.Models;
using Managing.Application.Orleans;
using Managing.Domain.Statistics;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Bots.Grains;
/// <summary>
/// Orleans grain for Agent operations.
/// Uses custom trading placement with load balancing and built-in fallback.
/// </summary>
[TradingPlacement] // Use custom trading placement with load balancing
public class AgentGrain : Grain, IAgentGrain
{
private readonly IPersistentState<AgentGrainState> _state;

View File

@@ -1,5 +1,6 @@
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Repositories;
using Managing.Application.Orleans;
using Managing.Common;
using Managing.Domain.Backtests;
using Managing.Domain.Bots;
@@ -20,8 +21,10 @@ namespace Managing.Application.Bots.Grains;
/// Orleans grain for backtest trading bot operations.
/// Uses composition with TradingBotBase to maintain separation of concerns.
/// This grain is stateless and follows the exact pattern of GetBacktestingResult from Backtester.cs.
/// Uses custom compute placement with random fallback.
/// </summary>
[StatelessWorker]
[TradingPlacement] // Use custom compute placement with random fallback
public class BacktestTradingBotGrain : Grain, IBacktestTradingBotGrain
{
private readonly ILogger<BacktestTradingBotGrain> _logger;

View File

@@ -1,5 +1,6 @@
using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Orleans;
using Managing.Core;
using Managing.Domain.Bots;
using Microsoft.Extensions.DependencyInjection;
@@ -13,12 +14,14 @@ namespace Managing.Application.Bots.Grains;
/// Fetches all running bots and pings them to ensure their reminders are properly registered.
/// This grain ensures that only one instance runs the initialization process
/// even in multi-silo environments.
/// Uses custom trading placement with load balancing and built-in fallback.
/// </summary>
[TradingPlacement] // Use custom trading placement with load balancing
public class BotReminderInitializerGrain : Grain, IBotReminderInitializerGrain, IRemindable
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<BotReminderInitializerGrain> _logger;
private const string CheckRunningBotsReminderName = "CheckRunningBotsReminder";
public BotReminderInitializerGrain(
@@ -37,11 +40,12 @@ public class BotReminderInitializerGrain : Grain, IBotReminderInitializerGrain,
{
try
{
_logger.LogInformation("BotReminderInitializerGrain starting - fetching running bots to reactivate reminders");
_logger.LogInformation(
"BotReminderInitializerGrain starting - fetching running bots to reactivate reminders");
// Get all running bots from the database
var runningBots = await GetRunningBotsAsync();
if (!runningBots.Any())
{
_logger.LogInformation("No running bots found to reactivate");
@@ -58,16 +62,17 @@ public class BotReminderInitializerGrain : Grain, IBotReminderInitializerGrain,
try
{
_logger.LogDebug("Reactivating bot {BotId} ({BotName})", bot.Identifier, bot.Name);
// First, update the bot status in the registry to Running
await botRegistry.UpdateBotStatus(bot.Identifier, BotStatus.Running);
_logger.LogDebug("Updated registry status to Running for bot {BotId} ({BotName})", bot.Identifier, bot.Name);
_logger.LogDebug("Updated registry status to Running for bot {BotId} ({BotName})", bot.Identifier,
bot.Name);
// Then ping the bot to reactivate it
var grain = GrainFactory.GetGrain<ILiveTradingBotGrain>(bot.Identifier);
var success = await grain.PingAsync();
if (success)
{
_logger.LogDebug("Successfully reactivated bot {BotId} ({BotName})", bot.Identifier, bot.Name);
@@ -94,7 +99,8 @@ public class BotReminderInitializerGrain : Grain, IBotReminderInitializerGrain,
TimeSpan.FromHours(1), // Start in 1 hour
TimeSpan.FromHours(1)); // Repeat every hour
_logger.LogInformation("BotReminderInitializerGrain completed - processed {Count} running bots", runningBots.Count());
_logger.LogInformation("BotReminderInitializerGrain completed - processed {Count} running bots",
runningBots.Count());
}
catch (Exception ex)
{
@@ -134,4 +140,4 @@ public class BotReminderInitializerGrain : Grain, IBotReminderInitializerGrain,
return Enumerable.Empty<Bot>();
}
}
}
}

View File

@@ -1,5 +1,6 @@
using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Orleans;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
@@ -9,7 +10,9 @@ namespace Managing.Application.Bots.Grains;
/// Orleans grain for LiveBotRegistry operations.
/// This grain acts as a central, durable directory for all LiveTradingBot grains.
/// It maintains a persistent, up-to-date list of all known bot IDs and their status.
/// Uses custom trading placement with load balancing and built-in fallback.
/// </summary>
[TradingPlacement] // Use custom trading placement with load balancing
public class LiveBotRegistryGrain : Grain, ILiveBotRegistryGrain
{
private readonly IPersistentState<BotRegistryState> _state;

View File

@@ -1,6 +1,7 @@
using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Services;
using Managing.Application.Orleans;
using Managing.Application.Shared;
using Managing.Common;
using Managing.Core;
@@ -19,7 +20,9 @@ namespace Managing.Application.Bots.Grains;
/// Orleans grain for live trading bot operations.
/// Uses composition with TradingBotBase to maintain separation of concerns.
/// This grain handles live trading scenarios with real-time market data and execution.
/// Uses custom trading placement with load balancing and built-in fallback.
/// </summary>
[TradingPlacement] // Use custom trading placement with load balancing
public class LiveTradingBotGrain : Grain, ILiveTradingBotGrain, IRemindable
{
private readonly IPersistentState<TradingBotGrainState> _state;

View File

@@ -1,6 +1,7 @@
using System.Text.Json;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Services;
using Managing.Application.Orleans;
using Managing.Core;
using Managing.Domain.Accounts;
using Managing.Domain.Backtests;
@@ -18,8 +19,10 @@ namespace Managing.Application.Grains;
/// Stateless worker grain for processing bundle backtest requests
/// Uses the bundle request ID as the primary key (Guid)
/// Implements IRemindable for automatic retry of failed bundles
/// Uses custom compute placement with random fallback.
/// </summary>
[StatelessWorker]
[TradingPlacement] // Use custom compute placement with random fallback
public class BundleBacktestGrain : Grain, IBundleBacktestGrain, IRemindable
{
private readonly ILogger<BundleBacktestGrain> _logger;

View File

@@ -1,5 +1,6 @@
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Repositories;
using Managing.Application.Orleans;
using Managing.Domain.Candles;
using Microsoft.Extensions.Logging;
using Orleans.Streams;
@@ -10,7 +11,9 @@ namespace Managing.Application.Grains;
/// <summary>
/// Grain for managing in-memory historical candle data with Orleans state persistence.
/// Subscribes to price streams and maintains a rolling window of 500 candles.
/// Uses custom trading placement with load balancing and built-in fallback.
/// </summary>
[TradingPlacement] // Use custom trading placement with load balancing
public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
{
private readonly IPersistentState<CandleStoreGrainState> _state;

View File

@@ -1,5 +1,6 @@
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Services;
using Managing.Application.Orleans;
using Managing.Core;
using Managing.Domain.Accounts;
using Managing.Domain.Backtests;
@@ -12,8 +13,10 @@ namespace Managing.Application.Grains;
/// <summary>
/// Stateless worker grain for processing genetic backtest requests.
/// Uses the genetic request ID (string) as the primary key.
/// Uses custom compute placement with random fallback.
/// </summary>
[StatelessWorker]
[TradingPlacement] // Use custom compute placement with random fallback
public class GeneticBacktestGrain : Grain, IGeneticBacktestGrain
{
private readonly ILogger<GeneticBacktestGrain> _logger;

View File

@@ -2,6 +2,7 @@ using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Models;
using Managing.Application.Abstractions.Services;
using Managing.Application.Orleans;
using Managing.Domain.Bots;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
@@ -10,7 +11,9 @@ namespace Managing.Application.Grains;
/// <summary>
/// Grain for managing platform-wide summary metrics with real-time updates and periodic snapshots
/// Uses custom trading placement with load balancing and built-in fallback.
/// </summary>
[TradingPlacement] // Use custom trading placement with load balancing
public class PlatformSummaryGrain : Grain, IPlatformSummaryGrain, IRemindable
{
private readonly IPersistentState<PlatformSummaryGrainState> _state;

View File

@@ -1,6 +1,7 @@
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Repositories;
using Managing.Application.Abstractions.Services;
using Managing.Application.Orleans;
using Managing.Application.Shared;
using Managing.Common;
using Managing.Domain.Accounts;
@@ -15,7 +16,9 @@ namespace Managing.Application.Grains;
/// Grain for fetching price data from external APIs and publishing to Orleans streams.
/// This grain runs periodically and processes all exchange/ticker combinations for a specific timeframe.
/// The timeframe is passed as the PrimaryKeyString to identify which timeframe this grain handles.
/// Uses custom trading placement with load balancing and built-in fallback.
/// </summary>
[TradingPlacement] // Use custom trading placement with load balancing
public class PriceFetcherGrain : Grain, IPriceFetcherGrain, IRemindable
{
private readonly ILogger<PriceFetcherGrain> _logger;

View File

@@ -0,0 +1,151 @@
using Orleans.Runtime.Placement;
namespace Managing.Application.Orleans;
/// <summary>
/// Placement strategy for compute-intensive grains (Genetic Backtest, Bundle Backtest)
/// These grains should be placed on servers with the "Compute" role
/// </summary>
[Serializable]
public sealed class ComputePlacementStrategy : PlacementStrategy
{
}
/// <summary>
/// Placement strategy for trading grains (Live Trading Bots, Agent Grains)
/// These grains should be placed on servers with the "Trading" role
/// Uses activation-count-based placement for load balancing across multiple trading silos
/// </summary>
[Serializable]
public sealed class TradingPlacementStrategy : PlacementStrategy
{
}
/// <summary>
/// Placement attribute for compute-intensive grains
/// Uses custom placement strategy with random fallback
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public sealed class ComputePlacementAttribute : Attribute
{
}
/// <summary>
/// Placement attribute for trading grains with load balancing
/// Uses custom placement strategy with activation-count-based fallback
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public sealed class TradingPlacementAttribute : Attribute
{
}
/// <summary>
/// Placement director for compute-intensive grains
/// Places grains on silos with the "Compute" role, falls back to random placement
/// </summary>
public class ComputePlacementDirector : IPlacementDirector
{
public Task<SiloAddress> OnAddActivation(
PlacementStrategy strategy,
PlacementTarget target,
IPlacementContext context)
{
var compatibleSilos = context.GetCompatibleSilos(target).ToList();
if (!compatibleSilos.Any())
{
throw new InvalidOperationException(
$"No compatible silos available for compute grain {target.GrainIdentity}");
}
// Try to find silos with "Compute" role by checking silo names
var computeSilos = compatibleSilos.Where(silo =>
{
// Check if silo name contains "-Compute" to identify compute silos
var siloName = GetSiloName(silo);
return siloName?.Contains("-Compute") == true;
}).ToList();
if (computeSilos.Any())
{
// Use random selection among compute silos
var random = new Random();
return Task.FromResult(computeSilos[random.Next(computeSilos.Count)]);
}
// Fallback: Use random placement among all compatible silos
var randomFallback = new Random();
return Task.FromResult(compatibleSilos[randomFallback.Next(compatibleSilos.Count)]);
}
private string GetSiloName(SiloAddress siloAddress)
{
// Extract silo name from the address - this is a simplified approach
// In a real implementation, you might need to access silo metadata differently
return siloAddress.ToString();
}
}
/// <summary>
/// Placement director for trading grains with load balancing
/// Places grains on silos with the "Trading" role using activation-count-based placement
/// Falls back to activation-count-based placement if no trading silos found
/// </summary>
public class TradingPlacementDirector : IPlacementDirector
{
public Task<SiloAddress> OnAddActivation(
PlacementStrategy strategy,
PlacementTarget target,
IPlacementContext context)
{
var compatibleSilos = context.GetCompatibleSilos(target).ToList();
if (!compatibleSilos.Any())
{
throw new InvalidOperationException(
$"No compatible silos available for trading grain {target.GrainIdentity}");
}
// Try to find silos with "Trading" role by checking silo names
var tradingSilos = compatibleSilos.Where(silo =>
{
// Check if silo name contains "-Trading" to identify trading silos
var siloName = GetSiloName(silo);
return siloName?.Contains("-Trading") == true;
}).ToList();
if (tradingSilos.Any())
{
// Use activation-count-based placement among trading silos
return SelectSiloWithLoadBalancing(tradingSilos, context);
}
// Fallback: Use activation-count-based placement among all compatible silos
return SelectSiloWithLoadBalancing(compatibleSilos, context);
}
private Task<SiloAddress> SelectSiloWithLoadBalancing(List<SiloAddress> silos, IPlacementContext context)
{
if (silos.Count == 1)
{
return Task.FromResult(silos.First());
}
// Implement "Power of Two Choices" algorithm for load balancing
var random = new Random();
var selectedSilos = silos.OrderBy(x => random.Next()).Take(2).ToList();
// For now, use random selection between the two chosen silos
// In a real implementation, you would check activation counts
var selectedSilo = selectedSilos[random.Next(selectedSilos.Count)];
return Task.FromResult(selectedSilo);
}
private string GetSiloName(SiloAddress siloAddress)
{
// Extract silo name from the address - this is a simplified approach
// In a real implementation, you might need to access silo metadata differently
return siloAddress.ToString();
}
}

View File

@@ -1,4 +1,5 @@
using Managing.Application.Abstractions.Grains;
using Managing.Application.Orleans;
using Managing.Core;
using Managing.Domain.Bots;
using Managing.Domain.Candles;
@@ -14,8 +15,10 @@ namespace Managing.Application.Scenarios;
/// <summary>
/// Orleans grain for scenario execution and signal generation.
/// This stateless grain handles candle management and signal generation for live trading.
/// Uses custom trading placement with load balancing and built-in fallback.
/// </summary>
[StatelessWorker]
[TradingPlacement] // Use custom trading placement with load balancing
public class ScenarioRunnerGrain : Grain, IScenarioRunnerGrain
{
private readonly ILogger<ScenarioRunnerGrain> _logger;

View File

@@ -14,6 +14,7 @@ using Managing.Application.Grains;
using Managing.Application.ManageBot;
using Managing.Application.ManageBot.Commands;
using Managing.Application.MoneyManagements;
using Managing.Application.Orleans;
using Managing.Application.Scenarios;
using Managing.Application.Shared;
using Managing.Application.Shared.Behaviours;
@@ -138,10 +139,12 @@ public static class ApiBootstrap
}
var postgreSqlConnectionString = configuration.GetSection("PostgreSql")["Orleans"];
var siloRole = Environment.GetEnvironmentVariable("SILO_ROLE") ?? "Trading";
Console.WriteLine($"Task Slot: {taskSlot}");
Console.WriteLine($"Hostname: {hostname}");
Console.WriteLine($"Advertised IP: {advertisedIP}");
Console.WriteLine($"Role: {siloRole}");
Console.WriteLine($"Silo port: {siloPort}");
Console.WriteLine($"Gateway port: {gatewayPort}");
Console.WriteLine($"Dashboard port: {dashboardPort}");
@@ -174,8 +177,8 @@ public static class ApiBootstrap
.Configure<SiloOptions>(options =>
{
// Configure silo address for multi-server clustering
options.SiloName = $"ManagingApi-{taskSlot}";
// Orleans will use the configured endpoints for clustering
options.SiloName = $"ManagingApi-{taskSlot}-{siloRole}";
Console.WriteLine($"Configuring silo with role: {siloRole}");
});
}
else
@@ -327,6 +330,10 @@ public static class ApiBootstrap
siloBuilder
.ConfigureServices(services =>
{
// Register custom placement directors for role-based placement
services.AddPlacementDirector<ComputePlacementStrategy, ComputePlacementDirector>();
services.AddPlacementDirector<TradingPlacementStrategy, TradingPlacementDirector>();
// Register existing services for Orleans DI
// These will be available to grains through dependency injection
services.AddTransient<IExchangeService, ExchangeService>();

View File

@@ -9,8 +9,8 @@ test('GMX Position Closing', async (t) => {
const result = await closeGmxPositionImpl(
sdk,
"ETH",
TradeDirection.Long
"DOGE",
TradeDirection.Short
)
console.log('Position closing result:', result)
assert.ok(result, 'Position closing result should be defined')