Streaming data from Twitter to GCP

Syntio · Published in SYNTIO · Feb 17, 2021 · 7 min read

In this blog, data engineer Mia Tadic demonstrates how to ingest data from a remote API (Twitter’s API) into the cloud (Google Cloud).


Google Cloud Platform (GCP) is a suite of public cloud computing services offered by Google. The platform includes a range of hosted services for compute, storage and application development that run on Google hardware. Services used in this blog are Google Cloud Pub/Sub and Google BigQuery.

  • Google Cloud Pub/Sub is a managed, scalable, and real-time messaging ingestion service that allows messages to be exchanged between applications.
  • Google BigQuery is a scalable, managed enterprise data warehouse for analytics. It is a big data service for data processing and analytics, running SQL-like queries against multi-terabyte data sets.

Task idea

This blog demonstrates the task of ingesting data from a remote API (Twitter’s API) into the cloud (Google Cloud).

It was done using a Producer and a Consumer.

  • The Producer sent tweets containing the hashtag “dataengineering” from Twitter to a Pub/Sub topic. The Producer was dockerized and then run on a Google Cloud virtual machine.
  • The Consumer streamed messages from the Pub/Sub topic to a BigQuery table. The Consumer was created using Google Cloud Functions, and it was automatically triggered by any new message.

The described process is shown in the following diagram:

Procedure

Here are the prerequisites and steps of our procedure.

Prerequisites for the task are:

  1. Twitter Developer account
  2. Google Cloud Platform account
  3. Installed Docker

The main steps of the procedure are:

  1. Obtaining Twitter credentials
  2. Creating a Producer
  3. Creating a Consumer
  4. Dockerizing a Producer

Let’s dig into the steps of the procedure.

Obtain Twitter credentials

The app’s client key, client secret, access token, and token secret can be found in the Twitter Developer dashboard.

Those are needed in a later step.

Create a Producer

First, the following resources were created:

1. Service account

The service account authorized access to the needed GCP resources (Pub/Sub and BigQuery).

In the GCP console, service accounts can be found under the IAM & Admin section. After clicking the CREATE SERVICE ACCOUNT button, several steps were required:

  • Step 1: Filling in the desired details
  • Step 2: Adding the “Pub/Sub Editor” and “BigQuery Data Editor” roles
  • Step 3: Downloading the JSON key file by choosing the CREATE KEY option

Additionally, a user environment variable had to be added:

name=GOOGLE_APPLICATION_CREDENTIALS

value=path_to_downloaded_json_file
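
As a quick sanity check (a small sketch, not part of the original post), the variable can be verified from Python before any client is created:

import os

# The Google client libraries read the path to the service account key from this variable.
key_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
if not key_path or not os.path.isfile(key_path):
    raise RuntimeError("GOOGLE_APPLICATION_CREDENTIALS is not set or does not point to the JSON key file")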

2. Pub/Sub Topic

The Pub/Sub topic was the destination to which the Producer sent tweets.
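
The topic itself was created in the GCP console; as an alternative, a sketch like the following can create it programmatically (not from the original post; it assumes google-cloud-pubsub 2.x and placeholder names):

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("<project_id>", "<topic_name>")

# Create the topic the Producer will publish tweets to.
publisher.create_topic(request={"name": topic_path})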

3. Pub/Sub Subscription

The subscription pulled data from the desired Pub/Sub topic. This was a way to see what the data looked like. The subscription can be seen in the picture above.
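
To peek at the published data, messages can also be pulled from the subscription with a short script (a sketch, not from the original post; it assumes google-cloud-pubsub 2.x):

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("<project_id>", "<subscription_name>")

# Pull a few messages to inspect what the Producer published.
response = subscriber.pull(request={"subscription": subscription_path, "max_messages": 5})
for received in response.received_messages:
    print(received.message.data.decode("utf-8"))

# Acknowledge the pulled messages so they are not redelivered.
ack_ids = [received.ack_id for received in response.received_messages]
if ack_ids:
    subscriber.acknowledge(request={"subscription": subscription_path, "ack_ids": ack_ids})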

Finally, it was all set for the Producer to be created.

As mentioned, the Producer’s task was to send tweets containing the hashtag “dataengineering” from Twitter to the Pub/Sub topic.

Note: Keys and tokens obtained from Twitter were needed here.

First, the Pub/Sub package needed to be installed:

pip install --upgrade google-cloud-pubsub

To begin the Python script, the following packages were imported:

import json

from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
from google.cloud import pubsub_v1

Since the Producer had to send tweets to a Pub/Sub topic, a connection to the topic needed to be established first. It was done this way:

client = pubsub_v1.PublisherClient()
topic_path = client.topic_path(<project_id>, <topic_name>)

On the other side, since the Producer grabbed its data from Twitter, a connection to Twitter had to be made too. That connection was made this way:

auth = OAuthHandler(<twitter-client-key>, <twitter-secret-key>)
auth.set_access_token(<twitter-access-token>, <twitter-token-secret>)
twitterStream = Stream(auth, listener())
twitterStream.filter(track=["dataengineering"])

This code ran Tweepy’s built-in streaming methods. Additionally, an object named listener was introduced in the code for this task: an instance of a custom class that inherits Tweepy’s StreamListener class.

For demonstration purposes, only the following parts of the grabbed tweets were taken and saved to the Pub/Sub topic:

  • created_at
  • id
  • text
  • source
  • retweet_count
  • user_name
  • user_location
  • user_followers_count
  • user_statuses_count

The extracted parts were concatenated into tweet_message, which was then published to the Pub/Sub topic using the following command:

self.client.publish(self.topic_path, data=tweet_message.encode('utf-8'))
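
Putting the Producer pieces together, the listener class might look roughly like the sketch below. This is a reconstruction from the description above rather than the original code; it assumes Tweepy 3.x and, for illustration, serializes the extracted fields as JSON instead of plain concatenation:

import json

from tweepy.streaming import StreamListener
from google.cloud import pubsub_v1


class listener(StreamListener):
    """Receives raw tweets from Tweepy and publishes the selected fields to Pub/Sub."""

    def __init__(self):
        super().__init__()
        self.client = pubsub_v1.PublisherClient()
        self.topic_path = self.client.topic_path("<project_id>", "<topic_name>")

    def on_data(self, data):
        tweet = json.loads(data)
        user = tweet.get("user", {})
        # Keep only the fields listed above; everything else is dropped.
        tweet_message = json.dumps({
            "created_at": tweet.get("created_at"),
            "id": tweet.get("id"),
            "text": tweet.get("text"),
            "source": tweet.get("source"),
            "retweet_count": tweet.get("retweet_count"),
            "user_name": user.get("name"),
            "user_location": user.get("location"),
            "user_followers_count": user.get("followers_count"),
            "user_statuses_count": user.get("statuses_count"),
        })
        self.client.publish(self.topic_path, data=tweet_message.encode('utf-8'))
        return True

    def on_error(self, status_code):
        print("Stream error:", status_code)
        return True  # keep the stream running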

Create a Consumer

As stated before, the Consumer’s task was to stream messages from the Pub/Sub topic to a BigQuery table, so a BigQuery table had to be created first.

BigQuery tables are contained in datasets, so the dataset had to be created first. It was created in the GCP console by finding BigQuery under the Big Data section, choosing the project, and clicking CREATE DATASET. This opens a form, which can be seen in the picture below:

Then, by opening the created dataset and simply clicking the CREATE TABLE button, the table was created. Based on the extracted tweet parts, the table’s schema should look like this:
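
The dataset and table can also be created programmatically; the sketch below is not from the original post, and the column types are an assumption based on the field names:

from google.cloud import bigquery

client = bigquery.Client()

# Create the dataset that will hold the tweets table.
client.create_dataset("<dataset_id>")  # raises Conflict if it already exists

# Schema mirroring the fields extracted by the Producer.
schema = [
    bigquery.SchemaField("created_at", "STRING"),
    bigquery.SchemaField("id", "INTEGER"),
    bigquery.SchemaField("text", "STRING"),
    bigquery.SchemaField("source", "STRING"),
    bigquery.SchemaField("retweet_count", "INTEGER"),
    bigquery.SchemaField("user_name", "STRING"),
    bigquery.SchemaField("user_location", "STRING"),
    bigquery.SchemaField("user_followers_count", "INTEGER"),
    bigquery.SchemaField("user_statuses_count", "INTEGER"),
]

table = bigquery.Table("<project_id>.<dataset_id>.<table_id>", schema=schema)
client.create_table(table)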

Finally, the Consumer was ready to be created. As mentioned, the Consumer was created using Google Cloud Functions.

When creating the function, the missing values in the form needed to be filled in:

The file main.py contained the Consumer code, while requirements.txt contained the function dependencies (packages with their versions).
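
For reference, a minimal requirements.txt for this function might contain only the BigQuery client library (the exact versions used in the original post are not shown, so none are pinned here):

# requirements.txt for the Consumer (Cloud Function)
google-cloud-bigquery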

The Google Cloud Function was triggered by a message on the Cloud Pub/Sub topic. The event was passed to the hello_pubsub function as an argument, and the function then collected the data (the tweet) from the event:

pubsub_message = base64.b64decode(event['data']).decode('utf-8')

Since the Consumer’s task was to write messages to the BigQuery table, it had to connect to that table first. To establish the connection and insert the rows, the following code was used:

client = bigquery.Client()
dataset_ref = client.dataset(<dataset_id>)
table_ref = dataset_ref.table(<table_id>)
table = client.get_table(table_ref)
client.insert_rows(table, <tweet>)
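
Put together, a minimal main.py for the Consumer could look roughly like this (an illustrative sketch rather than the original code; it assumes the Producer publishes JSON whose keys match the table columns):

import base64
import json

from google.cloud import bigquery


def hello_pubsub(event, context):
    """Triggered by a message on the Pub/Sub topic; writes the tweet to BigQuery."""
    pubsub_message = base64.b64decode(event['data']).decode('utf-8')
    tweet = json.loads(pubsub_message)  # assumes the message body is JSON

    client = bigquery.Client()
    dataset_ref = client.dataset("<dataset_id>")
    table_ref = dataset_ref.table("<table_id>")
    table = client.get_table(table_ref)

    # insert_rows expects a sequence of rows, e.g. dicts keyed by column name.
    errors = client.insert_rows(table, [tweet])
    if errors:
        print("BigQuery insert errors:", errors)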

Dockerize

The next step of the procedure was to dockerize the Producer. The idea was to pack the Producer into a Docker image, send the image to the virtual machine, and then run it there.

For this task, Docker Desktop and Docker Hub were used; installation instructions and files for various operating systems can be found on the Docker website.

For Docker, two files (Dockerfile and requirements.txt) must be created and saved in the same Python project as the Producer code.
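
A minimal requirements.txt for the Producer might look like this (an assumption based on the imports above; the StreamListener-based code requires a Tweepy release older than 4.0):

# requirements.txt for the Producer
tweepy<4
google-cloud-pubsub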

Dockerfile should look like this:

# Use an official Python runtime as a parent image
FROM python:3.7-slim

# Set the working directory to /app
WORKDIR /app

# Copy the current directory contents into the container at /app
ADD . /app

# Install any needed packages specified in requirements.txt
RUN pip install --trusted-host pypi.python.org -r requirements.txt

# Define environment variable
ENV GOOGLE_APPLICATION_CREDENTIALS="<json-key-file>"

# Run producer.py when the container launches
CMD ["python", "producer.py"]

Note:

  • After checking which Python version is being used, it is fine to just add the matching “-slim” tag to the base image; no additional installation is needed.
  • The JSON key file must be copied into the same Python project directory (it becomes part of the image’s working directory, which is where the path in the GOOGLE_APPLICATION_CREDENTIALS environment variable is resolved).

As described at the beginning of the blog, the Docker image was deployed to a virtual machine, so the next preparation step was to create a GCP virtual machine.

To create a VM, open the GCP console, find Compute Engine under the Compute section, go to VM instances, and click CREATE INSTANCE. For this demonstration, the following details were sufficient:

At last, the steps for deploying the Docker image to the VM were:

  1. Build the Docker image

docker build -t producer .

  2. Save the Docker image to a tar archive

docker save -o producer.tar producer

Note: Make sure to run both steps from the Producer’s Python project directory.

  3. Upload the tar archive to the virtual machine

By clicking first on the SSH button in VM instances in the GCP console, and afterward on the Settings button, the Upload file option was chosen and the producer.tar file was uploaded.

  4. Load the Docker image

docker load -i producer.tar

  5. Run the Docker image

docker run -d producer

With this, the process of ingesting messages to the Pub/Sub topic was completed.

To check whether the created Producer and Consumer work properly, the BigQuery table can be inspected. The table from this demonstration looked like this:
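
Alternatively, the table can be queried directly from Python (a sketch with placeholder names, not from the original post):

from google.cloud import bigquery

client = bigquery.Client()
query = """
    SELECT created_at, user_name, text
    FROM `<project_id>.<dataset_id>.<table_id>`
    LIMIT 10
"""
# query() starts the job; result() waits for it to finish and returns the rows.
for row in client.query(query).result():
    print(row.created_at, row.user_name, row.text)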

Conclusion and ideas

This step-by-step tutorial presented how to stream data to the cloud from a remote API using a simple Producer-Consumer flow. Instead of GCP and Twitter, there are plenty of other cloud platforms and APIs to use. There are also other Google Cloud resources to play with, like Google Cloud Storage or Google Cloud SQL. Another tip is to send data from the Cloud Pub/Sub topic to BigQuery using Google Cloud Dataflow.

Note: Don’t forget to delete the resources when they are not needed anymore.

For more blogs on all kinds of tech subjects, be sure to visit our website.

Syntio is the Data Engineering company, offering knowledge and cloud-based solutions to complex data challenges worldwide.