Build a Streaming Data Pipeline in GCP

Using Twitter API, Cloud Pub/Sub, Dataflow and BigQuery

Amarachi Ogu
11 min read · Nov 21, 2022

“From the dawn of civilization to 2003, five exabytes of data were created. The same amount was created in the last two days.” -Former Google CEO, Eric Schmidt.

Every organization on the planet is going through some form of digital transformation. During the opening keynote for Google Cloud Next ’22, the CEO of Google Cloud, Thomas Kurian said “Data is at the heart of digital transformation. Data is being generated at far greater rates by everything, from infrastructure to consumer apps.”

Large volumes of data are produced by the increasing number of services and devices. Businesses are working to analyze and extract actionable insights from their data as quickly as possible to meet specific business needs.

Data continuously produced by several sources is referred to as streaming data. Examples are data generated by IoT devices, financial transactions, social media, etc.

Social media is an essential tool for business marketing strategy, whether the business is a small local shop or a large national company. Data is foundational to social media and is what connects people to their interests. Business managers therefore need to harness this data for business intelligence.

For example, a small business owner may need to know how many times per minute people tweet about their brand, and the source of those tweets.

This project implements an approach to building a streaming pipeline in Google Cloud Platform (GCP). The pipeline gets data from Twitter using the Twitter API, ingests the streaming data into Google Cloud Pub/Sub, transforms and aggregates the data using Dataflow, and then saves it to BigQuery.

By the end of this post, we will have done the following:

  1. Communicate with the Twitter API.
  2. Send tweets to Cloud Pub/Sub.
  3. Stream the data from Cloud Pub/Sub into BigQuery using Dataflow.

Prerequisites for the tasks are:

  1. Twitter Developer account — Visit the documentation to learn about Twitter API and how to get access.
  2. Google Cloud Platform account — Reference this blog for a walk-through on how you can create one.

To follow along, you can find the code for this project on GitHub.

Architecture Diagram

The image below is an illustrative architecture diagram for this project. It shows the tools used to get, transform and store the data.

Setting Up

To get started, we need to complete the following tasks:

  1. Obtain Twitter API credentials.
  2. Enable the required GCP APIs.
  3. Create a GCP Service Account and grant roles.
  4. Create a Cloud Pub/Sub Topic and Subscription.
  5. Create a Cloud Storage Bucket.
  6. Create a BigQuery dataset.

Obtain Twitter API credentials

To access the Twitter API, if you have not already done so, sign up for a developer account at developer.twitter.com. The application may take a couple of days to get approved. Keep an eye on your email because they may reach out to you for additional information.

Then sign in to your developer account, create an app, and save the following keys:

— Bearer Key
— API Key
— API Key Secret
— Access Token
— Access Token Secret

For this project, we will use only the Bearer Key (called the bearer token in the code below).

Enable the required GCP APIs

To access some GCP resources, the correct Application Programming Interfaces (APIs) need to be enabled. Google Cloud’s APIs give users access to particular services. APIs for common tasks are enabled by default. However, many APIs are disabled and need to be explicitly enabled for your project before you can start using them.

Read about enabling an API in your Google Cloud project.

For this project, we need to enable the following APIs:

— Dataflow: for data processing.
— Compute Engine: will be used by pipeline workers.
— Cloud Logging: allows pipeline workers to output log messages.
— Cloud Storage: to stage temporary job files created during the execution of the pipeline.
— BigQuery: where data will be stored.
— Cloud Pub/Sub: to send event messages.
— Cloud Resource Manager: creates, reads, and updates metadata for Google Cloud Platform resource containers.
— Google Cloud Storage JSON: for accessing and manipulating Cloud Storage projects programmatically.

Run the following command in Cloud Shell:

gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub cloudresourcemanager.googleapis.com

Create a GCP Service Account

A service account is needed to authenticate and to gain authorized access to the required GCP resources.

Before we proceed, make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

To create a service account, go to the Create Service Account page.

Select a Cloud project and give your service account a name and a description.

Grant the service account the following roles:

— Pub/Sub Editor
— BigQuery Data Editor
— Dataflow Admin
— Storage Object Admin
— Service Account User

Next, download the JSON key file:

— Click on the service account you just created.
— Click on KEYS.
— Click on ADD KEY and select Create new key.
— Select the JSON key type and click Create.

The JSON key will be saved to your computer. Keep this file safe, because anyone who holds it can act as the service account.

Create a Pub/Sub Topic and Subscription

This is the Pub/Sub topic where the tweets will be published.

How to create a Pub/Sub Topic and Subscription

For this project, the topic is named twitter-pipeline and the option to Add a default subscription is retained.

If you click on Subscriptions, you’ll see that a default subscription has been created for you, in this case twitter-pipeline-sub.
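If you prefer to script this step rather than use the console, the following is a minimal sketch using the google-cloud-pubsub client library. The function name create_topic_and_subscription and the project ID my-project are assumptions made for illustration; the topic and subscription names are the ones used in this project.

```python
def topic_path(project_id: str, topic_id: str) -> str:
    """Fully qualified topic name, in the form Pub/Sub expects."""
    return f"projects/{project_id}/topics/{topic_id}"


def subscription_path(project_id: str, subscription_id: str) -> str:
    """Fully qualified subscription name."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


def create_topic_and_subscription(project_id, topic_id, subscription_id):
    """Create the topic, then a pull subscription attached to it."""
    # Imported here so the path helpers above work without the library.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()
    topic = topic_path(project_id, topic_id)
    subscription = subscription_path(project_id, subscription_id)

    publisher.create_topic(request={"name": topic})
    subscriber.create_subscription(request={"name": subscription, "topic": topic})
    return topic, subscription


# Example call (needs credentials and a real project ID):
# create_topic_and_subscription("my-project", "twitter-pipeline", "twitter-pipeline-sub")
```

The subscription path produced here has the same form as the value passed to --input_subscription later in this post.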

Create a Cloud Storage Bucket

The storage bucket will be used by Dataflow to stage temporary job files created during the execution of the pipeline.

How to create a Cloud Storage Bucket

Bucket names must be globally unique. For this project, the bucket is named pipe-line-demo-bucket, and for the location type, Multi-region is selected >> us (multiple regions in United States).

Create a BigQuery Dataset

Datasets are top-level containers in BigQuery that are used to organize and control access to tables and views. Because a table or view must belong to a dataset, you need to create at least one dataset before loading data into BigQuery.

How to create a dataset

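For this project, the dataset is named twitter_data (BigQuery dataset names may contain only letters, numbers, and underscores, so a hyphenated name such as twitter-data is rejected), and the US location is selected >> us (multiple regions in United States).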
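The dataset can also be created from Python. The sketch below assumes the google-cloud-bigquery library is available (it is installed as part of apache-beam[gcp]); the helper names are hypothetical. Because dataset IDs accept only letters, digits, and underscores, the name is validated before any call is made.

```python
import re

# Dataset IDs: letters, digits and underscores only, at most 1,024 characters.
_DATASET_ID = re.compile(r"[A-Za-z0-9_]{1,1024}")


def is_valid_dataset_id(dataset_id: str) -> bool:
    """True if the whole string is a legal BigQuery dataset ID."""
    return _DATASET_ID.fullmatch(dataset_id) is not None


def create_dataset(project_id: str, dataset_id: str, location: str = "US"):
    """Create the dataset in the given location if it does not exist yet."""
    if not is_valid_dataset_id(dataset_id):
        raise ValueError(f"invalid BigQuery dataset ID: {dataset_id!r}")
    # Imported here so the validator works without the library installed.
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id)
    dataset = bigquery.Dataset(f"{project_id}.{dataset_id}")
    dataset.location = location
    return client.create_dataset(dataset, exists_ok=True)
```

With this check, a name containing a hyphen fails immediately with a ValueError instead of failing later at the BigQuery API.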

Stream tweets from Twitter

We have completed the setup. Now, let’s get the tweets from Twitter.

First, install the required Python libraries:

pip install --upgrade pip
pip install google-cloud-pubsub
pip install tweepy
pip install 'apache-beam[gcp]'

Then, using tweepy’s StreamingClient class and its filter method, the tweets are streamed in real time based on a specific set of filter (stream) rules, in this case the business brand name. Visit the documentation to learn more about the filtered stream.

Also, the argparse module is used to pass the keyword(s) of interest in the command line.

import argparse

import tweepy


# This function gets the arguments that will be parsed in the command line
def parse_args():
    # Initialize argparse
    parser = argparse.ArgumentParser()

    # Get the bearer token
    parser.add_argument(
        '--bearer_token',
        help='Bearer Token obtained from your Twitter Developer account. It authenticates requests on behalf of your developer App.',
        required=True
    )
    # Get the stream rule
    parser.add_argument(
        '--stream_rule',
        help='the keyword of interest you intend to filter for',
        required=True
    )
    return parser.parse_args()


args = parse_args()
bearer_token = args.bearer_token
stream_rule = args.stream_rule


class MyStream(tweepy.StreamingClient):
    def __init__(self, bearer_token):
        super().__init__(bearer_token)

    # This function gets called after successfully connecting to the streaming API
    def on_connect(self):
        print("Connected")

    # This function gets called when a response is received
    def on_response(self, response):
        # response.data is a tweepy Tweet; its .data attribute is the raw dict
        tweet_data = response.data.data

        print(tweet_data)


# Creating Stream object
stream_obj = MyStream(bearer_token=bearer_token)


def stream_tweets():
    tweet_fields = ['text', 'created_at', 'lang', 'source']

    # remove existing rules
    rules = stream_obj.get_rules().data
    if rules is not None:
        existing_rules = [rule.id for rule in rules]
        stream_obj.delete_rules(ids=existing_rules)

    # add stream rules and filter
    stream_obj.add_rules(tweepy.StreamRule(stream_rule))
    stream_obj.filter(tweet_fields=tweet_fields)


if __name__ == "__main__":
    stream_tweets()

The script for this code is named stream_tweet.py and can be found in this GitHub repo.

To run the script, use the following command:

python3 stream_tweet.py \
--bearer_token "YOUR-BEARER-TOKEN" \
--stream_rule "STREAM-RULE"
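The STREAM-RULE value does not have to be a single keyword. As far as I understand the filtered stream rule syntax, keywords can be combined with OR, multi-word phrases can be quoted, and operators such as lang: and -is:retweet can narrow the match. The helper below is a hypothetical sketch (build_stream_rule is not part of the project code) that assembles such a rule string; check the filtered stream documentation for the operators available to your access level.

```python
def build_stream_rule(keywords, lang=None, include_retweets=True):
    """Combine keywords into one filtered-stream rule string."""
    if not keywords:
        raise ValueError("at least one keyword is required")
    # Quote multi-word phrases so they are matched as exact phrases.
    terms = [f'"{k}"' if " " in k else k for k in keywords]
    rule = terms[0] if len(terms) == 1 else "(" + " OR ".join(terms) + ")"
    if lang:
        rule += f" lang:{lang}"
    if not include_retweets:
        rule += " -is:retweet"
    return rule


# "acme" is a made-up brand name used only for illustration.
rule = build_stream_rule(["acme", "acme corp"], lang="en", include_retweets=False)
print(rule)
```

The resulting string can be passed as the --stream_rule argument.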

While the script is running, you can see tweets that correspond to the stream_rule keyword being printed in the terminal.

Press CTRL+C to stop.

Send tweets to Cloud Pub/Sub

So far, we have been able to stream the tweets from Twitter. In this task, we will publish the tweets to the Cloud Pub/Sub topic we created earlier.

First, add the path to the Service account JSON key file you downloaded earlier to your shell environment variables list by using the export command.

export GOOGLE_APPLICATION_CREDENTIALS="path/to/sa-file.json"

If you still get an error after the export, you can use the os module in your code to set the path to the credentials explicitly:

import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "path/to/sa-file.json"
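Authentication errors are often caused by a wrong path or a wrong file. The following is a small, hypothetical pre-flight check (check_credentials is not part of the project code) that confirms the variable is set and points to a service account key before any Google Cloud client is created.

```python
import json
import os


def check_credentials(environ=os.environ):
    """Return the key file path if it looks like a service account key."""
    path = environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        raise RuntimeError("GOOGLE_APPLICATION_CREDENTIALS is not set")
    if not os.path.isfile(path):
        raise RuntimeError(f"key file not found: {path}")
    with open(path, encoding="utf-8") as f:
        key = json.load(f)
    # Service account key files carry "type": "service_account".
    if key.get("type") != "service_account":
        raise RuntimeError("file is not a service account key")
    return path
```

Calling check_credentials() at the top of the script gives a clear error message instead of a failure deep inside the client library.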

Next, define a function that will encode the data and publish it to the Cloud Pub/Sub topic.

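import json

from google.cloud import pubsub_v1

# create publisher (project_id and topic_id come from the command-line arguments)
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)


def write_to_pubsub(data):
    # Pub/Sub message payloads must be bytes
    encode_data = json.dumps(data).encode("utf-8")

    pubsub_message = publisher.publish(topic_path, encode_data)
    # result() blocks until the publish completes and returns the message ID
    print(pubsub_message.result())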
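To see what actually travels through Pub/Sub, here is a small round-trip sketch using only the standard library. The sample tweet is invented for illustration; encode_tweet mirrors the encoding used in write_to_pubsub, and decode_tweet is the inverse step that any consumer of the topic has to apply.

```python
import json


def encode_tweet(tweet: dict) -> bytes:
    """Serialize a tweet dict the same way write_to_pubsub does."""
    return json.dumps(tweet).encode("utf-8")


def decode_tweet(payload: bytes) -> dict:
    """Inverse step, as a subscriber or pipeline would apply it."""
    return json.loads(payload.decode("utf-8"))


# Hypothetical tweet containing the fields requested via tweet_fields.
sample = {
    "text": "Loving my new Acme kettle",
    "created_at": "2022-11-21T10:15:30.000Z",
    "lang": "en",
    "source": "Twitter for Android",
}
payload = encode_tweet(sample)
print(type(payload).__name__, decode_tweet(payload) == sample)
```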

Also, edit the StreamingClient class to call the write_to_pubsub function when a response is received.

class MyStream(tweepy.StreamingClient):
    def __init__(self, bearer_token):
        super().__init__(bearer_token)

    # This function gets called after successfully connecting to the streaming API
    def on_connect(self):
        print("Connected")

    # This function gets called when a response is received
    def on_response(self, response):
        tweet_data = response.data.data

        # write the tweets to pubsub
        write_to_pubsub(tweet_data)

The script for the complete code is named stream_to_pubsub.py and can be found in this GitHub repo.

To run the script use the following command:

python3 stream_to_pubsub.py \
--bearer_token "YOUR-BEARER-TOKEN" \
--stream_rule "STREAM-RULE" \
--project_id "PROJECT-ID" \
--topic_id "PUB/SUB-TOPIC-ID"
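The command above passes two flags that the first script did not have. A sketch of how parse_args might be extended to accept them is shown below; this is my assumption of the shape, and the complete script in the repo may differ. Accepting an optional argv list makes the function easy to test.

```python
import argparse


def parse_args(argv=None):
    """Parse the four flags used by the publish-to-Pub/Sub command."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--bearer_token", required=True,
                        help="Bearer Token from your Twitter Developer account")
    parser.add_argument("--stream_rule", required=True,
                        help="keyword of interest to filter for")
    parser.add_argument("--project_id", required=True,
                        help="ID of your Google Cloud project")
    parser.add_argument("--topic_id", required=True,
                        help="ID of the Pub/Sub topic to publish to")
    # argv=None makes argparse read sys.argv, as in the original script.
    return parser.parse_args(argv)
```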

While the script is running, go to GCP to check if the messages are arriving. Follow the steps below:

  • Navigate to Pub/Sub
  • Click on Subscriptions
  • Select the subscription that was created when you created the Pub/Sub topic, in this case twitter-pipeline-sub
  • Click on messages and then click PULL

Back in your terminal, press CTRL+C to stop the script.

Use Dataflow to process the data and write to BigQuery

We have reached the stage where tweets have been delivered successfully to Pub/Sub. In this step, we will read the data from the Pub/Sub subscription and write all of it, untransformed, to BigQuery.

In the next step, the data will be transformed to extract only the needed information, which is the tweet source and the tweet count before writing to BigQuery.

The BigQuery schema needs to be carefully defined to avoid errors. You may want to look at BigQuery Data Types Documentation to learn more.

Also, each message has to be decoded from bytes and parsed from JSON before it can be written.

# this function runs the streaming pipeline
def run():
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "Read from Pub/Sub subscription" >> beam.io.ReadFromPubSub(subscription=input_subscription)
            | "Decode and parse Json" >> beam.Map(lambda element: json.loads(element.decode("utf-8")))
            | "Write to BigQuery" >> beam.io.WriteToBigQuery(
                output_table_name,
                dataset=dataset_name,
                project=known_args.project_id,
                schema=raw_tweets_schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
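The pipeline above refers to raw_tweets_schema, which is defined in the complete script. As an illustration only, the sketch below shows one way such a schema could look: one nullable column per requested tweet field plus the tweet id. The column types and the to_row helper are my assumptions, not the definitions from the repo. WriteToBigQuery accepts a schema written as a comma-separated "name:TYPE" string.

```python
# Hypothetical schema: one nullable column per requested tweet field, plus id.
RAW_TWEETS_FIELDS = {
    "id": "STRING",
    "text": "STRING",
    "created_at": "TIMESTAMP",
    "lang": "STRING",
    "source": "STRING",
}

# Comma-separated "name:TYPE" pairs, e.g. "id:STRING,text:STRING,...".
raw_tweets_schema = ",".join(f"{n}:{t}" for n, t in RAW_TWEETS_FIELDS.items())


def to_row(tweet: dict) -> dict:
    """Keep only keys that have a column; missing ones become None."""
    return {name: tweet.get(name) for name in RAW_TWEETS_FIELDS}


print(raw_tweets_schema)
```

A step such as beam.Map(to_row) placed before the write would drop any payload keys that have no matching column, which is one way to avoid rows being rejected for unknown fields.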

The script for the complete code is named export_to_bq.py and can be found in this GitHub repo.

Before using the Dataflow runner, you may want to use the Python direct runner for Apache Beam to write to BigQuery, with the command:

python3 export_to_bq.py \
--project_id "PROJECT-ID" \
--input_subscription "projects/PROJECT-ID/subscriptions/PUBSUB-SUBSCRIPTION-ID"

To run the code with Dataflow runner, use the following command:

python3 export_to_bq.py \
--project_id "PROJECT-ID" \
--input_subscription "projects/PROJECT-ID/subscriptions/PUBSUB-SUBSCRIPTION-ID" \
--output_table_name "OUTPUT-TABLE-NAME" \
--dataset_name "DATASET-NAME" \
--runner DataflowRunner \
--temp_location "gs://BUCKET-NAME/temp" \
--region us-central1
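The command mixes flags that belong to the script (such as --project_id) with flags that belong to the runner (such as --runner and --region). A common way to separate them, sketched below, is argparse's parse_known_args, with the leftover flags handed to Beam's PipelineOptions. The function names and the default values are assumptions for illustration; raw_tweets is the table name used in this post, and my_dataset is a placeholder.

```python
import argparse


def split_args(argv=None):
    """Return (known_args, pipeline_args): script flags and runner flags."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--project_id", required=True)
    parser.add_argument("--input_subscription", required=True)
    # Hypothetical defaults, so that the shorter command also works.
    parser.add_argument("--output_table_name", default="raw_tweets")
    parser.add_argument("--dataset_name", default="my_dataset")
    return parser.parse_known_args(argv)


def build_pipeline_options(known_args, pipeline_args):
    """Turn the leftover flags into options for a streaming Beam pipeline."""
    # Imported here so split_args works without apache-beam installed.
    from apache_beam.options.pipeline_options import PipelineOptions

    return PipelineOptions(
        pipeline_args,
        streaming=True,
        save_main_session=True,
        project=known_args.project_id,
    )
```

Everything argparse does not recognise ends up in pipeline_args, so runner flags can be added to the command without touching the script.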

While the script is running, navigate to the Dataflow page in GCP. A new Dataflow job should have been created.

Wait for a couple of minutes, then check the output table in BigQuery to see the result. You should see a BigQuery table created. In this case, it is named raw_tweets. Click on the Preview to see the data.

You may run the scripts a little longer to see how the tables are continuously updated with the new data.

Transform the data and write to BigQuery

In this step, the data will be transformed to extract only the needed information. Remember that the objective here is to know how many times in a minute people tweeted about a brand and the source of the tweet.

Here, the tweets are assigned to fixed 60-second windows and grouped by source, and the tweets in each group are counted to get the tweet count per source for every window.

# this function runs the pipeline
def run(
    input_subscription: str,
    output_table: str,
    window_interval_sec: int,
):
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=input_subscription)
            | "Decode and parse Json" >> beam.Map(lambda message: json.loads(message.decode("utf-8")))
            | "Events fixed Window" >> beam.WindowInto(beam.window.FixedWindows(window_interval_sec), allowed_lateness=5)
            | "Add source keys" >> beam.WithKeys(lambda msg: msg["source"])
            | "Group by source" >> beam.GroupByKey()
            | "Get tweet count" >> beam.MapTuple(
                lambda source, messages: {
                    "source": source,
                    "tweet_count": len(messages),
                }
            )
            | "Add processing time" >> beam.Map(add_processing_time)
            | "Write to BigQuery" >> beam.io.WriteToBigQuery(
                output_table,
                schema=SCHEMA,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
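To make the windowing step concrete, the sketch below reproduces the same grouping in plain Python, without Beam. It is only an illustration of what the window, key, group, and count steps compute: each event is assigned to the fixed window containing its timestamp, and tweets are counted per window and source. The sample events and function names are invented.

```python
from collections import Counter


def window_start(timestamp: float, size_sec: int) -> int:
    """Start of the fixed window that contains the timestamp."""
    return int(timestamp // size_sec) * size_sec


def count_by_window_and_source(events, size_sec=60):
    """events: iterable of (unix_timestamp, tweet_dict) pairs."""
    counts = Counter()
    for timestamp, tweet in events:
        counts[(window_start(timestamp, size_sec), tweet["source"])] += 1
    return [
        {"window_start": start, "source": source, "tweet_count": n}
        for (start, source), n in sorted(counts.items())
    ]


# Invented events: three tweets in the first minute, one in the second.
events = [
    (0.0, {"source": "Twitter for Android"}),
    (12.5, {"source": "Twitter Web App"}),
    (59.9, {"source": "Twitter for Android"}),
    (60.0, {"source": "Twitter for Android"}),
]
result = count_by_window_and_source(events)
for row in result:
    print(row)
```

In the streaming pipeline the input never ends, so Beam emits the counts for a window once that window is considered complete instead of waiting for all the data.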

The script for the complete code is named agg_and_write2bq.py and can be found in this GitHub repo.
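The snippet above uses add_processing_time and SCHEMA, which are defined in the complete script. As a rough idea of what they could look like, here is a hypothetical version; the column name processing_time and the timestamp format are my assumptions, not necessarily what the repo uses.

```python
from datetime import datetime, timezone

# Hypothetical schema matching the dict built by the counting step,
# plus one extra column for the processing timestamp.
SCHEMA = "source:STRING,tweet_count:INTEGER,processing_time:TIMESTAMP"


def add_processing_time(row: dict, now=None) -> dict:
    """Return a copy of the row with a UTC processing timestamp added."""
    now = now or datetime.now(timezone.utc)
    return {**row, "processing_time": now.strftime("%Y-%m-%d %H:%M:%S")}
```

The optional now parameter exists only to make the function deterministic in tests; beam.Map(add_processing_time) calls it with the row alone.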

Before using the Dataflow runner, you may want to use the Python direct runner for Apache Beam to write to BigQuery, with the command:

python3 agg_and_write2bq.py \
--project_id "PROJECT-ID" \
--input_subscription "projects/PROJECT-ID/subscriptions/PUBSUB-SUBSCRIPTION" \
--output_table "PROJECT:DATASET.TABLE or DATASET.TABLE"

To run the code with Dataflow runner, use the following command:

python3 agg_and_write2bq.py \
--project_id "PROJECT-ID" \
--input_subscription "projects/PROJECT-ID/subscriptions/PUBSUB-SUBSCRIPTION" \
--output_table "PROJECT:DATASET.TABLE or DATASET.TABLE" \
--runner DataflowRunner \
--temp_location "gs://BUCKET-NAME/temp" \
--region us-central1

After a couple of minutes, head over to BigQuery. You will see that the table has been created and the aggregated data appended to it. In this case, it is named agg_tweets. Click on the Preview to see the data.

Conclusion

There you have it: we have successfully built a data streaming pipeline in GCP. I hope this provides you with some useful insight to expand upon.

To build a pipeline for production, instead of running it from your local computer, you may consider using a Compute Engine virtual machine, packaging the pipeline in a Docker container and deploying it with a Dataflow Flex Template, or deploying it to Google Kubernetes Engine.

In my next blog, I will write about deploying this streaming pipeline using Dataflow Flex template.

Thanks for reading. You are welcome to follow me on LinkedIn or Twitter.

Next Step

Deploy pipeline using Dataflow Flex template

Resources

Create a Dataflow pipeline using Python

Apache Beam Programming Guide

Aggregation in beam

Advanced group and aggregation in Apache Beam

Advanced patterns for windows & triggers with Apache Beam

Streaming Twitter data with Google Cloud Pub/Sub and Apache Beam

Troubleshooting and debugging Apache Beam and GCP Dataflow
