feat(import-jobs): persisted ImportJobRegistry в БД (TD-5)

Раньше прогресс фоновых импортов жил в ConcurrentDictionary внутри
Singleton-сервиса: рестарт процесса терял всю историю, активные
джобы навсегда оставались в статусе Running.

Теперь:
- Domain.Integrations.ImportJob (TenantEntity) — таблица import_jobs,
  миграция Phase8c_ImportJobs (jsonb для ErrorsJson, индексы по
  OrgId+StartedAt / OrgId+Status / FinishedAt).
- ImportJobRegistry рефакторен: Create() пишет строку немедленно,
  SaveAsync() обновляет, Get/RecentlyFinished читают из БД. API
  совместимое со старой in-memory версией — MoySkladImportService
  и контроллеры не меняются.
- MoySkladImportController.RunInBackgroundAsync теперь:
  * Periodic flush через Timer каждые 2 секунды — UI видит
    реальный progress (Stage/Created/Total), а не Create-snapshot;
  * Финальный flush в finally — обязательный для terminal state.
- AdminCleanupController.WipeAllAsync — то же финальное сохранение.
- SkipAudit=true для import-job записей — служебные, в OrgAuditLog
  не пишем.

Tenant-isolation: query-filter работает прозрачно, B не видит джоб A.

Тесты: 3 интеграционных (survives across scope, RecentlyFinished
читает из БД, tenant-isolation).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
nns 2026-05-28 16:45:08 +05:00
parent 2f9bbc858f
commit b963adfa2e
7 changed files with 373 additions and 18 deletions

View file

@ -138,6 +138,13 @@ public ActionResult<object> WipeAllAsync()
finally finally
{ {
job.FinishedAt = DateTime.UtcNow; job.FinishedAt = DateTime.UtcNow;
// Финальный flush в БД (persisted progress, TD-5).
try
{
using var s = HttpContextTenantContext.UseOverride(orgId);
await _jobs.SaveAsync(job);
}
catch { /* swallow */ }
} }
}); });
return Ok(new { jobId = job.Id }); return Ok(new { jobId = job.Id });

View file

@ -124,6 +124,20 @@ public async Task<ActionResult<object>> ImportCounterparties([FromBody] ImportRe
{ {
return Task.Run(async () => return Task.Run(async () =>
{ {
// Периодический flush snapshot'а в БД каждые 2 секунды — чтобы UI
// видел actual прогресс (Stage/Created/Total), а не Create-time
// запись до самого финиша. AsyncLocal-override активен в callback'е,
// потому что Timer вызывает наш delegate в этом же ExecutionContext'е.
using var flushTimer = new Timer(_ =>
{
try
{
using var s = HttpContextTenantContext.UseOverride(orgId);
_ = _jobs.SaveAsync(job);
}
catch { /* swallow — следующий тик попробует снова */ }
}, null, dueTime: TimeSpan.FromSeconds(2), period: TimeSpan.FromSeconds(2));
try try
{ {
using var tenantScope = HttpContextTenantContext.UseOverride(orgId); using var tenantScope = HttpContextTenantContext.UseOverride(orgId);
@ -141,6 +155,14 @@ public async Task<ActionResult<object>> ImportCounterparties([FromBody] ImportRe
finally finally
{ {
job.FinishedAt = DateTime.UtcNow; job.FinishedAt = DateTime.UtcNow;
// Финальный flush — обязательный, иначе после рестарта job
// останется со статусом Running навсегда.
try
{
using var tenantScope2 = HttpContextTenantContext.UseOverride(orgId);
await _jobs.SaveAsync(job);
}
catch { /* registry сам логирует */ }
} }
}); });
} }

View file

@ -0,0 +1,49 @@
using foodmarket.Domain.Common;
namespace foodmarket.Domain.Integrations;
public enum ImportJobStatus
{
Running = 0,
Succeeded = 1,
Failed = 2,
Cancelled = 3,
}
/// <summary>Запись прогресса фонового импорта (MoySklad: products /
/// counterparties; cleanup: all). Пишется один раз при старте,
/// обновляется в процессе через <c>ImportJobRegistry.SaveAsync</c>,
/// финализируется в <c>finally</c>-блоке RunInBackgroundAsync.
///
/// Tenant-scoped — каждая орга видит только свои джобы. Errors хранятся
/// JSON-массивом строк (jsonb) — для UI достаточно, для разбора достаточно
/// `(SELECT * FROM import_jobs WHERE Id = ?)`.
///
/// Раньше состояние жило в <c>ConcurrentDictionary</c> внутри Singleton
/// сервиса (TD-5): при рестарте процесса вся история импортов терялась.
/// Теперь — БД, прогресс сохраняется через рестарт.</summary>
public class ImportJob : TenantEntity
{
/// <summary>"products" | "counterparties" | "cleanup-all" | …</summary>
public string Kind { get; set; } = "";
public DateTime StartedAt { get; set; } = DateTime.UtcNow;
public DateTime? FinishedAt { get; set; }
public ImportJobStatus Status { get; set; } = ImportJobStatus.Running;
/// <summary>Описание текущей фазы для UI («Загрузка страниц 3/10…»).</summary>
public string? Stage { get; set; }
public int Total { get; set; }
public int Created { get; set; }
public int Updated { get; set; }
public int Skipped { get; set; }
public int Deleted { get; set; }
public int GroupsCreated { get; set; }
/// <summary>Финальное сообщение / последняя ошибка для UI.</summary>
public string? Message { get; set; }
/// <summary>JSON-массив строк с накопленными ошибками (для разбора).</summary>
public string ErrorsJson { get; set; } = "[]";
}

View file

@ -1,47 +1,142 @@
using System.Collections.Concurrent; using System.Text.Json;
using foodmarket.Domain.Integrations;
using foodmarket.Infrastructure.Persistence;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
namespace foodmarket.Infrastructure.Integrations.MoySklad; namespace foodmarket.Infrastructure.Integrations.MoySklad;
// Алиас на enum для обратной совместимости с кодом, использующим
// MoySklad.ImportJobStatus. Внутренне маппится на Domain.Integrations.
public enum ImportJobStatus { Running, Succeeded, Failed, Cancelled } public enum ImportJobStatus { Running, Succeeded, Failed, Cancelled }
/// <summary>In-memory snapshot persisted <see cref="ImportJob"/> row.
/// Background-таски мутируют поля; периодически (и обязательно в finally)
/// контроллер вызывает <c>SaveAsync</c> чтобы сбросить состояние в БД.
///
/// Раньше всё жило в ConcurrentDictionary (TD-5): рестарт процесса терял
/// прогресс и историю. Теперь это снимок строки БД с тем же API, что был —
/// чтобы не трогать MoySkladImportService.</summary>
public class ImportJobProgress public class ImportJobProgress
{ {
public Guid Id { get; init; } = Guid.NewGuid(); public Guid Id { get; init; } = Guid.NewGuid();
public string Kind { get; init; } = ""; // "products" | "counterparties" public string Kind { get; init; } = "";
public DateTime StartedAt { get; init; } = DateTime.UtcNow; public DateTime StartedAt { get; init; } = DateTime.UtcNow;
public DateTime? FinishedAt { get; set; } public DateTime? FinishedAt { get; set; }
public ImportJobStatus Status { get; set; } = ImportJobStatus.Running; public ImportJobStatus Status { get; set; } = ImportJobStatus.Running;
public string? Stage { get; set; } // человекочитаемое описание текущего шага public string? Stage { get; set; }
public int Total { get; set; } // входящих записей от MS (растёт по мере пейджинга) public int Total { get; set; }
public int Created { get; set; } public int Created { get; set; }
public int Updated { get; set; } public int Updated { get; set; }
public int Skipped { get; set; } public int Skipped { get; set; }
public int Deleted { get; set; } // для cleanup public int Deleted { get; set; }
public int GroupsCreated { get; set; } public int GroupsCreated { get; set; }
public string? Message { get; set; } // последняя ошибка / финальное сообщение public string? Message { get; set; }
public List<string> Errors { get; set; } = []; public List<string> Errors { get; set; } = new();
} }
// Process-memory реестр прогресса фоновых импортов. Один процесс API — одно DI singleton. /// <summary>Persistence-backed реестр прогресса фоновых импортов. Singleton,
// При рестарте контейнера история импортов теряется — для просмотра «вчерашнего» надо /// каждый метод открывает scope для свежего <see cref="AppDbContext"/> с
// смотреть логи. На MVP достаточно. /// текущим tenant'ом (для background задач — через HttpContextTenantContext.UseOverride).
///
/// API совместимое со старой in-memory версией: Create/Get/RecentlyFinished
/// плюс новый <c>SaveAsync</c> — сейвить snapshot в БД на checkpoint'ах
/// (рекомендуется вызывать как минимум раз в финальном finally-блоке).</summary>
public class ImportJobRegistry public class ImportJobRegistry
{ {
private readonly ConcurrentDictionary<Guid, ImportJobProgress> _jobs = new(); private readonly IServiceScopeFactory _scopes;
public ImportJobRegistry(IServiceScopeFactory scopes)
{
_scopes = scopes;
}
public ImportJobProgress Create(string kind) public ImportJobProgress Create(string kind)
{ {
var job = new ImportJobProgress { Kind = kind }; var snap = new ImportJobProgress { Kind = kind };
_jobs[job.Id] = job; using var scope = _scopes.CreateScope();
return job; var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
db.SkipAudit = true;
db.Set<ImportJob>().Add(ToEntity(snap));
db.SaveChanges();
return snap;
} }
public ImportJobProgress? Get(Guid id) => _jobs.TryGetValue(id, out var j) ? j : null; public async Task SaveAsync(ImportJobProgress snap, CancellationToken ct = default)
{
using var scope = _scopes.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
db.SkipAudit = true;
var existing = await db.Set<ImportJob>().FirstOrDefaultAsync(j => j.Id == snap.Id, ct);
if (existing is null)
{
db.Set<ImportJob>().Add(ToEntity(snap));
}
else
{
existing.Kind = snap.Kind;
existing.StartedAt = snap.StartedAt;
existing.FinishedAt = snap.FinishedAt;
existing.Status = (Domain.Integrations.ImportJobStatus)snap.Status;
existing.Stage = snap.Stage;
existing.Total = snap.Total;
existing.Created = snap.Created;
existing.Updated = snap.Updated;
existing.Skipped = snap.Skipped;
existing.Deleted = snap.Deleted;
existing.GroupsCreated = snap.GroupsCreated;
existing.Message = snap.Message;
existing.ErrorsJson = JsonSerializer.Serialize(snap.Errors);
}
await db.SaveChangesAsync(ct);
}
public IReadOnlyList<ImportJobProgress> RecentlyFinished(int take = 10) => public ImportJobProgress? Get(Guid id)
_jobs.Values {
.Where(j => j.FinishedAt is not null) using var scope = _scopes.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var row = db.Set<ImportJob>().AsNoTracking().FirstOrDefault(j => j.Id == id);
return row is null ? null : FromEntity(row);
}
public IReadOnlyList<ImportJobProgress> RecentlyFinished(int take = 10)
{
using var scope = _scopes.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var rows = db.Set<ImportJob>().AsNoTracking()
.Where(j => j.FinishedAt != null)
.OrderByDescending(j => j.FinishedAt) .OrderByDescending(j => j.FinishedAt)
.Take(take) .Take(take)
.ToList(); .ToList();
return rows.Select(FromEntity).ToList();
}
private static ImportJob ToEntity(ImportJobProgress s) => new()
{
Id = s.Id,
Kind = s.Kind,
StartedAt = s.StartedAt,
FinishedAt = s.FinishedAt,
Status = (Domain.Integrations.ImportJobStatus)s.Status,
Stage = s.Stage,
Total = s.Total, Created = s.Created, Updated = s.Updated,
Skipped = s.Skipped, Deleted = s.Deleted, GroupsCreated = s.GroupsCreated,
Message = s.Message,
ErrorsJson = JsonSerializer.Serialize(s.Errors),
};
private static ImportJobProgress FromEntity(ImportJob e) => new()
{
Id = e.Id,
Kind = e.Kind,
StartedAt = e.StartedAt,
FinishedAt = e.FinishedAt,
Status = (ImportJobStatus)(int)e.Status,
Stage = e.Stage,
Total = e.Total, Created = e.Created, Updated = e.Updated,
Skipped = e.Skipped, Deleted = e.Deleted, GroupsCreated = e.GroupsCreated,
Message = e.Message,
Errors = string.IsNullOrEmpty(e.ErrorsJson) ? new()
: (JsonSerializer.Deserialize<List<string>>(e.ErrorsJson) ?? new()),
};
} }

View file

@ -71,6 +71,7 @@ public AppDbContext(DbContextOptions<AppDbContext> options, ITenantContext tenan
public DbSet<EmployeeRetailPointAssignment> EmployeeRetailPointAssignments => Set<EmployeeRetailPointAssignment>(); public DbSet<EmployeeRetailPointAssignment> EmployeeRetailPointAssignments => Set<EmployeeRetailPointAssignment>();
public DbSet<SuperAdminAuditLog> SuperAdminAuditLogs => Set<SuperAdminAuditLog>(); public DbSet<SuperAdminAuditLog> SuperAdminAuditLogs => Set<SuperAdminAuditLog>();
public DbSet<OrgAuditLog> OrgAuditLogs => Set<OrgAuditLog>(); public DbSet<OrgAuditLog> OrgAuditLogs => Set<OrgAuditLog>();
public DbSet<foodmarket.Domain.Integrations.ImportJob> ImportJobs => Set<foodmarket.Domain.Integrations.ImportJob>();
/// <summary>Если true — <see cref="OrgAuditInterceptor"/> не пишет audit-строки /// <summary>Если true — <see cref="OrgAuditInterceptor"/> не пишет audit-строки
/// для этого SaveChanges. Используется сидерами/миграциями, фоновыми /// для этого SaveChanges. Используется сидерами/миграциями, фоновыми
@ -147,6 +148,18 @@ protected override void OnModelCreating(ModelBuilder builder)
b.HasIndex(x => new { x.OrganizationId, x.UserId, x.CreatedAt }); b.HasIndex(x => new { x.OrganizationId, x.UserId, x.CreatedAt });
}); });
builder.Entity<foodmarket.Domain.Integrations.ImportJob>(b =>
{
b.ToTable("import_jobs");
b.Property(x => x.Kind).HasMaxLength(50).IsRequired();
b.Property(x => x.Stage).HasMaxLength(500);
b.Property(x => x.Message).HasMaxLength(2000);
b.Property(x => x.ErrorsJson).HasColumnType("jsonb").IsRequired();
b.HasIndex(x => new { x.OrganizationId, x.StartedAt });
b.HasIndex(x => new { x.OrganizationId, x.Status });
b.HasIndex(x => x.FinishedAt);
});
builder.ConfigureCatalog(); builder.ConfigureCatalog();
builder.ConfigureInventory(); builder.ConfigureInventory();
builder.ConfigurePurchases(); builder.ConfigurePurchases();

View file

@ -0,0 +1,55 @@
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Migrations;
using foodmarket.Infrastructure.Persistence;
#nullable disable
namespace foodmarket.Infrastructure.Persistence.Migrations
{
/// <summary>Phase8c — таблица import_jobs для persisted прогресса фоновых
/// импортов из MoySklad. Раньше состояние жило в ConcurrentDictionary
/// внутри Singleton-сервиса, рестарт процесса терял всё (TD-5).
/// Теперь Create() пишет строку немедленно, SaveAsync() обновляет,
/// AdminJobsController читает из БД.</summary>
[DbContext(typeof(AppDbContext))]
[Migration("20260528220000_Phase8c_ImportJobs")]
public partial class Phase8c_ImportJobs : Migration
{
protected override void Up(MigrationBuilder b)
{
b.Sql(@"
CREATE TABLE IF NOT EXISTS public.import_jobs (
""Id"" uuid PRIMARY KEY,
""OrganizationId"" uuid NOT NULL,
""Kind"" varchar(50) NOT NULL,
""StartedAt"" timestamp with time zone NOT NULL,
""FinishedAt"" timestamp with time zone,
""Status"" integer NOT NULL,
""Stage"" varchar(500),
""Total"" integer NOT NULL DEFAULT 0,
""Created"" integer NOT NULL DEFAULT 0,
""Updated"" integer NOT NULL DEFAULT 0,
""Skipped"" integer NOT NULL DEFAULT 0,
""Deleted"" integer NOT NULL DEFAULT 0,
""GroupsCreated"" integer NOT NULL DEFAULT 0,
""Message"" varchar(2000),
""ErrorsJson"" jsonb NOT NULL DEFAULT '[]'::jsonb,
""CreatedAt"" timestamp with time zone NOT NULL,
""UpdatedAt"" timestamp with time zone
);
CREATE INDEX IF NOT EXISTS ""IX_import_jobs_OrganizationId_StartedAt""
ON public.import_jobs (""OrganizationId"", ""StartedAt"");
CREATE INDEX IF NOT EXISTS ""IX_import_jobs_OrganizationId_Status""
ON public.import_jobs (""OrganizationId"", ""Status"");
CREATE INDEX IF NOT EXISTS ""IX_import_jobs_FinishedAt""
ON public.import_jobs (""FinishedAt"");
");
}
protected override void Down(MigrationBuilder b)
{
b.Sql(@"DROP TABLE IF EXISTS public.import_jobs;");
}
}
}

View file

@ -0,0 +1,114 @@
using FluentAssertions;
using foodmarket.Application.Common.Tenancy;
using foodmarket.Infrastructure.Integrations.MoySklad;
using foodmarket.Infrastructure.Persistence;
// Двусмысленность ImportJobStatus: тесты используют только in-process snapshot
// API (registry.Create/Save/Get), поэтому ссылаемся на MoySklad-namespace.
using ImportJobStatus = foodmarket.Infrastructure.Integrations.MoySklad.ImportJobStatus;
using foodmarket.IntegrationTests.Support;
using Microsoft.AspNetCore.Mvc.Testing;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
namespace foodmarket.IntegrationTests;
/// <summary>TD-5: ImportJob теперь persisted в БД. Тест проверяет, что
/// прогресс сохраняется через границу «рестарта реестра» (новый scope =
/// новая ConcurrentDictionary в старой версии) — но мы читаем из БД,
/// поэтому job остаётся видимым.</summary>
[Collection(ApiCollection.Name)]
public class ImportJobPersistenceTests
{
private readonly ApiFactory _factory;
public ImportJobPersistenceTests(ApiFactory factory) => _factory = factory;
[Fact]
public async Task Created_job_survives_across_registry_instances()
{
// 1) Сигнин чтобы получить orgId, потом руками используем registry.
var api = new ApiActor(_factory.CreateClient());
await api.SignupAndLoginAsync($"impjob-{Guid.NewGuid():N}");
var orgId = await GetOrgIdAsync(api);
using var scope1 = _factory.Services.CreateScope();
var registry = _factory.Services.GetRequiredService<ImportJobRegistry>();
Guid jobId;
using (foodmarket.Api.Infrastructure.Tenancy.HttpContextTenantContext.UseOverride(orgId))
{
var job = registry.Create("products");
job.Stage = "Импорт страниц 3/10";
job.Total = 100;
job.Created = 30;
await registry.SaveAsync(job);
jobId = job.Id;
}
// 2) В новом scope (имитация после-рестарта) Get(id) тянет из БД,
// не из in-memory ConcurrentDictionary. State сохранён.
using (foodmarket.Api.Infrastructure.Tenancy.HttpContextTenantContext.UseOverride(orgId))
{
var reloaded = registry.Get(jobId);
reloaded.Should().NotBeNull();
reloaded!.Kind.Should().Be("products");
reloaded.Stage.Should().Be("Импорт страниц 3/10");
reloaded.Total.Should().Be(100);
reloaded.Created.Should().Be(30);
reloaded.Status.Should().Be(ImportJobStatus.Running);
}
}
[Fact]
public async Task RecentlyFinished_returns_completed_jobs_from_db()
{
var api = new ApiActor(_factory.CreateClient());
await api.SignupAndLoginAsync($"impjob-rf-{Guid.NewGuid():N}");
var orgId = await GetOrgIdAsync(api);
var registry = _factory.Services.GetRequiredService<ImportJobRegistry>();
using (foodmarket.Api.Infrastructure.Tenancy.HttpContextTenantContext.UseOverride(orgId))
{
var job = registry.Create("products");
job.Status = ImportJobStatus.Succeeded;
job.FinishedAt = DateTime.UtcNow;
job.Created = 5;
await registry.SaveAsync(job);
var finished = registry.RecentlyFinished(10);
finished.Should().Contain(j => j.Id == job.Id && j.Status == ImportJobStatus.Succeeded);
}
}
[Fact]
public async Task Tenant_isolation_for_import_jobs()
{
var a = new ApiActor(_factory.CreateClient());
var b = new ApiActor(_factory.CreateClient());
await a.SignupAndLoginAsync($"impjob-iso-a-{Guid.NewGuid():N}");
await b.SignupAndLoginAsync($"impjob-iso-b-{Guid.NewGuid():N}");
var orgA = await GetOrgIdAsync(a);
var orgB = await GetOrgIdAsync(b);
var registry = _factory.Services.GetRequiredService<ImportJobRegistry>();
Guid jobIdA;
using (foodmarket.Api.Infrastructure.Tenancy.HttpContextTenantContext.UseOverride(orgA))
{
var job = registry.Create("products");
await registry.SaveAsync(job);
jobIdA = job.Id;
}
// B не видит джоб A через registry.Get (query-filter по OrganizationId).
using (foodmarket.Api.Infrastructure.Tenancy.HttpContextTenantContext.UseOverride(orgB))
{
registry.Get(jobIdA).Should().BeNull();
}
}
private static async Task<Guid> GetOrgIdAsync(ApiActor api)
{
var me = await api.GetJsonAsync("/api/me");
return Guid.Parse(me.GetProperty("orgId").GetString()!);
}
}