using Microsoft.AspNetCore.SignalR; using Microsoft.EntityFrameworkCore; using OTSSignsOrchestrator.Server.Data; using OTSSignsOrchestrator.Server.Data.Entities; using OTSSignsOrchestrator.Server.Hubs; namespace OTSSignsOrchestrator.Server.Workers; /// /// Background service that polls for queued work, /// claims one job at a time, resolves the correct , /// and delegates execution. All transitions are logged and broadcast via SignalR. /// public sealed class ProvisioningWorker : BackgroundService { private readonly IServiceProvider _services; private readonly ILogger _logger; private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(5); public ProvisioningWorker( IServiceProvider services, ILogger logger) { _services = services; _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("ProvisioningWorker started — polling every {Interval}s", PollInterval.TotalSeconds); using var timer = new PeriodicTimer(PollInterval); while (await timer.WaitForNextTickAsync(stoppingToken)) { try { await TryProcessNextJobAsync(stoppingToken); } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { break; } catch (Exception ex) { _logger.LogError(ex, "ProvisioningWorker: unhandled error during poll cycle"); } } _logger.LogInformation("ProvisioningWorker stopped"); } private async Task TryProcessNextJobAsync(CancellationToken ct) { await using var scope = _services.CreateAsyncScope(); var db = scope.ServiceProvider.GetRequiredService(); var hub = scope.ServiceProvider.GetRequiredService>(); // Atomically claim the oldest queued job var job = await db.Jobs .Where(j => j.Status == JobStatus.Queued) .OrderBy(j => j.CreatedAt) .FirstOrDefaultAsync(ct); if (job is null) return; // Optimistic concurrency: set Running + StartedAt job.Status = JobStatus.Running; job.StartedAt = DateTime.UtcNow; try { await db.SaveChangesAsync(ct); } catch (DbUpdateConcurrencyException) { // Another worker already claimed this job _logger.LogDebug("Job {JobId} was claimed by another worker", job.Id); return; } _logger.LogInformation("Job {JobId} claimed (type={JobType}, customer={CustomerId})", job.Id, job.JobType, job.CustomerId); // Resolve the correct pipeline for this job type var pipelines = scope.ServiceProvider.GetRequiredService>(); var pipeline = pipelines.FirstOrDefault(p => string.Equals(p.HandlesJobType, job.JobType, StringComparison.OrdinalIgnoreCase)); if (pipeline is null) { _logger.LogError("No pipeline registered for job type '{JobType}' (job {JobId})", job.JobType, job.Id); job.Status = JobStatus.Failed; job.ErrorMessage = $"No pipeline registered for job type '{job.JobType}'."; job.CompletedAt = DateTime.UtcNow; await db.SaveChangesAsync(ct); await hub.Clients.All.SendJobCompleted(job.Id.ToString(), false, job.ErrorMessage); return; } try { await pipeline.ExecuteAsync(job, ct); job.Status = JobStatus.Completed; job.CompletedAt = DateTime.UtcNow; await db.SaveChangesAsync(ct); var summary = $"Job {job.JobType} completed for customer {job.CustomerId}."; _logger.LogInformation("Job {JobId} completed successfully", job.Id); await hub.Clients.All.SendJobCompleted(job.Id.ToString(), true, summary); } catch (Exception ex) { _logger.LogError(ex, "Job {JobId} failed: {Message}", job.Id, ex.Message); job.Status = JobStatus.Failed; job.ErrorMessage = ex.Message; job.CompletedAt = DateTime.UtcNow; await db.SaveChangesAsync(CancellationToken.None); await hub.Clients.All.SendJobCompleted(job.Id.ToString(), false, ex.Message); } } }