Introduction
Cloud Pub/Sub is a fully managed, real-time messaging service for sending and receiving messages between independent applications. It decouples the services that produce events from the services that process them, which lets each side scale and fail independently.
Key Concepts
- Topics: Named resources to which messages are sent by publishers.
- Subscriptions: Named resources representing the stream of messages from a single, specific topic, to be delivered to the subscribing application.
- Messages: Data that is sent to a topic by a publisher and received from a subscription by a subscriber.
- Publisher: An application that creates and sends messages to a topic.
- Subscriber: An application that receives messages from a subscription.
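To make the relationship between these concepts concrete, here is a minimal in-memory sketch. It is not the Pub/Sub API: the `InMemoryBroker` class and the topic and subscription names are hypothetical, and the only point illustrated is that each subscription attached to a topic receives its own copy of every published message.

```python
from collections import defaultdict, deque


class InMemoryBroker:
    """Toy stand-in for Pub/Sub: topics fan out to every attached subscription."""

    def __init__(self):
        # topic name -> names of the subscriptions attached to it
        self._subscriptions_by_topic = defaultdict(list)
        # subscription name -> queue of messages waiting for that subscription
        self._queues = {}

    def create_subscription(self, subscription, topic):
        self._subscriptions_by_topic[topic].append(subscription)
        self._queues[subscription] = deque()

    def publish(self, topic, data: bytes):
        # Every subscription on the topic receives its own copy of the message.
        for subscription in self._subscriptions_by_topic[topic]:
            self._queues[subscription].append(data)

    def pull(self, subscription):
        # Return the next waiting message, or None if the queue is empty.
        queue = self._queues[subscription]
        return queue.popleft() if queue else None


broker = InMemoryBroker()
broker.create_subscription("billing-sub", topic="orders")
broker.create_subscription("email-sub", topic="orders")
broker.publish("orders", b"order-42")

print(broker.pull("billing-sub"))  # b'order-42'
print(broker.pull("email-sub"))    # b'order-42'
```

The publisher never refers to a subscription, and the subscribers never refer to the publisher. That indirection through the topic is what decouples the two sides.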
How Cloud Pub/Sub Works
- Publish/Subscribe Model: Publishers send messages to topics. Subscribers create subscriptions to these topics to receive messages.
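- Message Delivery: Messages are delivered at least once to each subscription, so subscribers should tolerate duplicates. Delivery order is not guaranteed by default; to receive messages in publish order, publish them with an ordering key and enable message ordering on the subscription.
- Acknowledgements: Subscribers must acknowledge each message they receive. If a message is not acknowledged before the subscription's acknowledgement deadline expires (10 seconds by default), Pub/Sub redelivers it.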
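Because delivery is at least once, a subscriber may see the same message more than once, for example when an acknowledgement arrives too late. One common way to cope is to make processing idempotent. The sketch below is a simulation, not Pub/Sub client code: `IdempotentHandler` and the sample deliveries are hypothetical, and a real system would keep the seen IDs in a durable store instead of in memory.

```python
class IdempotentHandler:
    """Processes each message ID once, even if the message is delivered again."""

    def __init__(self):
        self.seen_ids = set()
        self.processed = []

    def handle(self, message_id: str, data: bytes) -> bool:
        # Return True if the message was processed, False if it was a duplicate.
        if message_id in self.seen_ids:
            return False
        self.seen_ids.add(message_id)
        self.processed.append(data)
        return True


handler = IdempotentHandler()

# Simulated delivery stream: message "1" arrives twice, as it would if its
# first acknowledgement had missed the deadline.
deliveries = [("1", b"charge card"), ("2", b"send receipt"), ("1", b"charge card")]
for message_id, data in deliveries:
    handler.handle(message_id, data)

print(handler.processed)  # [b'charge card', b'send receipt']
```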
Setting Up Cloud Pub/Sub
Step 1: Create a Topic
    from google.cloud import pubsub_v1

    # Initialize a Publisher client
    publisher = pubsub_v1.PublisherClient()

    # Define the topic path
    topic_path = publisher.topic_path('your-project-id', 'your-topic-name')

    # Create the topic
    topic = publisher.create_topic(request={"name": topic_path})
    print(f"Topic created: {topic.name}")
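The `topic_path` helper used above returns a fully qualified resource name. The sketch below shows that format with a hypothetical stand-in function, `topic_name`, which is handy when a name has to be built in a place where the client library is not available (a config file or a log filter, for instance).

```python
def topic_name(project_id: str, topic_id: str) -> str:
    """Build a fully qualified topic name: projects/<project>/topics/<topic>."""
    return f"projects/{project_id}/topics/{topic_id}"


print(topic_name("your-project-id", "your-topic-name"))
# projects/your-project-id/topics/your-topic-name
```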
Step 2: Create a Subscription
    from google.cloud import pubsub_v1

    # Initialize a Subscriber client
    subscriber = pubsub_v1.SubscriberClient()

    # Define the path of the topic created in Step 1
    topic_path = pubsub_v1.PublisherClient.topic_path('your-project-id', 'your-topic-name')

    # Define the subscription path
    subscription_path = subscriber.subscription_path('your-project-id', 'your-subscription-name')

    # Create the subscription
    subscription = subscriber.create_subscription(
        request={"name": subscription_path, "topic": topic_path}
    )
    print(f"Subscription created: {subscription.name}")
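The request dictionary can carry more than the two names. As a sketch, the hypothetical helper below builds a request that also sets `ack_deadline_seconds`, the subscription field that controls how long a subscriber has to acknowledge a message. The 10 to 600 second bounds reflect the documented range as best I know it; verify them against the current Pub/Sub documentation before relying on them.

```python
def build_subscription_request(project_id, topic_id, subscription_id,
                               ack_deadline_seconds=10):
    """Return a request dict suitable for passing to create_subscription."""
    # Assumed limits: Pub/Sub accepts deadlines between 10 and 600 seconds.
    if not 10 <= ack_deadline_seconds <= 600:
        raise ValueError("ack_deadline_seconds must be between 10 and 600")
    return {
        "name": f"projects/{project_id}/subscriptions/{subscription_id}",
        "topic": f"projects/{project_id}/topics/{topic_id}",
        "ack_deadline_seconds": ack_deadline_seconds,
    }


request = build_subscription_request(
    "your-project-id", "your-topic-name", "your-subscription-name",
    ack_deadline_seconds=30,
)
print(request["ack_deadline_seconds"])  # 30
```

The result would be passed as `subscriber.create_subscription(request=request)`. A longer deadline suits slow handlers, at the cost of slower redelivery when a subscriber crashes.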
Step 3: Publish Messages
    from google.cloud import pubsub_v1

    # Initialize a Publisher client
    publisher = pubsub_v1.PublisherClient()

    # Define the topic path
    topic_path = publisher.topic_path('your-project-id', 'your-topic-name')

    # Publish a message (the payload must be bytes)
    message_data = "Hello, World!"
    future = publisher.publish(topic_path, message_data.encode("utf-8"))
    print(f"Published message ID: {future.result()}")
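Message payloads are bytes, so structured data has to be serialized before publishing and parsed again on receipt. Here is a minimal sketch using JSON; the `encode_event` and `decode_event` helpers and the sample event are hypothetical, and JSON is only one reasonable choice of format.

```python
import json


def encode_event(event: dict) -> bytes:
    """Serialize an event to UTF-8 JSON bytes, ready to use as message data."""
    return json.dumps(event, sort_keys=True).encode("utf-8")


def decode_event(data: bytes) -> dict:
    """Parse message data produced by encode_event back into a dict."""
    return json.loads(data.decode("utf-8"))


payload = encode_event({"order_id": 42, "status": "paid"})
print(payload)                # b'{"order_id": 42, "status": "paid"}'
print(decode_event(payload))  # {'order_id': 42, 'status': 'paid'}
```

On the publishing side, `payload` would replace `message_data.encode("utf-8")` in the call to `publisher.publish`. The subscriber would call `decode_event(message.data)` in its callback.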
Step 4: Receive Messages
    from google.cloud import pubsub_v1

    # Initialize a Subscriber client
    subscriber = pubsub_v1.SubscriberClient()

    # Define the subscription path
    subscription_path = subscriber.subscription_path('your-project-id', 'your-subscription-name')

    def callback(message):
        # message.data is bytes; decode it before printing
        print(f"Received message: {message.data.decode('utf-8')}")
        message.ack()

    # Subscribe to the subscription
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    print(f"Listening for messages on {subscription_path}...")

    # Keep the main thread alive to listen for messages
    try:
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()  # Signal the background stream to stop
        streaming_pull_future.result()  # Block until the shutdown completes
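The callback above acknowledges every message unconditionally. A more defensive version, sketched below, acknowledges only after processing succeeds and calls `nack()` on failure so the message becomes eligible for redelivery. The `process` function and the `FakeMessage` class are hypothetical; `FakeMessage` exists only so the sketch runs without a Pub/Sub connection.

```python
import logging

logger = logging.getLogger("subscriber")


def process(data: bytes) -> None:
    # Hypothetical business logic: reject empty payloads.
    if not data:
        raise ValueError("empty payload")


def callback(message) -> None:
    try:
        process(message.data)
    except Exception:
        # Log the failure and ask for redelivery instead of crashing.
        logger.exception("Processing failed; requesting redelivery")
        message.nack()
    else:
        # Acknowledge only after processing has succeeded.
        message.ack()


class FakeMessage:
    """Minimal stand-in for a received message, used only to exercise callback()."""

    def __init__(self, data: bytes):
        self.data = data
        self.outcome = None

    def ack(self):
        self.outcome = "ack"

    def nack(self):
        self.outcome = "nack"


good, bad = FakeMessage(b"hello"), FakeMessage(b"")
callback(good)
callback(bad)
print(good.outcome, bad.outcome)  # ack nack
```

A message that can never be processed would be redelivered indefinitely under this scheme, so production subscribers usually pair it with a retry limit or a dead-letter topic.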
Practical Exercise
Exercise: Implement a Simple Pub/Sub System
- Objective: Create a topic, a subscription, publish messages to the topic, and receive messages from the subscription.
- Steps:
  - Create a new topic named test-topic.
  - Create a subscription named test-subscription attached to test-topic.
  - Publish the message "Hello, Pub/Sub!" to test-topic.
  - Receive and print the message from test-subscription.
Solution
    from google.cloud import pubsub_v1

    # Initialize clients
    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()

    # Define paths
    project_id = 'your-project-id'
    topic_name = 'test-topic'
    subscription_name = 'test-subscription'
    topic_path = publisher.topic_path(project_id, topic_name)
    subscription_path = subscriber.subscription_path(project_id, subscription_name)

    # Create topic
    topic = publisher.create_topic(request={"name": topic_path})
    print(f"Topic created: {topic.name}")

    # Create subscription (before publishing, so the message is retained for it)
    subscription = subscriber.create_subscription(
        request={"name": subscription_path, "topic": topic_path}
    )
    print(f"Subscription created: {subscription.name}")

    # Publish message
    message_data = "Hello, Pub/Sub!"
    future = publisher.publish(topic_path, message_data.encode("utf-8"))
    print(f"Published message ID: {future.result()}")

    # Receive message
    def callback(message):
        print(f"Received message: {message.data.decode('utf-8')}")
        message.ack()

    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    print(f"Listening for messages on {subscription_path}...")

    # Listen until interrupted with Ctrl+C
    try:
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()  # Signal the background stream to stop
        streaming_pull_future.result()  # Block until the shutdown completes
Common Mistakes and Tips
- Message Acknowledgement: Always acknowledge messages after processing them to prevent redelivery.
- Error Handling: Implement error handling in your callback function to manage exceptions and ensure the subscriber continues to receive messages.
- Resource Cleanup: Properly clean up resources (topics, subscriptions) when they are no longer needed to avoid unnecessary charges.
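For the cleanup tip, a small helper along the following lines may be useful. It is a sketch, assuming the same request-dict calling style as the setup steps: `delete_resources` is a hypothetical name, and the `RecordingClient` stub stands in for the real publisher and subscriber clients so the example runs without credentials.

```python
def delete_resources(publisher, subscriber, topic_path, subscription_path):
    """Delete a subscription and then the topic it was attached to."""
    # Remove the subscription first so it is not left pointing at a deleted topic.
    subscriber.delete_subscription(request={"subscription": subscription_path})
    publisher.delete_topic(request={"topic": topic_path})


class RecordingClient:
    """Stub client that records calls instead of contacting Pub/Sub."""

    def __init__(self):
        self.calls = []

    def delete_subscription(self, request):
        self.calls.append(("delete_subscription", request))

    def delete_topic(self, request):
        self.calls.append(("delete_topic", request))


client = RecordingClient()
delete_resources(
    publisher=client,
    subscriber=client,
    topic_path="projects/your-project-id/topics/test-topic",
    subscription_path="projects/your-project-id/subscriptions/test-subscription",
)
print([name for name, _ in client.calls])  # ['delete_subscription', 'delete_topic']
```

With the real library, the same function would be called with a `pubsub_v1.PublisherClient()` and a `pubsub_v1.SubscriberClient()` in place of the stub.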
Conclusion
In this section, you learned about Cloud Pub/Sub, its key concepts, and how to set up and use it for real-time messaging. You also implemented a simple Pub/Sub system to publish and receive messages. This knowledge is crucial for building scalable and decoupled systems on Google Cloud Platform. In the next module, we will explore more data and analytics services offered by GCP.