diff --git a/src/Managing.Application.Abstractions/Grains/ICandleStoreGrain.cs b/src/Managing.Application.Abstractions/Grains/ICandleStoreGrain.cs
index b252eb20..b417a6a0 100644
--- a/src/Managing.Application.Abstractions/Grains/ICandleStoreGrain.cs
+++ b/src/Managing.Application.Abstractions/Grains/ICandleStoreGrain.cs
@@ -15,5 +15,6 @@ public interface ICandleStoreGrain : IGrainWithStringKey
///
/// List of candles ordered by date
Task> GetCandlesAsync();
+ Task GetLastCandle();
}
diff --git a/src/Managing.Application/Bots/TradingBotBase.cs b/src/Managing.Application/Bots/TradingBotBase.cs
index ce37fba2..ccdc8f30 100644
--- a/src/Managing.Application/Bots/TradingBotBase.cs
+++ b/src/Managing.Application/Bots/TradingBotBase.cs
@@ -51,7 +51,7 @@ public class TradingBotBase : ITradingBot
Signals = new Dictionary();
Positions = new Dictionary();
WalletBalances = new Dictionary();
- PreloadSince = CandleExtensions.GetBotPreloadSinceFromTimeframe(config.Timeframe);
+ PreloadSince = CandleHelpers.GetBotPreloadSinceFromTimeframe(config.Timeframe);
}
public async Task Start(BotStatus previousStatus)
@@ -119,11 +119,11 @@ public class TradingBotBase : ITradingBot
public async Task LoadLastCandle()
{
- await ServiceScopeHelpers.WithScopedService(_scopeFactory, async exchangeService =>
+ await ServiceScopeHelpers.WithScopedService(_scopeFactory, async grainFactory =>
{
- var candles = await exchangeService.GetCandlesInflux(Account.Exchange, Config.Ticker, PreloadSince,
- Config.Timeframe, DateTime.UtcNow, 1);
- LastCandle = candles.Single();
+ var grainKey = CandleHelpers.GetCandleStoreGrainKey(Account.Exchange, Config.Ticker, Config.Timeframe);
+ var grain = grainFactory.GetGrain(grainKey);
+ LastCandle = await grain.GetLastCandle();
});
}
@@ -1829,7 +1829,7 @@ public class TradingBotBase : ITradingBot
}
// Calculate cooldown end time based on last position closing time
- var baseIntervalSeconds = CandleExtensions.GetBaseIntervalInSeconds(Config.Timeframe);
+ var baseIntervalSeconds = CandleHelpers.GetBaseIntervalInSeconds(Config.Timeframe);
var cooldownEndTime = LastPositionClosingTime.Value.AddSeconds(baseIntervalSeconds * Config.CooldownPeriod);
var isInCooldown = LastCandle.Date < cooldownEndTime;
diff --git a/src/Managing.Application/Grains/CandleStoreGrain.cs b/src/Managing.Application/Grains/CandleStoreGrain.cs
index 96988477..17c6f4cd 100644
--- a/src/Managing.Application/Grains/CandleStoreGrain.cs
+++ b/src/Managing.Application/Grains/CandleStoreGrain.cs
@@ -16,7 +16,7 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver
private readonly IPersistentState _state;
private readonly ILogger _logger;
private readonly ICandleRepository _candleRepository;
-
+
private const int MaxCandleCount = 500;
private IAsyncStream _priceStream;
private StreamSubscriptionHandle _streamSubscription;
@@ -38,20 +38,7 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver
_logger.LogInformation("CandleStoreGrain activated for key: {GrainKey}", grainKey);
// Parse the grain key to extract exchange, ticker, and timeframe
- var parts = grainKey.Split('-');
- if (parts.Length != 3)
- {
- _logger.LogError("Invalid grain key format: {GrainKey}. Expected format: Exchange-Ticker-Timeframe", grainKey);
- return;
- }
-
- if (!Enum.TryParse(parts[0], out var exchange) ||
- !Enum.TryParse(parts[1], out var ticker) ||
- !Enum.TryParse(parts[2], out var timeframe))
- {
- _logger.LogError("Failed to parse grain key components: {GrainKey}", grainKey);
- return;
- }
+ var (exchange, ticker, timeframe) = CandleHelpers.ParseCandleStoreGrainKey(grainKey);
// Initialize state if empty
if (_state.State.Candles == null || _state.State.Candles.Count == 0)
@@ -95,7 +82,7 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver
{
try
{
- _logger.LogDebug("Received new candle for {GrainKey} at {Date}",
+ _logger.LogDebug("Received new candle for {GrainKey} at {Date}",
this.GetPrimaryKeyString(), candle.Date);
// Initialize state if needed
@@ -120,7 +107,7 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver
// Persist the updated state
await _state.WriteStateAsync();
- _logger.LogTrace("Updated candle store for {GrainKey}, total candles: {Count}",
+ _logger.LogTrace("Updated candle store for {GrainKey}, total candles: {Count}",
this.GetPrimaryKeyString(), _state.State.Candles.Count);
}
catch (Exception ex)
@@ -145,14 +132,17 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver
{
try
{
- _logger.LogInformation("Loading initial candles for {Exchange}-{Ticker}-{Timeframe}",
+ _logger.LogInformation("Loading initial candles for {Exchange}-{Ticker}-{Timeframe}",
exchange, ticker, timeframe);
// Load the last 500 candles from the database
var endDate = DateTime.UtcNow;
- var startDate = endDate.AddDays(-30); // Look back 30 days to ensure we get enough data
+ var startDate =
+ CandleHelpers
+ .GetBotPreloadSinceFromTimeframe(timeframe); // Look back 30 days to ensure we get enough data
- var candles = await _candleRepository.GetCandles(exchange, ticker, timeframe, startDate, endDate, MaxCandleCount);
+ var candles =
+ await _candleRepository.GetCandles(exchange, ticker, timeframe, startDate, endDate, MaxCandleCount);
if (candles?.Any() == true)
{
@@ -163,23 +153,23 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver
await _state.WriteStateAsync();
- _logger.LogInformation("Loaded {Count} initial candles for {Exchange}-{Ticker}-{Timeframe}",
+ _logger.LogInformation("Loaded {Count} initial candles for {Exchange}-{Ticker}-{Timeframe}",
_state.State.Candles.Count, exchange, ticker, timeframe);
}
else
{
_state.State.Candles = new List();
await _state.WriteStateAsync();
-
- _logger.LogWarning("No initial candles found for {Exchange}-{Ticker}-{Timeframe}",
+
+ _logger.LogWarning("No initial candles found for {Exchange}-{Ticker}-{Timeframe}",
exchange, ticker, timeframe);
}
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error loading initial candles for {Exchange}-{Ticker}-{Timeframe}",
+ _logger.LogError(ex, "Error loading initial candles for {Exchange}-{Ticker}-{Timeframe}",
exchange, ticker, timeframe);
-
+
// Initialize empty state on error
_state.State.Candles = new List();
await _state.WriteStateAsync();
@@ -192,9 +182,9 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver
{
var streamProvider = this.GetStreamProvider("DefaultStreamProvider");
_priceStream = streamProvider.GetStream(streamKey);
-
+
_streamSubscription = await _priceStream.SubscribeAsync(this);
-
+
_logger.LogInformation("Subscribed to price stream for {StreamKey}", streamKey);
}
catch (Exception ex)
@@ -202,6 +192,11 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver
_logger.LogError(ex, "Error subscribing to price stream for {StreamKey}", streamKey);
}
}
+
+ public Task GetLastCandle()
+ {
+ return Task.FromResult(_state.State.Candles?.LastOrDefault());
+ }
}
///
@@ -210,6 +205,5 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver
[GenerateSerializer]
public class CandleStoreGrainState
{
- [Id(0)]
- public List Candles { get; set; } = new();
+ [Id(0)] public List Candles { get; set; } = new();
}
\ No newline at end of file
diff --git a/src/Managing.Application/Grains/PriceFetcher15MinGrain.cs b/src/Managing.Application/Grains/PriceFetcher15MinGrain.cs
new file mode 100644
index 00000000..52c321d0
--- /dev/null
+++ b/src/Managing.Application/Grains/PriceFetcher15MinGrain.cs
@@ -0,0 +1,187 @@
+using Managing.Application.Abstractions.Grains;
+using Managing.Application.Abstractions.Repositories;
+using Managing.Application.Abstractions.Services;
+using Managing.Common;
+using Managing.Domain.Accounts;
+using Managing.Domain.Candles;
+using Microsoft.Extensions.Logging;
+using Orleans.Concurrency;
+using Orleans.Streams;
+using static Managing.Common.Enums;
+
+namespace Managing.Application.Grains;
+
+///
+/// StatelessWorker grain for fetching 5-minute price data from external APIs and publishing to Orleans streams.
+/// This grain runs every 5 minutes and processes all exchange/ticker combinations for the 5-minute timeframe.
+///
+[StatelessWorker]
+public class PriceFetcher15MinGrain : Grain, IPriceFetcher15MinGrain, IRemindable
+{
+ private readonly ILogger _logger;
+ private readonly IExchangeService _exchangeService;
+ private readonly ICandleRepository _candleRepository;
+ private readonly IGrainFactory _grainFactory;
+
+ private const string FetchPricesReminderName = "Fetch15minPricesReminder";
+
+ // Predefined lists of trading parameters to fetch
+ private static readonly TradingExchanges[] SupportedExchanges =
+ {
+ TradingExchanges.Evm
+ };
+
+ private static readonly Ticker[] SupportedTickers = Constants.GMX.Config.SupportedTickers;
+
+ private static readonly Timeframe TargetTimeframe = Timeframe.FifteenMinutes;
+
+ public PriceFetcher15MinGrain(
+ ILogger logger,
+ IExchangeService exchangeService,
+ ICandleRepository candleRepository,
+ IGrainFactory grainFactory)
+ {
+ _logger = logger;
+ _exchangeService = exchangeService;
+ _candleRepository = candleRepository;
+ _grainFactory = grainFactory;
+ }
+
+ public override async Task OnActivateAsync(CancellationToken cancellationToken)
+ {
+ _logger.LogInformation("{0} activated", nameof(PriceFetcher15MinGrain));
+
+ // Register a reminder to fetch prices every 5 minutes
+ await this.RegisterOrUpdateReminder(
+ FetchPricesReminderName,
+ TimeSpan.FromMinutes(5),
+ TimeSpan.FromMinutes(5));
+
+ await base.OnActivateAsync(cancellationToken);
+ }
+
+ public async Task FetchAndPublishPricesAsync()
+ {
+ try
+ {
+ _logger.LogInformation("Starting {timeframe} price fetch cycle", TargetTimeframe);
+
+ var fetchTasks = new List();
+
+ // Create fetch tasks for all exchange/ticker combinations for 15-minute timeframe
+ foreach (var exchange in SupportedExchanges)
+ {
+ foreach (var ticker in SupportedTickers)
+ {
+ fetchTasks.Add(FetchAndPublish(exchange, ticker, TargetTimeframe));
+ }
+ }
+
+ // Execute all fetch operations in parallel
+ await Task.WhenAll(fetchTasks);
+
+ _logger.LogInformation("{0} - Completed {1} price fetch cycle for {2} combinations",
+ nameof(PriceFetcher15MinGrain), TargetTimeframe, fetchTasks.Count);
+
+ return true;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error during price fetch cycle");
+ return false;
+ }
+ }
+
+ private async Task FetchAndPublish(TradingExchanges exchange, Ticker ticker, Timeframe timeframe)
+ {
+ try
+ {
+ // Create a dummy account for API calls (this may need to be adjusted based on your implementation)
+ var account = new Account
+ {
+ Name = "PriceFetcher",
+ Exchange = exchange,
+ Type = AccountType.Watch
+ };
+
+ // Get the last candle date from database
+ var existingCandles = await _candleRepository.GetCandles(exchange, ticker, timeframe,
+ DateTime.UtcNow.AddDays(-30), 1);
+
+ var isFirstCall = !existingCandles.Any();
+
+ var startDate = !isFirstCall
+ ? existingCandles.Max(c => c.Date).AddMinutes(GetTimeframeMinutes(timeframe))
+ : new DateTime(2017, 1, 1);
+
+ // Fetch new candles from external API
+ var newCandles = await _exchangeService.GetCandles(account, ticker, startDate, timeframe, isFirstCall);
+
+ if (newCandles?.Any() == true)
+ {
+ var streamKey = CandleHelpers.GetCandleStoreGrainKey(exchange, ticker, timeframe);
+
+ _logger.LogDebug("Fetched {CandleCount} new candles for {StreamKey}",
+ newCandles.Count, streamKey);
+
+ // Process all new candles
+ var processedCandles = newCandles.OrderBy(c => c.Date).ToList();
+
+ foreach (var candle in processedCandles)
+ {
+ // Ensure candle has correct metadata
+ candle.Exchange = exchange;
+ candle.Ticker = ticker;
+ candle.Timeframe = timeframe;
+ }
+
+ // Save all candles to database in a single batch
+ if (processedCandles.Any())
+ {
+ await _candleRepository.InsertCandles(processedCandles).ConfigureAwait(false);
+
+ _logger.LogDebug(
+ "[{Ticker}][{Exchange}][{Timeframe}] Inserted {CandleCount} candles for {StreamKey}",
+ ticker, exchange, timeframe,
+ processedCandles.Count, streamKey);
+ }
+
+ var streamProvider = this.GetStreamProvider("DefaultStreamProvider");
+ var stream = streamProvider.GetStream(streamKey);
+
+ // Publish to stream (if needed)
+ foreach (var candle in processedCandles)
+ {
+ await stream.OnNextAsync(candle);
+ _logger.LogTrace("Published candle for {StreamKey} at {Date}",
+ streamKey, candle.Date);
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error fetching prices for {Exchange}-{Ticker}-{Timeframe}",
+ exchange, ticker, timeframe);
+ }
+ }
+
+ private static int GetTimeframeMinutes(Timeframe timeframe) => timeframe switch
+ {
+ Timeframe.OneMinute => 1,
+ Timeframe.FiveMinutes => 5,
+ Timeframe.FifteenMinutes => 15,
+ Timeframe.ThirtyMinutes => 30,
+ Timeframe.OneHour => 60,
+ Timeframe.FourHour => 240,
+ Timeframe.OneDay => 1440,
+ _ => 1
+ };
+
+ public async Task ReceiveReminder(string reminderName, TickStatus status)
+ {
+ if (reminderName == FetchPricesReminderName)
+ {
+ await FetchAndPublishPricesAsync();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Managing.Domain/Candles/CandleExtensions.cs b/src/Managing.Domain/Candles/CandleHelpers.cs
similarity index 83%
rename from src/Managing.Domain/Candles/CandleExtensions.cs
rename to src/Managing.Domain/Candles/CandleHelpers.cs
index 25f1af56..b27e0de4 100644
--- a/src/Managing.Domain/Candles/CandleExtensions.cs
+++ b/src/Managing.Domain/Candles/CandleHelpers.cs
@@ -1,16 +1,10 @@
-using static Managing.Common.Enums;
+using Managing.Core;
+using static Managing.Common.Enums;
namespace Managing.Domain.Candles;
-public static class CandleExtensions
+public static class CandleHelpers
{
-
- public static Candle SetupClosingCandle(this Candle candle)
- {
-
- return candle;
- }
-
public static DateTime GetBotPreloadSinceFromTimeframe(Timeframe timeframe)
{
return timeframe switch
@@ -96,4 +90,15 @@ public static class CandleExtensions
_ => throw new NotImplementedException()
};
}
-}
+
+ public static string GetCandleStoreGrainKey(TradingExchanges exchange, Ticker ticker, Timeframe timeframe)
+ {
+ return string.Join("-", exchange, ticker, timeframe);
+ }
+
+ public static (TradingExchanges exchange, Ticker ticker, Timeframe timeframe) ParseCandleStoreGrainKey(string grainKey)
+ {
+ var components = grainKey.Split('-');
+ return (MiscExtensions.ParseEnum(components[0]), MiscExtensions.ParseEnum(components[1]), MiscExtensions.ParseEnum(components[2]));
+ }
+}
\ No newline at end of file