This guide walks through building a Flask application that publishes messages to Google Cloud Pub/Sub and consumes them from a subscription for asynchronous messaging.
Prerequisites
- Python 3.7+ installed
- Google Cloud account
- Basic knowledge of Flask and Python
Step 1: Set Up Google Cloud Project and Pub/Sub
- Create a Google Cloud Project:
  - Go to Google Cloud Console
  - Create a new project or select an existing one
  - Note your Project ID
- Enable Pub/Sub API:
  - Navigate to "APIs & Services" > "Library"
  - Search for "Cloud Pub/Sub API" and enable it
- Create Service Account:
  - Go to "IAM & Admin" > "Service Accounts"
  - Click "Create Service Account"
  - Give it a name (e.g., "pubsub-flask-service")
  - Assign the "Pub/Sub Admin" role
  - Create a JSON key and download it securely
Step 2: Set Up Python Environment
- Create a new directory for your project:
mkdir flask-pubsub
cd flask-pubsub
- Create a virtual environment:
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
- Install required packages:
pip install flask google-cloud-pubsub python-dotenv
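To confirm the install worked before moving on, a short check like the one below can report which modules are not importable from the active virtual environment. This is a minimal sketch: `missing_modules` is a hypothetical helper name, and the module names assume the three packages import as `flask`, `google.cloud.pubsub_v1`, and `dotenv`.

```python
import importlib.util


def missing_modules(module_names):
    """Return the names from module_names that cannot be located for import."""
    missing = []
    for name in module_names:
        try:
            spec = importlib.util.find_spec(name)
        except (ImportError, ValueError):
            # Raised when a parent package (e.g. "google") is itself absent.
            spec = None
        if spec is None:
            missing.append(name)
    return missing


if __name__ == "__main__":
    required = ["flask", "google.cloud.pubsub_v1", "dotenv"]
    print("Missing modules:", missing_modules(required) or "none")
```

An empty result means all three packages can be found in the environment.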
Step 3: Create Pub/Sub Helper Module
Create a file pubsub_helper.py:
from google.cloud import pubsub_v1
import os
from dotenv import load_dotenv
import json

load_dotenv()


class PubSubHelper:
    def __init__(self):
        self.project_id = os.getenv('GOOGLE_CLOUD_PROJECT')
        self.credentials_path = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
        # Configure the publisher and subscriber clients
        self.publisher = pubsub_v1.PublisherClient.from_service_account_file(self.credentials_path)
        self.subscriber = pubsub_v1.SubscriberClient.from_service_account_file(self.credentials_path)

    def create_topic(self, topic_name):
        """Creates a new Pub/Sub topic."""
        topic_path = self.publisher.topic_path(self.project_id, topic_name)
        try:
            topic = self.publisher.create_topic(request={"name": topic_path})
            print(f"Topic created: {topic.name}")
            return topic
        except Exception as e:
            print(f"Error creating topic: {e}")
            return None

    def create_subscription(self, topic_name, subscription_name):
        """Creates a new subscription to the topic."""
        topic_path = self.publisher.topic_path(self.project_id, topic_name)
        subscription_path = self.subscriber.subscription_path(self.project_id, subscription_name)
        try:
            subscription = self.subscriber.create_subscription(
                request={
                    "name": subscription_path,
                    "topic": topic_path,
                    "ack_deadline_seconds": 30
                }
            )
            print(f"Subscription created: {subscription.name}")
            return subscription
        except Exception as e:
            print(f"Error creating subscription: {e}")
            return None

    def publish_message(self, topic_name, message_data):
        """Publishes a message to a Pub/Sub topic."""
        topic_path = self.publisher.topic_path(self.project_id, topic_name)
        # Convert message data to bytes
        if isinstance(message_data, dict):
            message_data = json.dumps(message_data).encode('utf-8')
        elif isinstance(message_data, str):
            message_data = message_data.encode('utf-8')
        try:
            future = self.publisher.publish(topic_path, data=message_data)
            # Block until the server confirms the publish and returns the ID
            message_id = future.result()
            print(f"Message published with ID: {message_id}")
            return message_id
        except Exception as e:
            print(f"Error publishing message: {e}")
            return None

    def pull_messages(self, subscription_name, callback, max_messages=5):
        """Pulls messages from a subscription and processes them with the callback."""
        subscription_path = self.subscriber.subscription_path(self.project_id, subscription_name)
        # Cap how many messages may be outstanding (delivered but not yet acked)
        flow_control = pubsub_v1.types.FlowControl(max_messages=max_messages)

        def wrapped_callback(message):
            try:
                # Process the message
                callback(message)
                # Acknowledge the message
                message.ack()
            except Exception as e:
                print(f"Error processing message: {e}")
                # Negative-acknowledge so Pub/Sub redelivers the message
                message.nack()

        try:
            streaming_pull_future = self.subscriber.subscribe(
                subscription_path, callback=wrapped_callback, flow_control=flow_control
            )
            print(f"Listening for messages on {subscription_path}...")
            # Block the calling thread while messages are delivered
            with self.subscriber:
                try:
                    streaming_pull_future.result()
                except Exception as e:
                    print(f"Error in subscriber: {e}")
                    streaming_pull_future.cancel()
                    streaming_pull_future.result()
        except Exception as e:
            print(f"Error pulling messages: {e}")
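The byte conversion inside `publish_message` is worth seeing in isolation, since a Pub/Sub message body is always bytes and the subscriber has to reverse the step. The sketch below mirrors that logic in a slightly stricter form (it raises on unsupported types rather than passing them through). `encode_payload` and `decode_payload` are hypothetical names, not part of the helper or of the Pub/Sub client.

```python
import json


def encode_payload(message_data):
    """Convert a dict, str, or bytes payload to UTF-8 bytes."""
    if isinstance(message_data, dict):
        return json.dumps(message_data).encode("utf-8")
    if isinstance(message_data, str):
        return message_data.encode("utf-8")
    if isinstance(message_data, bytes):
        return message_data
    raise TypeError(f"Unsupported payload type: {type(message_data).__name__}")


def decode_payload(raw):
    """Decode bytes to text, returning parsed JSON when the text is valid JSON."""
    text = raw.decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Not JSON, so hand back the plain text unchanged.
        return text
```

In a subscriber callback, `decode_payload(message.data)` would recover the dictionary that the publisher sent.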
Step 4: Create Flask Application
Create a file app.py:
from flask import Flask, request, jsonify
from pubsub_helper import PubSubHelper
from dotenv import load_dotenv
import os
import threading
import time

load_dotenv()

app = Flask(__name__)
pubsub = PubSubHelper()

# Configuration
TOPIC_NAME = "flask-messages"
SUBSCRIPTION_NAME = "flask-messages-sub"

# Initialize Pub/Sub resources
pubsub.create_topic(TOPIC_NAME)
pubsub.create_subscription(TOPIC_NAME, SUBSCRIPTION_NAME)


def message_processor(message):
    """Callback function to process incoming messages."""
    print(f"Received message: {message.data.decode('utf-8')}")
    # Add your message processing logic here


# Start the message listener in a background thread
def start_listener():
    time.sleep(2)  # Give the app time to start
    pubsub.pull_messages(SUBSCRIPTION_NAME, message_processor)


listener_thread = threading.Thread(target=start_listener)
listener_thread.daemon = True
listener_thread.start()


@app.route('/')
def home():
    return "Flask Pub/Sub Integration - Send messages to /publish"


@app.route('/publish', methods=['POST'])
def publish_message():
    try:
        # silent=True returns None for a missing or malformed JSON body
        # instead of raising, so the 400 branch below handles it
        data = request.get_json(silent=True)
        if not data:
            return jsonify({"error": "No data provided"}), 400
        message_id = pubsub.publish_message(TOPIC_NAME, data)
        if message_id:
            return jsonify({
                "status": "Message published",
                "message_id": message_id
            }), 200
        else:
            return jsonify({"error": "Failed to publish message"}), 500
    except Exception as e:
        return jsonify({"error": str(e)}), 500


if __name__ == '__main__':
    # The debug reloader would run this module twice and start a second
    # listener thread, so it is turned off here
    app.run(host='0.0.0.0', port=8080, debug=True, use_reloader=False)
Step 5: Create Environment Configuration
Create a .env file:
GOOGLE_CLOUD_PROJECT=your-project-id
GOOGLE_APPLICATION_CREDENTIALS=path/to/your/service-account-key.json
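Because both values are read with `os.getenv`, a missing entry only surfaces later as a confusing client error. A startup check along these lines can fail fast instead. This is a sketch under assumptions: `REQUIRED_SETTINGS` and `missing_settings` are hypothetical names introduced for illustration.

```python
import os

# The two variables the .env file above is expected to define.
REQUIRED_SETTINGS = ("GOOGLE_CLOUD_PROJECT", "GOOGLE_APPLICATION_CREDENTIALS")


def missing_settings(environ=None):
    """Return the required setting names that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]
```

One way to use it is to call `missing_settings()` right after `load_dotenv()` and raise a `RuntimeError` naming the missing variables if the list is not empty.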
Step 6: Test the Application
- Run the Flask application:
python app.py
- Test publishing a message using curl:
curl -X POST -H "Content-Type: application/json" -d '{"message": "Hello Pub/Sub!", "sender": "Flask App"}' http://localhost:8080/publish
- You should see:
  - In the terminal running Flask: The received message printed
  - In the curl response: The message ID and success status
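The same request can be built from Python with only the standard library, which is convenient for scripted checks. The sketch below constructs the request without sending it; `build_publish_request` is a hypothetical helper name, and the URL assumes the app is running locally on port 8080 as in the curl example.

```python
import json
import urllib.request


def build_publish_request(base_url, payload):
    """Build a POST request carrying payload as JSON for the /publish endpoint."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/publish",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_publish_request(
    "http://localhost:8080",
    {"message": "Hello Pub/Sub!", "sender": "Flask App"},
)
# With the Flask app running, send the request and print the JSON response:
# with urllib.request.urlopen(request) as response:
#     print(json.loads(response.read()))
```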
Step 7: Advanced Features (Optional)
7.1 Add Message Attributes
Modify the publish_message method in pubsub_helper.py:
    def publish_message(self, topic_name, message_data, attributes=None):
        """Publishes a message with optional attributes."""
        topic_path = self.publisher.topic_path(self.project_id, topic_name)
        if isinstance(message_data, dict):
            message_data = json.dumps(message_data).encode('utf-8')
        elif isinstance(message_data, str):
            message_data = message_data.encode('utf-8')
        try:
            # Attributes are passed as keyword arguments; values must be strings
            future = self.publisher.publish(
                topic_path,
                data=message_data,
                **(attributes or {})
            )
            message_id = future.result()
            print(f"Message published with ID: {message_id}")
            return message_id
        except Exception as e:
            print(f"Error publishing message: {e}")
            return None
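Pub/Sub attributes are string key-value pairs, and the Python client rejects values such as integers or booleans. A small normalization step before calling `publish_message` avoids that error. This is a minimal sketch; `normalize_attributes` is a hypothetical helper, not part of the client library.

```python
def normalize_attributes(attributes):
    """Return a copy of attributes with every key and value converted to str."""
    if not attributes:
        return {}
    return {str(key): str(value) for key, value in attributes.items()}
```

A call might then look like `pubsub.publish_message(TOPIC_NAME, data, attributes=normalize_attributes({"priority": 1, "source": "web"}))`. Because attributes are expanded into keyword arguments, keys such as `data` or `ordering_key` would collide with the publish method's own parameters and are best avoided.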
7.2 Add Dead Letter Queue
Create a dead letter topic and subscription:
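# In app.py, after creating the main topic and subscription
from google.cloud import pubsub_v1

DEAD_LETTER_TOPIC = "flask-messages-dead-letter"
DEAD_LETTER_SUBSCRIPTION = "flask-messages-dead-letter-sub"
pubsub.create_topic(DEAD_LETTER_TOPIC)
pubsub.create_subscription(DEAD_LETTER_TOPIC, DEAD_LETTER_SUBSCRIPTION)

# Modify the main subscription to have a dead letter policy
project_id = os.getenv('GOOGLE_CLOUD_PROJECT')
subscription_path = pubsub.subscriber.subscription_path(project_id, SUBSCRIPTION_NAME)
topic_path = pubsub.publisher.topic_path(project_id, TOPIC_NAME)
dead_letter_topic_path = pubsub.publisher.topic_path(project_id, DEAD_LETTER_TOPIC)

try:
    # update_subscription expects the Subscription object plus a field mask
    # naming the fields to change
    subscription = pubsub_v1.types.Subscription(
        name=subscription_path,
        topic=topic_path,
        dead_letter_policy=pubsub_v1.types.DeadLetterPolicy(
            dead_letter_topic=dead_letter_topic_path,
            max_delivery_attempts=5,
        ),
    )
    update_mask = pubsub_v1.types.FieldMask(paths=["dead_letter_policy"])
    pubsub.subscriber.update_subscription(
        request={"subscription": subscription, "update_mask": update_mask}
    )
except Exception as e:
    print(f"Error configuring dead letter policy: {e}")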
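Once a dead letter policy is attached, each received message exposes a `delivery_attempt` value, which the client reports as `None` when the subscription has no such policy. The sketch below shows one way a callback could use it to log how close a message is to being dead-lettered. `StubMessage` and `attempts_remaining` are hypothetical names for local experimentation, and the default of 5 assumes the policy configured above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StubMessage:
    """Hypothetical stand-in for a received Pub/Sub message, for local testing."""
    data: bytes
    delivery_attempt: Optional[int] = None


def attempts_remaining(message, max_delivery_attempts=5):
    """Return how many delivery attempts are left, or None without a policy."""
    attempt = message.delivery_attempt
    if attempt is None:
        # No dead letter policy on the subscription, so there is nothing to count.
        return None
    return max(max_delivery_attempts - attempt, 0)
```

A result of 0 suggests that another failed attempt is likely to forward the message to the dead letter topic, although Pub/Sub applies the limit on a best-effort basis.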
7.3 Add Monitoring
Add monitoring endpoints to your Flask app:
@app.route('/health')
def health_check():
    return jsonify({"status": "healthy"}), 200


@app.route('/metrics')
def metrics():
    # Here you could integrate with Cloud Monitoring API
    # or provide custom metrics
    return jsonify({
        "messages_published": 0,  # You'd track this in your app
        "messages_processed": 0
    }), 200
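The placeholder zeros can be replaced with real counts. Since messages are published from request threads and processed on the listener thread, the counters need a lock. The class below is one possible sketch; `MessageCounters` is a hypothetical name, and the counts live in a single process, so each gunicorn worker would keep its own totals.

```python
import threading


class MessageCounters:
    """Thread-safe counters for published and processed messages."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counts = {"messages_published": 0, "messages_processed": 0}

    def increment(self, name):
        """Add one to the named counter."""
        with self._lock:
            self._counts[name] += 1

    def snapshot(self):
        """Return a copy of the current counts, suitable for a JSON response."""
        with self._lock:
            return dict(self._counts)
```

The `/publish` route would call `increment("messages_published")` after a successful publish, the message callback would call `increment("messages_processed")`, and `/metrics` would return `jsonify(counters.snapshot())`.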
Step 8: Deploy to Production
- Containerize with Docker:
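Create a Dockerfile:
FROM python:3.9-slim
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
CMD ["gunicorn", "--bind", "0.0.0.0:8080", "app:app"]
The build expects a requirements.txt next to the Dockerfile that lists flask, google-cloud-pubsub, python-dotenv, and gunicorn.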
- Deploy to Google Cloud Run:
gcloud builds submit --tag gcr.io/your-project-id/flask-pubsub
gcloud run deploy flask-pubsub --image gcr.io/your-project-id/flask-pubsub --platform managed
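Cloud Run tells the container which port to listen on through the `PORT` environment variable, which defaults to 8080. Reading it rather than hardcoding the number keeps the app working if the service is configured with a different port. A minimal sketch, with `resolve_port` as a hypothetical helper name:

```python
import os


def resolve_port(environ=None, default=8080):
    """Return the port from the PORT variable, falling back to default."""
    env = os.environ if environ is None else environ
    raw = env.get("PORT", "")
    # Accept only plain decimal strings; anything else falls back to default.
    return int(raw) if raw.isdecimal() else default
```

For local runs this could be used as `app.run(host='0.0.0.0', port=resolve_port())`.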
Best Practices
- Error Handling:
  - Implement robust error handling for Pub/Sub operations
  - Use dead letter queues for failed messages
- Security:
  - Never commit your service account key to version control
  - Use IAM roles with least privilege
  - Implement authentication for your Flask endpoints
- Performance:
  - Use batch publishing for high-volume messages
  - Consider asynchronous processing for message handling
- Monitoring:
  - Set up Cloud Monitoring for your Pub/Sub topics
  - Implement logging for message processing
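As an illustration of the logging and error-handling points, the callback wrapper below uses the standard `logging` module in place of `print` and records the traceback when processing fails. It is a sketch of an alternative to the print-based `wrapped_callback` in the helper, not something to stack on top of it, since both would ack the same message. `make_logged_callback` and the logger name are hypothetical.

```python
import logging

logger = logging.getLogger("pubsub_worker")


def make_logged_callback(handler):
    """Wrap handler so each message is logged, then acked or nacked."""
    def callback(message):
        try:
            handler(message)
        except Exception:
            # logger.exception records the traceback alongside the log line.
            logger.exception("Processing failed; nacking message")
            message.nack()
        else:
            logger.info("Processed message; acking")
            message.ack()
    return callback
```

The returned function only relies on the message having `ack()` and `nack()` methods, so it can be exercised locally with a simple stub object.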
Conclusion
This guide has shown you how to:
- Set up Google Cloud Pub/Sub
- Create a Flask application that publishes messages
- Implement a subscriber to process messages
- Add advanced features like dead letter queues
- Deploy your application to production