Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ private static IServalConfigurator AddTranslationEngines(this IServalConfigurato
configurator.AddTranslationEngine<NmtEngineService>(EngineType.Nmt.ToString());
configurator.JobQueues.Add(BuildJobQueues.Nmt);

configurator.Services.AddSingleton<BuildJobRunnerManager<TranslationEngine>>();
configurator.Services.AddHostedService(p => p.GetRequiredService<BuildJobRunnerManager<TranslationEngine>>());

return configurator;
}

Expand All @@ -85,6 +88,9 @@ private static IServalConfigurator AddWordAlignmentEngines(this IServalConfigura
configurator.Services.AddHostedService<StatisticalEngineCommitService>();
configurator.JobQueues.Add(BuildJobQueues.Statistical);

configurator.Services.AddSingleton<BuildJobRunnerManager<WordAlignmentEngine>>();
configurator.Services.AddHostedService(p => p.GetRequiredService<BuildJobRunnerManager<WordAlignmentEngine>>());

return configurator;
}

Expand Down
5 changes: 4 additions & 1 deletion src/Machine/src/Serval.Machine.Shared/Models/Build.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ public enum BuildJobState
Pending,
Active,
Canceling,
Queued,
Deleting,
}

public enum BuildJobRunnerType
Expand All @@ -25,9 +27,10 @@ public record Build
{
public required string BuildId { get; init; }
public required BuildJobState JobState { get; init; }
public required string JobId { get; init; }
public string? JobId { get; init; }
public required BuildJobRunnerType BuildJobRunner { get; init; }
public required BuildStage Stage { get; init; }
public string? Options { get; set; }
public BuildData? Data { get; init; }
public required BuildExecutionData ExecutionData { get; init; }
}
8 changes: 8 additions & 0 deletions src/Machine/src/Serval.Machine.Shared/Models/BuildData.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Serval.Machine.Shared.Models;

public record BuildData
{
public IReadOnlyList<ParallelCorpusContract>? ParallelCorpora { get; init; }
public double? Confidence { get; init; }
public int? CorpusSize { get; init; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
namespace Serval.Machine.Shared.Services;

public class BuildJobRunnerManager<TEngine>(IServiceProvider services, ILogger<RecurrentTask> logger)
: RecurrentTask("Build job runner manager", services, RefreshPeriod, logger)
where TEngine : ITrainingEngine
{
private static readonly TimeSpan RefreshPeriod = TimeSpan.FromSeconds(5);

protected override async Task DoWorkAsync(IServiceScope scope, CancellationToken cancellationToken)
{
EngineGroup engineGroup = GetEngineGroup<TEngine>();
var logger = scope.ServiceProvider.GetRequiredService<ILogger<BuildJobRunnerManager<TEngine>>>();
var dataAccessContext = scope.ServiceProvider.GetRequiredService<IDataAccessContext>();
var platformService = scope.ServiceProvider.GetRequiredKeyedService<IPlatformService>(engineGroup);
var runners = scope
.ServiceProvider.GetRequiredService<IEnumerable<IBuildJobRunner>>()
.ToDictionary(r => r.Type);
var engines = scope.ServiceProvider.GetRequiredService<IRepository<TEngine>>();

await DispatchQueuedBuildJobsAsync(
engines,
runners,
logger,
dataAccessContext,
platformService,
cancellationToken
);
await StopCancelingBuildJobsAsync(engines, runners, logger, cancellationToken);
await DeleteDeletingEngines(engines, runners, logger, cancellationToken);
}

private static async Task DispatchQueuedBuildJobsAsync(
IRepository<TEngine> engines,
IReadOnlyDictionary<BuildJobRunnerType, IBuildJobRunner> runners,
ILogger<BuildJobRunnerManager<TEngine>> logger,
IDataAccessContext dataAccessContext,
IPlatformService platformService,
CancellationToken cancellationToken
)
{
foreach (
TEngine engine in await engines.GetAllAsync(
e => e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Queued,
cancellationToken
)
)
{
Build build = engine.CurrentBuild!;
if (!string.IsNullOrEmpty(build.JobId))
//TODO - how should these be cleaned up if somehow they existed? Just marked as failed?
continue;
string? jobId = null;
try
{
await engines.UpdateAsync(
e => e.EngineId == engine.EngineId,
u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Pending),
cancellationToken: cancellationToken
);
jobId = await runners[build.BuildJobRunner]
.CreateJobAsync(
engine.Type,
engine.EngineId,
build.BuildId,
build.Stage,
build.Options,
cancellationToken
);
await engines.UpdateAsync(
e => e.EngineId == engine.EngineId,
u => u.Set(e => e.CurrentBuild!.JobId, jobId),
cancellationToken: cancellationToken
);
await runners[build.BuildJobRunner].EnqueueJobAsync(jobId, engine.Type, cancellationToken);
}
catch (Exception e)
{
logger.LogError(e, "Failed to dispatch build job for build {BuildId}.", build.BuildId);
await dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildFaultedAsync(build.BuildId, e.Message, CancellationToken.None);
await engines.UpdateAsync(
e =>
e.EngineId == engine.EngineId
&& e.CurrentBuild != null
&& e.CurrentBuild.BuildId == build.BuildId,
u =>
{
u.Unset(e => e.CurrentBuild);
},
cancellationToken: cancellationToken
);
if (jobId != null)
await runners[build.BuildJobRunner].DeleteJobAsync(jobId, CancellationToken.None);
},
cancellationToken: CancellationToken.None
);
}
}
}

private static async Task StopCancelingBuildJobsAsync(
IRepository<TEngine> engines,
IReadOnlyDictionary<BuildJobRunnerType, IBuildJobRunner> runners,
ILogger<BuildJobRunnerManager<TEngine>> logger,
CancellationToken cancellationToken
)
{
foreach (
TEngine engine in await engines.GetAllAsync(
e => e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Canceling,
cancellationToken
)
)
{
Build build = engine.CurrentBuild!;
if (string.IsNullOrEmpty(build.JobId))
//TODO - should these be cleaned up? I think we can just do nothing since the only responsibility of the this function is to stop the job and the job is already 'stopped', right?
continue;

try
{
await runners[build.BuildJobRunner].StopJobAsync(build.JobId, cancellationToken);
}
catch (Exception e)
{
logger.LogError(
e,
"Failed to stop job {JobId} for canceling build {BuildId}.",
build.JobId,
build.BuildId
);
}
}
}

private static async Task DeleteDeletingEngines(
IRepository<TEngine> engines,
IReadOnlyDictionary<BuildJobRunnerType, IBuildJobRunner> runners,
ILogger<BuildJobRunnerManager<TEngine>> logger,
CancellationToken cancellationToken
)
{
//TODO what about non-building engines? For ClearML this would still be needed. This may just need to be a new flag on the engine itself instead of a build state.
foreach (
TEngine engine in await engines.GetAllAsync(
e => e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Deleting,
cancellationToken
)
)
{
foreach (BuildJobRunnerType runnerType in runners.Keys)
{
IBuildJobRunner runner = runners[runnerType];
try
{
await runner.DeleteEngineAsync(engine.EngineId, cancellationToken);
}
catch (Exception e)
{
logger.LogError(e, "Failed to delete engine {EngineId}.", engine.EngineId);
}
}
}
}

private static EngineGroup GetEngineGroup<T>()
where T : ITrainingEngine
{
//TODO is there a better way? Could just explicitly create translation and alignment managers?
return typeof(T).Name switch
{
nameof(TranslationEngine) => EngineGroup.Translation,
nameof(WordAlignmentEngine) => EngineGroup.WordAlignment,
_ => throw new InvalidOperationException($"Unknown engine type: {typeof(T).Name}"),
};
}
}
Loading
Loading