Skip to content

processing-worker

processing-worker — промежуточный сервис между raw-событиями и доменной моделью backend. Он превращает нестабильный transport-поток в осмысленный normalized ingest.

За что отвечает

  • чтение source.raw.v1 из RabbitMQ;
  • валидация по общим схемам;
  • нормализация payload;
  • source-specific enrichment;
  • вызов ingest в npp-backend;
  • публикация source.normalized.v1;
  • retry и DLQ для неуспешных сообщений.

Почему этот сервис вынесен отдельно

Потому что парсинг внешней площадки и бизнес-нормализация — это разные задачи.

scrape-helper знает, как получить карточку. processing-worker знает, как превратить её в:

  • Procurement
  • RegistryRecord
  • SupplierRiskSignal
  • SupplierCompanyProfile
  • AuctionItem

Так платформа остаётся устойчивой, даже если один источник меняет HTML.

Какие типы событий он понимает

На входе worker получает RawSourceEvent.

На выходе он строит NormalizedSourceEvent и в зависимости от source и содержимого решает, какой это доменный тип:

  • закупка;
  • запись реестра;
  • риск-сигнал;
  • профиль компании;
  • аукционный лот.

Где смотреть код

Ключевые файлы:

ПутьНазначение
src/main.tsbootstrap, consumer lifecycle, retry/DLQ
src/normalize.tsосновная логика нормализации
src/backend-client.tsвызов GraphQL ingest
src/contractsвалидация схем
src/messagingработа с очередями

Что важно про нормализацию

Worker не просто копирует поля. Он:

  • чистит строки;
  • нормализует валюту;
  • вычисляет derived fields;
  • определяет status;
  • связывает запись с атомным контуром;
  • подготавливает source-specific блоки для аналитики.

Отсюда и частые случаи, когда:

  • raw-событие выглядит живым;
  • а backend-агрегат или supplier-связка всё равно пустая.

Принцип работы по шагам

Внутренне worker проходит одно сообщение примерно так:

  1. получает сообщение из source.raw.v1;
  2. десериализует и валидирует его по JSON Schema;
  3. проверяет, нужно ли отправить событие в quarantine;
  4. выбирает ветку нормализации по типу источника;
  5. строит NormalizedSourceEvent;
  6. валидирует уже нормализованное событие;
  7. вызывает ingest в backend;
  8. публикует source.normalized.v1;
  9. подтверждает обработку исходного сообщения.

Это важно: acknowledgement происходит только после успешной нормализации и ingest, а не сразу после чтения из очереди.

Упрощённый runtime-цикл

ts
const raw = queue.parseMessage<RawSourceEvent>(message);
validators.validateRaw(raw);

const quarantinedEvent = detectQuarantinableRawEvent(raw);
if (quarantinedEvent) {
  await reportQuarantinedRawEvent(apiGraphqlUrl, ingestToken, quarantinedEvent);
  queue.ack(message);
  return;
}

const normalized = normalizeRawEvent(raw);
validators.validateNormalized(normalized);
await sendToBackend(apiGraphqlUrl, ingestToken, normalized);
await queue.publish(queueNormalized, normalized);
queue.ack(message);

В этой схеме хорошо видно, что worker одновременно является и нормализатором, и гарантом корректной передачи данных в доменный слой.

Переменные окружения

  • RABBITMQ_URL
  • QUEUE_RAW_EVENT
  • QUEUE_RETRY_EVENT
  • QUEUE_DEAD_LETTER_EVENT
  • QUEUE_NORMALIZED_EVENT
  • API_GRAPHQL_URL
  • API_INGEST_TOKEN
  • SHARED_CONTRACTS_DIR
  • RETRY_ATTEMPTS
  • RETRY_BASE_DELAY_MS
  • PREFETCH

Локальный запуск

bash
cd ../infra
cp .env.example .env
docker compose --env-file .env -f docker-compose.yml -f docker-compose.apps.yml up -d rabbitmq backend-api

cd ../processing-worker
npm install
npm run start:dev

Что важно проверить при проблемах

Если worker выглядит подозрительно, полезно смотреть:

  • есть ли новые сообщения в source.raw.v1;
  • не растёт ли source.raw.retry.v1;
  • не утекают ли сообщения в source.raw.dlq.v1;
  • проходит ли GraphQL ingest;
  • не ругается ли валидация контрактов.

Как трактовать типовые симптомы

  • source.raw.v1 есть, но source.normalized.v1 пустая: проблема чаще всего в валидации, quarantine или normalizer logic.
  • retry растёт, DLQ пустая: это похоже на временную проблему backend или RabbitMQ.
  • DLQ растёт: нужно проверять poison payload, схему или некорректный mapping.
  • ingest проходит, но экран пустой: скорее всего проблема уже не в worker, а в backend-агрегате или ролевом UI.

Где лежит смысл данных

Частая ошибка при чтении этого репозитория — считать, что worker “просто проксирует JSON”.

На самом деле он отвечает за семантический переход:

  • от HTML/сырого API-ответа;
  • к нормализованной сущности;
  • пригодной для backend, аналитики, отчётов и UI.

Именно поэтому изменения здесь часто затрагивают не только transport, но и бизнес-смысл данных.

Когда идти именно сюда

Этот репозиторий почти наверняка нужный, если:

  • raw-парсер отработал, но доменная запись не создаётся;
  • нужно добавить derived field;
  • нужно изменить статусную нормализацию;
  • требуется новая ветка normalize<Source>RawEvent;
  • payload доезжает в backend “не в том смысле”, хотя внешний источник уже собран.

Качество

bash
npm run check
npm run test
npm run build

Техническая и аналитическая документация платформы NPPWEB.