Price reminder and init approval
* Start price reminder grain * Add config and init grain at startup * Save init wallet when already init
This commit is contained in:
215
src/Managing.Application/Grains/CandleStoreGrain.cs
Normal file
215
src/Managing.Application/Grains/CandleStoreGrain.cs
Normal file
@@ -0,0 +1,215 @@
|
||||
using Managing.Application.Abstractions.Grains;
|
||||
using Managing.Application.Abstractions.Repositories;
|
||||
using Managing.Domain.Candles;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Orleans.Streams;
|
||||
using static Managing.Common.Enums;
|
||||
|
||||
namespace Managing.Application.Grains;
|
||||
|
||||
/// <summary>
|
||||
/// Grain for managing in-memory historical candle data with Orleans state persistence.
|
||||
/// Subscribes to price streams and maintains a rolling window of 500 candles.
|
||||
/// </summary>
|
||||
public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
|
||||
{
|
||||
private readonly IPersistentState<CandleStoreGrainState> _state;
|
||||
private readonly ILogger<CandleStoreGrain> _logger;
|
||||
private readonly ICandleRepository _candleRepository;
|
||||
|
||||
private const int MaxCandleCount = 500;
|
||||
private IAsyncStream<Candle> _priceStream;
|
||||
private StreamSubscriptionHandle<Candle> _streamSubscription;
|
||||
|
||||
public CandleStoreGrain(
|
||||
[PersistentState("candle-store-state", "candle-store")]
|
||||
IPersistentState<CandleStoreGrainState> state,
|
||||
ILogger<CandleStoreGrain> logger,
|
||||
ICandleRepository candleRepository)
|
||||
{
|
||||
_state = state;
|
||||
_logger = logger;
|
||||
_candleRepository = candleRepository;
|
||||
}
|
||||
|
||||
public override async Task OnActivateAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var grainKey = this.GetPrimaryKeyString();
|
||||
_logger.LogInformation("CandleStoreGrain activated for key: {GrainKey}", grainKey);
|
||||
|
||||
// Parse the grain key to extract exchange, ticker, and timeframe
|
||||
var parts = grainKey.Split('-');
|
||||
if (parts.Length != 3)
|
||||
{
|
||||
_logger.LogError("Invalid grain key format: {GrainKey}. Expected format: Exchange-Ticker-Timeframe", grainKey);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!Enum.TryParse<TradingExchanges>(parts[0], out var exchange) ||
|
||||
!Enum.TryParse<Ticker>(parts[1], out var ticker) ||
|
||||
!Enum.TryParse<Timeframe>(parts[2], out var timeframe))
|
||||
{
|
||||
_logger.LogError("Failed to parse grain key components: {GrainKey}", grainKey);
|
||||
return;
|
||||
}
|
||||
|
||||
// Initialize state if empty
|
||||
if (_state.State.Candles == null || _state.State.Candles.Count == 0)
|
||||
{
|
||||
await LoadInitialCandlesAsync(exchange, ticker, timeframe);
|
||||
}
|
||||
|
||||
// Subscribe to the price stream
|
||||
await SubscribeToPriceStreamAsync(grainKey);
|
||||
|
||||
await base.OnActivateAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
|
||||
{
|
||||
// Unsubscribe from the stream
|
||||
if (_streamSubscription != null)
|
||||
{
|
||||
await _streamSubscription.UnsubscribeAsync();
|
||||
_streamSubscription = null;
|
||||
}
|
||||
|
||||
await base.OnDeactivateAsync(reason, cancellationToken);
|
||||
}
|
||||
|
||||
public Task<List<Candle>> GetCandlesAsync()
|
||||
{
|
||||
try
|
||||
{
|
||||
return Task.FromResult(_state.State.Candles?.ToList() ?? new List<Candle>());
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error retrieving candles for grain {GrainKey}", this.GetPrimaryKeyString());
|
||||
return Task.FromResult(new List<Candle>());
|
||||
}
|
||||
}
|
||||
|
||||
// Stream observer implementation
|
||||
public async Task OnNextAsync(Candle candle, StreamSequenceToken token = null)
|
||||
{
|
||||
try
|
||||
{
|
||||
_logger.LogDebug("Received new candle for {GrainKey} at {Date}",
|
||||
this.GetPrimaryKeyString(), candle.Date);
|
||||
|
||||
// Initialize state if needed
|
||||
if (_state.State.Candles == null)
|
||||
{
|
||||
_state.State.Candles = new List<Candle>();
|
||||
}
|
||||
|
||||
// Add the new candle
|
||||
_state.State.Candles.Add(candle);
|
||||
|
||||
// Maintain rolling window of 500 candles
|
||||
if (_state.State.Candles.Count > MaxCandleCount)
|
||||
{
|
||||
// Sort by date and keep the most recent 500
|
||||
_state.State.Candles = _state.State.Candles
|
||||
.OrderBy(c => c.Date)
|
||||
.TakeLast(MaxCandleCount)
|
||||
.ToList();
|
||||
}
|
||||
|
||||
// Persist the updated state
|
||||
await _state.WriteStateAsync();
|
||||
|
||||
_logger.LogTrace("Updated candle store for {GrainKey}, total candles: {Count}",
|
||||
this.GetPrimaryKeyString(), _state.State.Candles.Count);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error processing new candle for grain {GrainKey}", this.GetPrimaryKeyString());
|
||||
}
|
||||
}
|
||||
|
||||
public Task OnCompletedAsync()
|
||||
{
|
||||
_logger.LogInformation("Stream completed for grain {GrainKey}", this.GetPrimaryKeyString());
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task OnErrorAsync(Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Stream error for grain {GrainKey}", this.GetPrimaryKeyString());
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task LoadInitialCandlesAsync(TradingExchanges exchange, Ticker ticker, Timeframe timeframe)
|
||||
{
|
||||
try
|
||||
{
|
||||
_logger.LogInformation("Loading initial candles for {Exchange}-{Ticker}-{Timeframe}",
|
||||
exchange, ticker, timeframe);
|
||||
|
||||
// Load the last 500 candles from the database
|
||||
var endDate = DateTime.UtcNow;
|
||||
var startDate = endDate.AddDays(-30); // Look back 30 days to ensure we get enough data
|
||||
|
||||
var candles = await _candleRepository.GetCandles(exchange, ticker, timeframe, startDate, endDate, MaxCandleCount);
|
||||
|
||||
if (candles?.Any() == true)
|
||||
{
|
||||
_state.State.Candles = candles
|
||||
.OrderBy(c => c.Date)
|
||||
.TakeLast(MaxCandleCount)
|
||||
.ToList();
|
||||
|
||||
await _state.WriteStateAsync();
|
||||
|
||||
_logger.LogInformation("Loaded {Count} initial candles for {Exchange}-{Ticker}-{Timeframe}",
|
||||
_state.State.Candles.Count, exchange, ticker, timeframe);
|
||||
}
|
||||
else
|
||||
{
|
||||
_state.State.Candles = new List<Candle>();
|
||||
await _state.WriteStateAsync();
|
||||
|
||||
_logger.LogWarning("No initial candles found for {Exchange}-{Ticker}-{Timeframe}",
|
||||
exchange, ticker, timeframe);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error loading initial candles for {Exchange}-{Ticker}-{Timeframe}",
|
||||
exchange, ticker, timeframe);
|
||||
|
||||
// Initialize empty state on error
|
||||
_state.State.Candles = new List<Candle>();
|
||||
await _state.WriteStateAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task SubscribeToPriceStreamAsync(string streamKey)
|
||||
{
|
||||
try
|
||||
{
|
||||
var streamProvider = this.GetStreamProvider("DefaultStreamProvider");
|
||||
_priceStream = streamProvider.GetStream<Candle>(streamKey);
|
||||
|
||||
_streamSubscription = await _priceStream.SubscribeAsync(this);
|
||||
|
||||
_logger.LogInformation("Subscribed to price stream for {StreamKey}", streamKey);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error subscribing to price stream for {StreamKey}", streamKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// State object for CandleStoreGrain containing the rolling window of candles
|
||||
/// </summary>
|
||||
[GenerateSerializer]
|
||||
public class CandleStoreGrainState
|
||||
{
|
||||
[Id(0)]
|
||||
public List<Candle> Candles { get; set; } = new();
|
||||
}
|
||||
Reference in New Issue
Block a user