How to Create a Data Processing Pipeline Using Apache Spark with Dataproc on Google Cloud

A Short Tutorial

parvaneh shayegh
5 min read · Jan 6, 2021

Raw data are often dirty (hard for data scientists to use in their existing state) and need to be cleaned before use. One example is data scraped from the web, which often contains character-encoding artifacts or HTML tags.

In this tutorial, you will learn how to load data from Stack Overflow posts into a Spark cluster hosted on Dataproc, extract useful information, and store the processed data as gzip-compressed CSV files in Google Cloud Storage.

Let's Begin

  1. You need a Google Account.
  2. Create a project and enable billing in the Cloud Console so that you can use Google Cloud resources. New users of Google Cloud Platform (GCP) are eligible for a $300 free-trial credit.

From now on, you will need to run commands in a terminal. To open a terminal in GCP, activate Cloud Shell:

Activating Cloud Shell (terminal) in GCP

3. Enable the Compute Engine, Dataproc and BigQuery Storage APIs. You can do this either by running the following command in the terminal or by using the GCP web GUI (see how).

gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com

4. Specify which project you want to use by setting its project ID:

gcloud config set project <project_id>

For example (project IDs may contain only lowercase letters, digits and hyphens):

gcloud config set project project-spark1

5. Set the Dataproc region for your project by choosing one from the list here.

gcloud config set dataproc/region <region>

For example:

gcloud config set dataproc/region us-central1

6. Set some environment variables for later use. This tutorial needs two: the Dataproc cluster name and the bucket name.

CLUSTER_NAME=<cluster_name>
BUCKET_NAME=<bucket_name>

7. Create a Dataproc Cluster:

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-1 \
--num-workers 2 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway

Note: In the Google Cloud tutorial, worker-machine-type and num-workers are set to n1-standard-8 and 8, respectively. If you are practicing on the free-trial budget, change them to n1-standard-1 and 2. The meaning of each flag is explained here.

8. Create a Google Cloud Storage bucket for your job output:

gsutil mb gs://${BUCKET_NAME}

Bucket names must be globally unique across all users and all projects, so you may get a ServiceException if the name you picked is already taken. In that case, change the value of the BUCKET_NAME environment variable and try again.
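Only Cloud Storage itself can tell you whether a name is already taken, but basic formatting mistakes can be caught before calling gsutil. The sketch below is a simplified check, not the full rule set: it assumes names without dots, and the helper name looks_like_valid_bucket_name is hypothetical. The complete naming rules are in the Cloud Storage documentation.

```python
import re

# Simplified subset of the Cloud Storage bucket naming rules (assumption:
# names without dots). 3 to 63 characters; lowercase letters, digits,
# hyphens and underscores; must start and end with a letter or digit.
_BUCKET_PATTERN = re.compile(r"[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]")


def looks_like_valid_bucket_name(name: str) -> bool:
    """Return True if `name` passes the basic length and character checks."""
    # Names starting with "goog" or containing "google" are reserved.
    if name.startswith("goog") or "google" in name:
        return False
    return _BUCKET_PATTERN.fullmatch(name) is not None


if __name__ == "__main__":
    for candidate in ["spark-output-2021", "My_Bucket", "ab", "google-data"]:
        print(candidate, looks_like_valid_bucket_name(candidate))
```

A name that passes this check can still be rejected because someone else already owns it, so the retry advice above still applies.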

EDA (Exploratory Data Analysis) using BigQuery Web UI

Before processing the input data, you should learn more about the nature of the data you’re dealing with. To do this, open the BigQuery Web UI:

How to open the BigQuery Web UI

Next, choose the dataset that you want to use. This tutorial uses a dataset of Stack Overflow questions. You can use any other public dataset, or your own dataset (learn how).

Note: The Google Cloud tutorial uses a Reddit dataset.

Next, run the following query in the BigQuery Web UI Query Editor:

select * from <TABLE ID> limit <NUMBER OF ROWS>;

To see the first 10 rows of the Stack Overflow questions table:

select * from `bigquery-public-data.stackoverflow.posts_questions` limit 10;

After a few seconds, the output should be like this:

From now on, you will write PySpark code, so you need the code editor:

Using PySpark to get the number of Stack Overflow questions for each tag

You can use PySpark to determine the number of questions for each tag. In your editor, create a new Python file and name it “count_tags.py”. Then, use this code:
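A minimal sketch of what count_tags.py could contain, under stated assumptions: the tags column holds pipe-separated tag names (for example "c++|pointers"), and the spark-bigquery connector jar is supplied when the job is submitted. The function names count_tags_local and count_tags_spark are hypothetical. The plain-Python function shows the computation on a few made-up rows; the Spark function expresses the same steps with split, explode, groupBy and count.

```python
from collections import Counter

# Public BigQuery table used in this tutorial.
TABLE = "bigquery-public-data.stackoverflow.posts_questions"


def count_tags_local(tag_strings):
    """Plain-Python reference: count tags in strings like 'c++|pointers'."""
    counts = Counter()
    for tags in tag_strings:
        if tags:  # skip None and empty strings
            counts.update(tags.split("|"))
    return counts.most_common()  # (tag, count) pairs, largest count first


def count_tags_spark(spark, table=TABLE):
    """Return a Spark DataFrame of (tag, count), largest count first.

    Assumes the spark-bigquery connector is on the classpath and that the
    `tags` column is a pipe-separated string.
    """
    # Imported lazily so the local check above runs without PySpark installed.
    from pyspark.sql import functions as F

    questions = spark.read.format("bigquery").option("table", table).load()
    return (
        questions
        # One output row per tag; rows with null tags are dropped by explode.
        .select(F.explode(F.split(F.col("tags"), r"\|")).alias("tag"))
        .where(F.col("tag") != "")
        .groupBy("tag")
        .count()
        .orderBy(F.col("count").desc())
    )


if __name__ == "__main__":
    # Local check on made-up rows; no Spark session is needed for this part.
    sample = ["c++|pointers", "python", "c++|templates", None]
    print(count_tags_local(sample))
```

To run the count on the cluster, the script body would instead create a session with SparkSession.builder.appName("count_tags").getOrCreate() and call count_tags_spark(spark).show(20).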

Save the file and go back to the terminal. Make sure that you are in the directory where you saved the file. Then run the .py file that you just created using:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar count_tags.py

The output should be like this:

which shows the number of questions per tag, sorted in descending order.

Load cleaned, extracted data to Google Storage bucket as a CSV file

Now you know the tags and the columns of your dataset, but you may be interested in only some of the columns, for example “title” and “body” for each Stack Overflow question with a specific tag. As the final step, you can store the columns you care about as a CSV file in your bucket. Go back to the editor, create a new Python file and name it “back_cleaner.py”. Add your PySpark code to this file:
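A minimal sketch of what back_cleaner.py could look like, under the same assumptions as before (pipe-separated tags column, spark-bigquery connector jar supplied at submit time). The function names has_tag, output_uri and run are hypothetical. Note that Spark writes a folder of part files rather than one file with an exact name, so this sketch produces gzipped CSV part files under a stackoverflow_question_posts folder; renaming the part file to a single .csv.gz object is a separate step that is not shown.

```python
import sys

# Public BigQuery table used in this tutorial.
TABLE = "bigquery-public-data.stackoverflow.posts_questions"
TAG = "c++"


def has_tag(tags, wanted=TAG):
    """Plain-Python reference for the row filter: exact tag match, not substring."""
    return bool(tags) and wanted in tags.split("|")


def output_uri(bucket_name):
    """Cloud Storage folder that Spark writes its gzipped CSV part files into."""
    return f"gs://{bucket_name}/stackoverflow_question_posts"


def run(bucket_name, table=TABLE, tag=TAG):
    """Write title and body of all questions carrying `tag` as gzipped CSV."""
    # Imported lazily so the helpers above can be checked without PySpark.
    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("back_cleaner").getOrCreate()
    questions = spark.read.format("bigquery").option("table", table).load()
    selected = (
        questions
        # Split the tag string and keep rows whose tag list contains `tag`.
        .where(F.array_contains(F.split(F.col("tags"), r"\|"), tag))
        .select("title", "body")
    )
    # coalesce(1) yields a single part file; convenient, but slow for large results.
    selected.coalesce(1).write.csv(
        output_uri(bucket_name), header=True, compression="gzip", mode="overwrite"
    )


if __name__ == "__main__" and len(sys.argv) > 1:
    # Dataproc passes everything after "--" to the script; argv[1] is the bucket name.
    run(sys.argv[1])
```

The filter splits the tag string before matching, so a question tagged only "objective-c++" is not picked up by a search for "c++".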

In this code, you get the title and the body of all questions with tag “c++” and you save them into your storage bucket as “stackoverflow_question_posts.csv.gz”.

Now, it’s time to run this code. Go back to the terminal and run this command:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar back_cleaner.py -- ${BUCKET_NAME}

After a few minutes, you will see messages saying that the job has completed. It’s time to download your CSV data. Navigate to Storage in GCP:

Find the bucket that you created for this tutorial and click it:

You should be able to see the generated CSV file:

You can download it or use it for other purposes on Google Cloud Platform :)

Note: To avoid incurring unnecessary charges to your GCP account after you finish, follow the cleanup process.

Thanks for reading this article! Leave a comment below if you have any questions.
