Add copy trading functionality with StartCopyTrading endpoint and related models. Implemented position copying from master bot and subscription to copy trading stream in LiveTradingBotGrain. Updated TradingBotConfig to support copy trading parameters.
This commit is contained in:
@@ -12,6 +12,7 @@ using Managing.Domain.Trades;
|
||||
using Managing.Domain.Users;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Orleans.Streams;
|
||||
using static Managing.Common.Enums;
|
||||
|
||||
namespace Managing.Application.Bots.Grains;
|
||||
@@ -31,6 +32,7 @@ public class LiveTradingBotGrain : Grain, ILiveTradingBotGrain, IRemindable
|
||||
private TradingBotBase? _tradingBot;
|
||||
private IDisposable? _timer;
|
||||
private string _reminderName = "RebootReminder";
|
||||
private StreamSubscriptionHandle<Position>? _copyTradingStreamHandle;
|
||||
|
||||
public LiveTradingBotGrain(
|
||||
[PersistentState("live-trading-bot", "bot-store")]
|
||||
@@ -222,6 +224,9 @@ public class LiveTradingBotGrain : Grain, ILiveTradingBotGrain, IRemindable
|
||||
RegisterAndStartTimer();
|
||||
await RegisterReminder();
|
||||
|
||||
// Subscribe to copy trading stream if configured
|
||||
await SubscribeToCopyTradingStreamAsync();
|
||||
|
||||
// Update both database and registry status
|
||||
await SaveBotAsync(BotStatus.Running);
|
||||
await UpdateBotRegistryStatus(BotStatus.Running);
|
||||
@@ -337,6 +342,9 @@ public class LiveTradingBotGrain : Grain, ILiveTradingBotGrain, IRemindable
|
||||
StopAndDisposeTimer();
|
||||
await UnregisterReminder();
|
||||
|
||||
// Unsubscribe from copy trading stream
|
||||
await UnsubscribeFromCopyTradingStreamAsync();
|
||||
|
||||
// Track runtime: accumulate current session runtime when stopping
|
||||
if (_state.State.LastStartTime.HasValue)
|
||||
{
|
||||
@@ -386,6 +394,76 @@ public class LiveTradingBotGrain : Grain, ILiveTradingBotGrain, IRemindable
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Subscribes to the copy trading stream if this bot is configured for copy trading
|
||||
/// </summary>
|
||||
private async Task SubscribeToCopyTradingStreamAsync()
|
||||
{
|
||||
// Only subscribe if this is a copy trading bot and we have a master bot identifier
|
||||
if (!_state.State.Config.IsForCopyTrading || !_state.State.Config.MasterBotIdentifier.HasValue)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var streamProvider = this.GetStreamProvider("ManagingStreamProvider");
|
||||
var streamId = StreamId.Create("CopyTrading", _state.State.Config.MasterBotIdentifier.Value);
|
||||
_copyTradingStreamHandle = await streamProvider.GetStream<Position>(streamId)
|
||||
.SubscribeAsync(OnCopyTradingPositionReceivedAsync);
|
||||
|
||||
_logger.LogInformation("LiveTradingBotGrain {GrainId} subscribed to copy trading stream for master bot {MasterBotId}",
|
||||
this.GetPrimaryKey(), _state.State.Config.MasterBotIdentifier.Value);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to subscribe to copy trading stream for bot {GrainId}", this.GetPrimaryKey());
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Unsubscribes from the copy trading stream
|
||||
/// </summary>
|
||||
private async Task UnsubscribeFromCopyTradingStreamAsync()
|
||||
{
|
||||
if (_copyTradingStreamHandle != null)
|
||||
{
|
||||
await _copyTradingStreamHandle.UnsubscribeAsync();
|
||||
_copyTradingStreamHandle = null;
|
||||
_logger.LogInformation("LiveTradingBotGrain {GrainId} unsubscribed from copy trading stream", this.GetPrimaryKey());
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles incoming positions from the copy trading stream
|
||||
/// </summary>
|
||||
private async Task OnCopyTradingPositionReceivedAsync(Position masterPosition, StreamSequenceToken token)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (_tradingBot == null)
|
||||
{
|
||||
_logger.LogWarning("Received copy trading position {PositionId} but trading bot is not running for bot {GrainId}",
|
||||
masterPosition.Identifier, this.GetPrimaryKey());
|
||||
return;
|
||||
}
|
||||
|
||||
_logger.LogInformation("📡 Copy trading: Received position {PositionId} from master bot for bot {GrainId}",
|
||||
masterPosition.Identifier, this.GetPrimaryKey());
|
||||
|
||||
// Create a copy of the position for this bot
|
||||
await _tradingBot.CopyPositionFromMasterAsync(masterPosition);
|
||||
|
||||
_logger.LogInformation("✅ Copy trading: Successfully copied position {PositionId} for bot {GrainId}",
|
||||
masterPosition.Identifier, this.GetPrimaryKey());
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to handle copy trading position {PositionId} for bot {GrainId}",
|
||||
masterPosition.Identifier, this.GetPrimaryKey());
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a TradingBotBase instance using composition
|
||||
/// </summary>
|
||||
@@ -398,7 +476,8 @@ public class LiveTradingBotGrain : Grain, ILiveTradingBotGrain, IRemindable
|
||||
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var logger = scope.ServiceProvider.GetRequiredService<ILogger<TradingBotBase>>();
|
||||
var tradingBot = new TradingBotBase(logger, _scopeFactory, config);
|
||||
var streamProvider = this.GetStreamProvider("ManagingStreamProvider");
|
||||
var tradingBot = new TradingBotBase(logger, _scopeFactory, config, streamProvider);
|
||||
|
||||
// Load state into the trading bot instance
|
||||
LoadStateIntoTradingBot(tradingBot);
|
||||
|
||||
Reference in New Issue
Block a user