Introduction:
Imagine you’re organizing a big outdoor event, managing a fleet of delivery trucks, or overseeing crop growth on a farm. In each of these cases, knowing what the weather will be is crucial. In today’s world, data has become the backbone of every decision, from predicting weather patterns to optimizing logistics. However, before companies can use data, it has to be gathered, organized, and processed into a form that reveals trends and insights. This is where data engineering on GCP shines—enabling automated, scalable, and efficient data handling in the cloud.
This blog series will showcase how we set up a GCP data pipeline that automates the entire process. We’ll start by gathering weather data from an external API, transform it to make it more useful, and load it into BigQuery, Google Cloud’s data warehouse, where we can analyze and visualize trends over time. By the end of this series, you’ll see how tools like Cloud Functions, Pub/Sub, Dataflow, and Looker Studio come together in GCP to reveal the story within the data.
A Closer Look at the Tools
To make sense of a cloud data pipeline, let’s look at each tool we’ll use and how it fits into the bigger picture. Imagine this pipeline as a series of stations, each responsible for a specific task to ensure data flows smoothly from raw source to actionable insights.
Google Cloud Function: This is the pipeline’s starting point, where we fetch weather data from an external API. Cloud Functions allow us to execute code in response to events, such as a new data request, without worrying about managing servers.
Pub/Sub: Think of Pub/Sub as a messaging hub. The Cloud Function sends the raw data here, where it waits for the next step in the pipeline. This “message broker” ensures each piece of data flows in real-time, allowing our pipeline to scale effortlessly.
Dataflow: Here’s where the magic happens. Dataflow processes each data point, applying transformations to clean, enrich, or validate it. Imagine an assembly line where each item is quality-checked and modified before reaching the storage.
BigQuery: Once processed, the data lands in BigQuery. This data warehouse allows us to store, query, and analyze large datasets efficiently using SQL.
Looker Studio: Finally, to bring this data to life, we use Looker Studio to create dashboards that present the information visually. With charts and graphs, we can easily identify patterns and trends in the weather data.
How These Tools Make Data Engineering Easier
Together, these tools provide a seamless way to automate data processing. Google Cloud’s managed services handle scaling, reliability, and security, so we can focus on transforming data into insights rather than managing infrastructure.
In this first post, we’ll start by setting up the Cloud Function to collect weather data. As we move through the series, I’ll show you how each of these tools connects to form a complete, reliable pipeline that could support a range of real-world needs.
Exploring the Code – Step-by-Step Breakdown
With the architecture set, let’s start implementing it, beginning with our Cloud Function. This function will fetch weather data from the OpenWeather API and send it to Pub/Sub. Think of it as the “collector” in our pipeline, grabbing fresh weather data at regular intervals so it’s always ready for analysis.
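Before diving into the pipeline code, here is a minimal sketch of what such a collector function could look like, assuming the requests and google-cloud-pubsub libraries, a hypothetical Pub/Sub topic named weather-data, a default city, and the GOOGLE_CLOUD_PROJECT and OPENWEATHER_API_KEY environment variables that appear later in the CI/CD section. The actual function in the repository may differ in its details.

import json
import os

import functions_framework
import requests
from google.cloud import pubsub_v1

# Assumed names: the topic ID, default city, and entry point are illustrative.
PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
API_KEY = os.environ["OPENWEATHER_API_KEY"]
TOPIC_ID = "weather-data"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)


@functions_framework.http
def main(request):
    """HTTP-triggered entry point: fetch current weather and publish it to Pub/Sub."""
    city = request.args.get("city", "London")
    response = requests.get(
        "https://api.openweathermap.org/data/2.5/weather",
        params={"q": city, "appid": API_KEY, "units": "metric"},
        timeout=10,
    )
    response.raise_for_status()

    # Publish the raw JSON payload; downstream stages handle transformation.
    future = publisher.publish(topic_path, json.dumps(response.json()).encode("utf-8"))
    message_id = future.result()
    return f"Published weather for {city} (message {message_id})", 200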
The Dataflow pipeline code primarily lives in three folders: src/transformations, src/utilities, and src/config. We’ll walk through each one and explain how they work together to read data, apply transformations, and load it into BigQuery.
Overview of main.py
The main.py script is the heart of our pipeline. It defines the pipeline’s flow—from reading raw data to applying transformations and finally writing the results to BigQuery. Let’s break down each section.
1. Setting Up Pipeline Options with CustomPipelineOptions
Our pipeline needs certain inputs, like the path to the CSV file containing our raw data and the destination table in BigQuery. We use CustomPipelineOptions, a custom extension of Apache Beam’s PipelineOptions, to capture these inputs.
class CustomPipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        """
        Adds custom command line arguments for the pipeline.

        Args:
            parser: The argparse parser object to add arguments to.
        """
        parser.add_argument('--input_path', required=True,
                            help='Path to the input CSV file in GCS')
        parser.add_argument('--output_table', required=True,
                            help='BigQuery output table in the format project_id.dataset_id.table_id')
This class does a couple of things:
It creates command-line arguments (--input_path and --output_table) so we can specify the input file and the output table when we run the pipeline.
These options make the pipeline reusable for different datasets or tables by simply changing the command-line arguments.
The Run Function
The run() function defines the core logic of our pipeline. It initializes the pipeline with the options, sets up error handling, and organizes the data processing steps into three main stages: reading, transforming, and writing.
This function does a few things before starting the pipeline (sketched in code after the list below):
Command-line arguments parsing: It fetches the input and output options provided by the user.
Error handling: Ensures the output table name is in the right format. If not, it logs an error and stops execution.
Save main session: Enabling save_main_session helps Dataflow avoid import errors in complex setups by preserving all imports in the main execution environment.
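As a rough illustration of these setup steps (the exact variable names and error handling in the repository may differ), the start of run() could look like this:

import logging

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


def run(argv=None):
    # Parse the command-line arguments into our custom options.
    pipeline_options = PipelineOptions(argv)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)

    # Error handling: the output table must look like project_id.dataset_id.table_id.
    if len(custom_options.output_table.split('.')) != 3:
        logging.error("output_table must be in the format project_id.dataset_id.table_id")
        return

    # Preserve imports from the main session so Dataflow workers can unpickle our DoFns.
    pipeline_options.view_as(SetupOptions).save_main_session = True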
Data Processing Stages in run()
The pipeline itself is composed of three main stages:
Stage 1: Reading the Data from CSV
The first stage reads raw data from a CSV file stored in Google Cloud Storage. For this, we use Apache Beam’s ReadFromCsv transform, which reads each row as a record and passes it to the next stage.
Stage 2: Transforming the Data
The raw data likely needs to be cleaned or transformed before it is useful. Here, we apply a custom transformation using the Transformations class (which we’ll discuss in more detail below). This class performs tasks like data validation and converting the duration from milliseconds to minutes.
Stage 3: Writing to BigQuery
In the final stage, we load the transformed data into BigQuery. We specify details like the dataset, table, and schema. This schema defines the structure and data types for each column in the BigQuery table. A sketch putting all three stages together follows the write settings below.
Schema specification: It defines each field’s type, ensuring the data is structured correctly in BigQuery.
Write disposition: We use WRITE_APPEND, which appends new data to the existing table without overwriting it.
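Putting the three stages together, the body of the pipeline might look roughly like this. The step labels and schema fields are illustrative rather than the exact code in the repository, and custom_options and pipeline_options come from the setup sketch shown earlier:

import apache_beam as beam
from apache_beam.io import ReadFromCsv, WriteToBigQuery, BigQueryDisposition

pipeline = beam.Pipeline(options=pipeline_options)

(
    pipeline
    # Stage 1: read each CSV row from Google Cloud Storage as a record.
    | 'ReadCsv' >> ReadFromCsv(custom_options.input_path)
    # Stage 2: validate and transform each record (e.g. duration_ms to minutes).
    | 'Transform' >> beam.ParDo(Transformations())
    # Stage 3: append the JSON rows to the destination BigQuery table.
    | 'WriteToBigQuery' >> WriteToBigQuery(
        custom_options.output_table,
        schema='track_id:STRING,popularity:INTEGER,duration_min:FLOAT',
        write_disposition=BigQueryDisposition.WRITE_APPEND,
    )
)

# pipeline.run().wait_until_finish() is then called to execute the pipeline (see below).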
Running and Finalizing the Pipeline
After defining the pipeline stages, we execute pipeline.run() to start processing the data.
pipeline.run().wait_until_finish()
This line runs the pipeline on Google Cloud Dataflow and waits until it completes. If everything is set up correctly, this will run seamlessly in the cloud, processing large volumes of data with ease.
The Transformations: src/transformations
This module contains the Transformations class, where the actual data manipulation happens. Transformations extends Apache Beam’s DoFn class, which is used to define custom data processing functions in Beam. A simplified sketch of the class follows the component list below.
Key Components of the Transformations Class
convert_to_json(): This static method converts each record into a JSON-compatible dictionary format, which is easier to work with when storing in databases like BigQuery.
process(): The process() method is the core of Transformations. It takes each record, validates it, applies transformations, and converts it into JSON format.
Validation: It uses validate_record() (from utilities) to check each record’s integrity before proceeding.
Transformation: Converts duration_ms from milliseconds to minutes, making it more user-friendly.
Logging: It logs the transformed data, which helps with debugging and tracking data flow through the pipeline.
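Based on the components described above, a simplified sketch of the class might look like the following. The field names mirror the validation utility shown later, and the import paths for the helpers are assumptions; the real implementation may differ:

import apache_beam as beam

# Assumed import paths for the helpers in src/utilities.
from utilities.logger import logger
from utilities.utils import validate_record


class Transformations(beam.DoFn):

    @staticmethod
    def convert_to_json(record):
        """Convert a record into a JSON-compatible dictionary for BigQuery."""
        return {
            'track_id': record.track_id,
            'popularity': record.popularity,
            # Convert the duration from milliseconds to minutes.
            'duration_min': round(record.duration_ms / 60000, 2),
        }

    def process(self, record):
        # Validation: skip the record if it fails the integrity checks.
        if validate_record(record) is None:
            return

        # Transformation and logging: emit the JSON row for BigQuery.
        row = self.convert_to_json(record)
        logger.info(f"Transformed record: {row}")
        yield row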
Utilities: src/utilities
The utilities folder includes helper modules for logging and validation. These utilities make the pipeline robust and easy to monitor.
Logger Configuration (logger.py)
The logger.py module sets up logging based on the environment. If running in production, it logs directly to Google Cloud Logging; otherwise, it logs to the console for easier debugging in test environments.
import logging
import os

from google.cloud.logging import Client
from google.cloud.logging.handlers import CloudLoggingHandler

# ENVIRONMENT, TEST, and DEV are constants defined elsewhere in the project (e.g. in src/config).


def setup_logging():
    log = logging.getLogger(__name__)
    try:
        if os.getenv(ENVIRONMENT) != TEST and os.getenv(ENVIRONMENT) != DEV:
            client = Client()
            cloud_handler = CloudLoggingHandler(client)
            log.setLevel(logging.INFO)
            log.addHandler(cloud_handler)
            log.info("Cloud logging has been set up.")
        else:
            log.setLevel(logging.DEBUG)
            log.addHandler(logging.StreamHandler())
            log.debug("Console logging has been set up for testing environment.")
    except Exception as e:
        log.error(f"Failed to set up logging: {e}")
    return log


logger = setup_logging()
This function:
Selects the logging method: Google Cloud Logging for production, console logging for test/development.
Sets log levels: Logs at INFO level in production to avoid unnecessary verbosity and DEBUG level for testing to catch more details.
Validation Utility (utils.py)
The validation utility helps ensure each record meets specific criteria. For instance, it might check that fields are not empty or that values are within expected ranges.
def validate_record(record):
    # Example validation checks
    if not record.Id or not record.track_id:
        return None
    if record.popularity < 0 or record.popularity > 100:
        return None
    return record
This function:
Validates essential fields: Ensures Id and track_id are present.
Range validation: Checks if popularity values fall within 0-100, filtering out bad data early.
With the code in place, the next crucial step in the development process is implementing a robust CI/CD pipeline. This pipeline automates the testing and deployment of our application, ensuring that our changes are consistently integrated and delivered to the cloud environment with minimal manual intervention. By leveraging CI/CD, we can enhance our workflow, catching bugs early and deploying updates more efficiently. The following section will delve into our CI/CD configuration, detailing how we authenticate with Google Cloud, manage dependencies, and ensure code quality through automated testing, ultimately streamlining our deployment process for the weather data pipeline.
What is CI/CD?
Continuous Integration (CI) and Continuous Deployment (CD) are essential practices in modern software development that aim to improve the efficiency, quality, and speed of the software delivery process.
Continuous Integration (CI): This practice involves automatically testing and integrating code changes into a shared repository frequently, usually multiple times a day. Developers merge their changes back to the main branch as often as possible. Each integration is verified by an automated build and tests, allowing teams to detect problems early. The main benefits of CI include:
Early Bug Detection: Issues are identified as soon as they are introduced, making them easier and less costly to fix.
Reduced Integration Problems: Frequent integration minimizes the “integration hell” often experienced when merging large code changes.
Continuous Deployment (CD): This extends CI by automatically deploying all code changes to a production environment after they pass testing. The goal is to ensure that code is always in a deployable state. Key advantages of CD include:
Faster Time to Market: Features and fixes reach users more quickly, allowing businesses to respond rapidly to user needs and market changes.
Higher Deployment Frequency: Smaller, incremental changes are deployed more regularly, reducing the risk associated with larger releases and making it easier to roll back changes if needed.
Together, CI and CD create a robust pipeline that streamlines the development process, encourages collaboration among teams, and enhances the overall software quality.
Why is CI/CD Needed?
Improved Code Quality: Automated testing ensures that only code that meets the predefined quality standards is deployed, leading to fewer bugs and issues in production.
Enhanced Collaboration: CI/CD fosters a culture of collaboration among developers, allowing them to integrate their work regularly and communicate effectively about changes and progress.
Faster Feedback Loops: Developers receive immediate feedback on their code changes through automated testing, enabling quicker identification and resolution of issues.
Reduced Risk: By deploying smaller, incremental updates rather than large, infrequent releases, teams can reduce the risk associated with new deployments. If a problem arises, it’s easier to identify and roll back a small change than a large one.
Increased Productivity: Automation of repetitive tasks, such as testing and deployment, allows developers to focus on writing code and delivering new features, rather than managing manual deployment processes.
Scalability: As teams grow and projects become more complex, CI/CD provides a systematic approach to manage code changes, ensuring that the development process remains efficient and scalable.
In summary, CI/CD practices are critical in today’s fast-paced software development environment. They enable teams to deliver high-quality software rapidly and reliably, ensuring that businesses can meet user demands and adapt to changing market conditions.
CI/CD Workflow Breakdown
1. Triggering the Workflow
The workflow triggers under specific conditions:
Push: When changes are pushed to the main, develop, or release/** branches.
Pull Request: When a new pull request is opened.
Manual Trigger: Using workflow_dispatch, allowing for manual execution.
These triggers ensure that your CI/CD workflow runs at crucial points in the development process, covering branch pushes, feature integration (via pull requests), and manual deployments when necessary.
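In the workflow file, these triggers correspond to an on: block along the following lines (a sketch using the branch names listed above):

on:
  push:
    branches:
      - main
      - develop
      - 'release/**'
  pull_request:
  workflow_dispatch: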
2. Permissions
permissions:
  id-token: write
  contents: read
The id-token: write permission is critical for securely authenticating with Google Cloud Platform (GCP) using GitHub’s OIDC (OpenID Connect) tokens. The contents: read permission allows the workflow to access the repository content necessary for deployment.
Job: deploy_to_cloud_function
This job is responsible for handling the deployment process. It runs on an Ubuntu environment and contains a series of steps to check out code, authenticate with GCP, set up the environment, run quality checks, and finally deploy the Cloud Function.
Step-by-Step Breakdown
Git Checkout
- name: Git Checkout
  uses: actions/checkout@v4
This step pulls the latest code from your repository, ensuring the workflow has access to the most recent changes.
Authenticate to GCP
This step uses GitHub’s OIDC integration to authenticate with GCP. Instead of managing long-lived credentials, it uses a secure, short-lived ID token, improving security. It requires the following (a sketch of the step follows this list):
GCP_SERVICE_ACCOUNT: Specifies the service account with necessary permissions.
GCP_WORKLOAD_IDENTITY_PROVIDER: Uses a predefined Workload Identity Provider to link GitHub and GCP, further enhancing security.
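One common way to express this step is with Google’s official auth action; the exact action and version used in the repository may differ:

- name: Authenticate to GCP
  uses: google-github-actions/auth@v2
  with:
    workload_identity_provider: ${{ secrets.GCP_WORKLOAD_IDENTITY_PROVIDER }}
    service_account: ${{ secrets.GCP_SERVICE_ACCOUNT }}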
Set Up Python
- name: Set up python
  uses: actions/setup-python@v5
  with:
    python-version: ${{ secrets.PYTHON_VERSION }}
    cache: 'pip'
This step sets up the Python environment, installing the specific Python version defined in the secrets (for example, Python 3.12). The caching of pip packages speeds up the process in subsequent runs.
Install Dependencies
This step installs all Python dependencies listed in requirements.txt, ensuring the environment has every library the application needs to run.
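A typical sketch of this step (the repository’s version may differ slightly):

- name: Install Dependencies
  run: |
    python -m pip install --upgrade pip
    pip install -r requirements.txt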
Run Tests and Code Coverage
This is a crucial step for maintaining code quality and reliability. It runs the following (a sketch of the step appears after this list):
Tests with Pytest: Uses pytest to execute test cases, ensuring that all components function as expected.
Code Coverage: Measures the coverage of the tests, ensuring at least 80% of the code is tested. If coverage falls below 80%, the workflow fails, enforcing a standard for test coverage.
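Assuming pytest with the pytest-cov plugin, the step could look like this sketch, where --cov-fail-under enforces the 80% threshold:

- name: Run Tests with Coverage
  run: |
    pytest --cov=src --cov-report=xml --cov-fail-under=80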
Upload Coverage Artifact
This step saves the coverage results as an artifact, making them available for later review or download if needed. Artifacts are retained for 10 days, allowing for inspection of test coverage over time.
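A sketch of the upload step, assuming the coverage report is written to coverage.xml as in the testing sketch above:

- name: Upload Coverage Report
  uses: actions/upload-artifact@v4
  with:
    name: coverage-report
    path: coverage.xml
    retention-days: 10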
Determine Function Name
- name: Determine Function Name
  id: function_name
  run: |
    if [[ "${GITHUB_REF##*/}" == "develop" ]]; then
      echo "FUNCTION_NAME=${{ secrets.GCP_FUNCTION_NAME }}-dev" >> $GITHUB_ENV
    elif [[ "${GITHUB_REF##*/}" == "main" ]]; then
      echo "FUNCTION_NAME=${{ secrets.GCP_FUNCTION_NAME }}-prod" >> $GITHUB_ENV
    else
      echo "FUNCTION_NAME=${{ secrets.GCP_FUNCTION_NAME }}-staging" >> $GITHUB_ENV
    fi
This step dynamically sets the Cloud Function name based on the branch:
Develop Branch: Deploys to a development function (-dev suffix).
Main Branch: Deploys to production (-prod suffix).
Other Branches (e.g., Release): Uses a staging function (-staging suffix).
This naming convention allows for environment-specific Cloud Functions, avoiding overlap and reducing the risk of deploying to the wrong environment.
Deploy the Cloud Function
This is the core deployment step. It uses the gcloud command to do the following (a sketch of the step appears after this list):
Deploy the Cloud Function: Deploys using the determined FUNCTION_NAME.
Specify Runtime: Python 3.12 runtime.
Trigger: HTTP-based trigger for easy invocation.
Entry Point: The main function or entry point of your code.
Environment Variables: Sets environment variables (GOOGLE_CLOUD_PROJECT and OPENWEATHER_API_KEY) for the function.
By setting environment-specific variables, this step ensures that the deployed function has all necessary settings to operate in each environment (e.g., using different API keys or project IDs).
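A hedged sketch of what the deployment command might look like; the entry point name, secret names, and any additional flags (such as a region) are assumptions and may differ in the repository:

- name: Deploy Cloud Function
  run: |
    gcloud functions deploy "$FUNCTION_NAME" \
      --runtime=python312 \
      --trigger-http \
      --entry-point=main \
      --set-env-vars=GOOGLE_CLOUD_PROJECT=${{ secrets.GCP_PROJECT_ID }},OPENWEATHER_API_KEY=${{ secrets.OPENWEATHER_API_KEY }}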
Slack Notification
This step sends a notification to Slack once the job completes (a sketch appears after this list). It uses a webhook to notify team members, including relevant details like:
Workflow, job name, commit, repository, branch reference, author, and duration.
Color-coded Status: Displays the job status using color codes (green for success, red for failure, and yellow for warnings).
This notification keeps the team informed of deployment progress and outcomes, making it easy to respond to any issues quickly.
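One way to implement such a notification is with a community Slack action, for example 8398a7/action-slack; the exact action, fields, and secret name used in the repository may differ:

- name: Slack Notification
  if: always()
  uses: 8398a7/action-slack@v3
  with:
    status: ${{ job.status }}
    fields: workflow,job,commit,repo,ref,author,took
  env:
    SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}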
GitHub Repository Details
For those interested in exploring the code and structure of the weather data publisher project, I’ve made it available on GitHub. The repository contains all the scripts and configurations necessary to replicate the pipeline we’ve discussed. You can find the project at
Feel free to clone the repository, experiment with the code, and adapt it to your own use cases. Contributions and feedback are welcome, as we aim to build a collaborative community around cloud-native data processing solutions.
Conclusion:
In this blog series, we’ve delved into the transformative capabilities of cloud-native tools by designing a data processing pipeline that utilizes Google Cloud Functions and Pub/Sub. By leveraging these managed services, we created a streamlined and efficient workflow for fetching weather data from an API and publishing it for further processing. This setup not only simplifies the process of handling incoming data but also ensures that we can scale our solutions seamlessly as our data needs grow.
Cloud Functions allow us to run code in response to events without the need to manage infrastructure, while Pub/Sub acts as a reliable messaging service that decouples our data sources from downstream processing. This architecture exemplifies how modern cloud technologies can make data processing more accessible and flexible, enabling organizations to derive valuable insights from their data with minimal overhead.
However, our journey doesn’t end here. In the next installment of this series, we will explore how to further enhance our pipeline by incorporating Google Cloud Dataflow for data transformations and visualizing the results with Looker Studio. This will provide us with a comprehensive view of our data journey, from raw input to actionable insights. Stay tuned as we continue to unravel the power of cloud-native solutions in our data processing endeavors!