Request Publisher Flow
This flowchart shows the initialization and execution flow of the request_publisher service.
Architecture Overview
Components, grouped as in the diagram:
- Input: PubSub, event-pers.push.request-publisher.{mode}.{group}
- Request Publisher Handler: Request Publisher Handler and its Processing Pipeline
- Pipeline Steps: Fetch Campaign Step, Post Templates Step, Prepare Templates Step (conditional, if batching), Build Request Message Step, Publish Request Step
- Channel System: Channel Registry, Push Channel (with Request Processing)
- External Services: Personalization Service, Push Campaign Service, Template Processing
- Publishers & Clients: Personalization Client, Errors Publisher (config.ErrorsTopic)
- PubSub Clients: Main PubSub Client (GCloudProjectId), Personalization Client (PersGCloudProjectId)
- Telemetry & Monitoring: Telemetry Resource, Meter Provider, OpenTelemetry Provider
- Output Topics: Personalization Request Topic, Error Topics
Connections:
- Flow: PubSub → Request Publisher Handler → Processing Pipeline → Fetch Campaign Step → Post Templates Step → Prepare Templates Step → Build Request Message Step → Publish Request Step
- Channel processing: Request Publisher Handler → Channel Registry → Push Channel
- External service calls: Fetch Campaign Step → Push Campaign Service; Post Templates Step → Template Processing; Publish Request Step → Personalization Client → Personalization Service → Personalization Request Topic
- Error handling: Fetch Campaign Step, Post Templates Step, Publish Request Step → Errors Publisher → Error Topics
- Client connections: Request Publisher Handler → Main PubSub Client and Personalization Client (PersGCloudProjectId); Errors Publisher → Main PubSub Client; Personalization Client → Personalization Client (PersGCloudProjectId)
- Telemetry: Request Publisher Handler → Telemetry Resource → Meter Provider → OpenTelemetry Provider
Processing Pipeline
Channel-Specific Methods Usage
Participants: Handler, Registry, Channel (Push/EmbeddedMessaging), Campaign Service, Template Builder, Personalization Client.
The Handler processes a batch of messages. It calls DetermineChannel(message attributes) on the Registry, which returns a channel instance. The Handler then goes through the following calls:
1. GetCampaign (fetch campaign metadata): the Handler calls GetCampaign(customerId, campaignId) on the Channel. The Channel fetches the campaign data from the Campaign Service and returns the campaign metadata to the Handler.
2. PostTemplates (create personalization templates): the Handler calls template processing on the Channel. The Channel calls BuildPersonalizationTemplates(campaign) on the Template Builder and receives templates with fields, then calls PostTemplates(customerId, campaignId, templates) on the Personalization Client and receives a template hash, which it returns to the Handler as the hash for personalization.
3. PrepareTemplates (batch mode only; wait for template preparation): the Handler calls PrepareTemplates(customerId, campaignId, hash) on the Channel. The Channel issues an HTTP GET to the prepare endpoint through the Personalization Client and reports success once preparation is complete.
4. GetAttributes (build message attributes): the Handler calls GetAttributes(campaignId, hash, messages) on the Channel, which returns the channel-specific attributes. These do NOT include MeCampaignIdAttribute, which is added by the builder.
5. BuildMessage & Publish: the Handler calls BuildPersonalizationRequestMessage (which adds MeCampaignIdAttribute) and publishes to the personalization service through the Personalization Client.
After step 5 the message has been successfully published for personalization.
Key Components
Pipeline Processing Steps
- Fetch Campaign Step: Retrieve campaign metadata from external service using channel handler
- Post Templates Step: Create and post personalization templates to personalization service, receive hash
- Prepare Templates Step: (Conditional - only for batch mode) Wait for template preparation to complete
- Build Request Message Step: Construct personalization request message with all required attributes
- Publish Request Step: Send personalization request to personalization service topic
External Dependencies
- PubSub Clients: Main project and personalization project clients (all use cloud.google.com/go/pubsub/v2)
- Personalization Client: Handles requests to personalization service via HTTP
- Campaign Service: Provides campaign metadata and configuration
- Template Builder: Creates personalization templates from campaign data
Message Processing Flow
- Input Path: PubSub Events → Request Publisher Handler → Message Channel → Group Processing
- Campaign Path: Fetch Campaign → Build Templates → Post Templates → (Prepare if batch mode)
- Personalization Path: Build Message → Publish to Personalization Service
- Error Path: Any step failure → Errors Publisher → Error Topics
Batch vs Transactional Mode
- Batch Mode: Groups messages by (channel, campaignId), includes PrepareTemplates step for optimization
- Transactional Mode: Processes messages individually, skips PrepareTemplates step for lower latency
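The difference between the two modes comes down to how incoming messages are split into units of work. A minimal sketch, assuming hypothetical `Message` and `GroupKey` types, could group by (channel, campaignId) in batch mode and emit one unit per message in transactional mode:

```go
package main

import "fmt"

// Message holds only the fields needed for grouping; the real message type is not shown here.
type Message struct {
	ID         string
	Channel    string
	CampaignID string
}

// GroupKey is the (channel, campaignId) pair used for batching.
type GroupKey struct {
	Channel    string
	CampaignID string
}

// GroupMessages buckets messages by key. It also returns the keys in
// first-seen order, because Go map iteration order is not stable.
func GroupMessages(msgs []Message) ([]GroupKey, map[GroupKey][]Message) {
	order := []GroupKey{}
	groups := make(map[GroupKey][]Message)
	for _, m := range msgs {
		k := GroupKey{Channel: m.Channel, CampaignID: m.CampaignID}
		if _, seen := groups[k]; !seen {
			order = append(order, k)
		}
		groups[k] = append(groups[k], m)
	}
	return order, groups
}

// Plan returns the units of work: one per group in batch mode,
// one per message in transactional mode.
func Plan(msgs []Message, batch bool) [][]Message {
	if !batch {
		units := make([][]Message, 0, len(msgs))
		for _, m := range msgs {
			units = append(units, []Message{m})
		}
		return units
	}
	order, groups := GroupMessages(msgs)
	units := make([][]Message, 0, len(order))
	for _, k := range order {
		units = append(units, groups[k])
	}
	return units
}

func main() {
	msgs := []Message{
		{ID: "1", Channel: "push", CampaignID: "c1"},
		{ID: "2", Channel: "push", CampaignID: "c1"},
		{ID: "3", Channel: "embedded_messaging", CampaignID: "c1"},
	}
	fmt.Println("batch units:", len(Plan(msgs, true)))
	fmt.Println("transactional units:", len(Plan(msgs, false)))
}
```

In batch mode the campaign fetch and template posting can then run once per unit rather than once per message, which is where the grouping pays off.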
Configuration
- Input Topic: event-pers.push.request-publisher.{mode}.{group}
- Subscription: event-pers.push.request-publisher.{mode}.{group} (or custom via SubscriptionName param)
- Output: Personalization requests sent to the me-{mode}-{group}-personalization-requests topic
- Channels: Push and embedded_messaging channels with request processing capabilities
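The naming patterns above can be expressed as small helpers. This sketch assumes a hypothetical `Config` struct. Only the name templates and the `SubscriptionName` override come from the text, while the field names and the example `batch` and `g1` values are illustrative.

```go
package main

import "fmt"

// Config holds the values substituted into the topic and subscription names.
// The struct and its field names are assumptions for this example.
type Config struct {
	Mode             string // value substituted for {mode}
	Group            string // value substituted for {group}
	SubscriptionName string // optional custom subscription name
}

// InputTopic renders event-pers.push.request-publisher.{mode}.{group}.
func (c Config) InputTopic() string {
	return fmt.Sprintf("event-pers.push.request-publisher.%s.%s", c.Mode, c.Group)
}

// Subscription returns the custom name when set, else the default pattern.
func (c Config) Subscription() string {
	if c.SubscriptionName != "" {
		return c.SubscriptionName
	}
	return c.InputTopic()
}

// OutputTopic renders me-{mode}-{group}-personalization-requests.
func (c Config) OutputTopic() string {
	return fmt.Sprintf("me-%s-%s-personalization-requests", c.Mode, c.Group)
}

func main() {
	cfg := Config{Mode: "batch", Group: "g1"}
	fmt.Println(cfg.InputTopic())
	fmt.Println(cfg.Subscription())
	fmt.Println(cfg.OutputTopic())
}
```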
Processing Features
- Pipeline Architecture: Sequential step processing with comprehensive error handling
- Batch/Transactional Modes: Configurable processing modes with different optimization strategies
- Channel-based Routing: Registry pattern for extensible channel support (push, embedded_messaging)
- Template Management: Dynamic template creation, posting, and optional preparation
- Telemetry Integration: OpenTelemetry metrics collection for monitoring
- Campaign Caching: Configurable campaign metadata caching for performance
- Graceful Shutdown: Two-phase shutdown with separate subscription and publish contexts