Lightweight batch streaming of Meetup RSVPs to Snowflake with GCP Pub/Sub

Let’s bring a real-time streaming source, all the RSVPs from Meetup.com, into Snowflake with the help of Google Cloud Pub/Sub.

Felipe Hoffa
Dec 15, 2021

Use case

This is a real use case: all worldwide RSVPs on meetup.com, published in real time. The data is collected from Meetup through their official API.

With this we can observe the top cities in North America by # of RSVPs since October 2021:

Top cities by # of RSVPs in North America since October 2021 (by author)

And the top topics by RSVPs for each city:

Top topics for Meetup events in different cities since October 2021 (by author)

Quick summary of solution

  • Python script in an e2-micro VM captures messages 24x7, publishes to Pub/Sub. Free tier, or $6.00/month (see the VM-creation sketch after this list).
  • Pub/Sub moves ~4GB/month. Free tier, or well under $1.00/month.
  • Python script launches every 10 minutes, reads from Pub/Sub, sends files to GCS. Costs included within the above e2-micro.
  • Snowflake Snowpipe detects new GCS files, ingests them into Snowflake. If running Snowflake on AWS, also consider GCP egress costs. Less than 2 credits per month.
  • Snowflake then makes it easy to parse the incoming JSON objects and analyze our data.
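
A rough sketch of creating that VM with gcloud; the instance name and zone here are assumptions on my part, not details from the original setup:

gcloud compute instances create meetup-collector \
    --project=myproject \
    --machine-type=e2-micro \
    --zone=us-central1-a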

Quick survey of alternatives

  • Use Kafka: It would be more complex and expensive than Pub/Sub.
  • Use Pub/Sub Lite: It requires provisioning, which should be cheaper for streams with more data — but not for this case.
  • Use Dataflow: Recommended by Google, but requires running 24/7 in streaming mode — gets expensive.
  • Use external functions: Calling Pub/Sub directly from Snowflake through a proxy to unload messages sounds interesting, but I haven’t explored it.
  • Skip Pub/Sub, have the Python script produce GCS files: High risk of losing messages while composing files. Pub/Sub instead serves as durable temporary storage.
  • Use a different cloud toolset: Azure and AWS have good alternatives to this pipeline. I chose GCP just because I’m more familiar with it — which might be the case for people interested in this particular solution.

The code

Python — Capture and publish to Pub/Sub:

import requests
import time
from google.cloud import pubsub_v1

project_id = "myproject"
topic_id = "meetup202011"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
futures = dict()

# Ask the Meetup stream for RSVPs modified in the last 10 minutes
# (since_mtime is expressed in epoch milliseconds).
minutes_ago = 1000 * (int(time.time()) - 600)
url = "https://stream.meetup.com/2/rsvps?since_mtime=" + str(minutes_ago)
print(url)

# Publish each RSVP line as an individual Pub/Sub message.
r = requests.get(url, stream=True)
for line in r.iter_lines(decode_unicode=True):
    if line:  # skip keep-alive blank lines
        # print(line)
        publisher.publish(topic_path, line.encode("utf-8"))

Python — Drain Pub/Sub for 20 seconds, send files:

import gzip
import os
import time
from google.cloud import pubsub_v1
from google.cloud import storage

project_id = "myproject"
subscription_id = "meetup20201201"
gcs_bucket = 'my-bucket'

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

max_messages = 20000
max_time = 20
start_time = int(time.time())
filename = 'file%i.gz' % start_time

with subscriber:
    ack_ids = []
    # Write each pulled message as one line in a local gzipped file.
    with gzip.open(filename, 'wb') as f:
        while (
            len(ack_ids) < max_messages
            and time.time() < start_time + max_time
        ):
            response = subscriber.pull(
                request={
                    "subscription": subscription_path,
                    "max_messages": max_messages - len(ack_ids)}
            )
            print("<received %i messages" % len(response.received_messages))
            if not response.received_messages:
                break
            for received_message in response.received_messages:
                f.write(received_message.message.data)
                f.write('\n'.encode())
                ack_ids.append(received_message.ack_id)
            print("received %i messages>" % len(response.received_messages))
    if ack_ids:
        # Upload the closed file to GCS before acknowledging anything.
        gcs = storage.Client()
        bucket = gcs.get_bucket(gcs_bucket)
        blob = bucket.blob('logged/%s' % filename)
        blob.upload_from_filename(filename=filename)
        # Acknowledge the received messages so they will not be sent again.
        ack_arrays = [ack_ids[i:i + 1000] for i in range(0, len(ack_ids), 1000)]
        for x in ack_arrays:
            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": x}
            )
            print("ack %i" % len(x))
    os.remove(filename)
    print(
        f"Received and acknowledged {len(ack_ids)} messages from {subscription_path}."
    )

Pub/Sub — Create the topic and subscriptions for the above code to work
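
A minimal sketch of this step with the gcloud CLI, reusing the project, topic, and subscription names from the scripts above:

# Topic the capture script publishes to.
gcloud pubsub topics create meetup202011 --project=myproject

# Subscription the drain script pulls from.
gcloud pubsub subscriptions create meetup20201201 \
    --topic=meetup202011 --project=myproject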

VM shell — Keep the capture script running 24x7, restarting it if the stream connection drops:

while true; do python3 read.py; sleep 10; done

Wake up every 10 minutes to drain the messages stored in Pub/Sub into GCS files:

while true; do python3 write.py; sleep 600; done
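
These loops stop when the SSH session ends; one way to keep them running in the background (an assumption on my part, not necessarily how the original VM was configured) is nohup:

nohup bash -c 'while true; do python3 read.py; sleep 10; done' > read.log 2>&1 &
nohup bash -c 'while true; do python3 write.py; sleep 600; done' > write.log 2>&1 &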

Snowflake — Set up the integration with GCS

create or replace storage integration temp_fhoffa_gcs_meetup
type = external_stage
storage_provider = gcs
enabled = true
storage_allowed_locations = ('gcs://my-bucket/folder/');


desc storage integration temp_fhoffa_gcs_meetup;
create or replace stage temp_fhoffa_gcs_meetup
storage_integration = temp_fhoffa_gcs_meetup
url = 'gcs://my-bucket/folder/'
file_format = (type=json);
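
The desc storage integration output includes a STORAGE_GCP_SERVICE_ACCOUNT, and that service account needs read access to the bucket before the stage can list or load files. A sketch with gsutil; the service account email below is a placeholder to swap for the real value from the DESC output:

# Replace the email with the STORAGE_GCP_SERVICE_ACCOUNT shown by "desc storage integration".
gsutil iam ch \
    'serviceAccount:REPLACE_ME@gcp-sf-prod-1.iam.gserviceaccount.com:roles/storage.objectViewer' \
    gs://my-bucket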

Play with the files to make sure they exist and can be queried:

list @temp_fhoffa_gcs_meetup;

select parse_json($1), METADATA$FILENAME, METADATA$FILE_ROW_NUMBER
from '@temp_fhoffa_gcs_meetup' (pattern=>'.*file1674367459.gz')
;

select count(*), array_agg(distinct metadata$filename)
from '@temp_fhoffa_gcs_meetup';

Set up an integration with GCS and Pub/Sub for new file notifications:

create notification integration temp_meetup202011_pubsub_int
type = queue
notification_provider = gcp_pubsub
enabled = true
gcp_pubsub_subscription_name = 'projects/fhoffa/subscriptions/meetup20201125_snowpipe_gcs';

desc notification integration temp_meetup202011_pubsub_int;
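
On the GCP side, the bucket has to publish new-object notifications into a Pub/Sub topic, and the subscription named above has to exist on that topic. A rough sketch; the topic name is an assumption (I reuse the subscription name), and the service account email is a placeholder for the GCP_PUBSUB_SERVICE_ACCOUNT reported by the DESC above:

# Topic plus bucket notification: new objects publish OBJECT_FINALIZE events to it.
gcloud pubsub topics create meetup20201125_snowpipe_gcs --project=fhoffa
gsutil notification create -t meetup20201125_snowpipe_gcs -f json \
    -e OBJECT_FINALIZE gs://my-bucket

# Subscription referenced by the notification integration.
gcloud pubsub subscriptions create meetup20201125_snowpipe_gcs \
    --topic=meetup20201125_snowpipe_gcs --project=fhoffa

# Let Snowflake's service account pull from the subscription.
gcloud pubsub subscriptions add-iam-policy-binding meetup20201125_snowpipe_gcs \
    --project=fhoffa \
    --member='serviceAccount:REPLACE_ME@gcp-sf-prod-1.iam.gserviceaccount.com' \
    --role='roles/pubsub.subscriber'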

Create a table to ingest the arriving semi-structured data:

create table meetup202011_rsvps (x variant);

Tell Snowpipe to load into this table every time a new file arrives to GCS:

create pipe temp.public.meetup202011_pipe
auto_ingest = true
integration = temp_meetup202011_pubsub_int
as
copy into temp.public.meetup202011_rsvps
from @temp_fhoffa_gcs_meetup;

Writing queries in Snowflake

Top cities in North America by # of RSVPs

select x:group.group_city 
|| ', '
|| coalesce(x:group.group_state, x:group.group_country) city
, count(*) rsvps
from meetup202011_rsvps
where x:group.group_country in ('us', 'ca', 'mx')
and x:mtime > date_part(epoch_millisecond, '2021-10-01'::timestamp)
group by 1
order by rsvps desc
limit 500;

# of RSVPs for cities in North America (by author)
Top cities by # of RSVPs in North America since October 2021 (by author)

Top topics per city

For example, the top 3 topics for Meetup events in Austin, Boston, Denver, Miami, New York, and San Francisco since October 1st, 2021. My personal highlights for each city are:

  • Austin: New in town
  • Boston: Social
  • Denver: Adventure
  • Miami: Fun times
  • New York: Social network
  • San Francisco: Software development
Top topics for Meetup events in different cities since October 2021 (by author)

select x:group.group_city::string city
, topic.value:urlkey::string topic
, count(*) rsvps
from meetup202011_rsvps, table(flatten(input=>x:group.group_topics)) topic
where x:mtime > date_part(epoch_millisecond, '2021-10-01'::timestamp)
and x:group.group_country = 'us'
and x:group.group_city in ('Austin', 'San Francisco', 'New York', 'Boston', 'Denver', 'Miami')
group by 1, 2
qualify row_number() over(partition by city order by rsvps desc) <= 3
order by city, rsvps desc;

Metrics observed while running this pipeline in practice

During a regular week we can see peaks of 2 messages per second:

(by author)

The average message size fluctuates around 1.4KB:

(by author)

We can see how the messages accumulate within 10-minute windows, and then drain according to the script schedule:

(by author)

Due to the behavior of Pub/Sub, some messages don’t drain as expected, staying in Pub/Sub for 20 minutes (or sometimes much more):

(by author)

Highlights and notes

GCP

  • The e2-micro instance is included within the free tier.
  • GCP makes it easy to fire up alarms in case the stream stops.
(by author)

Snowflake

  • Snowpipe ingests each new file into Snowflake as soon as it arrives.
  • The VARIANT type makes it easy to query semi-structured data, like the data contained in these JSON messages.

Next steps

Want more?

I’m Felipe Hoffa, Data Cloud Advocate for Snowflake. Thanks for joining me on this adventure. You can follow me on Twitter and LinkedIn. Check reddit.com/r/snowflake for the most interesting Snowflake news.
