Managing Complex Twitter Search Queries

Andrew Lange
Roivant Technology
Nov 2, 2021 · 5 min read

Our approach to managing complex Twitter search queries using Python and BigQuery.

A data scientist after successfully analyzing Twitter data without needing to spend hours munging.

The Problem

Twitter is currently the go-to source to find out what’s happening — products, people, politics — it’s all there. Originally unveiled as a social media platform, it has become the world’s data feed — a place where global events are logged in real time.

For a scientist at Roivant Discovery, these data offer a trove of potential for cutting-edge research and insight. To unlock this potential, scientists need a way to collect Twitter data on a recurring basis without spending an inordinate amount of time querying and munging.

As data engineers at Roivant Technology, we were tasked with figuring out:

  1. How will our users build filters?
  2. How do we collect and store the results of these filters?

Our Approach

Building Filters

There are two approaches to filtering Twitter data (via the standard v1.1 API):

  • Stream — Tweets arrive as soon as they’re published
  • Search — Tweets arrive as results to a query

Streaming is an appealing option due to its low latency. But streams are high maintenance — when one goes down, any missed records must be backfilled. Another pitfall: each Tweet flows through a stream only once, which makes it difficult to analyze a Tweet’s performance over time.

The real deal-breaker is that the streaming filter only supports basic stream parameters (e.g. follow, track, location), which are insufficient for the complex filters our users require.

We decided to pursue Search because it provides the ability to filter Tweets using search query operators. For example, we can isolate NYC subway delays specifically on the 4/5 line:

from:NYCTSubway (delay OR delays OR stopped) "4/5"

We considered allowing users to maintain a list of their own custom search queries. However, we realized that would become a nightmare 😅:

  • Prone to human error, leading to invalid queries
  • Tedious and time-consuming
  • Difficult to update incrementally, with no lineage

We decided to create a layer of abstraction over search_query called a report. Each report object contains a criteria array in which each item represents a clause in a search_query:
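
For example, the subway-delays query from earlier might be expressed as a report like this (a sketch; the exact field and operator names in our schema may differ):

# Illustrative report object; field and operator names are assumptions.
report = {
    "report_slug": "subway-delays",
    "criteria": [
        {"operator": "from", "value": "NYCTSubway"},
        {"operator": "any_of", "value": ["delay", "delays", "stopped"]},
        {"operator": "exact_phrase", "value": "4/5"},
    ],
}
# Rendering each criterion as a clause reproduces the search_query:
# from:NYCTSubway (delay OR delays OR stopped) "4/5"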

Collecting and Storing Results

To collect the results of a search query, we used tweepy instead of hitting the Twitter API directly. tweepy is well-supported and solves a number of challenges out of the box (see the sketch below):

  • Rate limiting
  • Pagination
  • Authentication
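
As a rough sketch of the collection step (assuming tweepy v4 and placeholder credentials), a Cursor walks the paginated search results while wait_on_rate_limit sleeps through rate-limit windows:

import tweepy

# Placeholder credentials; in practice these come from a secrets manager.
auth = tweepy.OAuthHandler("API_KEY", "API_SECRET")
auth.set_access_token("ACCESS_TOKEN", "ACCESS_SECRET")
api = tweepy.API(auth, wait_on_rate_limit=True)

query = 'from:NYCTSubway (delay OR delays OR stopped) "4/5"'

# Cursor transparently pages through the standard search endpoint.
for tweet in tweepy.Cursor(api.search_tweets, q=query, count=100,
                           tweet_mode="extended").items(500):
    print(tweet.id, tweet.full_text[:80])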

For storage, we elected to use BigQuery as we expect our tables to grow (>100GB). We created a table to persist all Tweets (to analyze engagement over time), and a view to provide the latest version of each:

# `status` table
id,inserted_at,created_at,status,likes,favorites
123,2021-01-15 02:30,2021-01-14 23:05,This is a Tweet!,15,25
123,2021-01-16 02:33,2021-01-14 23:05,This is a Tweet!,64,32
123,2021-01-17 02:38,2021-01-14 23:05,This is a Tweet!,93,45
# `status_latest` view
id,inserted_at,created_at,status,likes,favorites
123,2021-01-17 02:38,2021-01-14 23:05,This is a Tweet!,93,45
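
One way to define such a view in BigQuery (a sketch over the status table above) is to keep the most recently inserted row per Tweet id:

CREATE OR REPLACE VIEW status_latest AS
SELECT *
FROM status
-- Keep only the newest snapshot of each Tweet.
QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY inserted_at DESC) = 1;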

This approach yields multiple rows per Tweet. To keep storage costs down, we delete any extraneous rows older than 7 days.
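
A pruning statement along these lines captures the idea (a sketch, not our exact DML): any snapshot older than 7 days that has a newer snapshot of the same Tweet can go.

DELETE FROM status
WHERE inserted_at < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
  AND EXISTS (
    -- A newer snapshot of this Tweet exists, so this row is extraneous.
    SELECT 1
    FROM status AS newer
    WHERE newer.id = status.id
      AND newer.inserted_at > status.inserted_at
  );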

To automate all of this, we wrote a twitter DAG composed of the following tasks:
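
Roughly: collect Tweets for each report’s search query, load them into the status table, and prune stale snapshots. The collection and pruning steps are sketched above; the load step might look something like this (a plain-Python sketch using a hypothetical my_dataset dataset, not the dag-workflows task API):

import uuid
from google.cloud import bigquery

def load_statuses(rows, report_slug):
    """Append one report run's Tweets to the `status` table."""
    client = bigquery.Client()
    load_uuid = str(uuid.uuid4())
    for row in rows:
        row.update({"report_slug": report_slug, "load_uuid": load_uuid})
    # Streaming insert; per-row failures are returned rather than raised.
    errors = client.insert_rows_json("my_dataset.status", rows)
    if errors:
        raise RuntimeError(f"BigQuery insert failed: {errors}")
    # The real DAG also records the run in load_history (see below).
    return load_uuid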

Orchestration

We considered deploying our DAG to a Google Cloud Function. However, we decided to use dag-workflows so we could develop and run our code locally.

The dag-workflows Python library was built in-house to provide a means of writing and executing DAGs. Each DAG may be composed of Python and/or SQL tasks that run on demand or on a schedule.

At Roivant, each project has a dedicated dag-workflows scheduler deployed to a Kubernetes cluster on GKE. When it’s time for a DAG to run, a container is provisioned and the code is executed on it.

Twitter DAG written using dag-workflows.

The Solution

  1. To build a filter, a POST request is sent to the create_report/ endpoint:
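
For example (a sketch; only the create_report/ path comes from our API — the host and payload shape are illustrative):

import requests

# Hypothetical host; the payload carries the report's criteria.
CREATE_REPORT_URL = "https://api.example.com/create_report/"

payload = {
    "report_slug": "subway-delays",
    "criteria": [
        {"operator": "from", "value": "NYCTSubway"},
        {"operator": "any_of", "value": ["delay", "delays", "stopped"]},
        {"operator": "exact_phrase", "value": "4/5"},
    ],
}

response = requests.post(CREATE_REPORT_URL, json=payload)
response.raise_for_status()  # raises if the request failed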

A 200 response indicates the report was successfully created and can be retrieved:

SELECT * 
FROM report_latest
WHERE report_slug = 'subway-delays';

  2. To collect the results of a report, the twitter DAG must be run. A user can wait until the next scheduled run (i.e. nightly), or trigger a run manually:

kubectl exec -it -n <NAMESPACE> <POD-HASH> \
-- /secrets-init/bin/secrets-init \
--provider=google workflows dags.config trigger twitter

During the DAG run, a new load_history record is created, indicating that the subway-delays report successfully returned 10 Tweets:

SELECT * 
FROM load_history
WHERE report_slug = 'subway-delays';

  3. To access the results (i.e. these 10 Tweets), we query the status_latest view:

SELECT status_created_at,
       status_id,
       user_screen_name,
       report_slug,
       load_uuid,
       status_text
FROM status_latest
WHERE load_uuid = '04237096-131f-4c40-821d-ba3c6d0d4f9e';

Now a user can sit back and let the results pile up 🚀

Useful Queries

Our data model supports a number of useful queries. To demonstrate them, we’ll start by declaring a start_date and end_date:
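
For example, as BigQuery script variables (dates are illustrative):

-- Analysis window used by the queries below.
DECLARE start_date DATE DEFAULT DATE '2021-01-01';
DECLARE end_date   DATE DEFAULT DATE '2021-02-01';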

Reports by Engagement
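
A sketch (assuming status_created_at is a TIMESTAMP and borrowing the likes and favorites counts from the status table above):

-- Rank reports by total engagement within the analysis window.
SELECT report_slug,
       COUNT(*)       AS tweets,
       SUM(likes)     AS total_likes,
       SUM(favorites) AS total_favorites
FROM status_latest
WHERE DATE(status_created_at) BETWEEN start_date AND end_date
GROUP BY report_slug
ORDER BY SUM(likes) + SUM(favorites) DESC;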

Top Hashtags by Frequency
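
A sketch that pulls hashtags out of status_text with a regular expression (if Twitter’s entities were persisted as an ARRAY column, UNNEST-ing that column directly would be cleaner):

-- Count hashtag occurrences across the latest version of each Tweet.
SELECT LOWER(tag) AS hashtag,
       COUNT(*)   AS frequency
FROM status_latest,
     UNNEST(REGEXP_EXTRACT_ALL(status_text, r'#(\w+)')) AS tag
WHERE DATE(status_created_at) BETWEEN start_date AND end_date
GROUP BY hashtag
ORDER BY frequency DESC
LIMIT 10;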

Top User Mentions by Frequency
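
Likewise for mentions, under the same assumptions:

-- Count @mentions across the latest version of each Tweet.
SELECT LOWER(mention) AS user_mention,
       COUNT(*)       AS frequency
FROM status_latest,
     UNNEST(REGEXP_EXTRACT_ALL(status_text, r'@(\w+)')) AS mention
WHERE DATE(status_created_at) BETWEEN start_date AND end_date
GROUP BY user_mention
ORDER BY frequency DESC
LIMIT 10;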

Call to Action

Try collecting Tweets on your own using dag-workflows and BigQuery!

To help get you started, we’ve included a code sample demonstrating the core functions used in our twitter DAG.

Learn More

We appreciate you taking the time to read about our data engineering challenges! Below you’ll find useful resources previously mentioned:
