From b963adfa2e26d1504c56b1549819394c17f408d0 Mon Sep 17 00:00:00 2001 From: nns Date: Thu, 28 May 2026 16:45:08 +0500 Subject: [PATCH] =?UTF-8?q?feat(import-jobs):=20persisted=20ImportJobRegis?= =?UTF-8?q?try=20=D0=B2=20=D0=91=D0=94=20(TD-5)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Раньше прогресс фоновых импортов жил в ConcurrentDictionary внутри Singleton-сервиса: рестарт процесса терял всю историю, активные джобы навсегда оставались в статусе Running. Теперь: - Domain.Integrations.ImportJob (TenantEntity) — таблица import_jobs, миграция Phase8c_ImportJobs (jsonb для ErrorsJson, индексы по OrgId+StartedAt / OrgId+Status / FinishedAt). - ImportJobRegistry рефакторен: Create() пишет строку немедленно, SaveAsync() обновляет, Get/RecentlyFinished читают из БД. API совместимое со старой in-memory версией — MoySkladImportService и контроллеры не меняются. - MoySkladImportController.RunInBackgroundAsync теперь: * Periodic flush через Timer каждые 2 секунды — UI видит реальный progress (Stage/Created/Total), а не Create-snapshot; * Финальный flush в finally — обязательный для terminal state. - AdminCleanupController.WipeAllAsync — то же финальное сохранение. - SkipAudit=true для import-job записей — служебные, в OrgAuditLog не пишем. Tenant-isolation: query-filter работает прозрачно, B не видит джоб A. Тесты: 3 интеграционных (survives across scope, RecentlyFinished читает из БД, tenant-isolation). Co-Authored-By: Claude Opus 4.7 --- .../Admin/AdminCleanupController.cs | 7 + .../Admin/MoySkladImportController.cs | 22 +++ .../Integrations/ImportJob.cs | 49 +++++++ .../MoySklad/ImportJobRegistry.cs | 131 +++++++++++++++--- .../Persistence/AppDbContext.cs | 13 ++ .../20260528220000_Phase8c_ImportJobs.cs | 55 ++++++++ .../ImportJobPersistenceTests.cs | 114 +++++++++++++++ 7 files changed, 373 insertions(+), 18 deletions(-) create mode 100644 src/food-market.domain/Integrations/ImportJob.cs create mode 100644 src/food-market.infrastructure/Persistence/Migrations/20260528220000_Phase8c_ImportJobs.cs create mode 100644 tests/food-market.IntegrationTests/ImportJobPersistenceTests.cs diff --git a/src/food-market.api/Controllers/Admin/AdminCleanupController.cs b/src/food-market.api/Controllers/Admin/AdminCleanupController.cs index 076b509..e1d8a67 100644 --- a/src/food-market.api/Controllers/Admin/AdminCleanupController.cs +++ b/src/food-market.api/Controllers/Admin/AdminCleanupController.cs @@ -138,6 +138,13 @@ public ActionResult WipeAllAsync() finally { job.FinishedAt = DateTime.UtcNow; + // Финальный flush в БД (persisted progress, TD-5). + try + { + using var s = HttpContextTenantContext.UseOverride(orgId); + await _jobs.SaveAsync(job); + } + catch { /* swallow */ } } }); return Ok(new { jobId = job.Id }); diff --git a/src/food-market.api/Controllers/Admin/MoySkladImportController.cs b/src/food-market.api/Controllers/Admin/MoySkladImportController.cs index 80d93b7..0c45ae4 100644 --- a/src/food-market.api/Controllers/Admin/MoySkladImportController.cs +++ b/src/food-market.api/Controllers/Admin/MoySkladImportController.cs @@ -124,6 +124,20 @@ public async Task> ImportCounterparties([FromBody] ImportRe { return Task.Run(async () => { + // Периодический flush snapshot'а в БД каждые 2 секунды — чтобы UI + // видел actual прогресс (Stage/Created/Total), а не Create-time + // запись до самого финиша. AsyncLocal-override активен в callback'е, + // потому что Timer вызывает наш delegate в этом же ExecutionContext'е. + using var flushTimer = new Timer(_ => + { + try + { + using var s = HttpContextTenantContext.UseOverride(orgId); + _ = _jobs.SaveAsync(job); + } + catch { /* swallow — следующий тик попробует снова */ } + }, null, dueTime: TimeSpan.FromSeconds(2), period: TimeSpan.FromSeconds(2)); + try { using var tenantScope = HttpContextTenantContext.UseOverride(orgId); @@ -141,6 +155,14 @@ public async Task> ImportCounterparties([FromBody] ImportRe finally { job.FinishedAt = DateTime.UtcNow; + // Финальный flush — обязательный, иначе после рестарта job + // останется со статусом Running навсегда. + try + { + using var tenantScope2 = HttpContextTenantContext.UseOverride(orgId); + await _jobs.SaveAsync(job); + } + catch { /* registry сам логирует */ } } }); } diff --git a/src/food-market.domain/Integrations/ImportJob.cs b/src/food-market.domain/Integrations/ImportJob.cs new file mode 100644 index 0000000..aea9d76 --- /dev/null +++ b/src/food-market.domain/Integrations/ImportJob.cs @@ -0,0 +1,49 @@ +using foodmarket.Domain.Common; + +namespace foodmarket.Domain.Integrations; + +public enum ImportJobStatus +{ + Running = 0, + Succeeded = 1, + Failed = 2, + Cancelled = 3, +} + +/// Запись прогресса фонового импорта (MoySklad: products / +/// counterparties; cleanup: all). Пишется один раз при старте, +/// обновляется в процессе через ImportJobRegistry.SaveAsync, +/// финализируется в finally-блоке RunInBackgroundAsync. +/// +/// Tenant-scoped — каждая орга видит только свои джобы. Errors хранятся +/// JSON-массивом строк (jsonb) — для UI достаточно, для разбора достаточно +/// `(SELECT * FROM import_jobs WHERE Id = ?)`. +/// +/// Раньше состояние жило в ConcurrentDictionary внутри Singleton +/// сервиса (TD-5): при рестарте процесса вся история импортов терялась. +/// Теперь — БД, прогресс сохраняется через рестарт. +public class ImportJob : TenantEntity +{ + /// "products" | "counterparties" | "cleanup-all" | … + public string Kind { get; set; } = ""; + + public DateTime StartedAt { get; set; } = DateTime.UtcNow; + public DateTime? FinishedAt { get; set; } + public ImportJobStatus Status { get; set; } = ImportJobStatus.Running; + + /// Описание текущей фазы для UI («Загрузка страниц 3/10…»). + public string? Stage { get; set; } + + public int Total { get; set; } + public int Created { get; set; } + public int Updated { get; set; } + public int Skipped { get; set; } + public int Deleted { get; set; } + public int GroupsCreated { get; set; } + + /// Финальное сообщение / последняя ошибка для UI. + public string? Message { get; set; } + + /// JSON-массив строк с накопленными ошибками (для разбора). + public string ErrorsJson { get; set; } = "[]"; +} diff --git a/src/food-market.infrastructure/Integrations/MoySklad/ImportJobRegistry.cs b/src/food-market.infrastructure/Integrations/MoySklad/ImportJobRegistry.cs index ca27e98..be11566 100644 --- a/src/food-market.infrastructure/Integrations/MoySklad/ImportJobRegistry.cs +++ b/src/food-market.infrastructure/Integrations/MoySklad/ImportJobRegistry.cs @@ -1,47 +1,142 @@ -using System.Collections.Concurrent; +using System.Text.Json; +using foodmarket.Domain.Integrations; +using foodmarket.Infrastructure.Persistence; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; namespace foodmarket.Infrastructure.Integrations.MoySklad; +// Алиас на enum для обратной совместимости с кодом, использующим +// MoySklad.ImportJobStatus. Внутренне маппится на Domain.Integrations. public enum ImportJobStatus { Running, Succeeded, Failed, Cancelled } +/// In-memory snapshot persisted row. +/// Background-таски мутируют поля; периодически (и обязательно в finally) +/// контроллер вызывает SaveAsync чтобы сбросить состояние в БД. +/// +/// Раньше всё жило в ConcurrentDictionary (TD-5): рестарт процесса терял +/// прогресс и историю. Теперь это снимок строки БД с тем же API, что был — +/// чтобы не трогать MoySkladImportService. public class ImportJobProgress { public Guid Id { get; init; } = Guid.NewGuid(); - public string Kind { get; init; } = ""; // "products" | "counterparties" + public string Kind { get; init; } = ""; public DateTime StartedAt { get; init; } = DateTime.UtcNow; public DateTime? FinishedAt { get; set; } public ImportJobStatus Status { get; set; } = ImportJobStatus.Running; - public string? Stage { get; set; } // человекочитаемое описание текущего шага - public int Total { get; set; } // входящих записей от MS (растёт по мере пейджинга) + public string? Stage { get; set; } + public int Total { get; set; } public int Created { get; set; } public int Updated { get; set; } public int Skipped { get; set; } - public int Deleted { get; set; } // для cleanup + public int Deleted { get; set; } public int GroupsCreated { get; set; } - public string? Message { get; set; } // последняя ошибка / финальное сообщение - public List Errors { get; set; } = []; + public string? Message { get; set; } + public List Errors { get; set; } = new(); } -// Process-memory реестр прогресса фоновых импортов. Один процесс API — одно DI singleton. -// При рестарте контейнера история импортов теряется — для просмотра «вчерашнего» надо -// смотреть логи. На MVP достаточно. +/// Persistence-backed реестр прогресса фоновых импортов. Singleton, +/// каждый метод открывает scope для свежего с +/// текущим tenant'ом (для background задач — через HttpContextTenantContext.UseOverride). +/// +/// API совместимое со старой in-memory версией: Create/Get/RecentlyFinished +/// плюс новый SaveAsync — сейвить snapshot в БД на checkpoint'ах +/// (рекомендуется вызывать как минимум раз в финальном finally-блоке). public class ImportJobRegistry { - private readonly ConcurrentDictionary _jobs = new(); + private readonly IServiceScopeFactory _scopes; + + public ImportJobRegistry(IServiceScopeFactory scopes) + { + _scopes = scopes; + } public ImportJobProgress Create(string kind) { - var job = new ImportJobProgress { Kind = kind }; - _jobs[job.Id] = job; - return job; + var snap = new ImportJobProgress { Kind = kind }; + using var scope = _scopes.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + db.SkipAudit = true; + db.Set().Add(ToEntity(snap)); + db.SaveChanges(); + return snap; } - public ImportJobProgress? Get(Guid id) => _jobs.TryGetValue(id, out var j) ? j : null; + public async Task SaveAsync(ImportJobProgress snap, CancellationToken ct = default) + { + using var scope = _scopes.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + db.SkipAudit = true; + var existing = await db.Set().FirstOrDefaultAsync(j => j.Id == snap.Id, ct); + if (existing is null) + { + db.Set().Add(ToEntity(snap)); + } + else + { + existing.Kind = snap.Kind; + existing.StartedAt = snap.StartedAt; + existing.FinishedAt = snap.FinishedAt; + existing.Status = (Domain.Integrations.ImportJobStatus)snap.Status; + existing.Stage = snap.Stage; + existing.Total = snap.Total; + existing.Created = snap.Created; + existing.Updated = snap.Updated; + existing.Skipped = snap.Skipped; + existing.Deleted = snap.Deleted; + existing.GroupsCreated = snap.GroupsCreated; + existing.Message = snap.Message; + existing.ErrorsJson = JsonSerializer.Serialize(snap.Errors); + } + await db.SaveChangesAsync(ct); + } - public IReadOnlyList RecentlyFinished(int take = 10) => - _jobs.Values - .Where(j => j.FinishedAt is not null) + public ImportJobProgress? Get(Guid id) + { + using var scope = _scopes.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + var row = db.Set().AsNoTracking().FirstOrDefault(j => j.Id == id); + return row is null ? null : FromEntity(row); + } + + public IReadOnlyList RecentlyFinished(int take = 10) + { + using var scope = _scopes.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + var rows = db.Set().AsNoTracking() + .Where(j => j.FinishedAt != null) .OrderByDescending(j => j.FinishedAt) .Take(take) .ToList(); + return rows.Select(FromEntity).ToList(); + } + + private static ImportJob ToEntity(ImportJobProgress s) => new() + { + Id = s.Id, + Kind = s.Kind, + StartedAt = s.StartedAt, + FinishedAt = s.FinishedAt, + Status = (Domain.Integrations.ImportJobStatus)s.Status, + Stage = s.Stage, + Total = s.Total, Created = s.Created, Updated = s.Updated, + Skipped = s.Skipped, Deleted = s.Deleted, GroupsCreated = s.GroupsCreated, + Message = s.Message, + ErrorsJson = JsonSerializer.Serialize(s.Errors), + }; + + private static ImportJobProgress FromEntity(ImportJob e) => new() + { + Id = e.Id, + Kind = e.Kind, + StartedAt = e.StartedAt, + FinishedAt = e.FinishedAt, + Status = (ImportJobStatus)(int)e.Status, + Stage = e.Stage, + Total = e.Total, Created = e.Created, Updated = e.Updated, + Skipped = e.Skipped, Deleted = e.Deleted, GroupsCreated = e.GroupsCreated, + Message = e.Message, + Errors = string.IsNullOrEmpty(e.ErrorsJson) ? new() + : (JsonSerializer.Deserialize>(e.ErrorsJson) ?? new()), + }; } diff --git a/src/food-market.infrastructure/Persistence/AppDbContext.cs b/src/food-market.infrastructure/Persistence/AppDbContext.cs index 084d951..85c1f1a 100644 --- a/src/food-market.infrastructure/Persistence/AppDbContext.cs +++ b/src/food-market.infrastructure/Persistence/AppDbContext.cs @@ -71,6 +71,7 @@ public AppDbContext(DbContextOptions options, ITenantContext tenan public DbSet EmployeeRetailPointAssignments => Set(); public DbSet SuperAdminAuditLogs => Set(); public DbSet OrgAuditLogs => Set(); + public DbSet ImportJobs => Set(); /// Если true — не пишет audit-строки /// для этого SaveChanges. Используется сидерами/миграциями, фоновыми @@ -147,6 +148,18 @@ protected override void OnModelCreating(ModelBuilder builder) b.HasIndex(x => new { x.OrganizationId, x.UserId, x.CreatedAt }); }); + builder.Entity(b => + { + b.ToTable("import_jobs"); + b.Property(x => x.Kind).HasMaxLength(50).IsRequired(); + b.Property(x => x.Stage).HasMaxLength(500); + b.Property(x => x.Message).HasMaxLength(2000); + b.Property(x => x.ErrorsJson).HasColumnType("jsonb").IsRequired(); + b.HasIndex(x => new { x.OrganizationId, x.StartedAt }); + b.HasIndex(x => new { x.OrganizationId, x.Status }); + b.HasIndex(x => x.FinishedAt); + }); + builder.ConfigureCatalog(); builder.ConfigureInventory(); builder.ConfigurePurchases(); diff --git a/src/food-market.infrastructure/Persistence/Migrations/20260528220000_Phase8c_ImportJobs.cs b/src/food-market.infrastructure/Persistence/Migrations/20260528220000_Phase8c_ImportJobs.cs new file mode 100644 index 0000000..ae89a0f --- /dev/null +++ b/src/food-market.infrastructure/Persistence/Migrations/20260528220000_Phase8c_ImportJobs.cs @@ -0,0 +1,55 @@ +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using foodmarket.Infrastructure.Persistence; + +#nullable disable + +namespace foodmarket.Infrastructure.Persistence.Migrations +{ + /// Phase8c — таблица import_jobs для persisted прогресса фоновых + /// импортов из MoySklad. Раньше состояние жило в ConcurrentDictionary + /// внутри Singleton-сервиса, рестарт процесса терял всё (TD-5). + /// Теперь Create() пишет строку немедленно, SaveAsync() обновляет, + /// AdminJobsController читает из БД. + [DbContext(typeof(AppDbContext))] + [Migration("20260528220000_Phase8c_ImportJobs")] + public partial class Phase8c_ImportJobs : Migration + { + protected override void Up(MigrationBuilder b) + { + b.Sql(@" + CREATE TABLE IF NOT EXISTS public.import_jobs ( + ""Id"" uuid PRIMARY KEY, + ""OrganizationId"" uuid NOT NULL, + ""Kind"" varchar(50) NOT NULL, + ""StartedAt"" timestamp with time zone NOT NULL, + ""FinishedAt"" timestamp with time zone, + ""Status"" integer NOT NULL, + ""Stage"" varchar(500), + ""Total"" integer NOT NULL DEFAULT 0, + ""Created"" integer NOT NULL DEFAULT 0, + ""Updated"" integer NOT NULL DEFAULT 0, + ""Skipped"" integer NOT NULL DEFAULT 0, + ""Deleted"" integer NOT NULL DEFAULT 0, + ""GroupsCreated"" integer NOT NULL DEFAULT 0, + ""Message"" varchar(2000), + ""ErrorsJson"" jsonb NOT NULL DEFAULT '[]'::jsonb, + ""CreatedAt"" timestamp with time zone NOT NULL, + ""UpdatedAt"" timestamp with time zone + ); + + CREATE INDEX IF NOT EXISTS ""IX_import_jobs_OrganizationId_StartedAt"" + ON public.import_jobs (""OrganizationId"", ""StartedAt""); + CREATE INDEX IF NOT EXISTS ""IX_import_jobs_OrganizationId_Status"" + ON public.import_jobs (""OrganizationId"", ""Status""); + CREATE INDEX IF NOT EXISTS ""IX_import_jobs_FinishedAt"" + ON public.import_jobs (""FinishedAt""); + "); + } + + protected override void Down(MigrationBuilder b) + { + b.Sql(@"DROP TABLE IF EXISTS public.import_jobs;"); + } + } +} diff --git a/tests/food-market.IntegrationTests/ImportJobPersistenceTests.cs b/tests/food-market.IntegrationTests/ImportJobPersistenceTests.cs new file mode 100644 index 0000000..ece77dc --- /dev/null +++ b/tests/food-market.IntegrationTests/ImportJobPersistenceTests.cs @@ -0,0 +1,114 @@ +using FluentAssertions; +using foodmarket.Application.Common.Tenancy; +using foodmarket.Infrastructure.Integrations.MoySklad; +using foodmarket.Infrastructure.Persistence; +// Двусмысленность ImportJobStatus: тесты используют только in-process snapshot +// API (registry.Create/Save/Get), поэтому ссылаемся на MoySklad-namespace. +using ImportJobStatus = foodmarket.Infrastructure.Integrations.MoySklad.ImportJobStatus; +using foodmarket.IntegrationTests.Support; +using Microsoft.AspNetCore.Mvc.Testing; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Xunit; + +namespace foodmarket.IntegrationTests; + +/// TD-5: ImportJob теперь persisted в БД. Тест проверяет, что +/// прогресс сохраняется через границу «рестарта реестра» (новый scope = +/// новая ConcurrentDictionary в старой версии) — но мы читаем из БД, +/// поэтому job остаётся видимым. +[Collection(ApiCollection.Name)] +public class ImportJobPersistenceTests +{ + private readonly ApiFactory _factory; + public ImportJobPersistenceTests(ApiFactory factory) => _factory = factory; + + [Fact] + public async Task Created_job_survives_across_registry_instances() + { + // 1) Сигнин чтобы получить orgId, потом руками используем registry. + var api = new ApiActor(_factory.CreateClient()); + await api.SignupAndLoginAsync($"impjob-{Guid.NewGuid():N}"); + var orgId = await GetOrgIdAsync(api); + + using var scope1 = _factory.Services.CreateScope(); + var registry = _factory.Services.GetRequiredService(); + + Guid jobId; + using (foodmarket.Api.Infrastructure.Tenancy.HttpContextTenantContext.UseOverride(orgId)) + { + var job = registry.Create("products"); + job.Stage = "Импорт страниц 3/10"; + job.Total = 100; + job.Created = 30; + await registry.SaveAsync(job); + jobId = job.Id; + } + + // 2) В новом scope (имитация после-рестарта) Get(id) тянет из БД, + // не из in-memory ConcurrentDictionary. State сохранён. + using (foodmarket.Api.Infrastructure.Tenancy.HttpContextTenantContext.UseOverride(orgId)) + { + var reloaded = registry.Get(jobId); + reloaded.Should().NotBeNull(); + reloaded!.Kind.Should().Be("products"); + reloaded.Stage.Should().Be("Импорт страниц 3/10"); + reloaded.Total.Should().Be(100); + reloaded.Created.Should().Be(30); + reloaded.Status.Should().Be(ImportJobStatus.Running); + } + } + + [Fact] + public async Task RecentlyFinished_returns_completed_jobs_from_db() + { + var api = new ApiActor(_factory.CreateClient()); + await api.SignupAndLoginAsync($"impjob-rf-{Guid.NewGuid():N}"); + var orgId = await GetOrgIdAsync(api); + + var registry = _factory.Services.GetRequiredService(); + using (foodmarket.Api.Infrastructure.Tenancy.HttpContextTenantContext.UseOverride(orgId)) + { + var job = registry.Create("products"); + job.Status = ImportJobStatus.Succeeded; + job.FinishedAt = DateTime.UtcNow; + job.Created = 5; + await registry.SaveAsync(job); + + var finished = registry.RecentlyFinished(10); + finished.Should().Contain(j => j.Id == job.Id && j.Status == ImportJobStatus.Succeeded); + } + } + + [Fact] + public async Task Tenant_isolation_for_import_jobs() + { + var a = new ApiActor(_factory.CreateClient()); + var b = new ApiActor(_factory.CreateClient()); + await a.SignupAndLoginAsync($"impjob-iso-a-{Guid.NewGuid():N}"); + await b.SignupAndLoginAsync($"impjob-iso-b-{Guid.NewGuid():N}"); + var orgA = await GetOrgIdAsync(a); + var orgB = await GetOrgIdAsync(b); + + var registry = _factory.Services.GetRequiredService(); + Guid jobIdA; + using (foodmarket.Api.Infrastructure.Tenancy.HttpContextTenantContext.UseOverride(orgA)) + { + var job = registry.Create("products"); + await registry.SaveAsync(job); + jobIdA = job.Id; + } + + // B не видит джоб A через registry.Get (query-filter по OrganizationId). + using (foodmarket.Api.Infrastructure.Tenancy.HttpContextTenantContext.UseOverride(orgB)) + { + registry.Get(jobIdA).Should().BeNull(); + } + } + + private static async Task GetOrgIdAsync(ApiActor api) + { + var me = await api.GetJsonAsync("/api/me"); + return Guid.Parse(me.GetProperty("orgId").GetString()!); + } +}