// Source file: managing-apps/src/Managing.Api.Workers/Workers/PositionManagerWorker.cs
// Repository viewer metadata for the original listing: 304 lines, 13 KiB, C#

using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Services;
using Managing.Application.Workers;
using Managing.Application.Workers.Abstractions;
using Managing.Domain.Accounts;
using Managing.Domain.Trades;
using Newtonsoft.Json;
using static Managing.Common.Enums;
namespace Managing.Api.Workers.Workers;
/// <summary>
/// Worker that moves non-paper-trading positions through their lifecycle
/// (New, then PartiallyFilled, then Filled), using <see cref="PositionStatus.Updating"/>
/// as a transient marker while a position is being processed.
/// </summary>
public class PositionManagerWorker : BaseWorker<PositionManagerWorker>
{
    private static readonly WorkerType _workerType = WorkerType.PositionManager;
    private readonly ITradingService _tradingService;
    private readonly IExchangeService _exchangeService;
    private readonly IAccountService _accountService;
    private readonly ILogger<PositionManagerWorker> _logger;
    private readonly ICacheService _cacheService;

    /// <summary>
    /// Wires the worker's dependencies and registers it with the base worker
    /// using a one-minute <see cref="TimeSpan"/> (presumably the run interval; confirm in BaseWorker).
    /// </summary>
    public PositionManagerWorker(
        ILogger<PositionManagerWorker> logger,
        IWorkerService workerService,
        ITradingService tradingService,
        IExchangeService exchangeService,
        IAccountService accountService, ICacheService cacheService) : base(
        _workerType,
        logger,
        TimeSpan.FromMinutes(1),
        workerService)
    {
        _logger = logger;
        _tradingService = tradingService;
        _exchangeService = exchangeService;
        _accountService = accountService;
        _cacheService = cacheService;
    }

    /// <summary>
    /// Runs one pass over new, partially filled and filled positions, in that order.
    /// Remaining stages are skipped once cancellation has been requested.
    /// </summary>
    /// <param name="cancellationToken">Token checked between stages.</param>
    protected override async Task Run(CancellationToken cancellationToken)
    {
        await ManageNewPositions();

        if (cancellationToken.IsCancellationRequested)
        {
            return;
        }

        await ManagePartiallyFilledPositions();

        if (cancellationToken.IsCancellationRequested)
        {
            return;
        }

        await ManageFilledPositions();
    }

    /// <summary>
    /// For every partially filled position, makes sure the stop-loss and take-profit orders exist,
    /// then marks the position Filled when all of them are active, or leaves it PartiallyFilled otherwise.
    /// </summary>
    private async Task ManagePartiallyFilledPositions()
    {
        var positions = GetPositions(PositionStatus.PartiallyFilled);
        _logger.LogInformation("Processing {PartiallyFilledCount} partially filled positions", positions.Count);

        foreach (var position in positions)
        {
            using (_logger.BeginScope("Position {PositionId} ({Ticker})", position.Identifier, position.Ticker))
            {
                try
                {
                    // Lock position for processing
                    position.Status = PositionStatus.Updating;
                    _tradingService.UpdatePosition(position);

                    _logger.LogDebug("Processing risk orders for {Direction} position opened at {OpenDate}",
                        position.OriginDirection, position.Date.ToString("o"));

                    // NOTE(review): the other stages go through the cached GetAccount helper;
                    // this one queries the account service directly - confirm whether that is intentional.
                    var account = await _accountService.GetAccount(position.AccountName, false, false);
                    var success = true;

                    // Process and update trades. A null result leaves the stored trade untouched,
                    // so the final decision then rests on AllTradesActive alone.
                    var updatedSl = await ProcessTrade(account, position.StopLoss, "SL", async () =>
                        await _exchangeService.OpenStopLoss(account, position.Ticker, position.OriginDirection,
                            position.StopLoss.Price, position.StopLoss.Quantity, false, DateTime.UtcNow));
                    if (updatedSl != null)
                    {
                        position.StopLoss = updatedSl;
                        success &= updatedSl.Status.IsActive();
                    }

                    var updatedTp1 = await ProcessTrade(account, position.TakeProfit1, "TP1", async () =>
                        await _exchangeService.OpenTakeProfit(account, position.Ticker, position.OriginDirection,
                            position.TakeProfit1.Price, position.TakeProfit1.Quantity, false, DateTime.UtcNow));
                    if (updatedTp1 != null)
                    {
                        position.TakeProfit1 = updatedTp1;
                        success &= updatedTp1.Status.IsActive();
                    }

                    if (position.TakeProfit2 != null)
                    {
                        var updatedTp2 = await ProcessTrade(account, position.TakeProfit2, "TP2", async () =>
                            await _exchangeService.OpenTakeProfit(account, position.Ticker, position.OriginDirection,
                                position.TakeProfit2.Price, position.TakeProfit2.Quantity, false, DateTime.UtcNow));
                        if (updatedTp2 != null)
                        {
                            position.TakeProfit2 = updatedTp2;
                            // NOTE(review): a cancelled TP2 is tolerated here but AllTradesActive still
                            // requires TP2 to be active - confirm which rule is intended.
                            success &= updatedTp2.Status.IsActive() || updatedTp2.Status == TradeStatus.Cancelled;
                        }
                    }

                    // Update position status based on trade states
                    position.Status = success && AllTradesActive(position)
                        ? PositionStatus.Filled
                        : PositionStatus.PartiallyFilled;
                    _logger.LogInformation("Final position status: {Status}", position.Status);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Position processing failed");
                    position.Status = PositionStatus.PartiallyFilled;
                }
                finally
                {
                    // Persists the outcome and, in doing so, replaces the Updating marker.
                    _tradingService.UpdatePosition(position);
                }
            }
        }
    }

    /// <summary>
    /// Resolves the exchange-side state of a risk order, creating it when it is still pending.
    /// </summary>
    /// <param name="account">Account used for the exchange calls.</param>
    /// <param name="trade">Locally stored trade to reconcile.</param>
    /// <param name="tradeType">Short label used in log messages (e.g. "SL", "TP1").</param>
    /// <param name="createTrade">Factory that places the order on the exchange.</param>
    /// <returns>
    /// The active trade found on the exchange, or the newly created trade when its status is Requested;
    /// null when nothing was created, creation failed, or an exception was thrown (it is logged, not rethrown).
    /// </returns>
    private async Task<Trade?> ProcessTrade(Account account, Trade trade, string tradeType, Func<Task<Trade>> createTrade)
    {
        try
        {
            // 1. Check existing status on exchange
            var exchangeTrade = await _exchangeService.GetTrade(account, trade.ExchangeOrderId, trade.Ticker);
            if (exchangeTrade != null && exchangeTrade.Status.IsActive())
            {
                _logger.LogInformation("{TradeType} already exists on exchange - Status: {Status}",
                    tradeType, exchangeTrade.Status);
                return exchangeTrade;
            }

            // 2. Only create new order if in pending state
            if (trade.Status != TradeStatus.PendingOpen)
            {
                _logger.LogWarning("{TradeType} creation skipped - Invalid status: {Status}",
                    tradeType, trade.Status);
                return null;
            }

            // 3. Create new order
            var newTrade = await createTrade();
            if (newTrade?.Status == TradeStatus.Requested)
            {
                _logger.LogInformation("{TradeType} successfully created - ExchangeID: {ExchangeOrderId}",
                    tradeType, newTrade.ExchangeOrderId);
                return newTrade;
            }

            _logger.LogError("{TradeType} creation failed - Null response or invalid status", tradeType);
            return null;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "{TradeType} processing failed", tradeType);
            return null;
        }
    }

    /// <summary>
    /// Returns true when the stop-loss and first take-profit are active and the second take-profit
    /// is either absent or active.
    /// </summary>
    private bool AllTradesActive(Position position)
    {
        return position.StopLoss.Status.IsActive() &&
               position.TakeProfit1.Status.IsActive() &&
               (position.TakeProfit2?.Status.IsActive() ?? true);
    }

    /// <summary>
    /// Hands every filled position to <c>ITradingService.ManagePosition</c> and persists the result.
    /// The Updating marker is always replaced before the position is saved, so a position is never
    /// left locked and the managed result is never overwritten by the stale input object.
    /// </summary>
    private async Task ManageFilledPositions()
    {
        var positions = GetPositions(PositionStatus.Filled);
        _logger.LogInformation("Monitoring {FilledPositionCount} filled positions", positions.Count);

        foreach (var position in positions)
        {
            using (_logger.BeginScope("Position {PositionId} ({Ticker})", position.Identifier, position.Ticker))
            {
                try
                {
                    // Acquire processing lock
                    _logger.LogDebug("Acquiring position lock");
                    position.Status = PositionStatus.Updating;
                    _tradingService.UpdatePosition(position);

                    _logger.LogInformation("Managing filled position - Direction: {Direction}, Open Since: {OpenDate}",
                        position.OriginDirection, position.Date.ToString("yyyy-MM-dd HH:mm:ss"));

                    var account = await GetAccount(position.AccountName);

                    // Perform position management
                    var updatedPosition = await _tradingService.ManagePosition(account, position);

                    // If the lock marker survived management, the position is simply still Filled.
                    // Releasing it on the returned object (rather than on the input afterwards) keeps
                    // the persisted result intact even when ManagePosition returns a different instance.
                    if (updatedPosition.Status == PositionStatus.Updating)
                    {
                        updatedPosition.Status = PositionStatus.Filled;
                    }

                    // Log status changes relative to the pre-lock status, not the transient marker
                    if (updatedPosition.Status != PositionStatus.Filled)
                    {
                        _logger.LogInformation("Position status updated: {OldStatus} → {NewStatus}",
                            PositionStatus.Filled, updatedPosition.Status);
                    }

                    _tradingService.UpdatePosition(updatedPosition);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Failed to manage position - {ErrorMessage}", ex.Message);

                    // Reset status for retry (this also releases the lock)
                    position.Status = PositionStatus.Filled;
                    _tradingService.UpdatePosition(position);
                }
            }
        }
    }

    /// <summary>
    /// Loads the positions in the given status, excluding paper-trading positions.
    /// The result is materialized once so callers can count, iterate and mutate the items
    /// without re-running the underlying query.
    /// </summary>
    private List<Position> GetPositions(PositionStatus positionStatus)
    {
        return _tradingService.GetPositionsByStatus(positionStatus)
            .Where(p => p.Initiator != PositionInitiator.PaperTrading)
            .ToList();
    }

    /// <summary>
    /// Checks the opening order of every new position on the exchange. Positions whose order is still
    /// pending are put back to New (or Canceled when older than one day); all others move to PartiallyFilled.
    /// </summary>
    private async Task ManageNewPositions()
    {
        var positions = GetPositions(PositionStatus.New);
        _logger.LogInformation("Processing {NewPositionCount} new positions", positions.Count);

        foreach (var position in positions)
        {
            using (_logger.BeginScope("Position {Identifier}", position.Identifier))
            {
                try
                {
                    // Immediate status update for concurrency protection
                    _logger.LogDebug("[{Identifier}] Acquiring position lock via status update", position.Identifier);
                    position.Status = PositionStatus.Updating;
                    _tradingService.UpdatePosition(position);

                    var account = await GetAccount(position.AccountName);
                    var trade = await _exchangeService.GetTrade(account.Key, position.Open.ExchangeOrderId, position.Ticker);
                    var openTrade = position.Open;

                    if (trade == null)
                    {
                        // The exchange returned nothing for the opening order: release the lock and retry
                        // on the next run instead of failing with a NullReferenceException below.
                        position.Status = PositionStatus.New;
                        _tradingService.UpdatePosition(position);
                        _logger.LogWarning("[{Identifier}] Open order {ExchangeOrderId} not found on exchange - will retry",
                            position.Identifier, openTrade.ExchangeOrderId);
                        continue;
                    }

                    if (trade.Status == TradeStatus.PendingOpen || trade.Status == TradeStatus.Requested)
                    {
                        // Position staleness check
                        if (position.Date < DateTime.UtcNow.AddDays(-1))
                        {
                            position.Status = PositionStatus.Canceled;
                            _tradingService.UpdatePosition(position);
                            _logger.LogWarning("[{Identifier}] Position canceled - stale since {PositionAge} days",
                                position.Identifier,
                                (DateTime.UtcNow - position.Date).TotalDays);
                        }
                        else
                        {
                            // Reset status for retry
                            position.Status = PositionStatus.New;
                            _tradingService.UpdatePosition(position);
                            _logger.LogInformation("[{Identifier}] Awaiting order fill - {Ticker} (0/{ExpectedQuantity})",
                                position.Identifier,
                                position.Ticker, openTrade.Quantity);
                        }
                    }
                    else
                    {
                        // NOTE(review): every status other than PendingOpen/Requested lands here,
                        // including any failure statuses the enum may define - confirm that is intended.
                        position.Status = PositionStatus.PartiallyFilled;
                        // NOTE(review): openTrade is position.Open, so this assignment changes nothing;
                        // possibly the trade fetched from the exchange was meant - confirm.
                        position.Open = openTrade;

                        // Position is now open, now waiting to open SLTP
                        _tradingService.UpdatePosition(position);
                        _logger.LogInformation("[{Identifier}] Position now open ",
                            position.Identifier);
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error processing position {Identifier}", position.Identifier);

                    // Reset to New so the position is picked up again on the next run
                    position.Status = PositionStatus.New;
                    _tradingService.UpdatePosition(position);
                }
            }
        }
    }

    /// <summary>
    /// Returns the account with the given name, reading it from the cache when present and
    /// otherwise loading it from the account service and caching its JSON serialization.
    /// A missing (null) account is returned as-is and is not cached.
    /// </summary>
    private async Task<Account> GetAccount(string accountName)
    {
        var account = _cacheService.GetValue<Account>(accountName);
        if (account == null)
        {
            account = await _accountService.GetAccount(accountName, false, false);
            if (account != null)
            {
                _cacheService.SaveValue(accountName, JsonConvert.SerializeObject(account));
            }
        }

        return account;
    }
}