How To

How to Test pubsub-delayer

This guide shows how to publish a delayed message with pubsub-delayer and verify its delivery with a subscription.

Step 1. Publish a test message with a delay

gcloud pubsub topics publish pubsub-delayer \
  --message="test message" \
  --attribute="me-pubsub-delayer/topic=ak-test,me-pubsub-delayer/delay=10s" \
  --project=ems-mobile-engage

Attributes:

  • me-pubsub-delayer/topic: Target topic to which the message will be forwarded (ak-test).

  • me-pubsub-delayer/delay: How long to wait before forwarding (e.g. 10s).
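
If you publish test messages often, the attribute string can be assembled by a small helper. The following is a minimal sketch; delayer_attributes is a hypothetical name, not part of pubsub-delayer, and it performs no validation of the topic or delay values.

```shell
# Hypothetical helper: builds the --attribute value from a target topic ($1)
# and a delay ($2). It only formats the string; it does not validate input.
delayer_attributes() {
  printf 'me-pubsub-delayer/topic=%s,me-pubsub-delayer/delay=%s\n' "$1" "$2"
}

# Usage (requires gcloud and access to the project):
# gcloud pubsub topics publish pubsub-delayer \
#   --message="test message" \
#   --attribute="$(delayer_attributes ak-test 10s)" \
#   --project=ems-mobile-engage
```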

Step 2. Verify the message in the target subscription

gcloud pubsub subscriptions pull ak-test-sub \
  --project=ems-mobile-engage \
  --auto-ack

Run this command after the configured delay has elapsed; the message should then be available in the subscription ak-test-sub. The --auto-ack flag acknowledges the message as it is pulled.

How to Configure Custom Queue Routing

The service routes messages to different Cloud Tasks queues based on the target topic. This allows you to:

  • Separate high-priority from low-priority messages

  • Isolate specific topics with dedicated rate limits

  • Optimize queue configuration based on traffic patterns

View Current Queue Configuration

Check the current queue mappings:

kubectl get configmap me-pubsub-delayer-queue-config -n mobile-engage -o yaml

Example Configuration

The configuration defines how topics map to queues:

data:
  queue-mapping.yaml: |
    # Default queue for unmapped topics
    default: pubsub-delayer

    mappings:
      # Exact match - client-changes always goes to this queue
      client-changes: pubsub-delayer-client-changes

      # Pattern match - all sending-* topics go here
      sending-*: pubsub-delayer-sending-chain

      # Pattern match - all *-dg01 topics go here
      # (quoted because a leading * would otherwise be read as a YAML alias)
      "*-dg01": pubsub-delayer-dg01

Pattern matching rules:

  • Exact matches take precedence over pattern matches

  • The wildcard * matches any sequence of characters

  • When several patterns match, the first matching pattern wins

  • Unmapped topics use the default queue
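
The rules above can be sketched as a small shell function. This is an illustration of the documented behavior only, not the service's implementation; select_queue, MAPPINGS, and DEFAULT_QUEUE are hypothetical names, and the mapping entries are copied from the example configuration. Note that shell globbing also treats ? and [ as special, which the service may not.

```shell
# Illustrative sketch only: mirrors the documented rules, not the service's code.
DEFAULT_QUEUE="pubsub-delayer"

# One "pattern=queue" entry per line, in the same order as the example mapping.
MAPPINGS='client-changes=pubsub-delayer-client-changes
sending-*=pubsub-delayer-sending-chain
*-dg01=pubsub-delayer-dg01'

select_queue() {
  topic=$1

  # Pass 1: exact matches take precedence.
  while IFS='=' read -r pattern queue; do
    if [ "$pattern" = "$topic" ]; then
      echo "$queue"
      return 0
    fi
  done <<EOF
$MAPPINGS
EOF

  # Pass 2: among wildcard entries, the first matching pattern wins.
  while IFS='=' read -r pattern queue; do
    case $pattern in
      *'*'*)
        # The unquoted $pattern is deliberately interpreted as a glob here.
        case $topic in
          $pattern) echo "$queue"; return 0 ;;
        esac
        ;;
    esac
  done <<EOF
$MAPPINGS
EOF

  # No entry matched: fall back to the default queue.
  echo "$DEFAULT_QUEUE"
}

select_queue client-changes       # prints pubsub-delayer-client-changes
select_queue sending-batch-dg02   # prints pubsub-delayer-sending-chain
select_queue ak-test              # prints pubsub-delayer
```

A topic such as sending-x-dg01 matches both wildcard entries; in this sketch it goes to pubsub-delayer-sending-chain because that pattern is listed first.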

For operational tasks (updating configuration, creating queues), see Queue Configuration Management.

How to Verify Queue Routing

After publishing a message, you can verify which queue it was routed to:

# Publish a test message
gcloud pubsub topics publish pubsub-delayer \
  --message="routing test" \
  --attribute="me-pubsub-delayer/topic=sending-batch-dg02,me-pubsub-delayer/delay=5s" \
  --project=ems-mobile-engage

# Check the logs to see queue selection
kubectl logs -l app=me-pubsub-delayer-task-creator -n mobile-engage | \
  grep "queue-selected-for-topic" | \
  tail -1

Expected output:

{
  "level":"debug",
  "topic":"sending-batch-dg02",
  "queue":"pubsub-delayer-sending-chain",
  "matchType":"pattern",
  "pattern":"sending-*",
  "message":"queue-selected-for-topic"
}

This shows:

  • topic: The target topic from your message

  • queue: Which queue the message was routed to

  • matchType: How the match was made (exact or pattern)

  • pattern: Which pattern matched (if pattern matching was used)
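
To pull a single field out of such a log entry in a script, a sed expression is often enough. The sketch below assumes each log entry is a flat, single-line JSON object with plain string values (no escaped quotes); log_field is a hypothetical helper name, and a real JSON parser is the safer choice for anything more complex.

```shell
# Extracts the string value of field $2 from the single-line JSON entry in $1.
# Assumes flat JSON with plain string values and no escaped quotes.
log_field() {
  printf '%s\n' "$1" |
    sed -n 's/.*"'"$2"'"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Sample entry taken from the expected output above, collapsed to one line.
sample='{"level":"debug","topic":"sending-batch-dg02","queue":"pubsub-delayer-sending-chain","matchType":"pattern","pattern":"sending-*","message":"queue-selected-for-topic"}'

log_field "$sample" queue       # prints pubsub-delayer-sending-chain
log_field "$sample" matchType   # prints pattern
```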

How to Monitor Your Messages

View Messages in a Specific Queue

List the tasks currently in your queue with gcloud:

# List tasks in a queue
gcloud tasks list --queue=pubsub-delayer-sending-chain \
  --location=europe-west3 \
  --project=ems-mobile-engage

Or use the Cloud Tasks UI to see:

  • Number of pending tasks

  • Task execution rate

  • Task age and scheduled time

Check Task Execution

View logs to see when tasks are executed:

kubectl logs -l app=me-pubsub-delayer-task-creator -n mobile-engage --tail=50

Troubleshoot Message Delays

If messages aren’t being delivered:

  1. Check queue selection: Verify the message was routed to the correct queue

  2. Check queue status: Ensure the queue is not paused

  3. Check queue depth: See if tasks are backing up

  4. Check error logs: Look for validation or permission errors

# Check for error messages
kubectl logs -l app=me-pubsub-delayer-task-creator -n mobile-engage | \
  grep -i error

# Check if queue exists
gcloud tasks queues describe your-queue-name \
  --location=europe-west3 \
  --project=ems-mobile-engage
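
The queue status check (step 2) can be scripted by reading the state field from the describe output. The following is a sketch under assumptions: queue_state_hint is a hypothetical helper, the hint texts are illustrative, and the state names are the ones Cloud Tasks reports for a queue (RUNNING, PAUSED, DISABLED).

```shell
# Maps a Cloud Tasks queue state ($1) to a short troubleshooting hint.
queue_state_hint() {
  case $1 in
    RUNNING)  echo "queue is running; check queue depth and error logs next" ;;
    PAUSED)   echo "queue is paused; tasks are held until the queue is resumed" ;;
    DISABLED) echo "queue is disabled; tasks are not dispatched" ;;
    *)        echo "unknown state: $1" ;;
  esac
}

# Usage (requires gcloud and access to the project):
# state=$(gcloud tasks queues describe your-queue-name \
#   --location=europe-west3 \
#   --project=ems-mobile-engage \
#   --format="value(state)")
# queue_state_hint "$state"
```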

For operational troubleshooting, see Troubleshooting Queue Configuration.