From caa0d9e1a60d7aa9efa62bdb4c110fd4e697f5b2 Mon Sep 17 00:00:00 2001 From: cryptooda Date: Sun, 14 Sep 2025 16:21:48 +0700 Subject: [PATCH] plug candle store and bot --- .../Grains/ICandleStoreGrain.cs | 1 + .../Bots/TradingBotBase.cs | 12 +- .../Grains/CandleStoreGrain.cs | 52 +++-- .../Grains/PriceFetcher15MinGrain.cs | 187 ++++++++++++++++++ .../{CandleExtensions.cs => CandleHelpers.cs} | 25 ++- 5 files changed, 232 insertions(+), 45 deletions(-) create mode 100644 src/Managing.Application/Grains/PriceFetcher15MinGrain.cs rename src/Managing.Domain/Candles/{CandleExtensions.cs => CandleHelpers.cs} (83%) diff --git a/src/Managing.Application.Abstractions/Grains/ICandleStoreGrain.cs b/src/Managing.Application.Abstractions/Grains/ICandleStoreGrain.cs index b252eb20..b417a6a0 100644 --- a/src/Managing.Application.Abstractions/Grains/ICandleStoreGrain.cs +++ b/src/Managing.Application.Abstractions/Grains/ICandleStoreGrain.cs @@ -15,5 +15,6 @@ public interface ICandleStoreGrain : IGrainWithStringKey /// /// List of candles ordered by date Task> GetCandlesAsync(); + Task GetLastCandle(); } diff --git a/src/Managing.Application/Bots/TradingBotBase.cs b/src/Managing.Application/Bots/TradingBotBase.cs index ce37fba2..ccdc8f30 100644 --- a/src/Managing.Application/Bots/TradingBotBase.cs +++ b/src/Managing.Application/Bots/TradingBotBase.cs @@ -51,7 +51,7 @@ public class TradingBotBase : ITradingBot Signals = new Dictionary(); Positions = new Dictionary(); WalletBalances = new Dictionary(); - PreloadSince = CandleExtensions.GetBotPreloadSinceFromTimeframe(config.Timeframe); + PreloadSince = CandleHelpers.GetBotPreloadSinceFromTimeframe(config.Timeframe); } public async Task Start(BotStatus previousStatus) @@ -119,11 +119,11 @@ public class TradingBotBase : ITradingBot public async Task LoadLastCandle() { - await ServiceScopeHelpers.WithScopedService(_scopeFactory, async exchangeService => + await ServiceScopeHelpers.WithScopedService(_scopeFactory, async grainFactory => { - var candles = await exchangeService.GetCandlesInflux(Account.Exchange, Config.Ticker, PreloadSince, - Config.Timeframe, DateTime.UtcNow, 1); - LastCandle = candles.Single(); + var grainKey = CandleHelpers.GetCandleStoreGrainKey(Account.Exchange, Config.Ticker, Config.Timeframe); + var grain = grainFactory.GetGrain(grainKey); + LastCandle = await grain.GetLastCandle(); }); } @@ -1829,7 +1829,7 @@ public class TradingBotBase : ITradingBot } // Calculate cooldown end time based on last position closing time - var baseIntervalSeconds = CandleExtensions.GetBaseIntervalInSeconds(Config.Timeframe); + var baseIntervalSeconds = CandleHelpers.GetBaseIntervalInSeconds(Config.Timeframe); var cooldownEndTime = LastPositionClosingTime.Value.AddSeconds(baseIntervalSeconds * Config.CooldownPeriod); var isInCooldown = LastCandle.Date < cooldownEndTime; diff --git a/src/Managing.Application/Grains/CandleStoreGrain.cs b/src/Managing.Application/Grains/CandleStoreGrain.cs index 96988477..17c6f4cd 100644 --- a/src/Managing.Application/Grains/CandleStoreGrain.cs +++ b/src/Managing.Application/Grains/CandleStoreGrain.cs @@ -16,7 +16,7 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver private readonly IPersistentState _state; private readonly ILogger _logger; private readonly ICandleRepository _candleRepository; - + private const int MaxCandleCount = 500; private IAsyncStream _priceStream; private StreamSubscriptionHandle _streamSubscription; @@ -38,20 +38,7 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver _logger.LogInformation("CandleStoreGrain activated for key: {GrainKey}", grainKey); // Parse the grain key to extract exchange, ticker, and timeframe - var parts = grainKey.Split('-'); - if (parts.Length != 3) - { - _logger.LogError("Invalid grain key format: {GrainKey}. Expected format: Exchange-Ticker-Timeframe", grainKey); - return; - } - - if (!Enum.TryParse(parts[0], out var exchange) || - !Enum.TryParse(parts[1], out var ticker) || - !Enum.TryParse(parts[2], out var timeframe)) - { - _logger.LogError("Failed to parse grain key components: {GrainKey}", grainKey); - return; - } + var (exchange, ticker, timeframe) = CandleHelpers.ParseCandleStoreGrainKey(grainKey); // Initialize state if empty if (_state.State.Candles == null || _state.State.Candles.Count == 0) @@ -95,7 +82,7 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver { try { - _logger.LogDebug("Received new candle for {GrainKey} at {Date}", + _logger.LogDebug("Received new candle for {GrainKey} at {Date}", this.GetPrimaryKeyString(), candle.Date); // Initialize state if needed @@ -120,7 +107,7 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver // Persist the updated state await _state.WriteStateAsync(); - _logger.LogTrace("Updated candle store for {GrainKey}, total candles: {Count}", + _logger.LogTrace("Updated candle store for {GrainKey}, total candles: {Count}", this.GetPrimaryKeyString(), _state.State.Candles.Count); } catch (Exception ex) @@ -145,14 +132,17 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver { try { - _logger.LogInformation("Loading initial candles for {Exchange}-{Ticker}-{Timeframe}", + _logger.LogInformation("Loading initial candles for {Exchange}-{Ticker}-{Timeframe}", exchange, ticker, timeframe); // Load the last 500 candles from the database var endDate = DateTime.UtcNow; - var startDate = endDate.AddDays(-30); // Look back 30 days to ensure we get enough data + var startDate = + CandleHelpers + .GetBotPreloadSinceFromTimeframe(timeframe); // Look back 30 days to ensure we get enough data - var candles = await _candleRepository.GetCandles(exchange, ticker, timeframe, startDate, endDate, MaxCandleCount); + var candles = + await _candleRepository.GetCandles(exchange, ticker, timeframe, startDate, endDate, MaxCandleCount); if (candles?.Any() == true) { @@ -163,23 +153,23 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver await _state.WriteStateAsync(); - _logger.LogInformation("Loaded {Count} initial candles for {Exchange}-{Ticker}-{Timeframe}", + _logger.LogInformation("Loaded {Count} initial candles for {Exchange}-{Ticker}-{Timeframe}", _state.State.Candles.Count, exchange, ticker, timeframe); } else { _state.State.Candles = new List(); await _state.WriteStateAsync(); - - _logger.LogWarning("No initial candles found for {Exchange}-{Ticker}-{Timeframe}", + + _logger.LogWarning("No initial candles found for {Exchange}-{Ticker}-{Timeframe}", exchange, ticker, timeframe); } } catch (Exception ex) { - _logger.LogError(ex, "Error loading initial candles for {Exchange}-{Ticker}-{Timeframe}", + _logger.LogError(ex, "Error loading initial candles for {Exchange}-{Ticker}-{Timeframe}", exchange, ticker, timeframe); - + // Initialize empty state on error _state.State.Candles = new List(); await _state.WriteStateAsync(); @@ -192,9 +182,9 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver { var streamProvider = this.GetStreamProvider("DefaultStreamProvider"); _priceStream = streamProvider.GetStream(streamKey); - + _streamSubscription = await _priceStream.SubscribeAsync(this); - + _logger.LogInformation("Subscribed to price stream for {StreamKey}", streamKey); } catch (Exception ex) @@ -202,6 +192,11 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver _logger.LogError(ex, "Error subscribing to price stream for {StreamKey}", streamKey); } } + + public Task GetLastCandle() + { + return Task.FromResult(_state.State.Candles?.LastOrDefault()); + } } /// @@ -210,6 +205,5 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver [GenerateSerializer] public class CandleStoreGrainState { - [Id(0)] - public List Candles { get; set; } = new(); + [Id(0)] public List Candles { get; set; } = new(); } \ No newline at end of file diff --git a/src/Managing.Application/Grains/PriceFetcher15MinGrain.cs b/src/Managing.Application/Grains/PriceFetcher15MinGrain.cs new file mode 100644 index 00000000..52c321d0 --- /dev/null +++ b/src/Managing.Application/Grains/PriceFetcher15MinGrain.cs @@ -0,0 +1,187 @@ +using Managing.Application.Abstractions.Grains; +using Managing.Application.Abstractions.Repositories; +using Managing.Application.Abstractions.Services; +using Managing.Common; +using Managing.Domain.Accounts; +using Managing.Domain.Candles; +using Microsoft.Extensions.Logging; +using Orleans.Concurrency; +using Orleans.Streams; +using static Managing.Common.Enums; + +namespace Managing.Application.Grains; + +/// +/// StatelessWorker grain for fetching 5-minute price data from external APIs and publishing to Orleans streams. +/// This grain runs every 5 minutes and processes all exchange/ticker combinations for the 5-minute timeframe. +/// +[StatelessWorker] +public class PriceFetcher15MinGrain : Grain, IPriceFetcher15MinGrain, IRemindable +{ + private readonly ILogger _logger; + private readonly IExchangeService _exchangeService; + private readonly ICandleRepository _candleRepository; + private readonly IGrainFactory _grainFactory; + + private const string FetchPricesReminderName = "Fetch15minPricesReminder"; + + // Predefined lists of trading parameters to fetch + private static readonly TradingExchanges[] SupportedExchanges = + { + TradingExchanges.Evm + }; + + private static readonly Ticker[] SupportedTickers = Constants.GMX.Config.SupportedTickers; + + private static readonly Timeframe TargetTimeframe = Timeframe.FifteenMinutes; + + public PriceFetcher15MinGrain( + ILogger logger, + IExchangeService exchangeService, + ICandleRepository candleRepository, + IGrainFactory grainFactory) + { + _logger = logger; + _exchangeService = exchangeService; + _candleRepository = candleRepository; + _grainFactory = grainFactory; + } + + public override async Task OnActivateAsync(CancellationToken cancellationToken) + { + _logger.LogInformation("{0} activated", nameof(PriceFetcher15MinGrain)); + + // Register a reminder to fetch prices every 5 minutes + await this.RegisterOrUpdateReminder( + FetchPricesReminderName, + TimeSpan.FromMinutes(5), + TimeSpan.FromMinutes(5)); + + await base.OnActivateAsync(cancellationToken); + } + + public async Task FetchAndPublishPricesAsync() + { + try + { + _logger.LogInformation("Starting {timeframe} price fetch cycle", TargetTimeframe); + + var fetchTasks = new List(); + + // Create fetch tasks for all exchange/ticker combinations for 15-minute timeframe + foreach (var exchange in SupportedExchanges) + { + foreach (var ticker in SupportedTickers) + { + fetchTasks.Add(FetchAndPublish(exchange, ticker, TargetTimeframe)); + } + } + + // Execute all fetch operations in parallel + await Task.WhenAll(fetchTasks); + + _logger.LogInformation("{0} - Completed {1} price fetch cycle for {2} combinations", + nameof(PriceFetcher15MinGrain), TargetTimeframe, fetchTasks.Count); + + return true; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during price fetch cycle"); + return false; + } + } + + private async Task FetchAndPublish(TradingExchanges exchange, Ticker ticker, Timeframe timeframe) + { + try + { + // Create a dummy account for API calls (this may need to be adjusted based on your implementation) + var account = new Account + { + Name = "PriceFetcher", + Exchange = exchange, + Type = AccountType.Watch + }; + + // Get the last candle date from database + var existingCandles = await _candleRepository.GetCandles(exchange, ticker, timeframe, + DateTime.UtcNow.AddDays(-30), 1); + + var isFirstCall = !existingCandles.Any(); + + var startDate = !isFirstCall + ? existingCandles.Max(c => c.Date).AddMinutes(GetTimeframeMinutes(timeframe)) + : new DateTime(2017, 1, 1); + + // Fetch new candles from external API + var newCandles = await _exchangeService.GetCandles(account, ticker, startDate, timeframe, isFirstCall); + + if (newCandles?.Any() == true) + { + var streamKey = CandleHelpers.GetCandleStoreGrainKey(exchange, ticker, timeframe); + + _logger.LogDebug("Fetched {CandleCount} new candles for {StreamKey}", + newCandles.Count, streamKey); + + // Process all new candles + var processedCandles = newCandles.OrderBy(c => c.Date).ToList(); + + foreach (var candle in processedCandles) + { + // Ensure candle has correct metadata + candle.Exchange = exchange; + candle.Ticker = ticker; + candle.Timeframe = timeframe; + } + + // Save all candles to database in a single batch + if (processedCandles.Any()) + { + await _candleRepository.InsertCandles(processedCandles).ConfigureAwait(false); + + _logger.LogDebug( + "[{Ticker}][{Exchange}][{Timeframe}] Inserted {CandleCount} candles for {StreamKey}", + ticker, exchange, timeframe, + processedCandles.Count, streamKey); + } + + var streamProvider = this.GetStreamProvider("DefaultStreamProvider"); + var stream = streamProvider.GetStream(streamKey); + + // Publish to stream (if needed) + foreach (var candle in processedCandles) + { + await stream.OnNextAsync(candle); + _logger.LogTrace("Published candle for {StreamKey} at {Date}", + streamKey, candle.Date); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error fetching prices for {Exchange}-{Ticker}-{Timeframe}", + exchange, ticker, timeframe); + } + } + + private static int GetTimeframeMinutes(Timeframe timeframe) => timeframe switch + { + Timeframe.OneMinute => 1, + Timeframe.FiveMinutes => 5, + Timeframe.FifteenMinutes => 15, + Timeframe.ThirtyMinutes => 30, + Timeframe.OneHour => 60, + Timeframe.FourHour => 240, + Timeframe.OneDay => 1440, + _ => 1 + }; + + public async Task ReceiveReminder(string reminderName, TickStatus status) + { + if (reminderName == FetchPricesReminderName) + { + await FetchAndPublishPricesAsync(); + } + } +} \ No newline at end of file diff --git a/src/Managing.Domain/Candles/CandleExtensions.cs b/src/Managing.Domain/Candles/CandleHelpers.cs similarity index 83% rename from src/Managing.Domain/Candles/CandleExtensions.cs rename to src/Managing.Domain/Candles/CandleHelpers.cs index 25f1af56..b27e0de4 100644 --- a/src/Managing.Domain/Candles/CandleExtensions.cs +++ b/src/Managing.Domain/Candles/CandleHelpers.cs @@ -1,16 +1,10 @@ -using static Managing.Common.Enums; +using Managing.Core; +using static Managing.Common.Enums; namespace Managing.Domain.Candles; -public static class CandleExtensions +public static class CandleHelpers { - - public static Candle SetupClosingCandle(this Candle candle) - { - - return candle; - } - public static DateTime GetBotPreloadSinceFromTimeframe(Timeframe timeframe) { return timeframe switch @@ -96,4 +90,15 @@ public static class CandleExtensions _ => throw new NotImplementedException() }; } -} + + public static string GetCandleStoreGrainKey(TradingExchanges exchange, Ticker ticker, Timeframe timeframe) + { + return string.Join("-", exchange, ticker, timeframe); + } + + public static (TradingExchanges exchange, Ticker ticker, Timeframe timeframe) ParseCandleStoreGrainKey(string grainKey) + { + var components = grainKey.Split('-'); + return (MiscExtensions.ParseEnum(components[0]), MiscExtensions.ParseEnum(components[1]), MiscExtensions.ParseEnum(components[2])); + } +} \ No newline at end of file