Fix grain price fetcher

This commit is contained in:
2025-09-14 15:49:49 +07:00
parent cb98e91a02
commit bac93199c0
11 changed files with 129 additions and 208 deletions

View File

@@ -240,7 +240,7 @@ builder.Services.RegisterApiDependencies(builder.Configuration);
// Orleans is always configured, but grains can be controlled // Orleans is always configured, but grains can be controlled
builder.Host.ConfigureOrleans(builder.Configuration, builder.Environment.IsProduction()); builder.Host.ConfigureOrleans(builder.Configuration, builder.Environment.IsProduction());
builder.Services.AddHostedServices();
builder.Services.AddEndpointsApiExplorer(); builder.Services.AddEndpointsApiExplorer();
builder.Services.AddOpenApiDocument(document => builder.Services.AddOpenApiDocument(document =>
{ {

View File

@@ -38,10 +38,10 @@
"AllowedHosts": "*", "AllowedHosts": "*",
"KAIGEN_SECRET_KEY": "KaigenXCowchain", "KAIGEN_SECRET_KEY": "KaigenXCowchain",
"KAIGEN_CREDITS_ENABLED": false, "KAIGEN_CREDITS_ENABLED": false,
"WorkerBotManager": true, "WorkerBotManager": false,
"WorkerBalancesTracking": false, "WorkerBalancesTracking": false,
"WorkerNotifyBundleBacktest": false, "WorkerNotifyBundleBacktest": false,
"WorkerPricesFifteenMinutes": true, "WorkerPricesFifteenMinutes": false,
"WorkerPricesOneHour": false, "WorkerPricesOneHour": false,
"WorkerPricesFourHours": false, "WorkerPricesFourHours": false,
"WorkerPricesOneDay": false, "WorkerPricesOneDay": false,

View File

@@ -7,7 +7,7 @@ namespace Managing.Application.Abstractions.Grains;
/// This stateless worker grain handles fetching 5-minute price data from external APIs /// This stateless worker grain handles fetching 5-minute price data from external APIs
/// and publishing to Orleans streams. /// and publishing to Orleans streams.
/// </summary> /// </summary>
public partial interface IPriceFetcher5MinGrain : IGrainWithIntegerKey public partial interface IPriceFetcher15MinGrain : IGrainWithIntegerKey
{ {
/// <summary> /// <summary>
/// Fetches 5-minute price data for all supported exchange/ticker combinations /// Fetches 5-minute price data for all supported exchange/ticker combinations

View File

@@ -25,4 +25,5 @@ public interface ICandleRepository
Enums.Timeframe timeframe, Enums.Timeframe timeframe,
DateTime start); DateTime start);
Task InsertCandle(Candle candle); Task InsertCandle(Candle candle);
Task InsertCandles(IEnumerable<Candle> candles);
} }

View File

@@ -122,8 +122,8 @@ public class TradingBotBase : ITradingBot
await ServiceScopeHelpers.WithScopedService<IExchangeService>(_scopeFactory, async exchangeService => await ServiceScopeHelpers.WithScopedService<IExchangeService>(_scopeFactory, async exchangeService =>
{ {
var candles = await exchangeService.GetCandlesInflux(Account.Exchange, Config.Ticker, PreloadSince, var candles = await exchangeService.GetCandlesInflux(Account.Exchange, Config.Ticker, PreloadSince,
Config.Timeframe, DateTime.UtcNow, 2); Config.Timeframe, DateTime.UtcNow, 1);
LastCandle = candles.Last(); LastCandle = candles.Single();
}); });
} }

View File

@@ -1,172 +0,0 @@
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Repositories;
using Managing.Application.Abstractions.Services;
using Managing.Common;
using Managing.Domain.Accounts;
using Managing.Domain.Candles;
using Microsoft.Extensions.Logging;
using Orleans.Concurrency;
using Orleans.Streams;
using static Managing.Common.Enums;
namespace Managing.Application.Grains;
/// <summary>
/// StatelessWorker grain for fetching 5-minute price data from external APIs and publishing to Orleans streams.
/// This grain runs every 5 minutes and processes all exchange/ticker combinations for the 5-minute timeframe.
/// </summary>
[StatelessWorker]
public class PriceFetcher5MinGrain : Grain, IPriceFetcher5MinGrain, IRemindable
{
private readonly ILogger<PriceFetcher5MinGrain> _logger;
private readonly IExchangeService _exchangeService;
private readonly ICandleRepository _candleRepository;
private readonly IGrainFactory _grainFactory;
private const string FetchPricesReminderName = "FetchPricesReminder";
// Predefined lists of trading parameters to fetch
private static readonly TradingExchanges[] SupportedExchanges =
{
TradingExchanges.GmxV2
};
private static readonly Ticker[] SupportedTickers = Constants.GMX.Config.SupportedTickers;
private static readonly Timeframe TargetTimeframe = Timeframe.FiveMinutes;
public PriceFetcher5MinGrain(
ILogger<PriceFetcher5MinGrain> logger,
IExchangeService exchangeService,
ICandleRepository candleRepository,
IGrainFactory grainFactory)
{
_logger = logger;
_exchangeService = exchangeService;
_candleRepository = candleRepository;
_grainFactory = grainFactory;
}
public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("PriceFetcher5MinGrain activated");
// Register a reminder to fetch prices every 5 minutes
await this.RegisterOrUpdateReminder(
FetchPricesReminderName,
TimeSpan.FromMinutes(5),
TimeSpan.FromMinutes(5));
await base.OnActivateAsync(cancellationToken);
}
public async Task<bool> FetchAndPublishPricesAsync()
{
try
{
_logger.LogInformation("Starting 5-minute price fetch cycle");
var fetchTasks = new List<Task>();
// Create fetch tasks for all exchange/ticker combinations for 5-minute timeframe
foreach (var exchange in SupportedExchanges)
{
foreach (var ticker in SupportedTickers)
{
fetchTasks.Add(FetchAndPublish(exchange, ticker, TargetTimeframe));
}
}
// Execute all fetch operations in parallel
await Task.WhenAll(fetchTasks);
_logger.LogInformation("Completed 5-minute price fetch cycle for {TotalCombinations} combinations",
fetchTasks.Count);
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during price fetch cycle");
return false;
}
}
private async Task FetchAndPublish(TradingExchanges exchange, Ticker ticker, Timeframe timeframe)
{
try
{
// Create a dummy account for API calls (this may need to be adjusted based on your implementation)
var account = new Account
{
Name = "PriceFetcher",
Exchange = exchange,
Type = AccountType.Watch
};
// Get the last candle date from database
var existingCandles = await _candleRepository.GetCandles(exchange, ticker, timeframe,
DateTime.UtcNow.AddDays(-7), 1);
var startDate = existingCandles.Any()
? existingCandles.Max(c => c.Date).AddMinutes(GetTimeframeMinutes(timeframe))
: DateTime.UtcNow.AddDays(-1);
// Fetch new candles from external API
var newCandles = await _exchangeService.GetCandles(account, ticker, startDate, timeframe, true);
if (newCandles?.Any() == true)
{
var streamProvider = this.GetStreamProvider("DefaultStreamProvider");
var streamKey = $"{exchange}-{ticker}-{timeframe}";
var stream = streamProvider.GetStream<Candle>(streamKey);
_logger.LogDebug("Fetched {CandleCount} new candles for {StreamKey}",
newCandles.Count, streamKey);
// Process each new candle
foreach (var candle in newCandles.OrderBy(c => c.Date))
{
// Ensure candle has correct metadata
candle.Exchange = exchange;
candle.Ticker = ticker;
candle.Timeframe = timeframe;
// Save to database
await _candleRepository.InsertCandle(candle);
// Publish to stream
await stream.OnNextAsync(candle);
_logger.LogTrace("Published candle for {StreamKey} at {Date}",
streamKey, candle.Date);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error fetching prices for {Exchange}-{Ticker}-{Timeframe}",
exchange, ticker, timeframe);
}
}
private static int GetTimeframeMinutes(Timeframe timeframe) => timeframe switch
{
Timeframe.OneMinute => 1,
Timeframe.FiveMinutes => 5,
Timeframe.FifteenMinutes => 15,
Timeframe.ThirtyMinutes => 30,
Timeframe.OneHour => 60,
Timeframe.FourHour => 240,
Timeframe.OneDay => 1440,
_ => 1
};
public async Task ReceiveReminder(string reminderName, TickStatus status)
{
if (reminderName == FetchPricesReminderName)
{
await FetchAndPublishPricesAsync();
}
}
}

View File

@@ -5,17 +5,23 @@ namespace Managing.Application.Grains;
public class PriceFetcherInitializer : IHostedService public class PriceFetcherInitializer : IHostedService
{ {
private readonly IClusterClient _clusterClient; private readonly IGrainFactory _grainFactory;
public PriceFetcherInitializer(IClusterClient clusterClient) public PriceFetcherInitializer(IClusterClient grainFactory)
{ {
_clusterClient = clusterClient; _grainFactory = grainFactory;
} }
public Task StartAsync(CancellationToken cancellationToken) public async Task StartAsync(CancellationToken cancellationToken)
{ {
_clusterClient.GetGrain<IPriceFetcher5MinGrain>(0); var fiveMinute = _grainFactory.GetGrain<IPriceFetcher15MinGrain>(0);
return Task.CompletedTask; Console.WriteLine("GrainId : {0}", fiveMinute.GetGrainId());
// Actually call a method on the grain to activate it
// This will trigger OnActivateAsync and register the reminder
await fiveMinute.FetchAndPublishPricesAsync();
Console.WriteLine("PriceFetcher5MinGrain activated and initial fetch completed");
} }
public Task StopAsync(CancellationToken cancellationToken) public Task StopAsync(CancellationToken cancellationToken)

View File

@@ -10,6 +10,7 @@ using Managing.Application.Abstractions.Services;
using Managing.Application.Accounts; using Managing.Application.Accounts;
using Managing.Application.Agents; using Managing.Application.Agents;
using Managing.Application.Backtests; using Managing.Application.Backtests;
using Managing.Application.Grains;
using Managing.Application.ManageBot; using Managing.Application.ManageBot;
using Managing.Application.ManageBot.Commands; using Managing.Application.ManageBot.Commands;
using Managing.Application.MoneyManagements; using Managing.Application.MoneyManagements;
@@ -66,13 +67,12 @@ public static class ApiBootstrap
.AddWorkers(configuration) .AddWorkers(configuration)
.AddFluentValidation() .AddFluentValidation()
.AddMediatR() .AddMediatR()
.AddHostedServices()
; ;
} }
private static IServiceCollection AddHostedServices(this IServiceCollection services) public static IServiceCollection AddHostedServices(this IServiceCollection services)
{ {
// services.AddHostedService<PriceFetcherInitializer>(); services.AddHostedService<PriceFetcherInitializer>();
return services; return services;
} }
@@ -333,7 +333,10 @@ public static class ApiBootstrap
services.AddTransient<IAccountService, AccountService>(); services.AddTransient<IAccountService, AccountService>();
services.AddTransient<ITradingService, TradingService>(); services.AddTransient<ITradingService, TradingService>();
services.AddTransient<IMessengerService, MessengerService>(); services.AddTransient<IMessengerService, MessengerService>();
services.AddTransient<ICandleRepository, CandleRepository>(); // Use Singleton for InfluxDB repositories to prevent connection disposal issues in Orleans grains
services.AddSingleton<IInfluxDbRepository, InfluxDbRepository>();
services.AddSingleton<ICandleRepository, CandleRepository>();
services.AddSingleton<IAgentBalanceRepository, AgentBalanceRepository>();
}); });
}) })
; ;
@@ -416,10 +419,10 @@ public static class ApiBootstrap
services.AddTransient<IWorkerRepository, PostgreSqlWorkerRepository>(); services.AddTransient<IWorkerRepository, PostgreSqlWorkerRepository>();
services.AddTransient<ISynthRepository, PostgreSqlSynthRepository>(); services.AddTransient<ISynthRepository, PostgreSqlSynthRepository>();
// InfluxDb Repositories // InfluxDb Repositories - Use Singleton for proper connection management in Orleans grains
services.AddTransient<IInfluxDbRepository, InfluxDbRepository>(); services.AddSingleton<IInfluxDbRepository, InfluxDbRepository>();
services.AddTransient<ICandleRepository, CandleRepository>(); services.AddSingleton<ICandleRepository, CandleRepository>();
services.AddTransient<IAgentBalanceRepository, AgentBalanceRepository>(); services.AddSingleton<IAgentBalanceRepository, AgentBalanceRepository>();
// Cache // Cache
services.AddDistributedMemoryCache(); services.AddDistributedMemoryCache();

View File

@@ -2,7 +2,7 @@
namespace Managing.Infrastructure.Databases.InfluxDb.Abstractions; namespace Managing.Infrastructure.Databases.InfluxDb.Abstractions;
public interface IInfluxDbRepository public interface IInfluxDbRepository : IDisposable
{ {
string Organization { get; } string Organization { get; }

View File

@@ -111,9 +111,9 @@ public class CandleRepository : ICandleRepository
return results; return results;
} }
public async Task InsertCandle(Candle candle) public Task InsertCandle(Candle candle)
{ {
await _influxDbRepository.WriteAsync(write => _influxDbRepository.Write(write =>
{ {
PriceDto price = PriceHelpers.Map(candle); PriceDto price = PriceHelpers.Map(candle);
write.WriteMeasurement( write.WriteMeasurement(
@@ -121,8 +121,25 @@ public class CandleRepository : ICandleRepository
WritePrecision.Ns, WritePrecision.Ns,
_priceBucket, _priceBucket,
_influxDbRepository.Organization); _influxDbRepository.Organization);
return Task.CompletedTask;
}); });
return Task.CompletedTask;
}
public Task InsertCandles(IEnumerable<Candle> candles)
{
_influxDbRepository.Write(write =>
{
foreach (var candle in candles)
{
PriceDto price = PriceHelpers.Map(candle);
write.WriteMeasurement(
price,
WritePrecision.Ns,
_priceBucket,
_influxDbRepository.Organization);
}
});
return Task.CompletedTask;
} }
public void Test(Candle candle) public void Test(Candle candle)

View File

@@ -3,10 +3,17 @@ using Managing.Infrastructure.Databases.InfluxDb.Abstractions;
namespace Managing.Infrastructure.Databases.InfluxDb; namespace Managing.Infrastructure.Databases.InfluxDb;
public class InfluxDbRepository : IInfluxDbRepository public class InfluxDbRepository : IInfluxDbRepository, IDisposable
{ {
private readonly string _token; private readonly string _token;
private readonly string _url; private readonly string _url;
private readonly InfluxDBClient _client;
private readonly WriteApi _writeApi;
private readonly QueryApi _queryApi;
private readonly SemaphoreSlim _writeSemaphore = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _querySemaphore = new SemaphoreSlim(1, 1);
private bool _disposed = false;
public string Organization { get; set; } public string Organization { get; set; }
public InfluxDbRepository(IInfluxDbSettings settings) public InfluxDbRepository(IInfluxDbSettings settings)
@@ -14,26 +21,85 @@ public class InfluxDbRepository : IInfluxDbRepository
_token = settings.Token; _token = settings.Token;
_url = settings.Url; _url = settings.Url;
Organization = settings.Organization; Organization = settings.Organization;
// Create a single client instance that will be reused
_client = new InfluxDBClient(_url, _token);
_writeApi = _client.GetWriteApi();
_queryApi = _client.GetQueryApi();
} }
public void Write(Action<WriteApi> action) public void Write(Action<WriteApi> action)
{ {
using var client = InfluxDBClientFactory.Create(_url, _token); if (_disposed) throw new ObjectDisposedException(nameof(InfluxDbRepository));
using var write = client.GetWriteApi();
action(write); _writeSemaphore.Wait();
try
{
action(_writeApi);
}
finally
{
_writeSemaphore.Release();
}
} }
public Task WriteAsync(Func<WriteApi, Task> action) public async Task WriteAsync(Func<WriteApi, Task> action)
{ {
// Get write API asynchronously if (_disposed) throw new ObjectDisposedException(nameof(InfluxDbRepository));
using var client = new InfluxDBClient(_url, _token);
using var write = client.GetWriteApi(); await _writeSemaphore.WaitAsync();
return action(write); try
{
await action(_writeApi);
} }
finally
{
_writeSemaphore.Release();
}
}
public async Task<T> QueryAsync<T>(Func<QueryApi, Task<T>> action) public async Task<T> QueryAsync<T>(Func<QueryApi, Task<T>> action)
{ {
using var client = InfluxDBClientFactory.Create(_url, _token); if (_disposed) throw new ObjectDisposedException(nameof(InfluxDbRepository));
var query = client.GetQueryApi();
return await action(query); await _querySemaphore.WaitAsync();
try
{
return await action(_queryApi);
}
finally
{
_querySemaphore.Release();
}
}
public void Dispose()
{
if (!_disposed)
{
try
{
// Give the WriteApi time to flush any pending writes
_writeApi?.Dispose();
}
catch (Exception)
{
// Ignore disposal errors
}
try
{
_client?.Dispose();
}
catch (Exception)
{
// Ignore disposal errors
}
_writeSemaphore?.Dispose();
_querySemaphore?.Dispose();
_disposed = true;
}
} }
} }