Imagine you’re organizing a big outdoor event, managing a fleet of delivery trucks, or overseeing crop growth on a farm. In each of these cases, knowing what the weather will be is crucial. In today’s world, data has become the backbone of every decision, from predicting weather patterns to optimizing logistics. However, before companies can use data, it has to be gathered, organized, and processed into a form that reveals trends and insights. This is where data engineering on GCP shines—enabling automated, scalable, and efficient data handling in the cloud.
This blog series will showcase how we set up a GCP data pipeline that automates the entire process. We’ll start by gathering weather data from an external API, transform it to make it more useful, and load it into BigQuery, Google Cloud’s data warehouse, where we can analyze and visualize trends over time. By the end of this series, you’ll see how tools like Cloud Functions, Pub/Sub, Dataflow, and Looker Studio come together in GCP to reveal the story within the data.
To make sense of a cloud data pipeline, let’s look at each tool we’ll use and how it fits into the bigger picture. Imagine this pipeline as a series of stations, each responsible for a specific task to ensure data flows smoothly from raw source to actionable insights.
Together, these tools provide a seamless way to automate data processing. Google Cloud’s managed services handle scaling, reliability, and security, so we can focus on transforming data into insights rather than managing infrastructure.
In this first post, we’ll start by setting up the Cloud Function to collect weather data. As we move through the series, I’ll show you how each of these tools connects to form a complete, reliable pipeline that could support a range of real-world needs.
With the architecture set, let’s start implementing it, beginning with our Cloud Function. This function will fetch weather data from the OpenWeather API and send it to Pub/Sub. Think of it as the “collector” in our pipeline, grabbing fresh weather data at regular intervals so it’s always ready for analysis.
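To give a feel for that collector before we dig into the pipeline code, here is a minimal sketch of what such a Cloud Function could look like. The city, topic name, and payload handling are placeholders for illustration rather than the project's actual values, and it assumes requests and google-cloud-pubsub are listed in requirements.txt:

import json
import os

import requests
from google.cloud import pubsub_v1

# Entry point name matches the --entry-point flag used in the deploy command later in this post
def entry_point(request):
    api_key = os.environ["OPENWEATHER_API_KEY"]
    project_id = os.environ["GOOGLE_CLOUD_PROJECT"]

    # Fetch current weather for a sample city (placeholder)
    response = requests.get(
        "https://api.openweathermap.org/data/2.5/weather",
        params={"q": "Berlin", "appid": api_key},
        timeout=10,
    )
    response.raise_for_status()

    # Publish the raw payload to a Pub/Sub topic ("weather-data" is a placeholder topic name)
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, "weather-data")
    publisher.publish(topic_path, data=json.dumps(response.json()).encode("utf-8"))

    return "Weather data published", 200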
The code primarily lives in three folders: src/transformations, src/utilities, and src/config. We’ll walk through each one and explain how they work together to read data, apply transformations, and load it into BigQuery.
main.py
The main.py script is the heart of our pipeline. It defines the pipeline's flow—from reading raw data to applying transformations and finally writing the results to BigQuery. Let's break down each section.
CustomPipelineOptions
Our pipeline needs certain inputs, like the path to the CSV file containing our raw data and the destination table in BigQuery. We use CustomPipelineOptions, a custom extension of Apache Beam's PipelineOptions, to capture these inputs.
from apache_beam.options.pipeline_options import PipelineOptions


class CustomPipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        """
        Adds custom command line arguments for the pipeline.

        Args:
            parser: The argparse parser object to add arguments to.
        """
        parser.add_argument('--input_path', required=True,
                            help='Path to the input CSV file in GCS')
        parser.add_argument('--output_table', required=True,
                            help='BigQuery output table in the format project_id.dataset_id.table_id')
This class does a couple of things:
It creates command-line arguments (--input_path and --output_table) so we can specify the input file and the output table when we run the pipeline.
These options make the pipeline reusable for different datasets or tables by simply changing the command-line arguments, as in the example below.
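For example, launching the pipeline on Dataflow could look something like the following command; the project, bucket, region, and table names here are placeholders, not the project's actual values:

python main.py \
  --runner DataflowRunner \
  --project my-gcp-project \
  --region us-central1 \
  --temp_location gs://my-bucket/dataflow-temp \
  --input_path gs://my-bucket/raw/tracks.csv \
  --output_table my-gcp-project.my_dataset.tracks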
The run() function defines the core logic of our pipeline. It initializes the pipeline with the options, sets up error handling, and organizes the data processing steps into three main stages: reading, transforming, and writing.
# Assumes apache_beam is imported as beam, and that SetupOptions, Transformations,
# and the module-level logger are imported at the top of main.py
def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)

    # Split the output table into project, dataset, and table IDs
    try:
        project_id, dataset_id, table_id = custom_options.output_table.split('.')
    except ValueError:
        logger.error(
            f"Invalid output table format: {custom_options.output_table}. Expected format: project_id.dataset_id.table_id")
        return

    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline = beam.Pipeline(options=pipeline_options)
This function does a few things before starting the pipeline:
It parses the command-line arguments into pipeline options and views them as our CustomPipelineOptions.
It splits --output_table into the project, dataset, and table IDs, logging an error if the format is wrong.
It sets save_main_session, which helps Dataflow avoid import errors in complex setups by preserving all imports in the main execution environment.
The pipeline stages in run()
The pipeline itself is composed of three main stages:
The first stage reads raw data from a CSV file stored in Google Cloud Storage. For this, we use Apache Beam’s ReadFromCsv transform, which reads each row as a record and passes it to the next stage.
| 'Read CSV File' >> ReadFromCsv(custom_options.input_path)
The raw data likely needs to be cleaned or transformed before being useful. Here, we apply a custom transformation using the Transformations class (which we’ll discuss in more detail below). This class performs tasks like data validation and converting the duration from milliseconds to minutes.
| 'Apply Transformations' >> beam.ParDo(Transformations())
In the final stage, we load the transformed data into BigQuery. We specify details like the dataset, table, and schema. This schema defines the structure and data types for each column in the BigQuery table.
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
      project=project_id,
      dataset=dataset_id,
      table=table_id,
      ignore_unknown_columns=False,
      schema='Id:INTEGER,track_id:STRING,artists:STRING,...',
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
  )
This configuration does a few things:
It points the sink at the right project, dataset, and table, using the IDs parsed from --output_table.
It declares the schema, which defines the name and data type of each column in the BigQuery table.
It sets the write disposition to WRITE_APPEND, which appends new data to the existing table without overwriting it.
After defining the pipeline stages, we execute pipeline.run() to start processing the data.
pipeline.run().wait_until_finish()
This line runs the pipeline on Google Cloud Dataflow and waits until it completes. If everything is set up correctly, this will run seamlessly in the cloud, processing large volumes of data with ease.
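Putting the three stages together, the pipeline construction inside run() looks roughly like the following sketch; the step labels match the snippets above, the schema is abbreviated just as it is in the original snippet, and the error handling shown earlier is omitted for brevity:

    pipeline = beam.Pipeline(options=pipeline_options)

    (
        pipeline
        | 'Read CSV File' >> ReadFromCsv(custom_options.input_path)
        | 'Apply Transformations' >> beam.ParDo(Transformations())
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            project=project_id,
            dataset=dataset_id,
            table=table_id,
            ignore_unknown_columns=False,
            schema='Id:INTEGER,track_id:STRING,artists:STRING,...',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )

    pipeline.run().wait_until_finish()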
src/transformations
This module contains the Transformations class, where the actual data manipulation happens. Transformations extends Apache Beam's DoFn class, which is used to define custom data processing functions in Beam.
Transformations Class
convert_to_json(): This static method converts each record into a JSON-compatible dictionary format, which is easier to work with when storing in databases like BigQuery.
    @staticmethod
    def convert_to_json(record):
        return {
            "Id": record.Id,
            "track_id": record.track_id,
            "artists": record.artists,
            "album_name": record.album_name,
            "track_name": record.track_name,
            "popularity": record.popularity,
            "duration_ms": record.duration_ms,
            "explicit": record.explicit,
            "danceability": record.danceability,
            "energy": record.energy,
            ...
        }
process(): The process() method is the core of Transformations. It takes each record, validates it, applies transformations, and converts it into JSON format.
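A simplified sketch of process() is shown below; the helper names follow the snippets in this post, while the converted duration field name is a placeholder, so the exact code in the repository may differ:

    # Inside the Transformations(beam.DoFn) class
    def process(self, record):
        # Validate the record first and skip anything that fails the checks
        if validate_record(record) is None:
            return

        # Convert the record into a JSON-compatible dictionary
        json_record = self.convert_to_json(record)

        # Convert the duration from milliseconds to minutes
        # ("duration_min" is a placeholder field name)
        json_record["duration_min"] = round(json_record.pop("duration_ms") / 60000, 2)

        yield json_record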
Here’s what happens in process():
It calls validate_record() (from utilities) to check each record's integrity before proceeding.
It converts duration_ms from milliseconds to minutes, making it more user-friendly.
src/utilities
The utilities folder includes helper modules for logging and validation. These utilities make the pipeline robust and easy to monitor.
Logging (logger.py)
The logger.py module sets up logging based on the environment. If running in production, it logs directly to Google Cloud Logging; otherwise, it logs to the console for easier debugging in test environments.
import logging
import os

from google.cloud.logging import Client
from google.cloud.logging.handlers import CloudLoggingHandler

# ENVIRONMENT, TEST, and DEV are string constants defined in src/config
def setup_logging():
    log = logging.getLogger(__name__)
    try:
        if os.getenv(ENVIRONMENT) != TEST and os.getenv(ENVIRONMENT) != DEV:
            client = Client()
            cloud_handler = CloudLoggingHandler(client)
            log.setLevel(logging.INFO)
            log.addHandler(cloud_handler)
            log.info("Cloud logging has been set up.")
        else:
            log.setLevel(logging.DEBUG)
            log.addHandler(logging.StreamHandler())
            log.debug("Console logging has been set up for testing environment.")
    except Exception as e:
        log.error(f"Failed to set up logging: {e}")
    return log


logger = setup_logging()
This function:
Sets up Google Cloud Logging in production and console logging for test and development environments.
Uses the INFO level in production to avoid unnecessary verbosity and the DEBUG level for testing to catch more details.
Validation (utils.py)
The validation utility helps ensure each record meets specific criteria. For instance, it might check that fields are not empty or that values are within expected ranges.
def validate_record(record):
    # Example validation checks
    if not record.Id or not record.track_id:
        return None
    if record.popularity < 0 or record.popularity > 100:
        return None
    return record
This function:
Checks that Id and track_id are present.
Checks that popularity falls within the expected 0 to 100 range.
With the code in place, the next crucial step in the development process is implementing a robust CI/CD pipeline. This pipeline automates the testing and deployment of our application, ensuring that changes are consistently integrated and delivered to the cloud environment with minimal manual intervention. By leveraging CI/CD, we catch bugs early and deploy updates more efficiently. The following section covers our CI/CD configuration: how we authenticate with Google Cloud, manage dependencies, and ensure code quality through automated testing, ultimately streamlining the deployment process for the weather data pipeline.
Continuous Integration (CI) and Continuous Deployment (CD) are essential practices in modern software development that aim to improve the efficiency, quality, and speed of the software delivery process.
Together, CI and CD create a robust pipeline that streamlines the development process, encourages collaboration among teams, and enhances the overall software quality.
In summary, CI/CD practices are critical in today’s fast-paced software development environment. They enable teams to deliver high-quality software rapidly and reliably, ensuring that businesses can meet user demands and adapt to changing market conditions.
The workflow triggers under specific conditions:
Pushes to the main, develop, or release/** branches.
Pull requests targeting those branches.
Manual execution via workflow_dispatch.
These triggers ensure that your CI/CD workflow runs at crucial points in the development process, covering branch pushes, feature integration (via pull requests), and manual deployments when necessary, as sketched below.
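A trigger block along these lines captures those conditions; the branch lists mirror the ones described above, and the actual workflow file may differ slightly:

on:
  push:
    branches: [ main, develop, 'release/**' ]
  pull_request:
    branches: [ main, develop ]
  workflow_dispatch: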
permissions:
  id-token: write
  contents: read
The id-token: write permission is critical for securely authenticating with Google Cloud Platform (GCP) using GitHub’s OIDC (OpenID Connect) tokens. The contents: read permission allows the workflow to access the repository content necessary for deployment.
deploy_to_cloud_function
This job is responsible for handling the deployment process. It runs on an Ubuntu environment and contains a series of steps to check out code, authenticate with GCP, set up the environment, run quality checks, and finally deploy the Cloud Function.
- name: Git Checkout
  uses: actions/checkout@v4
This step pulls the latest code from your repository, ensuring the workflow has access to the most recent changes.
- name: Authenticate with GCP
  uses: google-github-actions/auth@v2
  with:
    service_account: ${{ secrets.GCP_SERVICE_ACCOUNT }}
    workload_identity_provider: ${{ secrets.GCP_WORKLOAD_IDENTITY_PROVIDER }}
This step uses GitHub’s OIDC integration to authenticate with GCP. Instead of managing long-lived credentials, this uses a secure, short-lived ID token, improving security. It requires:
GCP_SERVICE_ACCOUNT: Specifies the service account with the necessary permissions.
GCP_WORKLOAD_IDENTITY_PROVIDER: Uses a predefined Workload Identity Provider to link GitHub and GCP, further enhancing security.
- name: Set up python
  uses: actions/setup-python@v5
  with:
    python-version: ${{ secrets.PYTHON_VERSION }}
    cache: 'pip'
This step sets up the Python environment, installing the specific Python version defined in the secrets (for example, Python 3.12). The caching of pip packages speeds up the process in subsequent runs.
- name: Install dependencies
  run: pip install -r requirements.txt
This step installs all Python dependencies listed in requirements.txt, ensuring the environment has all libraries necessary for the application to run.
- name: Quality Checks
  env:
    GOOGLE_CLOUD_PROJECT: ${{ secrets.PROJECT_ID }}
    OPENWEATHER_API_KEY: ${{ secrets.OPENWEATHER_API_KEY }}
    ENVIRONMENT: "TEST"
  run: |
    coverage run -m pytest
    coverage report --fail-under=80
This is a crucial step for maintaining code quality and reliability. It runs:
coverage run -m pytest to execute test cases, ensuring that all components function as expected.
coverage report --fail-under=80 to fail the job if test coverage drops below 80%.
- name: Upload coverage results
  if: success()
  uses: actions/upload-artifact@v4
  with:
    name: coverage
    path: .
    if-no-files-found: error
    retention-days: 10
This step saves the coverage results as an artifact, making them available for later review or download if needed. Artifacts are retained for 10 days, allowing for inspection of test coverage over time.
- name: Determine Function Name
  id: function_name
  run: |
    if [[ "${GITHUB_REF##*/}" == "develop" ]]; then
      echo "FUNCTION_NAME=${{ secrets.GCP_FUNCTION_NAME }}-dev" >> $GITHUB_ENV
    elif [[ "${GITHUB_REF##*/}" == "main" ]]; then
      echo "FUNCTION_NAME=${{ secrets.GCP_FUNCTION_NAME }}-prod" >> $GITHUB_ENV
    else
      echo "FUNCTION_NAME=${{ secrets.GCP_FUNCTION_NAME }}-staging" >> $GITHUB_ENV
    fi
This step dynamically sets the Cloud Function name based on the branch:
Pushes to develop deploy a development function (-dev suffix).
Pushes to main deploy the production function (-prod suffix).
Pushes to any other matching branch deploy a staging function (-staging suffix).
This naming convention allows for environment-specific Cloud Functions, avoiding overlap and reducing the risk of deploying to the wrong environment.
- name: Deploy to cloud function
  run: gcloud functions deploy $FUNCTION_NAME --runtime python312 --trigger-http --entry-point entry_point --source . --set-env-vars GOOGLE_CLOUD_PROJECT=${{ secrets.PROJECT_ID }},OPENWEATHER_API_KEY=${{ secrets.OPENWEATHER_API_KEY }}
This is the core deployment step. It uses the gcloud command to:
Deploy the function under the environment-specific name stored in FUNCTION_NAME.
Configure the Python 3.12 runtime, an HTTP trigger, the entry point, and the source directory.
Set the required environment variables (GOOGLE_CLOUD_PROJECT and OPENWEATHER_API_KEY) for the function.
By setting environment-specific variables, this step ensures that the deployed function has all necessary settings to operate in each environment (e.g., using different API keys or project IDs).
- name: Slack notification
  uses: 8398a7/action-slack@v3
  with:
    status: custom
    fields: workflow,job,commit,repo,ref,author,took
    custom_payload: |
      {
        attachments: [{
          color: '${{ job.status }}' === 'success' ? 'good' : '${{ job.status }}' === 'failure' ? 'danger' : 'warning',
          text: `Action Name: ${process.env.AS_WORKFLOW}\nJOB Name: ${process.env.AS_JOB} (${process.env.AS_COMMIT})\n Repository Name:${process.env.AS_REPO}@${process.env.AS_REF} by ${process.env.AS_AUTHOR} ${{ job.status }} in ${process.env.AS_TOOK}`,
        }]
      }
  env:
    SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
  if: always()
This step sends a notification to Slack once the job completes. It uses a webhook to notify team members, including relevant details like:
The workflow name, job name, commit, repository, branch, author, and how long the job took.
The job status, shown as a color (green for success, red for failure, and yellow for warnings).
This notification keeps the team informed of deployment progress and outcomes, making it easy to respond to any issues quickly.
For those interested in exploring the code and structure of the weather data publisher project, I've made it available on GitHub. The repository contains all the scripts and configurations necessary to replicate the pipeline we've discussed. You can find the project at DataArize/weather-data-publisher.
Feel free to clone the repository, experiment with the code, and adapt it to your own use cases. Contributions and feedback are welcome, as we aim to build a collaborative community around cloud-native data processing solutions.
In this blog series, we’ve delved into the transformative capabilities of cloud-native tools by designing a data processing pipeline that utilizes Google Cloud Functions and Pub/Sub. By leveraging these managed services, we created a streamlined and efficient workflow for fetching weather data from an API and publishing it for further processing. This setup not only simplifies the process of handling incoming data but also ensures that we can scale our solutions seamlessly as our data needs grow.
Cloud Functions allow us to run code in response to events without the need to manage infrastructure, while Pub/Sub acts as a reliable messaging service that decouples our data sources from downstream processing. This architecture exemplifies how modern cloud technologies can make data processing more accessible and flexible, enabling organizations to derive valuable insights from their data with minimal overhead.
However, our journey doesn’t end here. In the next installment of this series, we will explore how to further enhance our pipeline by incorporating Google Cloud Dataflow for data transformations and visualizing the results with Looker Studio. This will provide us with a comprehensive view of our data journey, from raw input to actionable insights. Stay tuned as we continue to unravel the power of cloud-native solutions in our data processing endeavors!