Event-Driven Data Preprocessing

Victor Nitu
Published in Phi Skills
12 min read, May 1, 2020

Molding raw information into data is a craft that empowers its owner. The art of data preprocessing finds structure inside the chaos, just as pottery transforms mud into beautiful porcelain. In the era of big data, the massive amounts of information that enterprises possess are useless unless they are structured into meaningful data. More information gives more power only if it produces useful knowledge.

First, we saw how to build an event-driven infrastructure in my previous article, Event-Driven Data Collection. Then, in the following article, Event-Driven Data Visualization, we used different visualization tools to explore the generated data. But without preprocessing, we are limited to the data that already exists. In this article, we are going to see how to generate new information out of existing records. This skill gives us access to an almost unlimited source of data. You can find the example project of this article on GitHub in the repository called victornitu/phi-preprocessing-example.

Overview

This article is divided into three sections. The first two sections tackle the preprocessing of entities and events, respectively. Preprocessing entities is relatively straightforward. Events, on the other hand, are slightly trickier. In the third section, we discuss when and how to extend handlers with the preprocessing code we defined earlier.

We are going to experiment inside a Jupyter notebook¹ using Python 3.8 to code our preprocessing jobs. In our examples, events are stored inside Kafka² and entities inside MongoDB³. We use kafka-python⁴ and PyMongo⁵ to read from Kafka and MongoDB. Additionally, we use pandas⁶ and matplotlib⁷ to manipulate and visualize the data.

Before we start, we store the addresses of the different services we are going to query. Those services are exposed when you run the PHI architecture example locally.
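As a minimal sketch, the addresses could be kept in a few constants. The hosts and ports below are assumptions (the default MongoDB and Kafka ports, plus a made-up port for the inspector), not values taken from the example project, so adjust them to your local setup.

```python
# Assumed local addresses; replace them with the ones your services expose.
ENTITY_STORE = "mongodb://localhost:27017"  # MongoDB holding the entities
EVENT_STORE = "localhost:9092"              # Kafka broker holding the events
INSPECTOR = "http://localhost:4000"         # hypothetical inspector API address
```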

Preprocessing Entities

We start by preprocessing entities. Entities are data that have already been preprocessed during data collection. But because it’s impossible to accurately predict what data is going to be needed in the future, you eventually need to add a layer of preprocessing.

There are two ways of accessing entities. If the inspector exposes the data you need, you can access your entities through the API. This way is preferred and safer. However, the data you need might not be accessible via the inspector. In that case, you can access the entities directly from the database.

In our example, both products and customers are available through the inspector API. Still, to show an example of how to access entities from a database, we query our products directly from the entity-store.

We start by creating a MongoDB client. This client then accesses the database entities and the collection products. We then call the method find, which returns a cursor over all entries.

We store those database entries inside a pandas DataFrame. It is easier to manipulate and transform our data with pandas.
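The two steps above could look roughly like the sketch below. The connection string is an assumption, and the sample entries are made up so that the DataFrame part runs without a database; only the database name entities and the collection name products come from the text.

```python
import pandas as pd


def fetch_products(uri="mongodb://localhost:27017"):
    """Return every entry of the collection `products` in the database `entities`."""
    # Imported lazily so the rest of the sketch runs without PyMongo installed.
    from pymongo import MongoClient

    client = MongoClient(uri)
    # find() returns a cursor; list() reads all entries from it.
    return list(client.entities.products.find())


# Made-up entries shaped like the ones described in the article.
sample_entries = [
    {"_id": "a1", "name": "Sugar", "watched": 10, "bought": 2},
    {"_id": "a2", "name": "Salt", "watched": 10, "bought": 8},
    {"_id": "a3", "name": "Pepper", "watched": 2, "bought": 3},
]

# With a running entity-store: products = pd.DataFrame(fetch_products())
products = pd.DataFrame(sample_entries)
print(products)
```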

We get four columns:

  1. _id: the technical database id of an entry
  2. name: the name of a product
  3. watched: the number of times a product has been watched
  4. bought: the number of times a product has been bought

The database id does not give us any useful information in our context, so we remove it with the method drop. We know that a missing value for watched or bought means the action never happened. In this case, we can safely replace all missing values with zeros with the help of the method fillna. Finally, we can use the name of the products as the index of our DataFrame with the method set_index.

We now have a clean DataFrame with the index set to the name of the products and the two columns we want to explore: watched and bought.
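A minimal sketch of this cleaning step, using made-up entries in which one value is deliberately left out:

```python
import pandas as pd

# Made-up entries; Sugar has no `bought` value to illustrate fillna.
raw = pd.DataFrame([
    {"_id": "a1", "name": "Sugar", "watched": 10},
    {"_id": "a2", "name": "Salt", "watched": 10, "bought": 8},
    {"_id": "a3", "name": "Pepper", "watched": 2, "bought": 3},
])

products = (
    raw.drop(columns="_id")  # the technical id carries no information here
       .fillna(0)            # a missing counter means the action never happened
       .set_index("name")    # use the product name as the index
)
print(products)
```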

We want to know which products are selling well. The conversion rate is a good metric we can use for that. To calculate the conversion rate, we divide the number of times a product was bought by the number of times it was watched.
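As a sketch, the conversion rate is a single column division in pandas. The numbers are made up and the column name conversion_rate is an assumption.

```python
import pandas as pd

# Made-up counters for three products.
products = pd.DataFrame(
    {"watched": [10, 10, 2], "bought": [2, 8, 3]},
    index=pd.Index(["Sugar", "Salt", "Pepper"], name="name"),
)

# conversion rate = times bought / times watched
products["conversion_rate"] = products["bought"] / products["watched"]
print(products)
```

Note that a product that was never watched would yield inf or NaN here, so such rows may need separate handling.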

We can see that the product Sugar has the lowest conversion rate. The product Pepper, on the other hand, has a conversion rate above 100%, which indicates people are buying this product without needing to watch it.

To simplify the analysis of the conversion rate, we categorize our products into two categories: good and bad. We use an arbitrary threshold of 50%.
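One way to apply such a threshold is numpy.where. This is a sketch with made-up rates; the column name conversion and the choice to count a rate of exactly 50% as good are assumptions.

```python
import numpy as np
import pandas as pd

# Made-up conversion rates.
products = pd.DataFrame(
    {"conversion_rate": [0.2, 0.8, 1.5]},
    index=pd.Index(["Sugar", "Salt", "Pepper"], name="name"),
)

THRESHOLD = 0.5  # the arbitrary 50% threshold
products["conversion"] = np.where(
    products["conversion_rate"] >= THRESHOLD, "good", "bad"
)
print(products)
```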

We see that the products Salt and Pepper have a good conversion, while the product Sugar has a bad conversion.

The conversion rate is interesting, but it is equally important to know if a product sells well. As in the previous example, we assign each product to a new category, sales. This category shows whether a product's sales are above or below average.
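A sketch of the sales category, comparing each product with the mean of the column bought. The data is made up, and reusing the labels good and bad is an assumption.

```python
import numpy as np
import pandas as pd

# Made-up purchase counters.
products = pd.DataFrame(
    {"bought": [2, 8, 3]},
    index=pd.Index(["Sugar", "Salt", "Pepper"], name="name"),
)

average = products["bought"].mean()
products["sales"] = np.where(products["bought"] > average, "good", "bad")
print(products)
```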

The product Salt stands out. It has a good conversion rate and has sales above average. The product Sugar, on the other hand, scores poorly.

Now, let's have a look at our customers. Here, we can simply call the inspector API. Similarly to our products, we set the name of our customers as the index of our DataFrame.
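The call to the inspector could be sketched as follows. The base URL, the /customers path, and the shape of the response are all assumptions, since the article does not show them; the sample response is made up so that the DataFrame part runs without the service.

```python
import json
from urllib.request import urlopen

import pandas as pd


def fetch_customers(base_url="http://localhost:4000"):
    """Fetch the customers from a hypothetical `/customers` endpoint."""
    with urlopen(f"{base_url}/customers") as response:
        return json.load(response)


# Made-up response: one JSON object per customer.
sample_response = [
    {"name": "Victor", "Sugar": 0, "Salt": 6, "Pepper": 6, "products": 12},
    {"name": "Morgane", "Sugar": 7, "Salt": 1, "Pepper": 0, "products": 8},
    {"name": "Michele", "Sugar": 1, "Salt": 1, "Pepper": 0, "products": 2},
    {"name": "Sam", "Sugar": 1, "Salt": 1, "Pepper": 1, "products": 3},
]

# With a running inspector: customers = pd.DataFrame(fetch_customers())
customers = pd.DataFrame(sample_response).set_index("name")
print(customers)
```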

We get four columns:

  1. Sugar: the amount of product Sugar the customer bought
  2. Salt: the amount of product Salt the customer bought
  3. Pepper: the amount of product Pepper the customer bought
  4. products: the total amount of products the customer bought

For our customers, we categorize them by the total number of products they bought. We use the mean as reference.
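A sketch of this categorization with made-up totals. The column name sales is taken from the text that follows; the labels good and bad are assumed to match the ones used for products.

```python
import numpy as np
import pandas as pd

# Made-up totals of products bought per customer.
customers = pd.DataFrame(
    {"products": [12, 8, 2, 3]},
    index=pd.Index(["Victor", "Morgane", "Michele", "Sam"], name="name"),
)

mean = customers["products"].mean()
customers["sales"] = np.where(customers["products"] > mean, "good", "bad")

# Share of customers in each category.
print(customers["sales"].value_counts(normalize=True))
```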

We have 50% of each. Our two best customers are Victor and Morgane.

To be able to focus our analyses, we will just keep the customers from the category good. We also remove the columns products and sales as they are no longer needed. Additionally, we transpose our table to have the same indexes as our products.
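These three steps (filter, drop, transpose) chain naturally in pandas. The sketch below uses made-up customers.

```python
import pandas as pd

# Made-up customers, already categorized.
customers = pd.DataFrame(
    {
        "Sugar": [0, 7, 1, 1],
        "Salt": [6, 1, 1, 1],
        "Pepper": [6, 0, 0, 1],
        "products": [12, 8, 2, 3],
        "sales": ["good", "good", "bad", "bad"],
    },
    index=pd.Index(["Victor", "Morgane", "Michele", "Sam"], name="name"),
)

best = (
    customers[customers["sales"] == "good"]  # keep the good customers only
    .drop(columns=["products", "sales"])     # no longer needed
    .T                                       # products become the index
)
print(best)
```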

To have a better idea of how the purchases are distributed among our customers, we replace the number of purchases with the proportion of purchases compared to the total amount of all purchases.
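A sketch of the normalization with made-up counts. It assumes that the total is taken over the customers that were kept; dividing by the total over all customers would work the same way with a different denominator.

```python
import pandas as pd

# Made-up purchase counts: products as rows, customers as columns.
best = pd.DataFrame(
    {"Victor": [0, 6, 6], "Morgane": [7, 1, 0]},
    index=["Sugar", "Salt", "Pepper"],
)

total = best.to_numpy().sum()  # all purchases of the kept customers
proportions = best / total     # every cell becomes a share of that total
print(proportions)
```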

Almost 50% of all purchases are split equally between the products Salt and Pepper for the customer Victor. On the other hand, more than 25% of purchases are concentrated in one product for the customer Morgane.

To have deeper insights, we can combine our DataFrames to obtain a better overview of our products and customers. By previously transposing our customers’ table, we have the same indexes for both tables, which allows us to easily join them.
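Because both tables are indexed by product name, DataFrame.join is enough to combine them. A sketch with made-up values:

```python
import pandas as pd

index = ["Sugar", "Salt", "Pepper"]

# Made-up product categories.
products = pd.DataFrame(
    {"conversion": ["bad", "good", "good"], "sales": ["bad", "good", "bad"]},
    index=index,
)
# Made-up proportions of purchases per customer.
proportions = pd.DataFrame(
    {"Victor": [0.0, 0.3, 0.3], "Morgane": [0.35, 0.05, 0.0]},
    index=index,
)

# join aligns the rows of both tables on their shared index.
overview = products.join(proportions)
print(overview)
```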

We can now match the categories of our products with the proportion of sales. We can observe that although the product Pepper has bad sales, it represents almost 25% of sales. It also makes up almost half of the purchases of the customer Victor. This customer is the biggest consumer of the product Salt, our top product. It might be interesting to see if there is a correlation between these two products.

It would be interesting to visualize for each product the proportion of sales per customer.
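One possible way to draw this is a stacked bar chart in which each bar is a product and each segment a customer. The sketch below uses made-up counts and normalizes each product so that its segments sum to 1; the chart type is an assumption.

```python
import matplotlib

matplotlib.use("Agg")  # render off-screen; drop this line inside a notebook

import pandas as pd

# Made-up purchase counts: products as rows, customers as columns.
purchases = pd.DataFrame(
    {"Victor": [0, 6, 6], "Morgane": [7, 1, 0]},
    index=["Sugar", "Salt", "Pepper"],
)

# Normalize each row so the customers' shares of a product sum to 1.
shares = purchases.div(purchases.sum(axis=1), axis=0)

ax = shares.plot.bar(stacked=True, title="Share of purchases per customer")
ax.set_ylabel("proportion of purchases")
```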

This clearly shows that the products Sugar and Pepper are bought by different types of customers. Given that the product Pepper has a better conversion rate, we could suppose that its sales might increase over time with new customers.

Preprocessing Events

After entities, we continue with events. Events are a special kind of data: they describe the history and the context of your information. They contain much more information than entities, but they also need more preprocessing.

To obtain all our events, we need to query our event-store with a Kafka consumer. Our events are in the Kafka topic events. By default, a consumer starts reading from the latest message and never stops reading. To tell the consumer to read all messages from the beginning, we set the parameter auto_offset_reset to earliest. And to tell it to stop reading after the last message, we set the parameter consumer_timeout_ms to 1000. This forces it to stop after a thousand milliseconds if it hasn't received any new messages.
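With kafka-python, the consumer described above could be sketched like this. The broker address is an assumption, and the function is only defined here, not called, because it needs a running broker.

```python
# The two settings discussed in the text.
CONSUMER_CONFIG = {
    "auto_offset_reset": "earliest",  # start from the first message
    "consumer_timeout_ms": 1000,      # stop after 1 s without new messages
}


def read_all_events(bootstrap_servers="localhost:9092", topic="events"):
    """Read every record currently stored in the topic, then stop."""
    # Imported lazily so the sketch loads without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic, bootstrap_servers=bootstrap_servers, **CONSUMER_CONFIG
    )
    try:
        # Iteration ends once the timeout elapses without a new message.
        return list(consumer)
    finally:
        consumer.close()
```

In a notebook with the event-store running, `records = read_all_events()` would return the list of raw records.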

The result is a list of raw Kafka consumer records. It contains a lot of extra information that we could use to enrich our events.

To have a better view of what information we can get out of those records, let’s put it inside a DataFrame and see what we have.
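kafka-python returns each record as a named tuple, and pandas turns a list of named tuples directly into columns. The sketch below uses a stand-in tuple with only a subset of the real fields and made-up values, so it runs without Kafka; the event layout inside value is an assumption.

```python
from collections import namedtuple

import pandas as pd

# Stand-in for kafka-python's ConsumerRecord (subset of its fields).
Record = namedtuple(
    "Record", ["topic", "partition", "offset", "timestamp", "value"]
)

# Made-up records; timestamps are milliseconds since the epoch.
records = [
    Record("events", 0, 0, 1588320000000,
           b'{"name": "product_watched", "version": 1, '
           b'"payload": {"product": "Salt", "visitor": "Victor"}}'),
    Record("events", 0, 1, 1588320005000,
           b'{"name": "product_bought", "version": 1, '
           b'"payload": {"product": "Salt", "customer": "Victor"}}'),
]

# The tuple field names become the column names.
raw = pd.DataFrame(records)
print(raw[["offset", "timestamp"]])
```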

We can see that a record gives us a field called offset. This field allows us to order our events chronologically. Another very useful field is called timestamp. This one can help us determine how much time passed between two events. And of course, the most important field we need is the field called value. This field contains our event.

To read our events we are going to create a new DataFrame. We will use the method to_json with the parameter orient set to records.

We get three columns:

  1. name: the name of the event
  2. version: the version of the event
  3. payload: the payload of the event
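A frame with these three columns can be obtained in several ways. The sketch below decodes each value with json.loads, which may differ from the to_json-based approach mentioned above; the event contents are made up.

```python
import json

import pandas as pd

# Made-up raw values as they could come out of the Kafka records.
values = [
    b'{"name": "product_watched", "version": 1, '
    b'"payload": {"product": "Salt", "visitor": "Victor"}}',
    b'{"name": "product_bought", "version": 1, '
    b'"payload": {"product": "Salt", "customer": "Victor"}}',
]

# Each decoded event becomes one row; the payload stays a nested dict.
events = pd.DataFrame([json.loads(value) for value in values])
print(events)
```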

Each type of event will have a different payload. If the payloads of different events are too different, it's better to separate them into different DataFrames before deserializing the payload. But in our case, the differences between our two types of events, product_watched and product_bought, are not significant enough. The only differences are the properties visitor and customer, but they both hold the name of a customer. We know we can easily match them together.

In our next step, we are going to update our DataFrame and replace the column payload with all the deserialized properties of our events: product, visitor, and customer. Additionally, we can extract the timestamp of each event from the Kafka records.
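One way to expand the payload is pandas.json_normalize, which turns a list of dictionaries into columns. This is a sketch with made-up events; it assumes the timestamps are in milliseconds since the epoch, as Kafka stores them.

```python
import pandas as pd

# Made-up events with a nested payload.
events = pd.DataFrame([
    {"name": "product_watched", "version": 1,
     "payload": {"product": "Salt", "visitor": "Victor"}},
    {"name": "product_bought", "version": 1,
     "payload": {"product": "Salt", "customer": "Victor"}},
])
timestamps = [1588320000000, 1588320005000]  # taken from the Kafka records

# One column per payload property; absent properties become missing values.
payload = pd.json_normalize(events["payload"].tolist())
events = pd.concat([events.drop(columns="payload"), payload], axis=1)
events["timestamp"] = pd.to_datetime(timestamps, unit="ms")
print(events)
```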

Now, we have a DataFrame with each event that has occurred in our system. We can start playing with this data to extract some information.

The first thing we can do is to start filtering our DataFrame. For example, we can extract all the events with the name product_bought.

This is very useful to work on each type of event separately.

Another example: we can extract all the events that are related to a specific customer.

Here, we have a list of all the events related to the customer Michele.
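Both filters described above are boolean masks in pandas. A sketch with made-up events:

```python
import pandas as pd

# Made-up events; `visitor` and `customer` are filled depending on the type.
events = pd.DataFrame({
    "name": ["product_watched", "product_bought",
             "product_watched", "product_bought"],
    "product": ["Salt", "Salt", "Sugar", "Sugar"],
    "visitor": ["Michele", None, "Victor", None],
    "customer": [None, "Michele", None, "Morgane"],
})

# All events of one type.
bought = events[events["name"] == "product_bought"]

# All events related to one person, whichever column holds the name.
michele = events[
    (events["visitor"] == "Michele") | (events["customer"] == "Michele")
]
print(michele)
```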

To continue our analysis, we are going to clean up the data we got. First, we don't need the version or the timestamp of an event, so we remove them. Then, we don't need to make a distinction between visitor and customer; we can merge both columns into a single column customer.
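A sketch of this clean-up with made-up events. It uses combine_first to fill the gaps in customer with the values of visitor, which is one of several ways to merge the two columns.

```python
import pandas as pd

# Made-up events before the clean-up.
events = pd.DataFrame({
    "name": ["product_watched", "product_bought", "product_watched"],
    "version": [1, 1, 1],
    "product": ["Salt", "Salt", "Sugar"],
    "visitor": ["Michele", None, "Victor"],
    "customer": [None, "Michele", None],
    "timestamp": [1588320000000, 1588320005000, 1588320010000],
})

events = events.drop(columns=["version", "timestamp"])
# Keep `customer` where present, otherwise fall back to `visitor`.
events["customer"] = events["customer"].combine_first(events["visitor"])
events = events.drop(columns="visitor")
print(events)
```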

Now we have an accurate list of all events with only the information we need. We can see we have a lot of duplicates because customers have been performing some actions multiple times.

What we want to know is how often our customers have performed certain actions. The method pivot_table allows us to group and count all the duplicates in our DataFrame. Additionally, we use the method unstack to set the customers as the columns and the method fillna to replace all missing values with zeros.
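The counting could be sketched as follows. The helper column count is an assumption added here to give pivot_table something to sum; the article's exact call is not shown, and the events are made up.

```python
import pandas as pd

# Made-up cleaned events, including a duplicate.
events = pd.DataFrame({
    "name": ["product_watched", "product_watched", "product_bought",
             "product_watched", "product_bought", "product_watched"],
    "product": ["Salt", "Salt", "Salt", "Sugar", "Sugar", "Sugar"],
    "customer": ["Victor", "Victor", "Victor",
                 "Morgane", "Morgane", "Michele"],
})

counts = (
    events.assign(count=1)  # hypothetical helper column: one per event
    .pivot_table(index=["name", "product", "customer"],
                 values="count", aggfunc="sum")["count"]
    .unstack("customer")    # customers become the columns
    .fillna(0)              # no event means a count of zero
)
print(counts)
```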

We can now see how often a customer has produced a particular event for a particular product. Our DataFrame contains hierarchical indexing for the name of the event and the name of the product. It allows us to represent an additional dimension to our data.

Finally, we can now visualize the differences between each type of event with two horizontally stacked bar charts.
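A sketch of the two charts with matplotlib, one per event type, using made-up counts with the hierarchical index described earlier. Selecting the first index level with loc leaves a table of products by customers for each chart.

```python
import matplotlib

matplotlib.use("Agg")  # render off-screen; drop this line inside a notebook

import matplotlib.pyplot as plt
import pandas as pd

# Made-up counts indexed by event name and product.
counts = pd.DataFrame(
    {"Michele": [0, 0, 0, 1], "Morgane": [0, 1, 0, 1], "Victor": [1, 0, 2, 0]},
    index=pd.MultiIndex.from_tuples(
        [("product_bought", "Salt"), ("product_bought", "Sugar"),
         ("product_watched", "Salt"), ("product_watched", "Sugar")],
        names=["name", "product"],
    ),
)

fig, axes = plt.subplots(1, 2, figsize=(10, 4))
for ax, event in zip(axes, ["product_watched", "product_bought"]):
    # One horizontal stacked bar per product, one segment per customer.
    counts.loc[event].plot.barh(stacked=True, ax=ax, title=event)
fig.tight_layout()
```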

The first chart shows us which customers are watching what product and in what proportion. In the second chart, we can see which customers are buying what product and in what proportions. This gives us a good example to show why events are important. It would have been impossible to represent this chart only with entities since the information visitor has not been used during data collection.

Extending Handlers

As mentioned earlier, entities are data that have been preprocessed. The service in charge of this is the handler. It's a good practice to start building your preprocessing pipelines inside a Jupyter notebook. Once you are satisfied with the result, you can extend the handler with those new preprocessing pipelines. However, it should not be automatic. Using the handler to preprocess data has a cost: it increases the amount of data that is stored.

If we have a look at some of our previous examples, we can try to evaluate what preprocessing could be moved to the handler. The conversion rate, for instance, could easily be computed each time a product is updated. And the conversion rate category, if the threshold doesn't change over time, could also be updated at the same time as the conversion rate.
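To illustrate the idea, here is a sketch of such an update step. The handler's actual code and language are not shown in this article, so the function name, the field names, and the choice to store None when a product was never watched are all hypothetical.

```python
def apply_event(product, event_name, threshold=0.5):
    """Return an updated copy of a product entity after one event."""
    updated = dict(product)
    if event_name == "product_watched":
        updated["watched"] = updated.get("watched", 0) + 1
    elif event_name == "product_bought":
        updated["bought"] = updated.get("bought", 0) + 1

    watched = updated.get("watched", 0)
    bought = updated.get("bought", 0)
    # Without any view the rate is undefined, so it is stored as None.
    rate = bought / watched if watched else None
    updated["conversion_rate"] = rate
    if rate is None:
        updated["conversion"] = None
    else:
        updated["conversion"] = "good" if rate >= threshold else "bad"
    return updated


salt = {"name": "Salt", "watched": 3, "bought": 1}
salt = apply_event(salt, "product_watched")
print(salt)
```

Computing both fields on every event keeps the entity ready to read, at the price of two extra stored values per product.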

Another interesting example is when we preprocessed the event product_watched. We noticed we didn’t use the property visitor of the event. It might be interesting to create another entity that would contain the information we had inside the table we obtained with the method pivot_table.

How Does it Scale

When we are dealing with significant quantities of information, preprocessing all the data can quickly become very expensive. To reduce the amount of computation, it's a good practice to store the results. Using the handler to preprocess the data gradually, as the events come in, has two benefits. Firstly, if you need a result more than once, you won't have to compute it multiple times, which can save a lot of computation. Secondly, if you have a large amount of data, preprocessing all of it at once can take considerable time. Where possible, it's a good practice to spread the computation over time to avoid bottlenecks.

However, storage is neither unlimited nor free. Knowing when to store preprocessed data and when not to is essential to optimize time and costs. If a particular preprocessed dataset is only used once a month, it might be wiser to use a cron job to build it, use it, and destroy it once it's no longer needed. At the same time, thanks to cloud computing, it has never been easier to balance your needs. And storage has never been cheaper either.

Conclusion

Unfortunately, there is no magic formula to rule the world. But with all the tools we have seen so far, you have everything you need to rule your business data. By using an event-driven approach to data collection, visualization, and preprocessing, you are always going to find a way to obtain the information you need. Now that we have all the knowledge we want, we can move to the next step: leveraging the power of Machine Learning!

[1] Project Jupyter (2020). Installing the Jupyter Software — https://jupyter.org/install.html

[2] Apache Software Foundation (2017). Introduction https://kafka.apache.org/intro

[3] MongoDB, Inc (2020). The database for modern applications — https://www.mongodb.com/

[4] Dana Powers, David Arthur, and Contributors Revision 34dc36d7(2016). kafka-python — https://kafka-python.readthedocs.io/en/master/

[5] MongoDB, Inc (2020). PyMongo 3.10.1 Documentation — https://pymongo.readthedocs.io/en/stable/

[6] The pandas development team (2014). pandas documentation — https://pandas.pydata.org/pandas-docs/stable/

[7] The Matplotlib development team (2018). Matplotlib: Visualization with Python — https://matplotlib.org/
