Request Publisher Flow

The diagrams below show the initialization and execution flow of the request_publisher service.

Architecture Overview

graph TB
    %% External Input
    PubSub[("PubSub Events Topic<br/>event-pers.push.request-publisher.{mode}.{group}")]

    %% Main Handler
    subgraph "Request Publisher Handler"
        RPH[Request Publisher Handler]
        Pipeline[Processing Pipeline]

        subgraph "Pipeline Steps"
            FetchStep[Fetch Campaign Step]
            PostTemplatesStep[Post Templates Step]
            PrepareStep[Prepare Templates Step<br/>conditional if batching]
            BuildMsgStep[Build Request Message Step]
            PublishStep[Publish Request Step]
        end
    end

    %% Channel System
    subgraph "Channel System"
        Registry[Channel Registry]
        PushChannel[Push Channel<br/>with Request Processing]
    end

    %% External Services
    subgraph "External Services"
        PersonalizationService[(Personalization Service)]
        PushCampaignService[(Push Campaign Service)]
        TemplateService[(Template Processing)]
    end

    %% Publishers & Clients
    subgraph "Publishers & Clients"
        PersonalizationClient[Personalization Client]
        ErrorsPublisher[Errors Publisher<br/>config.ErrorsTopic]

        subgraph "PubSub Clients"
            MainClient[Main PubSub Client<br/>GCloudProjectId]
            PersClient[Personalization Client<br/>PersGCloudProjectId]
        end
    end

    %% Telemetry
    subgraph "Telemetry & Monitoring"
        TelemetryResource[Telemetry Resource]
        MeterProvider[Meter Provider]
        OpenTelemetry[OpenTelemetry Provider]
    end

    %% Output Topics
    subgraph "Output Topics"
        PersonalizationTopic[("Personalization Request Topic")]
        ErrorTopics[("Error Topics")]
    end

    %% Flow connections
    PubSub --> RPH
    RPH --> Pipeline
    Pipeline --> FetchStep
    FetchStep --> PostTemplatesStep
    PostTemplatesStep --> PrepareStep
    PrepareStep --> BuildMsgStep
    BuildMsgStep --> PublishStep

    %% Channel processing
    RPH --> Registry
    Registry --> PushChannel

    %% External service calls
    FetchStep --> PushCampaignService
    PostTemplatesStep --> TemplateService
    PublishStep --> PersonalizationClient
    PersonalizationClient --> PersonalizationService
    PersonalizationService --> PersonalizationTopic

    %% Error handling
    FetchStep --> ErrorsPublisher
    PostTemplatesStep --> ErrorsPublisher
    PublishStep --> ErrorsPublisher
    ErrorsPublisher --> ErrorTopics

    %% Client connections
    RPH --> MainClient
    RPH --> PersClient
    ErrorsPublisher --> MainClient
    PersonalizationClient --> PersClient

    %% Telemetry connections
    RPH --> TelemetryResource
    TelemetryResource --> MeterProvider
    MeterProvider --> OpenTelemetry

    %% Styling
    classDef handler fill:#fce4ec,stroke:#c2185b,stroke-width:2px
    classDef pipeline fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    classDef channel fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    classDef service fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
    classDef client fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
    classDef topic fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
    classDef telemetry fill:#f1f8e9,stroke:#558b2f,stroke-width:2px

    class RPH,Pipeline handler
    class FetchStep,PostTemplatesStep,PrepareStep,BuildMsgStep,PublishStep pipeline
    class Registry,PushChannel channel
    class PersonalizationService,PushCampaignService,TemplateService service
    class PersonalizationClient,ErrorsPublisher,MainClient,PersClient client
    class PubSub,PersonalizationTopic,ErrorTopics topic
    class TelemetryResource,MeterProvider,OpenTelemetry telemetry

Processing Pipeline

sequenceDiagram
    participant PS as PubSub
    participant RPH as Request Publisher Handler
    participant Pipeline as Processing Pipeline
    participant Registry as Channel Registry
    participant PC as Push Channel
    participant CampaignSvc as Campaign Service
    participant TemplateBuilder as Template Builder
    participant PersClient as Personalization Client
    participant PersService as Personalization Service
    participant OutTopic as Output Topic

    PS->>RPH: Batch of event messages
    RPH->>Pipeline: Execute processing pipeline

    %% Pipeline execution
    loop For each message batch group
        Pipeline->>PC: 1. Fetch Campaign Step
        PC->>CampaignSvc: FetchCampaign(customerId, campaignId)
        CampaignSvc->>PC: Campaign metadata

        Pipeline->>PC: 2. Post Templates Step
        PC->>TemplateBuilder: BuildPersonalizationTemplates
        TemplateBuilder->>PC: Templates with fields to personalize
        PC->>PersClient: PostTemplates(customerId, campaignId, templates)
        PersClient->>PersService: HTTP POST templates
        PersService->>PersClient: Template response with hash
        PersClient->>PC: Hash

        alt Batch Mode
            Pipeline->>PC: 3. Prepare Templates Step
            PC->>PersClient: PrepareTemplates(customerId, campaignId, hash)
            PersClient->>PersService: HTTP GET prepare
            PersService->>PersClient: Prepare response
            PersClient->>PC: Success
        end

        Pipeline->>PC: 4. Build Request Message Step
        PC->>PC: BuildPersonalizationRequestMessage

        Pipeline->>PC: 5. Publish Request Step
        PC->>OutTopic: Publish personalization request
    end

    alt Processing Errors
        Pipeline->>RPH: Handle pipeline errors
        RPH->>PS: Publish to errors topic
    end

    RPH->>PS: Acknowledge processed messages

Channel-Specific Methods Usage

sequenceDiagram
    participant Handler as Request Publisher Handler
    participant Registry as Channel Registry
    participant Channel as Channel Handler<br/>(Push/EmbeddedMessaging)
    participant CampaignSvc as Campaign Service
    participant TemplateBuilder as Template Builder
    participant PersClient as Personalization Client

    Note over Handler: Processing batch of messages

    Handler->>Registry: DetermineChannel(message attributes)
    Registry->>Channel: Return channel instance

    rect rgb(230, 240, 255)
        Note over Handler,PersClient: 1. GetCampaign - Fetch campaign metadata
        Handler->>Channel: GetCampaign(customerId, campaignId)
        Channel->>CampaignSvc: Fetch campaign data
        CampaignSvc-->>Channel: Campaign metadata
        Channel-->>Handler: Campaign data
    end

    rect rgb(240, 255, 240)
        Note over Handler,PersClient: 2. PostTemplates - Create personalization templates
        Handler->>Channel: Call template processing
        Channel->>TemplateBuilder: BuildPersonalizationTemplates(campaign)
        TemplateBuilder-->>Channel: Templates with fields
        Channel->>PersClient: PostTemplates(customerId, campaignId, templates)
        PersClient-->>Channel: Template hash
        Channel-->>Handler: Hash for personalization
    end

    alt Batch Mode
        rect rgb(255, 245, 230)
            Note over Handler,PersClient: 3. PrepareTemplates - Wait for template preparation
            Handler->>Channel: PrepareTemplates(customerId, campaignId, hash)
            Channel->>PersClient: HTTP GET prepare endpoint
            PersClient-->>Channel: Preparation complete
            Channel-->>Handler: Success
        end
    end

    rect rgb(255, 240, 245)
        Note over Handler,PersClient: 4. GetAttributes - Build message attributes
        Handler->>Channel: GetAttributes(campaignId, hash, messages)
        Note over Channel: Does NOT include MeCampaignIdAttribute<br/>(added by builder)
        Channel-->>Handler: Channel-specific attributes
    end

    rect rgb(245, 240, 255)
        Note over Handler,PersClient: 5. BuildMessage & Publish
        Handler->>Handler: BuildPersonalizationRequestMessage<br/>(adds MeCampaignIdAttribute)
        Handler->>PersClient: Publish to personalization service
    end

    Note over Handler: Message successfully published for personalization
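
The channel lookup and the channel-specific calls in the diagram above could be modelled roughly as follows. This is a minimal sketch, not the service's actual code: the method names come from the diagram, but the parameter and return types, the `Campaign` struct, the `"channel"` attribute key, the `"template_hash"` attribute, and the in-memory `fakePush` channel are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Campaign is a hypothetical stand-in for the campaign metadata.
type Campaign struct{ ID string }

// Channel lists the channel-specific calls shown in the diagram.
// The parameter and return types are assumptions.
type Channel interface {
	GetCampaign(customerID, campaignID string) (Campaign, error)
	PostTemplates(customerID string, c Campaign) (string, error)
	PrepareTemplates(customerID, campaignID, hash string) error
	GetAttributes(campaignID, hash string, messages []string) map[string]string
}

// ErrUnknownChannel is returned when no registered channel matches.
var ErrUnknownChannel = errors.New("unknown channel")

// Registry maps a channel name to its implementation.
type Registry struct{ channels map[string]Channel }

// NewRegistry returns an empty registry.
func NewRegistry() *Registry { return &Registry{channels: map[string]Channel{}} }

// Register adds or replaces the channel stored under name.
func (r *Registry) Register(name string, ch Channel) { r.channels[name] = ch }

// DetermineChannel picks a channel from the message attributes.
// The "channel" attribute key is an assumption.
func (r *Registry) DetermineChannel(attrs map[string]string) (Channel, error) {
	ch, ok := r.channels[attrs["channel"]]
	if !ok {
		return nil, fmt.Errorf("%w: %q", ErrUnknownChannel, attrs["channel"])
	}
	return ch, nil
}

// fakePush is an in-memory channel used only for this example.
type fakePush struct{}

func (fakePush) GetCampaign(_, campaignID string) (Campaign, error) {
	return Campaign{ID: campaignID}, nil
}

func (fakePush) PostTemplates(_ string, c Campaign) (string, error) {
	return "hash-" + c.ID, nil // stand-in for the hash returned by the service
}

func (fakePush) PrepareTemplates(_, _, _ string) error { return nil }

// GetAttributes returns only channel-specific attributes; the campaign ID
// attribute is left to the message builder, as noted in the diagram.
func (fakePush) GetAttributes(_, hash string, _ []string) map[string]string {
	return map[string]string{"template_hash": hash}
}

func main() {
	reg := NewRegistry()
	reg.Register("push", fakePush{})

	ch, err := reg.DetermineChannel(map[string]string{"channel": "push"})
	if err != nil {
		panic(err)
	}
	campaign, _ := ch.GetCampaign("customer-1", "42")
	hash, _ := ch.PostTemplates("customer-1", campaign)
	fmt.Println(ch.GetAttributes(campaign.ID, hash, []string{"m1"}))
}
```

Keeping the registry behind a small interface means a new channel only has to implement these four methods and register itself under a name.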

Key Components

Pipeline Processing Steps

  1. Fetch Campaign Step: Retrieve campaign metadata from the Campaign Service through the channel handler

  2. Post Templates Step: Build personalization templates, post them to the personalization service, and receive a template hash

  3. Prepare Templates Step: (Conditional, batch mode only) Wait for template preparation to complete

  4. Build Request Message Step: Construct the personalization request message with all required attributes

  5. Publish Request Step: Publish the personalization request to the personalization request topic

External Dependencies

  • PubSub Clients: Main project and personalization project clients (all use cloud.google.com/go/pubsub/v2)

  • Personalization Client: Sends HTTP requests to the personalization service (POST templates, GET prepare)

  • Campaign Service: Provides campaign metadata and configuration

  • Template Builder: Creates personalization templates from campaign data

Message Processing Flow

  • Input Path: PubSub Events → Request Publisher Handler → Message Channel → Group Processing

  • Campaign Path: Fetch Campaign → Build Templates → Post Templates → (Prepare if batch mode)

  • Personalization Path: Build Message → Publish to Personalization Service

  • Error Path: Any step failure → Errors Publisher → Error Topics

Batch vs Transactional Mode

  • Batch Mode: Groups messages by (channel, campaignId) and adds the PrepareTemplates step, which waits for template preparation to complete before requests are published

  • Transactional Mode: Processes messages individually, skips PrepareTemplates step for lower latency
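
The difference in grouping between the two modes might look like the sketch below. The `Message` and `GroupKey` types are hypothetical, and treating transactional mode as "one group per message" is an assumption based on the description above rather than a confirmed detail of the service.

```go
package main

import "fmt"

// Message is a hypothetical, simplified event message.
type Message struct {
	ID         string
	Channel    string
	CampaignID string
}

// GroupKey is the (channel, campaignId) pair used for batch grouping.
type GroupKey struct {
	Channel    string
	CampaignID string
}

// GroupMessages splits msgs into processing groups.
// In batch mode, messages sharing a (channel, campaignId) pair land in the
// same group, and groups keep the order in which their key was first seen.
// In transactional mode, every message becomes its own group.
func GroupMessages(msgs []Message, batch bool) [][]Message {
	if !batch {
		groups := make([][]Message, 0, len(msgs))
		for _, m := range msgs {
			groups = append(groups, []Message{m})
		}
		return groups
	}

	index := map[GroupKey]int{} // key -> position in groups
	var groups [][]Message
	for _, m := range msgs {
		key := GroupKey{Channel: m.Channel, CampaignID: m.CampaignID}
		i, seen := index[key]
		if !seen {
			i = len(groups)
			index[key] = i
			groups = append(groups, nil)
		}
		groups[i] = append(groups[i], m)
	}
	return groups
}

func main() {
	msgs := []Message{
		{ID: "a", Channel: "push", CampaignID: "1"},
		{ID: "b", Channel: "embedded_messaging", CampaignID: "1"},
		{ID: "c", Channel: "push", CampaignID: "1"},
	}
	fmt.Println("batch groups:", len(GroupMessages(msgs, true)))
	fmt.Println("transactional groups:", len(GroupMessages(msgs, false)))
}
```

Using a comparable struct as the map key avoids building string keys and keeps the two grouping dimensions explicit.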

Configuration

  • Input Topic: event-pers.push.request-publisher.{mode}.{group}

  • Subscription: event-pers.push.request-publisher.{mode}.{group} (or custom via SubscriptionName param)

  • Output: Personalization requests are sent to the me-{mode}-{group}-personalization-requests topic

  • Channels: Push and embedded_messaging channels with request processing capabilities
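
As a quick illustration of the naming patterns above, the helpers below expand the `{mode}` and `{group}` placeholders. The function names are hypothetical, and the example values `batch` and `g1` are assumptions; only the patterns themselves come from the configuration listed here.

```go
package main

import "fmt"

// InputTopic expands event-pers.push.request-publisher.{mode}.{group}.
func InputTopic(mode, group string) string {
	return fmt.Sprintf("event-pers.push.request-publisher.%s.%s", mode, group)
}

// SubscriptionName returns the custom name when one is given and falls
// back to the input topic pattern otherwise.
func SubscriptionName(mode, group, custom string) string {
	if custom != "" {
		return custom
	}
	return InputTopic(mode, group)
}

// OutputTopic expands me-{mode}-{group}-personalization-requests.
func OutputTopic(mode, group string) string {
	return fmt.Sprintf("me-%s-%s-personalization-requests", mode, group)
}

func main() {
	// "batch" and "g1" are example values, not confirmed settings.
	fmt.Println(InputTopic("batch", "g1"))
	fmt.Println(SubscriptionName("batch", "g1", ""))
	fmt.Println(OutputTopic("batch", "g1"))
}
```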

Processing Features

  • Pipeline Architecture: Sequential step processing; a failure in any step is routed to the Errors Publisher

  • Batch/Transactional Modes: Configurable processing modes with different optimization strategies

  • Channel-based Routing: Registry pattern for extensible channel support (push, embedded_messaging)

  • Template Management: Dynamic template creation, posting, and optional preparation

  • Telemetry Integration: OpenTelemetry metrics collection for monitoring

  • Campaign Caching: Configurable campaign metadata caching for performance

  • Graceful Shutdown: Two-phase shutdown with separate subscription and publish contexts
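
One way to picture the two-phase shutdown with separate subscription and publish contexts is sketched below. It is an assumption-laden illustration, not the service's code: the `Service` type, its methods, and the in-memory `published` slice are hypothetical, and a real implementation would wrap Pub/Sub receive and publish calls instead.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Service holds one context for receiving and one for publishing.
type Service struct {
	subCtx  context.Context
	stopSub context.CancelFunc
	pubCtx  context.Context
	stopPub context.CancelFunc

	mu        sync.Mutex
	inflight  sync.WaitGroup
	published []string
}

// NewService creates a service with two independent contexts.
func NewService() *Service {
	s := &Service{}
	s.subCtx, s.stopSub = context.WithCancel(context.Background())
	s.pubCtx, s.stopPub = context.WithCancel(context.Background())
	return s
}

// begin registers an in-flight message unless receiving has stopped.
// The mutex orders it against Shutdown so Add never races with Wait.
func (s *Service) begin() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.subCtx.Err() != nil {
		return false
	}
	s.inflight.Add(1)
	return true
}

// Handle processes one message and publishes the result.
func (s *Service) Handle(msg string) error {
	if !s.begin() {
		return s.subCtx.Err()
	}
	defer s.inflight.Done()
	return s.publish(msg)
}

// publish succeeds only while the publish context is alive.
func (s *Service) publish(msg string) error {
	if err := s.pubCtx.Err(); err != nil {
		return err
	}
	s.mu.Lock()
	s.published = append(s.published, msg)
	s.mu.Unlock()
	return nil
}

// Shutdown stops receiving first, waits for in-flight messages to finish
// publishing, and only then cancels the publish context.
func (s *Service) Shutdown() {
	s.mu.Lock()
	s.stopSub() // phase one: no new messages
	s.mu.Unlock()
	s.inflight.Wait()
	s.stopPub() // phase two: publishing is done
}

func main() {
	s := NewService()
	fmt.Println("before shutdown:", s.Handle("m1"))
	s.Shutdown()
	fmt.Println("after shutdown:", s.Handle("m2"))
	fmt.Println("published:", s.published)
}
```

Keeping the publish context alive until the in-flight work drains is what lets messages already received finish publishing instead of being dropped mid-pipeline.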