Republisher Flow

This document describes the republisher service, a manual DLQ (Dead Letter Queue) republishing tool for recovering from transient errors.

Overview

The Republisher is a manual intervention tool that allows operators to retry messages that failed in the Results or Errors handlers due to transient errors. Unlike the Errors Republisher, which retries automatically with exponential backoff, the Republisher only acts on messages that an operator publishes to its topic.

Input Messages: PersonalizationResults from Results or Errors handlers (both message types are supported)

Output: Individual EventPersMessage messages republished to Request Publisher topic

Topic: event-pers.republisher

Architecture Overview

graph TB
    %% Input Source
    DLQ[("Dead Letter Queue<br/>or Manual Source")]
    RepublisherTopic[("Republisher Topic<br/>event-pers.republisher")]

    %% Main Handler
    subgraph "Republisher Handler"
        RH[Republisher Handler<br/>Simple message processor]
    end

    %% Processing
    subgraph "Message Processing"
        Parse[Parse PersonalizationResults]
        Filter[Filter Out Failed Contacts]
        Rebuild[Rebuild EventPersMessages]
    end

    %% Publishers
    subgraph "Publishers"
        RequestPub[Request Publisher<br/>config.Republisher.TopicName]
        ErrorsPub[Errors Publisher<br/>config.ErrorsTopic]
    end

    %% Output
    RequestTopic[("Request Publisher Topic<br/>event-pers.push.request-publisher.{mode}.{group}")]
    ErrorsTopic[("Errors Topic")]

    %% Flow
    DLQ --> RepublisherTopic
    RepublisherTopic --> RH
    RH --> Parse
    Parse --> Filter
    Filter --> Rebuild
    Rebuild --> RequestPub
    RequestPub --> RequestTopic

    %% Error path
    Parse -.-> ErrorsPub
    ErrorsPub -.-> ErrorsTopic

    %% Styling
    classDef handler fill:#fce4ec,stroke:#c2185b,stroke-width:2px
    classDef process fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    classDef publisher fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
    classDef topic fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
    class RH handler
    class Parse,Filter,Rebuild process
    class RequestPub,ErrorsPub publisher
    class DLQ,RepublisherTopic,RequestTopic,ErrorsTopic topic

Message Processing Flow

sequenceDiagram
    participant Operator as Operator<br/>(Manual)
    participant RepTopic as Republisher Topic
    participant Handler as Republisher Handler
    participant ReqPub as Request Publisher
    participant ReqTopic as Request Publisher Topic

    Note over Operator: Identifies transient error resolved<br/>in Results/Errors handler
    Operator->>RepTopic: Publish PersonalizationResults<br/>from DLQ or error logs
    RepTopic->>Handler: Receive PersonalizationResults message
    Handler->>Handler: Unmarshal PersonalizationResults

    alt Parse Error
        Handler->>Handler: Log error & publish to errors topic
        Handler->>RepTopic: Ack message
    else Parse Success
        Handler->>Handler: Extract failed contact IDs<br/>from results.Data.Failures
        Handler->>Handler: Filter originalRequest.Data<br/>exclude failed contacts
        alt No Retryable Contacts
            Handler->>RepTopic: Ack message (no work to do)
        else Has Retryable Contacts
            loop For each retryable contact
                Handler->>Handler: Build EventPersMessage<br/>with original attributes
                Handler->>ReqPub: Publish individual message
                ReqPub->>ReqTopic: Forward to Request Publisher
                Note over ReqTopic: Message enters normal<br/>processing pipeline
            end
            Handler->>RepTopic: Ack message
        end
    end

Message Transformation

The Republisher transforms PersonalizationResults back into individual EventPersMessage messages:

graph LR
    subgraph "Input: PersonalizationResults"
        A1[OriginalRequest.Data<br/>Array of CompileRequestData]
        A2[Data.Failures<br/>Array of failed contact IDs]
        A3[Message Attributes<br/>Original attributes]
    end

    subgraph "Filtering"
        B[Remove contacts in<br/>Failures array]
    end

    subgraph "Output: EventPersMessages"
        C1[One EventPersMessage<br/>per retryable contact]
        C2[Preserves all original<br/>message attributes]
        C3[Ready for request<br/>publisher processing]
    end

    A1 --> B
    A2 --> B
    B --> C1
    A3 --> C2
    C1 --> C3
    C2 --> C3

    style A1 fill:#fff3e0
    style A2 fill:#ffcdd2
    style B fill:#e3f2fd
    style C1 fill:#c8e6c9
    style C2 fill:#c8e6c9
    style C3 fill:#c8e6c9
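The filtering and rebuilding can be pictured as a pure function. This is a minimal sketch, not the service's code: the field paths OriginalRequest.Data and Data.Failures come from this document, while the ContactID field, the EventPersMessage shape, and the "mode" attribute key are hypothetical placeholders.

```go
package main

import "fmt"

// CompileRequestData is a hypothetical per-contact record; only the
// (assumed) ContactID field matters for filtering.
type CompileRequestData struct {
	ContactID string
}

// PersonalizationResults mirrors the field paths named in this document.
type PersonalizationResults struct {
	// Success is not consulted: Results (true) and Errors (false) payloads
	// are processed identically.
	Success         bool
	OriginalRequest struct {
		Data []CompileRequestData
	}
	Data struct {
		Failures []string // contact IDs that failed personalization
	}
}

// EventPersMessage is a hypothetical single-contact output message.
type EventPersMessage struct {
	Data       CompileRequestData
	Attributes map[string]string
}

// rebuild drops every contact listed in Data.Failures and emits one message
// per remaining contact, each with its own copy of the original attributes.
func rebuild(results PersonalizationResults, attrs map[string]string) []EventPersMessage {
	failed := make(map[string]struct{}, len(results.Data.Failures))
	for _, id := range results.Data.Failures {
		failed[id] = struct{}{}
	}

	var out []EventPersMessage
	for _, d := range results.OriginalRequest.Data {
		if _, skip := failed[d.ContactID]; skip {
			continue // failed contacts are never retried
		}
		copied := make(map[string]string, len(attrs))
		for k, v := range attrs {
			copied[k] = v
		}
		out = append(out, EventPersMessage{Data: d, Attributes: copied})
	}
	return out
}

func main() {
	var r PersonalizationResults
	r.OriginalRequest.Data = []CompileRequestData{{ContactID: "c1"}, {ContactID: "c2"}, {ContactID: "c3"}}
	r.Data.Failures = []string{"c2"}

	for _, m := range rebuild(r, map[string]string{"mode": "tx"}) {
		fmt.Println(m.Data.ContactID, m.Attributes["mode"])
	}
}
```

Running it prints `c1 tx` and `c3 tx`. Copying the attribute map per message keeps the rebuilt messages independent, so a later change to one message's attributes cannot leak into another.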

Key Components

Processing Steps

  1. Parse PersonalizationResults: JSON deserialization from PubSub message

  2. Extract Failed Contacts: Get list of contact IDs from results.Data.Failures

  3. Filter Retryable Data: Remove failed contacts from results.OriginalRequest.Data

  4. Rebuild Messages: Create individual EventPersMessage for each retryable contact

  5. Preserve Attributes: Copy all original message attributes to new messages

  6. Publish: Send each message to Request Publisher topic
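The steps above, together with the ack decisions in the sequence diagram, might look roughly like the handler below. It is a sketch under stated assumptions: the Publisher interface, the recorder fake, the JSON tags, and using the marshalled CompileRequestData as the message body are all hypothetical, and the real handler receives messages through cloud.google.com/go/pubsub/v2. The document does not say what happens when a publish to the Request Publisher fails, so leaving the input unacked for redelivery is also an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical wire shapes: field names follow this document, JSON tags are assumed.
type CompileRequestData struct {
	ContactID string `json:"contactId"`
}

type PersonalizationResults struct {
	OriginalRequest struct {
		Data []CompileRequestData `json:"data"`
	} `json:"originalRequest"`
	Data struct {
		Failures []string `json:"failures"`
	} `json:"data"`
}

// Publisher is a hypothetical stand-in for the Request and Errors publishers.
type Publisher interface {
	Publish(data []byte, attrs map[string]string) error
}

// handle processes one Republisher message and reports whether to ack it.
func handle(raw []byte, attrs map[string]string, req, errs Publisher) (bool, error) {
	// 1. Parse; malformed input goes to the errors topic and is acked.
	var results PersonalizationResults
	if err := json.Unmarshal(raw, &results); err != nil {
		return true, errs.Publish(raw, attrs)
	}

	// 2. Extract failed contact IDs.
	failed := make(map[string]bool, len(results.Data.Failures))
	for _, id := range results.Data.Failures {
		failed[id] = true
	}

	// 3-6. Filter, rebuild, copy attributes, publish one message per contact.
	// With no retryable contacts nothing is published and the input is acked.
	for _, d := range results.OriginalRequest.Data {
		if failed[d.ContactID] {
			continue
		}
		body, err := json.Marshal(d) // assumed EventPersMessage body
		if err != nil {
			return false, err
		}
		copied := make(map[string]string, len(attrs))
		for k, v := range attrs {
			copied[k] = v
		}
		if err := req.Publish(body, copied); err != nil {
			return false, err // assumption: leave unacked so Pub/Sub redelivers
		}
	}
	return true, nil
}

// recorder is a fake Publisher that stores the bodies it receives.
type recorder struct{ bodies []string }

func (r *recorder) Publish(data []byte, _ map[string]string) error {
	r.bodies = append(r.bodies, string(data))
	return nil
}

func main() {
	req, errs := &recorder{}, &recorder{}
	in := `{"originalRequest":{"data":[{"contactId":"c1"},{"contactId":"c2"}]},"data":{"failures":["c1"]}}`
	ack, err := handle([]byte(in), map[string]string{"mode": "tx"}, req, errs)
	fmt.Println(ack, err, req.bodies) // true <nil> [{"contactId":"c2"}]

	ack, err = handle([]byte("not json"), nil, req, errs)
	fmt.Println(ack, err, len(errs.bodies)) // true <nil> 1
}
```

Because contacts are published one at a time, a failure partway through followed by redelivery would publish the earlier contacts of that message a second time.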

Supported Message Types

The Republisher can process messages from:

  • Results Handler: PersonalizationResults with success: true but containing some failures

  • Errors Handler: PersonalizationResults with success: false (complete failures)

Both message types use the same PersonalizationResults structure and are processed identically.

Dependencies

  • PubSub Client: Single client for main project (uses cloud.google.com/go/pubsub/v2)

  • Request Publisher: Publishes rebuilt messages to request publisher topic

  • Errors Publisher: Handles malformed messages that can’t be parsed

Configuration

  • Input Topic/Subscription: event-pers.republisher

  • Output Topic: Configurable via REPUBLISHER_TOPIC_NAME (default: event-pers.push.request-publisher.tx.default)

  • Max Outstanding Messages: REPUBLISHER_MAX_OUTSTANDING_MESSAGES (default: 10)

  • Subscription Goroutines: REPUBLISHER_SUBSCRIPTION_NUM_GOROUTINES (default: 10)

Republisher vs Errors Republisher

| Aspect | Republisher (Manual Tool) | Errors Republisher (Automatic) |
|---|---|---|
| Trigger | Manual operator action | Automatic via Delayer topic |
| Input Source | event-pers.republisher topic (operator publishes) | config.ErrorsWorker.RepublishRetryTopic (from Delayer) |
| Use Case | Recover from transient errors after resolution | Automatic retry with exponential backoff |
| Message Type | PersonalizationResults (from Results/Errors handlers) | PersonalizationErrors (from Errors handler only) |
| Processing | Filters failed contacts, rebuilds EventPersMessages | Validates retry count, republishes with channel handler |
| Architecture | Simple handler, no pipeline/channels | Pipeline-based with channel abstraction |
| Retry Logic | No retry logic (one-time republish) | Checks max retry count, increments retry counter |
| Operator Involvement | Required (manual publish to topic) | None (fully automatic) |

Processing Features

  • Simple Architecture: Direct message processing without pipeline or channel abstraction

  • Failure Filtering: Automatically excludes contacts that failed personalization

  • Attribute Preservation: Maintains all original message attributes for proper routing

  • Multi-Contact Support: Handles messages with multiple contacts, publishes individually

  • Error Handling: Malformed messages published to errors topic for investigation

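  • Deterministic Filtering: Republishing the same message always excludes the same failed contacts. Nothing in the processing steps tracks previously republished contacts, however, so publishing the same payload twice emits the retryable contacts' messages twice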

Operational Usage

When to Use

Use the Republisher when:

  • Transient errors (network issues, service timeouts) have been resolved

  • Results or Errors handler messages failed due to temporary infrastructure problems

  • Manual intervention is needed after investigating error logs

  • Automatic retry mechanisms (Errors Republisher) have been exhausted

Message Sources

Messages for republishing can come from:

  1. Dead Letter Queue (DLQ) subscriptions

  2. Error logs with PersonalizationResults payload

  3. Monitoring alerts with message data

  4. Manual extraction from error storage (BigQuery, Cloud Storage)

Process

  1. Identify PersonalizationResults message that failed due to transient error

  2. Verify the error is resolved (infrastructure restored, service available)

  3. Publish the PersonalizationResults message to event-pers.republisher topic

  4. Monitor request publisher for successful reprocessing

  5. Failed contacts are automatically filtered out and not retried
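Before step 3, an operator may want a dry run showing which contacts the Republisher would republish, so the number of messages to watch for in step 4 is known in advance. The helper below is hypothetical and not part of the service; it assumes a JSON payload with contactId, originalRequest.data and data.failures keys, which should be adjusted to the real schema.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// payload is a hypothetical JSON shape for a saved PersonalizationResults message.
type payload struct {
	OriginalRequest struct {
		Data []struct {
			ContactID string `json:"contactId"`
		} `json:"data"`
	} `json:"originalRequest"`
	Data struct {
		Failures []string `json:"failures"`
	} `json:"data"`
}

// preview splits contacts into those that would be republished and those
// skipped because they appear in Data.Failures. Nothing is published.
func preview(raw []byte) (retry, skip []string, err error) {
	var p payload
	if err := json.Unmarshal(raw, &p); err != nil {
		return nil, nil, fmt.Errorf("payload would be routed to the errors topic: %w", err)
	}
	failed := make(map[string]bool, len(p.Data.Failures))
	for _, id := range p.Data.Failures {
		failed[id] = true
	}
	for _, d := range p.OriginalRequest.Data {
		if failed[d.ContactID] {
			skip = append(skip, d.ContactID)
		} else {
			retry = append(retry, d.ContactID)
		}
	}
	return retry, skip, nil
}

func main() {
	if len(os.Args) < 2 {
		fmt.Println("usage: go run preview.go <payload.json>")
		return
	}
	raw, err := os.ReadFile(os.Args[1])
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	retry, skip, err := preview(raw)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("expect %d message(s) on the request publisher topic: %v\n", len(retry), retry)
	fmt.Printf("skipped (listed in Data.Failures): %v\n", skip)
}
```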