Architecture Diagrams

This page provides visual overviews of the Message Generator pipeline using Mermaid diagrams.

Full Pipeline Overview

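The Message Generator processes two types of message flows: transactional (TX) and batch. Both flows pass through reception, device mapping, and delivery stages, with a personalization stage in between for messages that need it; non-personalized messages go straight from reception to the Device Mapper.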

graph TD
    subgraph "HTTP Clients"
        CDC["CDC / Suite / RTI"]
    end
    subgraph "Web API"
        WEB["Web (Fastify)"]
    end
    CDC -->|"POST /api/rti/push\nPOST /api/trigger\nPOST /api/test-message"| WEB
    WEB -->|"sending-tx-<dg>"| AGG["Aggregator"]
    WEB -->|"sending-batch-<dg>"| SLICER["Slicer"]
    subgraph "TX Path (Reception)"
        AGG
    end
    subgraph "Batch Path (Reception / Chunker)"
        SLICER -->|"sending-batch-fetcher-<dg>"| FETCHER["Fetcher"]
        SLICER -->|"BullMQ: launches-<dg>\n(throttled campaigns)"| LAUNCHER["Launcher"]
    end
    AGG -->|"event-pers.push.request-publisher.tx.<dg>\n(personalized)"| PERS["Personalizer\n(external service)"]
    AGG -->|"sending-tx-devicemapper-<dg>\n(non-personalized)"| DM
    FETCHER -->|"event-pers.push.request-publisher.batch.<dg>\n(personalized)"| PERS
    FETCHER -->|"sending-batch-devicemapper-<dg>\n(non-personalized)"| DM
    LAUNCHER -->|"event-pers.push.request-publisher.batch.<dg>\n(personalized)"| PERS
    LAUNCHER -->|"sending-batch-devicemapper-<dg>\n(non-personalized)"| DM
    PERS -->|"sending-<tx|batch>-devicemapper-<dg>"| DM["Device Mapper"]
    DM -->|"sending-*-push-apple-<dg>"| APPLE["Apple Push"]
    DM -->|"sending-*-push-fcm-<dg>"| FCM["FCM Push"]
    DM -->|"sending-*-push-huawei-<dg>"| HUAWEI["Huawei Push"]
    DM -->|"sending-*-webpush-vapid-<dg>"| VAPID["Web Push (VAPID)"]
    DM -->|"sending-*-inbox-<dg>"| INBOX["Inbox Service"]
    DM -->|"sending-*-em-<dg>"| EM["Embedded Messaging"]
    subgraph "Delivery"
        APPLE
        FCM
        HUAWEI
        VAPID
        INBOX
        EM
    end
    AGG -->|"transient error"| DELAY["me-pubsub-delayer"]
    FETCHER -->|"transient error"| DELAY
    LAUNCHER -->|"transient error"| DELAY
    DM -->|"fatal error"| ERRORS["sending-mg-errors"]
    AGG -->|"fatal error"| ERRORS
    FETCHER -->|"fatal error"| ERRORS
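The topic names in the pipeline follow a fixed pattern built from the flow type and the `<dg>` suffix. The TypeScript sketch below derives them from those two inputs, assuming the naming shown in the diagram; the function names are hypothetical, and reading the `*` in the Device Mapper's output topics as the flow type (`tx` or `batch`) is an assumption.

```typescript
// Message flow types shown in the diagram.
type Flow = "tx" | "batch";

// Delivery channels, named after the channel segment of the Device Mapper's output topics.
type Channel = "push-apple" | "push-fcm" | "push-huawei" | "webpush-vapid" | "inbox" | "em";

// Topic the Web API publishes an incoming request to (hypothetical helper).
function webOutputTopic(flow: Flow, dg: string): string {
  return `sending-${flow}-${dg}`;
}

// Topic a reception worker (Aggregator for TX, Fetcher or Launcher for batch)
// publishes each per-contact message to (hypothetical helper).
function receptionOutputTopic(flow: Flow, personalized: boolean, dg: string): string {
  return personalized
    ? `event-pers.push.request-publisher.${flow}.${dg}` // consumed by the Personalizer
    : `sending-${flow}-devicemapper-${dg}`; // skips personalization
}

// Topic the Device Mapper publishes to for one delivery channel.
// Assumption: the "*" in the diagram stands for the flow type.
function deliveryTopic(flow: Flow, channel: Channel, dg: string): string {
  return `sending-${flow}-${channel}-${dg}`;
}

const dg = "dg1"; // hypothetical <dg> value
console.log(webOutputTopic("batch", dg)); // sending-batch-dg1
console.log(receptionOutputTopic("tx", true, dg)); // event-pers.push.request-publisher.tx.dg1
console.log(receptionOutputTopic("batch", false, dg)); // sending-batch-devicemapper-dg1
console.log(deliveryTopic("tx", "push-apple", dg)); // sending-tx-push-apple-dg1
```

The non-personalized topic is also what the Personalizer publishes to (`sending-<tx|batch>-devicemapper-<dg>` in the diagram), so both paths converge on the Device Mapper's input topic.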

Chunker Internal Flow

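The Chunker processes batch campaigns with three sub-components: the Slicer, the Fetcher, and the Launcher. The Slicer splits a campaign into fetch tasks for the Fetcher or, for throttled campaigns, schedules a BullMQ job that the Launcher works through in delayed batches.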

graph TD
    BATCH["sending-batch-<dg>-sub"] --> SLICER["Slicer"]
    SLICER -->|"Fetches campaign info\nfrom me-push"| MEPUSH["me-push API"]
    SLICER -->|"Gets contact list size\nfrom Suite API"| SUITE["Suite Contact List API"]
    SLICER -->|"Creates fetch tasks\n(offset + count per slice)"| PUBSUB["sending-batch-fetcher-<dg>"]
    SLICER -->|"Throttled campaigns:\nBullMQ job with 1min delay"| BULL["BullMQ Queue\nlaunches-<dg>"]
    PUBSUB --> FETCHER["Fetcher"]
    FETCHER -->|"Fetches contacts\nfrom Suite API"| SUITE
    FETCHER -->|"Publishes per-contact messages"| OUTPUT["event-pers.push.request-publisher.batch.<dg>\nor sending-batch-devicemapper-<dg>"]
    BULL --> LAUNCHER["Launcher"]
    LAUNCHER -->|"Fetches batch of contacts\nfrom campaign_audience table"| DB["PostgreSQL"]
    LAUNCHER -->|"Publishes per-contact messages"| OUTPUT
    LAUNCHER -->|"Re-enqueues with ~1min delay\nuntil all contacts sent"| BULL
    SLICER -->|"Registers batch start"| MON["Campaign Monitoring"]
    LAUNCHER -->|"Checks abort status"| REDIS["Redis\n(AbortedCampaignsInfoService)"]
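The Slicer's offset/count fetch tasks and the Launcher's re-enqueue loop can be sketched as follows. This is a minimal TypeScript illustration under stated assumptions, not the service's code: `sliceSize` and `batchSize` are hypothetical parameters, the `LauncherDeps` interface stands in for the Redis, PostgreSQL, Pub/Sub, and BullMQ clients (no real client API is called), the calls are synchronous only for brevity, and checking the abort flag before each batch is an assumed ordering.

```typescript
// One fetch task as described in the diagram: an offset and a count per slice.
interface FetchTask {
  offset: number;
  count: number;
}

// Split a contact list of `total` contacts into consecutive slices of at most
// `sliceSize` contacts. `sliceSize` is a hypothetical tuning parameter.
function sliceContactList(total: number, sliceSize: number): FetchTask[] {
  if (sliceSize <= 0 || Math.floor(sliceSize) !== sliceSize) {
    throw new RangeError("sliceSize must be a positive integer");
  }
  const tasks: FetchTask[] = [];
  for (let offset = 0; offset < total; offset += sliceSize) {
    tasks.push({ offset, count: Math.min(sliceSize, total - offset) });
  }
  return tasks;
}

// A throttled-campaign job; `offset` tracks how much of the audience has been sent.
interface LaunchJob {
  campaignId: string;
  offset: number;
}

// Hypothetical dependencies standing in for Redis, PostgreSQL, Pub/Sub and BullMQ.
interface LauncherDeps {
  isAborted(campaignId: string): boolean;
  fetchAudience(campaignId: string, offset: number, limit: number): string[];
  publish(contactId: string): void;
  enqueue(job: LaunchJob, delayMs: number): void;
}

const RELAUNCH_DELAY_MS = 60 * 1000; // "~1min" in the diagram

// Run one Launcher step: stop if the campaign was aborted; otherwise publish one
// batch and re-enqueue the job until a short (or empty) batch shows the audience is exhausted.
function runLaunchStep(job: LaunchJob, batchSize: number, deps: LauncherDeps): "aborted" | "continued" | "done" {
  if (batchSize <= 0) {
    throw new RangeError("batchSize must be positive");
  }
  if (deps.isAborted(job.campaignId)) {
    return "aborted";
  }
  const contacts = deps.fetchAudience(job.campaignId, job.offset, batchSize);
  for (const contactId of contacts) {
    deps.publish(contactId);
  }
  if (contacts.length < batchSize) {
    return "done";
  }
  deps.enqueue({ campaignId: job.campaignId, offset: job.offset + contacts.length }, RELAUNCH_DELAY_MS);
  return "continued";
}

console.log(JSON.stringify(sliceContactList(2500, 1000)));
// [{"offset":0,"count":1000},{"offset":1000,"count":1000},{"offset":2000,"count":500}]
```

Stopping on a short batch means an audience whose size is an exact multiple of `batchSize` takes one extra, empty step before the job reports `done`.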

Client Manager Sub-Workers

The Client Manager is a subsystem that manages the lifecycle of device data stored in per-tenant PostgreSQL schemas (tenant_<customerId>). It consists of Pub/Sub consumers and batch jobs.

graph TD
    subgraph "Pub/Sub Consumers"
        WRITER["Writer\nclient-manager.writer"]
        REPUB["Republisher\nclient-manager.republisher"]
        PTS["Push Token Status Updater\nclient-manager.push-token-status"]
        CD["Contact Deletion Worker\nclient-manager.contact-deletion"]
        AD["Account Deletion\nclient-manager.account-deletion"]
        TA["Tenant Activator\nclient-manager.activate-customer"]
    end
    subgraph "Batch Jobs"
        CDJ["Contact Deletion Job\n(BigQuery query)"]
        DC["Dashboard Collector"]
        OD["Orphan Deletion"]
    end
    WRITER -->|"write device data"| PG["PostgreSQL\ntenant_<customerId> schemas"]
    WRITER -->|"retry on failure"| REPUB
    WRITER -->|"permanent error"| WERR["client-manager.writer-errors"]
    REPUB -->|"retry (max 5)"| WRITER
    REPUB -->|"exhausted retries"| WERR
    PTS -->|"update token validity"| PG
    CD -->|"delete contact devices"| PG
    AD -->|"drop tenant schema"| PG
    TA -->|"create tenant schema"| PG
    CDJ -->|"query deleted contacts"| BQ["BigQuery"]
    CDJ -->|"publish deletion messages"| CD
    DC -->|"collect metrics"| PG
    DC -->|"write dashboard data"| DASH["dashboard_tenant_metrics"]
    OD -->|"delete orphaned devices\n(no contact, >30 days old)"| PG
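The Writer/Republisher retry loop and the Orphan Deletion criterion each reduce to a small decision, sketched here in TypeScript. The function names and the `DeviceRow` shape are hypothetical; counting the "max 5" limit as five retries numbered from zero, and measuring the 30-day age from a creation timestamp, are assumptions the diagram does not settle.

```typescript
// Names taken from the diagram labels.
const WRITER = "client-manager.writer";
const REPUBLISHER = "client-manager.republisher";
const WRITER_ERRORS = "client-manager.writer-errors";

// "retry (max 5)" in the diagram.
const MAX_RETRIES = 5;

// Where the Writer sends a message whose write failed.
function writerOnFailure(permanent: boolean): string {
  return permanent ? WRITER_ERRORS : REPUBLISHER;
}

// Where the Republisher sends a message next. Assumption: `retriesSoFar` counts
// retries already attempted, so at most five retries go back to the Writer.
function republisherNext(retriesSoFar: number): string {
  return retriesSoFar < MAX_RETRIES ? WRITER : WRITER_ERRORS;
}

// Hypothetical device row: `contactId` is null when no contact is linked, and
// `createdAtMs` is the epoch-millisecond timestamp the age is measured from.
interface DeviceRow {
  contactId: string | null;
  createdAtMs: number;
}

const DAY_MS = 24 * 60 * 60 * 1000;
const ORPHAN_MIN_AGE_MS = 30 * DAY_MS;

// Orphan Deletion criterion from the diagram: no contact and older than 30 days.
function isOrphan(device: DeviceRow, nowMs: number): boolean {
  return device.contactId === null && nowMs - device.createdAtMs > ORPHAN_MIN_AGE_MS;
}

const nowMs = Date.now();
console.log(republisherNext(5)); // client-manager.writer-errors
console.log(isOrphan({ contactId: null, createdAtMs: nowMs - 31 * DAY_MS }, nowMs)); // true
console.log(isOrphan({ contactId: "c-42", createdAtMs: nowMs - 31 * DAY_MS }, nowMs)); // false
```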

Error Handling Flow

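All pipeline components share one error-handling policy: retriable errors are retried on a fixed delay schedule, permanent and expected internal errors are dropped, and unknown errors are escalated to the sending-mg-errors topic.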

graph TD
    ERR["Error occurs\nduring message processing"] --> CLASSIFY{"Classify error"}
    CLASSIFY -->|"Retriable\n(423, 429, 500-504,\nTopicPublishError,\nClientClosedError)"| RETRY{"Retry count\n< max delays?"}
    RETRY -->|"Yes"| DELAY["Delay via me-pubsub-delayer\n(1s, 1s, 2s, 3s, 7s, 30s)"]
    DELAY --> REDELIVER["Re-deliver message\nwith incremented retry count"]
    RETRY -->|"No (expired)"| EXPIRED{"Service-retryable\nerror?"}
    EXPIRED -->|"Yes"| RESET["Reset retry count to 0\n(infinite retries for infra issues)"]
    EXPIRED -->|"No"| EXPIRE_DROP["Log + drop\n(MessageExpiredError)"]
    CLASSIFY -->|"Permanent\n(404, 410)"| DROP["Log + drop silently"]
    CLASSIFY -->|"Internal normal\n(EmptyContactList,\nCampaignAborted, etc.)"| DROP
    CLASSIFY -->|"Unknown / needs investigation"| ERROR_TOPIC["Publish to\nsending-mg-errors"]
    EXPIRE_DROP -->|"Not permanent &\nnot internal normal"| ERROR_TOPIC
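The error-handling flow can be expressed as a classifier plus a decision function. In this TypeScript sketch the status codes, error names, and delay schedule come from the diagram; the `PipelineError` shape, the `serviceRetryable` flag, the order in which the checks run, and the `Action` type are hypothetical. The internal-normal list is deliberately partial because the diagram ends it with "etc.".

```typescript
// Delays applied through me-pubsub-delayer, indexed by retry count (from the diagram).
const RETRY_DELAYS_MS = [1000, 1000, 2000, 3000, 7000, 30000];

const RETRIABLE_STATUSES = [423, 429, 500, 501, 502, 503, 504];
const RETRIABLE_NAMES = ["TopicPublishError", "ClientClosedError"];
const PERMANENT_STATUSES = [404, 410];
// Partial list: the diagram names these two and ends with "etc.".
const INTERNAL_NORMAL_NAMES = ["EmptyContactList", "CampaignAborted"];

// Hypothetical error shape.
interface PipelineError {
  name: string;
  status?: number; // HTTP status of a failed downstream call, if any
  serviceRetryable?: boolean; // hypothetical flag marking infrastructure errors
}

type ErrorClass = "retriable" | "permanent" | "internal-normal" | "unknown";

type Action =
  | { kind: "delay"; delayMs: number; nextRetryCount: number } // re-deliver via me-pubsub-delayer
  | { kind: "reset" } // re-deliver with the retry count reset to 0
  | { kind: "drop" } // log and drop
  | { kind: "report"; expired: boolean }; // publish to sending-mg-errors

// Check order (permanent, internal normal, then retriable) is an assumption.
function classify(err: PipelineError): ErrorClass {
  if (err.status !== undefined && PERMANENT_STATUSES.indexOf(err.status) !== -1) return "permanent";
  if (INTERNAL_NORMAL_NAMES.indexOf(err.name) !== -1) return "internal-normal";
  if (RETRIABLE_NAMES.indexOf(err.name) !== -1) return "retriable";
  if (err.status !== undefined && RETRIABLE_STATUSES.indexOf(err.status) !== -1) return "retriable";
  return "unknown";
}

function decide(err: PipelineError, retryCount: number): Action {
  const cls = classify(err);
  if (cls === "permanent" || cls === "internal-normal") return { kind: "drop" };
  if (cls === "unknown") return { kind: "report", expired: false };
  // Retriable: use the next delay while the schedule lasts.
  if (retryCount < RETRY_DELAYS_MS.length) {
    return { kind: "delay", delayMs: RETRY_DELAYS_MS[retryCount], nextRetryCount: retryCount + 1 };
  }
  // Schedule exhausted: infrastructure errors start over; others expire. Per the
  // diagram, an expired message (MessageExpiredError) is neither permanent nor
  // internal normal, so it is also published to sending-mg-errors.
  return err.serviceRetryable ? { kind: "reset" } : { kind: "report", expired: true };
}

console.log(JSON.stringify(decide({ name: "HttpError", status: 503 }, 0)));
// {"kind":"delay","delayMs":1000,"nextRetryCount":1}
console.log(JSON.stringify(decide({ name: "HttpError", status: 503 }, 6)));
// {"kind":"report","expired":true}
```

Returning an action instead of publishing directly keeps the policy testable on its own; a caller would map `delay` to a publish on me-pubsub-delayer and `report` to a publish on sending-mg-errors.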