Creating a recommendation engine is a fascinating journey filled with exciting challenges. For my project, I started with an old Netflix dataset from Kaggle, which contained ratings given by users to various movies. After cleaning the data using a Spring Batch application deployed as a Cloud Run Job (something I’ve written about in an earlier blog), I faced the next big step: preparing the data for analysis and storage.
This step was pivotal. I needed to ensure the data was clean, structured, and accessible for analysis—ready to uncover insights and eventually power the machine-learning model for recommendations.
Enter Google Dataflow: my go-to solution for scalable data processing and transformation.
Let’s break down the requirements: the raw ratings had to be validated, transformed into a structured format, and loaded somewhere they could be queried and analyzed at scale.
While I could have uploaded the text files directly to Vertex AI for training, that approach had some glaring issues: no validation or cleanup of malformed rows, no opportunity to aggregate or enrich the data, and no easy way to explore it before building the model.
This is where Google Dataflow shines. It’s a fully managed, serverless service for stream and batch data processing, built on the Apache Beam programming model. Here’s why it was the perfect choice: there is no infrastructure to manage, it scales automatically with the size of the data, and it integrates natively with Cloud Storage and BigQuery.
To keep things simple, I structured the pipeline into three stages: ingesting the raw text files from Cloud Storage, parsing and enriching the data, and writing the results to BigQuery.
The pipeline begins by ingesting text files from a GCS bucket. Each file contains lines of data in this format:
movie_id,customer_id,rating,date
123,456,5,2023-01-01
124,457,4,2023-01-02
To handle this, I built a utility class, GCSFileReader, that validates the file path and uses Apache Beam’s TextIO.Read to read the data:
public TextIO.Read read(String filePath) {
    // Fail fast if no GCS path was supplied.
    if (filePath == null || filePath.isEmpty()) {
        throw new MissingArgumentException("File path cannot be null or empty.");
    }
    // Return a TextIO.Read transform pointed at the path (e.g. gs://bucket/ratings/*.txt).
    return TextIO.read().from(filePath);
}
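Applied at the start of the pipeline, this reader produces a PCollection of raw lines. Roughly like this (the bucket path and variable names are placeholders, assuming a Pipeline object named pipeline):
PCollection<String> rawLines = pipeline.apply(
        "ReadRatingsFromGCS",
        new GCSFileReader().read("gs://my-ratings-bucket/netflix/*.txt"));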
Next, the raw text lines are parsed into a structured format using a custom DoFn called MovieCustomerPairFn. This function splits each line into its fields (movie_id, customer_id, rating, and date) and validates the data:
@ProcessElement
public void processElement(ProcessContext context) {
    String line = context.element().trim();
    String[] fields = line.split(",");
    if (fields.length != 4) return; // Skip rows that don't have exactly four fields
    try {
        // Fields arrive in the order: movie_id, customer_id, rating, date.
        RatingInfo ratingInfo = new RatingInfo(fields[0], fields[1], Integer.parseInt(fields[2]), fields[3]);
        context.output(ratingInfo);
    } catch (NumberFormatException e) {
        // Skip rows whose rating isn't a valid integer.
    }
}
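For reference, RatingInfo is just a small value object. A minimal sketch might look like this (the field names and the Serializable choice are my assumptions based on the schema shown later, not the exact class from the repo):
// Simple value class carrying one parsed rating; Serializable so Beam can encode it.
public class RatingInfo implements Serializable {
    private final String movieId;
    private final String customerId;
    private final int rating;
    private final String date;

    public RatingInfo(String movieId, String customerId, int rating, String date) {
        this.movieId = movieId;
        this.customerId = customerId;
        this.rating = rating;
        this.date = date;
    }

    public String getMovieId() { return movieId; }
    public String getCustomerId() { return customerId; }
    public int getRating() { return rating; }
    public String getDate() { return date; }
}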
To calculate metrics like the average rating per movie, I used a custom CombineFn. This function accumulates ratings for each movie and computes the average:
@Override
public AverageAccumulator mergeAccumulators(Iterable<AverageAccumulator> accumulators) {
    // Dataflow may build partial accumulators on different workers; fold them into one.
    AverageAccumulator merged = new AverageAccumulator();
    for (AverageAccumulator acc : accumulators) {
        merged.merge(acc);
    }
    return merged;
}
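Only mergeAccumulators is shown above. For context, here is a minimal sketch of what the rest of such a CombineFn might look like, assuming the accumulator simply tracks a running sum and count (the class name AverageRatingFn and the accumulator's internals are illustrative, not the exact code from the repo):
// Illustrative accumulator: a running sum of ratings plus how many were seen.
// Serializable so Beam can fall back to SerializableCoder for the accumulator.
class AverageAccumulator implements Serializable {
    long sum;
    long count;

    void add(int rating) { sum += rating; count++; }
    void merge(AverageAccumulator other) { sum += other.sum; count += other.count; }
    double average() { return count == 0 ? 0.0 : (double) sum / count; }
}

public class AverageRatingFn extends Combine.CombineFn<Integer, AverageAccumulator, Double> {

    @Override
    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator(); // starts empty: sum = 0, count = 0
    }

    @Override
    public AverageAccumulator addInput(AverageAccumulator accumulator, Integer rating) {
        accumulator.add(rating); // fold one rating into the running totals
        return accumulator;
    }

    // mergeAccumulators(...) as shown above.

    @Override
    public Double extractOutput(AverageAccumulator accumulator) {
        return accumulator.average(); // sum / count, guarding against empty input
    }
}
Keyed by movie, a function like this would typically be applied with Combine.perKey(new AverageRatingFn()) on a PCollection<KV<String, Integer>> of movie_id/rating pairs.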
Finally, the structured and enriched data is written to BigQuery. To ensure compatibility, I defined a schema in BigQuerySchemaUtils:
public static TableSchema getRatingInfoSchema() {
    // Mirrors the movie_id,customer_id,rating,date layout of the input files.
    List<TableFieldSchema> fields = List.of(
        new TableFieldSchema().setName("movie_id").setType("STRING").setMode("REQUIRED"),
        new TableFieldSchema().setName("customer_id").setType("STRING").setMode("REQUIRED"),
        new TableFieldSchema().setName("rating").setType("INTEGER").setMode("REQUIRED"),
        new TableFieldSchema().setName("date").setType("STRING").setMode("REQUIRED")
    );
    return new TableSchema().setFields(fields);
}
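Putting the three stages together, the pipeline ends up looking roughly like this. This is a simplified sketch, not the exact code from the repo: the bucket path, table name, and the getter names on RatingInfo are placeholders, and options is assumed to already be configured for the Dataflow runner (as sketched a little further down):
Pipeline pipeline = Pipeline.create(options);

pipeline
    // Stage 1: read the raw rating lines from Cloud Storage.
    .apply("ReadRatingsFromGCS", new GCSFileReader().read("gs://my-ratings-bucket/netflix/*.txt"))
    // Stage 2: parse and validate each line into a RatingInfo record.
    .apply("ParseRatings", ParDo.of(new MovieCustomerPairFn()))
    // Convert each RatingInfo into a BigQuery TableRow matching the schema above.
    .apply("ToTableRow", MapElements.into(TypeDescriptor.of(TableRow.class))
        .via((RatingInfo r) -> new TableRow()
            .set("movie_id", r.getMovieId())
            .set("customer_id", r.getCustomerId())
            .set("rating", r.getRating())
            .set("date", r.getDate())))
    // Stage 3: write the structured rows to BigQuery.
    .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
        .to("my-project:movie_data.ratings")
        .withSchema(BigQuerySchemaUtils.getRatingInfoSchema())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

pipeline.run().waitUntilFinish();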
When you deploy a pipeline, Dataflow takes care of provisioning the worker VMs, distributing the work across them, autoscaling as the load changes, and retrying failed work items.
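Pointing a Beam pipeline at Dataflow is mostly a matter of configuring the pipeline options. A minimal sketch, where the project, region, and bucket values are placeholders and args is assumed to come from main:
// DataflowPipelineOptions and DataflowRunner come from the beam-runners-google-cloud-dataflow-java module.
DataflowPipelineOptions options = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()
    .as(DataflowPipelineOptions.class);

options.setRunner(DataflowRunner.class);                 // run on Dataflow instead of locally
options.setProject("my-gcp-project");                    // placeholder project ID
options.setRegion("us-central1");                        // placeholder region
options.setTempLocation("gs://my-ratings-bucket/temp");  // staging/temp location for the job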
One of the standout features of Google Dataflow is its comprehensive monitoring and debugging capabilities. When you’re working on a data pipeline, especially for a critical application like a recommendation engine, visibility into how your pipeline is performing is invaluable. Dataflow’s execution details and metrics let you watch a job’s progress in real time, spot bottlenecks, and debug failures without digging through raw logs on each worker.
When you deploy a Dataflow job, it appears in the Google Cloud Console under the “Dataflow” section. Clicking on your job opens the Job Details page, where you can explore several key aspects: the job graph, step-level metrics, logs, and worker details.
The visualization graph in Dataflow is one of the first things you’ll notice. It represents your pipeline as a directed acyclic graph (DAG), showing each step (or transform) as a node. This graph lets you follow the flow of data through the pipeline, check the status of each step at a glance, and spot which transforms are slowing the job down.
The metrics dashboard provides real-time statistics about throughput, autoscaling (the number of workers over time), and worker CPU utilization.
For instance, when working on my recommendation engine, I noticed that the “Parse Movie Ratings” transform had a higher latency because of unexpected invalid rows. This prompted me to add better validation logic in my DoFn class.
Dataflow logs provide detailed output for every stage of your pipeline. They are categorized into job logs, which come from the Dataflow service and describe the life cycle of the job itself, and worker logs, which capture the output of your pipeline code as it runs on each worker.
Using these logs, I was able to trace malformed records in my dataset, which helped me refine the pipeline’s error-handling logic.
If your pipeline runs on multiple workers, the “Workers” tab in the UI gives granular details about how many workers are active at any point, their resource utilization, and how the job scaled up and down over time.
This helped me confirm that my pipeline was effectively scaling down when processing smaller datasets, saving costs.
For those interested in exploring the code behind this blog post and replicating the setup, I’ve made the entire project available on GitHub. You can find the repository, including the Dockerfile, GitHub Actions workflows, and the Spring Batch code, in the link below:
GitHub: https://github.com/DataArize/recommendations-data-cleaning-pipeline
Feel free to fork the repository, explore the setup, and adapt it to your own projects. If you have any questions or suggestions, don’t hesitate to open an issue or contribute to the project!
Now that the data is loaded into BigQuery, I’ll spend some time analyzing it—finding trends, exploring user behavior, and identifying potential features for the recommendation engine. This exploratory phase will lay the groundwork for designing the model.
In the next blog, I’ll share how I use BigQuery for analysis and transition the data to Vertex AI for building the machine-learning model.
Google Dataflow is an incredibly powerful tool for data engineering. Its ability to scale, transform, and integrate with other Google Cloud services makes it indispensable for workflows like building a recommendation engine. By structuring the pipeline effectively, I’ve ensured that the data is clean, enriched, and analysis-ready, setting the stage for the next steps in the project.
If you’re working on a similar pipeline, I’d love to hear your thoughts or questions! Let’s build together!