food-market/src/food-market.infrastructure/Integrations/MoySklad/MoySkladImportService.cs
nurdotnet 2fc6d207f3
Some checks are pending
CI / POS (WPF, Windows) (push) Waiting to run
CI / Backend (.NET 8) (push) Successful in 36s
CI / Web (React + Vite) (push) Successful in 23s
Docker Images / API image (push) Successful in 42s
Docker Images / Web image (push) Successful in 25s
Docker Images / Deploy stage (push) Successful in 18s
feat(moysklad-import): async jobs с прогрессом + токен в настройках
Pain points:
1. Импорт на ~30k товарах проходит 15-30 мин, nginx рвал на 60s → 504.
2. При импорте/очистке ничего не видно — ни счётчика, ни прогресса.
3. Токен приходилось вводить каждый раз вручную.

Фиксы:
- Async-job pattern: POST /api/admin/moysklad/import-products и
  /api/admin/cleanup/all/async возвращают jobId, реальная работа
  в Task.Run. GET /api/admin/jobs/{id} — статус +
  Total/Created/Updated/Skipped/Deleted/Stage/Message.
- ImportJobRegistry (singleton, in-memory) — хранит job-progress.
- MoySkladImportService обновляет progress по мере пейджинга
  (в т.ч. счётчик Created/Updated/Skipped).
- Cleanup разбит на именованные шаги, Stage меняется по мере
  "Товары…" → "Группы…" → "Контрагенты…".
- Токен per-organization: Organization.MoySkladToken + миграция
  Phase3_OrganizationMoySkladToken. Endpoints:
  GET/PUT /api/admin/moysklad/settings.
- Импорт-endpoints больше не требуют token в теле — берут из org.
- HttpContextTenantContext.UseOverride(orgId) — AsyncLocal-scope
  для background tasks (HttpContext там нет, а query-filter'у нужен
  orgId — ставим через override).

Nginx (host + web-container) получил 60-минутный timeout на
/api/admin/import/ чтобы старый sync-путь тоже не ронять (на
случай если кто-то вернёт sync call).

Web:
- MoySkladImportPage переработан: блок "Токен API" (save/test
  mask), блок импорта с кнопками без поля токена.
- JobCard с polling каждые 1.5s отображает живые счётчики и stage.
- DangerZone тоже теперь async с live-прогрессом.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 23:49:11 +05:00

347 lines
16 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using foodmarket.Application.Common.Tenancy;
using foodmarket.Domain.Catalog;
using foodmarket.Infrastructure.Persistence;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
namespace foodmarket.Infrastructure.Integrations.MoySklad;
public record MoySkladImportResult(
int Total,
int Created,
int Skipped,
int GroupsCreated,
IReadOnlyList<string> Errors);
public class MoySkladImportService
{
private readonly MoySkladClient _client;
private readonly AppDbContext _db;
private readonly ITenantContext _tenant;
private readonly ILogger<MoySkladImportService> _log;
public MoySkladImportService(
MoySkladClient client,
AppDbContext db,
ITenantContext tenant,
ILogger<MoySkladImportService> log)
{
_client = client;
_db = db;
_tenant = tenant;
_log = log;
}
public Task<MoySkladApiResult<MsOrganization>> TestConnectionAsync(string token, CancellationToken ct)
=> _client.WhoAmIAsync(token, ct);
public async Task<MoySkladImportResult> ImportCounterpartiesAsync(
string token,
bool overwriteExisting,
CancellationToken ct,
ImportJobProgress? progress = null,
Guid? organizationIdOverride = null)
{
var orgId = organizationIdOverride ?? _tenant.OrganizationId
?? throw new InvalidOperationException("No tenant organization in context.");
// MoySklad НЕ имеет поля "Поставщик/Покупатель" у контрагентов вообще —
// counterparty entity содержит только group (группа доступа), tags
// (произвольные), state (пользовательская цепочка статусов), companyType
// (legal/individual/entrepreneur). Никакого role/kind. Поэтому у нас тоже
// этого поля нет — пусть пользователь сам решит.
static foodmarket.Domain.Catalog.CounterpartyType ResolveType(string? companyType)
=> companyType switch
{
"individual" or "entrepreneur" => foodmarket.Domain.Catalog.CounterpartyType.Individual,
_ => foodmarket.Domain.Catalog.CounterpartyType.LegalEntity,
};
// Загружаем существующих в память — обновлять будем по имени (case-insensitive).
// Раньше при overwriteExisting=true бага: пропускалась проверка "skip" и ВСЕ
// поступающие записи добавлялись как новые, порождая дубли. Теперь если имя уже
// есть — обновляем ту же запись, иначе создаём.
var existingByName = await _db.Counterparties
.ToDictionaryAsync(c => c.Name, c => c, StringComparer.OrdinalIgnoreCase, ct);
var created = 0;
var updated = 0;
var skipped = 0;
var total = 0;
var errors = new List<string>();
var batch = 0;
await foreach (var c in _client.StreamCounterpartiesAsync(token, ct))
{
total++;
if (progress is not null) progress.Total = total;
// Архивных не пропускаем — импортируем как IsActive=false (см. ApplyCounterparty).
try
{
if (existingByName.TryGetValue(c.Name, out var existing))
{
if (!overwriteExisting) { skipped++; if (progress is not null) progress.Skipped = skipped; continue; }
ApplyCounterparty(existing, c, ResolveType);
updated++;
if (progress is not null) progress.Updated = updated;
}
else
{
var entity = new foodmarket.Domain.Catalog.Counterparty { OrganizationId = orgId };
ApplyCounterparty(entity, c, ResolveType);
_db.Counterparties.Add(entity);
existingByName[c.Name] = entity;
created++;
if (progress is not null) progress.Created = created;
}
batch++;
if (batch >= 100)
{
await _db.SaveChangesAsync(ct);
batch = 0;
}
}
catch (Exception ex)
{
_log.LogWarning(ex, "Failed to import counterparty {Name}", c.Name);
errors.Add($"{c.Name}: {ex.Message}");
if (progress is not null) progress.Errors = errors;
}
}
if (batch > 0) await _db.SaveChangesAsync(ct);
// `created` в отчёте = вставки + апдейты (чтобы не ломать UI, который знает только Created).
return new MoySkladImportResult(total, created + updated, skipped, 0, errors);
}
private static void ApplyCounterparty(
foodmarket.Domain.Catalog.Counterparty entity,
MsCounterparty c,
Func<string?, foodmarket.Domain.Catalog.CounterpartyType> resolveType)
{
entity.Name = Trim(c.Name, 255) ?? c.Name;
entity.LegalName = Trim(c.LegalTitle, 500);
entity.Type = resolveType(c.CompanyType);
entity.Bin = Trim(c.Inn, 20);
entity.TaxNumber = Trim(c.Kpp, 20);
entity.Phone = Trim(c.Phone, 50);
entity.Email = Trim(c.Email, 255);
entity.Address = Trim(c.ActualAddress ?? c.LegalAddress, 500);
entity.Notes = Trim(c.Description, 1000);
entity.IsActive = !c.Archived;
}
public async Task<MoySkladImportResult> ImportProductsAsync(
string token,
bool overwriteExisting,
CancellationToken ct,
ImportJobProgress? progress = null,
Guid? organizationIdOverride = null)
{
var orgId = organizationIdOverride ?? _tenant.OrganizationId
?? throw new InvalidOperationException("No tenant organization in context.");
// Pre-load tenant defaults. KZ default VAT is 16% — applied when product didn't
// carry its own vat from MoySklad.
const int kzDefaultVat = 16;
var baseUnit = await _db.UnitsOfMeasure.FirstOrDefaultAsync(u => u.Code == "796", ct)
?? await _db.UnitsOfMeasure.FirstAsync(ct);
var retailType = await _db.PriceTypes.FirstOrDefaultAsync(p => p.IsRetail, ct)
?? await _db.PriceTypes.FirstAsync(ct);
var kzt = await _db.Currencies.FirstAsync(c => c.Code == "KZT", ct);
var countriesByName = await _db.Countries
.IgnoreQueryFilters()
.ToDictionaryAsync(c => c.Name, c => c.Id, ct);
// Import folders first — build flat then link parents. Архивные тоже берём,
// помечаем IsActive=false — у MoySklad у productfolder есть archived.
var folders = await _client.GetAllFoldersAsync(token, ct);
var localGroupByMsId = new Dictionary<string, Guid>();
var groupsCreated = 0;
foreach (var f in folders.OrderBy(f => f.PathName?.Length ?? 0))
{
if (f.Id is null) continue;
var existing = await _db.ProductGroups.FirstOrDefaultAsync(
g => g.Name == f.Name && g.Path == (f.PathName ?? f.Name), ct);
if (existing is not null)
{
localGroupByMsId[f.Id] = existing.Id;
continue;
}
var g = new ProductGroup
{
OrganizationId = orgId,
Name = f.Name,
Path = string.IsNullOrEmpty(f.PathName) ? f.Name : $"{f.PathName}/{f.Name}",
IsActive = !f.Archived,
};
_db.ProductGroups.Add(g);
localGroupByMsId[f.Id] = g.Id;
groupsCreated++;
}
if (groupsCreated > 0) await _db.SaveChangesAsync(ct);
if (progress is not null) progress.GroupsCreated = groupsCreated;
// Import products
var errors = new List<string>();
var created = 0;
var updated = 0;
var skipped = 0;
var total = 0;
// При overwriteExisting=true загружаем товары целиком, чтобы обновлять существующие
// вместо создания дубликатов. Ключ = артикул (нормализованный).
var existingByArticle = await _db.Products
.Where(p => p.Article != null)
.ToDictionaryAsync(p => p.Article!, p => p, StringComparer.OrdinalIgnoreCase, ct);
var existingBarcodeSet = new HashSet<string>(
await _db.ProductBarcodes.Select(b => b.Code).ToListAsync(ct));
await foreach (var p in _client.StreamProductsAsync(token, ct))
{
total++;
if (progress is not null) progress.Total = total;
// Архивных не пропускаем — импортируем как IsActive=false.
var article = string.IsNullOrWhiteSpace(p.Article) ? p.Code : p.Article;
var alreadyByArticle = !string.IsNullOrWhiteSpace(article) && existingByArticle.ContainsKey(article);
if (alreadyByArticle && !overwriteExisting)
{
skipped++;
if (progress is not null) progress.Skipped = skipped;
continue;
}
try
{
var vat = p.Vat ?? kzDefaultVat;
var vatEnabled = (p.Vat ?? kzDefaultVat) > 0;
Guid? groupId = p.ProductFolder?.Meta?.Href is { } href && TryExtractId(href) is { } msGroupId
&& localGroupByMsId.TryGetValue(msGroupId, out var gId) ? gId : null;
Guid? countryId = p.Country?.Name is { } cn && countriesByName.TryGetValue(cn, out var cId) ? cId : null;
var retailPrice = p.SalePrices?.FirstOrDefault(sp => sp.PriceType?.Name?.Contains("Розничная", StringComparison.OrdinalIgnoreCase) == true)
?? p.SalePrices?.FirstOrDefault();
Product product;
if (alreadyByArticle && overwriteExisting)
{
product = existingByArticle[article!];
// Обновляем только скалярные поля — коллекции (prices, barcodes) оставляем:
// там могут быть данные, которые редактировал пользователь после импорта.
product.Name = Trim(p.Name, 500);
product.Article = Trim(article, 500);
product.Description = p.Description;
product.Vat = vat;
product.VatEnabled = vatEnabled;
product.ProductGroupId = groupId ?? product.ProductGroupId;
product.CountryOfOriginId = countryId ?? product.CountryOfOriginId;
product.IsWeighed = p.Weighed;
product.IsMarked = !string.IsNullOrEmpty(p.TrackingType) && p.TrackingType != "NOT_TRACKED";
product.IsActive = !p.Archived;
product.PurchasePrice = p.BuyPrice is null ? product.PurchasePrice : p.BuyPrice.Value / 100m;
updated++;
if (progress is not null) progress.Updated = updated;
}
else
{
product = new Product
{
OrganizationId = orgId,
Name = Trim(p.Name, 500),
Article = Trim(article, 500),
Description = p.Description,
UnitOfMeasureId = baseUnit.Id,
Vat = vat,
VatEnabled = vatEnabled,
ProductGroupId = groupId,
CountryOfOriginId = countryId,
IsWeighed = p.Weighed,
IsMarked = !string.IsNullOrEmpty(p.TrackingType) && p.TrackingType != "NOT_TRACKED",
IsActive = !p.Archived,
PurchasePrice = p.BuyPrice is null ? null : p.BuyPrice.Value / 100m,
PurchaseCurrencyId = kzt.Id,
};
if (retailPrice is not null)
{
product.Prices.Add(new ProductPrice
{
OrganizationId = orgId,
PriceTypeId = retailType.Id,
Amount = retailPrice.Value / 100m,
CurrencyId = kzt.Id,
});
}
foreach (var b in ExtractBarcodes(p))
{
if (existingBarcodeSet.Contains(b.Code)) continue;
product.Barcodes.Add(b);
existingBarcodeSet.Add(b.Code);
}
_db.Products.Add(product);
if (!string.IsNullOrWhiteSpace(article)) existingByArticle[article] = product;
created++;
if (progress is not null) progress.Created = created;
}
// Flush чаще (каждые 100) чтобы при сетевом обрыве на следующей странице
// мы сохранили как можно больше и смогли безопасно продолжить с overwrite.
if ((created + updated) % 100 == 0) await _db.SaveChangesAsync(ct);
}
catch (Exception ex)
{
_log.LogWarning(ex, "Failed to import MoySklad product {Name}", p.Name);
errors.Add($"{p.Name}: {ex.Message}");
if (progress is not null) progress.Errors = errors;
}
}
await _db.SaveChangesAsync(ct);
return new MoySkladImportResult(total, created + updated, skipped, groupsCreated, errors);
}
private static List<ProductBarcode> ExtractBarcodes(MsProduct p)
{
if (p.Barcodes is null) return [];
var list = new List<ProductBarcode>();
var primarySet = false;
foreach (var entry in p.Barcodes)
{
foreach (var (kind, code) in entry)
{
if (string.IsNullOrWhiteSpace(code)) continue;
var type = kind switch
{
"ean13" => BarcodeType.Ean13,
"ean8" => BarcodeType.Ean8,
"code128" => BarcodeType.Code128,
"gtin" => BarcodeType.Ean13,
"upca" => BarcodeType.Upca,
"upce" => BarcodeType.Upce,
_ => BarcodeType.Other,
};
list.Add(new ProductBarcode { Code = code.Length > 500 ? code[..500] : code, Type = type, IsPrimary = !primarySet });
primarySet = true;
}
}
return list;
}
private static string? Trim(string? s, int max)
=> string.IsNullOrEmpty(s) ? s : (s.Length <= max ? s : s[..max]);
private static string? TryExtractId(string href)
{
// href like "https://api.moysklad.ru/api/remap/1.2/entity/productfolder/<guid>"
var lastSlash = href.LastIndexOf('/');
return lastSlash >= 0 ? href[(lastSlash + 1)..] : null;
}
}