<?xml version="1.0" encoding="UTF-8"?><rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0" xmlns:cc="http://cyber.law.harvard.edu/rss/creativeCommonsRssModule.html">
    <channel>
        <title><![CDATA[Stories by Mansi Bhadani on Medium]]></title>
        <description><![CDATA[Stories by Mansi Bhadani on Medium]]></description>
        <link>https://medium.com/@mansi.bhadani31?source=rss-d90757d5d287------2</link>
        <image>
            <url>https://cdn-images-1.medium.com/fit/c/150/150/0*agF2oB7vAmIRnNiB</url>
            <title>Stories by Mansi Bhadani on Medium</title>
            <link>https://medium.com/@mansi.bhadani31?source=rss-d90757d5d287------2</link>
        </image>
        <generator>Medium</generator>
        <lastBuildDate>Mon, 18 May 2026 06:36:40 GMT</lastBuildDate>
        <atom:link href="https://medium.com/@mansi.bhadani31/feed" rel="self" type="application/rss+xml"/>
        <webMaster><![CDATA[yourfriends@medium.com]]></webMaster>
        <atom:link href="http://medium.superfeedr.com" rel="hub"/>
        <item>
            <title><![CDATA[Watermarks in Spark Structured Streaming: What They Actually Do]]></title>
            <link>https://medium.com/@mansi.bhadani31/watermarks-in-spark-structured-streaming-what-they-actually-do-4b06e0f7a596?source=rss-d90757d5d287------2</link>
            <guid isPermaLink="false">https://medium.com/p/4b06e0f7a596</guid>
            <category><![CDATA[streaming]]></category>
            <category><![CDATA[spark]]></category>
            <category><![CDATA[timestamp]]></category>
            <category><![CDATA[data-engineer]]></category>
            <dc:creator><![CDATA[Mansi Bhadani]]></dc:creator>
            <pubDate>Sun, 05 Apr 2026 19:22:13 GMT</pubDate>
            <atom:updated>2026-04-05T19:22:13.481Z</atom:updated>
            <content:encoded><![CDATA[<p><em>A practical guide to event-time watermarks — when late data gets dropped, why stateful aggregation memory grows, and how to tune window size for your SLA.</em></p><p>Watermarks are one of those Spark concepts that look simple in the documentation and turn confusing the moment you hit production.</p><p>The official description is something like: “a watermark defines how late data can arrive before being dropped.” That’s technically correct and practically useless until you’ve watched Spark silently discard events and spent an afternoon figuring out why.</p><p>This is the guide I wish I’d had when I built the NJ Transit streaming pipeline. I’ll explain what watermarks actually do to Spark’s internal state, when late data gets dropped (and when it doesn’t), why memory grows if you tune watermarks wrong, and how to pick the right window and watermark configuration for a given latency SLA.</p><h3>Why Event Time Is Different From Processing Time</h3><p>Before watermarks make sense, you need to internalize the event time vs. processing time distinction — because the entire watermark mechanism exists to handle the gap between them.</p><p><strong>Processing time</strong> is when Spark sees the event. <strong>Event time</strong> is when the event actually occurred, embedded in the payload.</p><p>In a transit pipeline, a vehicle position update might be generated at 14:32:07 on the vehicle, transmitted over a cellular network, buffered in Kafka, and consumed by Spark at 14:32:45. The event time is 14:32:07. The processing time is 14:32:45. The gap is 38 seconds.</p><p>That gap is usually small. But networks fail, vehicles go underground, Kafka consumers fall behind. Sometimes that gap is 5 minutes. Sometimes it’s 2 hours.</p><p>If you’re computing windowed aggregations (e.g., average delay per route over 5-minute windows), you need to decide: when is a 5-minute window “done” and ready to emit? 
With processing time, you just wait 5 minutes of wall-clock time. With event time, you don’t know when all the events for a window have arrived — because some of them might still be in transit.</p><p>Watermarks are Spark’s answer to that problem.</p><h3>What a Watermark Actually Does</h3><p>When you define a watermark in Spark:</p><pre>df.withWatermark(&quot;event_timestamp&quot;, &quot;2 minutes&quot;)</pre><p>You’re telling Spark: “Expect events to show up as much as 2 minutes behind the latest event time you have seen. Anything further behind than that is too late, and you are allowed to drop it.”</p><p>The watermark itself is computed as:</p><pre>watermark = max(event_timestamp seen so far) - delay_threshold</pre><p>So if the latest event Spark has seen has a timestamp of 14:40:00, and the watermark delay is 2 minutes, the current watermark is 14:38:00.</p><p><strong>Any event with an event_timestamp earlier than 14:38:00 is now considered too late and is eligible to be dropped.</strong></p><p>This is the part that surprises people: the watermark isn’t a fixed time threshold. It’s a <em>moving threshold</em> driven by the maximum event time Spark has observed, recomputed at the end of each micro-batch. If your pipeline stalls and no new events arrive, the watermark doesn’t advance — it stays frozen, and no windows close.</p><h3>The State Store: Why Memory Grows</h3><p>Here’s the mechanism that bites people in production.</p><p>Spark Structured Streaming maintains a <strong>state store</strong> for stateful operations like windowed aggregations. For every open window, Spark keeps partial aggregation state in memory (and on disk as a checkpoint) until the window is finalized and emitted.</p><p>A window is finalized when the watermark advances past the window’s end time.</p><p>If your watermark delay is set too generously (say, 30 minutes), Spark keeps every window open until event time has moved at least 30 minutes past the window’s end. 
If you have 5-minute tumbling windows and 30 minutes of watermark delay, that’s about 7 windows open at once (six still waiting out the 30-minute delay plus the one currently filling), each accumulating state.</p><p>For a high-volume topic with many distinct grouping keys (route × vehicle × direction), this state can be substantial:</p><pre>from pyspark.sql.functions import avg, col, window<br><br># This configuration creates a LOT of state<br>df.withWatermark(&quot;event_timestamp&quot;, &quot;30 minutes&quot;) \<br>  .groupBy(<br>      window(&quot;event_timestamp&quot;, &quot;5 minutes&quot;),<br>      col(&quot;route_id&quot;),<br>      col(&quot;vehicle_id&quot;),<br>      col(&quot;direction&quot;)<br>  ) \<br>  .agg(avg(&quot;delay_seconds&quot;).alias(&quot;avg_delay&quot;))</pre><p>With 300 routes × 50 vehicles × 2 directions = 30,000 grouping key combinations, and 7 open windows, you’re maintaining state for up to 210,000 partial aggregations simultaneously.</p><p>Lengthen the watermark delay or add more grouping keys, and that number grows proportionally. A larger tumbling window does the opposite: fewer windows are open at once, though each one stays open longer.</p><h3>When Late Data Gets Dropped vs. Included</h3><p>The behavior here is more nuanced than “late = dropped.”</p><p><strong>An event is dropped</strong> if the window it belongs to has already ended before the current watermark when the event arrives. 
The current watermark is max_event_time_seen - delay_threshold.</p><p><strong>An event is included</strong> if it arrives before the watermark advances past its window’s end time — even if it arrives “late” relative to processing time.</p><p>Example with a 5-minute window and a 2-minute watermark delay:</p><pre>Window: [14:30:00, 14:35:00)<br>Event arrives at processing time 14:37:30 with event_time 14:34:45</pre><pre>Current watermark = max_seen_event_time - 2min</pre><pre>If max_seen_event_time = 14:36:30:<br>  watermark = 14:34:30<br>  window end 14:35:00 &gt; watermark → WINDOW STILL OPEN<br>  event_time 14:34:45 → EVENT INCLUDED ✓</pre><pre>If max_seen_event_time = 14:37:30:<br>  watermark = 14:35:30<br>  window end 14:35:00 &lt; watermark → WINDOW CLOSED<br>  event_time 14:34:45 → EVENT DROPPED ✗</pre><p>The event’s fate depends not on how late it arrived, but on where the watermark is when it arrives.</p><h3>Tuning Watermarks for Your SLA</h3><p>Here’s a practical framework for picking watermark delay values:</p><h3>Step 1: Measure Your Actual Event Delay Distribution</h3><p>Before guessing a watermark value, instrument your pipeline to measure the real delay distribution. In the transit pipeline, I added this to a monitoring job:</p><pre>from pyspark.sql.functions import col, current_timestamp, percentile_approx, unix_timestamp<br>from pyspark.sql.functions import max as spark_max  # avoid shadowing Python&#39;s built-in max</pre><pre>delay_stats = df \<br>    .withColumn(&quot;processing_delay_seconds&quot;,<br>                unix_timestamp(current_timestamp()) -<br>                unix_timestamp(col(&quot;event_timestamp&quot;))) \<br>    .agg(<br>        percentile_approx(&quot;processing_delay_seconds&quot;, 0.50).alias(&quot;p50_delay&quot;),<br>        percentile_approx(&quot;processing_delay_seconds&quot;, 0.95).alias(&quot;p95_delay&quot;),<br>        percentile_approx(&quot;processing_delay_seconds&quot;, 0.99).alias(&quot;p99_delay&quot;),<br>        spark_max(&quot;processing_delay_seconds&quot;).alias(&quot;max_delay&quot;)<br>    )</pre><p>Run this for a week. 
The 99th percentile of your event delay distribution, rounded up, is your starting watermark value. If p99 is 45 seconds, use &quot;1 minute&quot;. If p99 is 4 minutes, use &quot;5 minutes&quot;.</p><h3>Step 2: Balance Latency vs. Completeness</h3><p>Watermark delay is a direct tradeoff between output latency and result completeness:</p><ul><li><strong>Short (30s):</strong> low output latency, ~95% of late data included, small state</li><li><strong>Medium (2min):</strong> medium output latency, ~99% of late data included, medium state</li><li><strong>Long (10min):</strong> high output latency, ~99.9% of late data included, large state</li></ul><p>For the transit pipeline, the SLA was sub-60 second end-to-end latency. That meant I couldn’t use a watermark longer than about 90 seconds — otherwise windows would never close within the SLA window.</p><p>The measured p99 delay was ~45 seconds (mostly Kafka consumer lag plus network). I used a 90-second watermark, which covered ~99.2% of events and kept state size manageable.</p><pre>transit_agg = df \<br>    .withWatermark(&quot;event_timestamp&quot;, &quot;90 seconds&quot;) \<br>    .groupBy(<br>        window(&quot;event_timestamp&quot;, &quot;5 minutes&quot;, &quot;1 minute&quot;),  # 5min window, 1min slide<br>        col(&quot;route_id&quot;)<br>    ) \<br>    .agg(<br>        avg(&quot;delay_seconds&quot;).alias(&quot;avg_delay&quot;),<br>        count(&quot;*&quot;).alias(&quot;event_count&quot;)<br>    )</pre><h3>Step 3: Monitor Window Closure Rate</h3><p>Add a metric to track how many windows are closing per micro-batch. If windows aren’t closing, the watermark isn’t advancing — usually because no new events are arriving, or because you’re seeing a flood of very old events.</p><pre># Pass this to writeStream.foreachBatch(). In append mode, every row in the<br># batch belongs to a window the watermark has already finalized.<br># The watermark itself is reported in query.lastProgress under eventTime.watermark.<br>def process_batch(df, epoch_id):<br>    closed_windows = df.count()<br>    print(f&quot;Epoch {epoch_id}: {closed_windows} finalized window rows emitted&quot;)</pre><h3>Common Watermark Bugs</h3><h3>Bug 1: Watermark Never Advances</h3><p><strong>Symptom:</strong> State store grows without bound. 
Windows never emit.</p><p><strong>Cause:</strong> No new events arriving with recent event timestamps. If your source has a dead period (overnight, weekends), the watermark freezes and nothing closes.</p><p><strong>Fix:</strong> Either use processing time for windows that must close on a schedule, or send periodic heartbeat events with current timestamps to advance the watermark.</p><h3>Bug 2: Too Much Late Data Gets Dropped</h3><p><strong>Symptom:</strong> Aggregation counts are consistently 20–30% lower than expected.</p><p><strong>Cause:</strong> Watermark delay is set shorter than the actual p99 event delay.</p><p><strong>Fix:</strong> Instrument your actual delay distribution (see Step 1 above). Increase the watermark delay to cover the p99.</p><h3>Bug 3: Out-of-Order Events Causing Wrong Aggregations</h3><p><strong>Symptom:</strong> Window aggregations are correct in total but wrong per-window.</p><p><strong>Cause:</strong> Events that belong to window A arrive while window B is being processed. Because the window is keyed on processing time rather than event time, they are counted in window B instead of window A.</p><p><strong>Fix:</strong> Always use event_timestamp (not processing timestamp) as the window column. This is the whole point of event-time windows.</p><pre># WRONG - uses processing time<br>df.groupBy(window(current_timestamp(), &quot;5 minutes&quot;))</pre><pre># RIGHT - uses event time<br>df.groupBy(window(col(&quot;event_timestamp&quot;), &quot;5 minutes&quot;))</pre><h3>Output Modes and Watermarks</h3><p>One more thing that trips people up: not all output modes work with watermarks.</p><ul><li><strong>Append mode:</strong> Only emits rows once their window is finalized (past the watermark). Best for downstream sinks that can’t handle updates (Kafka, Snowflake via COPY). Requires a watermark for aggregations.</li><li><strong>Update mode:</strong> Emits rows every time they change. 
Doesn’t require a watermark, but state never gets cleaned up without one.</li><li><strong>Complete mode:</strong> Emits the full result table every batch. The watermark is not used to drop state, because the whole result has to be kept. Only practical for small result sets.</li></ul><p>For the transit pipeline, I used append mode — Snowflake COPY INTO doesn’t handle upserts efficiently, and I wanted to emit finalized aggregations only once.</p><pre># Append mode: only emit finalized windows.<br># (The comment sits up here because nothing may follow a line-continuation backslash.)<br>query = transit_agg \<br>    .writeStream \<br>    .outputMode(&quot;append&quot;) \<br>    .format(&quot;snowflake&quot;) \<br>    .option(&quot;sfURL&quot;, snowflake_url) \<br>    .option(&quot;dbtable&quot;, &quot;fct_transit_aggregations&quot;) \<br>    .option(&quot;checkpointLocation&quot;, &quot;/checkpoints/transit-agg&quot;) \<br>    .start()</pre><h3>Takeaway</h3><p>Watermarks aren’t just a “drop late data” switch. They’re Spark’s mechanism for deciding when windowed state is safe to finalize and clean up. Get them wrong in one direction and you drop events you needed. Get them wrong in the other direction and your memory grows until the job crashes.</p><p>The right watermark value is determined empirically: measure your actual event delay distribution, set the delay to cover p99, and monitor window closure rate in production. Then adjust.</p><p>The goal isn’t zero late data. The goal is a predictable, bounded tradeoff between latency and completeness — and watermarks are the knob.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Building a Dead-Letter Queue for Your Kafka Pipeline the Right Way]]></title>
            <link>https://medium.com/@mansi.bhadani31/building-a-dead-letter-queue-for-your-kafka-pipeline-the-right-way-d37c411675ec?source=rss-d90757d5d287------2</link>
            <guid isPermaLink="false">https://medium.com/p/d37c411675ec</guid>
            <category><![CDATA[kafka]]></category>
            <category><![CDATA[data-engineer]]></category>
            <category><![CDATA[pipeline]]></category>
            <category><![CDATA[pos]]></category>
            <category><![CDATA[kafka-dlq]]></category>
            <dc:creator><![CDATA[Mansi Bhadani]]></dc:creator>
            <pubDate>Sun, 05 Apr 2026 19:21:26 GMT</pubDate>
            <atom:updated>2026-04-05T19:21:26.836Z</atom:updated>
            <content:encoded><![CDATA[<p><em>Every streaming pipeline drops records. Here’s how to route them to a dead-letter topic with structured failure reasons, and build a Slack alert before your downstream notices.</em></p><p>Every Kafka pipeline drops records. The question isn’t if — it’s whether you know when it happens, why it happened, and whether you can recover.</p><p>Most pipelines I’ve seen handle this one of two ways: they silently swallow failures (log an error, increment a counter, move on), or they crash the consumer entirely and page someone at 2 AM. Neither is good.</p><p>A <strong>dead-letter queue (DLQ)</strong> — a dedicated Kafka topic where failed records land with structured failure metadata — is the third option. It gives you visibility into failures, preserves the original event for replay, and keeps your main pipeline running while you investigate.</p><p>Here’s how to build one properly.</p><h3>What a DLQ Solves (and Doesn’t)</h3><p>A DLQ handles <strong>processing failures</strong> — events that arrive at your consumer but can’t be processed due to:</p><ul><li>Schema validation failures (malformed Avro, missing required fields)</li><li>Deserialization errors (corrupted bytes, wrong schema version)</li><li>Business logic rejections (null trip_id on a transit event, negative loan amount)</li><li>Downstream write failures (Snowflake temporarily unavailable)</li></ul><p>A DLQ does <strong>not</strong> handle events that never arrived (network partition between producer and broker) or events that were produced to the wrong topic. Those are upstream problems.</p><h3>The DLQ Topic Design</h3><p>The DLQ is a regular Kafka topic. 
What makes it useful is the structure of the messages you write to it.</p><p>A DLQ message should contain three things:</p><ol><li><strong>The original event bytes</strong> — exact bytes from the source topic, unmodified</li><li><strong>Failure metadata</strong> — what failed, why, which consumer, which offset</li><li><strong>Routing information</strong> — which source topic and partition the event came from</li></ol><p>Here’s the Avro schema I use for DLQ messages:</p><pre>{<br>  &quot;type&quot;: &quot;record&quot;,<br>  &quot;name&quot;: &quot;DeadLetterRecord&quot;,<br>  &quot;namespace&quot;: &quot;com.pipeline.dlq&quot;,<br>  &quot;fields&quot;: [<br>    {&quot;name&quot;: &quot;original_topic&quot;, &quot;type&quot;: &quot;string&quot;},<br>    {&quot;name&quot;: &quot;original_partition&quot;, &quot;type&quot;: &quot;int&quot;},<br>    {&quot;name&quot;: &quot;original_offset&quot;, &quot;type&quot;: &quot;long&quot;},<br>    {&quot;name&quot;: &quot;original_timestamp&quot;, &quot;type&quot;: &quot;long&quot;},<br>    {&quot;name&quot;: &quot;original_key&quot;, &quot;type&quot;: [&quot;null&quot;, &quot;bytes&quot;], &quot;default&quot;: null},<br>    {&quot;name&quot;: &quot;original_value&quot;, &quot;type&quot;: &quot;bytes&quot;},<br>    {&quot;name&quot;: &quot;failure_timestamp&quot;, &quot;type&quot;: &quot;long&quot;},<br>    {&quot;name&quot;: &quot;failure_reason&quot;, &quot;type&quot;: {<br>      &quot;type&quot;: &quot;enum&quot;,<br>      &quot;name&quot;: &quot;FailureReason&quot;,<br>      &quot;symbols&quot;: [<br>        &quot;SCHEMA_VALIDATION_FAILED&quot;,<br>        &quot;DESERIALIZATION_ERROR&quot;,<br>        &quot;BUSINESS_RULE_VIOLATION&quot;,<br>        &quot;DOWNSTREAM_WRITE_FAILED&quot;,<br>        &quot;UNKNOWN&quot;<br>      ]<br>    }},<br>    {&quot;name&quot;: &quot;failure_message&quot;, &quot;type&quot;: &quot;string&quot;},<br>    {&quot;name&quot;: &quot;consumer_group&quot;, &quot;type&quot;: &quot;string&quot;},<br>  
  {&quot;name&quot;: &quot;pipeline_version&quot;, &quot;type&quot;: &quot;string&quot;}<br>  ]<br>}</pre><p>The failure_reason enum is important. It lets you filter the DLQ topic by failure type — schema failures are usually upstream producer bugs; downstream write failures are usually transient and worth retrying; business rule violations need human review.</p><h3>The Python Implementation</h3><p>Here’s a DLQ producer that wraps your consumer logic. To keep the example short it writes the record as JSON rather than Avro, so the raw key and value bytes are base64-encoded first:</p><pre>from confluent_kafka import Consumer, Producer, KafkaError<br>from confluent_kafka.avro import AvroConsumer, AvroProducer<br>from confluent_kafka.schema_registry import SchemaRegistryClient<br>import base64<br>import json<br>import time<br>import logging</pre><pre>logger = logging.getLogger(__name__)<br><br>def _b64(raw):<br>    # JSON cannot carry raw bytes, so base64-encode them (None stays None)<br>    return base64.b64encode(raw).decode(&quot;ascii&quot;) if raw is not None else None<br></pre><pre>class DLQProducer:<br>    def __init__(self, bootstrap_servers: str, dlq_topic: str, consumer_group: str):<br>        self.dlq_topic = dlq_topic<br>        self.consumer_group = consumer_group<br>        self.producer = Producer({&quot;bootstrap.servers&quot;: bootstrap_servers})</pre><pre>    def send_to_dlq(<br>        self,<br>        original_message,<br>        failure_reason: str,<br>        failure_message: str,<br>        pipeline_version: str = &quot;1.0.0&quot;<br>    ):<br>        dlq_record = {<br>            &quot;original_topic&quot;: original_message.topic(),<br>            &quot;original_partition&quot;: original_message.partition(),<br>            &quot;original_offset&quot;: original_message.offset(),<br>            &quot;original_timestamp&quot;: original_message.timestamp()[1],<br>            &quot;original_key&quot;: _b64(original_message.key()),<br>            &quot;original_value&quot;: _b64(original_message.value()),<br>            &quot;failure_timestamp&quot;: int(time.time() * 1000),<br>            &quot;failure_reason&quot;: failure_reason,<br>            &quot;failure_message&quot;: str(failure_message)[:2000],  # cap message length<br>            &quot;consumer_group&quot;: 
self.consumer_group,<br>            &quot;pipeline_version&quot;: pipeline_version,<br>        }</pre><pre>        self.producer.produce(<br>            topic=self.dlq_topic,<br>            key=original_message.key(),<br>            value=json.dumps(dlq_record).encode(&quot;utf-8&quot;),<br>            on_delivery=self._delivery_callback,<br>        )<br>        self.producer.poll(0)</pre><pre>    def _delivery_callback(self, err, msg):<br>        if err:<br>            logger.error(f&quot;DLQ delivery failed: {err}&quot;)<br>        else:<br>            logger.debug(f&quot;DLQ record delivered: {msg.topic()}[{msg.partition()}]@{msg.offset()}&quot;)</pre><pre>    def flush(self):<br>        self.producer.flush()</pre><p>Now wrap your consumer’s processing logic:</p><pre>def process_transit_event(message, dlq_producer: DLQProducer):<br>    try:<br>        # Deserialize<br>        try:<br>            event = deserialize_avro(message.value())<br>        except Exception as e:<br>            dlq_producer.send_to_dlq(<br>                message,<br>                failure_reason=&quot;DESERIALIZATION_ERROR&quot;,<br>                failure_message=str(e)<br>            )<br>            return</pre><pre>        # Validate business rules<br>        validation_errors = validate_transit_event(event)<br>        if validation_errors:<br>            dlq_producer.send_to_dlq(<br>                message,<br>                failure_reason=&quot;BUSINESS_RULE_VIOLATION&quot;,<br>                failure_message=&quot;; &quot;.join(validation_errors)<br>            )<br>            return</pre><pre>        # Write to Snowflake<br>        try:<br>            write_to_snowflake(event)<br>        except SnowflakeWriteError as e:<br>            dlq_producer.send_to_dlq(<br>                message,<br>                failure_reason=&quot;DOWNSTREAM_WRITE_FAILED&quot;,<br>                failure_message=str(e)<br>            )<br>            return</pre><pre>    except Exception as e:<br> 
       # Catch-all for unexpected failures<br>        dlq_producer.send_to_dlq(<br>            message,<br>            failure_reason=&quot;UNKNOWN&quot;,<br>            failure_message=str(e)<br>        )<br>        logger.exception(f&quot;Unexpected error processing message: {e}&quot;)<br></pre><pre>def validate_transit_event(event: dict) -&gt; list[str]:<br>    errors = []<br>    if not event.get(&quot;trip_id&quot;):<br>        errors.append(&quot;trip_id is null or empty&quot;)<br>    if not event.get(&quot;route_id&quot;):<br>        errors.append(&quot;route_id is null or empty&quot;)<br>    if event.get(&quot;delay_seconds&quot;) is not None and abs(event[&quot;delay_seconds&quot;]) &gt; 3600:<br>        errors.append(f&quot;delay_seconds {event[&#39;delay_seconds&#39;]} exceeds plausible range&quot;)<br>    return errors</pre><h3>The Consumer Loop</h3><pre>def run_consumer(<br>    bootstrap_servers: str,<br>    source_topic: str,<br>    dlq_topic: str,<br>    consumer_group: str<br>):<br>    consumer = Consumer({<br>        &quot;bootstrap.servers&quot;: bootstrap_servers,<br>        &quot;group.id&quot;: consumer_group,<br>        &quot;auto.offset.reset&quot;: &quot;earliest&quot;,<br>        &quot;enable.auto.commit&quot;: False,  # Manual commit ONLY after processing<br>    })</pre><pre>    dlq_producer = DLQProducer(bootstrap_servers, dlq_topic, consumer_group)</pre><pre>    consumer.subscribe([source_topic])</pre><pre>    try:<br>        while True:<br>            msg = consumer.poll(timeout=1.0)</pre><pre>            if msg is None:<br>                continue</pre><pre>            if msg.error():<br>                if msg.error().code() == KafkaError._PARTITION_EOF:<br>                    continue<br>                else:<br>                    logger.error(f&quot;Consumer error: {msg.error()}&quot;)<br>                    continue</pre><pre>            process_transit_event(msg, dlq_producer)</pre><pre>            # Commit AFTER processing (or 
sending to DLQ)<br>            # This ensures at-least-once delivery<br>            consumer.commit(asynchronous=False)</pre><pre>    finally:<br>        dlq_producer.flush()<br>        consumer.close()</pre><p>The key detail: commit only after the message has been either successfully processed or sent to the DLQ. If your process crashes mid-handling, the message will be redelivered — which is correct behavior. One caveat: produce() is asynchronous, so a DLQ record can still be sitting in the producer’s buffer when the offset is committed. For a strict guarantee, flush the DLQ producer before committing.</p><h3>Slack Alerting on DLQ Volume Spikes</h3><p>A DLQ is only useful if someone knows when it’s filling up. I use a separate monitoring job that tails the DLQ topic and sends Slack alerts when the failure rate exceeds a threshold.</p><pre>import logging<br>import requests<br>from collections import defaultdict, deque<br>from datetime import datetime, timedelta<br><br>logger = logging.getLogger(__name__)<br></pre><pre>class DLQMonitor:<br>    def __init__(self, slack_webhook_url: str, alert_threshold: int = 10):<br>        self.slack_webhook_url = slack_webhook_url<br>        self.alert_threshold = alert_threshold<br>        self.failure_counts = defaultdict(lambda: deque(maxlen=100))<br>        self.last_alert_time = {}</pre><pre>    def record_failure(self, failure_reason: str, source_topic: str):<br>        key = f&quot;{source_topic}:{failure_reason}&quot;<br>        self.failure_counts[key].append(datetime.utcnow())<br>        self._check_alert(key, failure_reason, source_topic)</pre><pre>    def _check_alert(self, key: str, failure_reason: str, source_topic: str):<br>        # Count failures in the last 5 minutes<br>        cutoff = datetime.utcnow() - timedelta(minutes=5)<br>        recent_failures = sum(<br>            1 for ts in self.failure_counts[key] if ts &gt; cutoff<br>        )</pre><pre>        # Alert if threshold exceeded and no alert sent in last 15 minutes<br>        last_alert = self.last_alert_time.get(key, datetime.min)<br>        if (recent_failures &gt;= self.alert_threshold and<br>                datetime.utcnow() - last_alert &gt; timedelta(minutes=15)):<br>            
self._send_slack_alert(failure_reason, source_topic, recent_failures)<br>            self.last_alert_time[key] = datetime.utcnow()</pre><pre>    def _send_slack_alert(self, failure_reason: str, source_topic: str, count: int):<br>        message = {<br>            &quot;text&quot;: f&quot;🚨 *DLQ Alert* — `{source_topic}`&quot;,<br>            &quot;attachments&quot;: [{<br>                &quot;color&quot;: &quot;danger&quot;,<br>                &quot;fields&quot;: [<br>                    {&quot;title&quot;: &quot;Failure Reason&quot;, &quot;value&quot;: failure_reason, &quot;short&quot;: True},<br>                    {&quot;title&quot;: &quot;Count (last 5 min)&quot;, &quot;value&quot;: str(count), &quot;short&quot;: True},<br>                    {&quot;title&quot;: &quot;Time&quot;, &quot;value&quot;: datetime.utcnow().strftime(&quot;%Y-%m-%d %H:%M UTC&quot;), &quot;short&quot;: True},<br>                    {&quot;title&quot;: &quot;Action&quot;, &quot;value&quot;: &quot;Check DLQ topic for details&quot;, &quot;short&quot;: False},<br>                ]<br>            }]<br>        }</pre><pre>        try:<br>            response = requests.post(<br>                self.slack_webhook_url,<br>                json=message,<br>                timeout=5<br>            )<br>            response.raise_for_status()<br>        except Exception as e:<br>            logger.error(f&quot;Failed to send Slack alert: {e}&quot;)</pre><h3>Replaying DLQ Events</h3><p>The whole point of the DLQ is that you can replay events after fixing the underlying issue. 
A replay script reads from the DLQ, filters by failure reason or time range, and re-produces the original events to the source topic:</p><pre>import base64<br><br>def replay_dlq_events(<br>    bootstrap_servers: str,<br>    dlq_topic: str,<br>    source_topic: str,<br>    failure_reason_filter: str = None,<br>    since_timestamp: int = None<br>):<br>    consumer = Consumer({<br>        &quot;bootstrap.servers&quot;: bootstrap_servers,<br>        &quot;group.id&quot;: &quot;dlq-replay-job&quot;,<br>        &quot;auto.offset.reset&quot;: &quot;earliest&quot;,<br>        &quot;enable.auto.commit&quot;: False,<br>    })</pre><pre>    producer = Producer({&quot;bootstrap.servers&quot;: bootstrap_servers})<br>    consumer.subscribe([dlq_topic])</pre><pre>    replayed = 0<br>    skipped = 0</pre><pre>    try:<br>        while True:<br>            msg = consumer.poll(timeout=5.0)<br>            if msg is None:<br>                break  # nothing for 5 seconds: treat the DLQ as drained<br>            if msg.error():<br>                logger.error(f&quot;Consumer error: {msg.error()}&quot;)<br>                continue</pre><pre>            dlq_record = json.loads(msg.value())</pre><pre>            # Apply filters<br>            if failure_reason_filter and dlq_record[&quot;failure_reason&quot;] != failure_reason_filter:<br>                skipped += 1<br>                continue</pre><pre>            if since_timestamp and dlq_record[&quot;failure_timestamp&quot;] &lt; since_timestamp:<br>                skipped += 1<br>                continue</pre><pre>            # Re-produce original event to source topic.<br>            # Assumes the DLQ writer base64-encoded the raw key and value,<br>            # since JSON cannot carry bytes directly.<br>            raw_key = dlq_record.get(&quot;original_key&quot;)<br>            producer.produce(<br>                topic=source_topic,<br>                key=base64.b64decode(raw_key) if raw_key is not None else None,<br>                value=base64.b64decode(dlq_record[&quot;original_value&quot;]),<br>            )<br>            producer.poll(0)  # serve delivery callbacks<br>            replayed += 1</pre><pre>    finally:<br>        producer.flush()<br>        consumer.close()<br>        logger.info(f&quot;Replay complete: {replayed} events replayed, {skipped} skipped&quot;)</pre><h3>Operational Tips</h3><p><strong>Use a separate consumer group for DLQ monitoring.</strong> Don’t reuse the main consumer group — you 
want to be able to tail the DLQ independently without affecting main pipeline offsets.</p><p><strong>Set a retention policy on the DLQ topic.</strong> 7–14 days is usually sufficient. Use compaction only if your events have meaningful keys; otherwise, time-based retention is cleaner.</p><p><strong>Alert on DLQ <em>rate</em>, not just DLQ <em>size</em>.</strong> A spike in failures in the last 5 minutes is more actionable than “the DLQ has 10,000 records.” The size accumulates over time; the rate tells you something is actively wrong.</p><p><strong>Never auto-replay from DLQ without human review.</strong> The whole point of routing to a DLQ is that something needs investigation. Automated replay without a fix in place just moves the problem around.</p><h3>Takeaway</h3><p>A dead-letter queue isn’t just a safety net — it’s observability infrastructure. It transforms “the pipeline is dropping records” from a mystery into a structured log you can query, alert on, and replay from.</p><p>Build it before you need it. Wire the Slack alert. The first time your DLQ fires and you can tell a stakeholder exactly what failed, why, and when you’ll have it fixed, you’ll be glad you did.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[dbt Tests That Actually Catch Real Data Quality Issues (Not Just Null Checks)]]></title>
            <link>https://medium.com/@mansi.bhadani31/dbt-tests-that-actually-catch-real-data-quality-issues-not-just-null-checks-8cdcd1a65e3e?source=rss-d90757d5d287------2</link>
            <guid isPermaLink="false">https://medium.com/p/8cdcd1a65e3e</guid>
            <category><![CDATA[dbt]]></category>
            <category><![CDATA[range-validation]]></category>
            <category><![CDATA[dbt-test]]></category>
            <category><![CDATA[data-quality]]></category>
            <category><![CDATA[data-engineer]]></category>
            <dc:creator><![CDATA[Mansi Bhadani]]></dc:creator>
            <pubDate>Wed, 01 Apr 2026 20:21:52 GMT</pubDate>
            <atom:updated>2026-04-01T20:21:52.028Z</atom:updated>
            <content:encoded><![CDATA[<p><em>Beyond not_null and unique: building custom dbt tests for range validation, cross-table referential integrity, and statistical drift detection.</em></p><p>Every dbt project I’ve seen starts the same way: someone adds not_null and unique tests to the primary key column, runs dbt test, sees green, and calls it &quot;tested.&quot;</p><p>Then three months later a downstream dashboard shows negative revenue or a count that’s off by 30%, and the investigation reveals the dbt tests were checking the wrong things.</p><p>I’ve built data quality frameworks on top of dbt for two projects now — a mortgage risk pipeline and the NJ Transit streaming system — and the tests that actually caught real production issues weren’t the generic ones. They were custom tests built around the specific failure modes of those pipelines.</p><p>This is a walkthrough of those tests, how they work, and when to use them.</p><h3>The Tests That Don’t Catch Real Issues</h3><p>Let’s be clear about what generic tests <em>can</em> catch, so we know what gap we’re filling.</p><p>not_null catches missing required fields. unique catches duplicate primary keys. accepted_values catches invalid enum values. relationships catches broken foreign keys. These are all worth running. They catch a class of issues.</p><p>What they don’t catch:</p><ul><li>A value that’s present, unique, and valid — but statistically wrong (LTV ratio of 0.002 instead of 2.0)</li><li>A column whose distribution has drifted significantly between runs</li><li>A referential relationship that’s technically valid but logically broken (matching IDs across tables that were joined on the wrong key)</li><li>Aggregations that are technically non-null but wrong by 40%</li></ul><p>That’s the gap. Here’s how to fill it.</p><h3>Test 1: Range Validation with Context</h3><p>The most common real-world data quality failure I’ve seen isn’t nulls — it’s values outside their expected business range. 
A loan-to-value ratio of 400 is technically a valid float and passes not_null, but it represents a data pipeline bug (usually a unit conversion error or a denominator-zero edge case).</p><p>dbt Core doesn’t ship a built-in range test (the dbt_utils package offers accepted_range), but it’s trivial to write your own:</p><pre>-- tests/generic/assert_column_in_range.sql<br>{% test assert_column_in_range(model, column_name, min_value, max_value) %}</pre><pre>SELECT *<br>FROM {{ model }}<br>WHERE {{ column_name }} IS NOT NULL<br>  AND (<br>    {{ column_name }} &lt; {{ min_value }}<br>    OR {{ column_name }} &gt; {{ max_value }}<br>  )</pre><pre>{% endtest %}</pre><p>Apply it in your schema.yml:</p><pre>models:<br>  - name: mortgage_features<br>    columns:<br>      - name: loan_to_value_ratio<br>        tests:<br>          - assert_column_in_range:<br>              min_value: 0.01<br>              max_value: 2.0<br>      - name: interest_rate<br>        tests:<br>          - assert_column_in_range:<br>              min_value: 0.001<br>              max_value: 0.25<br>      - name: credit_score<br>        tests:<br>          - assert_column_in_range:<br>              min_value: 300<br>              max_value: 850</pre><p>The test fails if any row has a value outside the range. The query returns those rows, so you can inspect them directly.</p><p><strong>Why this catches real issues:</strong> In the mortgage pipeline, this test caught a batch where LTV ratios had been accidentally divided by 100 during a feature transformation refactor. The values were present, non-null, and unique — but completely wrong. not_null would never have caught it.</p><h3>Test 2: Cross-Table Referential Integrity with Business Logic</h3><p>dbt’s built-in relationships test checks that a foreign key exists in the referenced table. That&#39;s necessary but not sufficient. What you often need is a join that validates business logic, not just key existence.</p><p>Example: in the transit pipeline, every trip event should have a corresponding schedule entry. 
A foreign key check ensures the trip_id exists in the schedule table — but it doesn&#39;t check that the event timestamp falls within the scheduled window for that trip.</p><pre>-- tests/generic/assert_event_within_schedule_window.sql<br>{% test assert_event_within_schedule_window(model, trip_id_col, event_time_col, buffer_minutes=30) %}</pre><pre>SELECT e.*<br>FROM {{ model }} e<br>LEFT JOIN {{ ref(&#39;dim_schedule&#39;) }} s<br>  ON e.{{ trip_id_col }} = s.trip_id<br>WHERE s.trip_id IS NULL<br>   OR e.{{ event_time_col }} &lt; DATEADD(&#39;minute&#39;, -{{ buffer_minutes }}, s.scheduled_departure)<br>   OR e.{{ event_time_col }} &gt; DATEADD(&#39;minute&#39;, {{ buffer_minutes }}, s.scheduled_arrival)</pre><pre>{% endtest %}</pre><p>This returns events that either have no matching schedule or fall outside the expected time window by more than the buffer. A standard relationships test would pass; this one catches the logical mismatch.</p><h3>Test 3: Row Count Drift Detection</h3><p>One of the most useful tests I’ve built is a row count sanity check. 
For tables that are loaded incrementally, a batch that’s 50% smaller than the previous batch is usually a bug — a partition filter issue, a source system outage, or a pipeline logic error.</p><pre>-- tests/generic/assert_row_count_within_threshold.sql<br>{% test assert_row_count_within_threshold(model, date_column, lookback_days=7, min_ratio=0.5, max_ratio=2.0) %}</pre><pre>WITH daily_counts AS (<br>    SELECT<br>        DATE_TRUNC(&#39;day&#39;, {{ date_column }}) AS load_date,<br>        COUNT(*) AS row_count<br>    FROM {{ model }}<br>    WHERE {{ date_column }} &gt;= DATEADD(&#39;day&#39;, -({{ lookback_days }} + 1), CURRENT_DATE)<br>    GROUP BY 1<br>),<br>with_lag AS (<br>    SELECT<br>        load_date,<br>        row_count,<br>        LAG(row_count) OVER (ORDER BY load_date) AS prev_row_count<br>    FROM daily_counts<br>)<br>SELECT *<br>FROM with_lag<br>WHERE prev_row_count IS NOT NULL<br>  AND (<br>    row_count &lt; prev_row_count * {{ min_ratio }}<br>    OR row_count &gt; prev_row_count * {{ max_ratio }}<br>  )</pre><pre>{% endtest %}</pre><p>Apply it to any incrementally loaded model:</p><pre>- name: fct_transit_events<br>  tests:<br>    - assert_row_count_within_threshold:<br>        date_column: event_timestamp<br>        lookback_days: 7<br>        min_ratio: 0.6<br>        max_ratio: 1.5</pre><p>This test fails if any day in the past week had a row count less than 60% or more than 150% of the previous day. Adjust the ratios for tables with known weekend/weekday variance.</p><p><strong>Why this matters:</strong> This test caught two separate issues in the transit pipeline — once when a Kafka consumer group fell behind and batches were being dropped, and once when a Spark watermark was configured too aggressively and late-arriving events were being discarded.</p><h3>Test 4: Statistical Distribution Drift</h3><p>For ML feature tables, row count drift isn’t enough. 
You need to know if the <em>distribution</em> of a column has shifted significantly — which indicates a data quality issue, a schema change upstream, or model drift.</p><p>This is a simplified Z-score test comparing the current period’s mean and stddev against a historical baseline:</p><pre>-- tests/generic/assert_column_distribution_stable.sql<br>{% test assert_column_distribution_stable(model, column_name, date_column, zscore_threshold=3.0, lookback_days=30) %}</pre><pre>WITH historical_stats AS (<br>    SELECT<br>        AVG({{ column_name }}) AS hist_mean,<br>        STDDEV({{ column_name }}) AS hist_stddev<br>    FROM {{ model }}<br>    WHERE {{ date_column }} BETWEEN<br>        DATEADD(&#39;day&#39;, -({{ lookback_days }} + 7), CURRENT_DATE)<br>        AND DATEADD(&#39;day&#39;, -7, CURRENT_DATE)<br>),<br>current_stats AS (<br>    SELECT<br>        AVG({{ column_name }}) AS curr_mean<br>    FROM {{ model }}<br>    WHERE {{ date_column }} &gt;= DATEADD(&#39;day&#39;, -7, CURRENT_DATE)<br>),<br>zscore_calc AS (<br>    SELECT<br>        ABS(c.curr_mean - h.hist_mean) / NULLIF(h.hist_stddev, 0) AS zscore<br>    FROM current_stats c, historical_stats h<br>)<br>SELECT *<br>FROM zscore_calc<br>WHERE zscore &gt; {{ zscore_threshold }}</pre><pre>{% endtest %}</pre><p>Apply to key feature columns:</p><pre>- name: mortgage_features<br>  tests:<br>    - assert_column_distribution_stable:<br>        column_name: debt_to_income_ratio<br>        date_column: origination_date<br>        zscore_threshold: 3.0<br>        lookback_days: 30</pre><p>If the current week’s mean is more than 3 standard deviations from the 30-day historical mean, the test fails. This catches upstream data issues that would silently degrade model performance.</p><h3>Test 5: Aggregation Reconciliation</h3><p>For financial pipelines, you often need to ensure that a fact table’s aggregations reconcile with a source-of-truth. 
This is a source-to-target reconciliation test:</p><pre>-- tests/generic/assert_aggregate_reconciles.sql<br>{% test assert_aggregate_reconciles(model, agg_column, source_model, source_agg_column, join_column, tolerance=0.01) %}</pre><pre>WITH model_agg AS (<br>    SELECT<br>        {{ join_column }},<br>        SUM({{ agg_column }}) AS model_total<br>    FROM {{ model }}<br>    GROUP BY {{ join_column }}<br>),<br>source_agg AS (<br>    SELECT<br>        {{ join_column }},<br>        SUM({{ source_agg_column }}) AS source_total<br>    FROM {{ source_model }}<br>    GROUP BY {{ join_column }}<br>)<br>SELECT m.{{ join_column }},<br>       m.model_total,<br>       s.source_total,<br>       ABS(m.model_total - s.source_total) / NULLIF(s.source_total, 0) AS pct_diff<br>FROM model_agg m<br>JOIN source_agg s USING ({{ join_column }})<br>WHERE ABS(m.model_total - s.source_total) / NULLIF(s.source_total, 0) &gt; {{ tolerance }}</pre><pre>{% endtest %}</pre><p>This fails if any grouping key has a discrepancy greater than the tolerance (default 1%). 
This is useful for ensuring that a transformed fact table’s totals reconcile with the raw source.</p><h3>Putting It Together: A Testing Strategy</h3><p>Generic tests (not_null, unique, relationships) should cover your staging models — catching issues at the source layer before they propagate.</p><p>Custom tests (range validation, distribution drift, row count drift) should cover your mart and feature models — catching semantic issues that only become visible after transformation.</p><p>A practical structure:</p><pre># models/staging/schema.yml<br>models:<br>  - name: stg_loan_applications<br>    columns:<br>      - name: application_id<br>        tests: [not_null, unique]<br>      - name: applicant_ssn_hash<br>        tests: [not_null]</pre><pre># models/marts/schema.yml<br>models:<br>  - name: fct_loan_features<br>    tests:<br>      - assert_row_count_within_threshold:<br>          date_column: load_date<br>    columns:<br>      - name: loan_to_value_ratio<br>        tests:<br>          - assert_column_in_range:<br>              min_value: 0.01<br>              max_value: 2.0<br>          - assert_column_distribution_stable:<br>              date_column: origination_date</pre><h3>Run Tests in CI, Alert on Failure</h3><p>None of this matters if you only run tests manually. Wire dbt test into your Airflow DAG after every model run:</p><pre># dags/dbt_pipeline.py<br># Airflow 2.x import path; both tasks are assumed to be defined inside a DAG context<br>from airflow.operators.bash import BashOperator</pre><pre>run_models = BashOperator(<br>    task_id=&quot;dbt_run&quot;,<br>    bash_command=&quot;dbt run --select marts.+&quot;<br>)</pre><pre>test_models = BashOperator(<br>    task_id=&quot;dbt_test&quot;,<br>    bash_command=&quot;dbt test --select marts.+ --store-failures&quot;<br>)</pre><pre>run_models &gt;&gt; test_models</pre><p>With --store-failures, dbt writes failing rows to tables in an audit schema in your warehouse (by default, your target schema with the suffix dbt_test__audit). When a test fails, you can query those rows directly to understand what happened.</p><h3>Takeaway</h3><p>The not_null and unique tests are table stakes. 
They catch a real class of issues and you should run them. But they don&#39;t catch the issues that actually hurt you in production — wrong values, drifting distributions, aggregation discrepancies.</p><p>Build custom tests around your pipeline’s specific failure modes. Start with range validation (catches unit errors) and row count drift (catches pipeline failures). Add distribution drift if you’re feeding ML models. Add aggregation reconciliation if you’re working with financial data.</p><p>The tests that matter are the ones that would have caught the last bug.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[How I Passed the SnowPro Core Exam in 3 Weeks While in Grad School]]></title>
            <link>https://medium.com/@mansi.bhadani31/how-i-passed-the-snowpro-core-exam-in-3-weeks-while-in-grad-school-f1edba912680?source=rss-d90757d5d287------2</link>
            <guid isPermaLink="false">https://medium.com/p/f1edba912680</guid>
            <category><![CDATA[careers]]></category>
            <category><![CDATA[snowflake]]></category>
            <category><![CDATA[snowpro-core-exam]]></category>
            <category><![CDATA[data]]></category>
            <dc:creator><![CDATA[Mansi Bhadani]]></dc:creator>
            <pubDate>Wed, 01 Apr 2026 20:21:20 GMT</pubDate>
            <atom:updated>2026-04-01T20:21:20.290Z</atom:updated>
            <content:encoded><![CDATA[<p><em>Study plan, resource list, the 5 topic areas that actually appear on the exam, and the one practice test worth paying for.</em></p><p>I passed the SnowPro Core Certification exam in November 2024 while taking 12 credits at Pace University and working on the NJ Transit capstone pipeline. Total study time: 3 weeks, roughly 1–2 hours per evening.</p><p>This guide is what I wish someone had handed me before I started. Not the “read all 800 pages of Snowflake documentation” advice — the actual focused plan that gets you through the exam without burning out.</p><h3>What the SnowPro Core Exam Actually Tests</h3><p>The exam is 100 questions, 115 minutes, passing score 750/1000. Snowflake publishes an official exam guide, but it’s broad enough to be almost useless for prioritization.</p><p>Here’s what actually shows up based on my experience and the experience of other candidates I’ve talked to:</p><p><strong>1. Snowflake Architecture (~20–25% of questions)</strong> Virtual warehouses, micro-partitioning, clustering keys, the storage/compute/cloud services layer separation. You need to understand how each layer works and why this architecture enables independent scaling.</p><p><strong>2. Virtual Warehouses and Performance (~15–20%)</strong> Warehouse sizes, auto-suspend and auto-resume, multi-cluster warehouses, query acceleration service, result caching vs. local disk caching vs. remote disk caching. The three caching layers are a common question area.</p><p><strong>3. Data Loading and Unloading (~15%)</strong> COPY INTO, stages (internal vs. external), file formats, VARIANT for semi-structured data, FLATTEN for JSON/Parquet arrays. Know the difference between COPY INTO &lt;table&gt; and COPY INTO &lt;location&gt;.</p><p><strong>4. Data Sharing and Collaboration (~10–15%)</strong> Secure data sharing, data marketplace, reader accounts, direct share vs. listing. 
This gets more questions than you’d expect — it’s a Snowflake differentiator so they emphasize it.</p><p><strong>5. Security and Access Control (~15–20%)</strong> Role-based access control (RBAC) and how it combines with discretionary access control (DAC), column-level security, row access policies, dynamic data masking, network policies. This is heavily tested and people underestimate it.</p><p>The remaining questions are spread across account administration, performance optimization (query profiling, clustering), Time Travel and Fail-safe, and Snowflake editions.</p><h3>The 3-Week Study Plan</h3><h3>Week 1: Architecture and Core Concepts</h3><p><strong>Goal:</strong> Understand Snowflake’s architecture deeply enough to answer “why” questions, not just “what” questions.</p><p><strong>What to study:</strong></p><ul><li>Snowflake Architecture documentation (the official docs on multi-cluster shared data architecture)</li><li>Virtual warehouses: sizes, credit consumption, auto-suspend behavior</li><li>Micro-partitioning: how Snowflake stores data, natural clustering vs. explicit clustering keys</li><li>The three caching layers: result cache (24hr, query-level), local SSD cache (warehouse-level), remote storage cache</li></ul><p><strong>A question you should be able to answer cold:</strong> A user runs the same query twice in 5 minutes. The second query returns instantly. 
Which cache is responsible?</p><p>Answer: Result cache — query results are cached for 24 hours if the underlying data hasn’t changed.</p><p><strong>Time commitment:</strong> 45–60 min/day, 5 days</p><h3>Week 2: Loading, Security, and Data Sharing</h3><p><strong>Goal:</strong> Know the operational details that appear as specific scenario questions.</p><p><strong>Data Loading — what to know:</strong></p><ul><li>Stages: user stage (@~), table stage (@%tablename), named stage (@stagename)</li><li>File formats: CSV options that affect COPY INTO behavior (SKIP_HEADER, NULL_IF, EMPTY_FIELD_AS_NULL)</li><li>Semi-structured: VARIANT column, PARSE_JSON, FLATTEN, the : and :: operators</li></ul><pre>-- Know how to query nested JSON in VARIANT columns<br>SELECT<br>    src:event_type::string AS event_type,<br>    src:payload:trip_id::string AS trip_id,<br>    f.value::float AS delay_seconds<br>FROM transit_events,<br>LATERAL FLATTEN(input =&gt; src:delays) f</pre><p><strong>Security — what to know:</strong></p><ul><li>System role hierarchy: ACCOUNTADMIN sits above SECURITYADMIN and SYSADMIN, SECURITYADMIN sits above USERADMIN, and PUBLIC is granted to every user; ORGADMIN is a separate organization-level role</li><li>The difference between a role and a privilege</li><li>Column-level security (column masking policies) vs. row-level security (row access policies)</li><li>Network policies: IP allow/block lists at account and user level</li></ul><p><strong>Data Sharing — what to know:</strong></p><ul><li>A data share is a named object containing database objects to be shared</li><li>Consumers don’t copy data — they query the provider’s storage directly</li><li>Reader accounts: accounts created and managed by the provider for consumers who aren’t Snowflake customers</li><li>Data marketplace vs. 
private listing</li></ul><p><strong>Time commitment:</strong> 60 min/day, 5 days</p><h3>Week 3: Performance, Time Travel, and Practice Exams</h3><p><strong>Goal:</strong> Solidify weaker areas, burn through practice questions, identify gaps.</p><p><strong>Performance — what to know:</strong></p><ul><li>Query profiling in the web UI: which operators consume the most time</li><li>Clustering keys vs. natural clustering: when to add an explicit clustering key</li><li>Materialized views vs. regular views vs. dynamic tables</li><li>Query acceleration service: for large, irregular queries with partial scans</li></ul><p><strong>Time Travel and Fail-safe:</strong></p><ul><li>Time Travel: 0–90 days depending on edition (Standard = 1 day max, Enterprise = 90 days max)</li><li>AT and BEFORE clauses for querying historical data</li><li>UNDROP TABLE/SCHEMA/DATABASE within Time Travel window</li><li>Fail-safe: additional 7-day recovery period managed by Snowflake (not user-accessible)</li></ul><p><strong>The key distinction:</strong> Time Travel is for you. Fail-safe is for Snowflake. You can query Time Travel data yourself; you need to contact Snowflake Support to recover data from Fail-safe.</p><p><strong>Time commitment:</strong> 30–45 min content, 30–45 min practice questions, 5 days</p><h3>Resources: What to Use and What to Skip</h3><h3>Use These</h3><p><strong>Snowflake Official Documentation</strong> The actual source of truth. For topics you don’t understand from practice questions, go here. Don’t try to read it end-to-end — use it as a reference.</p><p><strong>Udemy: SnowPro Core Certification Course (Nikolai Schuler)</strong> This is the one paid resource worth buying. ~$15–20 on sale (they’re almost always on sale). The video explanations of the architecture layers and caching hierarchy are genuinely clearer than the official docs.</p><p><strong>Snowflake’s Sample Questions (Official)</strong> Snowflake publishes sample questions on their certification page. 
Do these first to understand the question style.</p><p><strong>ExamTopics SnowPro Core</strong> Free, community-sourced practice questions. Quality varies — some questions are outdated or debated in the comments. Use for volume practice, but don’t treat every answer as authoritative.</p><h3>Skip These</h3><p>Any study guide that’s more than 18 months old — Snowflake moves fast and some features have changed significantly (Dynamic Tables, Cortex, data marketplace changes).</p><p>Video courses that are 20+ hours. You don’t have time, and the marginal value past 8–10 hours of video is low.</p><h3>The Practice Test Worth Paying For</h3><p><strong>SnowPro Core Practice Exams by David Fradin on Udemy</strong> (~$15 on sale)</p><p>This is the one I used the week before the exam. 300+ questions across multiple practice exams, explanations for every answer, questions that are close to actual exam difficulty and style.</p><p>Don’t buy it to pass — buy it to identify gaps. If you’re getting 70–75% on these practice exams, you’re in the range to pass the real exam. If you’re under 65%, you have specific areas to shore up.</p><h3>The 10 Concepts Most Likely to Appear</h3><p>If I had to bet on what shows up on your exam, I’d put chips on these:</p><ol><li><strong>Caching layers</strong> — result cache vs. warehouse cache vs. storage cache, and what invalidates each</li><li><strong>Time Travel retention periods</strong> — which edition gets how many days, and the AT/BEFORE syntax</li><li><strong>Fail-safe</strong> — 7 days, Snowflake-managed, not user-accessible</li><li><strong>Micro-partitioning</strong> — ~16MB compressed, automatic, metadata-driven pruning</li><li><strong>Clustering keys</strong> — when to add them, what columns make good clustering keys, cost implications</li><li><strong>COPY INTO options</strong> — ON_ERROR behavior (ABORT_STATEMENT, CONTINUE, SKIP_FILE), PURGE</li><li><strong>Role hierarchy</strong> — which default role does what, USERADMIN vs. 
SYSADMIN responsibilities</li><li><strong>Data sharing</strong> — share object model, consumer doesn’t copy data, reader accounts</li><li><strong>Multi-cluster warehouses</strong> — scaling policy (Economy vs. Standard), when to use vs. larger single warehouse</li><li><strong>VARIANT and FLATTEN</strong> — querying semi-structured data, the lateral flatten pattern</li></ol><h3>Exam Day Tips</h3><p><strong>Flag and move on.</strong> If you’re unsure, flag the question and keep going. The exam is 115 minutes for 100 questions — you have time to come back.</p><p><strong>Eliminate obviously wrong answers.</strong> Snowflake exam questions often have two plausible answers and two clearly wrong ones. Getting to a 50/50 and guessing is better than burning 5 minutes trying to be certain.</p><p><strong>Watch for edition-specific features.</strong> Many questions have “this feature requires X edition” as the distinguishing factor. Know that Dynamic Data Masking, Row Access Policies, and multi-cluster warehouses require Enterprise edition or higher.</p><p><strong>Trust your first instinct on architecture questions.</strong> These have clear right answers rooted in how Snowflake actually works. If you’ve studied the architecture well, your gut is usually right.</p><h3>After the Exam</h3><p>The exam result is shown immediately after submission. If you pass, your badge is issued within 48 hours via Credly.</p><p>The certification is valid for 2 years, after which you can take a shorter renewal exam.</p><h3>My Actual Study Schedule (Week by Week)</h3><p><strong>Week 1 (Architecture):</strong> Schuler Udemy course chapters 1–5 (architecture, virtual warehouses, storage). 1 hour/day on my commute or after class.</p><p><strong>Week 2 (Operations):</strong> Schuler Udemy chapters 6–10 (loading, security, sharing). Official Snowflake docs for anything unclear.</p><p><strong>Week 3 (Practice):</strong> Fradin practice exams. Every wrong answer → look up the official doc. 
30 minutes of new content, 30 minutes of practice questions. Last 2 days: full practice exam under timed conditions.</p><p>Total hours: ~25–30 hours over 3 weeks.</p><h3>Takeaway</h3><p>The SnowPro Core is not a hard exam if you study the right things. The architecture and caching layers are the foundation — if you understand <em>why</em> Snowflake’s design choices exist, a lot of the specific feature behavior becomes logical rather than memorizable.</p><p>Don’t study everything. Study architecture deeply, security and data sharing thoroughly, and use practice exams to find your gaps in Week 3.</p><p>Good luck — and yes, the SnowPro Core is worth putting on your resume.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Apache Iceberg’s Hidden Superpower: Time-Travel Queries in Production]]></title>
            <link>https://medium.com/@mansi.bhadani31/apache-icebergs-hidden-superpower-time-travel-queries-in-production-708df1b8fe7d?source=rss-d90757d5d287------2</link>
            <guid isPermaLink="false">https://medium.com/p/708df1b8fe7d</guid>
            <category><![CDATA[snowflake]]></category>
            <category><![CDATA[time-travel-queries]]></category>
            <category><![CDATA[apache-iceberg]]></category>
            <category><![CDATA[data-engineer]]></category>
            <dc:creator><![CDATA[Mansi Bhadani]]></dc:creator>
            <pubDate>Tue, 31 Mar 2026 19:28:44 GMT</pubDate>
            <atom:updated>2026-03-31T19:28:44.574Z</atom:updated>
            <content:encoded><![CDATA[<p><em>How I migrated 10TB of mortgage data from Snowflake to Iceberg on S3, what I gained, what broke, and why snapshot isolation changed how analysts work.</em></p><p>Most people hear “Apache Iceberg” and think: <em>open table format, better than Parquet, replaces Hive</em>. That’s true, but it undersells the feature that actually changed how the analysts I worked with queried data: <strong>time-travel</strong>.</p><p>Not the marketing version — “query historical data!” — but the production reality: snapshot isolation, zero-copy branching, and the ability to surgically rewind a table to any point in its history without ETL reruns or backup restores.</p><p>This is the story of migrating 10TB of mortgage risk data from Snowflake to Apache Iceberg on S3, what broke, what we gained, and the specific moment time-travel went from a demo feature to a workflow dependency.</p><h3>Why We Left Snowflake for Iceberg</h3><p>The mortgage risk pipeline ran batch feature engineering on 1M+ loan records using PySpark, producing outputs consumed by ML models for prepayment risk prediction. It lived in Snowflake, which worked fine — until it didn’t.</p><p>Three pain points drove the migration:</p><p><strong>1. Storage costs at scale.</strong> Snowflake’s compressed columnar storage is efficient, but at 10TB with 12-month retention requirements, the cost was significant. S3 with Iceberg cut storage costs by 60% — the same data, same query performance, fraction of the price.</p><p><strong>2. Vendor lock-in on the lakehouse layer.</strong> The ML team wanted to query the same tables directly from PySpark without routing through Snowflake’s Spark connector. Iceberg on S3 with the Iceberg REST catalog meant any engine (Spark, Trino, Athena, DuckDB) could query the same tables natively.</p><p><strong>3. Schema evolution in production.</strong> Snowflake handles schema evolution, but not gracefully across external tools. 
Adding a new feature column to a 1M-row table in Snowflake meant a full table copy. Iceberg handles it with metadata-only operations.</p><h3>How Iceberg Time-Travel Actually Works</h3><p>Before getting into the migration story, it’s worth understanding the mechanics — because the “hidden superpower” only makes sense once you see what’s under the hood.</p><p>Every Iceberg write operation creates a new <strong>snapshot</strong>. A snapshot is an immutable record of the table state at that point in time — which files belong to the table, which have been deleted, what the schema was.</p><pre>Table metadata pointer<br>  ├── snapshot-004 (current)  ← latest write<br>  │     └── manifest-list<br>  │           ├── manifest-A (data-file-01.parquet, data-file-02.parquet)<br>  │           └── manifest-B (data-file-03.parquet)<br>  ├── snapshot-003<br>  ├── snapshot-002<br>  └── snapshot-001  ← table creation</pre><p>Time-travel works by pointing your query at a historical snapshot instead of the current one. No data is copied, no backup is restored — you’re just reading from a different snapshot in the same metadata chain.</p><pre># Read the table as it existed at a specific timestamp.<br># as-of-timestamp expects milliseconds since the Unix epoch;<br># 1754006400000 is 2025-08-01T00:00:00 (UTC assumed here).<br>df = spark.read \<br>    .option(&quot;as-of-timestamp&quot;, &quot;1754006400000&quot;) \<br>    .format(&quot;iceberg&quot;) \<br>    .load(&quot;s3://my-bucket/mortgage-features&quot;)</pre><pre># Or by snapshot ID<br>df = spark.read \<br>    .option(&quot;snapshot-id&quot;, 4847894586449951480) \<br>    .format(&quot;iceberg&quot;) \<br>    .load(&quot;s3://my-bucket/mortgage-features&quot;)</pre><p>That’s it. No special infrastructure. Just a read option.</p><h3>The Migration: What Broke</h3><p>The migration itself was a full snapshot export from Snowflake and re-ingestion into Iceberg on S3. Here’s what we hit:</p><h3>Problem 1: Timestamp Precision Mismatch</h3><p>Snowflake stores TIMESTAMP_NTZ with nanosecond precision internally. 
When we exported to Parquet and re-read in Spark, timestamps were truncating to microseconds. Downstream models that used timestamp features as inputs started producing slightly different outputs.</p><p><strong>Fix:</strong> Explicitly cast timestamps during migration:</p><pre>from pyspark.sql.functions import col, to_timestamp</pre><pre>df = spark.read.parquet(&quot;s3://export-bucket/snowflake-dump/&quot;) \<br>    .withColumn(&quot;origination_date&quot;, <br>                col(&quot;origination_date&quot;).cast(&quot;timestamp&quot;))</pre><h3>Problem 2: Null Handling in Partition Columns</h3><p>Iceberg supports null values in partition columns; Hive-style partitioning does not. When we migrated a table partitioned by loan_state, the ~0.3% of records with null states caused silent failures — Spark wrote them to a __HIVE_DEFAULT_PARTITION__ path that Iceberg&#39;s reader didn&#39;t recognize.</p><p><strong>Fix:</strong> Either filter nulls before writing, or use Iceberg’s native null partition handling:</p><pre># Write with Iceberg&#39;s partition spec that handles nulls correctly<br>spark.sql(&quot;&quot;&quot;<br>    CREATE TABLE iceberg_catalog.mortgage.features<br>    USING iceberg<br>    PARTITIONED BY (loan_state)<br>    AS SELECT * FROM staging_table<br>&quot;&quot;&quot;)</pre><h3>Problem 3: Snapshot Accumulation</h3><p>Three weeks into production, metadata queries started slowing down. The reason: we were creating dozens of snapshots per day (append + overwrite operations) and never running EXPIRE_SNAPSHOTS. 
The snapshot list in the table metadata had grown to thousands of entries.</p><p><strong>Fix:</strong> Schedule regular maintenance:</p><pre>spark.sql(&quot;&quot;&quot;<br>    CALL iceberg_catalog.system.expire_snapshots(<br>        table =&gt; &#39;mortgage.features&#39;,<br>        older_than =&gt; TIMESTAMP &#39;2025-07-01 00:00:00&#39;,<br>        retain_last =&gt; 10<br>    )<br>&quot;&quot;&quot;)</pre><h3>The Moment Time-Travel Became a Workflow Dependency</h3><p>Six weeks post-migration, the ML team ran a model refresh and noticed prepayment risk predictions had shifted significantly on a segment of loans. Not a model bug — the feature table had been updated with corrected LTV ratios from the data provider, and the old features were gone.</p><p>Before Iceberg, this would have required a backup restore or a full re-ingestion of the historical snapshot. With Iceberg:</p><pre># Find the snapshot before the LTV correction landed<br>snapshots_df = spark.sql(&quot;&quot;&quot;<br>    SELECT snapshot_id, committed_at, operation<br>    FROM iceberg_catalog.mortgage.features.snapshots<br>    ORDER BY committed_at DESC<br>&quot;&quot;&quot;)</pre><pre># Read features as they existed before the correction.<br># as-of-timestamp expects milliseconds since the Unix epoch;<br># 1755151200000 is 2025-08-14T06:00:00 (UTC assumed here).<br>historical_features = spark.read \<br>    .option(&quot;as-of-timestamp&quot;, &quot;1755151200000&quot;) \<br>    .format(&quot;iceberg&quot;) \<br>    .load(&quot;iceberg_catalog.mortgage.features&quot;)</pre><p>The ML team could retrain against the exact feature set the original model had seen, reproduce the prediction delta, and validate the correction’s impact — all without involving the data engineering team or waiting for a restore.</p><p>That’s when time-travel stopped being a “nice to have” and became a first-class requirement.</p><h3>Snapshot Isolation Changed How Analysts Queried</h3><p>Beyond debugging, snapshot isolation changed the analytics workflow in a subtler way. 
When analysts ran long-running queries (sometimes 20–30 minutes for full-portfolio aggregations), they used to get inconsistent results if a background write job ran mid-query — a classic read-consistency problem.</p><p>With Iceberg, every query runs against a snapshot. Once your query starts, the snapshot it’s reading is frozen. Background writes create new snapshots; your query never sees them.</p><pre>-- This query will read a consistent snapshot even if <br>-- a background job writes new data during execution<br>SELECT <br>    loan_state,<br>    AVG(predicted_risk_score) as avg_risk,<br>    COUNT(*) as loan_count<br>FROM iceberg_catalog.mortgage.features<br>GROUP BY loan_state</pre><p>No read locks. No query failures from concurrent writes. Consistent results every time.</p><h3>Schema Evolution Without Downtime</h3><p>The other migration win worth highlighting: schema evolution.</p><p>Three times during the project, the feature pipeline needed new columns. In Snowflake, adding a column to a 1M-row table meant an ALTER TABLE that locked writes while Snowflake re-materialized the metadata. In practice: 2-5 minutes of pipeline downtime per schema change.</p><p>In Iceberg, adding a column is a metadata-only operation:</p><pre>ALTER TABLE iceberg_catalog.mortgage.features<br>ADD COLUMNS (<br>    debt_to_income_ratio DOUBLE,<br>    appraisal_method STRING<br>)</pre><p>Execution time: under a second. Old data files aren’t touched — they just return null for the new columns when read. New files include the columns. No downtime, no data copy.</p><h3>What I’d Do Differently</h3><p>A few things I’d change with hindsight:</p><p><strong>Set a snapshot retention policy on day one.</strong> We didn’t, and the snapshot accumulation problem was entirely preventable.</p><p><strong>Use the Iceberg REST catalog from the start.</strong> We initially used a Hadoop catalog, which required HDFS. Switching to the REST catalog mid-project was painful. 
The REST catalog is engine-agnostic and trivially configurable.</p><p><strong>Test the partition evolution path before you need it.</strong> Iceberg supports changing partition specs without rewriting data (partition evolution), but the behavior during the transition window is subtle. Test it in staging before you need it in production.</p><h3>Takeaway</h3><p>The 60% storage cost reduction was the reason we migrated. The schema evolution improvements were the expected win. But time-travel queries and snapshot isolation were the features that changed how the team actually worked with the data.</p><p>If you’re on Snowflake and considering a lakehouse migration, the cost story is often what drives the conversation. Don’t undersell the operational story — consistent reads, schema evolution without downtime, and the ability to surgically rewind a table are workflow improvements that compound over time.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Why I Chose Kafka Over Kinesis for the NJ Transit Real-Time Pipeline]]></title>
            <link>https://medium.com/@mansi.bhadani31/why-i-chose-kafka-over-kinesis-for-the-nj-transit-real-time-pipeline-f82e6ac5296f?source=rss-d90757d5d287------2</link>
            <guid isPermaLink="false">https://medium.com/p/f82e6ac5296f</guid>
            <category><![CDATA[nj-transit]]></category>
            <category><![CDATA[kinesis]]></category>
            <category><![CDATA[real-time-pipeline]]></category>
            <category><![CDATA[data-engineer]]></category>
            <category><![CDATA[kafka]]></category>
            <dc:creator><![CDATA[Mansi Bhadani]]></dc:creator>
            <pubDate>Tue, 31 Mar 2026 19:21:17 GMT</pubDate>
            <atom:updated>2026-03-31T19:21:17.882Z</atom:updated>
            <content:encoded><![CDATA[<p><em>A breakdown of the architectural decision — partition semantics, consumer group replay, and why Confluent’s Schema Registry was the deciding factor over AWS Kinesis.</em></p><p>When I started building the NJ Transit Smart Management System for my capstone at Pace University, one of the first real decisions I had to make was: <strong>Kafka or Kinesis?</strong></p><p>Both are battle-tested streaming platforms. Both can handle millions of events per day. Both have managed cloud offerings. On paper, either would have worked. But the more I dug into the architecture requirements of a real-time transit pipeline, the clearer it became that Kafka — specifically Confluent Cloud — was the right call.</p><p>Here’s exactly why.</p><h3>The Problem I Was Solving</h3><p>The pipeline needed to ingest GTFS-RT (General Transit Feed Specification — Realtime) transit event data, process it through Spark Structured Streaming, apply a 5-point data quality gate, and land clean records into Snowflake — all within a sub-60 second latency window.</p><p>The design constraints that shaped the streaming decision:</p><ul><li><strong>Multiple independent consumers</strong> — Spark Streaming, a DQ monitoring service, and an alerting layer all needed to read the same events independently</li><li><strong>Schema evolution</strong> — GTFS-RT feeds aren’t perfectly stable; fields get added, types shift</li><li><strong>Consumer replay</strong> — when a Spark job fails mid-batch, I needed to reprocess from a known offset, not lose events</li><li><strong>Local development parity</strong> — I needed to run the exact same stack on my laptop that runs in the cloud</li></ul><h3>Where Kinesis Falls Short</h3><p>Kinesis is a great product if you’re already deep in the AWS ecosystem and need a managed, low-ops streaming bus. But three specific limitations ruled it out for this project.</p><h3>1. Shard Semantics vs. 
Partition Semantics</h3><p>Kinesis uses <strong>shards</strong> with a fixed capacity model: each shard handles 1 MB/s ingest and 2 MB/s reads. When you need more throughput, you split shards — but you can’t unsplit them cleanly, and the resharding process interrupts consumers.</p><p>Kafka <strong>partitions</strong> are far more flexible. You set the partition count upfront, and Kafka handles rebalancing across consumer group members automatically. For a transit pipeline where event volume varies significantly between rush hour and overnight, Kafka’s partition model gave me cleaner elasticity without manual intervention.</p><h3>2. Consumer Group Replay</h3><p>This was the dealbreaker.</p><p>Kinesis retains data for <strong>24 hours by default</strong>, and extending that (up to 365 days) costs extra. More critically, Kinesis doesn’t support consumer group offset management the way Kafka does. Each shard iterator is stateless on the Kinesis side — you manage offsets yourself, or you use something like DynamoDB to track them.</p><p>Kafka’s consumer group protocol tracks committed offsets per group, per partition. When my Spark job crashed during a watermark calculation (it happened — twice), I could reset the consumer group offset to exactly the checkpoint before the failure and replay without writing a single line of custom offset management code.</p><pre># Reset a consumer group offset to a specific timestamp<br># (the group must have no active members while offsets are reset)<br>kafka-consumer-groups.sh \<br>  --bootstrap-server &lt;broker&gt; \<br>  --group spark-transit-consumer \<br>  --topic gtfs-rt-events \<br>  --reset-offsets \<br>  --to-datetime 2025-09-01T14:00:00.000 \<br>  --execute</pre><p>Try doing that cleanly in Kinesis. You can’t — not without custom DynamoDB logic.</p><h3>3. No Native Schema Registry</h3><p>Kinesis has no native schema registry. 
You’d need AWS Glue Schema Registry, which requires additional IAM plumbing and only supports Avro and JSON Schema (no Protobuf in all regions at the time of this project).</p><p>Confluent’s Schema Registry is tightly integrated with the Kafka ecosystem, supports Avro/JSON/Protobuf, and enforces <strong>compatibility modes</strong> (BACKWARD, FORWARD, FULL) so that a producer publishing a new schema version can’t silently break a downstream consumer.</p><h3>Why Confluent Kafka Won</h3><h3>Schema Registry Was Non-Negotiable</h3><p>GTFS-RT data is typed but not rigid. Over the course of the project, I encountered three cases where the feed added optional fields or changed enum values. With Schema Registry enforcing BACKWARD compatibility, I could evolve the Avro schema without touching my Spark deserialization code.</p><pre>{<br>  &quot;type&quot;: &quot;record&quot;,<br>  &quot;name&quot;: &quot;TransitEvent&quot;,<br>  &quot;fields&quot;: [<br>    {&quot;name&quot;: &quot;trip_id&quot;, &quot;type&quot;: &quot;string&quot;},<br>    {&quot;name&quot;: &quot;route_id&quot;, &quot;type&quot;: &quot;string&quot;},<br>    {&quot;name&quot;: &quot;timestamp&quot;, &quot;type&quot;: &quot;long&quot;},<br>    {&quot;name&quot;: &quot;delay_seconds&quot;, &quot;type&quot;: [&quot;null&quot;, &quot;int&quot;], &quot;default&quot;: null},<br>    {&quot;name&quot;: &quot;vehicle_id&quot;, &quot;type&quot;: [&quot;null&quot;, &quot;string&quot;], &quot;default&quot;: null}<br>  ]<br>}</pre><p>New nullable fields with defaults are backward compatible: consumers that upgrade to the new schema can still read messages written with the old one, because the defaults fill in the missing fields. Consumers still on the old schema simply ignore the extra fields. 
This saved me from a class of bugs that would have been brutal to debug at 2 AM.</p><h3>Docker Parity</h3><p>Running Confluent locally is one docker-compose.yml away:</p><pre>services:<br>  zookeeper:<br>    image: confluentinc/cp-zookeeper:7.5.0<br>    environment:<br>      ZOOKEEPER_CLIENT_PORT: 2181<br><br>  broker:<br>    image: confluentinc/cp-kafka:7.5.0<br>    depends_on: [zookeeper]<br>    ports:<br>      - &quot;9092:9092&quot;<br>    environment:<br>      KAFKA_BROKER_ID: 1<br>      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181<br>      # Two listeners: broker:29092 for other containers, localhost:9092 for the host<br>      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT<br>      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092<br>      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT<br>      # Single broker, so the offsets topic cannot use the default replication factor of 3<br>      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1<br><br>  schema-registry:<br>    image: confluentinc/cp-schema-registry:7.5.0<br>    depends_on: [broker]<br>    ports:<br>      - &quot;8081:8081&quot;<br>    environment:<br>      SCHEMA_REGISTRY_HOST_NAME: schema-registry<br>      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:29092</pre><p>Local dev was identical to Confluent Cloud. No “works on my machine” surprises when deploying.</p><h3>Consumer Groups at Scale</h3><p>The pipeline ultimately had three independent consumer groups:</p><ul><li><strong>spark-transit-consumer</strong>: Main Spark Structured Streaming job</li><li><strong>dq-monitor-consumer</strong>: Real-time data quality checks</li><li><strong>alert-consumer</strong>: Delay threshold alerting</li></ul><p>Each reads the same topic at its own pace, maintains its own offset, and can replay independently. Kinesis’s enhanced fan-out gets close to this, but the per-shard $0.015/hour cost would have added up across 3 consumers reading a high-volume topic.</p><h3>The Numbers</h3><p>The pipeline processed over <strong>2 million simulated transit events per day</strong> with sub-60 second end-to-end latency. During load testing, consumer lag stayed under 500ms at peak throughput.</p><p>Kinesis could have handled the throughput. 
But it couldn’t have given me schema evolution safety, clean consumer replay, or Docker-local parity at the same operational cost.</p><h3>When I’d Choose Kinesis Instead</h3><p>To be fair: Kinesis makes sense when:</p><ul><li>You’re fully AWS-native and want zero additional managed services</li><li>Your consumers are Lambda functions triggered by Kinesis (the integration is tight and cheap)</li><li>You don’t need consumer group semantics and are okay managing offsets in DynamoDB</li><li>Schema evolution isn’t a concern or you’re using Glue Schema Registry already</li></ul><p>For greenfield projects where the team is already deep in AWS and streaming requirements are simple, Kinesis is a totally reasonable choice. It’s not the wrong tool — it just wasn’t the right tool <em>for this pipeline</em>.</p><h3>Takeaway</h3><p>The decision wasn’t about Kafka being “better” in some abstract sense. It was about three concrete requirements:</p><ol><li><strong>Partition-level consumer groups with managed offsets</strong> → Kafka wins</li><li><strong>Schema evolution with backward compatibility enforcement</strong> → Confluent Schema Registry wins</li><li><strong>Local dev/prod parity without cloud costs</strong> → Docker Compose wins</li></ol><p>If you’re building a multi-consumer streaming pipeline where schema stability matters and you need reliable replay, reach for Kafka. You’ll thank yourself when the first schema change rolls in at week three.</p>]]></content:encoded>
        </item>
    </channel>
</rss>