CODE WITH SIBIN

Solving Real Problems with Real Code


Python Flask Application with Google Cloud Pub/Sub Integration

This guide walks through building a Flask application that publishes messages to Google Cloud Pub/Sub and processes them asynchronously with a background subscriber.

Prerequisites

  • Python 3.7+ installed
  • Google Cloud account
  • Basic knowledge of Flask and Python

Step 1: Set Up Google Cloud Project and Pub/Sub

  1. Create a Google Cloud Project in the Google Cloud Console and note its project ID (it goes into the .env file in Step 5)
  2. Enable Pub/Sub API:
    • Navigate to "APIs & Services" > "Library"
    • Search for "Cloud Pub/Sub API" and enable it
  3. Create Service Account:
    • Go to "IAM & Admin" > "Service Accounts"
    • Click "Create Service Account"
    • Give it a name (e.g., "pubsub-flask-service")
    • Assign the "Pub/Sub Admin" role
    • Create a JSON key and download it securely

Step 2: Set Up Python Environment

  1. Create a new directory for your project:
       mkdir flask-pubsub
       cd flask-pubsub
  2. Create a virtual environment and activate it:
       python -m venv venv
       source venv/bin/activate  # On Windows: venv\Scripts\activate
  3. Install required packages:
       pip install flask google-cloud-pubsub python-dotenv

Step 3: Create Pub/Sub Helper Module

Create a file pubsub_helper.py:

from google.cloud import pubsub_v1
import os
from dotenv import load_dotenv
import json

load_dotenv()

class PubSubHelper:
    def __init__(self):
        self.project_id = os.getenv('GOOGLE_CLOUD_PROJECT')
        self.credentials_path = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
        
        # Configure the publisher and subscriber clients
        self.publisher = pubsub_v1.PublisherClient.from_service_account_file(self.credentials_path)
        self.subscriber = pubsub_v1.SubscriberClient.from_service_account_file(self.credentials_path)
    
    def create_topic(self, topic_name):
        """Creates a new Pub/Sub topic."""
        topic_path = self.publisher.topic_path(self.project_id, topic_name)
        
        try:
            topic = self.publisher.create_topic(request={"name": topic_path})
            print(f"Topic created: {topic.name}")
            return topic
        except Exception as e:
            print(f"Error creating topic: {e}")
            return None
    
    def create_subscription(self, topic_name, subscription_name):
        """Creates a new subscription to the topic."""
        topic_path = self.publisher.topic_path(self.project_id, topic_name)
        subscription_path = self.subscriber.subscription_path(self.project_id, subscription_name)
        
        try:
            subscription = self.subscriber.create_subscription(
                request={
                    "name": subscription_path,
                    "topic": topic_path,
                    "ack_deadline_seconds": 30
                }
            )
            print(f"Subscription created: {subscription.name}")
            return subscription
        except Exception as e:
            print(f"Error creating subscription: {e}")
            return None
    
    def publish_message(self, topic_name, message_data):
        """Publishes a message to a Pub/Sub topic."""
        topic_path = self.publisher.topic_path(self.project_id, topic_name)
        
        # Convert message data to bytes
        if isinstance(message_data, dict):
            message_data = json.dumps(message_data).encode('utf-8')
        elif isinstance(message_data, str):
            message_data = message_data.encode('utf-8')
        
        try:
            future = self.publisher.publish(topic_path, data=message_data)
            message_id = future.result()
            print(f"Message published with ID: {message_id}")
            return message_id
        except Exception as e:
            print(f"Error publishing message: {e}")
            return None
    
    def pull_messages(self, subscription_name, callback, max_messages=5):
        """Listens on a subscription via streaming pull and passes each message to the callback.

        Blocks the calling thread; max_messages caps unacknowledged messages in flight.
        """
        subscription_path = self.subscriber.subscription_path(self.project_id, subscription_name)
        
        def wrapped_callback(message):
            try:
                # Process the message
                callback(message)
                # Acknowledge the message
                message.ack()
            except Exception as e:
                print(f"Error processing message: {e}")
                message.nack()
        
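        try:
            # Apply max_messages as a flow-control limit on outstanding messages
            flow_control = pubsub_v1.types.FlowControl(max_messages=max_messages)
            streaming_pull_future = self.subscriber.subscribe(
                subscription_path, callback=wrapped_callback, flow_control=flow_control
            )
            print(f"Listening for messages on {subscription_path}...")
            
            # Block this thread until the stream fails or is cancelled;
            # the with block closes the subscriber client on exit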
            with self.subscriber:
                try:
                    streaming_pull_future.result()
                except Exception as e:
                    print(f"Error in subscriber: {e}")
                    streaming_pull_future.cancel()
                    streaming_pull_future.result()
        except Exception as e:
            print(f"Error pulling messages: {e}")
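
The ack/nack behavior of wrapped_callback can be checked without touching Google Cloud. The sketch below is a standalone restatement of that wrapper under one assumption: a message only needs data, ack() and nack(). FakeMessage and make_ack_wrapper are hypothetical names introduced for illustration; they are not part of google-cloud-pubsub.

```python
class FakeMessage:
    """Hypothetical stand-in for a received Pub/Sub message (illustration only)."""

    def __init__(self, data):
        self.data = data
        self.acked = False
        self.nacked = False

    def ack(self):
        self.acked = True

    def nack(self):
        self.nacked = True


def make_ack_wrapper(callback):
    """Return a callback that acks on success and nacks on any exception."""
    def wrapped(message):
        try:
            callback(message)
            message.ack()
        except Exception as exc:
            print(f"Error processing message: {exc}")
            message.nack()
    return wrapped


def failing_handler(message):
    """Simulates a processor that cannot handle the payload."""
    raise ValueError("cannot process")


ok = FakeMessage(b'{"message": "hi"}')
make_ack_wrapper(lambda m: None)(ok)

bad = FakeMessage(b"broken")
make_ack_wrapper(failing_handler)(bad)

print(ok.acked, ok.nacked)
print(bad.acked, bad.nacked)
```

A nacked message is redelivered by Pub/Sub, so a handler that always fails will keep receiving the same message unless a dead letter policy is configured.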

Step 4: Create Flask Application

Create a file app.py:

from flask import Flask, request, jsonify
from pubsub_helper import PubSubHelper
from dotenv import load_dotenv
import os
import threading
import time

load_dotenv()

app = Flask(__name__)
pubsub = PubSubHelper()

# Configuration
TOPIC_NAME = "flask-messages"
SUBSCRIPTION_NAME = "flask-messages-sub"

# Initialize Pub/Sub resources
pubsub.create_topic(TOPIC_NAME)
pubsub.create_subscription(TOPIC_NAME, SUBSCRIPTION_NAME)

def message_processor(message):
    """Callback function to process incoming messages."""
    print(f"Received message: {message.data.decode('utf-8')}")
    # Add your message processing logic here

# Start the message listener in a background thread
def start_listener():
    time.sleep(2)  # Give the app time to start
    pubsub.pull_messages(SUBSCRIPTION_NAME, message_processor)

listener_thread = threading.Thread(target=start_listener)
listener_thread.daemon = True
listener_thread.start()

@app.route('/')
def home():
    return "Flask Pub/Sub Integration - Send messages to /publish"

@app.route('/publish', methods=['POST'])
def publish_message():
    try:
        # silent=True returns None for a missing or malformed JSON body instead of raising
        data = request.get_json(silent=True)
        if not data:
            return jsonify({"error": "No data provided"}), 400
        
        message_id = pubsub.publish_message(TOPIC_NAME, data)
        if message_id:
            return jsonify({
                "status": "Message published",
                "message_id": message_id
            }), 200
        else:
            return jsonify({"error": "Failed to publish message"}), 500
    except Exception as e:
        return jsonify({"error": str(e)}), 500

if __name__ == '__main__':
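    # debug=True enables the interactive debugger and the auto-reloader, which
    # re-runs this module and starts a second listener; use it for local development only
    app.run(host='0.0.0.0', port=8080, debug=True)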

Step 5: Create Environment Configuration

Create a .env file:

GOOGLE_CLOUD_PROJECT=your-project-id
GOOGLE_APPLICATION_CREDENTIALS=path/to/your/service-account-key.json
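
If either value is missing, os.getenv returns None and the helper fails later with a less obvious error. A minimal sketch of an early check follows; missing_settings and REQUIRED_VARS are hypothetical names for illustration, not part of the code above.

```python
import os

# The two variables read by PubSubHelper.__init__
REQUIRED_VARS = ("GOOGLE_CLOUD_PROJECT", "GOOGLE_APPLICATION_CREDENTIALS")


def missing_settings(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Example with an explicit mapping instead of the real environment
example_env = {"GOOGLE_CLOUD_PROJECT": "your-project-id"}
print(missing_settings(example_env))
```

Calling missing_settings() with no argument after load_dotenv() checks the real environment; the app could refuse to start when the returned list is not empty.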

Step 6: Test the Application

  1. Run the Flask application:
       python app.py
  2. Test publishing a message using curl:
       curl -X POST -H "Content-Type: application/json" -d '{"message": "Hello Pub/Sub!", "sender": "Flask App"}' http://localhost:8080/publish
  3. You should see:
    • In the terminal running Flask: The received message printed
    • In the curl response: The message ID and success status
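
The same request can be made from Python using only the standard library. This is a sketch that assumes the app is listening on localhost:8080 as in the curl example; build_publish_request is a hypothetical helper name.

```python
import json
import urllib.request


def build_publish_request(payload, base_url="http://localhost:8080"):
    """Build (but do not send) a JSON POST request for the /publish endpoint."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/publish",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_publish_request({"message": "Hello Pub/Sub!", "sender": "Flask App"})
print(req.get_method(), req.full_url)

# With the Flask app running, send the request like this:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, json.loads(resp.read()))
```

Building the request separately from sending it keeps the payload and headers easy to inspect before any network call is made.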

Step 7: Advanced Features (Optional)

7.1 Add Message Attributes

Modify the publish_message method in pubsub_helper.py:

def publish_message(self, topic_name, message_data, attributes=None):
    """Publishes a message with optional attributes."""
    topic_path = self.publisher.topic_path(self.project_id, topic_name)
    
    if isinstance(message_data, dict):
        message_data = json.dumps(message_data).encode('utf-8')
    elif isinstance(message_data, str):
        message_data = message_data.encode('utf-8')
    
    try:
        # Attributes are passed as keyword arguments; their values must be strings
        future = self.publisher.publish(
            topic_path,
            data=message_data,
            **(attributes or {})
        )
        message_id = future.result()
        print(f"Message published with ID: {message_id}")
        return message_id
    except Exception as e:
        print(f"Error publishing message: {e}")
        return None
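
The Python client rejects attribute values that are not strings, so numbers and booleans need converting before they reach publish(). A minimal sketch of one way to do that; normalize_attributes is a hypothetical helper name, not part of the library.

```python
def normalize_attributes(attributes=None):
    """Return a copy of attributes with every key and value converted to str."""
    if not attributes:
        return {}
    return {str(key): str(value) for key, value in attributes.items()}


attrs = normalize_attributes({"sender": "Flask App", "priority": 3, "retry": False})
print(attrs)
```

The result can then be passed on, for example as pubsub.publish_message(TOPIC_NAME, data, attributes=attrs). Subscribers receive every value as a string and have to parse numbers or booleans back themselves.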

7.2 Add Dead Letter Queue

Create a dead letter topic and subscription:

# In app.py, after creating the main topic
DEAD_LETTER_TOPIC = "flask-messages-dead-letter"
DEAD_LETTER_SUBSCRIPTION = "flask-messages-dead-letter-sub"

pubsub.create_topic(DEAD_LETTER_TOPIC)
pubsub.create_subscription(DEAD_LETTER_TOPIC, DEAD_LETTER_SUBSCRIPTION)

# Modify the main subscription to have a dead letter policy
subscription_path = pubsub.subscriber.subscription_path(
    os.getenv('GOOGLE_CLOUD_PROJECT'), 
    SUBSCRIPTION_NAME
)
dead_letter_topic_path = pubsub.publisher.topic_path(
    os.getenv('GOOGLE_CLOUD_PROJECT'), 
    DEAD_LETTER_TOPIC
)

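# update_subscription expects the subscription plus an update mask naming the
# changed field (move this import to the top of app.py)
from google.protobuf import field_mask_pb2

try:
    policy = {
        "subscription": {
            "name": subscription_path,
            "topic": pubsub.publisher.topic_path(os.getenv('GOOGLE_CLOUD_PROJECT'), TOPIC_NAME),
            "dead_letter_policy": {
                "dead_letter_topic": dead_letter_topic_path,
                "max_delivery_attempts": 5
            }
        },
        "update_mask": field_mask_pb2.FieldMask(paths=["dead_letter_policy"])
    }
    pubsub.subscriber.update_subscription(request=policy)
except Exception as e:
    print(f"Error configuring dead letter policy: {e}")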
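
Once a dead letter policy is in place, the Python client exposes a delivery_attempt value on received messages (it is None when no policy is set). The sketch below shows how a message processor might use it to log how close a message is to being dead-lettered. attempts_remaining and FakeDelivery are hypothetical names for illustration, and the default of 5 mirrors max_delivery_attempts above.

```python
def attempts_remaining(message, max_delivery_attempts=5):
    """Return deliveries left before dead-lettering, or None if unknown."""
    attempt = getattr(message, "delivery_attempt", None)
    if attempt is None:
        # No dead letter policy on the subscription, so no attempt counter
        return None
    return max(max_delivery_attempts - attempt, 0)


class FakeDelivery:
    """Hypothetical stand-in for a received message (illustration only)."""

    def __init__(self, delivery_attempt):
        self.delivery_attempt = delivery_attempt


print(attempts_remaining(FakeDelivery(2)))
print(attempts_remaining(FakeDelivery(None)))
```

Pub/Sub tracks delivery attempts on a best-effort basis, so the value is better suited to logging and alerting than to exact control flow.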

7.3 Add Monitoring

Add monitoring endpoints to your Flask app:

@app.route('/health')
def health_check():
    return jsonify({"status": "healthy"}), 200

@app.route('/metrics')
def metrics():
    # Here you could integrate with Cloud Monitoring API
    # or provide custom metrics
    return jsonify({
        "messages_published": 0,  # You'd track this in your app
        "messages_processed": 0
    }), 200
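
The zeros above are placeholders. Because messages are processed on background threads while requests are served on others, any in-app counter needs a lock. A minimal sketch, assuming in-memory counts are enough; MessageCounters is a hypothetical class name.

```python
import threading


class MessageCounters:
    """Thread-safe in-memory counters for published and processed messages."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counts = {"messages_published": 0, "messages_processed": 0}

    def increment(self, name):
        """Add one to the named counter."""
        with self._lock:
            self._counts[name] += 1

    def snapshot(self):
        """Return a copy of the counters that is safe to serialize."""
        with self._lock:
            return dict(self._counts)


counters = MessageCounters()
counters.increment("messages_published")
counters.increment("messages_published")
counters.increment("messages_processed")
print(counters.snapshot())
```

The /metrics route could return jsonify(counters.snapshot()), with increment called in the publish route and in message_processor. These counts are per process and reset on restart, so with several gunicorn workers each one reports its own numbers.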

Step 8: Deploy to Production

  1. Containerize with Docker. Create a requirements.txt listing flask, google-cloud-pubsub, python-dotenv, and gunicorn (the CMD line needs gunicorn, which Step 2 did not install), then create a Dockerfile:
       FROM python:3.9-slim
       WORKDIR /app
       COPY . .
       RUN pip install -r requirements.txt
       CMD ["gunicorn", "--bind", "0.0.0.0:8080", "app:app"]
  2. Deploy to Google Cloud Run:
       gcloud builds submit --tag gcr.io/your-project-id/flask-pubsub
       gcloud run deploy flask-pubsub --image gcr.io/your-project-id/flask-pubsub --platform managed

Best Practices

  1. Error Handling:
    • Implement robust error handling for Pub/Sub operations
    • Use dead letter queues for failed messages
  2. Security:
    • Never commit your service account key to version control
    • Use IAM roles with least privilege
    • Implement authentication for your Flask endpoints
  3. Performance:
    • Use batch publishing for high-volume messages
    • Consider asynchronous processing for message handling
  4. Monitoring:
    • Set up Cloud Monitoring for your Pub/Sub topics
    • Implement logging for message processing
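
As one possible reading of "implement authentication for your Flask endpoints", the sketch below checks a shared bearer token with a constant-time comparison. It is a deliberately simple scheme, not a full authentication system, and is_authorized is a hypothetical helper name.

```python
import hmac


def is_authorized(auth_header, expected_token):
    """Check an 'Authorization: Bearer <token>' header value against a shared token."""
    if not auth_header or not expected_token:
        return False
    scheme, _, token = auth_header.partition(" ")
    if scheme.lower() != "bearer":
        return False
    # compare_digest avoids leaking how many leading characters matched
    return hmac.compare_digest(token.encode("utf-8"), expected_token.encode("utf-8"))


print(is_authorized("Bearer s3cret", "s3cret"))
print(is_authorized("Bearer wrong", "s3cret"))
```

In the /publish route this could be called with request.headers.get("Authorization") and a token read from an environment variable (the variable name is up to you), returning 401 when it yields False.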

Conclusion

This guide has shown you how to:

  1. Set up Google Cloud Pub/Sub
  2. Create a Flask application that publishes messages
  3. Implement a subscriber to process messages
  4. Add advanced features like dead letter queues
  5. Deploy your application to production
