Managing Complex Twitter Search Queries
Our approach to managing complex Twitter search queries using Python and BigQuery.
The Problem
Twitter is currently the go-to source to find out what’s happening — products, people, politics — it’s all there. Originally launched as a social media platform, it has become the world’s data feed: a place where global events are logged in real time.
For a scientist at Roivant Discovery, these data offer a trove of potential cutting-edge research and insight. To unlock this potential, scientists need a way of collecting Twitter data on a recurring basis without spending an inordinate amount of time querying and munging.
As data engineers at Roivant Technology, we were tasked with figuring out:
- How will our users build filters?
- How do we collect and store the results of these filters?
Our Approach
Building Filters
There are two approaches to filtering Twitter data (via the standard v1 API):
- Stream — Tweets arrive as soon as they’re published
- Search — Tweets arrive as results to a query
Streaming is an appealing option due to its low latency. But streams are high maintenance — when one goes down, any missed records must be filled in. Another pitfall: each Tweet only flows through a stream once. This makes it difficult to analyze a Tweet’s performance over time.
The real deal-breaker is that the streaming filter is only capable of using basic stream parameters (e.g. follow, track, location) which are insufficient for the complex filters required by our users.
We decided to pursue Search because it provides the ability to filter Tweets using search query operators. For example, we can isolate NYC subway delays specifically on the 4/5 line:
from:NYCTSubway (delay OR delays OR stopped) "4/5"
We considered allowing users to maintain a list of their own custom search queries. However, we realized that would become a nightmare 😅:
- Prone to human error, leads to invalid queries
- Tedious, waste of time
- Difficult to make incremental updates over time, no lineage
We decided to create a layer of abstraction over search_query called a report. Each report object contains a criteria array in which each item represents a clause in a search_query:
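As a rough illustration (the field and operator names here are hypothetical, not our actual schema), the subway-delays example could be expressed as:
# Hypothetical report object; each criteria entry maps to one clause
# of the compiled search_query.
report = {
    "slug": "subway-delays",
    "criteria": [
        {"operator": "from", "value": "NYCTSubway"},
        {"operator": "any_of", "value": ["delay", "delays", "stopped"]},
        {"operator": "phrase", "value": "4/5"},
    ],
}
# Compiled together, the clauses reproduce the query shown above:
# from:NYCTSubway (delay OR delays OR stopped) "4/5"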
Collecting and Storing Results
To collect the results of a search query, we used tweepy instead of hitting the Twitter API directly. This is because tweepy is well-supported and solves a number of challenges out of the box (a short sketch follows this list):
- Rate limiting
- Pagination
- Authentication
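A minimal sketch of what that looks like, assuming tweepy v4 (the credentials and query below are placeholders):
import tweepy

# Placeholder credentials; tweepy handles the OAuth handshake.
auth = tweepy.OAuthHandler("API_KEY", "API_SECRET")
auth.set_access_token("ACCESS_TOKEN", "ACCESS_TOKEN_SECRET")
api = tweepy.API(auth, wait_on_rate_limit=True)  # sleep through rate limits instead of erroring

query = 'from:NYCTSubway (delay OR delays OR stopped) "4/5"'

# Cursor transparently walks the paginated search results.
for status in tweepy.Cursor(api.search_tweets, q=query, count=100).items(500):
    print(status.id, status.created_at, status.text)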
For storage, we elected to use BigQuery as we expect our tables to grow (>100GB). We created a table to persist all Tweets (to analyze engagement over time), and a view to provide the latest version of each:
# `status` table
id,inserted_at,created_at,status,likes,favorites
123,2021-01-15 02:30,2021-01-14 23:05,This is a Tweet!,15,25
123,2021-01-16 02:33,2021-01-14 23:05,This is a Tweet!,64,32
123,2021-01-17 02:38,2021-01-14 23:05,This is a Tweet!,93,45
# `status_latest` view
id,inserted_at,created_at,status,likes,favorites
123,2021-01-17 02:38,2021-01-14 23:05,This is a Tweet!,93,45
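One way such a view can be defined (the project and dataset names below are placeholders, and our production definition may differ) is to rank each Tweet's snapshots by inserted_at and keep only the newest:
from google.cloud import bigquery

client = bigquery.Client()
client.query("""
    CREATE OR REPLACE VIEW `my-project.twitter.status_latest` AS
    SELECT * EXCEPT (row_num)
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY id ORDER BY inserted_at DESC) AS row_num
        FROM `my-project.twitter.status`
    )
    WHERE row_num = 1
""").result()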
This approach yields duplicate records. To keep storage costs down, we delete any extraneous records older than 7 days.
To this end, we wrote a twitter DAG comprised of the following tasks:
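Roughly, those tasks look like this (the function, project, and table names are illustrative rather than our production definitions):
import datetime
import tweepy
from google.cloud import bigquery

STATUS_TABLE = "my-project.twitter.status"  # placeholder table ID

def collect_statuses(api: tweepy.API, search_query: str):
    """Task 1: run a report's compiled search_query and shape each Tweet into a row
    (engagement counts and the remaining columns are omitted for brevity)."""
    for status in tweepy.Cursor(api.search_tweets, q=search_query).items():
        yield {
            "id": status.id,
            "inserted_at": datetime.datetime.utcnow().isoformat(),
            "created_at": status.created_at.isoformat(),
            "status": status.text,
        }

def load_statuses(client: bigquery.Client, rows):
    """Task 2: append the collected rows to the status table (load_history bookkeeping omitted)."""
    errors = client.insert_rows_json(STATUS_TABLE, list(rows))
    if errors:
        raise RuntimeError(f"BigQuery insert failed: {errors}")

def prune_history(client: bigquery.Client):
    """Task 3: the clean-up step described above, deleting superseded snapshot rows
    once they are more than 7 days old (DELETE statement omitted here)."""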
Orchestration
We considered deploying our DAG to a Google Cloud Function. However, we decided to use dag-workflows so we can develop and run our code locally.
The dag-workflows Python library was built in-house to provide a means of writing and executing DAGs. Each DAG may be comprised of Python and/or SQL tasks that run on-demand or on a schedule.
At Roivant, each project has a dedicated dag-workflows scheduler deployed to a Kubernetes cluster on GKE. When it’s time for a DAG to run, a container is provisioned and the code is executed on it.
The Solution
1. To build a filter, a POST request is sent to the create_report/ endpoint:
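A sketch of that request (the host and payload fields below are placeholders rather than our actual API contract):
import requests

payload = {
    "slug": "subway-delays",
    "criteria": [
        {"operator": "from", "value": "NYCTSubway"},
        {"operator": "any_of", "value": ["delay", "delays", "stopped"]},
        {"operator": "phrase", "value": "4/5"},
    ],
}

# Hypothetical host; the endpoint responds with a 200 when the report is created.
response = requests.post("https://reports.example.com/create_report/", json=payload)
response.raise_for_status()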
A 200 response indicates the report was successfully created and can be retrieved:
SELECT *
FROM report_latest
WHERE report_slug = 'subway-delays';
2. To collect the results of a report, the twitter DAG must be run. A user can wait until the next scheduled run (i.e. nightly), or trigger a run manually:
kubectl exec -it -n <NAMESPACE> <POD-HASH> \
-- /secrets-init/bin/secrets-init \
--provider=google workflows dags.config trigger twitter
During the DAG run, a new load_history object is created indicating the subway-delays report successfully returned 10 Tweets:
SELECT *
FROM load_history
WHERE report_slug = 'subway-delays';
3. To access the results (i.e. these 10 Tweets), we query the status_latest view:
SELECT status_created_at,
status_id,
user_screen_name,
report_slug,
load_uuid,
status_text
FROM status_latest
WHERE load_uuid = '04237096-131f-4c40-821d-ba3c6d0d4f9e';
Now a user can sit back and let the results pile up 🚀
Useful Queries
Our data model supports a number of useful queries. To demonstrate them, we’ll start by declaring a start_date and end_date:
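Here’s a sketch of that setup using BigQuery query parameters from Python (the dates and dataset names are placeholders); the run_query helper is reused in the query sketches below:
import datetime
from google.cloud import bigquery

client = bigquery.Client()
date_params = [
    bigquery.ScalarQueryParameter("start_date", "DATE", datetime.date(2021, 1, 1)),
    bigquery.ScalarQueryParameter("end_date", "DATE", datetime.date(2021, 1, 31)),
]

def run_query(sql: str):
    # Bind @start_date / @end_date and return the result rows.
    job_config = bigquery.QueryJobConfig(query_parameters=date_params)
    return client.query(sql, job_config=job_config).result()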
Reports by Engagement
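A sketch of this query using the run_query helper above (the engagement columns follow the status table example and may differ from the production schema):
reports_by_engagement = run_query("""
    SELECT report_slug,
           SUM(likes + favorites) AS engagement
    FROM `my-project.twitter.status_latest`
    WHERE DATE(status_created_at) BETWEEN @start_date AND @end_date
    GROUP BY report_slug
    ORDER BY engagement DESC
""")
for row in reports_by_engagement:
    print(row.report_slug, row.engagement)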
Top Hashtags by Frequency
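Again a sketch rather than the exact production query; here we pull hashtags straight out of status_text with a regular expression:
top_hashtags = run_query(r"""
    SELECT LOWER(tag) AS hashtag,
           COUNT(*) AS frequency
    FROM `my-project.twitter.status_latest`,
         UNNEST(REGEXP_EXTRACT_ALL(status_text, r'#\w+')) AS tag
    WHERE DATE(status_created_at) BETWEEN @start_date AND @end_date
    GROUP BY hashtag
    ORDER BY frequency DESC
    LIMIT 10
""")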
Top User Mentions by Frequency
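The mentions query follows the same pattern, swapping the hashtag pattern for @handles:
top_mentions = run_query(r"""
    SELECT LOWER(handle) AS mention,
           COUNT(*) AS frequency
    FROM `my-project.twitter.status_latest`,
         UNNEST(REGEXP_EXTRACT_ALL(status_text, r'@\w+')) AS handle
    WHERE DATE(status_created_at) BETWEEN @start_date AND @end_date
    GROUP BY mention
    ORDER BY frequency DESC
    LIMIT 10
""")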
Call to Action
Try collecting Tweets on your own using dag-workflows and BigQuery!
To help get you started, we’ve included a code sample demonstrating the core functions used in our twitter DAG.
- Code sample: https://gitlab.com/roivant/oss/twitter-search-query
- Sample DAG: https://gitlab.com/roivant/oss/workflows/-/tree/main/example_project/dags/trader_bot
Learn More
We appreciate you taking the time to read about our data engineering challenges! Below you’ll find useful resources previously mentioned:
- Read: Building dag-workflows: a Python Workflow Execution Tool
- Documentation: dag-workflows
- Documentation: tweepy
- Read: Twitter Search API
- Read: Twitter Search Operators
- Read: What is BigQuery?
- Read: Streaming Data into BigQuery
- Read: Removing Duplicates in BigQuery
- Read: Introducing the Roivant Technology Blog
- Work with us! View our open positions.