Published in Snowflake

Lightweight batch streaming Meetup RSVPs to Snowflake with GCP Pub/Sub

Let’s take a real-time streaming source — all the RSVPs from Meetup.com — into Snowflake, with the help of Google Cloud Pub/Sub

Image generated by AI (by author)
Watch on YouTube

Use case

We want to move a stream of data with ~4k messages per hour to Snowflake, using Google Pub/Sub as temporary storage.

This is a real use case: every RSVP made on meetup.com worldwide, published in real time. The data was collected through Meetup's official API.

With this we can observe the top cities in North America by # of RSVPs since October 2021:

Top cities by # of RSVPs in North America since October 2021 (by author)

And the top topics by RSVPs for each city:

Top topics for Meetup events in different cities since October 2021 (by author)

Quick summary of solution

Most of this fits into the permanent GCP free tier:

  • Python script in an e2-micro VM captures messages 24/7 and publishes them to Pub/Sub. Free tier, or $6.00/month.
  • Pub/Sub moves ~4GB/month. Free tier, or well under $1.00/month.
  • Python script launches every 10 minutes, reads from Pub/Sub, and sends files to GCS. It runs on the same e2-micro, so no extra cost.
  • Snowflake Snowpipe detects each new GCS file and ingests it into Snowflake. If your Snowflake account runs on AWS, consider GCP egress costs. Less than 2 credits per month.
  • Snowflake then makes it easy to parse the incoming JSON objects and analyze our data.
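As a back-of-the-envelope check of the ~4GB/month figure, here is the arithmetic in Python. It assumes the ~4k messages per hour from the use case, the ~1.4KB average message size reported later in this post, a 30-day month, and decimal units; it also counts the files produced by running write.py every 10 minutes.

```python
# Back-of-the-envelope sizing for the pipeline.
# Assumptions: ~4,000 messages/hour, ~1.4 KB average message size,
# a 30-day month, decimal units (1 GB = 1,000,000 KB).
MESSAGES_PER_HOUR = 4_000
AVG_MESSAGE_KB = 1.4
HOURS_PER_MONTH = 24 * 30
WRITE_INTERVAL_MINUTES = 10


def monthly_messages(per_hour: int = MESSAGES_PER_HOUR) -> int:
    """Messages published per month at a steady hourly rate."""
    return per_hour * HOURS_PER_MONTH


def monthly_gb(per_hour: int = MESSAGES_PER_HOUR, avg_kb: float = AVG_MESSAGE_KB) -> float:
    """Approximate data volume per month, in GB."""
    return monthly_messages(per_hour) * avg_kb / 1_000_000


def monthly_files(interval_minutes: int = WRITE_INTERVAL_MINUTES) -> int:
    """Upper bound on files per month: one per write.py run that has data to write."""
    return HOURS_PER_MONTH * 60 // interval_minutes


if __name__ == "__main__":
    print(f"{monthly_messages():,} messages/month")  # 2,880,000 messages/month
    print(f"{monthly_gb():.2f} GB/month")  # 4.03 GB/month
    print(f"{monthly_files():,} files/month")  # 4,320 files/month
```

The file count is an upper bound: the shell loop sleeps 10 minutes after each run finishes, so slightly fewer runs happen per month.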

Quick survey of alternatives

Some paths considered, but not followed:

  • Use Kafka: It would be more complex and expensive than Pub/Sub.
  • Use Pub/Sub Lite: It requires provisioning capacity up front, which can be cheaper for higher-volume streams, but not for this one.
  • Use Dataflow: Recommended by Google, but a streaming job has to run 24/7, which gets expensive.
  • Use external functions: Calling Pub/Sub directly from Snowflake through a proxy to unload messages sounds like an interesting idea, but I haven't explored it.
  • Skip Pub/Sub and have the Python script produce GCS files directly: High risk of losing messages while composing files. Pub/Sub works well as durable temporary storage instead.
  • Use a different cloud toolset: Azure and AWS have good alternatives to this pipeline. I chose GCP just because I'm more familiar with it, which might also be true for readers interested in this particular solution.

The code

Python — Capture and publish to pub/sub:

read.py: Opens the Meetup RSVP stream (starting 10 minutes in the past) and reads from it forever, sending each line to Pub/Sub.

import time

import requests
from google.cloud import pubsub_v1

project_id = "myproject"
topic_id = "meetup202011"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

# Start 10 minutes in the past: since_mtime is an epoch timestamp in milliseconds.
since_mtime = 1000 * (int(time.time()) - 600)
url = "https://stream.meetup.com/2/rsvps?since_mtime=" + str(since_mtime)
print(url)

r = requests.get(url, stream=True)
r.raise_for_status()
try:
    # Each non-empty line is one RSVP as a JSON object; publish the raw bytes.
    for line in r.iter_lines():
        if line:
            publisher.publish(topic_path, line)
finally:
    # Send any messages still batched in the client before the script exits.
    publisher.stop()
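The since_mtime value in read.py is easy to get wrong by a factor of 1000, since the stream expects milliseconds while time.time() returns seconds. A minimal sketch that pulls the URL construction into pure functions so it can be checked without opening the stream (the function names are hypothetical; the endpoint and the 10-minute lookback are the ones used above):

```python
import time
from typing import Optional
from urllib.parse import urlencode

STREAM_URL = "https://stream.meetup.com/2/rsvps"


def since_mtime_ms(now_s: Optional[float] = None, lookback_s: int = 600) -> int:
    """Epoch milliseconds for `lookback_s` seconds before `now_s` (default: now)."""
    if now_s is None:
        now_s = time.time()
    return 1000 * (int(now_s) - lookback_s)


def stream_url(now_s: Optional[float] = None, lookback_s: int = 600) -> str:
    """Same URL shape as read.py: ?since_mtime=<epoch millis>."""
    return STREAM_URL + "?" + urlencode({"since_mtime": since_mtime_ms(now_s, lookback_s)})


if __name__ == "__main__":
    print(stream_url(now_s=1_674_367_459))
    # https://stream.meetup.com/2/rsvps?since_mtime=1674366859000
```

Because every restart rewinds 10 minutes, RSVPs from that window can be published a second time after a restart, so counts in Snowflake may include a few duplicates.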

Python — Drain pub/sub for 20 seconds, send files:

write.py: Pulls the messages stored in Pub/Sub and writes them into a gzipped file to send to GCS. A single pull can return fewer messages than requested, so the script asks Pub/Sub repeatedly for more within a 20-second window. Only after the file has been written to GCS does it acknowledge the messages to Pub/Sub, marking them as safe to drop.

import gzip
import os
import time

from google.cloud import pubsub_v1
from google.cloud import storage

project_id = "myproject"
subscription_id = "meetup20201201"
gcs_bucket = "my-bucket"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

max_messages = 20000  # most messages to put in one file
max_time = 20  # seconds to keep pulling
start_time = int(time.time())
filename = "file%i.gz" % start_time

with subscriber:
    ack_ids = []
    # One message per line (newline-delimited JSON), gzip-compressed.
    with gzip.open(filename, "wb") as f:
        while len(ack_ids) < max_messages and time.time() < start_time + max_time:
            # A single pull can return fewer messages than requested, so keep asking.
            response = subscriber.pull(
                request={
                    "subscription": subscription_path,
                    "max_messages": max_messages - len(ack_ids),
                }
            )
            print("<received %i messages" % len(response.received_messages))
            if not response.received_messages:
                break
            for received_message in response.received_messages:
                f.write(received_message.message.data)
                f.write(b"\n")
                ack_ids.append(received_message.ack_id)
            print("received %i messages>" % len(response.received_messages))

    if ack_ids:
        # Upload before acknowledging, so messages stay in Pub/Sub if the upload fails.
        gcs = storage.Client()
        bucket = gcs.get_bucket(gcs_bucket)
        blob = bucket.blob("logged/%s" % filename)
        blob.upload_from_filename(filename=filename)
        # Acknowledge the received messages so they will not be sent again,
        # at most 1000 ack IDs per request.
        for i in range(0, len(ack_ids), 1000):
            chunk = ack_ids[i:i + 1000]
            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": chunk}
            )
            print("ack %i" % len(chunk))

# Remove the local file (it exists even when no messages arrived).
os.remove(filename)
print(f"Received and acknowledged {len(ack_ids)} messages from {subscription_path}.")
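The drain loop in write.py can be exercised without GCP by swapping the Pub/Sub client for a stand-in. In this sketch, which is hypothetical rather than part of the original pipeline, `pull` is any callable that takes a maximum count and returns a list of (ack_id, data) pairs, standing in for `subscriber.pull`, and the gzip output is kept in memory. It keeps the same three stop conditions (message cap, time window, empty response) and the 1000-ID ack chunking.

```python
import gzip
import io
import time
from typing import Callable, List, Tuple

Message = Tuple[str, bytes]  # (ack_id, data): a stand-in for a ReceivedMessage


def drain(
    pull: Callable[[int], List[Message]],
    max_messages: int = 20_000,
    max_time: float = 20.0,
    clock: Callable[[], float] = time.monotonic,
) -> Tuple[bytes, List[str]]:
    """Pull until the cap, the time window, or an empty response; return (gzip bytes, ack_ids)."""
    start = clock()
    ack_ids: List[str] = []
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb") as f:
        while len(ack_ids) < max_messages and clock() < start + max_time:
            batch = pull(max_messages - len(ack_ids))
            if not batch:
                break  # like write.py: stop at the first empty response
            for ack_id, data in batch:
                f.write(data + b"\n")  # newline-delimited JSON
                ack_ids.append(ack_id)
    return buf.getvalue(), ack_ids


def chunks(items: List[str], size: int = 1000) -> List[List[str]]:
    """Split ack IDs into groups of at most `size`, as write.py does."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def fake_pull_factory(total: int, per_pull: int) -> Callable[[int], List[Message]]:
    """Hypothetical backlog of `total` messages served at most `per_pull` at a time."""
    backlog = [(f"ack-{i}", b'{"n": %d}' % i) for i in range(total)]

    def pull(max_count: int) -> List[Message]:
        n = min(per_pull, max_count, len(backlog))
        batch = backlog[:n]
        del backlog[:n]
        return batch

    return pull


if __name__ == "__main__":
    payload, ack_ids = drain(fake_pull_factory(2500, 300))
    lines = gzip.decompress(payload).splitlines()
    print(len(lines), len(ack_ids), [len(c) for c in chunks(ack_ids)])
    # 2500 2500 [1000, 1000, 500]
```

Injecting `pull` and `clock` is what makes the stop conditions testable; in write.py itself those roles are played by `subscriber.pull` and `time.time`.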

Pub/Sub — Create the topic and subscriptions for the above code to work

Follow the Pub/Sub docs to create the topic and subscription that topic_path and subscription_path refer to above.
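For reference, topic_path and subscription_path build fully qualified names of the form `projects/<project>/topics/<topic>` and `projects/<project>/subscriptions/<subscription>`. This sketch reproduces them with plain string formatting (helper names are hypothetical) and adds an optional creation helper using the google-cloud-pubsub client's `create_topic` and `create_subscription` calls. The 60-second ack deadline is an assumption: write.py can hold pulled messages for 20 seconds or more before acknowledging them, which is longer than the 10-second default.

```python
def topic_name(project_id: str, topic_id: str) -> str:
    """Same format as PublisherClient.topic_path(project_id, topic_id)."""
    return f"projects/{project_id}/topics/{topic_id}"


def subscription_name(project_id: str, subscription_id: str) -> str:
    """Same format as SubscriberClient.subscription_path(project_id, subscription_id)."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


def create_resources(
    project_id: str, topic_id: str, subscription_id: str, ack_deadline_seconds: int = 60
) -> None:
    """Create the topic and a pull subscription (needs google-cloud-pubsub and credentials)."""
    # Imported lazily so the name helpers above work without the library installed.
    from google.cloud import pubsub_v1

    topic_path = topic_name(project_id, topic_id)
    subscription_path = subscription_name(project_id, subscription_id)

    publisher = pubsub_v1.PublisherClient()
    publisher.create_topic(request={"name": topic_path})

    subscriber = pubsub_v1.SubscriberClient()
    with subscriber:
        subscriber.create_subscription(
            request={
                "name": subscription_path,
                "topic": topic_path,
                # Assumption: longer than the ~20 s write.py holds messages before acking.
                "ack_deadline_seconds": ack_deadline_seconds,
            }
        )


if __name__ == "__main__":
    # Names used by read.py and write.py above.
    print(topic_name("myproject", "meetup202011"))
    print(subscription_name("myproject", "meetup20201201"))
```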

VM shell — Run the Python code

Read from the Meetup stream forever. If there’s any error that aborts the Python script, wait 10 seconds, and run the script again:

while true; do python3 read.py; sleep 10; done

Wake up every 10 minutes to read messages stored in Pub/Sub:

while true; do python3 write.py; sleep 600; done
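Note that this loop sleeps 600 seconds after each write.py run finishes, so runs start every 10 minutes plus however long the previous run took (the 20-second drain window plus the upload). If a steadier cadence were wanted, one alternative, not what this post runs, is to compute each sleep from a fixed schedule. A minimal sketch, with hypothetical function names:

```python
import math
import subprocess
import time
from typing import Sequence


def seconds_until_next_run(first_start: float, now: float, period: float = 600.0) -> float:
    """Seconds to sleep until the next slot on the fixed grid first_start + k * period."""
    if now < first_start:
        return first_start - now
    next_slot = math.floor((now - first_start) / period) + 1
    return first_start + next_slot * period - now


def run_on_schedule(cmd: Sequence[str] = ("python3", "write.py"), period: float = 600.0) -> None:
    """Hypothetical replacement for the shell loop: run cmd every `period` seconds, forever."""
    first_start = time.time()
    while True:
        # Like the shell loop, a failing run is ignored; the next slot runs it again.
        subprocess.run(list(cmd))
        time.sleep(seconds_until_next_run(first_start, time.time(), period))
```

If a run ever takes longer than the period, the next sleep simply waits for the following slot instead of starting runs back to back.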

Snowflake — set up integration with GCS

Follow the docs “Configuring an Integration for Google Cloud Storage”:

create or replace storage integration temp_fhoffa_gcs_meetup
type = external_stage
storage_provider = gcs
enabled = true
storage_allowed_locations = ('gcs://my-bucket/logged/');

-- The output includes STORAGE_GCP_SERVICE_ACCOUNT: grant that service account read access to the bucket.
desc storage integration temp_fhoffa_gcs_meetup;

-- Same prefix that write.py uploads to ('logged/').
create or replace stage temp_fhoffa_gcs_meetup
storage_integration = temp_fhoffa_gcs_meetup
url = 'gcs://my-bucket/logged/'
file_format = (type=json);

Query the files to make sure they exist and parse correctly:

list @temp_fhoffa_gcs_meetup;

select parse_json($1), METADATA$FILENAME, METADATA$FILE_ROW_NUMBER
from '@temp_fhoffa_gcs_meetup' (pattern=>'.*file1674367459.gz');

select count(*), array_agg(distinct metadata$filename)
from '@temp_fhoffa_gcs_meetup';
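The same sanity check can be run locally on a file produced by write.py before it reaches the stage: decompress the gzipped newline-delimited JSON and return each record with its file name and row number, roughly what METADATA$FILENAME and METADATA$FILE_ROW_NUMBER report. A standard-library sketch; the function names are hypothetical, and rows are counted from 1 here.

```python
import gzip
import json
import os
from typing import List, Tuple

Row = Tuple[dict, str, int]  # (parsed record, file name, row number)


def parse_ndjson_gz(data: bytes, name: str) -> List[Row]:
    """Parse gzipped newline-delimited JSON, skipping blank lines but keeping their row numbers."""
    rows: List[Row] = []
    text = gzip.decompress(data).decode("utf-8")
    for row_number, line in enumerate(text.splitlines(), start=1):
        if line.strip():
            rows.append((json.loads(line), name, row_number))
    return rows


def read_staged_file(path: str) -> List[Row]:
    """Read a local file such as file1674367459.gz written by write.py."""
    with open(path, "rb") as f:
        return parse_ndjson_gz(f.read(), os.path.basename(path))


if __name__ == "__main__":
    sample = gzip.compress(b'{"group": {"group_city": "Austin"}}\n{"group": {"group_city": "Boston"}}\n')
    for record, name, row in parse_ndjson_gz(sample, "file1674367459.gz"):
        print(name, row, record["group"]["group_city"])
    # file1674367459.gz 1 Austin
    # file1674367459.gz 2 Boston
```

A file that fails here with a JSONDecodeError would also be a likely source of load errors in Snowpipe.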

Set up an integration with GCS and Pub/Sub for new file notifications:

create notification integration temp_meetup202011_pubsub_int
type = queue
notification_provider = gcp_pubsub
enabled = true
gcp_pubsub_subscription_name = 'projects/fhoffa/subscriptions/meetup20201125_snowpipe_gcs';

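-- The output includes GCP_PUBSUB_SERVICE_ACCOUNT: grant it the Pub/Sub Subscriber role on the subscription above.
desc notification integration temp_meetup202011_pubsub_int;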

Create a table to ingest the arriving semi-structured data:

create table meetup202011_rsvps (x variant);

Tell Snowpipe to load into this table every time a new file arrives in GCS:

create pipe temp.public.meetup202011_pipe
auto_ingest = true
integration = temp_meetup202011_pubsub_int
as
copy into temp.public.meetup202011_rsvps
from @temp_fhoffa_gcs_meetup;

Writing queries in Snowflake

Thanks to the VARIANT type, writing queries in Snowflake is straightforward, even without defining views or schemas.

Top cities in North America by # of RSVPs

The top 2 cities by # of RSVPs in North America since October 2021 are New York and Toronto, with New York having almost 3 times as many RSVPs as Toronto:

select x:group.group_city 
|| ', '
|| coalesce(x:group.group_state, x:group.group_country) city
, count(*) rsvps
from meetup202011_rsvps
where x:group.group_country in ('us', 'ca', 'mx')
and x:mtime > date_part(epoch_millisecond, '2021-10-01'::timestamp)
group by 1
order by rsvps desc
limit 500;
Top cities by # of RSVPs in North America since October 2021 (by author)
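To make the path syntax concrete, here is the same aggregation in plain Python over already-parsed records: `x:group.group_city` becomes a nested dictionary lookup, `coalesce` picks the state when present and the country otherwise, and the date filter compares `mtime` (epoch milliseconds) against October 1st, 2021 UTC. The sample records are made up for illustration and include only the fields the query touches.

```python
from collections import Counter
from datetime import datetime, timezone
from typing import Iterable, List, Optional, Tuple

# date_part(epoch_millisecond, '2021-10-01'::timestamp), treating the timestamp as UTC
OCT_2021_MS = int(datetime(2021, 10, 1, tzinfo=timezone.utc).timestamp() * 1000)


def city_label(rsvp: dict) -> Optional[str]:
    """group_city || ', ' || coalesce(group_state, group_country); None (like NULL) if a part is missing."""
    group = rsvp.get("group", {})
    city = group.get("group_city")
    state = group.get("group_state")
    region = state if state is not None else group.get("group_country")
    if city is None or region is None:
        return None
    return f"{city}, {region}"


def top_cities(
    rsvps: Iterable[dict], since_ms: int = OCT_2021_MS, limit: int = 500
) -> List[Tuple[Optional[str], int]]:
    """Count RSVPs per city in the US, Canada, and Mexico, most RSVPs first."""
    counts: Counter = Counter()
    for x in rsvps:
        group = x.get("group", {})
        if group.get("group_country") in ("us", "ca", "mx") and x.get("mtime", 0) > since_ms:
            counts[city_label(x)] += 1
    return counts.most_common(limit)


SAMPLE = [
    {"mtime": OCT_2021_MS + 1, "group": {"group_city": "New York", "group_state": "NY", "group_country": "us"}},
    {"mtime": OCT_2021_MS + 2, "group": {"group_city": "New York", "group_state": "NY", "group_country": "us"}},
    {"mtime": OCT_2021_MS + 3, "group": {"group_city": "Toronto", "group_country": "ca"}},
    {"mtime": OCT_2021_MS - 1, "group": {"group_city": "Toronto", "group_country": "ca"}},  # too old
    {"mtime": OCT_2021_MS + 4, "group": {"group_city": "London", "group_country": "gb"}},  # outside North America
]

if __name__ == "__main__":
    print(top_cities(SAMPLE))  # [('New York, NY', 2), ('Toronto, ca', 1)]
```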

Top topics per city

It’s interesting to see how different cities prefer different topics.

For example, these are the top 3 topics for Meetup events in Austin, Boston, Denver, Miami, New York, and San Francisco since October 1st, 2021. My personal highlights for each city are:

  • Austin: New in town
  • Boston: Social
  • Denver: Adventure
  • Miami: Fun times
  • New York: Social network
  • San Francisco: Software development
Top topics for Meetup events in different cities since October 2021 (by author)

select x:group.group_city::string city
, topic.value:urlkey::string topic
, count(*) rsvps
from meetup202011_rsvps, table(flatten(input=>x:group.group_topics)) topic
where x:mtime > date_part(epoch_millisecond, '2021-10-01'::timestamp)
and x:group.group_country = 'us'
and x:group.group_city in ('Austin', 'Boston', 'Denver', 'Miami', 'New York', 'San Francisco')
group by 1, 2
qualify row_number() over(partition by city order by rsvps desc) <= 3
order by city, rsvps desc;
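The two less common pieces of this query are `flatten`, which turns each RSVP into one row per element of `group_topics`, and `qualify`, which keeps the top rows per city after counting. A plain-Python sketch of the same logic, over made-up sample records (the function name and the topic urlkeys are hypothetical):

```python
from collections import Counter, defaultdict
from typing import Dict, Iterable, List, Tuple


def top_topics_per_city(
    rsvps: Iterable[dict], cities: Iterable[str], since_ms: int, n: int = 3
) -> Dict[str, List[Tuple[str, int]]]:
    """Top-n topic urlkeys by RSVP count for each requested US city."""
    wanted = set(cities)
    counts: Dict[str, Counter] = defaultdict(Counter)
    for x in rsvps:
        group = x.get("group", {})
        city = group.get("group_city")
        if x.get("mtime", 0) > since_ms and group.get("group_country") == "us" and city in wanted:
            # flatten(input=>x:group.group_topics): one row per topic in the array
            for topic in group.get("group_topics") or []:
                counts[city][topic.get("urlkey")] += 1
    # qualify row_number() over (partition by city order by rsvps desc) <= n
    return {city: counter.most_common(n) for city, counter in sorted(counts.items())}


SAMPLE = [
    {"mtime": 1, "group": {"group_city": "Austin", "group_country": "us",
                           "group_topics": [{"urlkey": "newintown"}, {"urlkey": "social"}]}},
    {"mtime": 2, "group": {"group_city": "Austin", "group_country": "us",
                           "group_topics": [{"urlkey": "newintown"}]}},
    {"mtime": 3, "group": {"group_city": "Boston", "group_country": "us",
                           "group_topics": [{"urlkey": "social"}]}},
    {"mtime": 4, "group": {"group_city": "Chicago", "group_country": "us",
                           "group_topics": [{"urlkey": "social"}]}},  # not in the city list
]

if __name__ == "__main__":
    print(top_topics_per_city(SAMPLE, ["Austin", "Boston"], since_ms=0))
    # {'Austin': [('newintown', 2), ('social', 1)], 'Boston': [('social', 1)]}
```

As in the SQL, an RSVP whose group has no topics contributes no rows, and ties within a city are broken arbitrarily (here, by first appearance).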

Running this pipeline in practice, observed metrics

These dashboards live within the Google Cloud console.

During a regular week we can see peaks of 2 messages per second:

(by author)

The average message size fluctuates around 1.4KB:

(by author)

We can see how messages accumulate within 10-minute windows, and then drain according to the script's schedule:

(by author)
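Rough numbers for one 10-minute window, using the figures above (peaks of about 2 messages per second, ~4k messages per hour on average) and the max_messages = 20000 cap in write.py. This is a steady-rate approximation, not a measurement.

```python
# Rough backlog per write.py run, from the figures in this post.
# Assumptions: a run every ~600 s, peaks of ~2 messages/s,
# an average of ~4,000 messages/hour, and write.py's max_messages cap.
WINDOW_S = 600
PEAK_RATE = 2.0
AVG_RATE = 4_000 / 3_600
MAX_MESSAGES = 20_000


def backlog(rate_per_s: float, window_s: int = WINDOW_S) -> int:
    """Messages that accumulate in Pub/Sub between two runs at a steady rate."""
    return round(rate_per_s * window_s)


def hours_to_fill_cap(rate_per_s: float, cap: int = MAX_MESSAGES) -> float:
    """How long a steady rate takes to reach the per-file message cap."""
    return cap / rate_per_s / 3_600


if __name__ == "__main__":
    print(backlog(PEAK_RATE), backlog(AVG_RATE))  # 1200 667
    print(f"{hours_to_fill_cap(PEAK_RATE):.1f} h")  # 2.8 h
```

So even at peak, a run only needs to collect about 1,200 messages, well below the 20,000 cap, which suggests that leftover messages are unlikely to come from the cap itself.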

Due to how Pub/Sub pull behaves (a single pull can return fewer messages than are waiting), some messages don't drain as expected, staying in Pub/Sub for 20 minutes (or sometimes much longer):

(by author)

Highlights and notes

GCP

  • Pub/Sub requires no setup or provisioning, and keeps unacknowledged messages (for 7 days by default), so data isn't lost if a downstream step fails. It's pretty cheap at low streaming rates.
  • The e2-micro instance is included within the free tier.
  • GCP makes it easy to set up alerts in case the stream stops.
(by author)

Snowflake

  • It’s easy to configure Snowpipe to trigger automatically with new GCS files, even when running Snowflake on AWS.
  • Snowpipe loads each new file into Snowflake shortly after it arrives.
  • The VARIANT type makes it easy to query semi-structured data, like the data contained in these JSON messages.

Next steps

Want more?

I’m Felipe Hoffa, Data Cloud Advocate for Snowflake. Thanks for joining me on this adventure. You can follow me on Twitter and LinkedIn. Check reddit.com/r/snowflake for the most interesting Snowflake news.



Felipe Hoffa

Data Cloud Advocate at Snowflake ❄️. Originally from Chile, now in San Francisco and around the world. Previously at Google. Let’s talk data.
