Architecture Diagrams
This page provides visual overviews of the Message Generator pipeline using Mermaid diagrams.
Full Pipeline Overview
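The Message Generator handles two message flows: transactional (TX) and batch. Both pass through reception, device mapping, and delivery. Personalized messages are additionally routed through the external Personalizer before device mapping; non-personalized messages go from reception straight to the Device Mapper.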
graph TD
subgraph "HTTP Clients"
CDC["CDC / Suite / RTI"]
end
subgraph "Web API"
WEB["Web (Fastify)"]
end
CDC -->|"POST /api/rti/push\nPOST /api/trigger\nPOST /api/test-message"| WEB
WEB -->|"sending-tx-<dg>"| AGG["Aggregator"]
WEB -->|"sending-batch-<dg>"| SLICER["Slicer"]
subgraph "TX Path (Reception)"
AGG
end
subgraph "Batch Path (Reception / Chunker)"
SLICER -->|"sending-batch-fetcher-<dg>"| FETCHER["Fetcher"]
SLICER -->|"BullMQ: launches-<dg>\n(throttled campaigns)"| LAUNCHER["Launcher"]
end
AGG -->|"event-pers.push.request-publisher.tx.<dg>\n(personalized)"| PERS["Personalizer\n(external service)"]
AGG -->|"sending-tx-devicemapper-<dg>\n(non-personalized)"| DM
FETCHER -->|"event-pers.push.request-publisher.batch.<dg>\n(personalized)"| PERS
FETCHER -->|"sending-batch-devicemapper-<dg>\n(non-personalized)"| DM
LAUNCHER -->|"event-pers.push.request-publisher.batch.<dg>\n(personalized)"| PERS
LAUNCHER -->|"sending-batch-devicemapper-<dg>\n(non-personalized)"| DM
PERS -->|"sending-<tx|batch>-devicemapper-<dg>"| DM["Device Mapper"]
DM -->|"sending-*-push-apple-<dg>"| APPLE["Apple Push"]
DM -->|"sending-*-push-fcm-<dg>"| FCM["FCM Push"]
DM -->|"sending-*-push-huawei-<dg>"| HUAWEI["Huawei Push"]
DM -->|"sending-*-webpush-vapid-<dg>"| VAPID["Web Push (VAPID)"]
DM -->|"sending-*-inbox-<dg>"| INBOX["Inbox Service"]
DM -->|"sending-*-em-<dg>"| EM["Embedded Messaging"]
subgraph "Delivery"
APPLE
FCM
HUAWEI
VAPID
INBOX
EM
end
AGG -->|"transient error"| DELAY["me-pubsub-delayer"]
FETCHER -->|"transient error"| DELAY
LAUNCHER -->|"transient error"| DELAY
DM -->|"fatal error"| ERRORS["sending-mg-errors"]
AGG -->|"fatal error"| ERRORS
FETCHER -->|"fatal error"| ERRORS
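The topic naming in the diagram above can be summarized with a few small helpers. This is only a sketch of the naming pattern, not the service's actual code: the function names, the `Flow` and `Channel` types, and the reading of `*` as the flow name (`tx` or `batch`) are assumptions made for illustration.

```typescript
// Hypothetical helpers that mirror the topic names shown in the diagram.
type Flow = "tx" | "batch";
type Channel =
  | "push-apple"
  | "push-fcm"
  | "push-huawei"
  | "webpush-vapid"
  | "inbox"
  | "em";

// Topic the Web API publishes to (WEB -> Aggregator or Slicer).
function receptionTopic(flow: Flow, dg: string): string {
  return `sending-${flow}-${dg}`;
}

// Topic the reception stage publishes to next: the Personalizer request
// topic for personalized messages, the Device Mapper topic otherwise.
function afterReceptionTopic(flow: Flow, dg: string, personalized: boolean): string {
  return personalized
    ? `event-pers.push.request-publisher.${flow}.${dg}`
    : `sending-${flow}-devicemapper-${dg}`;
}

// Delivery topic the Device Mapper publishes to.
// Assumption: the "*" in the diagram stands for the flow name.
function deliveryTopic(flow: Flow, channel: Channel, dg: string): string {
  return `sending-${flow}-${channel}-${dg}`;
}
```

For example, under these assumptions a non-personalized TX message for a hypothetical `dg1` would travel `sending-tx-dg1`, then `sending-tx-devicemapper-dg1`, then a delivery topic such as `sending-tx-push-fcm-dg1`.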
Chunker Internal Flow
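The Chunker consists of three sub-components that together process batch campaigns: the Slicer splits a campaign into fetch tasks, the Fetcher loads contacts for each task and publishes per-contact messages, and the Launcher sends throttled campaigns in delayed batches.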
graph TD
BATCH["sending-batch-<dg>-sub"] --> SLICER["Slicer"]
SLICER -->|"Fetches campaign info\nfrom me-push"| MEPUSH["me-push API"]
SLICER -->|"Gets contact list size\nfrom Suite API"| SUITE["Suite Contact List API"]
SLICER -->|"Creates fetch tasks\n(offset + count per slice)"| PUBSUB["sending-batch-fetcher-<dg>"]
SLICER -->|"Throttled campaigns:\nBullMQ job with 1min delay"| BULL["BullMQ Queue\nlaunches-<dg>"]
PUBSUB --> FETCHER["Fetcher"]
FETCHER -->|"Fetches contacts\nfrom Suite API"| SUITE
FETCHER -->|"Publishes per-contact messages"| OUTPUT["event-pers.push.request-publisher.batch.<dg>\nor sending-batch-devicemapper-<dg>"]
BULL --> LAUNCHER["Launcher"]
LAUNCHER -->|"Fetches batch of contacts\nfrom campaign_audience table"| DB["PostgreSQL"]
LAUNCHER -->|"Publishes per-contact messages"| OUTPUT
LAUNCHER -->|"Re-enqueues with ~1min delay\nuntil all contacts sent"| BULL
SLICER -->|"Registers batch start"| MON["Campaign Monitoring"]
LAUNCHER -->|"Checks abort status"| REDIS["Redis\n(AbortedCampaignsInfoService)"]
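The Slicer step "creates fetch tasks (offset + count per slice)" can be illustrated as follows. This is a minimal sketch: the `FetchTask` shape, the function name, and the slice size are hypothetical, and the real task payload may carry more fields.

```typescript
// Hypothetical shape of one fetch task published to sending-batch-fetcher-<dg>.
interface FetchTask {
  campaignId: string;
  offset: number; // index of the first contact in this slice
  count: number;  // number of contacts in this slice
}

// Split a contact list of `listSize` contacts into consecutive slices
// of at most `sliceSize` contacts each.
function sliceContactList(campaignId: string, listSize: number, sliceSize: number): FetchTask[] {
  if (!Number.isInteger(listSize) || listSize < 0) {
    throw new RangeError("listSize must be a non-negative integer");
  }
  if (!Number.isInteger(sliceSize) || sliceSize <= 0) {
    throw new RangeError("sliceSize must be a positive integer");
  }
  const tasks: FetchTask[] = [];
  for (let offset = 0; offset < listSize; offset += sliceSize) {
    // The last slice may be shorter than sliceSize.
    tasks.push({ campaignId, offset, count: Math.min(sliceSize, listSize - offset) });
  }
  return tasks;
}
```

With a list of 2500 contacts and an assumed slice size of 1000, this yields three tasks: offsets 0, 1000, and 2000 with counts 1000, 1000, and 500. An empty list yields no tasks, which is consistent with the EmptyContactList case being treated as a normal outcome in the error handling flow.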
Client Manager Sub-Workers
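The Client Manager is the subsystem that manages the device data lifecycle across per-tenant PostgreSQL schemas. It consists of Pub/Sub consumers, which write, update, and delete device data, and batch jobs, which handle contact deletion, dashboard metrics, and orphaned devices.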
graph TD
subgraph "Pub/Sub Consumers"
WRITER["Writer\nclient-manager.writer"]
REPUB["Republisher\nclient-manager.republisher"]
PTS["Push Token Status Updater\nclient-manager.push-token-status"]
CD["Contact Deletion Worker\nclient-manager.contact-deletion"]
AD["Account Deletion\nclient-manager.account-deletion"]
TA["Tenant Activator\nclient-manager.activate-customer"]
end
subgraph "Batch Jobs"
CDJ["Contact Deletion Job\n(BigQuery query)"]
DC["Dashboard Collector"]
OD["Orphan Deletion"]
end
WRITER -->|"write device data"| PG["PostgreSQL\ntenant_<customerId> schemas"]
WRITER -->|"retry on failure"| REPUB
WRITER -->|"permanent error"| WERR["client-manager.writer-errors"]
REPUB -->|"retry (max 5)"| WRITER
REPUB -->|"exhausted retries"| WERR
PTS -->|"update token validity"| PG
CD -->|"delete contact devices"| PG
AD -->|"drop tenant schema"| PG
TA -->|"create tenant schema"| PG
CDJ -->|"query deleted contacts"| BQ["BigQuery"]
CDJ -->|"publish deletion messages"| CD
DC -->|"collect metrics"| PG
DC -->|"write dashboard data"| DASH["dashboard_tenant_metrics"]
OD -->|"delete orphaned devices\n(no contact, >30 days old)"| PG
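The Writer and Republisher retry loop in the diagram above can be sketched as a small routing decision. The names `WriteFailure` and `routeWriteFailure` are hypothetical, and the sketch assumes that the retry count starts at 0 and that "max 5" means at most five retries before the message is sent to the error topic.

```typescript
// "retry (max 5)" in the diagram; the exact counting semantics are assumed.
const MAX_WRITER_RETRIES = 5;

type WriterDestination =
  | "client-manager.republisher"
  | "client-manager.writer-errors";

// Hypothetical description of a failed write attempt.
interface WriteFailure {
  retryCount: number; // retries already performed for this message
  permanent: boolean; // true when retrying cannot succeed
}

// Decide where a failed write goes next: back through the Republisher
// for another attempt, or to the writer error topic.
function routeWriteFailure(failure: WriteFailure): WriterDestination {
  if (failure.permanent || failure.retryCount >= MAX_WRITER_RETRIES) {
    return "client-manager.writer-errors";
  }
  return "client-manager.republisher";
}
```

Keeping the decision in one pure function makes both exit conditions (permanent error and exhausted retries) easy to test without a database or Pub/Sub connection.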
Error Handling Flow
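All pipeline components share one error handling policy: each error is classified, retriable errors are redelivered after a delay, expected errors are logged and dropped, and errors that need investigation are published to sending-mg-errors.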
graph TD
ERR["Error occurs\nduring message processing"] --> CLASSIFY{"Classify error"}
CLASSIFY -->|"Retriable\n(423, 429, 500-504,\nTopicPublishError,\nClientClosedError)"| RETRY{"Retry count\n< max delays?"}
RETRY -->|"Yes"| DELAY["Delay via me-pubsub-delayer\n(1s, 1s, 2s, 3s, 7s, 30s)"]
DELAY --> REDELIVER["Re-deliver message\nwith incremented retry count"]
RETRY -->|"No (expired)"| EXPIRED{"Service-retryable\nerror?"}
EXPIRED -->|"Yes"| RESET["Reset retry count to 0\n(infinite retries for infra issues)"]
EXPIRED -->|"No"| EXPIRE_DROP["Log + drop\n(MessageExpiredError)"]
CLASSIFY -->|"Permanent\n(404, 410)"| DROP["Log + drop silently"]
CLASSIFY -->|"Internal normal\n(EmptyContactList,\nCampaignAborted, etc.)"| DROP
CLASSIFY -->|"Unknown / needs investigation"| ERROR_TOPIC["Publish to\nsending-mg-errors"]
EXPIRE_DROP -->|"Not permanent &\nnot internal normal"| ERROR_TOPIC
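The decision tree above can be sketched in code. This is an illustrative reading of the diagram, not the shared policy's actual implementation: the type and function names are hypothetical, error names are matched as plain strings, the delay is assumed to be picked by indexing the schedule with the current retry count, and what happens after a retry-count reset is not shown in the diagram, so the sketch only reports the reset.

```typescript
// Delay schedule from the diagram, in seconds.
const DELAYS_SECONDS: readonly number[] = [1, 1, 2, 3, 7, 30];

type ErrorClass = "retriable" | "permanent" | "internal-normal" | "unknown";

type Decision =
  | { action: "delay"; delaySeconds: number; nextRetryCount: number }
  | { action: "reset"; nextRetryCount: 0 }
  | { action: "expire"; publishToErrorTopic: true }
  | { action: "drop" }
  | { action: "publish-error" };

// Error names listed in the diagram (its "etc." entries are not covered).
const RETRIABLE_NAMES = ["TopicPublishError", "ClientClosedError"];
const INTERNAL_NORMAL_NAMES = ["EmptyContactList", "CampaignAborted"];

function classify(err: { status?: number; name?: string }): ErrorClass {
  const { status, name } = err;
  if (name !== undefined && RETRIABLE_NAMES.includes(name)) return "retriable";
  if (name !== undefined && INTERNAL_NORMAL_NAMES.includes(name)) return "internal-normal";
  if (status !== undefined) {
    if (status === 423 || status === 429 || (status >= 500 && status <= 504)) return "retriable";
    if (status === 404 || status === 410) return "permanent";
  }
  return "unknown";
}

function decide(errorClass: ErrorClass, retryCount: number, serviceRetryable: boolean): Decision {
  switch (errorClass) {
    case "permanent":
    case "internal-normal":
      return { action: "drop" }; // log + drop
    case "unknown":
      return { action: "publish-error" }; // sending-mg-errors
    case "retriable":
      if (retryCount >= 0 && retryCount < DELAYS_SECONDS.length) {
        // Assumption: the delay is chosen by the current retry count.
        return {
          action: "delay",
          delaySeconds: DELAYS_SECONDS[retryCount],
          nextRetryCount: retryCount + 1,
        };
      }
      if (serviceRetryable) {
        return { action: "reset", nextRetryCount: 0 };
      }
      // Only retriable errors reach this branch, so the expired message is
      // neither permanent nor internal normal and is also reported.
      return { action: "expire", publishToErrorTopic: true };
  }
}
```

Under these assumptions, a message that keeps failing with a 503 is delayed six times (1s, 1s, 2s, 3s, 7s, 30s) and then either has its retry count reset, if the error is service-retryable, or expires and is reported to sending-mg-errors.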