Improve workers a bit. Bug: bundle is reset after all backtests finish
This commit is contained in:
@@ -55,5 +55,6 @@
|
||||
"WorkerLeaderboard": false,
|
||||
"WorkerFundingRatesWatcher": false,
|
||||
"WorkerGeneticAlgorithm": false,
|
||||
"WorkerBundleBacktest": false
|
||||
"WorkerBundleBacktest": false,
|
||||
"WorkerBundleBacktestHealthCheck": false
|
||||
}
|
||||
@@ -86,6 +86,7 @@
|
||||
"WorkerFundingRatesWatcher": false,
|
||||
"WorkerGeneticAlgorithm": false,
|
||||
"WorkerBundleBacktest": false,
|
||||
"WorkerBundleBacktestHealthCheck": false,
|
||||
"WorkerBalancesTracking": false,
|
||||
"WorkerNotifyBundleBacktest": false,
|
||||
"SqlMonitoring": {
|
||||
|
||||
@@ -642,6 +642,30 @@ public class BacktestExecutor
|
||||
var runningJobs = jobs.Count(j => j.Status == JobStatus.Running);
|
||||
var totalJobs = jobs.Count();
|
||||
|
||||
// CRITICAL: If bundle is already in a final state (Completed/Failed with CompletedAt set),
|
||||
// don't overwrite it unless we're detecting a legitimate change
|
||||
if (bundleRequest.CompletedAt.HasValue &&
|
||||
(bundleRequest.Status == BundleBacktestRequestStatus.Completed ||
|
||||
bundleRequest.Status == BundleBacktestRequestStatus.Failed))
|
||||
{
|
||||
// Bundle already finalized, only update if job counts indicate it should be re-opened
|
||||
// (This shouldn't happen in normal flow, but guards against race conditions)
|
||||
if (completedJobs + failedJobs == totalJobs)
|
||||
{
|
||||
_logger.LogDebug(
|
||||
"Bundle {BundleRequestId} already completed/failed. Skipping status update.",
|
||||
bundleRequestId);
|
||||
return; // Don't modify a completed bundle
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"Bundle {BundleRequestId} was marked as completed/failed but has incomplete jobs ({Completed}+{Failed}/{Total}). Reopening.",
|
||||
bundleRequestId, completedJobs, failedJobs, totalJobs);
|
||||
// Allow the update to proceed to fix inconsistent state
|
||||
}
|
||||
}
|
||||
|
||||
// Update bundle request progress
|
||||
bundleRequest.CompletedBacktests = completedJobs;
|
||||
bundleRequest.FailedBacktests = failedJobs;
|
||||
@@ -668,11 +692,14 @@ public class BacktestExecutor
|
||||
bundleRequest.CompletedAt = DateTime.UtcNow;
|
||||
bundleRequest.CurrentBacktest = null;
|
||||
}
|
||||
else if (runningJobs > 0)
|
||||
else if (runningJobs > 0 || completedJobs > 0 || failedJobs > 0)
|
||||
{
|
||||
// Some jobs still running
|
||||
// Some jobs are running, or some have completed/failed (meaning work has started)
|
||||
// Once a bundle has started processing, it should stay "Running" until all jobs are done
|
||||
bundleRequest.Status = BundleBacktestRequestStatus.Running;
|
||||
}
|
||||
// If all jobs are still pending (completedJobs = 0, failedJobs = 0, runningJobs = 0),
|
||||
// keep the current status (likely Pending)
|
||||
|
||||
// Update results list with the new backtest ID
|
||||
var resultsList = bundleRequest.Results?.ToList() ?? new List<string>();
|
||||
|
||||
@@ -458,6 +458,30 @@ public class BacktestComputeWorker : BackgroundService
|
||||
|
||||
var previousStatus = bundleRequest.Status;
|
||||
|
||||
// CRITICAL: If bundle is already in a final state (Completed/Failed with CompletedAt set),
|
||||
// don't overwrite it unless we're detecting a legitimate change
|
||||
if (bundleRequest.CompletedAt.HasValue &&
|
||||
(bundleRequest.Status == BundleBacktestRequestStatus.Completed ||
|
||||
bundleRequest.Status == BundleBacktestRequestStatus.Failed))
|
||||
{
|
||||
// Bundle already finalized, only update if job counts indicate it should be re-opened
|
||||
// (This shouldn't happen in normal flow, but guards against race conditions)
|
||||
if (completedJobs + failedJobs == totalJobs)
|
||||
{
|
||||
_logger.LogDebug(
|
||||
"Bundle {BundleRequestId} already completed/failed. Skipping status update.",
|
||||
bundleRequestId);
|
||||
return; // Don't modify a completed bundle
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"Bundle {BundleRequestId} was marked as completed/failed but has incomplete jobs ({Completed}+{Failed}/{Total}). Reopening.",
|
||||
bundleRequestId, completedJobs, failedJobs, totalJobs);
|
||||
// Allow the update to proceed to fix inconsistent state
|
||||
}
|
||||
}
|
||||
|
||||
// Update bundle request progress
|
||||
bundleRequest.CompletedBacktests = completedJobs;
|
||||
bundleRequest.FailedBacktests = failedJobs;
|
||||
@@ -483,11 +507,14 @@ public class BacktestComputeWorker : BackgroundService
|
||||
bundleRequest.CompletedAt = DateTime.UtcNow;
|
||||
bundleRequest.CurrentBacktest = null;
|
||||
}
|
||||
else if (runningJobs > 0)
|
||||
else if (runningJobs > 0 || completedJobs > 0 || failedJobs > 0)
|
||||
{
|
||||
// Some jobs still running
|
||||
// Some jobs are running, or some have completed/failed (meaning work has started)
|
||||
// Once a bundle has started processing, it should stay "Running" until all jobs are done
|
||||
bundleRequest.Status = BundleBacktestRequestStatus.Running;
|
||||
}
|
||||
// If all jobs are still pending (completedJobs = 0, failedJobs = 0, runningJobs = 0),
|
||||
// keep the current status (likely Pending)
|
||||
|
||||
// Update results list from completed jobs
|
||||
var completedJobResults = jobs
|
||||
@@ -554,11 +581,68 @@ public class BacktestComputeWorker : BackgroundService
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
|
||||
|
||||
// Get stale jobs for this worker
|
||||
// Get running jobs for this worker
|
||||
var runningJobs = await jobRepository.GetRunningJobsByWorkerIdAsync(_options.WorkerId);
|
||||
|
||||
// CRITICAL FIX: Check for jobs stuck at 100% progress
|
||||
// These jobs completed execution but their status wasn't updated to Completed
|
||||
// This causes the worker to think it's at max capacity
|
||||
var stuckCompletedJobs = runningJobs
|
||||
.Where(j => j.JobType == JobType.Backtest && j.ProgressPercentage >= 100)
|
||||
.ToList();
|
||||
|
||||
if (stuckCompletedJobs.Any())
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"🔧 Found {Count} jobs stuck at 100% progress for worker {WorkerId}. Auto-completing them.",
|
||||
stuckCompletedJobs.Count, _options.WorkerId);
|
||||
|
||||
foreach (var stuckJob in stuckCompletedJobs)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"🔧 Job {JobId} stuck at 100% progress in Running status since {StartedAt}. Marking as completed.",
|
||||
stuckJob.Id, stuckJob.StartedAt);
|
||||
|
||||
stuckJob.Status = JobStatus.Completed;
|
||||
stuckJob.CompletedAt = stuckJob.CompletedAt ?? DateTime.UtcNow;
|
||||
stuckJob.LastHeartbeat = DateTime.UtcNow;
|
||||
|
||||
// Add note to error message if not already set
|
||||
if (string.IsNullOrEmpty(stuckJob.ErrorMessage))
|
||||
{
|
||||
stuckJob.ErrorMessage = "Job completed but status was not updated (auto-recovered)";
|
||||
}
|
||||
|
||||
await jobRepository.UpdateAsync(stuckJob);
|
||||
|
||||
// Clean up progress tracker if still present
|
||||
_jobProgressTrackers.TryRemove(stuckJob.Id, out _);
|
||||
_runningJobTasks.TryRemove(stuckJob.Id, out _);
|
||||
|
||||
// Update bundle request if this is part of a bundle
|
||||
if (stuckJob.BundleRequestId.HasValue)
|
||||
{
|
||||
try
|
||||
{
|
||||
await UpdateBundleRequestProgress(stuckJob.BundleRequestId.Value, scope.ServiceProvider);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error updating bundle request progress for stuck job {JobId}", stuckJob.Id);
|
||||
}
|
||||
}
|
||||
|
||||
_logger.LogInformation(
|
||||
"✅ Successfully auto-completed stuck job {JobId}. Worker can now claim new jobs.",
|
||||
stuckJob.Id);
|
||||
}
|
||||
}
|
||||
|
||||
// Get stale jobs for this worker
|
||||
var now = DateTime.UtcNow;
|
||||
var staleJobs = runningJobs
|
||||
.Where(j => j.JobType == JobType.Backtest &&
|
||||
j.ProgressPercentage < 100 && // Don't mark stuck-at-100% jobs as stale
|
||||
(
|
||||
// Stale heartbeat (no heartbeat in timeout period)
|
||||
j.LastHeartbeat == null ||
|
||||
|
||||
@@ -75,6 +75,9 @@ public static class ComputeBootstrap
|
||||
services.AddScoped<IGeneticService, GeneticService>();
|
||||
services.AddTransient<IExchangeProcessor, EvmProcessor>();
|
||||
|
||||
// Job service (needed for BundleBacktestHealthCheckWorker to recreate missing jobs)
|
||||
services.AddTransient<JobService>();
|
||||
|
||||
services.AddTransient<ITradaoService, TradaoService>();
|
||||
services.AddTransient<IExchangeService, ExchangeService>();
|
||||
|
||||
|
||||
@@ -5,9 +5,11 @@ type IPieChart = {
|
||||
data: number[]
|
||||
labels: string[]
|
||||
colors: string[]
|
||||
width?: number
|
||||
height?: number
|
||||
}
|
||||
|
||||
const PieChart: React.FC<IPieChart> = ({ data, labels, colors }) => {
|
||||
const PieChart: React.FC<IPieChart> = ({ data, labels, colors, width = 150, height = 150 }) => {
|
||||
return (
|
||||
<>
|
||||
<Plot
|
||||
@@ -22,7 +24,7 @@ const PieChart: React.FC<IPieChart> = ({ data, labels, colors }) => {
|
||||
},
|
||||
]}
|
||||
layout={{
|
||||
height: 150,
|
||||
height: height,
|
||||
margin: {
|
||||
b: 20,
|
||||
l: 0,
|
||||
@@ -33,7 +35,7 @@ const PieChart: React.FC<IPieChart> = ({ data, labels, colors }) => {
|
||||
paper_bgcolor: 'rgba(0,0,0,0)',
|
||||
plot_bgcolor: 'rgba(0,0,0,0)',
|
||||
showlegend: false,
|
||||
width: 150,
|
||||
width: width,
|
||||
}}
|
||||
config={{
|
||||
displayModeBar: false,
|
||||
|
||||
@@ -17,7 +17,7 @@ const BundleBacktestRequestsSettings: React.FC = () => {
|
||||
const [sortBy, setSortBy] = useState<BundleBacktestRequestSortableColumn>(BundleBacktestRequestSortableColumn.CreatedAt)
|
||||
const [sortOrder, setSortOrder] = useState<string>('desc')
|
||||
const [nameContains, setNameContains] = useState<string>('')
|
||||
const [statusFilter, setStatusFilter] = useState<BundleBacktestRequestStatus | null>(BundleBacktestRequestStatus.Failed)
|
||||
const [statusFilter, setStatusFilter] = useState<BundleBacktestRequestStatus | null>(null)
|
||||
const [userIdFilter, setUserIdFilter] = useState<string>('')
|
||||
const [userNameContains, setUserNameContains] = useState<string>('')
|
||||
const [totalBacktestsMin, setTotalBacktestsMin] = useState<string>('')
|
||||
@@ -258,7 +258,7 @@ const BundleBacktestRequestsSettings: React.FC = () => {
|
||||
<span className="loading loading-spinner loading-sm ml-2"></span>
|
||||
)}
|
||||
</h3>
|
||||
<div className="grid grid-cols-3 md:grid-cols-3 lg:grid-cols-4 xl:grid-cols-5 gap-4">
|
||||
<div className="grid grid-cols-2 md:grid-cols-3 lg:grid-cols-4 xl:grid-cols-5 gap-4">
|
||||
{isLoadingSummary ? (
|
||||
// Show skeleton with all statuses set to 0
|
||||
<>
|
||||
|
||||
@@ -3,7 +3,7 @@ import {
|
||||
type BundleBacktestRequestListItemResponse,
|
||||
BundleBacktestRequestSortableColumn
|
||||
} from '../../../generated/ManagingApi'
|
||||
import {Table} from '../../../components/mollecules'
|
||||
import {Table, Toast} from '../../../components/mollecules'
|
||||
|
||||
interface IBundleBacktestRequestsTable {
|
||||
bundleRequests: BundleBacktestRequestListItemResponse[]
|
||||
@@ -68,6 +68,16 @@ const BundleBacktestRequestsTable: React.FC<IBundleBacktestRequestsTable> = ({
|
||||
return `${progress.toFixed(1)}%`
|
||||
}
|
||||
|
||||
const copyToClipboard = async (text: string) => {
|
||||
const toast = new Toast('Copying to clipboard...')
|
||||
try {
|
||||
await navigator.clipboard.writeText(text)
|
||||
toast.update('success', 'Request ID copied to clipboard!')
|
||||
} catch (err) {
|
||||
toast.update('error', 'Failed to copy to clipboard')
|
||||
}
|
||||
}
|
||||
|
||||
const SortableHeader = ({ column, label }: { column: BundleBacktestRequestSortableColumn; label: string }) => {
|
||||
const isActive = sortBy === column
|
||||
return (
|
||||
@@ -180,7 +190,23 @@ const BundleBacktestRequestsTable: React.FC<IBundleBacktestRequestsTable> = ({
|
||||
id: 'requestId',
|
||||
Header: () => <SortableHeader column={BundleBacktestRequestSortableColumn.RequestId} label="Request ID" />,
|
||||
accessor: (row: BundleBacktestRequestListItemResponse) => (
|
||||
<span className="font-mono text-xs">{row.requestId?.substring(0, 8)}...</span>
|
||||
<div className="flex items-center gap-2">
|
||||
<span className="font-mono text-xs">{row.requestId?.substring(0, 8)}...</span>
|
||||
{row.requestId && (
|
||||
<button
|
||||
className="btn btn-ghost btn-xs p-1 h-auto min-h-0"
|
||||
onClick={(e) => {
|
||||
e.stopPropagation()
|
||||
copyToClipboard(row.requestId || '')
|
||||
}}
|
||||
title="Copy Request ID"
|
||||
>
|
||||
<svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" strokeWidth="1.5" stroke="currentColor" className="w-4 h-4">
|
||||
<path strokeLinecap="round" strokeLinejoin="round" d="M15.666 3.6A2.25 2.25 0 0013.5 2.25h-3c-1.03 0-1.9.693-2.166 1.6m5.332 0A2.25 2.25 0 0115.75 4.5v3.75m0 0v3.75m0-3.75h3.75m-3.75 0h-3.75M15 15.75a2.25 2.25 0 01-2.25 2.25H5.25A2.25 2.25 0 013 15.75V8.25a2.25 2.25 0 012.25-2.25h7.5A2.25 2.25 0 0115 8.25v7.5z" />
|
||||
</svg>
|
||||
</button>
|
||||
)}
|
||||
</div>
|
||||
)
|
||||
},
|
||||
...(onDelete ? [{
|
||||
|
||||
@@ -3,7 +3,7 @@ import {useMutation, useQuery, useQueryClient} from '@tanstack/react-query'
|
||||
|
||||
import useApiUrlStore from '../../../app/store/apiStore'
|
||||
import {JobClient} from '../../../generated/ManagingApi'
|
||||
import {BottomMenuBar, Toast} from '../../../components/mollecules'
|
||||
import {BottomMenuBar, PieChart, Toast} from '../../../components/mollecules'
|
||||
|
||||
import JobsTable from './jobsTable'
|
||||
|
||||
@@ -13,7 +13,7 @@ const JobsSettings: React.FC = () => {
|
||||
const [pageSize, setPageSize] = useState(50)
|
||||
const [sortBy, setSortBy] = useState<string>('CreatedAt')
|
||||
const [sortOrder, setSortOrder] = useState<string>('desc')
|
||||
const [statusFilter, setStatusFilter] = useState<string>('Failed')
|
||||
const [statusFilter, setStatusFilter] = useState<string>('')
|
||||
const [jobTypeFilter, setJobTypeFilter] = useState<string>('')
|
||||
const [userIdFilter, setUserIdFilter] = useState<string>('')
|
||||
const [workerIdFilter, setWorkerIdFilter] = useState<string>('')
|
||||
@@ -146,7 +146,7 @@ const JobsSettings: React.FC = () => {
|
||||
}
|
||||
|
||||
const clearFilters = () => {
|
||||
setStatusFilter('Failed') // Reset to Failed instead of All
|
||||
setStatusFilter('') // Reset to All
|
||||
setJobTypeFilter('')
|
||||
setUserIdFilter('')
|
||||
setWorkerIdFilter('')
|
||||
@@ -195,7 +195,35 @@ const JobsSettings: React.FC = () => {
|
||||
</svg>
|
||||
Status Overview
|
||||
</h3>
|
||||
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
|
||||
<div className="grid grid-cols-1 lg:grid-cols-2 gap-6 items-start">
|
||||
{/* Pie Chart */}
|
||||
<div className="flex justify-center">
|
||||
<PieChart
|
||||
data={jobSummary.statusSummary.map(item => item.count || 0)}
|
||||
labels={jobSummary.statusSummary.map(item => item.status || 'Unknown')}
|
||||
colors={jobSummary.statusSummary.map(item => {
|
||||
const statusLower = (item.status || '').toLowerCase()
|
||||
switch (statusLower) {
|
||||
case 'pending':
|
||||
return '#fbbf24' // warning color
|
||||
case 'running':
|
||||
return '#3b82f6' // info color
|
||||
case 'completed':
|
||||
return '#10b981' // success color
|
||||
case 'failed':
|
||||
return '#ef4444' // error color
|
||||
case 'cancelled':
|
||||
return '#6b7280' // neutral color
|
||||
default:
|
||||
return '#9ca3af' // default gray
|
||||
}
|
||||
})}
|
||||
width={300}
|
||||
height={300}
|
||||
/>
|
||||
</div>
|
||||
{/* Status Tiles */}
|
||||
<div className="grid grid-cols-2 md:grid-cols-2 lg:grid-cols-2 gap-4">
|
||||
{jobSummary.statusSummary.map((statusItem) => {
|
||||
const statusLower = (statusItem.status || '').toLowerCase()
|
||||
let statusIcon, statusDesc, statusColor
|
||||
@@ -271,6 +299,7 @@ const JobsSettings: React.FC = () => {
|
||||
</div>
|
||||
)
|
||||
})}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
@@ -576,7 +605,6 @@ const JobsSettings: React.FC = () => {
|
||||
</a>
|
||||
</li>
|
||||
</BottomMenuBar>
|
||||
)}
|
||||
</div>
|
||||
)
|
||||
}
|
||||
|
||||
@@ -207,6 +207,13 @@ var host = hostBuilder
|
||||
{
|
||||
services.AddHostedService<GeneticComputeWorker>();
|
||||
}
|
||||
|
||||
// Register the bundle backtest health check worker if enabled
|
||||
var isBundleHealthCheckEnabled = configuration.GetValue<bool>("WorkerBundleBacktestHealthCheck", false);
|
||||
if (isBundleHealthCheckEnabled)
|
||||
{
|
||||
services.AddHostedService<BundleBacktestHealthCheckWorker>();
|
||||
}
|
||||
})
|
||||
.ConfigureLogging((hostingContext, logging) =>
|
||||
{
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
"HeartbeatIntervalSeconds": 30,
|
||||
"StaleJobTimeoutMinutes": 10
|
||||
},
|
||||
"WorkerBundleBacktestHealthCheck": true,
|
||||
"Sentry": {
|
||||
"Dsn": "https://ba7ab16fc3aa445480c115861b4ec8b9@glitch.kai.managing.live/4"
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user