From bac93199c0d409b4dfbd1a268df1f3d68b6bc770 Mon Sep 17 00:00:00 2001 From: cryptooda Date: Sun, 14 Sep 2025 15:49:49 +0700 Subject: [PATCH] Fix grain price fetcher --- src/Managing.Api/Program.cs | 2 +- src/Managing.Api/appsettings.Oda.json | 4 +- ...MinGrain.cs => IPriceFetcher15MinGrain.cs} | 2 +- .../Repositories/ICandleRepository.cs | 1 + .../Bots/TradingBotBase.cs | 4 +- .../Grains/PriceFetcher5MinGrain.cs | 172 ------------------ .../Grains/PriceFetcherInitializer.cs | 18 +- src/Managing.Bootstrap/ApiBootstrap.cs | 19 +- .../Abstractions/IInfluxDbRepository.cs | 2 +- .../InfluxDb/CandleRepository.cs | 23 ++- .../InfluxDb/InfluxDbRepository.cs | 90 +++++++-- 11 files changed, 129 insertions(+), 208 deletions(-) rename src/Managing.Application.Abstractions/Grains/{IPriceFetcher5MinGrain.cs => IPriceFetcher15MinGrain.cs} (89%) delete mode 100644 src/Managing.Application/Grains/PriceFetcher5MinGrain.cs diff --git a/src/Managing.Api/Program.cs b/src/Managing.Api/Program.cs index db6e6e23..86f4cc2c 100644 --- a/src/Managing.Api/Program.cs +++ b/src/Managing.Api/Program.cs @@ -240,7 +240,7 @@ builder.Services.RegisterApiDependencies(builder.Configuration); // Orleans is always configured, but grains can be controlled builder.Host.ConfigureOrleans(builder.Configuration, builder.Environment.IsProduction()); - +builder.Services.AddHostedServices(); builder.Services.AddEndpointsApiExplorer(); builder.Services.AddOpenApiDocument(document => { diff --git a/src/Managing.Api/appsettings.Oda.json b/src/Managing.Api/appsettings.Oda.json index 2b62e644..bb51e372 100644 --- a/src/Managing.Api/appsettings.Oda.json +++ b/src/Managing.Api/appsettings.Oda.json @@ -38,10 +38,10 @@ "AllowedHosts": "*", "KAIGEN_SECRET_KEY": "KaigenXCowchain", "KAIGEN_CREDITS_ENABLED": false, - "WorkerBotManager": true, + "WorkerBotManager": false, "WorkerBalancesTracking": false, "WorkerNotifyBundleBacktest": false, - "WorkerPricesFifteenMinutes": true, + "WorkerPricesFifteenMinutes": false, "WorkerPricesOneHour": false, "WorkerPricesFourHours": false, "WorkerPricesOneDay": false, diff --git a/src/Managing.Application.Abstractions/Grains/IPriceFetcher5MinGrain.cs b/src/Managing.Application.Abstractions/Grains/IPriceFetcher15MinGrain.cs similarity index 89% rename from src/Managing.Application.Abstractions/Grains/IPriceFetcher5MinGrain.cs rename to src/Managing.Application.Abstractions/Grains/IPriceFetcher15MinGrain.cs index 04d7a693..d0aa6438 100644 --- a/src/Managing.Application.Abstractions/Grains/IPriceFetcher5MinGrain.cs +++ b/src/Managing.Application.Abstractions/Grains/IPriceFetcher15MinGrain.cs @@ -7,7 +7,7 @@ namespace Managing.Application.Abstractions.Grains; /// This stateless worker grain handles fetching 5-minute price data from external APIs /// and publishing to Orleans streams. /// -public partial interface IPriceFetcher5MinGrain : IGrainWithIntegerKey +public partial interface IPriceFetcher15MinGrain : IGrainWithIntegerKey { /// /// Fetches 5-minute price data for all supported exchange/ticker combinations diff --git a/src/Managing.Application.Abstractions/Repositories/ICandleRepository.cs b/src/Managing.Application.Abstractions/Repositories/ICandleRepository.cs index 4a0434d1..0887c9a4 100644 --- a/src/Managing.Application.Abstractions/Repositories/ICandleRepository.cs +++ b/src/Managing.Application.Abstractions/Repositories/ICandleRepository.cs @@ -25,4 +25,5 @@ public interface ICandleRepository Enums.Timeframe timeframe, DateTime start); Task InsertCandle(Candle candle); + Task InsertCandles(IEnumerable candles); } diff --git a/src/Managing.Application/Bots/TradingBotBase.cs b/src/Managing.Application/Bots/TradingBotBase.cs index 63990adc..ce37fba2 100644 --- a/src/Managing.Application/Bots/TradingBotBase.cs +++ b/src/Managing.Application/Bots/TradingBotBase.cs @@ -122,8 +122,8 @@ public class TradingBotBase : ITradingBot await ServiceScopeHelpers.WithScopedService(_scopeFactory, async exchangeService => { var candles = await exchangeService.GetCandlesInflux(Account.Exchange, Config.Ticker, PreloadSince, - Config.Timeframe, DateTime.UtcNow, 2); - LastCandle = candles.Last(); + Config.Timeframe, DateTime.UtcNow, 1); + LastCandle = candles.Single(); }); } diff --git a/src/Managing.Application/Grains/PriceFetcher5MinGrain.cs b/src/Managing.Application/Grains/PriceFetcher5MinGrain.cs deleted file mode 100644 index 77b90b7d..00000000 --- a/src/Managing.Application/Grains/PriceFetcher5MinGrain.cs +++ /dev/null @@ -1,172 +0,0 @@ -using Managing.Application.Abstractions.Grains; -using Managing.Application.Abstractions.Repositories; -using Managing.Application.Abstractions.Services; -using Managing.Common; -using Managing.Domain.Accounts; -using Managing.Domain.Candles; -using Microsoft.Extensions.Logging; -using Orleans.Concurrency; -using Orleans.Streams; -using static Managing.Common.Enums; - -namespace Managing.Application.Grains; - -/// -/// StatelessWorker grain for fetching 5-minute price data from external APIs and publishing to Orleans streams. -/// This grain runs every 5 minutes and processes all exchange/ticker combinations for the 5-minute timeframe. -/// -[StatelessWorker] -public class PriceFetcher5MinGrain : Grain, IPriceFetcher5MinGrain, IRemindable -{ - private readonly ILogger _logger; - private readonly IExchangeService _exchangeService; - private readonly ICandleRepository _candleRepository; - private readonly IGrainFactory _grainFactory; - - private const string FetchPricesReminderName = "FetchPricesReminder"; - - // Predefined lists of trading parameters to fetch - private static readonly TradingExchanges[] SupportedExchanges = - { - TradingExchanges.GmxV2 - }; - - private static readonly Ticker[] SupportedTickers = Constants.GMX.Config.SupportedTickers; - - private static readonly Timeframe TargetTimeframe = Timeframe.FiveMinutes; - - public PriceFetcher5MinGrain( - ILogger logger, - IExchangeService exchangeService, - ICandleRepository candleRepository, - IGrainFactory grainFactory) - { - _logger = logger; - _exchangeService = exchangeService; - _candleRepository = candleRepository; - _grainFactory = grainFactory; - } - - public override async Task OnActivateAsync(CancellationToken cancellationToken) - { - _logger.LogInformation("PriceFetcher5MinGrain activated"); - - // Register a reminder to fetch prices every 5 minutes - await this.RegisterOrUpdateReminder( - FetchPricesReminderName, - TimeSpan.FromMinutes(5), - TimeSpan.FromMinutes(5)); - - await base.OnActivateAsync(cancellationToken); - } - - public async Task FetchAndPublishPricesAsync() - { - try - { - _logger.LogInformation("Starting 5-minute price fetch cycle"); - - var fetchTasks = new List(); - - // Create fetch tasks for all exchange/ticker combinations for 5-minute timeframe - foreach (var exchange in SupportedExchanges) - { - foreach (var ticker in SupportedTickers) - { - fetchTasks.Add(FetchAndPublish(exchange, ticker, TargetTimeframe)); - } - } - - // Execute all fetch operations in parallel - await Task.WhenAll(fetchTasks); - - _logger.LogInformation("Completed 5-minute price fetch cycle for {TotalCombinations} combinations", - fetchTasks.Count); - - return true; - } - catch (Exception ex) - { - _logger.LogError(ex, "Error during price fetch cycle"); - return false; - } - } - - private async Task FetchAndPublish(TradingExchanges exchange, Ticker ticker, Timeframe timeframe) - { - try - { - // Create a dummy account for API calls (this may need to be adjusted based on your implementation) - var account = new Account - { - Name = "PriceFetcher", - Exchange = exchange, - Type = AccountType.Watch - }; - - // Get the last candle date from database - var existingCandles = await _candleRepository.GetCandles(exchange, ticker, timeframe, - DateTime.UtcNow.AddDays(-7), 1); - - var startDate = existingCandles.Any() - ? existingCandles.Max(c => c.Date).AddMinutes(GetTimeframeMinutes(timeframe)) - : DateTime.UtcNow.AddDays(-1); - - // Fetch new candles from external API - var newCandles = await _exchangeService.GetCandles(account, ticker, startDate, timeframe, true); - - if (newCandles?.Any() == true) - { - var streamProvider = this.GetStreamProvider("DefaultStreamProvider"); - var streamKey = $"{exchange}-{ticker}-{timeframe}"; - var stream = streamProvider.GetStream(streamKey); - - _logger.LogDebug("Fetched {CandleCount} new candles for {StreamKey}", - newCandles.Count, streamKey); - - // Process each new candle - foreach (var candle in newCandles.OrderBy(c => c.Date)) - { - // Ensure candle has correct metadata - candle.Exchange = exchange; - candle.Ticker = ticker; - candle.Timeframe = timeframe; - - // Save to database - await _candleRepository.InsertCandle(candle); - - // Publish to stream - await stream.OnNextAsync(candle); - - _logger.LogTrace("Published candle for {StreamKey} at {Date}", - streamKey, candle.Date); - } - } - } - catch (Exception ex) - { - _logger.LogError(ex, "Error fetching prices for {Exchange}-{Ticker}-{Timeframe}", - exchange, ticker, timeframe); - } - } - - private static int GetTimeframeMinutes(Timeframe timeframe) => timeframe switch - { - Timeframe.OneMinute => 1, - Timeframe.FiveMinutes => 5, - Timeframe.FifteenMinutes => 15, - Timeframe.ThirtyMinutes => 30, - Timeframe.OneHour => 60, - Timeframe.FourHour => 240, - Timeframe.OneDay => 1440, - _ => 1 - }; - - public async Task ReceiveReminder(string reminderName, TickStatus status) - { - if (reminderName == FetchPricesReminderName) - { - await FetchAndPublishPricesAsync(); - } - } -} \ No newline at end of file diff --git a/src/Managing.Application/Grains/PriceFetcherInitializer.cs b/src/Managing.Application/Grains/PriceFetcherInitializer.cs index 18cdf812..36ec43af 100644 --- a/src/Managing.Application/Grains/PriceFetcherInitializer.cs +++ b/src/Managing.Application/Grains/PriceFetcherInitializer.cs @@ -5,17 +5,23 @@ namespace Managing.Application.Grains; public class PriceFetcherInitializer : IHostedService { - private readonly IClusterClient _clusterClient; + private readonly IGrainFactory _grainFactory; - public PriceFetcherInitializer(IClusterClient clusterClient) + public PriceFetcherInitializer(IClusterClient grainFactory) { - _clusterClient = clusterClient; + _grainFactory = grainFactory; } - public Task StartAsync(CancellationToken cancellationToken) + public async Task StartAsync(CancellationToken cancellationToken) { - _clusterClient.GetGrain(0); - return Task.CompletedTask; + var fiveMinute = _grainFactory.GetGrain(0); + Console.WriteLine("GrainId : {0}", fiveMinute.GetGrainId()); + + // Actually call a method on the grain to activate it + // This will trigger OnActivateAsync and register the reminder + await fiveMinute.FetchAndPublishPricesAsync(); + + Console.WriteLine("PriceFetcher5MinGrain activated and initial fetch completed"); } public Task StopAsync(CancellationToken cancellationToken) diff --git a/src/Managing.Bootstrap/ApiBootstrap.cs b/src/Managing.Bootstrap/ApiBootstrap.cs index 4463bb33..c384f87d 100644 --- a/src/Managing.Bootstrap/ApiBootstrap.cs +++ b/src/Managing.Bootstrap/ApiBootstrap.cs @@ -10,6 +10,7 @@ using Managing.Application.Abstractions.Services; using Managing.Application.Accounts; using Managing.Application.Agents; using Managing.Application.Backtests; +using Managing.Application.Grains; using Managing.Application.ManageBot; using Managing.Application.ManageBot.Commands; using Managing.Application.MoneyManagements; @@ -66,13 +67,12 @@ public static class ApiBootstrap .AddWorkers(configuration) .AddFluentValidation() .AddMediatR() - .AddHostedServices() ; } - private static IServiceCollection AddHostedServices(this IServiceCollection services) + public static IServiceCollection AddHostedServices(this IServiceCollection services) { - // services.AddHostedService(); + services.AddHostedService(); return services; } @@ -333,7 +333,10 @@ public static class ApiBootstrap services.AddTransient(); services.AddTransient(); services.AddTransient(); - services.AddTransient(); + // Use Singleton for InfluxDB repositories to prevent connection disposal issues in Orleans grains + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); }); }) ; @@ -416,10 +419,10 @@ public static class ApiBootstrap services.AddTransient(); services.AddTransient(); - // InfluxDb Repositories - services.AddTransient(); - services.AddTransient(); - services.AddTransient(); + // InfluxDb Repositories - Use Singleton for proper connection management in Orleans grains + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); // Cache services.AddDistributedMemoryCache(); diff --git a/src/Managing.Infrastructure.Database/InfluxDb/Abstractions/IInfluxDbRepository.cs b/src/Managing.Infrastructure.Database/InfluxDb/Abstractions/IInfluxDbRepository.cs index 7f2ccef3..02c9eb07 100644 --- a/src/Managing.Infrastructure.Database/InfluxDb/Abstractions/IInfluxDbRepository.cs +++ b/src/Managing.Infrastructure.Database/InfluxDb/Abstractions/IInfluxDbRepository.cs @@ -2,7 +2,7 @@ namespace Managing.Infrastructure.Databases.InfluxDb.Abstractions; -public interface IInfluxDbRepository +public interface IInfluxDbRepository : IDisposable { string Organization { get; } diff --git a/src/Managing.Infrastructure.Database/InfluxDb/CandleRepository.cs b/src/Managing.Infrastructure.Database/InfluxDb/CandleRepository.cs index b6743a15..4baea514 100644 --- a/src/Managing.Infrastructure.Database/InfluxDb/CandleRepository.cs +++ b/src/Managing.Infrastructure.Database/InfluxDb/CandleRepository.cs @@ -111,9 +111,9 @@ public class CandleRepository : ICandleRepository return results; } - public async Task InsertCandle(Candle candle) + public Task InsertCandle(Candle candle) { - await _influxDbRepository.WriteAsync(write => + _influxDbRepository.Write(write => { PriceDto price = PriceHelpers.Map(candle); write.WriteMeasurement( @@ -121,8 +121,25 @@ public class CandleRepository : ICandleRepository WritePrecision.Ns, _priceBucket, _influxDbRepository.Organization); - return Task.CompletedTask; }); + return Task.CompletedTask; + } + + public Task InsertCandles(IEnumerable candles) + { + _influxDbRepository.Write(write => + { + foreach (var candle in candles) + { + PriceDto price = PriceHelpers.Map(candle); + write.WriteMeasurement( + price, + WritePrecision.Ns, + _priceBucket, + _influxDbRepository.Organization); + } + }); + return Task.CompletedTask; } public void Test(Candle candle) diff --git a/src/Managing.Infrastructure.Database/InfluxDb/InfluxDbRepository.cs b/src/Managing.Infrastructure.Database/InfluxDb/InfluxDbRepository.cs index 71139405..12194753 100644 --- a/src/Managing.Infrastructure.Database/InfluxDb/InfluxDbRepository.cs +++ b/src/Managing.Infrastructure.Database/InfluxDb/InfluxDbRepository.cs @@ -3,10 +3,17 @@ using Managing.Infrastructure.Databases.InfluxDb.Abstractions; namespace Managing.Infrastructure.Databases.InfluxDb; -public class InfluxDbRepository : IInfluxDbRepository +public class InfluxDbRepository : IInfluxDbRepository, IDisposable { private readonly string _token; private readonly string _url; + private readonly InfluxDBClient _client; + private readonly WriteApi _writeApi; + private readonly QueryApi _queryApi; + private readonly SemaphoreSlim _writeSemaphore = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _querySemaphore = new SemaphoreSlim(1, 1); + private bool _disposed = false; + public string Organization { get; set; } public InfluxDbRepository(IInfluxDbSettings settings) @@ -14,26 +21,85 @@ public class InfluxDbRepository : IInfluxDbRepository _token = settings.Token; _url = settings.Url; Organization = settings.Organization; + + // Create a single client instance that will be reused + _client = new InfluxDBClient(_url, _token); + _writeApi = _client.GetWriteApi(); + _queryApi = _client.GetQueryApi(); } public void Write(Action action) { - using var client = InfluxDBClientFactory.Create(_url, _token); - using var write = client.GetWriteApi(); - action(write); + if (_disposed) throw new ObjectDisposedException(nameof(InfluxDbRepository)); + + _writeSemaphore.Wait(); + try + { + action(_writeApi); + } + finally + { + _writeSemaphore.Release(); + } } - public Task WriteAsync(Func action) + public async Task WriteAsync(Func action) { - // Get write API asynchronously - using var client = new InfluxDBClient(_url, _token); - using var write = client.GetWriteApi(); - return action(write); + if (_disposed) throw new ObjectDisposedException(nameof(InfluxDbRepository)); + + await _writeSemaphore.WaitAsync(); + try + { + await action(_writeApi); + } + finally + { + _writeSemaphore.Release(); + } } + public async Task QueryAsync(Func> action) { - using var client = InfluxDBClientFactory.Create(_url, _token); - var query = client.GetQueryApi(); - return await action(query); + if (_disposed) throw new ObjectDisposedException(nameof(InfluxDbRepository)); + + await _querySemaphore.WaitAsync(); + try + { + return await action(_queryApi); + } + finally + { + _querySemaphore.Release(); + } + } + + public void Dispose() + { + if (!_disposed) + { + try + { + // Give the WriteApi time to flush any pending writes + _writeApi?.Dispose(); + } + catch (Exception) + { + // Ignore disposal errors + } + + try + { + _client?.Dispose(); + } + catch (Exception) + { + // Ignore disposal errors + } + + _writeSemaphore?.Dispose(); + _querySemaphore?.Dispose(); + + _disposed = true; + } } } \ No newline at end of file