<?xml version="1.0" encoding="UTF-8"?><rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0" xmlns:cc="http://cyber.law.harvard.edu/rss/creativeCommonsRssModule.html">
    <channel>
        <title><![CDATA[Stories by Tharun Kumar Sekar on Medium]]></title>
        <description><![CDATA[Stories by Tharun Kumar Sekar on Medium]]></description>
        <link>https://medium.com/@tharun026?source=rss-2cad428895fb------2</link>
        <image>
            <url>https://cdn-images-1.medium.com/fit/c/150/150/2*B2wwOli9ZjLI-zaxQ29a7w.jpeg</url>
            <title>Stories by Tharun Kumar Sekar on Medium</title>
            <link>https://medium.com/@tharun026?source=rss-2cad428895fb------2</link>
        </image>
        <generator>Medium</generator>
        <lastBuildDate>Sun, 17 May 2026 03:11:29 GMT</lastBuildDate>
        <atom:link href="https://medium.com/@tharun026/feed" rel="self" type="application/rss+xml"/>
        <webMaster><![CDATA[yourfriends@medium.com]]></webMaster>
        <atom:link href="http://medium.superfeedr.com" rel="hub"/>
        <item>
            <title><![CDATA[Understanding Streaming Query Metrics]]></title>
            <link>https://medium.com/analytics-vidhya/understanding-streaming-query-metrics-e3436a0e3372?source=rss-2cad428895fb------2</link>
            <guid isPermaLink="false">https://medium.com/p/e3436a0e3372</guid>
            <category><![CDATA[streaming-metrics]]></category>
            <category><![CDATA[structured-streaming]]></category>
            <category><![CDATA[kafka]]></category>
            <category><![CDATA[spark]]></category>
            <category><![CDATA[optimize-streaming]]></category>
            <dc:creator><![CDATA[Tharun Kumar Sekar]]></dc:creator>
            <pubDate>Sun, 10 Dec 2023 16:53:49 GMT</pubDate>
            <atom:updated>2023-12-10T16:53:49.374Z</atom:updated>
            <content:encoded><![CDATA[<figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/0*wkBjtZJo-6-R_DO9.jpeg" /></figure><p>To optimize a streaming pipeline, the streaming query metrics are the right place to begin your analysis.</p><p>For illustration purposes, I’m using a Kafka topic as the source and a Delta table as the destination. Here is a sample of the streaming query metrics, which can be found in the driver’s log4j file.</p><pre>INFO MicroBatchExecution: Streaming query made progress: {<br>  &quot;id&quot; : &quot;8734d7c4-f46d-4e28-a7d6-5b4498ec9fc0&quot;,<br>  &quot;runId&quot; : &quot;848g9a09-8141-4a78-8c05-c2138c1b4e09&quot;,<br>  &quot;name&quot; : &quot;ingest-stream&quot;,<br>  &quot;timestamp&quot; : &quot;2023-11-30T09:03:00.000Z&quot;,<br>  &quot;batchId&quot; : 2,<br>  &quot;numInputRows&quot; : 43,<br>  &quot;inputRowsPerSecond&quot; : 725573.7333333333,<br>  &quot;processedRowsPerSecond&quot; : 5705.508014487084,<br>  &quot;durationMs&quot; : {<br>    &quot;addBatch&quot; : 7629498,<br>    &quot;getBatch&quot; : 0,<br>    &quot;commitOffsets&quot; : 203,<br>    &quot;queryPlanning&quot; : 176,<br>    &quot;triggerExecution&quot; : 7630245,<br>    &quot;walCommit&quot; : 131<br>  },<br>  &quot;stateOperators&quot; : [ ],<br>  &quot;sources&quot; : [ {<br>    &quot;description&quot; : &quot;KafkaV2[Subscribe[event-tree]]&quot;,<br>    &quot;startOffset&quot; : {<br>      &quot;event-tree&quot; : {<br>        &quot;1&quot; : 143,<br>        &quot;0&quot; : 149<br>      }<br>    },<br>    &quot;endOffset&quot; : {<br>      &quot;event-tree&quot; : {<br>        &quot;1&quot; : 166,<br>        &quot;0&quot; : 169<br>      }<br>    },<br>    &quot;latestOffset&quot; : {<br>      &quot;event-tree&quot; : {<br>        &quot;1&quot; : 199,<br>        &quot;0&quot; : 199<br>      }<br>    },<br>    &quot;numInputRows&quot; : 43,<br>    &quot;inputRowsPerSecond&quot; : 725573.7333333333,<br>    &quot;processedRowsPerSecond&quot; : 
5705.508014487084,<br>    &quot;metrics&quot; : {<br>      &quot;avgOffsetsBehindLatest&quot; : &quot;0.0&quot;,<br>      &quot;estimatedTotalBytesBehindLatest&quot; : &quot;0.0&quot;,<br>      &quot;maxOffsetsBehindLatest&quot; : &quot;0&quot;,<br>      &quot;minOffsetsBehindLatest&quot; : &quot;0&quot;<br>    }<br>  } ],<br>  &quot;sink&quot; : {<br>    &quot;description&quot; : &quot;DeltaSink[s3://datalake/bronze/event-tree]&quot;,<br>    &quot;numOutputRows&quot; : -1<br>  }<br>}</pre><p>A few of the basic entries in these metrics are:</p><ul><li><strong>id</strong> — ID of the streaming query. It does not change across different runs.</li><li><strong>runId</strong> — Unique ID of an individual run. It changes on every restart.</li><li><strong>batchId</strong> — Number of the micro-batch being processed.</li><li><strong>numInputRows</strong> — Number of records consumed in this micro-batch.</li><li><strong>processedRowsPerSecond</strong> — Number of records processed per second.</li></ul><p>Now let’s get into the metrics that help us understand the processing time.</p><ul><li><strong>durationMs</strong> — This category contains all the time-related information of the micro-batch, in milliseconds.</li><li><strong>getBatch</strong> — Time taken to retrieve the metadata about the next micro-batch, such as offsets. This doesn’t include reading the actual data. This value is usually very small.</li><li><strong>walCommit</strong> — Time taken to write the offsets to the write-ahead log in the checkpoint. This value is also usually very small.</li><li><strong>queryPlanning</strong> — Time taken by Spark to generate the execution plan. This value is also usually small.</li><li><strong>addBatch</strong> — Time taken to read, process and send the data to the sink. 
This metric usually has the highest value, since the actual processing time is measured here.</li><li><strong>commitOffsets</strong> — Time taken to write to the commit log after processing the micro-batch. This value is also usually small.</li><li><strong>triggerExecution</strong> — Total time taken to execute the micro-batch, which is roughly the sum of all the above metrics.</li></ul><p>The next category is about the source (Kafka in our case).</p><ul><li><strong>description</strong> — Description of the source, which includes the name of the topic we are reading from.</li><li><strong>startOffset</strong> — Category that holds the offsets from which this micro-batch started reading.</li><li><strong>event-tree</strong> — Topic name.</li><li><strong>0, 1</strong> — Partition IDs.</li><li>The value is the record number (offset) from which this micro-batch <strong>started</strong> to read the data.</li><li><strong>endOffset</strong> — The values are the offsets up to which the records have been read.</li><li><strong>latestOffset</strong> — The latest offset available in each partition. If this value matches the endOffset, it means we have processed all records in the partition.</li></ul><p>If you liked this article, click the 👏 so other people will see it here on Medium.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=e3436a0e3372" width="1" height="1" alt=""><hr><p><a href="https://medium.com/analytics-vidhya/understanding-streaming-query-metrics-e3436a0e3372">Understanding Streaming Query Metrics</a> was originally published in <a href="https://medium.com/analytics-vidhya">Analytics Vidhya</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Dynamic Partition Upsert — SPARK]]></title>
            <link>https://medium.com/analytics-vidhya/dynamic-partition-upsert-spark-1ff1a1025813?source=rss-2cad428895fb------2</link>
            <guid isPermaLink="false">https://medium.com/p/1ff1a1025813</guid>
            <category><![CDATA[spark]]></category>
            <category><![CDATA[upsert]]></category>
            <category><![CDATA[dynamic-partitions]]></category>
            <category><![CDATA[partition]]></category>
            <category><![CDATA[partitioning]]></category>
            <dc:creator><![CDATA[Tharun Kumar Sekar]]></dc:creator>
            <pubDate>Tue, 17 May 2022 13:18:16 GMT</pubDate>
            <atom:updated>2022-05-17T15:36:30.960Z</atom:updated>
            <content:encoded><![CDATA[<h3>Dynamic Partition Upsert — SPARK</h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*3lh9QrTikOFpMcrB-xlO8g.jpeg" /></figure><p>If you’re using Spark, you probably know what partitioning is, and perhaps you have even encountered dynamic partitions. But even if you are not familiar with Spark partitioning in general or dynamic partition inserts, don’t worry, we’ve got it covered.</p><h4><strong>Partitioning in Spark</strong></h4><p>Partitioning, in simple terms, means splitting the data based on a column’s value and storing each split in its own partition folder.</p><p>Let’s look at the usual way we save data with partitions and then see how dynamic partitioning can help us.</p><pre>val historyDF = Seq(<br>      (8, &quot;bat&quot;, &quot;1&quot;, &quot;2022-05-01&quot;),<br>      (64, &quot;mouse&quot;, &quot;1&quot;, &quot;2022-05-02&quot;),<br>      (-27, &quot;horse&quot;, &quot;1&quot;, &quot;2022-05-03&quot;),<br>      (-28, &quot;mouse&quot;, &quot;1&quot;,&quot;2022-05-03&quot;),<br>      (10, &quot;bat&quot;, &quot;1&quot;, &quot;2022-05-04&quot;)<br>   ).<br>   toDF(&quot;number&quot;, &quot;word&quot;, &quot;priority&quot;,&quot;date&quot;)</pre><p>Here, we have created a dataframe “historyDF” with 5 records. Let’s look at the data now.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*vEvmfQEyC41SJLtx-xsXVQ.png" /></figure><p>Now let’s save this dataframe to the folder “dbfs:/dynamicPartitions/”, partitioned by the column “date”.</p><pre>historyDF.<br>   write.<br>   mode(&quot;overwrite&quot;).<br>   partitionBy(&quot;date&quot;).<br>   parquet(&quot;dbfs:/dynamicPartitions/&quot;)</pre><p>Let’s see how the data looks after it’s saved in the file system.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*NNNw7Er-AB4f-UJEccsgQQ.png" /></figure><p>We can see 4 subfolders created for the 4 different dates which were used for partitioning. 
The subfolders were all created at the same time. We can confirm this by viewing the “modificationTime” entry in the above image.</p><h4>Need for Dynamic Partitioning</h4><p>In most cases, we need to run a daily load/ETL job to load data into HDFS. The daily data often also contains updated or additional records belonging to previous dates. In technical terms, we should do an “UPSERT” (update and insert) on the existing partitions and load the current date’s data into a new partition.</p><p>Let’s create some sample data and store it in a dataframe “deltaDF”.</p><pre>val deltaDF = Seq(<br>      (64, &quot;mouse&quot;, &quot;2&quot;, &quot;2022-05-02&quot;),<br>      (-29, &quot;mouse&quot;, &quot;2&quot;, &quot;2022-05-03&quot;),<br>      (10, &quot;cat&quot;, &quot;2&quot;, &quot;2022-05-05&quot;)<br>   ).<br>   toDF(&quot;number&quot;, &quot;word&quot;, &quot;priority&quot;, &quot;date&quot;)</pre><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*jPgUN6oXseSU0wEc4HKSEg.png" /></figure><p>Now let’s try to understand the data in “deltaDF”. We have 3 records in total, one for each date. The record for 2022-05-02 is an exact replica of the existing record in “historyDF” with just the priority changed. The record for 2022-05-03 updates the field “number” from -28 to -29. Finally, we have a new date, 2022-05-05, with one record.</p><p>The expectation now is that one new partition is created for 2022-05-05, the record is updated for 2022-05-03, and the partition is overwritten for 2022-05-02. The rest of the partitions should remain untouched.</p><h4>How to Use Dynamic Partitioning</h4><p>To do this, we first need to set Spark’s partition overwrite mode to dynamic. This can be done by running the command:</p><pre>spark.conf.set(&quot;spark.sql.sources.partitionOverwriteMode&quot;,&quot;dynamic&quot;)</pre><p>Since files in HDFS and in object storage cannot be modified in place, we cannot update individual records. 
Instead, we overwrite the partitions we are interested in. To achieve this, the dataframe we write must contain the complete data of every partition that is going to be overwritten, so that we don’t lose any existing records. Sounds confusing, right? Let’s make it simple with code.</p><pre>import org.apache.spark.sql.expressions.Window<br>import org.apache.spark.sql.functions.row_number<br>// Dates that have at least one record in the delta<br>val distinctDates = deltaDF.<br>   select(&#39;date).<br>   distinct.<br>   map(_.getString(0)).collect().toList</pre><pre>// Existing records for only those dates<br>val filteredHistoryDF = historyDF.<br>   filter(&#39;date.isin(distinctDates:_*))</pre><pre>// Keep the highest-priority record per (date, word), then overwrite only the affected partitions<br>filteredHistoryDF.<br>   union(deltaDF).<br>   withColumn(<br>      &quot;rank&quot;, <br>      row_number().over(<br>         Window.partitionBy(<br>            &#39;date, <br>            &#39;word).<br>            orderBy(&#39;priority.desc)<br>         )<br>   ).<br>   filter(&#39;rank === 1).<br>   drop(&quot;rank&quot;).<br>   write.<br>   mode(&quot;overwrite&quot;).<br>   partitionBy(&quot;date&quot;).<br>   parquet(&quot;dbfs:/dynamicPartitions/&quot;)</pre><p>Here, we first select the distinct dates that have entries in deltaDF. Then we filter the records for these dates from historyDF. Next, we union the delta and the filtered history and keep only the latest record for each date and word, so we have the updated records along with the ones that previously existed, and drop the helper column “rank”. Finally, we save the records to the same path.</p><p>Let’s see how the data looks in the file system after the update has happened.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*qo9krphnkKV5krUj2WjQcw.png" /></figure><p>We can see that a new partition folder was created for 2022-05-05 and partitions 2022-05-02 and 2022-05-03 were updated. We can confirm this by viewing the timestamp field modificationTime. 
The other partitions are untouched and still have their original creation timestamp.</p><p>By doing this, we skip the partitions we are not interested in, which saves execution time and a lot of I/O.</p><p>If you liked this article, click the 👏 so other people will see it here on Medium.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=1ff1a1025813" width="1" height="1" alt=""><hr><p><a href="https://medium.com/analytics-vidhya/dynamic-partition-upsert-spark-1ff1a1025813">Dynamic Partition Upsert — SPARK</a> was originally published in <a href="https://medium.com/analytics-vidhya">Analytics Vidhya</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Higher-Order Functions — Python]]></title>
            <link>https://medium.com/analytics-vidhya/higher-order-functions-python-716f508a8f41?source=rss-2cad428895fb------2</link>
            <guid isPermaLink="false">https://medium.com/p/716f508a8f41</guid>
            <category><![CDATA[higher-order-function]]></category>
            <category><![CDATA[functional-programming]]></category>
            <category><![CDATA[python]]></category>
            <category><![CDATA[first-class-function]]></category>
            <category><![CDATA[function-as-parameter]]></category>
            <dc:creator><![CDATA[Tharun Kumar Sekar]]></dc:creator>
            <pubDate>Thu, 23 Sep 2021 07:15:47 GMT</pubDate>
            <atom:updated>2021-09-27T05:10:51.047Z</atom:updated>
            <content:encoded><![CDATA[<h3>Higher-Order Functions — Python</h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*TxDhmsq9LG0p4au_RULnVA.png" /></figure><p>A programming language is said to support first-class functions if it treats functions as first-class objects. By definition, a “first-class object” is a program entity that can be passed around just like any other object. It has the following characteristics:</p><ul><li>can have properties and methods.</li><li>can be assigned to a variable.</li><li>can be passed as an argument to a function.</li><li>can be returned as a result of another function.</li></ul><p>Let’s try to understand each property through code.</p><ol><li><strong>Properties and Methods</strong></li></ol><p>Each function you create in Python has a set of default attributes and methods, which you can inspect using the built-in dir() function. In the example below, I have defined a hello_world function that prints the string Hello World!.</p><pre>def hello_world():<br>   print(&quot;Hello World!&quot;)</pre><p>When we call dir() on the hello_world function, we can see all the default attributes and methods that are part of it.</p><pre>print(dir(hello_world))<br>[&#39;__annotations__&#39;, &#39;__call__&#39;, &#39;__class__&#39;, &#39;__closure__&#39;, &#39;__code__&#39;, &#39;__defaults__&#39;, &#39;__delattr__&#39;, &#39;__dict__&#39;, &#39;__dir__&#39;, &#39;__doc__&#39;, &#39;__eq__&#39;, &#39;__format__&#39;, &#39;__ge__&#39;, &#39;__get__&#39;, &#39;__getattribute__&#39;, &#39;__globals__&#39;, &#39;__gt__&#39;, &#39;__hash__&#39;, &#39;__init__&#39;, &#39;__init_subclass__&#39;, &#39;__kwdefaults__&#39;, &#39;__le__&#39;, &#39;__lt__&#39;, &#39;__module__&#39;, &#39;__name__&#39;, &#39;__ne__&#39;, &#39;__new__&#39;, &#39;__qualname__&#39;, &#39;__reduce__&#39;, &#39;__reduce_ex__&#39;, &#39;__repr__&#39;, &#39;__setattr__&#39;, &#39;__sizeof__&#39;, &#39;__str__&#39;, &#39;__subclasshook__&#39;]</pre><p>You 
can access any of these attributes on the function. For example:</p><pre>hello_world.__name__<br># &#39;hello_world&#39;</pre><p>You can also use type() to confirm that the function we created is an instance of the function class.</p><pre>print(type(hello_world))<br># &lt;class &#39;function&#39;&gt;</pre><p><strong>2. Assigning Functions to Variables</strong></p><p>We can also assign functions to variables.</p><pre>def hello_world_function(name):<br>   print(&quot;Hello &quot; + name + &quot;!&quot;)</pre><pre>hello_world_variable = hello_world_function</pre><p>Here we are assigning the function hello_world_function to the variable hello_world_variable. Now hello_world_variable refers to a function object, which means we can call it just like hello_world_function.</p><pre>hello_world_variable(&quot;Tharun&quot;)<br># Hello Tharun!</pre><p>This assignment does not call the function; instead, it takes the function object referenced by hello_world_function and creates a second name pointing to it.</p><pre>hello_world_function<br># &lt;function hello_world_function at 0x0000020127C982F0&gt;</pre><pre>hello_world_variable<br># &lt;function hello_world_function at 0x0000020127C982F0&gt;</pre><p><strong>3. Function as an Argument</strong></p><figure><img alt="" src="https://cdn-images-1.medium.com/max/500/0*Ku2iFqeZS1X9B_ER.png" /></figure><p>Since a function is an object, you can pass it as an argument just like any other value.</p><p>Let’s consider iterating over a list of items and printing them sequentially. We can easily build an iterate function.</p><pre>def iterate(items):<br>   for item in items:<br>      print(item)</pre><p>This is the usual approach. What if we want to do something other than printing the items? That’s where higher-order functions come in. 
We can create a function iterate_custom that takes both the list of items and the function that needs to be applied to each item.</p><pre>def iterate_custom(items, function):<br>   for item in items:<br>      function(item)</pre><p>By doing this, we have created a function that can do anything with the list that involves sequential iteration. This is a higher level of abstraction, and it also makes our code reusable.</p><p><strong>4. Return Function from Function</strong></p><figure><img alt="" src="https://cdn-images-1.medium.com/max/500/0*_yipt4yF0Qvslwuf.png" /></figure><p>This is usually done when a wrapper function decides the control flow, that is, which function should be called. For example:</p><pre>def square(num):<br>   return num * num</pre><pre>def cube(num):<br>   return square(num) * num</pre><pre>def power_of_num(power):<br>   if power == 2:<br>      return square<br>   elif power == 3:<br>      return cube</pre><pre>num_powers = power_of_num(2)<br># num_powers now refers to the square function</pre><pre>num_powers(5)<br># 25</pre><p>We have defined two ordinary functions, square and cube. The third function, power_of_num, is a wrapper that returns one of the first two based on the value of its argument. In this case, power_of_num is called with the value 2, so the square function is returned and assigned to the variable num_powers. Now, if we call num_powers, it behaves exactly like square.</p><pre>num_powers = power_of_num(3)<br># num_powers now refers to the cube function</pre><pre>num_powers(5)<br># 125</pre><p>Got questions? 
Feel free to comment here.</p><p>If you liked this article, click the 👏 so other people will see it here on Medium.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=716f508a8f41" width="1" height="1" alt=""><hr><p><a href="https://medium.com/analytics-vidhya/higher-order-functions-python-716f508a8f41">Higher-Order Functions — Python</a> was originally published in <a href="https://medium.com/analytics-vidhya">Analytics Vidhya</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Slowly Changing Dimension]]></title>
            <link>https://medium.com/analytics-vidhya/slowly-changing-dimension-346270b22d0f?source=rss-2cad428895fb------2</link>
            <guid isPermaLink="false">https://medium.com/p/346270b22d0f</guid>
            <category><![CDATA[type2]]></category>
            <category><![CDATA[scd]]></category>
            <category><![CDATA[slowly-changing-dimension]]></category>
            <dc:creator><![CDATA[Tharun Kumar Sekar]]></dc:creator>
            <pubDate>Wed, 07 Oct 2020 07:47:09 GMT</pubDate>
            <atom:updated>2020-10-07T12:46:59.859Z</atom:updated>
            <content:encoded><![CDATA[<figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/0*1bFmgy1TmsEOfCqp" /></figure><p>I wanted to learn about Slowly Changing Dimensions for a long time, but I couldn’t find a clear, concise blog post for anyone not familiar with the topic. So here is my own offering: a quick introduction to Slowly Changing Dimensions, or SCD, in a data warehousing scenario.</p><p>Let’s take two tables:</p><ul><li>Users (Dimension)</li></ul><iframe src="" width="0" height="0" frameborder="0" scrolling="no"><a href="https://medium.com/media/ebadee9eb4cb176a01b056f4b47492cc/href">https://medium.com/media/ebadee9eb4cb176a01b056f4b47492cc/href</a></iframe><ul><li>Sales (Fact)</li></ul><iframe src="" width="0" height="0" frameborder="0" scrolling="no"><a href="https://medium.com/media/80d4a32b8c077a2e42ae8245b2fdc58a/href">https://medium.com/media/80d4a32b8c077a2e42ae8245b2fdc58a/href</a></iframe><p>When organizing a data warehouse into a star schema, we need to relate fact records to dimension records to get their related attributes. There are scenarios where the information in the dimension might change. For instance, the user Adam might move to the United Kingdom. If he does, do we associate all his fact records with the new country? Or do we ignore the change in country to keep historical accuracy? Or do we treat facts before the change in country differently from those after?</p><p>It is this decision that determines whether to make the dimension a slowly changing one. There are different types of SCD depending on how you treat an incoming change.</p><h3>Types of SCD</h3><h4>Type 0 — Fixed Dimension</h4><p>No changes are allowed here. In other words, the dimension never changes. In this case, we don’t change Adam’s country even if he moves to another one.</p><h4>Type 1 — No History</h4><p>The dimension is updated directly. Changes to the dimension are not tracked, so we can only see the current state. 
In our case, Adam’s record would be modified to have United_Kingdom as Country. All the orders he placed when he was in United_States will now point to United_Kingdom.</p><iframe src="" width="0" height="0" frameborder="0" scrolling="no"><a href="https://medium.com/media/6116ecb190e5c6f5a6009aaa348c98d4/href">https://medium.com/media/6116ecb190e5c6f5a6009aaa348c98d4/href</a></iframe><h4>Type 2 — Row Versioning</h4><p>Type 2 is the most common method of tracking change in data warehouses. Here, we track the changes with new records and additional columns such as the current flag and active dates.</p><iframe src="" width="0" height="0" frameborder="0" scrolling="no"><a href="https://medium.com/media/50186440175bb97a7f78566e1d1ce285/href">https://medium.com/media/50186440175bb97a7f78566e1d1ce285/href</a></iframe><p>New Columns</p><ul><li><strong>ID</strong> — We add a new ID column since the existing user id is no longer sufficient to identify the specific record we require.</li><li><strong>Current_Flag</strong> — A quick method of returning only the latest record of each user.</li><li><strong>Start_Date</strong> — The date from which the specific record is active.</li><li><strong>End_Date</strong> — The date until which the specific record is active.</li></ul><p>This method is very powerful. We maintain the history for the entire record and can easily perform change-over-time analysis.</p><h4>Type 3</h4><p>In this type, we add a new column instead of a record. In our case, we add a new column “Previous Country” to track the change. If the user changes country again, we have to add another column.</p><h4>Type 4</h4><p>We simply update the record, as in Type 1, to accommodate the new change. 
However, we also maintain a separate history table, similar to Type 2, to track the changes.</p><p>After the update, the dimension table will look like this:</p><iframe src="" width="0" height="0" frameborder="0" scrolling="no"><a href="https://medium.com/media/6116ecb190e5c6f5a6009aaa348c98d4/href">https://medium.com/media/6116ecb190e5c6f5a6009aaa348c98d4/href</a></iframe><p>The history table will have the following records.</p><iframe src="" width="0" height="0" frameborder="0" scrolling="no"><a href="https://medium.com/media/db05790605f47b3cc792e82d1625a0c3/href">https://medium.com/media/db05790605f47b3cc792e82d1625a0c3/href</a></iframe><p>Separating the history from the dimension makes the dimension table smaller, which improves performance and reduces complexity if the majority of users only need the current value.</p><p>However, if you require historical values, this type adds complexity and performance overhead.</p><p>Type 1 and Type 2 are generally preferred over Type 4.</p><p>Got questions? Feel free to comment here.</p><p>If you liked this article, click the 👏 so other people will see it here on Medium.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=346270b22d0f" width="1" height="1" alt=""><hr><p><a href="https://medium.com/analytics-vidhya/slowly-changing-dimension-346270b22d0f">Slowly Changing Dimension</a> was originally published in <a href="https://medium.com/analytics-vidhya">Analytics Vidhya</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Data Reconciliation in Spark]]></title>
            <link>https://medium.com/analytics-vidhya/data-reconciliation-in-spark-b185c6a2952b?source=rss-2cad428895fb------2</link>
            <guid isPermaLink="false">https://medium.com/p/b185c6a2952b</guid>
            <category><![CDATA[spark-application]]></category>
            <category><![CDATA[spark-quality-checks]]></category>
            <category><![CDATA[spark]]></category>
            <category><![CDATA[reconciliation]]></category>
            <category><![CDATA[data-validation]]></category>
            <dc:creator><![CDATA[Tharun Kumar Sekar]]></dc:creator>
            <pubDate>Sun, 20 Sep 2020 15:09:01 GMT</pubDate>
            <atom:updated>2020-09-20T16:41:16.675Z</atom:updated>
            <content:encoded><![CDATA[<figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/0*LPe9SCnSiDQXq99R.png" /></figure><p>Data reconciliation is the process of verifying data during a data migration. In this process, the target data is compared against the source data to ensure that the migration happened as expected.</p><h4><strong>Need for Data Reconciliation</strong></h4><ul><li>You cannot trust your data without data verification.</li><li>Comparing record counts and fill rates does not always work.</li><li>Untrustworthy data leads to flawed insights.</li></ul><p><strong>Data Reconciler</strong> is a data reconciliation tool that checks the accuracy of your data. Before taking you through the technical implementation, I would like to show you the output of the reconciliation tool. You can run this code yourself by following the instructions in the next section.</p><p>The input dataset has 4 fields and 50 million records, about 1 GB in Parquet format. After performing reconciliation on this dataset, we get the following output.</p><iframe src="" width="0" height="0" frameborder="0" scrolling="no"><a href="https://medium.com/media/ffd99d83f2572bb6d092f4fdbf228584/href">https://medium.com/media/ffd99d83f2572bb6d092f4fdbf228584/href</a></iframe><p>The above output provides the following information.</p><ul><li><strong>Matching Record Count</strong> (Record 6) — Number of records with matching primary keys in both datasets. In other words, the record count after performing an inner join between the datasets. This value is used as the denominator to calculate the percentage of matching records for each column.</li><li><strong>Dropped Records</strong> (Record 7) — Number of records that exist in the old table but not in the new one. 
In other words, the output of a left anti join.</li><li><strong>New Records</strong> (Record 8) — Number of records that exist in the new table but not in the old one.</li><li><strong>Old File Path</strong> (Record 9) — Actual number of records in the old table.</li><li><strong>New File Path</strong> (Record 10) — Actual number of records in the new table.</li><li><strong>Field Name</strong> (Column 1) — Contains each column available in the old table.</li><li><strong>Matching Record Count</strong> (Column 2) — Number of records with the same value in both the old and the new column.</li><li><strong>Mismatch Record Count</strong> (Column 3) — Number of records with different values in the old and the new column.</li><li><strong>Matching Record Percentage</strong> (Column 4) — Matching Record Count of individual column / Matching Record Count between Datasets (Record 6)</li></ul><h4>How to run the Data Reconciler?</h4><p>The source code of the Data Reconciler is available on <a href="https://github.com/tharun026/SparkDataReconciler">GitHub</a>. For now, the tool only supports data in Parquet format, and the data must have a primary key or a combination of columns that acts as one. You can download the code, customize it if needed, and then build it. Once you have the JAR, you can load it into your big data environment and trigger the job using the command:</p><pre>/usr/lib/spark/bin/spark-submit --deploy-mode cluster --executor-cores 5 --name Data_Reconciliation --class com.github.tharun.datareconciler.Pipeline {JAR_PATH} --qualityCheckType reconciler --oldTable {PATH_OF_OLD_DATA} --newTable {PATH_OF_NEW_DATA} --outputPath {PATH_OF_OUTPUT_DATA} --primaryKey {COMMA_SEPARATED_PRIMARY_KEYS}</pre><h4>Parameters:</h4><ul><li><em>Executor Cores</em> — 5. This balances parallelism against an even load per executor.</li><li><em>Name</em> — Data_Reconciliation. A name for your Spark job.</li><li><em>Class</em> — com.github.tharun.datareconciler.Pipeline. 
Main class or entry point for the Spark job.</li><li><em>JAR_PATH</em> — Path where you have placed the JAR.</li><li><em>Quality Check Type </em>— reconciler. For triggering the reconciliation part of the code.</li><li><em>Old Table</em> — Path of Old Data. Path where you have stored the old dataset.</li><li><em>New Table</em> — Path of New Data. Path where you have stored the new dataset.</li><li><em>Output Path</em> — Path of Output Data. Path where the reconciliation output is written.</li><li><em>Primary Key</em> — Comma Separated Primary Key Column Names.</li></ul><h4>Technical Implementation</h4><p>Once you have fed in both datasets, they are joined on the primary keys. Then, for each record and each column, if the value in the old dataset matches the value in the new dataset, a new column is created with the value “1”; if the values do not match, the new column is filled with the value “0”. The sum of this new column gives us the total number of matching records for that column. Once we have the matching record count, the other values, such as the mismatch record count and the matching percentage, are calculated from it.</p><h4>Runtime Stats</h4><h4><em>Dataset 1</em></h4><ul><li>50 Million Records</li><li>6 GB in Parquet</li><li>170 columns</li><li>AWS r5.12xlarge — 5 nodes</li><li>3 minutes runtime</li></ul><h4><em>Dataset 2</em></h4><ul><li>350 Million Records</li><li>30 GB in Parquet</li><li>170 columns</li><li>AWS r5.12xlarge — 10 nodes</li><li>6 minutes runtime</li></ul><p>GitHub URL — <a href="https://github.com/tharun026/SparkDataReconciler">https://github.com/tharun026/SparkDataReconciler</a></p><p>Got questions? 
Feel free to comment here.</p><p>If you liked this article, click the 👏 so other people will see it here on Medium.</p><hr><p><a href="https://medium.com/analytics-vidhya/data-reconciliation-in-spark-b185c6a2952b">Data Reconciliation in Spark</a> was originally published in <a href="https://medium.com/analytics-vidhya">Analytics Vidhya</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Spark Parallel Job Submission]]></title>
            <link>https://medium.com/analytics-vidhya/spark-parallel-job-submission-38b41220397b?source=rss-2cad428895fb------2</link>
            <guid isPermaLink="false">https://medium.com/p/38b41220397b</guid>
            <category><![CDATA[scala]]></category>
            <category><![CDATA[spark-performance]]></category>
            <category><![CDATA[spark]]></category>
            <category><![CDATA[spark-optimization]]></category>
            <category><![CDATA[scala-futures]]></category>
            <dc:creator><![CDATA[Tharun Kumar Sekar]]></dc:creator>
            <pubDate>Sun, 06 Sep 2020 14:27:13 GMT</pubDate>
            <atom:updated>2020-09-07T13:21:31.884Z</atom:updated>
            <content:encoded><![CDATA[<h3>Spark Parallel Job Execution</h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*cVZgaMA_9uL2IVPR6zKYqw.jpeg" /></figure><p>Spark is known for breaking down a big job and running individual tasks in parallel. But this doesn’t mean it runs two independent jobs in parallel: jobs submitted from the same thread run one after another. This article will help you maximize the parallelism that you can achieve with Spark.</p><h4>Asynchronous Programming</h4><p>This is a type of parallel programming in which a unit of work is allowed to run separately from the primary application thread. When the work is complete, it notifies the main thread about the completion or failure of the worker thread. In Scala, you can achieve this using <em>Future</em>.</p><h4>Scala Futures</h4><p>Futures are a means of performing asynchronous programming in Scala. A <em>Future</em> gives you a simple way to run a job inside your Spark application concurrently.</p><p>Let’s look at the usual way we write our Spark code and then see how <em>Future </em>can help us.</p><pre><em>val </em>employee = spark.read.parquet(&quot;s3://****/employee&quot;)<br><em>val </em>salary = spark.read.parquet(&quot;s3://****/salary&quot;)<br><em>val </em>ratings = spark.read.parquet(&quot;s3://****/ratings&quot;)<br><br><em>println</em>(&quot;Joining employee with salary&quot;)<br>employee.join(salary, Seq(&quot;employee_id&quot;))<br>  .exportToS3AndJSON(&quot;s3://****/employee_salary&quot;)<br><br><em>println</em>(&quot;Joining employee with ratings&quot;)<br>employee.join(ratings, Seq(&quot;employee_id&quot;))<br>  .exportToS3AndJSON(&quot;s3://****/employee_ratings&quot;)</pre><p>In the above code, we read 3 datasets — employee, salary and ratings. (exportToS3AndJSON is a custom helper, not a built-in Spark method.)</p><ul><li>In the first statement, we join tables Employee and Salary based on Employee_ID and we save the result in Parquet and JSON formats.</li><li>In the second statement, we join tables Employee and Ratings based on Employee_ID and we save 
the result again in Parquet and JSON formats.</li></ul><p>The first and the second statements are in no way related to each other, and yet Spark will run them sequentially. You get a better picture of this by looking at the Spark UI.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*fBh5o_bziFOktAga1CRx0g.png" /><figcaption>Spark UI</figcaption></figure><p>Job 0 starts first and runs for 5.5 minutes; once it completes, the next job is picked up, and so on. You can deduce the same by looking at the event timeline too. None of the jobs overlap, and each job is picked up only after the previous job has completed.</p><p>If job 0 uses only 50% of the cluster, the remaining 50% sits idle.</p><p>Let’s understand how we can increase the utilization by using Scala Futures. Below is the same piece of code but with <em>Future</em> incorporated.</p><pre><em>import </em>java.util.concurrent.Executors<br><em>import </em>scala.concurrent.duration.Duration<br><em>import </em>scala.concurrent.{Await, ExecutionContext, Future}</pre><pre><em>//Allowing a maximum of 2 threads to run<br>val </em>executorService = Executors.<em>newFixedThreadPool</em>(2)<br><em>implicit val </em>executionContext = ExecutionContext.<em>fromExecutorService</em>(executorService)</pre><pre><em>val </em>employee = spark.read.parquet(&quot;s3://****/employee&quot;)<br><em>val </em>salary = spark.read.parquet(&quot;s3://****/salary&quot;)<br><em>val </em>ratings = spark.read.parquet(&quot;s3://****/ratings&quot;)</pre><pre><em>val futureA = Future {<br>   println</em>(&quot;Joining employee with salary&quot;)<br>   employee.join(salary, Seq(&quot;employee_id&quot;))<br>     .exportToS3AndJSON(&quot;s3://****/employee_salary&quot;)<br>   <em>println</em>(&quot;Future A Complete&quot;)<br>   }</pre><pre>val futureB = Future {<br>   <em>println</em>(&quot;Joining employee with ratings&quot;)<br>   employee.join(ratings, 
Seq(&quot;employee_id&quot;))<br>     .exportToS3AndJSON(&quot;s3://****/employee_ratings&quot;)<br>   <em>println</em>(&quot;Future B Complete&quot;)<br>   }</pre><pre>Await.result(futureA, Duration.Inf)<br>Await.result(futureB, Duration.Inf)<br><em>//Shut down the thread pool so the application can exit<br></em>executorService.shutdown()</pre><p>The changes include:</p><ul><li>Importing ExecutionContext to get access to the thread pool.</li><li>Defining the number of threads to run.</li><li>Enclosing the transformations inside a Future construct.</li><li>Calling Await.result, which blocks the main thread until the Future completes.</li><li>Shutting down the thread pool once both Futures are done.</li></ul><p>Let’s take a look at how the job performs now by looking at the Spark UI.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*wJ_GhaiPiEQ05q4L7afzrw.png" /></figure><p>Here you can see that jobs 0 and 1 started at almost the same time. You can also see from the event timeline that both jobs are running in parallel.</p><p>If you liked this article, click the 👏 so other people will see it here on Medium.</p><hr><p><a href="https://medium.com/analytics-vidhya/spark-parallel-job-submission-38b41220397b">Spark Parallel Job Submission</a> was originally published in <a href="https://medium.com/analytics-vidhya">Analytics Vidhya</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Git — Basics]]></title>
            <link>https://medium.com/analytics-vidhya/git-basics-f949c8109094?source=rss-2cad428895fb------2</link>
            <guid isPermaLink="false">https://medium.com/p/f949c8109094</guid>
            <category><![CDATA[git-basics]]></category>
            <category><![CDATA[git-architecture]]></category>
            <category><![CDATA[git]]></category>
            <category><![CDATA[git-tips]]></category>
            <category><![CDATA[github]]></category>
            <dc:creator><![CDATA[Tharun Kumar Sekar]]></dc:creator>
            <pubDate>Fri, 29 May 2020 15:58:08 GMT</pubDate>
            <atom:updated>2020-05-29T16:34:55.322Z</atom:updated>
            <content:encoded><![CDATA[<h3>Git — Basics</h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/0*p25yfq9lMjPg0kX1.png" /></figure><p>Git is a powerful tool, but it has a reputation for baffling newcomers. With the right knowledge, anyone can master Git. Once you start to understand it, the terminology will make more sense and you’ll (eventually) learn to love it.</p><h4>What is Git?</h4><p>Git is a type of version control system (VCS) that makes it easier to track changes to files. For example, when you edit a file, Git can help you determine exactly <em>what</em> changed, <em>who</em> changed it, and <em>why</em>.</p><p>It’s useful for coordinating work among multiple people on a project, and for tracking progress over time by saving “checkpoints”. You could use it while writing an essay, or to track changes to artwork and design files.</p><p>Git isn’t the only version control system out there, but it’s by far the most popular. Many software developers use Git daily, and understanding how to use it can give a major boost to your resume.</p><h4>Environments in Git</h4><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*hAaiZTVVZ9c8L-jvtvwa0Q.png" /></figure><p><strong>Working Directory</strong>: The directory on your system where you have cloned the files.</p><p><strong>Staging Area</strong>: Temporary location to stage your files before you perform a commit. This area helps you commit only the required files and skip the remaining files.</p><p><strong>Local Repository</strong>: Replica of the remote repository which will carry your commits.</p><p><strong>Remote Repository</strong>: A common repository that everyone can use to exchange their changes. It is most commonly located on a remote server.</p><p><strong>Pull</strong>: Once you have your branch created in the remote repository, you have to perform a git pull, which will sync your local repository with the remote server. 
Right after you perform a pull, the newly created branch will be available in your local repository.</p><p><strong>Checkout</strong>: After you perform a pull, you will have your new branch in your local repository. Now you need to run git checkout branchname to point your working directory to the new branch.</p><p><strong>Add: </strong>When you complete your changes on your local machine, you can do a git add to add the changes to the staging area. Some of the different uses of the add command are:</p><ul><li>git add * to add all your changed files to the staging area</li><li>git add filename to add a specific file to the staging area</li><li>git add -p to look at individual changes in each file and then add them to the staging area</li></ul><p><strong>Commit</strong>: Once you have staged the required files, you can do git commit -m &quot;message&quot; to commit your changes to your branch. A commit is what will be pushed to the remote repository once you perform a git push.</p><p><strong>Push: </strong>Until you do a push, all your changes will still be on your local machine and you can’t share them with anyone. git push pushes all your local commits to the remote repository. Now anyone can access your changes by going to your branch.</p><p>If you liked this article, click the 👏 so other people will see it here on Medium.</p><hr><p><a href="https://medium.com/analytics-vidhya/git-basics-f949c8109094">Git — Basics</a> was originally published in <a href="https://medium.com/analytics-vidhya">Analytics Vidhya</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Git Squash Commit With Git Rebase]]></title>
            <link>https://medium.com/analytics-vidhya/git-squash-commit-with-git-rebase-34443d271f62?source=rss-2cad428895fb------2</link>
            <guid isPermaLink="false">https://medium.com/p/34443d271f62</guid>
            <category><![CDATA[git]]></category>
            <category><![CDATA[rebase]]></category>
            <category><![CDATA[fixup]]></category>
            <category><![CDATA[git-rebase]]></category>
            <category><![CDATA[squash]]></category>
            <dc:creator><![CDATA[Tharun Kumar Sekar]]></dc:creator>
            <pubDate>Wed, 20 May 2020 04:29:29 GMT</pubDate>
            <atom:updated>2020-05-20T14:23:39.293Z</atom:updated>
            <content:encoded><![CDATA[<figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/0*4jIUAmZMSDWka6ic.png" /></figure><p>When submitting a pull request to merge your code with Master/Develop, it’s better to squash your commits. Some applications that interact with git repos will provide a user interface for squashing. But let’s take the fun route — <em>the terminal way</em>.</p><p>There are multiple ways to do a git squash. One is to do it locally on your system and then push it to remote. The other is to push all your changes to remote before doing the rebase, so you have a copy of them in remote in case something goes wrong.</p><p>Let’s look at the safer way first. Make sure your branch is up to date with the remote server. Now run git log --oneline --graph to see the commits that happened in your branch.</p><pre>* c88bc5 Implement search inputs for user<br>* 8f4917 Enriched plots for better understanding<br>* 59c01d Add pyplot for better analysis<br>* ba6f1f Add listing feature to quality checks<br>* 9f2adb Add feature to pipeline<br>* f796c1 Initial commit</pre><p>The last 6 commits would look much better if they were wrapped up together, so let’s do that through interactive rebasing.</p><p>To interactively rebase commits, you can follow the below format and enter your command via the command line.</p><pre>git rebase -i HEAD~&lt;n&gt; (n is the number of commits you want to squash)</pre><pre>git rebase -i HEAD~6 (This will roll up all 6 commits in the current branch)</pre><p>or</p><pre>git rebase -i &lt;sha code&gt; (sha code of the commit just before the oldest commit you want to squash)</pre><pre>git rebase -i f796c1~1 (parent of f796c1, so that f796c1 itself is included; if that commit has no parent, use git rebase -i --root)</pre><p>The -i flag indicates that this rebase will be an interactive session.</p><p>Once you enter the above command, this is what you will see.</p><pre>pick f796c1 Initial commit<br>pick 9f2adb Add feature to pipeline<br>pick ba6f1f Add listing feature to quality 
checks<br>pick 59c01d Add pyplot for better analysis<br>pick 8f4917 Enriched plots for better understanding<br>pick c88bc5 Implement search inputs for user</pre><pre><em># Rebase 8db7e8b..fa20af3 onto 8db7e8b</em> <br><em>#</em> <br><em># Commands:</em> <br><em>#  p, pick = use commit</em> <br><em>#  r, reword = use commit, but edit the commit message</em> <br><em>#  e, edit = use commit, but stop for amending</em> <br><em>#  s, squash = use commit, but meld into previous commit</em> <br><em>#  f, fixup = like &quot;squash&quot;, but discard this commit&#39;s log message</em> <br><em>#  x, exec = run command (the rest of the line) using shell</em> <br><em>#</em> <br><em># These lines can be re-ordered; they are executed from top to bottom.</em> <br><em>#</em> <br><em># If you remove a line here THAT COMMIT WILL BE LOST.</em> <br><em>#</em> <br><em># However, if you remove everything, the rebase will be aborted.</em> <br><em>#</em> <br><em># Note that empty commits are commented out</em></pre><p>We see the last 6 commits, from oldest to newest. See the comments below the list of commits? Good job explaining, git! pick is the default action. In this case, it would reapply the commit as is, with no changes to its contents or message. Saving this file as is would make no changes to the repository.</p><p>We are interested only in the following actions.</p><ul><li>squash (s for short), which melds the commit into the previous one (the one in the line before)</li><li>fixup (f for short), which acts like “squash”, but discards the commit message</li></ul><p>Let’s say we want to squash all our commits, because they belong to the same logical changeset. We’ll preserve the initial commit and squash all the subsequent commits into the previous one. 
We have to change pick to squash in all the commits except the first one.</p><pre>pick f796c1 Initial commit<br>squash 9f2adb Add feature to pipeline<br>squash ba6f1f Add listing feature to quality checks<br>squash 59c01d Add pyplot for better analysis<br>squash 8f4917 Enriched plots for better understanding<br>squash c88bc5 Implement search inputs for user</pre><p>Save and close the editor, and you will land in another editor to decide the commit message of the six melded commits. In this editor, you can add or remove commit messages. Once you save the commit message and quit the editor, all your commits will be transformed into one.</p><p>If you want to skip editing the commit message, you can use the fixup command instead; it discards the messages of the fixup commits and keeps only the message of the commit they are melded into.</p><p>Once the commit message is saved, the final thing you have to do is git push to push all your changes to remote. This push has to be forced, since your local branch and the remote branch have diverged after the rebase.</p><pre>git push --force</pre><p>P.S. 
If you have many commits to squash and would have to manually update every pick to squash, vim provides a simple way to do it.</p><pre>:%s/^pick/squash/gc</pre><p>This command will update every pick at the start of a line to squash upon your confirmation. Answer n for the first commit so that it stays as pick.</p><p>If you only want to edit a commit message, use reword (r for short) on that commit:</p><pre>pick f796c1 Initial commit<br>pick 9f2adb Add feature to pipeline<br>reword ba6f1f Add listing feature to quality checks<br>pick 59c01d Add pyplot for better analysis<br>pick 8f4917 Enriched plots for better understanding<br>pick c88bc5 Implement search inputs for user</pre><pre><em># Rebase 8db7e8b..fa20af3 onto 8db7e8b</em> <br><em>#</em> <br><em># Commands:</em> <br><em>#  p, pick = use commit</em> <br><em>#  r, reword = use commit, but edit the commit message</em> <br><em>#  e, edit = use commit, but stop for amending</em> <br><em>#  s, squash = use commit, but meld into previous commit</em> <br><em>#  f, fixup = like &quot;squash&quot;, but discard this commit&#39;s log message</em> <br><em>#  x, exec = run command (the rest of the line) using shell</em> <br><em>#</em> <br><em># These lines can be re-ordered; they are executed from top to bottom.</em> <br><em>#</em> <br><em># If you remove a line here THAT COMMIT WILL BE LOST.</em> <br><em>#</em> <br><em># However, if you remove everything, the rebase will be aborted.</em> <br><em>#</em> <br><em># Note that empty commits are commented out</em></pre><p>When you save and quit the editor, git will follow the reword command and will land you in an editor again, as if you had amended commit ba6f1f. 
Now you can edit the commit message, save and quit the editor.</p><p>If you liked this article, click the 👏 so other people will see it here on Medium.</p><hr><p><a href="https://medium.com/analytics-vidhya/git-squash-commit-with-git-rebase-34443d271f62">Git Squash Commit With Git Rebase</a> was originally published in <a href="https://medium.com/analytics-vidhya">Analytics Vidhya</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Improve Spark Write Performance]]></title>
            <link>https://medium.com/analytics-vidhya/improve-spark-write-performance-d187efb8c8bf?source=rss-2cad428895fb------2</link>
            <guid isPermaLink="false">https://medium.com/p/d187efb8c8bf</guid>
            <category><![CDATA[spark-optimization]]></category>
            <category><![CDATA[spark-write]]></category>
            <category><![CDATA[committer-algorithm]]></category>
            <category><![CDATA[spark-performanc]]></category>
            <category><![CDATA[spark]]></category>
            <dc:creator><![CDATA[Tharun Kumar Sekar]]></dc:creator>
            <pubDate>Tue, 14 Apr 2020 12:12:47 GMT</pubDate>
            <atom:updated>2020-05-17T13:46:52.021Z</atom:updated>
            <content:encoded><![CDATA[<figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/0*auKfT2hVGHZRIWF9.jpg" /></figure><p>The EMRFS S3-optimized committer is a new output committer available for use with Apache Spark jobs as of Amazon EMR 5.19.0. This committer improves performance when writing Apache Parquet files to <a href="https://aws.amazon.com/s3/">S3</a> using the <a href="https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html">EMR File System</a> (EMRFS). In this post, we run a performance benchmark to compare this new optimized committer with existing committer algorithms, namely FileOutputCommitter algorithm versions 1 and 2. We close with a discussion on current limitations for the new committer, providing workarounds where possible.</p><h4>Comparison with FileOutputCommitter</h4><p>In Amazon EMR version 5.19.0 and earlier, Spark jobs that write Parquet to Amazon S3 use a Hadoop commit algorithm called FileOutputCommitter by default. There are two versions of this algorithm, version 1 and 2. Both versions rely on writing intermediate task output to temporary locations. They subsequently perform rename operations to make the data visible at task or job completion time.</p><p>Algorithm version 1 has two phases of rename: one to commit the individual task output, and the other to commit the overall job output from completed/successful tasks. Algorithm version 2 is more efficient because task commits rename files directly to the final output location. This eliminates the second rename phase, but it makes partial data visible before the job completes, which not all workloads can tolerate.</p><p>The renames that are performed are fast, metadata-only operations on the Hadoop Distributed File System (HDFS). However, when output is written to object stores such as Amazon S3, renames are implemented by copying data to the target and then deleting the source. 
This rename “penalty” is exacerbated with directory renames, which can happen in both phases of FileOutputCommitter v1. Whereas these are single metadata-only operations on HDFS, committers must execute N copy-and-delete operations on S3.</p><p>To partially mitigate this, Amazon EMR 5.14.0+ defaults to FileOutputCommitter v2 when writing Parquet data to S3 with EMRFS in Spark. The new EMRFS S3-optimized committer improves on that work to avoid rename operations altogether by using the transactional properties of Amazon S3 multipart uploads. Tasks may then write their data directly to the final output location, but defer completion of each output file until task commit time.</p><h4>Performance test</h4><p>We evaluated the write performance of the different committers by executing the following INSERT OVERWRITE Spark SQL query. The SELECT * FROM range(…) clause generated data at execution time. This produced ~15 GB of data across exactly 100 Parquet files in Amazon S3.</p><pre>SET rows=4e9; -- 4 Billion <br>SET partitions=100;  <br>INSERT OVERWRITE DIRECTORY 's3://${bucket}/perf-test/${trial_id}' USING PARQUET SELECT * FROM range(0, ${rows}, 1, ${partitions});</pre><p><strong>Note</strong>: The EMR cluster ran in the same AWS Region as the S3 bucket. The trial_id property used a UUID generator to ensure that there was no conflict between test runs.</p><p>We executed our test on an EMR cluster created with the emr-5.19.0 release label, with a single m5d.2xlarge instance in the master group, and eight m5d.2xlarge instances in the core group. We used the default Spark configuration properties set by Amazon EMR for this cluster configuration, which include the following:</p><pre>spark.dynamicAllocation.enabled true <br>spark.executor.memory 11168M <br>spark.executor.cores 4</pre><p>After running 10 trials for each committer, we captured and summarized query execution times in the following chart. 
Whereas FileOutputCommitter v2 averaged 49 seconds, the EMRFS S3-optimized committer averaged only 31 seconds — a 1.6x speedup.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/800/0*hBSyjVKj61kkxCWK.png" /></figure><p>As mentioned earlier, FileOutputCommitter v2 eliminates some, but not all, rename operations that FileOutputCommitter v1 uses. To illustrate the full performance impact of renames against S3, we reran the test using FileOutputCommitter v1. In this scenario, we observed an average runtime of 450 seconds, which is 14.5x slower than the EMRFS S3-optimized committer.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/800/0*hQglTDFpktvSmJF_.png" /></figure><p>The last scenario we evaluated is the case when <a href="https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html">EMRFS consistent view</a> is enabled, which addresses issues that can arise due to the Amazon S3 data consistency model. In this mode, the EMRFS S3-optimized committer time was unaffected by this change and still averaged 30 seconds. On the other hand, FileOutputCommitter v2 averaged 53 seconds, which was slower than when the consistent view feature was turned off, widening the overall performance difference to 1.8x.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/800/0*wckd2aghM-htih7g.png" /></figure><h3>Enabling the EMRFS S3-optimized committer</h3><p>Starting with Amazon EMR version 5.20.0, the EMRFS S3-optimized committer is enabled by default. In Amazon EMR version 5.19.0, you can enable the committer by setting the spark.sql.parquet.fs.optimized.committer.optimization-enabled property to true from within Spark or when creating clusters. The committer takes effect when you use Spark’s built-in Parquet support to write Parquet files into Amazon S3 with EMRFS. This includes using the Parquet data source with Spark SQL, DataFrames, or Datasets. 
However, there are some use cases when the EMRFS S3-optimized committer does not take effect, and some use cases where Spark performs its own renames entirely outside of the committer. For more information about the committer and about these special cases, see <a href="https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html">Using the EMRFS S3-optimized Committer</a> in the <em>Amazon EMR Release Guide</em>.</p><h3>Summary</h3><p>The EMRFS S3-optimized committer improves write performance compared to FileOutputCommitter. Starting with Amazon EMR version 5.19.0, you can use it with Spark’s built-in Parquet support.</p><p>This article is reproduced from the Amazon Web Services blog.</p><p>If you liked this article, click the 👏 so other people will see it here on Medium.</p><hr><p><a href="https://medium.com/analytics-vidhya/improve-spark-write-performance-d187efb8c8bf">Improve Spark Write Performance</a> was originally published in <a href="https://medium.com/analytics-vidhya">Analytics Vidhya</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Case Classes — Scala]]></title>
            <link>https://medium.com/@tharun026/case-classes-scala-bc4bcf166399?source=rss-2cad428895fb------2</link>
            <guid isPermaLink="false">https://medium.com/p/bc4bcf166399</guid>
            <category><![CDATA[scala]]></category>
            <category><![CDATA[case-class]]></category>
            <category><![CDATA[immutable]]></category>
            <category><![CDATA[val]]></category>
            <category><![CDATA[getter]]></category>
            <dc:creator><![CDATA[Tharun Kumar Sekar]]></dc:creator>
            <pubDate>Fri, 10 Apr 2020 12:50:37 GMT</pubDate>
            <atom:updated>2020-05-17T13:47:16.189Z</atom:updated>
            <content:encoded><![CDATA[<h3>Case Classes — Scala</h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/635/0*k9J4Q0XLJadBs7FS.png" /></figure><p>Representing data is a key part of writing programs, but it’s also mechanical: you need to define your fields, setters, getters, and other details. When coding in more verbose languages, such as Java, you often end up using tools such as your IDE to automatically generate some code for you. What if, instead of seeking support from your IDE, the compiler could do this for you?</p><p>Case classes are good for modeling immutable data. A case class can be considered a class with an arbitrary number of parameters for which the compiler automatically adds ad-hoc code. Case classes are ideal data containers because this generated functionality encourages immutability.</p><h4>Defining a case class</h4><pre>case class Member(id: Int, name: String, country: String)<br>val a = Member(10001, &quot;Adam&quot;, &quot;UK&quot;)</pre><p>Notice how the <strong><em>new</em></strong> keyword was not used to instantiate the Member case class. This is because case classes have an <strong><em>apply</em></strong> method by default which takes care of object construction. By default all the parameters in a case class are <strong><em>public. </em></strong>You can’t reassign a parameter of a case class because it is a <strong><em>val </em></strong>(i.e. immutable). It is possible to use <strong><em>var </em></strong>but this is discouraged. The compiler also adds other functionality to a case class.</p><h4>Getters</h4><p>For each parameter, the compiler adds a getter function with the same name as the parameter it refers to. For example, you can easily access the id value by calling the object name and the parameter. 
Once instantiated, you cannot modify the value, since it is a <strong><em>val.</em></strong></p><pre>println(a.id) //prints 10001<br>a.id = 10002 //does not compile: reassignment to val</pre><p>Scala does not generate setter functions for the parameters, since they are immutable.</p><h4><strong>Copying</strong></h4><p>When you want to modify a value of an existing case class, you can use the copy function to create a new data representation. In simple words, you can create a (shallow) copy of an instance of a case class simply by using the <strong><em>copy </em></strong>method.</p><pre>val b = a.copy(id = 10002, name = &quot;Bryan&quot;)<br>println(b.id)      //prints 10002<br>println(b.name)    //prints Bryan<br>println(b.country) //prints UK which was copied from object a</pre><p>You can also change all of its parameters at the same time.</p><h4><strong>Apply</strong></h4><p>One of the biggest benefits of a case class is that the Scala compiler generates an <em>apply</em> method on its companion object that takes the same parameters as the class definition. Because of that, you can create objects of the case class without the keyword <strong>new</strong>.</p><pre>case class Member(id: Int, name: String, country: String)<br>val a = Member(10001, &quot;Adam&quot;, &quot;UK&quot;)</pre><p>If you liked this article, click the 👏 so other people will see it here on Medium.</p>]]></content:encoded>
        </item>
    </channel>
</rss>