Files
managing-apps/src/Managing.Application/Grains/CandleStoreGrain.cs
Oda cee3902a4d Update SDK (#35)
* Update SDK for swap

* Fix web3proxy build

* Update types

* Fix swap

* Send token test and BASE transfer

* fix cache and hook

* Fix send

* Update health check with uiFeereceiver

* Fix sdk

* Fix get positions

* Fix timeoutloop

* Fix open position

* Fix closes positions

* Review
2025-09-17 14:28:56 +07:00

281 lines
9.9 KiB
C#

using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Repositories;
using Managing.Domain.Candles;
using Microsoft.Extensions.Logging;
using Orleans.Streams;
using static Managing.Common.Enums;
namespace Managing.Application.Grains;
/// <summary>
/// Grain for managing in-memory historical candle data with Orleans state persistence.
/// Subscribes to price streams and maintains a rolling window of 500 candles.
/// </summary>
public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
{
// Orleans-persisted state holding the candle list for this grain.
private readonly IPersistentState<CandleStoreGrainState> _state;
// Logger used for activation, stream and query diagnostics.
private readonly ILogger<CandleStoreGrain> _logger;
// Repository queried for the initial candles when the grain activates.
private readonly ICandleRepository _candleRepository;
// Upper bound on the number of candles kept in state (the rolling window size).
private const int MaxCandleCount = 500;
// Stream this grain subscribes to for incoming candles; assigned in SubscribeToPriceStreamAsync.
private IAsyncStream<Candle> _priceStream;
// Handle of the active stream subscription; reset to null during deactivation.
private StreamSubscriptionHandle<Candle> _streamSubscription;
/// <summary>
/// Creates the grain and captures its injected dependencies.
/// </summary>
/// <param name="state">Persistent state declared with the "candle-store-state" / "candle-store" attribute arguments.</param>
/// <param name="logger">Logger for this grain.</param>
/// <param name="candleRepository">Repository used to load the initial candles.</param>
public CandleStoreGrain(
    [PersistentState("candle-store-state", "candle-store")]
    IPersistentState<CandleStoreGrainState> state,
    ILogger<CandleStoreGrain> logger,
    ICandleRepository candleRepository)
{
    (_state, _logger, _candleRepository) = (state, logger, candleRepository);
}
/// <summary>
/// Activation hook: parses the grain key into exchange, ticker and timeframe, loads the
/// initial candles for them, subscribes to the price stream and then runs the base activation.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the base activation.</param>
/// <remarks>
/// Any failure is logged and re-thrown so Orleans sees the activation failure. The best-effort
/// state initialization performed on the failure path must never replace the original exception.
/// </remarks>
public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
    // Resolve the key once; it is used by every log statement below.
    var grainKey = this.GetPrimaryKeyString();

    try
    {
        _logger.LogInformation("CandleStoreGrain activated for key: {GrainKey}", grainKey);

        // Parse the grain key to extract exchange, ticker, and timeframe
        var (exchange, ticker, timeframe) = CandleHelpers.ParseCandleStoreGrainKey(grainKey);
        await LoadInitialCandlesAsync(exchange, ticker, timeframe);

        // Subscribe to the price stream
        await SubscribeToPriceStreamAsync(grainKey);

        await base.OnActivateAsync(cancellationToken);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error during CandleStoreGrain activation for key: {GrainKey}", grainKey);

        // Ensure state is initialized even if there's an error
        if (_state.State.Candles == null)
        {
            _state.State.Candles = new List<Candle>();
            try
            {
                await _state.WriteStateAsync();
            }
            catch (Exception writeEx)
            {
                // A storage failure here would otherwise mask the exception that actually
                // broke the activation; log it and fall through to the rethrow.
                _logger.LogWarning(writeEx,
                    "Failed to persist empty candle state after activation error for key: {GrainKey}",
                    grainKey);
            }
        }

        throw; // Re-throw to let Orleans handle the activation failure
    }
}
/// <summary>
/// Deactivation hook: releases the stream subscription, if one is held, and then runs the
/// base deactivation.
/// </summary>
/// <param name="reason">Reason for the deactivation, forwarded to the base class.</param>
/// <param name="cancellationToken">Token forwarded to the base class.</param>
public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
{
    await ReleaseStreamSubscriptionAsync();
    await base.OnDeactivateAsync(reason, cancellationToken);
}

/// <summary>
/// Unsubscribes from the stream when a subscription handle is held. Failures are logged as
/// warnings and never propagated; the handle field is cleared in every case.
/// </summary>
private async Task ReleaseStreamSubscriptionAsync()
{
    if (_streamSubscription == null)
    {
        return;
    }

    try
    {
        await _streamSubscription.UnsubscribeAsync();
        _logger.LogDebug("Successfully unsubscribed from stream for grain {GrainKey}", this.GetPrimaryKeyString());
    }
    catch (Exception ex)
    {
        // Deliberately swallowed: during shutdown the pub-sub rendezvous grain may
        // already be gone, so a failed unsubscribe is expected there.
        _logger.LogWarning(ex, "Failed to unsubscribe from stream during deactivation for grain {GrainKey}. This is normal during shutdown.",
            this.GetPrimaryKeyString());
    }
    finally
    {
        _streamSubscription = null;
    }
}
/// <summary>
/// Returns the candles currently held in state as a new <see cref="HashSet{T}"/>.
/// </summary>
/// <returns>
/// A set built from the stored candles; an empty set when the state holds no list or when
/// reading the state throws.
/// </returns>
public Task<HashSet<Candle>> GetCandlesAsync()
{
    HashSet<Candle> snapshot;

    try
    {
        var stored = _state.State.Candles;
        if (stored != null)
        {
            snapshot = stored.ToHashSet();
        }
        else
        {
            // No list in state yet: report it and hand back an empty set.
            _logger.LogWarning("State not initialized for grain {GrainKey}, returning empty list",
                this.GetPrimaryKeyString());
            snapshot = new HashSet<Candle>();
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error retrieving candles for grain {GrainKey}", this.GetPrimaryKeyString());
        snapshot = new HashSet<Candle>();
    }

    return Task.FromResult(snapshot);
}
// Stream observer implementation
/// <summary>
/// Stream callback: stores the incoming candle in the rolling window and persists the state.
/// </summary>
/// <param name="candle">Candle delivered by the stream. A null candle is logged and ignored.</param>
/// <param name="token">Stream sequence token; not used by this grain.</param>
/// <remarks>
/// A candle whose <c>Date</c> matches one already in the window replaces that entry instead of
/// being appended, so a redelivered candle, or one that overlaps the candles loaded at
/// activation, never appears twice. Exceptions are logged and not propagated to the stream.
/// </remarks>
public async Task OnNextAsync(Candle candle, StreamSequenceToken token = null)
{
    try
    {
        if (candle is null)
        {
            _logger.LogWarning("Received null candle for grain {GrainKey}, ignoring",
                this.GetPrimaryKeyString());
            return;
        }

        _logger.LogDebug("Received new candle for {GrainKey} at {Date}",
            this.GetPrimaryKeyString(), candle.Date);

        // Initialize state if needed
        _state.State.Candles ??= new List<Candle>();
        var candles = _state.State.Candles;

        // Upsert by date: replace the stored candle for this date, otherwise append.
        var existingIndex = candles.FindIndex(c => c.Date == candle.Date);
        if (existingIndex >= 0)
        {
            candles[existingIndex] = candle;
        }
        else
        {
            candles.Add(candle);
        }

        // Maintain rolling window of 500 candles
        if (candles.Count > MaxCandleCount)
        {
            // Sort by date and keep the most recent ones
            _state.State.Candles = candles
                .OrderBy(c => c.Date)
                .TakeLast(MaxCandleCount)
                .ToList();
        }

        // Persist the updated state
        await _state.WriteStateAsync();

        _logger.LogTrace("Updated candle store for {GrainKey}, total candles: {Count}",
            this.GetPrimaryKeyString(), _state.State.Candles.Count);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error processing new candle for grain {GrainKey}", this.GetPrimaryKeyString());
    }
}
/// <summary>
/// Stream callback invoked when the stream completes; only logs the event.
/// </summary>
/// <returns>An already completed task.</returns>
public Task OnCompletedAsync()
{
    var grainKey = this.GetPrimaryKeyString();
    _logger.LogInformation("Stream completed for grain {GrainKey}", grainKey);

    return Task.CompletedTask;
}
/// <summary>
/// Stream callback invoked when the stream reports an error; only logs the exception.
/// </summary>
/// <param name="ex">Exception reported by the stream.</param>
/// <returns>An already completed task.</returns>
public Task OnErrorAsync(Exception ex)
{
    var grainKey = this.GetPrimaryKeyString();
    _logger.LogError(ex, "Stream error for grain {GrainKey}", grainKey);

    return Task.CompletedTask;
}
/// <summary>
/// Loads up to <c>MaxCandleCount</c> candles for the given market from the repository, stores
/// them in state ordered by date and persists the state.
/// </summary>
/// <param name="exchange">Exchange the candles belong to.</param>
/// <param name="ticker">Ticker the candles belong to.</param>
/// <param name="timeframe">Timeframe of the candles; also determines the start of the query range.</param>
/// <remarks>
/// This method is best-effort and never throws. When the repository returns nothing, the state
/// is reset to an empty list and persisted. When loading or persisting fails, the error is
/// logged and the candles already in state are kept; no further storage write is attempted on
/// that path, so a storage outage cannot escape from here.
/// </remarks>
private async Task LoadInitialCandlesAsync(TradingExchanges exchange, Ticker ticker, Timeframe timeframe)
{
    try
    {
        _logger.LogInformation("Loading initial candles for {Exchange}-{Ticker}-{Timeframe}",
            exchange, ticker, timeframe);

        // Ensure state is initialized first
        _state.State.Candles ??= new List<Candle>();

        // Load the most recent candles from the database
        var endDate = DateTime.UtcNow;
        var startDate = CandleHelpers.GetBotPreloadSinceFromTimeframe(timeframe);
        var candles =
            await _candleRepository.GetCandles(exchange, ticker, timeframe, startDate, endDate, MaxCandleCount);

        // A null result is treated the same as an empty one.
        var window = candles?
            .OrderBy(c => c.Date)
            .TakeLast(MaxCandleCount)
            .ToList() ?? new List<Candle>();

        _state.State.Candles = window;
        await _state.WriteStateAsync();

        if (window.Count > 0)
        {
            _logger.LogInformation("Loaded {Count} initial candles for {Exchange}-{Ticker}-{Timeframe}",
                window.Count, exchange, ticker, timeframe);
        }
        else
        {
            _logger.LogWarning("No initial candles found for {Exchange}-{Ticker}-{Timeframe}",
                exchange, ticker, timeframe);
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error loading initial candles for {Exchange}-{Ticker}-{Timeframe}",
            exchange, ticker, timeframe);

        // Keep whatever is already in state instead of wiping it; only make sure the list
        // exists. Nothing is written here: the in-memory state is either unchanged or the
        // failure came from the storage write itself.
        _state.State.Candles ??= new List<Candle>();
    }
}
/// <summary>
/// Attaches this grain as observer of the "Candles" stream identified by
/// <paramref name="streamKey"/> on the "ManagingStreamProvider" stream provider.
/// </summary>
/// <param name="streamKey">Key of the stream to observe.</param>
/// <remarks>
/// If subscription handles for this grain already exist on the stream (for example because an
/// earlier activation could not unsubscribe), the first one is resumed and the others are
/// removed, so each candle is delivered once. A fresh subscription is created only when no
/// handle exists. Failures are logged and not propagated.
/// </remarks>
private async Task SubscribeToPriceStreamAsync(string streamKey)
{
    try
    {
        var streamProvider = this.GetStreamProvider("ManagingStreamProvider");
        _priceStream = streamProvider.GetStream<Candle>("Candles", streamKey);

        var existingHandles = await _priceStream.GetAllSubscriptionHandles();
        if (existingHandles != null && existingHandles.Count > 0)
        {
            // Re-attach to the subscription that is already registered.
            _streamSubscription = await existingHandles[0].ResumeAsync(this);

            // Any additional handle would deliver every candle a second time; drop it.
            for (var i = 1; i < existingHandles.Count; i++)
            {
                try
                {
                    await existingHandles[i].UnsubscribeAsync();
                }
                catch (Exception unsubscribeEx)
                {
                    _logger.LogWarning(unsubscribeEx,
                        "Failed to remove stale price stream subscription for {StreamKey}", streamKey);
                }
            }

            _logger.LogInformation("Resumed existing price stream subscription for {StreamKey}", streamKey);
        }
        else
        {
            _streamSubscription = await _priceStream.SubscribeAsync(this);
            _logger.LogInformation("Subscribed to price stream for {StreamKey}", streamKey);
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error subscribing to price stream for {StreamKey}", streamKey);
    }
}
/// <summary>
/// Returns the most recent candles held in state, ordered by ascending date.
/// </summary>
/// <param name="count">
/// Number of candles to return. Values of zero or less are logged as invalid and treated as 1.
/// </param>
/// <returns>
/// Up to <paramref name="count"/> candles; an empty list when the state holds no candles or
/// when an exception occurs.
/// </returns>
public Task<List<Candle>> GetLastCandle(int count = 1)
{
    List<Candle> result;

    try
    {
        var stored = _state.State.Candles;
        if (stored == null || stored.Count == 0)
        {
            // Nothing stored yet.
            _logger.LogDebug("No candles available for grain {GrainKey}", this.GetPrimaryKeyString());
            result = new List<Candle>();
        }
        else
        {
            // Fall back to a single candle when the requested amount makes no sense.
            if (count <= 0)
            {
                _logger.LogWarning("Invalid count parameter {Count} for grain {GrainKey}, using default value 1",
                    count, this.GetPrimaryKeyString());
                count = 1;
            }

            var byDate = from c in stored
                         orderby c.Date
                         select c;
            result = byDate.TakeLast(count).ToList();

            _logger.LogDebug("Retrieved {Count} latest candles for grain {GrainKey}",
                result.Count, this.GetPrimaryKeyString());
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error retrieving last {Count} candles for grain {GrainKey}",
            count, this.GetPrimaryKeyString());
        result = new List<Candle>();
    }

    return Task.FromResult(result);
}
}
/// <summary>
/// Persisted state of <see cref="CandleStoreGrain"/>: the list of candles that makes up its
/// rolling window.
/// </summary>
[GenerateSerializer]
public class CandleStoreGrainState
{
    /// <summary>Candles held by the grain; starts out as an empty list.</summary>
    [Id(0)]
    public List<Candle> Candles { get; set; } = new List<Candle>();
}