Errors Republisher Flow

The diagrams below show the initialization and execution flow of the errors_republisher service.

Architecture Overview

graph TB
    %% External Input
    PubSub[("PubSub Retry Topic<br/>config.ErrorsWorker.RepublishRetryTopic")]

    %% Main Handler
    subgraph "Errors Republisher Handler"
        ERH[Errors Republisher Handler]
        Pipeline[Processing Pipeline]
        subgraph "Pipeline Steps"
            ParseStep[Parse Message Step]
            CheckAbortStep[Check Abort Personalization Step]
            ChannelStep[Determine Channel Step]
            subgraph "Conditional Retry Steps"
                ShouldRetryStep{Should Retry?}
                ParseRetryStep[Parse Retry Count Step]
                CheckMaxRetryStep[Check Max Retry Count Step]
                ProcessRetriableStep[Process Retriable Failures Step]
            end
        end
    end

    %% Channel System
    subgraph "Channel System"
        Registry[Channel Registry]
        PushChannel[Push Channel<br/>with Retry Processing]
    end

    %% Publishers & Services
    subgraph "Publishers & Services"
        RequestPublisher["Request Publisher<br/>me_mode_group_personalization_requests"]
        ErrorsPublisher["Errors Publisher<br/>config.ErrorsTopic"]
        DelayerPublisher["Delayer Publisher<br/>Fallback for delayed retry"]
        subgraph "Event Publishers"
            MobileEvents[Mobile Event Publisher<br/>Main PubSub Client]
            DPEvents[DP Event Publisher<br/>DP PubSub Client]
        end
    end

    %% External Services
    subgraph "External Services"
        PushCampaignService[(Push Campaign Service)]
        RetryQueue[(Retry Queue)]
        PermanentErrors[(Permanent Error Storage)]
    end

    %% PubSub Clients
    subgraph "PubSub Clients"
        MainClient[Main PubSub Client<br/>GCloudProjectId]
        DPClient[DP PubSub Client<br/>DPGCloudProjectId]
        PersClient[Personalization Client<br/>PersGCloudProjectId]
    end

    %% Flow connections
    PubSub --> ERH
    ERH --> Pipeline
    Pipeline --> ParseStep
    ParseStep --> CheckAbortStep
    CheckAbortStep --> ChannelStep
    ChannelStep --> ShouldRetryStep
    ShouldRetryStep -->|Yes| ParseRetryStep
    ParseRetryStep --> CheckMaxRetryStep
    CheckMaxRetryStep --> ProcessRetriableStep
    ShouldRetryStep -->|No| MobileEvents

    %% Channel processing
    ChannelStep --> Registry
    Registry --> PushChannel

    %% Retry processing
    ProcessRetriableStep --> PushChannel
    PushChannel --> RequestPublisher
    RequestPublisher --> RetryQueue

    %% Non-retryable path
    PushChannel --> MobileEvents
    PushChannel --> DPEvents
    MobileEvents --> PermanentErrors
    DPEvents --> PermanentErrors

    %% Error handling
    ProcessRetriableStep --> ErrorsPublisher

    %% Client connections
    ERH --> MainClient
    RequestPublisher --> PersClient
    ErrorsPublisher --> MainClient
    MobileEvents --> MainClient
    DPEvents --> DPClient
    PushChannel --> MainClient
    PushChannel --> PersClient

    %% Styling
    classDef handler fill:#fce4ec,stroke:#c2185b,stroke-width:2px
    classDef pipeline fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    classDef channel fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    classDef publisher fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
    classDef service fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
    classDef topic fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
    classDef client fill:#fafafa,stroke:#424242,stroke-width:2px

    class ERH,Pipeline handler
    class ParseStep,CheckAbortStep,ChannelStep,ParseRetryStep,CheckMaxRetryStep,ProcessRetriableStep pipeline
    class ShouldRetryStep pipeline
    class Registry,PushChannel channel
    class RequestPublisher,ErrorsPublisher,DelayerPublisher,MobileEvents,DPEvents publisher
    class PushCampaignService,RetryQueue,PermanentErrors service
    class PubSub topic
    class MainClient,DPClient,PersClient client

Processing Pipeline

sequenceDiagram
    participant PS as PubSub Retry Topic
    participant ERH as Errors Republisher Handler
    participant Pipeline as Processing Pipeline
    participant Registry as Channel Registry
    participant PC as Push Channel
    participant ReqPub as Request Publisher
    participant EventPub as Event Publishers
    participant RetryQueue as Retry Queue

    PS->>ERH: Delayed retry message (from Delayer)
    ERH->>Pipeline: Execute processing pipeline

    %% Pipeline execution
    Pipeline->>Pipeline: 1. Parse Message Step<br/>JSON decode to PersonalizationErrors
    Pipeline->>Pipeline: 2. Check Abort Personalization Step<br/>Check if campaign/customer is cancelled
    Pipeline->>Registry: 3. Determine Channel Step<br/>Get channel from message attributes
    Registry->>PC: Return push channel

    alt Error is NOT Retryable
        Pipeline->>EventPub: Log error and ack message
        EventPub->>PS: Publish to event topics for tracking
        Note over EventPub: Should never happen - alert raised
    else Error is Retryable
        Pipeline->>Pipeline: 4. Parse Retry Count Step<br/>Extract and validate retry count
        Pipeline->>Pipeline: 5. Check Max Retry Count Step<br/>Compare against configured max
        alt Max Retry Reached
            PC->>EventPub: Report permanent failures
            Note over EventPub: Max retries exceeded, give up
        else Within Retry Limit
            Pipeline->>PC: 6. Process Retriable Failures Step
            PC->>ReqPub: Republish request for retry
            ReqPub->>RetryQueue: Send to personalization service
            Note over RetryQueue: Request will be retried
        end
    end

    alt Processing Errors
        Pipeline->>ERH: Handle pipeline errors
        ERH->>PS: Publish to errors topic
    end

    ERH->>PS: Acknowledge message

Channel-Specific Methods Usage

sequenceDiagram
    participant Handler as Errors Republisher Handler
    participant Registry as Channel Registry
    participant Channel as Channel Handler<br/>(Push/EmbeddedMessaging)
    participant ReqPub as Request Publisher
    participant EventPub as Event Publishers

    Note over Handler: Received delayed retry message from Delayer
    Handler->>Registry: DetermineChannel(message attributes)
    Registry->>Channel: Return channel instance

    alt Error is NOT Retryable
        rect rgb(255, 240, 230)
            Note over Handler,EventPub: Should never happen - log ERROR alert
            Handler->>EventPub: Publish non-retryable tracking event
            Note over EventPub: Alert raised - investigate why<br/>non-retryable error in retry queue
        end
    else Error is Retryable
        rect rgb(240, 245, 255)
            Note over Handler: Validate retry count
            Handler->>Handler: Parse retry count from attributes
            Handler->>Handler: Check vs max retry count (default: 5)
        end
        alt Max Retry Reached
            rect rgb(255, 230, 230)
                Note over Handler,EventPub: Report permanent failure
                Handler->>Channel: ProcessFailures (permanent)
                Channel->>EventPub: Publish to failure tracking
                Note over EventPub: Max retries exceeded, give up
            end
        else Within Retry Limit
            rect rgb(240, 255, 240)
                Note over Handler,ReqPub: Republish for retry
                Handler->>Channel: Prepare retry request
                Handler->>Handler: Increment retry count attribute
                Channel->>ReqPub: Republish to personalization service
                Note over ReqPub: Request retried after exponential backoff delay
            end
        end
    end

    Note over Handler: Delayed retry processing complete

Key Components

Pipeline Processing Steps

  1. Parse Message Step: JSON deserialization of personalization errors from delayed retry queue

  2. Check Abort Personalization Step: Verify that the campaign/customer is not in the cancellation list; abort if it is cancelled

  3. Determine Channel Step: Route to appropriate channel handler (push, embedded_messaging)

Conditional Retry Steps (only if error is retryable):

  1. Parse Retry Count Step: Extract retry count from message attributes, log error details

  2. Check Max Retry Count Step: Compare retry count against max (default: 5), report permanent failures if exceeded

  3. Process Retriable Failures Step: Republish request to personalization service for retry
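
The step ordering above can be sketched as a small pipeline with one guarded group of steps. This is only an illustration under stated assumptions, not the service's actual code: the Context, Step, Pipeline, Then, ThenIf, and ErrStop names are hypothetical, the step bodies are placeholders that only record their names, and treating "exceeded" as retry count >= max is an assumption. Only the step order and the default max of 5 come from the description above.

```go
package main

import (
	"errors"
	"fmt"
)

// Context carries state between steps. All fields are hypothetical.
type Context struct {
	Retryable  bool
	RetryCount int
	Trace      []string // names of the steps that ran, for illustration
}

// Step is one unit of pipeline work.
type Step func(c *Context) error

// ErrStop ends the pipeline early without treating it as a failure.
var ErrStop = errors.New("stop pipeline")

// Pipeline runs its steps in order.
type Pipeline struct{ steps []Step }

// Then appends an unconditional step.
func (p *Pipeline) Then(s Step) *Pipeline {
	p.steps = append(p.steps, s)
	return p
}

// ThenIf appends a group of steps that runs only when cond holds.
func (p *Pipeline) ThenIf(cond func(*Context) bool, group ...Step) *Pipeline {
	return p.Then(func(c *Context) error {
		if !cond(c) {
			return nil
		}
		for _, s := range group {
			if err := s(c); err != nil {
				return err
			}
		}
		return nil
	})
}

// Run executes the steps; ErrStop is swallowed, other errors are returned.
func (p *Pipeline) Run(c *Context) error {
	for _, s := range p.steps {
		if err := s(c); err != nil {
			if errors.Is(err, ErrStop) {
				return nil
			}
			return err
		}
	}
	return nil
}

// named builds a placeholder step that only records its name.
func named(name string) Step {
	return func(c *Context) error {
		c.Trace = append(c.Trace, name)
		return nil
	}
}

const maxRetryCount = 5 // default from the text

// checkMaxRetry reports a permanent failure and stops once the limit is hit.
// Using >= here is an assumption about what "exceeded" means.
func checkMaxRetry(c *Context) error {
	c.Trace = append(c.Trace, "check_max_retry")
	if c.RetryCount >= maxRetryCount {
		c.Trace = append(c.Trace, "report_permanent_failure")
		return ErrStop
	}
	return nil
}

func newPipeline() *Pipeline {
	p := &Pipeline{}
	return p.Then(named("parse_message")).
		Then(named("check_abort")).
		Then(named("determine_channel")).
		ThenIf(func(c *Context) bool { return c.Retryable },
			named("parse_retry_count"),
			checkMaxRetry,
			named("process_retriable_failures"))
}

func main() {
	c := &Context{Retryable: true, RetryCount: 2}
	if err := newPipeline().Run(c); err != nil {
		fmt.Println("pipeline error:", err)
		return
	}
	fmt.Println(c.Trace)
}
```

Keeping the guard in one place means the three retry steps never run for a non-retryable error, which mirrors the "only if error is retryable" condition.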

External Dependencies

  • Multiple PubSub Clients: Main, DP, and Personalization project clients (all use cloud.google.com/go/pubsub/v2)

  • Event Publishers: Mobile and DP event publishing for permanent failures (max retries exceeded) and for non-retryable errors (the latter should never happen)

  • Delayer Publisher: Fallback for re-queueing delayed retries if needed

  • Request Publisher: Republishes requests to personalization service

Error Processing Paths

  • Retry Path: Retryable errors (within retry limit) → Process Retriable Failures → Request Publisher → Personalization Service

  • Max Retry Path: Errors exceeding max retry count → Report Failures → Event Publishers → Permanent Failure Storage

  • Non-Retryable Path: Should never happen → Event Publishers → Tracking (with ERROR level alert)

  • Fatal Path: Invalid/malformed errors → Errors Publisher → Dead Letter Queue
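
The four paths above amount to a small decision function. The sketch below is a hypothetical illustration: the Outcome struct, the Path constants, and the SelectPath and Destination functions are invented for this example, and the order of the checks (malformed first, then retryability, then retry count) is an assumption drawn from the pipeline order rather than from the service code.

```go
package main

import "fmt"

// Path names one of the four error processing paths.
type Path string

const (
	PathRetry        Path = "retry"
	PathMaxRetry     Path = "max_retry"
	PathNonRetryable Path = "non_retryable"
	PathFatal        Path = "fatal"
)

// Outcome summarizes what the pipeline learned about one message.
// The shape is hypothetical.
type Outcome struct {
	Malformed  bool
	Retryable  bool
	RetryCount int
	MaxRetry   int
}

// SelectPath picks the processing path for a message.
func SelectPath(o Outcome) Path {
	switch {
	case o.Malformed:
		return PathFatal
	case !o.Retryable:
		return PathNonRetryable // should never happen; logged at ERROR level
	case o.RetryCount >= o.MaxRetry: // assumption: the limit is inclusive
		return PathMaxRetry
	default:
		return PathRetry
	}
}

// Destination names the publisher each path ends at, per the list above.
func Destination(p Path) string {
	switch p {
	case PathRetry:
		return "Request Publisher"
	case PathMaxRetry, PathNonRetryable:
		return "Event Publishers"
	default:
		return "Errors Publisher"
	}
}

func main() {
	outcomes := []Outcome{
		{Retryable: true, RetryCount: 1, MaxRetry: 5},
		{Retryable: true, RetryCount: 5, MaxRetry: 5},
		{Retryable: false, MaxRetry: 5},
		{Malformed: true, MaxRetry: 5},
	}
	for _, o := range outcomes {
		p := SelectPath(o)
		fmt.Printf("%-14s -> %s\n", p, Destination(p))
	}
}
```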

Configuration

  • Input Topic: config.ErrorsWorker.RepublishRetryTopic (receives messages from Delayer)

  • Subscription: Uses the same name as RepublishRetryTopic by default, or a custom name supplied via app params

  • Output: Republished requests to personalization service

  • Channels: Push and embedded_messaging channels with error processing capabilities
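
As a rough illustration of the subscription rule above, the following sketch resolves the subscription name from the config and an optional override. The field names ErrorsWorker.RepublishRetryTopic and ErrorsTopic follow the identifiers used in this document; the AppParams struct, its Subscription field, the ResolveSubscription function, and the topic values in main are assumptions made up for the example.

```go
package main

import "fmt"

// ErrorsWorkerConfig mirrors the config.ErrorsWorker section named above.
type ErrorsWorkerConfig struct {
	RepublishRetryTopic string
}

// Config holds only the fields this document mentions.
type Config struct {
	ErrorsWorker ErrorsWorkerConfig
	ErrorsTopic  string
}

// AppParams is a hypothetical container for per-run overrides.
type AppParams struct {
	Subscription string // empty means "use the default"
}

// ResolveSubscription returns the custom subscription if one is set,
// otherwise it falls back to the retry topic name.
func ResolveSubscription(cfg Config, params AppParams) string {
	if params.Subscription != "" {
		return params.Subscription
	}
	return cfg.ErrorsWorker.RepublishRetryTopic
}

func main() {
	// Topic names below are placeholders, not real values.
	cfg := Config{
		ErrorsWorker: ErrorsWorkerConfig{RepublishRetryTopic: "example_retry_topic"},
		ErrorsTopic:  "example_errors_topic",
	}
	fmt.Println(ResolveSubscription(cfg, AppParams{}))
	fmt.Println(ResolveSubscription(cfg, AppParams{Subscription: "example_custom_sub"}))
}
```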

Processing Features

  • Pipeline Architecture: Simplified pipeline focused on retry validation and republishing

  • Channel-based Routing: Registry pattern for extensible channel support (push, embedded_messaging)

  • Conditional Retry Logic: ThenIfSteps pattern enables retry validation before republishing

  • Retry Count Tracking: Validates retry count using me_event_pers/retry_count attribute

  • Max Retry Protection: Prevents infinite retry loops by enforcing configurable max retry count (default: 5)

  • Abort Cancellation: Early exit for cancelled campaigns/customers to prevent wasted processing

  • Alerting: Logs ERROR when non-retryable errors appear in retry queue (should never happen)

  • Event Publishing: Dual mobile/DP event streams for permanent failure tracking
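
Two of the features above, channel-based routing and retry count tracking, can be sketched together. This is a minimal sketch under stated assumptions: the me_event_pers/retry_count attribute name, the channel names, and the default max of 5 come from this document, while the Channel interface, the Registry type, the "channel" attribute key, and the choice to treat a missing retry count as 0 are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
)

const (
	RetryCountAttribute  = "me_event_pers/retry_count" // from the text
	ChannelAttribute     = "channel"                   // hypothetical attribute key
	DefaultMaxRetryCount = 5                           // default from the text
)

// Channel is a hypothetical minimal channel interface.
type Channel interface {
	Name() string
}

type namedChannel string

func (n namedChannel) Name() string { return string(n) }

// Registry maps channel names to handlers so new channels can be added
// without touching the routing code.
type Registry struct {
	channels map[string]Channel
}

func NewRegistry(chs ...Channel) *Registry {
	r := &Registry{channels: make(map[string]Channel)}
	for _, ch := range chs {
		r.channels[ch.Name()] = ch
	}
	return r
}

// Determine looks up the channel named in the message attributes.
func (r *Registry) Determine(attrs map[string]string) (Channel, error) {
	ch, ok := r.channels[attrs[ChannelAttribute]]
	if !ok {
		return nil, fmt.Errorf("unknown channel %q", attrs[ChannelAttribute])
	}
	return ch, nil
}

// ParseRetryCount reads and validates the retry count attribute.
// Assumption: a missing attribute means no retries have happened yet.
func ParseRetryCount(attrs map[string]string) (int, error) {
	raw, ok := attrs[RetryCountAttribute]
	if !ok {
		return 0, nil
	}
	n, err := strconv.Atoi(raw)
	if err != nil || n < 0 {
		return 0, fmt.Errorf("invalid %s value %q", RetryCountAttribute, raw)
	}
	return n, nil
}

// WithIncrementedRetryCount copies attrs and bumps the retry count by one.
func WithIncrementedRetryCount(attrs map[string]string, current int) map[string]string {
	out := make(map[string]string, len(attrs)+1)
	for k, v := range attrs {
		out[k] = v
	}
	out[RetryCountAttribute] = strconv.Itoa(current + 1)
	return out
}

func main() {
	registry := NewRegistry(namedChannel("push"), namedChannel("embedded_messaging"))
	attrs := map[string]string{ChannelAttribute: "push", RetryCountAttribute: "2"}

	ch, err := registry.Determine(attrs)
	if err != nil {
		fmt.Println(err)
		return
	}
	count, err := ParseRetryCount(attrs)
	if err != nil {
		fmt.Println(err)
		return
	}
	if count >= DefaultMaxRetryCount {
		fmt.Println("max retries reached, reporting permanent failure")
		return
	}
	next := WithIncrementedRetryCount(attrs, count)
	fmt.Println(ch.Name(), next[RetryCountAttribute])
}
```

Copying the attributes before incrementing keeps the incoming message untouched, which makes the step easier to test in isolation.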

Differences from Errors Handler

  • No Validate Step: Assumes validation already happened in Errors Handler

  • No Process Failures Step: Doesn’t publish per-attempt failure events (already done in Errors Handler); only permanent failures are reported, once the max retry count is exceeded

  • No Schedule Delay Step: Messages already delayed, just republishes for retry

  • Simplified Pipeline: Focused on retry validation and republishing only

  • Different Input Source: Receives from Delayer topic, not personalization errors topic