plug candle store and bot

This commit is contained in:
2025-09-14 16:21:48 +07:00
parent bac93199c0
commit caa0d9e1a6
5 changed files with 232 additions and 45 deletions

View File

@@ -15,5 +15,6 @@ public interface ICandleStoreGrain : IGrainWithStringKey
/// </summary>
/// <returns>List of candles ordered by date</returns>
Task<List<Candle>> GetCandlesAsync();
Task<Candle> GetLastCandle();
}

View File

@@ -51,7 +51,7 @@ public class TradingBotBase : ITradingBot
Signals = new Dictionary<string, LightSignal>();
Positions = new Dictionary<Guid, Position>();
WalletBalances = new Dictionary<DateTime, decimal>();
PreloadSince = CandleExtensions.GetBotPreloadSinceFromTimeframe(config.Timeframe);
PreloadSince = CandleHelpers.GetBotPreloadSinceFromTimeframe(config.Timeframe);
}
public async Task Start(BotStatus previousStatus)
@@ -119,11 +119,11 @@ public class TradingBotBase : ITradingBot
public async Task LoadLastCandle()
{
await ServiceScopeHelpers.WithScopedService<IExchangeService>(_scopeFactory, async exchangeService =>
await ServiceScopeHelpers.WithScopedService<IGrainFactory>(_scopeFactory, async grainFactory =>
{
var candles = await exchangeService.GetCandlesInflux(Account.Exchange, Config.Ticker, PreloadSince,
Config.Timeframe, DateTime.UtcNow, 1);
LastCandle = candles.Single();
var grainKey = CandleHelpers.GetCandleStoreGrainKey(Account.Exchange, Config.Ticker, Config.Timeframe);
var grain = grainFactory.GetGrain<ICandleStoreGrain>(grainKey);
LastCandle = await grain.GetLastCandle();
});
}
@@ -1829,7 +1829,7 @@ public class TradingBotBase : ITradingBot
}
// Calculate cooldown end time based on last position closing time
var baseIntervalSeconds = CandleExtensions.GetBaseIntervalInSeconds(Config.Timeframe);
var baseIntervalSeconds = CandleHelpers.GetBaseIntervalInSeconds(Config.Timeframe);
var cooldownEndTime = LastPositionClosingTime.Value.AddSeconds(baseIntervalSeconds * Config.CooldownPeriod);
var isInCooldown = LastCandle.Date < cooldownEndTime;

View File

@@ -16,7 +16,7 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
private readonly IPersistentState<CandleStoreGrainState> _state;
private readonly ILogger<CandleStoreGrain> _logger;
private readonly ICandleRepository _candleRepository;
private const int MaxCandleCount = 500;
private IAsyncStream<Candle> _priceStream;
private StreamSubscriptionHandle<Candle> _streamSubscription;
@@ -38,20 +38,7 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
_logger.LogInformation("CandleStoreGrain activated for key: {GrainKey}", grainKey);
// Parse the grain key to extract exchange, ticker, and timeframe
var parts = grainKey.Split('-');
if (parts.Length != 3)
{
_logger.LogError("Invalid grain key format: {GrainKey}. Expected format: Exchange-Ticker-Timeframe", grainKey);
return;
}
if (!Enum.TryParse<TradingExchanges>(parts[0], out var exchange) ||
!Enum.TryParse<Ticker>(parts[1], out var ticker) ||
!Enum.TryParse<Timeframe>(parts[2], out var timeframe))
{
_logger.LogError("Failed to parse grain key components: {GrainKey}", grainKey);
return;
}
var (exchange, ticker, timeframe) = CandleHelpers.ParseCandleStoreGrainKey(grainKey);
// Initialize state if empty
if (_state.State.Candles == null || _state.State.Candles.Count == 0)
@@ -95,7 +82,7 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
{
try
{
_logger.LogDebug("Received new candle for {GrainKey} at {Date}",
_logger.LogDebug("Received new candle for {GrainKey} at {Date}",
this.GetPrimaryKeyString(), candle.Date);
// Initialize state if needed
@@ -120,7 +107,7 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
// Persist the updated state
await _state.WriteStateAsync();
_logger.LogTrace("Updated candle store for {GrainKey}, total candles: {Count}",
_logger.LogTrace("Updated candle store for {GrainKey}, total candles: {Count}",
this.GetPrimaryKeyString(), _state.State.Candles.Count);
}
catch (Exception ex)
@@ -145,14 +132,17 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
{
try
{
_logger.LogInformation("Loading initial candles for {Exchange}-{Ticker}-{Timeframe}",
_logger.LogInformation("Loading initial candles for {Exchange}-{Ticker}-{Timeframe}",
exchange, ticker, timeframe);
// Load the last 500 candles from the database
var endDate = DateTime.UtcNow;
var startDate = endDate.AddDays(-30); // Look back 30 days to ensure we get enough data
var startDate =
CandleHelpers
.GetBotPreloadSinceFromTimeframe(timeframe); // Look back 30 days to ensure we get enough data
var candles = await _candleRepository.GetCandles(exchange, ticker, timeframe, startDate, endDate, MaxCandleCount);
var candles =
await _candleRepository.GetCandles(exchange, ticker, timeframe, startDate, endDate, MaxCandleCount);
if (candles?.Any() == true)
{
@@ -163,23 +153,23 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
await _state.WriteStateAsync();
_logger.LogInformation("Loaded {Count} initial candles for {Exchange}-{Ticker}-{Timeframe}",
_logger.LogInformation("Loaded {Count} initial candles for {Exchange}-{Ticker}-{Timeframe}",
_state.State.Candles.Count, exchange, ticker, timeframe);
}
else
{
_state.State.Candles = new List<Candle>();
await _state.WriteStateAsync();
_logger.LogWarning("No initial candles found for {Exchange}-{Ticker}-{Timeframe}",
_logger.LogWarning("No initial candles found for {Exchange}-{Ticker}-{Timeframe}",
exchange, ticker, timeframe);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error loading initial candles for {Exchange}-{Ticker}-{Timeframe}",
_logger.LogError(ex, "Error loading initial candles for {Exchange}-{Ticker}-{Timeframe}",
exchange, ticker, timeframe);
// Initialize empty state on error
_state.State.Candles = new List<Candle>();
await _state.WriteStateAsync();
@@ -192,9 +182,9 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
{
var streamProvider = this.GetStreamProvider("DefaultStreamProvider");
_priceStream = streamProvider.GetStream<Candle>(streamKey);
_streamSubscription = await _priceStream.SubscribeAsync(this);
_logger.LogInformation("Subscribed to price stream for {StreamKey}", streamKey);
}
catch (Exception ex)
@@ -202,6 +192,11 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
_logger.LogError(ex, "Error subscribing to price stream for {StreamKey}", streamKey);
}
}
public Task<Candle> GetLastCandle()
{
return Task.FromResult(_state.State.Candles?.LastOrDefault());
}
}
/// <summary>
@@ -210,6 +205,5 @@ public class CandleStoreGrain : Grain, ICandleStoreGrain, IAsyncObserver<Candle>
[GenerateSerializer]
public class CandleStoreGrainState
{
[Id(0)]
public List<Candle> Candles { get; set; } = new();
[Id(0)] public List<Candle> Candles { get; set; } = new();
}

View File

@@ -0,0 +1,187 @@
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Repositories;
using Managing.Application.Abstractions.Services;
using Managing.Common;
using Managing.Domain.Accounts;
using Managing.Domain.Candles;
using Microsoft.Extensions.Logging;
using Orleans.Concurrency;
using Orleans.Streams;
using static Managing.Common.Enums;
namespace Managing.Application.Grains;
/// <summary>
/// StatelessWorker grain for fetching 5-minute price data from external APIs and publishing to Orleans streams.
/// This grain runs every 5 minutes and processes all exchange/ticker combinations for the 5-minute timeframe.
/// </summary>
[StatelessWorker]
public class PriceFetcher15MinGrain : Grain, IPriceFetcher15MinGrain, IRemindable
{
private readonly ILogger<PriceFetcher15MinGrain> _logger;
private readonly IExchangeService _exchangeService;
private readonly ICandleRepository _candleRepository;
private readonly IGrainFactory _grainFactory;
private const string FetchPricesReminderName = "Fetch15minPricesReminder";
// Predefined lists of trading parameters to fetch
private static readonly TradingExchanges[] SupportedExchanges =
{
TradingExchanges.Evm
};
private static readonly Ticker[] SupportedTickers = Constants.GMX.Config.SupportedTickers;
private static readonly Timeframe TargetTimeframe = Timeframe.FifteenMinutes;
public PriceFetcher15MinGrain(
ILogger<PriceFetcher15MinGrain> logger,
IExchangeService exchangeService,
ICandleRepository candleRepository,
IGrainFactory grainFactory)
{
_logger = logger;
_exchangeService = exchangeService;
_candleRepository = candleRepository;
_grainFactory = grainFactory;
}
public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("{0} activated", nameof(PriceFetcher15MinGrain));
// Register a reminder to fetch prices every 5 minutes
await this.RegisterOrUpdateReminder(
FetchPricesReminderName,
TimeSpan.FromMinutes(5),
TimeSpan.FromMinutes(5));
await base.OnActivateAsync(cancellationToken);
}
public async Task<bool> FetchAndPublishPricesAsync()
{
try
{
_logger.LogInformation("Starting {timeframe} price fetch cycle", TargetTimeframe);
var fetchTasks = new List<Task>();
// Create fetch tasks for all exchange/ticker combinations for 15-minute timeframe
foreach (var exchange in SupportedExchanges)
{
foreach (var ticker in SupportedTickers)
{
fetchTasks.Add(FetchAndPublish(exchange, ticker, TargetTimeframe));
}
}
// Execute all fetch operations in parallel
await Task.WhenAll(fetchTasks);
_logger.LogInformation("{0} - Completed {1} price fetch cycle for {2} combinations",
nameof(PriceFetcher15MinGrain), TargetTimeframe, fetchTasks.Count);
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during price fetch cycle");
return false;
}
}
private async Task FetchAndPublish(TradingExchanges exchange, Ticker ticker, Timeframe timeframe)
{
try
{
// Create a dummy account for API calls (this may need to be adjusted based on your implementation)
var account = new Account
{
Name = "PriceFetcher",
Exchange = exchange,
Type = AccountType.Watch
};
// Get the last candle date from database
var existingCandles = await _candleRepository.GetCandles(exchange, ticker, timeframe,
DateTime.UtcNow.AddDays(-30), 1);
var isFirstCall = !existingCandles.Any();
var startDate = !isFirstCall
? existingCandles.Max(c => c.Date).AddMinutes(GetTimeframeMinutes(timeframe))
: new DateTime(2017, 1, 1);
// Fetch new candles from external API
var newCandles = await _exchangeService.GetCandles(account, ticker, startDate, timeframe, isFirstCall);
if (newCandles?.Any() == true)
{
var streamKey = CandleHelpers.GetCandleStoreGrainKey(exchange, ticker, timeframe);
_logger.LogDebug("Fetched {CandleCount} new candles for {StreamKey}",
newCandles.Count, streamKey);
// Process all new candles
var processedCandles = newCandles.OrderBy(c => c.Date).ToList();
foreach (var candle in processedCandles)
{
// Ensure candle has correct metadata
candle.Exchange = exchange;
candle.Ticker = ticker;
candle.Timeframe = timeframe;
}
// Save all candles to database in a single batch
if (processedCandles.Any())
{
await _candleRepository.InsertCandles(processedCandles).ConfigureAwait(false);
_logger.LogDebug(
"[{Ticker}][{Exchange}][{Timeframe}] Inserted {CandleCount} candles for {StreamKey}",
ticker, exchange, timeframe,
processedCandles.Count, streamKey);
}
var streamProvider = this.GetStreamProvider("DefaultStreamProvider");
var stream = streamProvider.GetStream<Candle>(streamKey);
// Publish to stream (if needed)
foreach (var candle in processedCandles)
{
await stream.OnNextAsync(candle);
_logger.LogTrace("Published candle for {StreamKey} at {Date}",
streamKey, candle.Date);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error fetching prices for {Exchange}-{Ticker}-{Timeframe}",
exchange, ticker, timeframe);
}
}
private static int GetTimeframeMinutes(Timeframe timeframe) => timeframe switch
{
Timeframe.OneMinute => 1,
Timeframe.FiveMinutes => 5,
Timeframe.FifteenMinutes => 15,
Timeframe.ThirtyMinutes => 30,
Timeframe.OneHour => 60,
Timeframe.FourHour => 240,
Timeframe.OneDay => 1440,
_ => 1
};
public async Task ReceiveReminder(string reminderName, TickStatus status)
{
if (reminderName == FetchPricesReminderName)
{
await FetchAndPublishPricesAsync();
}
}
}

View File

@@ -1,16 +1,10 @@
using static Managing.Common.Enums;
using Managing.Core;
using static Managing.Common.Enums;
namespace Managing.Domain.Candles;
public static class CandleExtensions
public static class CandleHelpers
{
public static Candle SetupClosingCandle(this Candle candle)
{
return candle;
}
public static DateTime GetBotPreloadSinceFromTimeframe(Timeframe timeframe)
{
return timeframe switch
@@ -96,4 +90,15 @@ public static class CandleExtensions
_ => throw new NotImplementedException()
};
}
}
public static string GetCandleStoreGrainKey(TradingExchanges exchange, Ticker ticker, Timeframe timeframe)
{
return string.Join("-", exchange, ticker, timeframe);
}
public static (TradingExchanges exchange, Ticker ticker, Timeframe timeframe) ParseCandleStoreGrainKey(string grainKey)
{
var components = grainKey.Split('-');
return (MiscExtensions.ParseEnum<TradingExchanges>(components[0]), MiscExtensions.ParseEnum<Ticker>(components[1]), MiscExtensions.ParseEnum<Timeframe>(components[2]));
}
}