From 7e52b7a7342dd3b09914e808a65abdcfe5507477 Mon Sep 17 00:00:00 2001 From: cryptooda Date: Mon, 10 Nov 2025 01:44:33 +0700 Subject: [PATCH] Improve workers for backtests --- .cursor/rules/fullstack.mdc | 1 + .../Controllers/BacktestController.cs | 2 +- src/Managing.Api/Controllers/JobController.cs | 77 +++++++++ .../Repositories/IJobRepository.cs | 24 +++ .../Services/IGeneticService.cs | 2 +- .../Backtests/BacktestJobService.cs | 53 +++++++ src/Managing.Application/GeneticService.cs | 147 +++++++++++++---- .../Workers/BacktestComputeWorker.cs | 50 ++++-- .../PostgreSql/PostgreSqlGeneticRepository.cs | 11 +- .../PostgreSql/PostgreSqlJobRepository.cs | 149 ++++++++++++++++++ .../src/generated/ManagingApi.ts | 96 ++++++++++- .../src/generated/ManagingApiTypes.ts | 2 +- .../src/pages/adminPage/jobs/jobsSettings.tsx | 68 +++++++- .../src/pages/adminPage/jobs/jobsTable.tsx | 136 ++++++++++++---- .../appsettings.Development.json | 13 +- .../appsettings.ProductionLocal.json | 25 +-- .../appsettings.SandboxLocal.json | 23 +-- src/Managing.Workers/appsettings.json | 5 +- 18 files changed, 740 insertions(+), 144 deletions(-) diff --git a/.cursor/rules/fullstack.mdc b/.cursor/rules/fullstack.mdc index 9bb8df8b..6e1238b3 100644 --- a/.cursor/rules/fullstack.mdc +++ b/.cursor/rules/fullstack.mdc @@ -96,4 +96,5 @@ Key Principles - dont use command line to edit file, use agent mode capabilities to do it - when dividing, make sure variable is not zero - to test a single ts test you can run : npm run test:single test/plugins/test-name-file.test.tsx + - do not implement business logic on the controller, keep the business logic for Service files diff --git a/src/Managing.Api/Controllers/BacktestController.cs b/src/Managing.Api/Controllers/BacktestController.cs index 06739a0c..a9852c5b 100644 --- a/src/Managing.Api/Controllers/BacktestController.cs +++ b/src/Managing.Api/Controllers/BacktestController.cs @@ -933,7 +933,7 @@ public class BacktestController : BaseController var user = await GetUser(); // Create genetic request using the GeneticService directly - var geneticRequest = _geneticService.CreateGeneticRequest( + var geneticRequest = await _geneticService.CreateGeneticRequestAsync( user, request.Ticker, request.Timeframe, diff --git a/src/Managing.Api/Controllers/JobController.cs b/src/Managing.Api/Controllers/JobController.cs index 0d6c7102..b1d0c088 100644 --- a/src/Managing.Api/Controllers/JobController.cs +++ b/src/Managing.Api/Controllers/JobController.cs @@ -3,6 +3,7 @@ using System.Text.Json; using Managing.Api.Models.Responses; using Managing.Application.Abstractions.Repositories; using Managing.Application.Abstractions.Services; +using Managing.Application.Backtests; using Managing.Application.Shared; using Managing.Domain.Backtests; using Microsoft.AspNetCore.Authorization; @@ -25,6 +26,7 @@ public class JobController : BaseController private readonly IServiceScopeFactory _serviceScopeFactory; private readonly IAdminConfigurationService _adminService; private readonly ILogger _logger; + private readonly JobService _jobService; /// /// Initializes a new instance of the class. @@ -32,15 +34,18 @@ public class JobController : BaseController /// The service for user management. /// The service scope factory for creating scoped services. /// The admin configuration service for authorization checks. + /// The job service for job operations. /// The logger instance. public JobController( IUserService userService, IServiceScopeFactory serviceScopeFactory, IAdminConfigurationService adminService, + JobService jobService, ILogger logger) : base(userService) { _serviceScopeFactory = serviceScopeFactory; _adminService = adminService; + _jobService = jobService; _logger = logger; } @@ -284,5 +289,77 @@ public class JobController : BaseController return Ok(response); } + + /// + /// Retries a failed or cancelled job by resetting it to Pending status. + /// Admin only endpoint. + /// + /// The job ID to retry + /// Success response + [HttpPost("{jobId}/retry")] + public async Task RetryJob(string jobId) + { + if (!await IsUserAdmin()) + { + _logger.LogWarning("Non-admin user attempted to retry job"); + return StatusCode(403, new { error = "Only admin users can retry jobs" }); + } + + if (!Guid.TryParse(jobId, out var jobGuid)) + { + return BadRequest("Invalid job ID format. Must be a valid GUID."); + } + + try + { + var job = await _jobService.RetryJobAsync(jobGuid); + + return Ok(new { message = $"Job {jobId} has been reset to Pending status and will be picked up by workers.", jobId = job.Id }); + } + catch (InvalidOperationException ex) + { + if (ex.Message.Contains("not found")) + { + return NotFound(ex.Message); + } + return BadRequest(ex.Message); + } + } + + /// + /// Deletes a job from the database. + /// Admin only endpoint. + /// + /// The job ID to delete + /// Success response + [HttpDelete("{jobId}")] + public async Task DeleteJob(string jobId) + { + if (!await IsUserAdmin()) + { + _logger.LogWarning("Non-admin user attempted to delete job"); + return StatusCode(403, new { error = "Only admin users can delete jobs" }); + } + + if (!Guid.TryParse(jobId, out var jobGuid)) + { + return BadRequest("Invalid job ID format. Must be a valid GUID."); + } + + try + { + await _jobService.DeleteJobAsync(jobGuid); + + return Ok(new { message = $"Job {jobId} has been deleted successfully.", jobId }); + } + catch (InvalidOperationException ex) + { + if (ex.Message.Contains("not found")) + { + return NotFound(ex.Message); + } + return BadRequest(ex.Message); + } + } } diff --git a/src/Managing.Application.Abstractions/Repositories/IJobRepository.cs b/src/Managing.Application.Abstractions/Repositories/IJobRepository.cs index 4219e529..7a92d90f 100644 --- a/src/Managing.Application.Abstractions/Repositories/IJobRepository.cs +++ b/src/Managing.Application.Abstractions/Repositories/IJobRepository.cs @@ -62,6 +62,24 @@ public interface IJobRepository /// Task> GetByGeneticRequestIdAsync(string geneticRequestId); + /// + /// Gets running job counts per user for a specific job type across all workers (global limit per user) + /// + /// The ID of the worker (kept for interface compatibility, not used for filtering) + /// The job type to filter by + /// Dictionary mapping UserId to count of running jobs + Task> GetRunningJobCountsByUserIdAsync(string workerId, JobType jobType); + + /// + /// Claims a random available job, atomically excluding users that have reached their capacity. + /// The capacity check happens within the SQL query using a subquery, ensuring thread-safety across multiple workers. + /// Returns null if no jobs are available. + /// + /// The ID of the worker claiming the job + /// The job type to claim + /// Maximum concurrent jobs allowed per user + Task ClaimRandomJobAsync(string workerId, JobType jobType, int maxConcurrentPerUser); + /// /// Gets paginated jobs with optional filters and sorting /// @@ -91,6 +109,12 @@ public interface IJobRepository /// /// Summary containing counts by status, job type, and their combinations Task GetSummaryAsync(); + + /// + /// Deletes a job by its ID + /// + /// The job ID to delete + Task DeleteAsync(Guid jobId); } /// diff --git a/src/Managing.Application.Abstractions/Services/IGeneticService.cs b/src/Managing.Application.Abstractions/Services/IGeneticService.cs index 8c6e9967..d3659cc5 100644 --- a/src/Managing.Application.Abstractions/Services/IGeneticService.cs +++ b/src/Managing.Application.Abstractions/Services/IGeneticService.cs @@ -28,7 +28,7 @@ public interface IGeneticService /// The maximum take profit percentage /// The list of eligible indicators /// The created genetic request - GeneticRequest CreateGeneticRequest( + Task CreateGeneticRequestAsync( User user, Ticker ticker, Timeframe timeframe, diff --git a/src/Managing.Application/Backtests/BacktestJobService.cs b/src/Managing.Application/Backtests/BacktestJobService.cs index f6810f0d..54d4f5d8 100644 --- a/src/Managing.Application/Backtests/BacktestJobService.cs +++ b/src/Managing.Application/Backtests/BacktestJobService.cs @@ -250,5 +250,58 @@ public class JobService throw; } } + + /// + /// Retries a failed or cancelled job by resetting it to Pending status. + /// + /// The job ID to retry + /// The updated job + /// Thrown if job cannot be retried + public async Task RetryJobAsync(Guid jobId) + { + var job = await _jobRepository.GetByIdAsync(jobId); + + if (job == null) + { + throw new InvalidOperationException($"Job with ID {jobId} not found."); + } + + // Only allow retrying Failed or Cancelled jobs + // Running jobs should be handled by stale job recovery, not manual retry + if (job.Status != JobStatus.Failed && job.Status != JobStatus.Cancelled) + { + throw new InvalidOperationException($"Cannot retry job with status {job.Status}. Only Failed or Cancelled jobs can be retried."); + } + + // Reset job to pending state + job.Status = JobStatus.Pending; + job.AssignedWorkerId = null; + job.LastHeartbeat = null; + job.StartedAt = null; + job.CompletedAt = null; + job.ProgressPercentage = 0; + job.RetryAfter = null; + // Keep ErrorMessage for reference, but clear it on next run + // Keep RetryCount to track total retries + // Reset IsRetryable to true + job.IsRetryable = true; + + await _jobRepository.UpdateAsync(job); + + _logger.LogInformation("Job {JobId} reset to Pending status for retry", jobId); + + return job; + } + + /// + /// Deletes a job from the database. + /// + /// The job ID to delete + /// Thrown if job cannot be found + public async Task DeleteJobAsync(Guid jobId) + { + await _jobRepository.DeleteAsync(jobId); + _logger.LogInformation("Deleted job {JobId}", jobId); + } } diff --git a/src/Managing.Application/GeneticService.cs b/src/Managing.Application/GeneticService.cs index fe5eeece..5ed3bbdc 100644 --- a/src/Managing.Application/GeneticService.cs +++ b/src/Managing.Application/GeneticService.cs @@ -1,6 +1,5 @@ using System.Text.Json; using GeneticSharp; -using Managing.Application.Abstractions.Grains; using Managing.Application.Abstractions.Repositories; using Managing.Application.Abstractions.Services; using Managing.Core; @@ -28,6 +27,7 @@ public class GeneticService : IGeneticService private readonly IMessengerService _messengerService; private readonly IServiceScopeFactory _serviceScopeFactory; private readonly IGrainFactory _grainFactory; + private readonly IJobRepository _jobRepository; // Predefined parameter ranges for each indicator (matching backtestGenetic.tsx) public static readonly Dictionary ParameterRanges = new() @@ -196,7 +196,8 @@ public class GeneticService : IGeneticService ILogger logger, IMessengerService messengerService, IServiceScopeFactory serviceScopeFactory, - IGrainFactory grainFactory) + IGrainFactory grainFactory, + IJobRepository jobRepository) { _geneticRepository = geneticRepository; _backtester = backtester; @@ -204,9 +205,10 @@ public class GeneticService : IGeneticService _messengerService = messengerService; _serviceScopeFactory = serviceScopeFactory; _grainFactory = grainFactory; + _jobRepository = jobRepository; } - public GeneticRequest CreateGeneticRequest( + public async Task CreateGeneticRequestAsync( User user, Ticker ticker, Timeframe timeframe, @@ -245,15 +247,31 @@ public class GeneticService : IGeneticService _geneticRepository.InsertGeneticRequestForUser(user, geneticRequest); - // Trigger Orleans grain to process this request asynchronously + // Create a single job for this genetic request that will run until completion try { - var grain = _grainFactory.GetGrain(id); - _ = grain.ProcessGeneticRequestAsync(); + var job = new Job + { + UserId = user.Id, + Status = JobStatus.Pending, + JobType = JobType.Genetic, + Priority = 0, + ConfigJson = "{}", // Not needed for genetic jobs, GeneticRequestId is used + StartDate = startDate, + EndDate = endDate, + GeneticRequestId = id, + RetryCount = 0, + MaxRetries = 3, + IsRetryable = true + }; + + await _jobRepository.CreateAsync(job); + _logger.LogInformation("Created genetic job {JobId} for genetic request {RequestId}", job.Id, id); } catch (Exception ex) { - _logger.LogWarning(ex, "Failed to trigger GeneticBacktestGrain for request {RequestId}", id); + _logger.LogError(ex, "Failed to create job for genetic request {RequestId}", id); + throw; } return geneticRequest; @@ -365,31 +383,83 @@ public class GeneticService : IGeneticService var generationCount = 0; ga.GenerationRan += async (sender, e) => { - generationCount = ga.GenerationsNumber; - - // Update progress every generation - var bestFitness = ga.BestChromosome?.Fitness ?? 0; - request.CurrentGeneration = generationCount; - request.BestFitnessSoFar = bestFitness; - - if (ga.BestChromosome is TradingBotChromosome bestChromosome) + try { - var genes = bestChromosome.GetGenes(); - var geneValues = genes.Select(g => + generationCount = ga.GenerationsNumber; + + // Update progress every generation + var bestFitness = ga.BestChromosome?.Fitness ?? 0; + var bestChromosomeJson = (string?)null; + var bestIndividual = (string?)null; + + if (ga.BestChromosome is TradingBotChromosome bestChromosome) { - if (g.Value is double doubleValue) return doubleValue; - if (g.Value is int intValue) return (double)intValue; - return Convert.ToDouble(g.Value.ToString()); - }).ToArray(); - request.BestChromosome = JsonSerializer.Serialize(geneValues); + var genes = bestChromosome.GetGenes(); + var geneValues = genes.Select(g => + { + if (g.Value is double doubleValue) return doubleValue; + if (g.Value is int intValue) return (double)intValue; + return Convert.ToDouble(g.Value.ToString()); + }).ToArray(); + bestChromosomeJson = JsonSerializer.Serialize(geneValues); + bestIndividual = bestChromosome.ToString(); + } + + // Update ProgressInfo with current generation information + var progressInfo = JsonSerializer.Serialize(new + { + generation = generationCount, + best_fitness = bestFitness, + population_size = request.PopulationSize, + generations = request.Generations, + updated_at = DateTime.UtcNow + }); + + // Update the domain object for local use + request.CurrentGeneration = generationCount; + request.BestFitnessSoFar = bestFitness; + request.BestChromosome = bestChromosomeJson; + request.BestIndividual = bestIndividual; + request.ProgressInfo = progressInfo; + + // Update the database with current generation progress using a new scope + // This prevents DbContext concurrency issues when running in parallel + await ServiceScopeHelpers.WithScopedService( + _serviceScopeFactory, + async geneticService => + { + // Reload the request from the database in the new scope + // Use the user from the original request to get the request by ID + var dbRequest = geneticService.GetGeneticRequestByIdForUser(request.User, request.RequestId); + + if (dbRequest != null) + { + // Update the loaded request with current generation data + dbRequest.CurrentGeneration = generationCount; + dbRequest.BestFitnessSoFar = bestFitness; + dbRequest.BestChromosome = bestChromosomeJson; + dbRequest.BestIndividual = bestIndividual; + dbRequest.ProgressInfo = progressInfo; + + // Save the update + await geneticService.UpdateGeneticRequestAsync(dbRequest); + } + }); + + _logger.LogDebug("Updated genetic request {RequestId} at generation {Generation} with fitness {Fitness}", + request.RequestId, generationCount, bestFitness); + + // Check for cancellation + if (cancellationToken.IsCancellationRequested) + { + ga.Stop(); + } } - - await UpdateGeneticRequestAsync(request); - - // Check for cancellation - if (cancellationToken.IsCancellationRequested) + catch (Exception ex) { - ga.Stop(); + _logger.LogError(ex, "Error updating genetic request {RequestId} at generation {Generation}", + request.RequestId, generationCount); + // Don't throw - continue with next generation } }; @@ -421,11 +491,27 @@ public class GeneticService : IGeneticService _logger.LogInformation("Genetic algorithm completed for request {RequestId}. Best fitness: {Fitness}", request.RequestId, bestFitness); - // Update request with results + // Update request with final results request.Status = GeneticRequestStatus.Completed; request.CompletedAt = DateTime.UtcNow; request.BestFitness = bestFitness; request.BestIndividual = bestChromosome?.ToString() ?? "unknown"; + request.CurrentGeneration = ga.GenerationsNumber; + request.BestFitnessSoFar = bestFitness; + + // Update BestChromosome if not already set + if (bestChromosome != null && string.IsNullOrEmpty(request.BestChromosome)) + { + var genes = bestChromosome.GetGenes(); + var geneValues = genes.Select(g => + { + if (g.Value is double doubleValue) return doubleValue; + if (g.Value is int intValue) return (double)intValue; + return Convert.ToDouble(g.Value.ToString()); + }).ToArray(); + request.BestChromosome = JsonSerializer.Serialize(geneValues); + } + request.ProgressInfo = JsonSerializer.Serialize(new { generation = ga.GenerationsNumber, @@ -436,6 +522,9 @@ public class GeneticService : IGeneticService }); await UpdateGeneticRequestAsync(request); + + _logger.LogInformation("Final update completed for genetic request {RequestId}. Generation: {Generation}, Best Fitness: {Fitness}", + request.RequestId, ga.GenerationsNumber, bestFitness); // Send notification about the completed genetic algorithm try diff --git a/src/Managing.Application/Workers/BacktestComputeWorker.cs b/src/Managing.Application/Workers/BacktestComputeWorker.cs index 27b630b4..90debc37 100644 --- a/src/Managing.Application/Workers/BacktestComputeWorker.cs +++ b/src/Managing.Application/Workers/BacktestComputeWorker.cs @@ -21,7 +21,7 @@ public class BacktestComputeWorker : BackgroundService private readonly IServiceScopeFactory _scopeFactory; private readonly ILogger _logger; private readonly BacktestComputeWorkerOptions _options; - private readonly SemaphoreSlim _semaphore; + private readonly SemaphoreSlim _instanceSemaphore; public BacktestComputeWorker( IServiceScopeFactory scopeFactory, @@ -31,14 +31,14 @@ public class BacktestComputeWorker : BackgroundService _scopeFactory = scopeFactory; _logger = logger; _options = options.Value; - _semaphore = new SemaphoreSlim(_options.MaxConcurrentBacktests, _options.MaxConcurrentBacktests); + _instanceSemaphore = new SemaphoreSlim(_options.MaxConcurrentPerInstance, _options.MaxConcurrentPerInstance); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation( - "BacktestComputeWorker starting. WorkerId: {WorkerId}, MaxConcurrent: {MaxConcurrent}, PollInterval: {PollInterval}s", - _options.WorkerId, _options.MaxConcurrentBacktests, _options.JobPollIntervalSeconds); + "BacktestComputeWorker starting. WorkerId: {WorkerId}, MaxConcurrentPerUser: {MaxConcurrentPerUser}, MaxConcurrentPerInstance: {MaxConcurrentPerInstance}, PollInterval: {PollInterval}s", + _options.WorkerId, _options.MaxConcurrentPerUser, _options.MaxConcurrentPerInstance, _options.JobPollIntervalSeconds); // Background task for stale job recovery var staleJobRecoveryTask = Task.Run(() => StaleJobRecoveryLoop(stoppingToken), stoppingToken); @@ -67,10 +67,10 @@ public class BacktestComputeWorker : BackgroundService private async Task ProcessJobsAsync(CancellationToken cancellationToken) { - // Check if we have capacity - if (!await _semaphore.WaitAsync(0, cancellationToken)) + // Check if this instance has capacity + if (!await _instanceSemaphore.WaitAsync(0, cancellationToken)) { - // At capacity, skip this iteration + // Instance at capacity, skip this iteration return; } @@ -79,17 +79,23 @@ public class BacktestComputeWorker : BackgroundService using var scope = _scopeFactory.CreateScope(); var jobRepository = scope.ServiceProvider.GetRequiredService(); - // Try to claim a backtest job (exclude genetic jobs) - var job = await jobRepository.ClaimNextJobAsync(_options.WorkerId, JobType.Backtest); + // Claim a random backtest job atomically, excluding users at capacity + // The SQL query checks running job counts within the transaction, ensuring thread-safety + var job = await jobRepository.ClaimRandomJobAsync( + _options.WorkerId, + JobType.Backtest, + _options.MaxConcurrentPerUser); if (job == null) { - // No jobs available, release semaphore - _semaphore.Release(); + // No jobs available for users not at capacity, release semaphore + _instanceSemaphore.Release(); return; } - _logger.LogInformation("Claimed backtest job {JobId} for worker {WorkerId}", job.Id, _options.WorkerId); + _logger.LogInformation( + "Claimed random backtest job {JobId} (UserId: {UserId}) for worker {WorkerId}", + job.Id, job.UserId, _options.WorkerId); // Process the job asynchronously (don't await, let it run in background) // Create a new scope for the job processing to ensure proper lifetime management @@ -99,16 +105,21 @@ public class BacktestComputeWorker : BackgroundService { await ProcessJobAsync(job, cancellationToken); } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing job {JobId}", job.Id); + throw; + } finally { - _semaphore.Release(); + _instanceSemaphore.Release(); } }, cancellationToken); } catch (Exception ex) { _logger.LogError(ex, "Error claiming or processing job"); - _semaphore.Release(); + _instanceSemaphore.Release(); throw; } } @@ -560,7 +571,7 @@ public class BacktestComputeWorker : BackgroundService public override void Dispose() { - _semaphore?.Dispose(); + _instanceSemaphore?.Dispose(); base.Dispose(); } } @@ -578,9 +589,14 @@ public class BacktestComputeWorkerOptions public string WorkerId { get; set; } = Environment.MachineName; /// - /// Maximum number of concurrent backtests to process + /// Maximum number of concurrent backtests per user (global limit across all workers) /// - public int MaxConcurrentBacktests { get; set; } = 6; + public int MaxConcurrentPerUser { get; set; } = 6; + + /// + /// Maximum number of concurrent backtests per worker instance (local limit for this worker) + /// + public int MaxConcurrentPerInstance { get; set; } = 6; /// /// Interval in seconds between job polling attempts diff --git a/src/Managing.Infrastructure.Database/PostgreSql/PostgreSqlGeneticRepository.cs b/src/Managing.Infrastructure.Database/PostgreSql/PostgreSqlGeneticRepository.cs index 48e00ef5..f0db3f4f 100644 --- a/src/Managing.Infrastructure.Database/PostgreSql/PostgreSqlGeneticRepository.cs +++ b/src/Managing.Infrastructure.Database/PostgreSql/PostgreSqlGeneticRepository.cs @@ -75,9 +75,10 @@ public class PostgreSqlGeneticRepository : IGeneticRepository public async Task UpdateGeneticRequestAsync(GeneticRequest geneticRequest) { - var existingEntity = _context.GeneticRequests + var existingEntity = await _context.GeneticRequests + .AsTracking() // Explicitly enable tracking to ensure entity is tracked .Include(gr => gr.User) - .FirstOrDefault(gr => gr.RequestId == geneticRequest.RequestId); + .FirstOrDefaultAsync(gr => gr.RequestId == geneticRequest.RequestId); if (existingEntity != null) { @@ -110,9 +111,13 @@ public class PostgreSqlGeneticRepository : IGeneticRepository existingEntity.EligibleIndicatorsJson = "[]"; } - // Only update the tracked entity, do not attach a new one + // Save changes - entity is tracked so changes will be persisted await _context.SaveChangesAsync(); } + else + { + throw new InvalidOperationException($"Genetic request with RequestId '{geneticRequest.RequestId}' not found in database"); + } } public void DeleteGeneticRequestByIdForUser(User user, string id) diff --git a/src/Managing.Infrastructure.Database/PostgreSql/PostgreSqlJobRepository.cs b/src/Managing.Infrastructure.Database/PostgreSql/PostgreSqlJobRepository.cs index 1edb92aa..383c6640 100644 --- a/src/Managing.Infrastructure.Database/PostgreSql/PostgreSqlJobRepository.cs +++ b/src/Managing.Infrastructure.Database/PostgreSql/PostgreSqlJobRepository.cs @@ -219,6 +219,136 @@ public class PostgreSqlJobRepository : IJobRepository return entities.Select(MapToDomain); } + public async Task> GetRunningJobCountsByUserIdAsync(string workerId, JobType jobType) + { + // Get running job counts per user across all workers (global limit per user) + var counts = await _context.Jobs + .Where(j => j.Status == (int)JobStatus.Running && + j.JobType == (int)jobType) + .GroupBy(j => j.UserId) + .Select(g => new { UserId = g.Key, Count = g.Count() }) + .ToListAsync(); + + return counts.ToDictionary(x => x.UserId, x => x.Count); + } + + public async Task ClaimRandomJobAsync(string workerId, JobType jobType, int maxConcurrentPerUser) + { + // Use execution strategy to support retry with transactions + var strategy = _context.Database.CreateExecutionStrategy(); + + return await strategy.ExecuteAsync(async () => + { + await using var transaction = await _context.Database.BeginTransactionAsync(); + + try + { + // Build SQL query that atomically excludes users at capacity using a subquery + // This ensures thread-safety across multiple workers - the check and claim happen atomically + var sql = @" + SELECT j.""Id"", j.""BundleRequestId"", j.""UserId"", j.""Status"", j.""JobType"", j.""Priority"", + j.""ConfigJson"", j.""StartDate"", j.""EndDate"", j.""ProgressPercentage"", + j.""AssignedWorkerId"", j.""LastHeartbeat"", j.""CreatedAt"", j.""StartedAt"", + j.""CompletedAt"", j.""ResultJson"", j.""ErrorMessage"", j.""RequestId"", + j.""GeneticRequestId"", j.""RetryCount"", j.""MaxRetries"", j.""RetryAfter"", + j.""IsRetryable"", j.""FailureCategory"" + FROM ""Jobs"" j + WHERE j.""Status"" = @status + AND j.""JobType"" = @jobType + AND ( + SELECT COUNT(*) + FROM ""Jobs"" running + WHERE running.""UserId"" = j.""UserId"" + AND running.""Status"" = @runningStatus + AND running.""JobType"" = @jobType + ) < @maxConcurrentPerUser + ORDER BY RANDOM() + LIMIT 1 + FOR UPDATE SKIP LOCKED"; + + var parameters = new List + { + new NpgsqlParameter("status", NpgsqlDbType.Integer) { Value = (int)JobStatus.Pending }, + new NpgsqlParameter("jobType", NpgsqlDbType.Integer) { Value = (int)jobType }, + new NpgsqlParameter("runningStatus", NpgsqlDbType.Integer) { Value = (int)JobStatus.Running }, + new NpgsqlParameter("maxConcurrentPerUser", NpgsqlDbType.Integer) { Value = maxConcurrentPerUser } + }; + + _logger.LogDebug("Claiming random job atomically (maxConcurrentPerUser: {MaxConcurrent})", maxConcurrentPerUser); + + // Execute raw SQL using ADO.NET to get the job with row-level locking + var connection = _context.Database.GetDbConnection(); + await using var command = connection.CreateCommand(); + command.Transaction = transaction.GetDbTransaction(); + command.CommandText = sql; + command.Parameters.AddRange(parameters.ToArray()); + + JobEntity? job = null; + await using var reader = await command.ExecuteReaderAsync(); + + if (await reader.ReadAsync()) + { + job = new JobEntity + { + Id = reader.GetGuid(reader.GetOrdinal("Id")), + BundleRequestId = reader.IsDBNull(reader.GetOrdinal("BundleRequestId")) ? null : reader.GetGuid(reader.GetOrdinal("BundleRequestId")), + UserId = reader.GetInt32(reader.GetOrdinal("UserId")), + Status = reader.GetInt32(reader.GetOrdinal("Status")), + JobType = reader.GetInt32(reader.GetOrdinal("JobType")), + Priority = reader.GetInt32(reader.GetOrdinal("Priority")), + ConfigJson = reader.GetString(reader.GetOrdinal("ConfigJson")), + StartDate = reader.GetDateTime(reader.GetOrdinal("StartDate")), + EndDate = reader.GetDateTime(reader.GetOrdinal("EndDate")), + ProgressPercentage = reader.GetInt32(reader.GetOrdinal("ProgressPercentage")), + AssignedWorkerId = reader.IsDBNull(reader.GetOrdinal("AssignedWorkerId")) ? null : reader.GetString(reader.GetOrdinal("AssignedWorkerId")), + LastHeartbeat = reader.IsDBNull(reader.GetOrdinal("LastHeartbeat")) ? null : reader.GetDateTime(reader.GetOrdinal("LastHeartbeat")), + CreatedAt = reader.GetDateTime(reader.GetOrdinal("CreatedAt")), + StartedAt = reader.IsDBNull(reader.GetOrdinal("StartedAt")) ? null : reader.GetDateTime(reader.GetOrdinal("StartedAt")), + CompletedAt = reader.IsDBNull(reader.GetOrdinal("CompletedAt")) ? null : reader.GetDateTime(reader.GetOrdinal("CompletedAt")), + ResultJson = reader.IsDBNull(reader.GetOrdinal("ResultJson")) ? null : reader.GetString(reader.GetOrdinal("ResultJson")), + ErrorMessage = reader.IsDBNull(reader.GetOrdinal("ErrorMessage")) ? null : reader.GetString(reader.GetOrdinal("ErrorMessage")), + RequestId = reader.IsDBNull(reader.GetOrdinal("RequestId")) ? null : reader.GetString(reader.GetOrdinal("RequestId")), + GeneticRequestId = reader.IsDBNull(reader.GetOrdinal("GeneticRequestId")) ? null : reader.GetString(reader.GetOrdinal("GeneticRequestId")), + RetryCount = reader.GetInt32(reader.GetOrdinal("RetryCount")), + MaxRetries = reader.GetInt32(reader.GetOrdinal("MaxRetries")), + RetryAfter = reader.IsDBNull(reader.GetOrdinal("RetryAfter")) ? null : reader.GetDateTime(reader.GetOrdinal("RetryAfter")), + IsRetryable = reader.GetBoolean(reader.GetOrdinal("IsRetryable")), + FailureCategory = reader.IsDBNull(reader.GetOrdinal("FailureCategory")) ? null : reader.GetInt32(reader.GetOrdinal("FailureCategory")) + }; + } + + await reader.CloseAsync(); + + if (job == null) + { + _logger.LogDebug("No random job found to claim for worker {WorkerId}", workerId); + await transaction.CommitAsync(); + return null; + } + + // Attach and update the job entity + _context.Jobs.Attach(job); + job.Status = (int)JobStatus.Running; + job.AssignedWorkerId = workerId; + job.StartedAt = DateTime.UtcNow; + job.LastHeartbeat = DateTime.UtcNow; + + await _context.SaveChangesAsync(); + await transaction.CommitAsync(); + + _logger.LogInformation("Claimed random job {JobId} (UserId: {UserId}) for worker {WorkerId}", + job.Id, job.UserId, workerId); + return MapToDomain(job); + } + catch (Exception ex) + { + await transaction.RollbackAsync(); + _logger.LogError(ex, "Error claiming random job for worker {WorkerId}", workerId); + throw; + } + }); + } + public async Task<(IEnumerable Jobs, int TotalCount)> GetPaginatedAsync( int page, int pageSize, @@ -466,6 +596,25 @@ public class PostgreSqlJobRepository : IJobRepository } } + public async Task DeleteAsync(Guid jobId) + { + // Use AsTracking() to enable change tracking since DbContext uses NoTracking by default + var entity = await _context.Jobs + .AsTracking() + .FirstOrDefaultAsync(e => e.Id == jobId); + + if (entity == null) + { + _logger.LogWarning("Job {JobId} not found for deletion", jobId); + throw new InvalidOperationException($"Job with ID {jobId} not found."); + } + + _context.Jobs.Remove(entity); + await _context.SaveChangesAsync(); + + _logger.LogInformation("Deleted job {JobId}", jobId); + } + // Helper classes for raw SQL query results private class StatusCountResult { diff --git a/src/Managing.WebApp/src/generated/ManagingApi.ts b/src/Managing.WebApp/src/generated/ManagingApi.ts index a3281327..19cd16b9 100644 --- a/src/Managing.WebApp/src/generated/ManagingApi.ts +++ b/src/Managing.WebApp/src/generated/ManagingApi.ts @@ -2403,7 +2403,7 @@ export class JobClient extends AuthorizedApiBase { this.baseUrl = baseUrl ?? "http://localhost:5000"; } - job_GetJobStatus(jobId: string): Promise { + job_GetJobStatus(jobId: string): Promise { let url_ = this.baseUrl + "/Job/{jobId}"; if (jobId === undefined || jobId === null) throw new Error("The parameter 'jobId' must be defined."); @@ -2424,13 +2424,13 @@ export class JobClient extends AuthorizedApiBase { }); } - protected processJob_GetJobStatus(response: Response): Promise { + protected processJob_GetJobStatus(response: Response): Promise { const status = response.status; let _headers: any = {}; if (response.headers && response.headers.forEach) { response.headers.forEach((v: any, k: any) => _headers[k] = v); }; if (status === 200) { return response.text().then((_responseText) => { let result200: any = null; - result200 = _responseText === "" ? null : JSON.parse(_responseText, this.jsonParseReviver) as BacktestJobStatusResponse; + result200 = _responseText === "" ? null : JSON.parse(_responseText, this.jsonParseReviver) as JobStatusResponse; return result200; }); } else if (status !== 200 && status !== 204) { @@ -2438,7 +2438,50 @@ export class JobClient extends AuthorizedApiBase { return throwException("An unexpected server error occurred.", status, _responseText, _headers); }); } - return Promise.resolve(null as any); + return Promise.resolve(null as any); + } + + job_DeleteJob(jobId: string): Promise { + let url_ = this.baseUrl + "/Job/{jobId}"; + if (jobId === undefined || jobId === null) + throw new Error("The parameter 'jobId' must be defined."); + url_ = url_.replace("{jobId}", encodeURIComponent("" + jobId)); + url_ = url_.replace(/[?&]$/, ""); + + let options_: RequestInit = { + method: "DELETE", + headers: { + "Accept": "application/octet-stream" + } + }; + + return this.transformOptions(options_).then(transformedOptions_ => { + return this.http.fetch(url_, transformedOptions_); + }).then((_response: Response) => { + return this.processJob_DeleteJob(_response); + }); + } + + protected processJob_DeleteJob(response: Response): Promise { + const status = response.status; + let _headers: any = {}; if (response.headers && response.headers.forEach) { response.headers.forEach((v: any, k: any) => _headers[k] = v); }; + if (status === 200 || status === 206) { + const contentDisposition = response.headers ? response.headers.get("content-disposition") : undefined; + let fileNameMatch = contentDisposition ? /filename\*=(?:(\\?['"])(.*?)\1|(?:[^\s]+'.*?')?([^;\n]*))/g.exec(contentDisposition) : undefined; + let fileName = fileNameMatch && fileNameMatch.length > 1 ? fileNameMatch[3] || fileNameMatch[2] : undefined; + if (fileName) { + fileName = decodeURIComponent(fileName); + } else { + fileNameMatch = contentDisposition ? /filename="?([^"]*?)"?(;|$)/g.exec(contentDisposition) : undefined; + fileName = fileNameMatch && fileNameMatch.length > 1 ? fileNameMatch[1] : undefined; + } + return response.blob().then(blob => { return { fileName: fileName, data: blob, status: status, headers: _headers }; }); + } else if (status !== 200 && status !== 204) { + return response.text().then((_responseText) => { + return throwException("An unexpected server error occurred.", status, _responseText, _headers); + }); + } + return Promise.resolve(null as any); } job_GetJobs(page: number | undefined, pageSize: number | undefined, sortBy: string | undefined, sortOrder: string | undefined, status: string | null | undefined, jobType: string | null | undefined, userId: number | null | undefined, workerId: string | null | undefined, bundleRequestId: string | null | undefined): Promise { @@ -2536,6 +2579,49 @@ export class JobClient extends AuthorizedApiBase { } return Promise.resolve(null as any); } + + job_RetryJob(jobId: string): Promise { + let url_ = this.baseUrl + "/Job/{jobId}/retry"; + if (jobId === undefined || jobId === null) + throw new Error("The parameter 'jobId' must be defined."); + url_ = url_.replace("{jobId}", encodeURIComponent("" + jobId)); + url_ = url_.replace(/[?&]$/, ""); + + let options_: RequestInit = { + method: "POST", + headers: { + "Accept": "application/octet-stream" + } + }; + + return this.transformOptions(options_).then(transformedOptions_ => { + return this.http.fetch(url_, transformedOptions_); + }).then((_response: Response) => { + return this.processJob_RetryJob(_response); + }); + } + + protected processJob_RetryJob(response: Response): Promise { + const status = response.status; + let _headers: any = {}; if (response.headers && response.headers.forEach) { response.headers.forEach((v: any, k: any) => _headers[k] = v); }; + if (status === 200 || status === 206) { + const contentDisposition = response.headers ? response.headers.get("content-disposition") : undefined; + let fileNameMatch = contentDisposition ? /filename\*=(?:(\\?['"])(.*?)\1|(?:[^\s]+'.*?')?([^;\n]*))/g.exec(contentDisposition) : undefined; + let fileName = fileNameMatch && fileNameMatch.length > 1 ? fileNameMatch[3] || fileNameMatch[2] : undefined; + if (fileName) { + fileName = decodeURIComponent(fileName); + } else { + fileNameMatch = contentDisposition ? /filename="?([^"]*?)"?(;|$)/g.exec(contentDisposition) : undefined; + fileName = fileNameMatch && fileNameMatch.length > 1 ? fileNameMatch[1] : undefined; + } + return response.blob().then(blob => { return { fileName: fileName, data: blob, status: status, headers: _headers }; }); + } else if (status !== 200 && status !== 204) { + return response.text().then((_responseText) => { + return throwException("An unexpected server error occurred.", status, _responseText, _headers); + }); + } + return Promise.resolve(null as any); + } } export class MoneyManagementClient extends AuthorizedApiBase { @@ -5397,7 +5483,7 @@ export interface AgentBalance { time?: Date; } -export interface BacktestJobStatusResponse { +export interface JobStatusResponse { jobId?: string; status?: string | null; progressPercentage?: number; diff --git a/src/Managing.WebApp/src/generated/ManagingApiTypes.ts b/src/Managing.WebApp/src/generated/ManagingApiTypes.ts index 5b82926c..891554fd 100644 --- a/src/Managing.WebApp/src/generated/ManagingApiTypes.ts +++ b/src/Managing.WebApp/src/generated/ManagingApiTypes.ts @@ -1208,7 +1208,7 @@ export interface AgentBalance { time?: Date; } -export interface BacktestJobStatusResponse { +export interface JobStatusResponse { jobId?: string; status?: string | null; progressPercentage?: number; diff --git a/src/Managing.WebApp/src/pages/adminPage/jobs/jobsSettings.tsx b/src/Managing.WebApp/src/pages/adminPage/jobs/jobsSettings.tsx index 5e9c62a3..d6118767 100644 --- a/src/Managing.WebApp/src/pages/adminPage/jobs/jobsSettings.tsx +++ b/src/Managing.WebApp/src/pages/adminPage/jobs/jobsSettings.tsx @@ -1,9 +1,9 @@ import {useState} from 'react' -import {useQuery} from '@tanstack/react-query' +import {useMutation, useQuery, useQueryClient} from '@tanstack/react-query' import useApiUrlStore from '../../../app/store/apiStore' import {JobClient} from '../../../generated/ManagingApi' -import {BottomMenuBar} from '../../../components/mollecules' +import {BottomMenuBar, Toast} from '../../../components/mollecules' import JobsTable from './jobsTable' @@ -22,6 +22,66 @@ const JobsSettings: React.FC = () => { const [showTable, setShowTable] = useState(false) const jobClient = new JobClient({}, apiUrl) + const queryClient = useQueryClient() + + // Retry job mutation + const retryJobMutation = useMutation({ + mutationFn: async (jobId: string) => { + // The API returns FileResponse but backend actually returns JSON + const response = await jobClient.job_RetryJob(jobId) + // Parse the response as JSON + const text = await response.data.text() + return JSON.parse(text) + }, + onSuccess: () => { + // Invalidate jobs queries to refresh the list + queryClient.invalidateQueries({ queryKey: ['jobs'] }) + queryClient.invalidateQueries({ queryKey: ['jobSummary'] }) + }, + }) + + const handleRetryJob = async (jobId: string) => { + const toast = new Toast('Retrying job...') + try { + await retryJobMutation.mutateAsync(jobId) + toast.update('success', 'Job has been reset to Pending status') + } catch (error: any) { + const errorMessage = error?.response?.data?.error || error?.message || 'Failed to retry job' + toast.update('error', errorMessage) + } + } + + // Delete job mutation + const deleteJobMutation = useMutation({ + mutationFn: async (jobId: string) => { + // The API returns FileResponse but backend actually returns JSON + const response = await jobClient.job_DeleteJob(jobId) + // Parse the response as JSON if there's content + if (response.data && response.data.size > 0) { + const text = await response.data.text() + if (text) { + return JSON.parse(text) + } + } + return { message: 'Job deleted successfully', jobId } + }, + onSuccess: () => { + // Invalidate jobs queries to refresh the list + queryClient.invalidateQueries({ queryKey: ['jobs'] }) + queryClient.invalidateQueries({ queryKey: ['jobSummary'] }) + }, + }) + + const handleDeleteJob = async (jobId: string) => { + const toast = new Toast('Deleting job...') + try { + await deleteJobMutation.mutateAsync(jobId) + toast.update('success', 'Job has been deleted successfully') + } catch (error: any) { + const errorMessage = error?.response?.data?.error || error?.message || 'Failed to delete job' + toast.update('error', errorMessage) + } + } // Fetch job summary statistics const { @@ -478,6 +538,10 @@ const JobsSettings: React.FC = () => { sortOrder={sortOrder} onPageChange={handlePageChange} onSortChange={handleSortChange} + onRetryJob={handleRetryJob} + isRetrying={retryJobMutation.isPending} + onDeleteJob={handleDeleteJob} + isDeleting={deleteJobMutation.isPending} /> )} diff --git a/src/Managing.WebApp/src/pages/adminPage/jobs/jobsTable.tsx b/src/Managing.WebApp/src/pages/adminPage/jobs/jobsTable.tsx index 1680388d..06544e8c 100644 --- a/src/Managing.WebApp/src/pages/adminPage/jobs/jobsTable.tsx +++ b/src/Managing.WebApp/src/pages/adminPage/jobs/jobsTable.tsx @@ -1,4 +1,4 @@ -import React, {useMemo} from 'react' +import React, {useMemo, useState} from 'react' import {type JobListItemResponse} from '../../../generated/ManagingApi' import {Table} from '../../../components/mollecules' @@ -13,6 +13,10 @@ interface IJobsTable { sortOrder: string onPageChange: (page: number) => void onSortChange: (sortBy: string) => void + onRetryJob?: (jobId: string) => void + isRetrying?: boolean + onDeleteJob?: (jobId: string) => void + isDeleting?: boolean } const JobsTable: React.FC = ({ @@ -25,8 +29,13 @@ const JobsTable: React.FC = ({ sortBy, sortOrder, onPageChange, - onSortChange + onSortChange, + onRetryJob, + isRetrying = false, + onDeleteJob, + isDeleting = false }) => { + const [deleteConfirmJobId, setDeleteConfirmJobId] = useState(null) const getStatusBadge = (status: string | null | undefined) => { if (!status) return - @@ -88,14 +97,6 @@ const JobsTable: React.FC = ({ } const columns = useMemo(() => [ - { - Header: () => , - accessor: 'jobId', - width: 200, - Cell: ({ value }: any) => ( - {value || '-'} - ), - }, { Header: () => , accessor: 'status', @@ -147,22 +148,6 @@ const JobsTable: React.FC = ({ {value || '-'} ), }, - { - Header: 'Bundle Request ID', - accessor: 'bundleRequestId', - width: 200, - Cell: ({ value }: any) => ( - {value || '-'} - ), - }, - { - Header: 'Genetic Request ID', - accessor: 'geneticRequestId', - width: 200, - Cell: ({ value }: any) => ( - {value || '-'} - ), - }, { Header: () => , accessor: 'createdAt', @@ -189,7 +174,69 @@ const JobsTable: React.FC = ({ {value || '-'} ), }, - ], [sortBy, sortOrder, onSortChange]) + { + Header: 'Actions', + accessor: 'actions', + width: 180, + Cell: ({ row }: any) => { + const job = row.original + const isFailed = job.status?.toLowerCase() === 'failed' + const canRetry = isFailed && onRetryJob + const canDelete = onDeleteJob + + if (!canRetry && !canDelete) { + return - + } + + return ( +
+ {canRetry && ( + + )} + {canDelete && ( + + )} +
+ ) + }, + }, + { + Header: () => , + accessor: 'jobId', + width: 200, + Cell: ({ value }: any) => ( + {value || '-'} + ), + }, + ], [sortBy, sortOrder, onSortChange, onRetryJob, isRetrying, onDeleteJob, isDeleting]) const tableData = useMemo(() => { return jobs.map((job) => ({ @@ -200,8 +247,6 @@ const JobsTable: React.FC = ({ progressPercentage: job.progressPercentage, userId: job.userId, assignedWorkerId: job.assignedWorkerId, - bundleRequestId: job.bundleRequestId, - geneticRequestId: job.geneticRequestId, createdAt: job.createdAt, startedAt: job.startedAt, completedAt: job.completedAt, @@ -305,6 +350,39 @@ const JobsTable: React.FC = ({ )} )} + + {/* Delete Confirmation Modal */} + {deleteConfirmJobId && ( +
+
+

Confirm Delete

+

+ Are you sure you want to delete this job? This action cannot be undone. +

+
+ + +
+
+
+ )} ) } diff --git a/src/Managing.Workers/appsettings.Development.json b/src/Managing.Workers/appsettings.Development.json index 4d5c399e..cab4833a 100644 --- a/src/Managing.Workers/appsettings.Development.json +++ b/src/Managing.Workers/appsettings.Development.json @@ -1,14 +1,8 @@ { - "Logging": { - "LogLevel": { - "Default": "Information", - "Microsoft.Hosting.Lifetime": "Information" - } - }, "WorkerBacktestCompute": true, "BacktestComputeWorker": { - "WorkerId": "Oda-backtest-0", - "MaxConcurrentBacktests": 6, + "MaxConcurrentPerUser": 3, + "MaxConcurrentPerInstance": 6, "JobPollIntervalSeconds": 5, "HeartbeatIntervalSeconds": 30, "StaleJobTimeoutMinutes": 5 @@ -27,8 +21,5 @@ "Url": "http://localhost:8086/", "Organization": "managing-org", "Token": "Fw2FPL2OwTzDHzSbR2Sd5xs0EKQYy00Q-hYKYAhr9cC1_q5YySONpxuf_Ck0PTjyUiF13xXmi__bu_pXH-H9zA==" - }, - "Sentry": { - "Dsn": "" } } diff --git a/src/Managing.Workers/appsettings.ProductionLocal.json b/src/Managing.Workers/appsettings.ProductionLocal.json index 68569778..abb35e4e 100644 --- a/src/Managing.Workers/appsettings.ProductionLocal.json +++ b/src/Managing.Workers/appsettings.ProductionLocal.json @@ -1,20 +1,15 @@ { - "Logging": { - "LogLevel": { - "Default": "Information", - "Microsoft.Hosting.Lifetime": "Information" - } - }, "WorkerBacktestCompute": true, "BacktestComputeWorker": { - "MaxConcurrentBacktests": 6, + "MaxConcurrentPerUser": 2, + "MaxConcurrentPerInstance": 5, "JobPollIntervalSeconds": 5, "HeartbeatIntervalSeconds": 30, "StaleJobTimeoutMinutes": 5 }, "WorkerGeneticCompute": true, "GeneticComputeWorker": { - "MaxConcurrentGenetics": 2, + "MaxConcurrentGenetics": 1, "JobPollIntervalSeconds": 5, "HeartbeatIntervalSeconds": 30, "StaleJobTimeoutMinutes": 10 @@ -26,20 +21,6 @@ "Url": "https://influx-db.kaigen.managing.live", "Organization": "managing-org", "Token": "ROvQoZ1Dg5jiKDFxB0saEGqHC3rsLkUNlPL6_AFbOcpNjMieIv8v58yA4v5tFU9sX9LLvXEToPvUrxqQEMaWDw==" - }, - "Sentry": { - "Dsn": "https://fe12add48c56419bbdfa86227c188e7a@glitch.kai.managing.live/1" - }, - "N8n": { - "WebhookUrl": "https://n8n.kai.managing.live/webhook/fa9308b6-983b-42ec-b085-71599d655951", - "IndicatorRequestWebhookUrl": "https://n8n.kai.managing.live/webhook/3aa07b66-1e64-46a7-8618-af300914cb11", - "Username": "managing-api", - "Password": "T259836*PdiV2@%!eR%Qf4" - }, - "Kaigen": { - "BaseUrl": "https://kaigen-back-kaigen-stage.up.railway.app", - "DebitEndpoint": "/api/credits/debit", - "RefundEndpoint": "/api/credits/refund" } } diff --git a/src/Managing.Workers/appsettings.SandboxLocal.json b/src/Managing.Workers/appsettings.SandboxLocal.json index c620ecef..62069c05 100644 --- a/src/Managing.Workers/appsettings.SandboxLocal.json +++ b/src/Managing.Workers/appsettings.SandboxLocal.json @@ -1,13 +1,8 @@ { - "Logging": { - "LogLevel": { - "Default": "Information", - "Microsoft.Hosting.Lifetime": "Information" - } - }, "WorkerBacktestCompute": true, "BacktestComputeWorker": { - "MaxConcurrentBacktests": 6, + "MaxConcurrentPerUser": 1, + "MaxConcurrentPerInstance": 5, "JobPollIntervalSeconds": 5, "HeartbeatIntervalSeconds": 30, "StaleJobTimeoutMinutes": 5 @@ -26,20 +21,6 @@ "Url": "https://influx-db.apps.managing.live", "Organization": "managing-org", "Token": "zODh8Hn8sN5VwpVJH0HAwDpCJPE4oB5IUg8L4Q0T67KM1Rta6PoM0nATUzf1ddkyWx_VledooZXfFIddahbL9Q==" - }, - "Sentry": { - "Dsn": "https://fe12add48c56419bbdfa86227c188e7a@glitch.kai.managing.live/1" - }, - "N8n": { - "WebhookUrl": "https://n8n.kai.managing.live/webhook/fa9308b6-983b-42ec-b085-71599d655951", - "IndicatorRequestWebhookUrl": "https://n8n.kai.managing.live/webhook/3aa07b66-1e64-46a7-8618-af300914cb11", - "Username": "managing-api", - "Password": "T259836*PdiV2@%!eR%Qf4" - }, - "Kaigen": { - "BaseUrl": "https://kaigen-back-kaigen-stage.up.railway.app", - "DebitEndpoint": "/api/credits/debit", - "RefundEndpoint": "/api/credits/refund" } } diff --git a/src/Managing.Workers/appsettings.json b/src/Managing.Workers/appsettings.json index eee55bfc..959a896c 100644 --- a/src/Managing.Workers/appsettings.json +++ b/src/Managing.Workers/appsettings.json @@ -7,7 +7,8 @@ }, "WorkerBacktestCompute": true, "BacktestComputeWorker": { - "MaxConcurrentBacktests": 6, + "MaxConcurrentPerUser": 6, + "MaxConcurrentPerInstance": 6, "JobPollIntervalSeconds": 5, "HeartbeatIntervalSeconds": 30, "StaleJobTimeoutMinutes": 5 @@ -20,7 +21,7 @@ "StaleJobTimeoutMinutes": 10 }, "Sentry": { - "Dsn": "https://fe12add48c56419bbdfa86227c188e7a@glitch.kai.managing.live/1" + "Dsn": "https://ba7ab16fc3aa445480c115861b4ec8b9@glitch.kai.managing.live/4" }, "N8n": { "WebhookUrl": "https://n8n.kai.managing.live/webhook/fa9308b6-983b-42ec-b085-71599d655951",