
Batch Processing with Google Cloud DataFlow and Apache Beam


In this post, we will see how to implement a batch processing pipeline that moves data from Google Cloud Storage to Google Big Query using Cloud Dataflow.

Cloud Dataflow is a fully managed data processing service on Google Cloud Platform. The Apache Beam SDK lets us develop both batch and stream processing pipelines. We program our ETL/ELT flow with Beam and run it on Cloud Dataflow using the Dataflow Runner.

In this post, we will code the pipeline in Apache Beam and run it on Google Cloud Dataflow.

Code for this post can be found here.

Dataflow vs Apache Beam

People often get confused about what Apache Beam is and what Cloud Dataflow is. Before writing a pipeline, it is important to understand the difference between the two.

Apache Beam is an open source framework for creating data processing pipelines (both batch and stream processing). The pipeline is then executed by one of Beam’s supported distributed processing back-ends, which include Apache Apex, Apache Flink, Apache Spark, and Google Cloud Dataflow.

Interested in getting into Big Data? Check out our Hadoop Developer In Real World course for interesting use cases and real world projects just like what you are reading.

Benefits of Cloud Dataflow

  1. Horizontal autoscaling of worker nodes
  2. Fully Managed Service
  3. Monitor the pipeline anytime during its execution
  4. Reliable and consistent processing

What is Google Cloud Storage?

Google Cloud Storage is a service for storing your objects. An object is an immutable piece of data consisting of a file of any format. You store objects in containers called buckets. All buckets are associated with a project. You can compare GCS buckets with Amazon S3 buckets.

What is Big Query?

Big Query is a highly scalable, cost-effective data warehouse solution on Google Cloud Platform.

Benefits of Big Query

  1. Analyze petabytes of data using ANSI SQL queries.
  2. Access data and share insights with ease
  3. More secure platform that scales with your needs

Batch processing from Google Cloud Storage to Big Query

Architecture Design

google dataflow architecture

This is what the pipeline flow will look like. Here, the source is a Google Cloud Storage bucket and the sink is Big Query, the data warehouse offering on Google Cloud Platform.

google-cloud-storage-bucket

As you can see in the above screenshot, this is what the data in the Google Cloud Storage bucket looks like. We have data in the form of JSON files, which we will push into Big Query.

Initiate and Configure the Pipeline

The very first step is to set up the pipeline configuration. We have to specify which machine type the pipeline will use, the region in which it will execute, and so on.

We can program our pipeline in Java or Python. First, we have to set up the Dataflow Pipeline Options object where we define the configuration of our pipeline.

We have used Direct Runner to execute and test the pipeline locally.

options.setRunner(DirectRunner.class);

Once we test it locally, then we can replace Direct Runner with Dataflow Runner. That’s all that we need to deploy our pipeline on Cloud Dataflow.

options.setRunner(DataflowRunner.class);

Apart from the runner, we also need to pass other configurations to the pipeline, like the project id, maximum number of worker nodes, temp location, staging location, worker machine type, the region where the pipeline will be deployed, etc.

Create Pipeline

After passing all the configurations to the Dataflow Pipeline Options object, we create our Pipeline object.

Refer to the snippet below to take a closer look.

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StorageToBQBatchPipeline {

  public static void main(String[] args) {

    /*
     * Initialize Pipeline Configurations
     */
    DataflowPipelineOptions options = PipelineOptionsFactory
        .as(DataflowPipelineOptions.class);
    options.setRunner(DirectRunner.class);
    options.setProject("");
    options.setStreaming(false); // this is a batch pipeline, so streaming is disabled
    options.setTempLocation("");
    options.setStagingLocation("");
    options.setRegion("");
    options.setMaxNumWorkers(1);
    options.setWorkerMachineType("n1-standard-1");

    Pipeline pipeline = Pipeline.create(options);


Processing Data from Source (Google Cloud Storage)

Once we create the pipeline object, we read the data from the source, which here is a Google Cloud Storage bucket.

    /*
     * Read files from GCS buckets
     */
    PCollection<ReadableFile> data = pipeline
        .apply(FileIO.match().filepattern("gs://dump_*"))
        .apply(FileIO.readMatches());

    /*
     * Create Tuple Tag to process passed as well as failed records while parsing in ParDo functions
     */
    final TupleTag<KV<String, String>> mapSuccessTag = new TupleTag<KV<String, String>>() {
      private static final long serialVersionUID = 1L;
    };
    final TupleTag<KV<String, String>> mapFailedTag = new TupleTag<KV<String, String>>() {
      private static final long serialVersionUID = 1L;
    };

    PCollectionTuple mapTupleObj = data.apply(ParDo.of(new MapTransformation(mapSuccessTag, mapFailedTag))
        .withOutputTags(mapSuccessTag, TupleTagList.of(mapFailedTag)));

    PCollection<KV<String, String>> map = mapTupleObj.get(mapSuccessTag).setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

FileIO.match().filepattern("gs://dump_*")

FileIO is a connector built into the Apache Beam SDK that lets you read files from GCS.
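As an aside (not part of the original pipeline), if each JSON record sat on its own line, Beam's TextIO connector could read the matched files line by line instead of loading each file whole. A minimal sketch, with a placeholder bucket path:

    // Sketch only: TextIO reads the matched files line by line.
    // The path "gs://my-bucket/dump_*.json" is a placeholder, not from this post.
    PCollection<String> lines = pipeline
        .apply(TextIO.read().from("gs://my-bucket/dump_*.json"));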

We have used a ParDo function to convert the FileIO ReadableFile objects into key-value pair objects, as you can see below.

Using tuple tags, we make sure that only successfully parsed records are passed to the next step. If we hit any kind of exception while processing a record, the failed record is routed to the failed tuple tag and can be handled separately.
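For example, the failed side output could be pulled out of the tuple and logged or written to a dead-letter location. This is a sketch, not part of the original post; the logging DoFn here is just an illustration:

    // Sketch (assumption): read the failed-records side output and log it.
    // In MapTransformation below, the key holds the exception message and the value the file name.
    PCollection<KV<String, String>> mapFailed = mapTupleObj.get(mapFailedTag)
        .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

    mapFailed.apply(ParDo.of(new DoFn<KV<String, String>, Void>() {
      private static final long serialVersionUID = 1L;

      @ProcessElement
      public void processElement(ProcessContext c) {
        System.err.println("Failed to read " + c.element().getValue()
            + ": " + c.element().getKey());
      }
    }));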

  /*
   * Convert File to KV Object
   */
  private static class MapTransformation extends DoFn<FileIO.ReadableFile,KV<String, String>> {
    static final long serialVersionUID = 1L;
    TupleTag<KV<String, String>> successTag;
    TupleTag<KV<String, String>> failedTag;

    public MapTransformation(TupleTag<KV<String, String>> successTag, TupleTag<KV<String, String>> failedTag) {
      this.successTag = successTag;
      this.failedTag = failedTag;
    }
    @ProcessElement
    public void processElement(ProcessContext c) {
      FileIO.ReadableFile f = c.element();
      String fileName = f.getMetadata().resourceId().toString();
      String fileData = null;
      try {
        fileData = f.readFullyAsUTF8String();
        c.output(successTag,KV.of(fileName, fileData));
      } catch (Exception e) {
        c.output(failedTag, KV.of(e.getMessage(), fileName));
      }
    }
  }

Pushing Data to Destination (Google Big Query)

At this step, we can clean each key-value pair and apply any kind of transformation the use case or requirement calls for. In this case, we push the records directly to Big Query.
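As an illustration of such a cleanup step (a sketch, not in the original pipeline; the trimming logic is only an example), a simple MapElements transform could normalize the values before they are converted to table rows:

    // Sketch only: an example cleanup step that trims whitespace from the file
    // contents before they are converted to table rows.
    PCollection<KV<String, String>> cleaned = map.apply(
        MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via((KV<String, String> kv) -> KV.of(kv.getKey(), kv.getValue().trim())));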

Interested in getting into Big Data? Check out our Spark Developer In Real World course for interesting use cases and real world projects just like what you are reading.

Before pushing the records to BQ, we first have to convert the key-value pairs into Big Query TableRow objects.

See the snippet below.

  /*
   * Convert KV to Table Row
   */
  private static class TableRowTransformation extends DoFn<KV<String, String>, TableRow> {
    static final long serialVersionUID = 1L;
    TupleTag<TableRow> successTag;
    TupleTag<TableRow> failedTag;

    public TableRowTransformation(TupleTag<TableRow> successTag, TupleTag<TableRow> failedTag) {
      this.successTag = successTag;
      this.failedTag = failedTag;
    }
    @ProcessElement
    public void processElement(ProcessContext c) {
      try {
        KV<String, String> kvObj = c.element();
        TableRow tableRow = new TableRow();
        tableRow.set(kvObj.getKey(), kvObj.getValue());

        c.output(successTag,tableRow);
      } catch (Exception e) {
        TableRow tableRow = new TableRow();
        c.output(failedTag, tableRow);
      }
    }
  }
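The push snippet further down references a rowObj collection of table rows. The post does not show how the TableRowTransformation above gets applied; a minimal sketch, assuming tuple tags named rowSuccessTag and rowFailedTag by analogy with the map step (and using Beam's TableRowJsonCoder), could look like this:

    // Sketch (assumption): apply TableRowTransformation with its own tuple tags,
    // mirroring how MapTransformation was applied earlier.
    final TupleTag<TableRow> rowSuccessTag = new TupleTag<TableRow>() {
      private static final long serialVersionUID = 1L;
    };
    final TupleTag<TableRow> rowFailedTag = new TupleTag<TableRow>() {
      private static final long serialVersionUID = 1L;
    };

    PCollectionTuple rowTupleObj = map.apply(ParDo.of(new TableRowTransformation(rowSuccessTag, rowFailedTag))
        .withOutputTags(rowSuccessTag, TupleTagList.of(rowFailedTag)));

    PCollection<TableRow> rowObj = rowTupleObj.get(rowSuccessTag).setCoder(TableRowJsonCoder.of());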

Once we have converted the objects into TableRow objects, we can push the records into the table using the built-in Big Query connector in the Apache Beam SDK.

As we can see, the BQ connector gives us a bunch of options. We have to pass the table name where the records will be saved.

    /*
     * Push records to BQ.
     */
    rowObj.apply(BigQueryIO.writeTableRows()
        // Table spec like "project-id:dataset.table"; getOutputTable() assumes a
        // custom options interface extending DataflowPipelineOptions.
        .to(options.getOutputTable())
        .ignoreUnknownValues()
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
       
    pipeline.run().waitUntilFinish();

Once the pipeline is deployed, we can see the monitoring details on the right hand side. As you can see in the images below, the configuration that we passed to the pipeline is visible there.

google-data-flow-job-pipeline

As Dataflow is a managed offering on Google Cloud Platform, we can also define the autoscaling algorithm for the pipeline.
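For example, the autoscaling behavior can be set on the options object before the pipeline is deployed. This is a sketch, assuming the worker pool settings that DataflowPipelineOptions inherits from DataflowPipelineWorkerPoolOptions:

    // Sketch: throughput-based autoscaling, capped by the max worker count.
    // AutoscalingAlgorithmType.NONE would pin the job to a fixed number of workers instead.
    options.setAutoscalingAlgorithm(
        DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
    options.setMaxNumWorkers(3);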

The monitoring section lets us know how many worker machines are currently in use, the CPU utilization of the pipeline, and so on.

google-cloud-platform-counters-options

