Republisher Flow
This document describes the republisher service, a manual DLQ (Dead Letter Queue) republishing tool for recovering from transient errors.
Overview
The Republisher is a manual intervention tool that allows operators to retry messages that failed in the Results or Errors handlers due to transient errors. Unlike the Errors Republisher which handles automatic retries with exponential backoff, the Republisher requires manual operator action to publish messages to its topic.
Input Messages: PersonalizationResults from Results or Errors handlers (both message types are supported)
Output: Individual EventPersMessage messages republished to Request Publisher topic
Topic: event-pers.republisher
Architecture Overview
or Manual Source")] RepublisherTopic[("Republisher Topic
event-pers.republisher")] %% Main Handler subgraph "Republisher Handler" RH[Republisher Handler
Simple message processor] end %% Processing subgraph "Message Processing" Parse[Parse PersonalizationResults] Filter[Filter Out Failed Contacts] Rebuild[Rebuild EventPersMessages] end %% Publishers subgraph "Publishers" RequestPub[Request Publisher
config.Republisher.TopicName] ErrorsPub[Errors Publisher
config.ErrorsTopic] end %% Output RequestTopic[("Request Publisher Topic
event-pers.push.request-publisher.{mode}.{group}")] ErrorsTopic[("Errors Topic")] %% Flow DLQ --> RepublisherTopic RepublisherTopic --> RH RH --> Parse Parse --> Filter Filter --> Rebuild Rebuild --> RequestPub RequestPub --> RequestTopic %% Error path Parse -.-> ErrorsPub ErrorsPub -.-> ErrorsTopic %% Styling classDef handler fill:#fce4ec,stroke:#c2185b,stroke-width:2px classDef process fill:#e3f2fd,stroke:#1976d2,stroke-width:2px classDef publisher fill:#f3e5f5,stroke:#4a148c,stroke-width:2px classDef topic fill:#fff3e0,stroke:#ef6c00,stroke-width:2px class RH handler class Parse,Filter,Rebuild process class RequestPub,ErrorsPub publisher class DLQ,RepublisherTopic,RequestTopic,ErrorsTopic topic
Message Processing Flow
(Manual) participant RepTopic as Republisher Topic participant Handler as Republisher Handler participant ReqPub as Request Publisher participant ReqTopic as Request Publisher Topic Note over Operator: Identifies transient error resolved
in Results/Errors handler Operator->>RepTopic: Publish PersonalizationResults
from DLQ or error logs RepTopic->>Handler: Receive PersonalizationResults message Handler->>Handler: Unmarshal PersonalizationResults alt Parse Error Handler->>Handler: Log error & publish to errors topic Handler->>RepTopic: Ack message else Parse Success Handler->>Handler: Extract failed contact IDs
from results.Data.Failures Handler->>Handler: Filter originalRequest.Data
exclude failed contacts alt No Retryable Contacts Handler->>RepTopic: Ack message (no work to do) else Has Retryable Contacts loop For each retryable contact Handler->>Handler: Build EventPersMessage
with original attributes Handler->>ReqPub: Publish individual message ReqPub->>ReqTopic: Forward to Request Publisher Note over ReqTopic: Message enters normal
processing pipeline end Handler->>RepTopic: Ack message end end
Message Transformation
The Republisher transforms PersonalizationResults back into individual EventPersMessage messages:
Array of CompileRequestData] A2[Data.Failures
Array of failed contact IDs] A3[Message Attributes
Original attributes] end subgraph "Filtering" B[Remove contacts in
Failures array] end subgraph "Output: EventPersMessages" C1[One EventPersMessage
per successful contact] C2[Preserves all original
message attributes] C3[Ready for request
publisher processing] end A1 --> B A2 --> B B --> C1 A3 --> C2 C1 --> C3 C2 --> C3 style A1 fill:#fff3e0 style A2 fill:#ffcdd2 style B fill:#e3f2fd style C1 fill:#c8e6c9 style C2 fill:#c8e6c9 style C3 fill:#c8e6c9
Key Components
Processing Steps
-
Parse PersonalizationResults: JSON deserialization from PubSub message
-
Extract Failed Contacts: Get list of contact IDs from
results.Data.Failures -
Filter Retryable Data: Remove failed contacts from
results.OriginalRequest.Data -
Rebuild Messages: Create individual
EventPersMessagefor each retryable contact -
Preserve Attributes: Copy all original message attributes to new messages
-
Publish: Send each message to Request Publisher topic
Supported Message Types
The Republisher can process messages from:
-
Results Handler:
PersonalizationResultswithsuccess: truebut containing some failures -
Errors Handler:
PersonalizationResultswithsuccess: false(complete failures)
Both message types use the same PersonalizationResults structure and are processed identically.
Dependencies
-
PubSub Client: Single client for main project (uses cloud.google.com/go/pubsub/v2)
-
Request Publisher: Publishes rebuilt messages to request publisher topic
-
Errors Publisher: Handles malformed messages that can’t be parsed
Configuration
-
Input Topic/Subscription:
event-pers.republisher -
Output Topic: Configurable via
REPUBLISHER_TOPIC_NAME(default:event-pers.push.request-publisher.tx.default) -
Max Outstanding Messages:
REPUBLISHER_MAX_OUTSTANDING_MESSAGES(default: 10) -
Subscription Goroutines:
REPUBLISHER_SUBSCRIPTION_NUM_GOROUTINES(default: 10)
Republisher vs Errors Republisher
| Aspect | Republisher (Manual Tool) | Errors Republisher (Automatic) |
|---|---|---|
Trigger |
Manual operator action |
Automatic via Delayer topic |
Input Source |
|
|
Use Case |
Recover from transient errors after resolution |
Automatic retry with exponential backoff |
Message Type |
|
|
Processing |
Filters failed contacts, rebuilds EventPersMessages |
Validates retry count, republishes with channel handler |
Architecture |
Simple handler, no pipeline/channels |
Pipeline-based with channel abstraction |
Retry Logic |
No retry logic (one-time republish) |
Checks max retry count, increments retry counter |
Operator Involvement |
Required (manual publish to topic) |
None (fully automatic) |
Processing Features
-
Simple Architecture: Direct message processing without pipeline or channel abstraction
-
Failure Filtering: Automatically excludes contacts that failed personalization
-
Attribute Preservation: Maintains all original message attributes for proper routing
-
Multi-Contact Support: Handles messages with multiple contacts, publishes individually
-
Error Handling: Malformed messages published to errors topic for investigation
-
Idempotent: Can safely republish same message multiple times (contacts already excluded)
Operational Usage
When to Use
Use the Republisher when:
-
Transient errors (network issues, service timeouts) have been resolved
-
Results or Errors handler messages failed due to temporary infrastructure problems
-
Manual intervention is needed after investigating error logs
-
Automatic retry mechanisms (Errors Republisher) have been exhausted
Message Sources
Messages for republishing can come from:
-
Dead Letter Queue (DLQ) subscriptions
-
Error logs with
PersonalizationResultspayload -
Monitoring alerts with message data
-
Manual extraction from error storage (BigQuery, Cloud Storage)
Process
-
Identify
PersonalizationResultsmessage that failed due to transient error -
Verify the error is resolved (infrastructure restored, service available)
-
Publish the
PersonalizationResultsmessage toevent-pers.republishertopic -
Monitor request publisher for successful reprocessing
-
Failed contacts are automatically filtered out and not retried