Programming

The objective of this blog is to handle a special scenario where the column separator or delimiter is also present inside the data. Handling such a dataset can be a headache for PySpark developers, but it has to be handled nonetheless. In this blog I share my approach to the challenge; I am open to learning, so please share your approach as well.

Source: PySpark

The dataset looks like this:

#first line is the header
NAME|AGE|DEP
Vivek|Chaudhary|32|BSC
John|Morgan|30|BE
Ashwin|Rao|30|BE

The dataset contains three columns "NAME", "AGE", and "DEP", separated by the delimiter '|'. Notice that the NAME value itself contains the delimiter (e.g. "Vivek|Chaudhary"), so a plain split yields four fields against a three-column header. …
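
Since the full solution is truncated in this excerpt, below is a minimal sketch of one way to handle it, not necessarily the article's exact approach: read the file as plain text, split on '|', keep the last two tokens as AGE and DEP, and join everything before them into NAME. The file path is an assumed placeholder.

#hedged sketch: path and column logic are assumptions based on the sample above
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, element_at, expr

spark = SparkSession.builder.appName('delimiter_in_data').getOrCreate()

#read the whole file as a single string column named 'value' and drop the header row
raw_df = spark.read.text(r'D:\python_coding\pyspark_tutorial\pipe_data.txt')
data_df = raw_df.filter(col('value') != 'NAME|AGE|DEP')

parts = split(col('value'), '[|]')   #'|' is a regex metacharacter, so wrap it in a character class
parsed_df = (data_df
    .withColumn('DEP', element_at(parts, -1))                #last token
    .withColumn('AGE', element_at(parts, -2).cast('int'))    #second-last token
    #everything before the last two tokens belongs to NAME
    .withColumn('NAME', expr("concat_ws(' ', slice(split(value, '[|]'), 1, size(split(value, '[|]')) - 2))"))
    .select('NAME', 'AGE', 'DEP'))

parsed_df.show(truncate=False)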

Programming

The objective of this article is to build an understanding of how to create a data pipeline that processes data using Apache Spark Structured Streaming and Apache Kafka.

Source: Kafka-Spark streaming

Business Case Explanation:

Let us consider a store that generates customer invoices; those invoices arrive at an integrated platform in real time to inform the customer how many "Edge Reward" points they have earned on their shopping.

Invoices from various stores across the city of New Delhi travel to a Kafka topic in real time, and based on the total shopping amount, our Kafka consumer calculates the Edge Points and sends a notification message to the customer.

  1. Invoice Dataset

Invoice data is generated in JSON format. For ease of understanding, I have not created a nested dataset.

{"BillNo":"93647513","CreatedTime":1595688902254,"StoreID":"STR8510","PaymentMode":"CARD","CustomerCardNo":"5241-xxxx-xxxx-1142","ItemID":"258","ItemDescription":"Almirah","ItemPrice":1687.0,"ItemQty":1,"TotalValue":1687.0}
{"BillNo":"93647514","CreatedTime":1595688901055,"StoreID":"STR8511","PaymentMode":"CARD","CustomerCardNo":"5413-xxxx-xxxx-1142","ItemID":"259","ItemDescription":"LED","ItemPrice":1800.0,"ItemQty":2,"TotalValue":3900.0}
{"BillNo":"93647515","CreatedTime":1595688902258,"StoreID":"STR8512","PaymentMode":"CARD","CustomerCardNo":"5346-xxxx-xxxx-1142","ItemID":"257","ItemDescription":"AC","ItemPrice":2290.0,"ItemQty":2,"TotalValue":4580.0} …
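
The streaming code itself is cut off in this excerpt, so here is a hedged sketch of the kind of pipeline described: read the invoice JSON from a Kafka topic with Spark Structured Streaming and derive the Edge Reward points. The broker address, the topic name ('invoices'), and the points formula (1 point per 100 of TotalValue) are assumptions for illustration, not the article's exact rules, and the job needs the spark-sql-kafka package on the classpath.

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType

spark = SparkSession.builder.appName('edge_rewards').getOrCreate()

#schema matching the invoice JSON shown above
invoice_schema = StructType([
    StructField('BillNo', StringType()),
    StructField('CreatedTime', LongType()),
    StructField('StoreID', StringType()),
    StructField('PaymentMode', StringType()),
    StructField('CustomerCardNo', StringType()),
    StructField('ItemID', StringType()),
    StructField('ItemDescription', StringType()),
    StructField('ItemPrice', DoubleType()),
    StructField('ItemQty', IntegerType()),
    StructField('TotalValue', DoubleType()),
])

#read raw Kafka records (key/value byte arrays) from the assumed topic
raw_stream = (spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    .option('subscribe', 'invoices')
    .option('startingOffsets', 'latest')
    .load())

#parse the JSON payload and compute an assumed points formula
invoices = (raw_stream
    .select(from_json(col('value').cast('string'), invoice_schema).alias('inv'))
    .select('inv.*')
    .withColumn('EdgePoints', expr('floor(TotalValue / 100)')))

#write the notification payload to the console sink for demonstration
query = (invoices.select('BillNo', 'CustomerCardNo', 'TotalValue', 'EdgePoints')
    .writeStream
    .format('console')
    .outputMode('append')
    .start())

query.awaitTermination()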

Programming

The objective of this article is to understand how to write Python classes for Oracle DB connectivity and perform CRUD-style operations: create, insert, update, and delete. I will also touch upon the OOP concept of polymorphism through operator overloading. The Python library used for Oracle DB connectivity in this article is cx_Oracle.

  1. Create Oracle table:
create table emp_t 
(empno number,
name varchar2(30),
sal number,
skill varchar(20));

2. Python Class for DML operation:

Create a Python EMP class and define a constructor __init__() to initialize the instance variables, with a *args argument to give a flavor of overloading; a sketch is shown below. …
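
Since the excerpt is truncated, the sketch below only illustrates the shape such a class could take under assumed connection details and method names; it is not the author's exact code. The *args constructor accepts a varying number of fields, and __add__ demonstrates operator overloading.

import cx_Oracle

class EMP:
    def __init__(self, *args):
        #accept (empno, name), (empno, name, sal) or (empno, name, sal, skill)
        #to give a flavor of constructor overloading
        self.empno = args[0] if len(args) > 0 else None
        self.name = args[1] if len(args) > 1 else None
        self.sal = args[2] if len(args) > 2 else None
        self.skill = args[3] if len(args) > 3 else None

    def __add__(self, other):
        #operator overloading example: emp1 + emp2 returns the combined salary
        return self.sal + other.sal

    def insert(self, conn):
        cur = conn.cursor()
        cur.execute(
            "insert into emp_t (empno, name, sal, skill) values (:1, :2, :3, :4)",
            (self.empno, self.name, self.sal, self.skill))
        conn.commit()
        cur.close()

    def update_sal(self, conn, new_sal):
        cur = conn.cursor()
        cur.execute("update emp_t set sal = :1 where empno = :2", (new_sal, self.empno))
        conn.commit()
        cur.close()

    def delete(self, conn):
        cur = conn.cursor()
        cur.execute("delete from emp_t where empno = :1", (self.empno,))
        conn.commit()
        cur.close()

#usage (credentials and DSN are placeholders)
#conn = cx_Oracle.connect('scott', 'scott', 'localhost:1522/oracle')
#emp = EMP(101, 'Vivek', 5000, 'Python')
#emp.insert(conn)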

Programming

The objective of this article is to process delimited files that use multiple delimiters with Apache Spark and the Python programming language. This is a real-world scenario where an application can share such delimited files, and the dev team has to process them. We will learn how to handle the challenge.


The input Data set is as below:

Name@@#Age  <--Header
vivek, chaudhary@@#30 <--row1
john, morgan@@#28 <--row2

Approach 1: Let's try to read the file using read.csv() and see the output:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('multiple_delimiter').getOrCreate()
test_df = spark.read.csv(r'D:\python_coding\pyspark_tutorial\multiple_delimiter.csv')
test_df.show()
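
read.csv() with the default comma separator does not split on '@@#' correctly, so the columns come out wrong. One hedged alternative, sketched below against the same file path, is to read the file as plain text and split each line on the full '@@#' string (Spark 3.0+ also accepts a multi-character sep option on the CSV reader).

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

spark = SparkSession.builder.appName('multiple_delimiter').getOrCreate()

#read the raw lines, then drop the header row
raw_df = spark.read.text(r'D:\python_coding\pyspark_tutorial\multiple_delimiter.csv')
data_df = raw_df.filter(col('value') != 'Name@@#Age')

#split on the literal multi-character delimiter '@@#'
parts = split(col('value'), '@@#')
clean_df = (data_df
    .withColumn('Name', parts.getItem(0))
    .withColumn('Age', parts.getItem(1).cast('int'))
    .drop('value'))

clean_df.show(truncate=False)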

Programming

The objective of this article is to read data from an Oracle DB table, push the records in JSON format to a Kafka broker, and then read the messages from the Kafka broker and insert the JSON messages into a MongoDB collection.
The blog describes a fundamental ETL messaging system built using Oracle as the source, Kafka as the middleware, and MongoDB as the target.

This is my third blog in the Kafka series; the previous two blogs cover Kafka's conceptual details and are linked below:

https://medium.com/towards-artificial-intelligence/getting-started-with-apache-kafka-beginners-tutorial-d38e3634706c?source=friends_link&sk=2b98454c001fb88527fff8c2217947e2

https://medium.com/towards-artificial-intelligence/diving-deep-into-kafka-29160f32d408?source=friends_link&sk=0cf78cc0ee1ef12f0bac1634e71d1550

Kafka ETL ecosystem
  1. Connect to Oracle and Extract Data:
#import required libraries
from json import dumps
import json
from kafka import KafkaProducer
import…
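
The code is cut off in this excerpt, so below is a hedged sketch of the first leg described above: read rows from an Oracle table with cx_Oracle and publish them to a Kafka topic as JSON using kafka-python. The table name, column list, topic name, and connection details are placeholders, not the article's exact values; the Kafka-to-MongoDB leg would follow the same pattern with a KafkaConsumer and pymongo.

from json import dumps
import cx_Oracle
from kafka import KafkaProducer

#connect to Oracle (placeholder credentials/DSN) and extract rows
conn = cx_Oracle.connect('scott', 'scott', 'localhost:1522/oracle')
cur = conn.cursor()
cur.execute('select empno, name, sal from emp_t')

#JSON-serialize each message before it goes onto the topic
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda m: dumps(m).encode('utf-8'))

#publish one JSON message per row to an assumed topic name
for empno, name, sal in cur:
    record = {'empno': empno, 'name': name, 'sal': sal}
    producer.send('oracle_records', value=record)

producer.flush()
cur.close()
conn.close()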

Programming

The objective of this blog is to build some more understanding of Apache Kafka concepts such as topics, partitions, consumers, and consumer groups. Kafka's basic concepts were covered in my previous article.

Kafka Topic & Partitions

As we know, messages in Kafka are categorized and stored inside topics. In simple terms, a topic can be thought of as a database table. Internally, a Kafka topic is broken down into partitions. Partitions allow us to parallelize a topic by splitting its data across multiple brokers, adding an essence of parallelism to the ecosystem.

Behind the scenes

Messages are written to a partition in an append-only manner and are read from a partition from beginning to end, in FIFO fashion. Each message within a partition is identified by an integer value called an offset, an immutable, sequential position maintained by Kafka. …
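
As a small illustration of these ideas (with assumed topic, broker, and group names), the kafka-python consumer below joins a consumer group and prints the partition and offset each record was read from; starting a second copy with the same group_id makes Kafka split the topic's partitions between the two instances.

from kafka import KafkaConsumer

#assumed topic and broker; consumers sharing this group_id split the partitions
consumer = KafkaConsumer(
    'invoices',
    bootstrap_servers=['localhost:9092'],
    group_id='edge-points-app',
    auto_offset_reset='earliest')

for message in consumer:
    #each record exposes the partition it came from and its offset within that partition
    print(f'partition={message.partition} offset={message.offset} value={message.value}')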

Programming

The objective of this article is to build an understanding of what Kafka is, why Kafka, the Kafka architecture, producers, consumers, brokers, and the different components of the Kafka ecosystem, followed by a small coding exercise with Python and Kafka.

Kafka Ecosystem

1. What is Kafka?

Apache Kafka is a distributed, publish-subscribe based durable messaging system, exchanging data between processes, applications, and servers. In other terms, it is an Enterprise Messaging system that is highly scalable, fault-tolerant, and agile.

The objective of this article is to understand the implementation of SCD Type 1 using the big data computation framework Apache Spark.


To build more understanding of SCD Type 1, or Slowly Changing Dimension, please refer to my previous blog, linked below. It contains a detailed insight into dimensional modelling and data warehousing; the use case covered is similar, but here the solution is implemented using Apache Spark.

https://medium.com/analytics-vidhya/scd-type1-implementation-in-python-e4f6ec13a797

  1. Create Spark Session and Import Datasets:
import pyspark
from pyspark.sql import SparkSession
print('Modules Imported')
spark = SparkSession.builder.appName('spark_scd_type1').getOrCreate()

#Import source dataset
emp_src = spark.read.format("jdbc") \
    .option("url", "jdbc:oracle:thin:scott/scott@//localhost:1522/oracle") \
    .option("dbtable", "emp_spark").option("user", "scott") \
    .option("password", "scott").option("driver", "oracle.jdbc.driver.OracleDriver").load()
emp_src.show(10)

#import Target dataset, as of now target has zero rows
emp_tgt = spark.read.format("jdbc") \
    .option("url", "jdbc:oracle:thin:scott/scott@//localhost:1522/oracle") \
    .option("dbtable", "emp_spark_scd1").option("user", "scott") \
    .option("password", "scott").option("driver", "oracle.jdbc.driver.OracleDriver").load() …
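
The excerpt stops here, so the following is a hedged sketch of the Type 1 logic that could sit on top of the two DataFrames loaded above: every source row either inserts a new key or overwrites the existing one, and target-only rows are kept. The join key (EMPNO), the assumption that both tables share a schema, and the simple overwrite write-back are illustrative choices, not the author's exact implementation.

#target rows whose key never appears in the source are kept untouched
tgt_only = emp_tgt.join(emp_src.select('EMPNO'), on='EMPNO', how='left_anti')

#every source row is taken as-is: new keys become inserts, existing keys are
#overwritten (SCD Type 1 keeps no history)
scd1_df = emp_src.unionByName(tgt_only)

#write the merged result back; overwrite is used for simplicity, a production
#load would usually merge/upsert on the database side
(scd1_df.write.format('jdbc')
    .option('url', 'jdbc:oracle:thin:scott/scott@//localhost:1522/oracle')
    .option('dbtable', 'emp_spark_scd1')
    .option('user', 'scott')
    .option('password', 'scott')
    .option('driver', 'oracle.jdbc.driver.OracleDriver')
    .mode('overwrite')
    .save())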

Programming, Python

The objective of this article is to understand various ways to handle missing or null values in a dataset. A null means an unknown, missing, or irrelevant value, and from a machine learning or data science perspective it is essential to deal with nulls efficiently, because an ML engineer can't afford to come up short on data.

Nulls

Let's check out various ways to handle missing data or nulls in a Spark DataFrame.

1. PySpark connection and application creation

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('NULL_Handling').getOrCreate()
print('NULL_Handling')

2. Import Dataset

null_df = spark.read.csv(r'D:\python_coding\pyspark_tutorial\Nulls.csv', header=True, inferSchema=True)
null_df.show()
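
The individual techniques follow in the full article; as a brief hedged preview, these are the typical operations on the null_df loaded above. The column names in the fill() map are placeholders, since the Nulls.csv schema is not shown in this excerpt.

#drop rows where every column is null
null_df.na.drop(how='all').show()

#drop rows that contain a null in any column
null_df.na.drop(how='any').show()

#replace nulls with fixed per-column values (column names are placeholders)
null_df.na.fill({'Name': 'unknown', 'Age': 0}).show()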

Data Analytics, Python

The objective of this article is to perform analysis on a dataset and answer some questions to gain insight into the data. We will learn how to connect to an Oracle DB, create a PySpark DataFrame, and perform different operations to understand various aspects of the dataset.
Exploratory Data Analysis (EDA) is the process of understanding a dataset by summarizing its main characteristics.

Data Analysis

As this is my first blog on EDA, I have tried to keep the content simple, just to make sure it resonates with my readers. …
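
Since the article itself is truncated here, the sketch below only illustrates the kind of first-pass EDA steps it describes: load a table from Oracle over JDBC into a PySpark DataFrame and summarize it. The table name, grouping column, and connection details are placeholders, not the article's actual dataset.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('eda_oracle').getOrCreate()

#load a table from Oracle over JDBC (placeholder table and credentials)
sales_df = (spark.read.format('jdbc')
    .option('url', 'jdbc:oracle:thin:scott/scott@//localhost:1522/oracle')
    .option('dbtable', 'sales')
    .option('user', 'scott')
    .option('password', 'scott')
    .option('driver', 'oracle.jdbc.driver.OracleDriver')
    .load())

#first look at the data: schema, sample rows, and summary statistics
sales_df.printSchema()
sales_df.show(5)
sales_df.describe().show()

#a simple grouped question: row counts per category (placeholder column name)
sales_df.groupBy('PRODUCT_CATEGORY').count().orderBy('count', ascending=False).show()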

About

Vivek Chaudhary

Data Engineer by profession, Blogger, Python Trainer and DS/ML enthusiast. Linkedin: linkedin.com/in/vivek-chaudhary-5378a954. Twitter: @vc90
