Introduction

Cloud Pub/Sub is a fully-managed real-time messaging service that allows you to send and receive messages between independent applications. It decouples services that produce events from services that process events, enabling you to build scalable and reliable systems.

Key Concepts

  1. Topics: Named resources to which messages are sent by publishers.
  2. Subscriptions: Named resources representing the stream of messages from a single, specific topic, to be delivered to the subscribing application.
  3. Messages: Data that is sent to a topic by a publisher and received from a subscription by a subscriber.
  4. Publisher: An application that creates and sends messages to a topic.
  5. Subscriber: An application that receives messages from a subscription.

How Cloud Pub/Sub Works

  1. Publish/Subscribe Model: Publishers send messages to topics. Subscribers create subscriptions to these topics to receive messages.
  2. Message Delivery: Messages are delivered at least once to each subscription. Pub/Sub ensures that messages are delivered in the order they were published.
  3. Acknowledgements: Subscribers must acknowledge each message they receive. If a message is not acknowledged within a certain period, it is redelivered.

Setting Up Cloud Pub/Sub

Step 1: Create a Topic

from google.cloud import pubsub_v1

# Initialize a Publisher client
publisher = pubsub_v1.PublisherClient()
# Define the topic path
topic_path = publisher.topic_path('your-project-id', 'your-topic-name')

# Create the topic
topic = publisher.create_topic(request={"name": topic_path})
print(f"Topic created: {topic.name}")

Step 2: Create a Subscription

from google.cloud import pubsub_v1

# Initialize a Subscriber client
subscriber = pubsub_v1.SubscriberClient()
# Define the subscription path
subscription_path = subscriber.subscription_path('your-project-id', 'your-subscription-name')

# Create the subscription
subscription = subscriber.create_subscription(
    request={"name": subscription_path, "topic": topic_path}
)
print(f"Subscription created: {subscription.name}")

Step 3: Publish Messages

from google.cloud import pubsub_v1

# Initialize a Publisher client
publisher = pubsub_v1.PublisherClient()
# Define the topic path
topic_path = publisher.topic_path('your-project-id', 'your-topic-name')

# Publish a message
message_data = "Hello, World!"
future = publisher.publish(topic_path, message_data.encode("utf-8"))
print(f"Published message ID: {future.result()}")

Step 4: Receive Messages

from google.cloud import pubsub_v1

# Initialize a Subscriber client
subscriber = pubsub_v1.SubscriberClient()
# Define the subscription path
subscription_path = subscriber.subscription_path('your-project-id', 'your-subscription-name')

def callback(message):
    print(f"Received message: {message.data}")
    message.ack()

# Subscribe to the subscription
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}...")

# Keep the main thread alive to listen for messages
try:
    streaming_pull_future.result()
except KeyboardInterrupt:
    streaming_pull_future.cancel()

Practical Exercise

Exercise: Implement a Simple Pub/Sub System

  1. Objective: Create a topic, a subscription, publish messages to the topic, and receive messages from the subscription.
  2. Steps:
    • Create a new topic named test-topic.
    • Create a subscription named test-subscription to the test-topic.
    • Publish a message "Hello, Pub/Sub!" to the test-topic.
    • Receive and print the message from test-subscription.

Solution

from google.cloud import pubsub_v1

# Initialize clients
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# Define paths
project_id = 'your-project-id'
topic_name = 'test-topic'
subscription_name = 'test-subscription'
topic_path = publisher.topic_path(project_id, topic_name)
subscription_path = subscriber.subscription_path(project_id, subscription_name)

# Create topic
topic = publisher.create_topic(request={"name": topic_path})
print(f"Topic created: {topic.name}")

# Create subscription
subscription = subscriber.create_subscription(
    request={"name": subscription_path, "topic": topic_path}
)
print(f"Subscription created: {subscription.name}")

# Publish message
message_data = "Hello, Pub/Sub!"
future = publisher.publish(topic_path, message_data.encode("utf-8"))
print(f"Published message ID: {future.result()}")

# Receive message
def callback(message):
    print(f"Received message: {message.data}")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}...")

try:
    streaming_pull_future.result()
except KeyboardInterrupt:
    streaming_pull_future.cancel()

Common Mistakes and Tips

  • Message Acknowledgement: Always acknowledge messages after processing them to prevent redelivery.
  • Error Handling: Implement error handling in your callback function to manage exceptions and ensure the subscriber continues to receive messages.
  • Resource Cleanup: Properly clean up resources (topics, subscriptions) when they are no longer needed to avoid unnecessary charges.

Conclusion

In this section, you learned about Cloud Pub/Sub, its key concepts, and how to set up and use it for real-time messaging. You also implemented a simple Pub/Sub system to publish and receive messages. This knowledge is crucial for building scalable and decoupled systems on Google Cloud Platform. In the next module, we will explore more data and analytics services offered by GCP.

© Copyright 2024. All rights reserved