processing-worker
processing-worker — промежуточный сервис между raw-событиями и доменной моделью backend. Он превращает нестабильный transport-поток в осмысленный normalized ingest.
За что отвечает
- чтение
source.raw.v1из RabbitMQ; - валидация по общим схемам;
- нормализация payload;
- source-specific enrichment;
- вызов ingest в
npp-backend; - публикация
source.normalized.v1; - retry и DLQ для неуспешных сообщений.
Почему этот сервис вынесен отдельно
Потому что парсинг внешней площадки и бизнес-нормализация — это разные задачи.
scrape-helper знает, как получить карточку. processing-worker знает, как превратить её в:
ProcurementRegistryRecordSupplierRiskSignalSupplierCompanyProfileAuctionItem
Так платформа остаётся устойчивой, даже если один источник меняет HTML.
Какие типы событий он понимает
На входе worker получает RawSourceEvent.
На выходе он строит NormalizedSourceEvent и в зависимости от source и содержимого решает, какой это доменный тип:
- закупка;
- запись реестра;
- риск-сигнал;
- профиль компании;
- аукционный лот.
Где смотреть код
Ключевые файлы:
| Путь | Назначение |
|---|---|
src/main.ts | bootstrap, consumer lifecycle, retry/DLQ |
src/normalize.ts | основная логика нормализации |
src/backend-client.ts | вызов GraphQL ingest |
src/contracts | валидация схем |
src/messaging | работа с очередями |
Что важно про нормализацию
Worker не просто копирует поля. Он:
- чистит строки;
- нормализует валюту;
- вычисляет derived fields;
- определяет
status; - связывает запись с атомным контуром;
- подготавливает source-specific блоки для аналитики.
Отсюда и частые случаи, когда:
- raw-событие выглядит живым;
- а backend-агрегат или supplier-связка всё равно пустая.
Принцип работы по шагам
Внутренне worker проходит одно сообщение примерно так:
- получает сообщение из
source.raw.v1; - десериализует и валидирует его по JSON Schema;
- проверяет, нужно ли отправить событие в quarantine;
- выбирает ветку нормализации по типу источника;
- строит
NormalizedSourceEvent; - валидирует уже нормализованное событие;
- вызывает ingest в backend;
- публикует
source.normalized.v1; - подтверждает обработку исходного сообщения.
Это важно: acknowledgement происходит только после успешной нормализации и ingest, а не сразу после чтения из очереди.
Упрощённый runtime-цикл
const raw = queue.parseMessage<RawSourceEvent>(message);
validators.validateRaw(raw);
const quarantinedEvent = detectQuarantinableRawEvent(raw);
if (quarantinedEvent) {
await reportQuarantinedRawEvent(apiGraphqlUrl, ingestToken, quarantinedEvent);
queue.ack(message);
return;
}
const normalized = normalizeRawEvent(raw);
validators.validateNormalized(normalized);
await sendToBackend(apiGraphqlUrl, ingestToken, normalized);
await queue.publish(queueNormalized, normalized);
queue.ack(message);В этой схеме хорошо видно, что worker одновременно является и нормализатором, и гарантом корректной передачи данных в доменный слой.
Переменные окружения
RABBITMQ_URLQUEUE_RAW_EVENTQUEUE_RETRY_EVENTQUEUE_DEAD_LETTER_EVENTQUEUE_NORMALIZED_EVENTAPI_GRAPHQL_URLAPI_INGEST_TOKENSHARED_CONTRACTS_DIRRETRY_ATTEMPTSRETRY_BASE_DELAY_MSPREFETCH
Локальный запуск
cd ../infra
cp .env.example .env
docker compose --env-file .env -f docker-compose.yml -f docker-compose.apps.yml up -d rabbitmq backend-api
cd ../processing-worker
npm install
npm run start:devЧто важно проверить при проблемах
Если worker выглядит подозрительно, полезно смотреть:
- есть ли новые сообщения в
source.raw.v1; - не растёт ли
source.raw.retry.v1; - не утекают ли сообщения в
source.raw.dlq.v1; - проходит ли GraphQL ingest;
- не ругается ли валидация контрактов.
Как трактовать типовые симптомы
source.raw.v1есть, ноsource.normalized.v1пустая: проблема чаще всего в валидации, quarantine или normalizer logic.- retry растёт, DLQ пустая: это похоже на временную проблему backend или RabbitMQ.
- DLQ растёт: нужно проверять poison payload, схему или некорректный mapping.
- ingest проходит, но экран пустой: скорее всего проблема уже не в worker, а в backend-агрегате или ролевом UI.
Где лежит смысл данных
Частая ошибка при чтении этого репозитория — считать, что worker “просто проксирует JSON”.
На самом деле он отвечает за семантический переход:
- от HTML/сырого API-ответа;
- к нормализованной сущности;
- пригодной для backend, аналитики, отчётов и UI.
Именно поэтому изменения здесь часто затрагивают не только transport, но и бизнес-смысл данных.
Когда идти именно сюда
Этот репозиторий почти наверняка нужный, если:
- raw-парсер отработал, но доменная запись не создаётся;
- нужно добавить derived field;
- нужно изменить статусную нормализацию;
- требуется новая ветка
normalize<Source>RawEvent; - payload доезжает в backend “не в том смысле”, хотя внешний источник уже собран.
Качество
npm run check
npm run test
npm run build