Add parralelism for bundle worker
This commit is contained in:
@@ -37,6 +37,9 @@ public class BundleBacktestWorker : BaseWorker<BundleBacktestWorker>
|
|||||||
|
|
||||||
protected override async Task Run(CancellationToken cancellationToken)
|
protected override async Task Run(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
|
var maxDegreeOfParallelism = 3;
|
||||||
|
using var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
|
||||||
|
var processingTasks = new List<Task>();
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// Get pending bundle backtest requests
|
// Get pending bundle backtest requests
|
||||||
@@ -47,8 +50,21 @@ public class BundleBacktestWorker : BaseWorker<BundleBacktestWorker>
|
|||||||
if (cancellationToken.IsCancellationRequested)
|
if (cancellationToken.IsCancellationRequested)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
await ProcessBundleRequest(bundleRequest, cancellationToken);
|
await semaphore.WaitAsync(cancellationToken);
|
||||||
|
var task = Task.Run(async () =>
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await ProcessBundleRequest(bundleRequest, cancellationToken);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
semaphore.Release();
|
||||||
|
}
|
||||||
|
}, cancellationToken);
|
||||||
|
processingTasks.Add(task);
|
||||||
}
|
}
|
||||||
|
await Task.WhenAll(processingTasks);
|
||||||
|
|
||||||
await RetryUnfinishedBacktestsInFailedBundles(cancellationToken);
|
await RetryUnfinishedBacktestsInFailedBundles(cancellationToken);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user