User Intent Prediction & Retargeting via Spark Structured Streaming

Rohan Raj
MiQ Tech and Analytics
6 min readNov 5, 2020

In this article, we will discuss how at MIQ we have built a real-time user intent prediction capability. The focus will be on the technology platforms and the setup we used, but at the same time, we will also cover and discuss the need, relevance, business aspect of it. Further, we will also talk about how this setup is flexible and has been catering to a number of other real-time use cases that have been coming up on the same dataset.

A Very Brief Introduction to Retargeting

Retargeting with Ad-Tech on an e-commerce website can be explained as a practice of persuading potential customers to come back to a website after they have failed to convert on the website. For instance, everyone who shops online has been subjected to retargeting at one point or another. You visit a website and then you go to another and suddenly you see banners of a product you just looked at everywhere.

While retargeting is a really good strategy there is a lot that goes behind the scene. Regarding who we should show the ads to, when should we show the ads, etc. Let’s see how at MIQ we spend every bit of advertising money in an efficient and effective way with the use case of user intent prediction in real-time.

Let’s Deep-Dive into how we do this

Getting the Real-Time Data into a stream

To collect events from a website (linked to the online behavior) the requirement is that every page on that website should contain a very tiny Javascript Code called pixel which calls a pixel server with relevant online behavior data. MIQ has partnered with advertisers and third-party players to place those pixels on their websites. The MIQ pixel server keeps listening to the pixel calls and continuously generates events from these callbacks to push them on to a queuing system, which in our case is AWS Kinesis.

Pixel Firing Event Flow

Using Spark Structured Streaming

If you are slightly familiar with spark structured streaming you might know that it can have different kinds of sources from which it can keep consuming data like Kafka, s3, kinesis, etc. So in our case, as you might have guessed it will be kinesis. We directly read the kinesis data into a Spark data frame, something like this:

var ds_kinesis = spark
.readStream
.format(“kinesis”)
.option(“streamName”, kinesisStreamName)
.option(“endpointUrl”, endpointUrl)
.option(“regionName”, kinesisRegion)
.option(“startingPosition”, “earliest”)
.option(“kinesis.executor.maxFetchTimeInMs”, 10000)
.option(“kinesis.executor.maxFetchRecordsPerShard”, 500000)
.option(“kinesis.executor.maxRecordPerRead”, 10000)
.load()

Running Spark Streaming on our Qubole Infrastructure

Talking about the infrastructure where this code runs, we are using Qubole’s Pipelines Service which helps build streaming pipelines from the Pipelines UI by using out-of-box connectors and operators (in assisted mode). The Qubole platform is being used heavily in MIQ and there is a nice case-study on their website which also talks about the “real-time predictive user retargeting use-case” along with others. Qubole offers a lot of enhancements for our analytics pipelines to be enterprise-ready as well as a lot of support for custom connectors, custom sources and sinks, managed alerting, checkpointing, etc.

Using the data for prediction

For a brand, it’s very important to be able to understand the target audience for their respective customers. Marketing or search activities create awareness to bring a pool of prospective audience to the website. Based on the experience as well as experimentation our Data Science team has built a real-time predictive model in Spark which runs behind the scenes. We enforce the schema and then do the necessary transformations to build the features needed by the model.

val mySchema = (new StructType)
.add("pixel_id", StringType)
.add("dt", StringType)
.add("uid", StringType)
.add("hostname", StringType)
.add("accept_language", StringType)
.add("user_agent", StringType)
.add("timestamp", StringType)
.add("referrer", StringType)
.add("xwap_profile", StringType)
val df1_kinesis = df_kinesis.select(from_json($"data", mySchema).as("data2"), $"approximateArrivalTimestamp").select("data2.*", "approximateArrivalTimestamp").withColumn("date", $"approximateArrivalTimestamp".cast("timestamp")).withColumn("year", year(col("date")))
// Business logic model below

If you want to know more about the prediction model specifics or generally, about the prediction logic from the data science perspective, I would recommend to please keep a watch on MIQ medium blog space. We are soon coming up with another blog on how MiQ’s Data Science uses this solution. The blog will also explain about the Business Value, the Data Relevance Signals, and the Model details being used.

Making Real-Time Data Available for General Purpose Analytics

While we started consuming the data in real-time with this predictive retargeting use case, in particular, to start with. In course of time, we have got a number of new use cases identified where a lot of our analyst stakeholders wanted to do ad hoc analysis on this real-time data. All thanks to near real-time interactive analytics capabilities provided on this data using Presto. Let’s see how this is being done below.

Along with doing the user intent model prediction the pipeline also writes this data in real-time in parquet format in a partitioned s3 location something like this:

val query = df1_kinesis.writeStream
.outputMode(“Append”)
.trigger(ProcessingTime(“60 seconds”)) //write micro batches every 1 min
.format(“parquet”) // write as Parquet partitioned by date
.partitionBy(“year”, “month”, “day”, “hour”)
.option(“path”, parquetOutputPath)
.option(“checkpointLocation”, checkPointPath)
.start()

We have created a hive table on top of this location with a partition structure like: s3://data/year=2020/month=08/day=31/hour=10. There is a scheduled job that keeps adding partitions to the hive table much in advance daily so that we have some buffer time to fix it in case this scheduled job itself fails. From the streaming pipeline, s3 files keep getting added to the hourly folders in s3 as per the micro-batch interval which is 60 seconds as shown above(ProcessingTime).

We use Presto to do ad hoc analysis on top of the hive table created above which is continuously getting real-time data. We also run automated high-frequency jobs that query real-time data for different solutions created by our Business Analysis Team. You might think that — the data gets written to s3 every minute but does it reflect in real-time so that one can query. The answer is — it depends on the hive.metastore-refresh-interval and hive.metastore-cache-ttl. In our case, we have set both to 5 minutes (5m) and we are running a number of production jobs and are able to query data in near real-time with max delays of 5 mins which are good for all of these use cases.

Ad hoc Querying using Presto

Talking about the experience with this platform setup so far — It is pretty scalable, we have used the solution across 50+ clients and it has worked for us smoothly supporting 500–1000 queries per hour. How queries are performing depends on a lot of factors like the volume of data being queried, the no. of nodes and the type of nodes you have in your presto cluster, etc.

Some of the challenges we faced and learnings during running this pipeline in production were, Although we had enabled pipeline failure alerts, sometimes we saw data delays. The delays started to come after running well for a month or two in production. So the pipeline failure alert did not trigger but there were data delays in the final output. To cater to this we have put data delay alerts and infrastructure monitoring alerts in place on top of pipeline failure alerts. We have also enabled auto retrials so as to cope up automatically from some transient network or infra related issues.

As a sort of next step, we are also working on automating the flow for self serve the management of the pipeline by data science users and automated model upgrades by integrating this into our existing Apps & micro-services.

--

--