Carrier Payments Big Data Pipeline using Apache Storm

PayPal Tech Blog Team
The PayPal Technology Blog
4 min read · Nov 15, 2016

Carrier payments is a frictionless payment method enabling users to place charges for digital goods directly on their monthly mobile phone bill. There is no account needed, just a phone number. Payment is authorized by verifying a four-digit PIN sent via SMS to the user’s mobile phone. After a successful payment transaction, the charge appears on the user’s monthly mobile phone bill.

Historically, fraud has been handled on the mobile carrier side through various types of spending caps (daily, weekly, monthly, etc.). While these spending caps were able to keep fraud at bay in the early years, as this form of payment has matured, so have the fraud attempts. Over the last year we saw an increase in fraudulent activity based on social engineering, through which fraudsters trick victims into revealing their PINs.

To tackle the increasing fraud activity, we decided to build a carrier payments risk platform on the Hadoop ecosystem. The first step in building the risk platform is to create a big data pipeline that collects data from various sources, cleans or filters it, and structures it for data repositories. The accuracy of our decision-making directly depends on the variety, quantity and speed of data collection.

Data comes from various sources. The payment application generates transaction data. The refund job generates data about processed refunds. The customer service department provides customer complaints about wrong charges or failed payments. A browser fingerprint can help establish another identity for the transacting user. All these sources generate data in different formats and at varying frequencies, which makes putting together a data pipeline challenging.

We wanted to build a data pipeline that processes data reliably and can replay it in case of failure. It should be easy to add or remove data sources and sinks. The pipeline should be flexible enough to allow schema changes, and we should be able to scale it.

System Architecture

The first challenge was to provide a uniform and flexible entry point to the data pipeline. We built a microservice with a single REST endpoint that accepts data in JSON format, which makes it easy for any system to send data to the pipeline. The schema consists of a property “ApplicationRef” and a JSON object “data”. Sending data as a JSON object allows easy schema changes.
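As a rough sketch, the entry point could look like the JAX-RS resource below. The class name, path and library choices are illustrative assumptions, not the actual service code; only the “ApplicationRef”/“data” envelope comes from the design above.

import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.json.JSONObject;

@Path("/events")
public class EventResource {

    // Accepts payloads shaped like {"ApplicationRef": "purchase", "data": {...}}.
    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    public Response ingest(String body) {
        JSONObject event = new JSONObject(body);
        String applicationRef = event.getString("ApplicationRef");
        JSONObject data = event.getJSONObject("data");
        // Hand the event off to Kafka; see the producer sketch below.
        return Response.accepted().build();
    }
}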

The microservice writes data into Kafka, which also acts as a buffer. There is a separate Kafka topic for each source that sends data to the pipeline. Assigning separate topics brings logical separation to the data, keeps the Kafka consumer logic simple and allows us to give more resources to topics that handle large volumes of data.
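A minimal sketch of that hand-off, assuming the standard Kafka Java producer; the broker address and the per-source topic names are illustrative.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EventPublisher {

    private final KafkaProducer<String, String> producer;

    public EventPublisher(String brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    // Each source gets its own topic, e.g. "purchase", "refund", "browser", "customer-service".
    public void publish(String topic, String eventJson) {
        producer.send(new ProducerRecord<>(topic, eventJson));
    }
}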

We set up a Storm cluster to process the data after reading it from Kafka. To do real-time computation on Storm, you create “topologies”. A topology is a directed acyclic graph (DAG) of computation, and each of ours contains the logic for reading data from Kafka, transforming it and writing it to Hive. Typically a topology consists of two nodes of computation, a KafkaSpout and a HiveBolt; some topologies contain an intermediate node for transforming complex data. We created separate topologies to process the data coming from the various sources. At the moment we have four topologies: purchase, customer service, browser and refund.
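A sketch of how one such topology might be wired together, assuming the storm-kafka and storm-hive modules (Storm 1.x package names); the ZooKeeper address, topic, database and table names are illustrative, not our actual configuration.

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.hive.bolt.HiveBolt;
import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper;
import org.apache.storm.hive.common.HiveOptions;

public class PurchaseTopology {

    public static void main(String[] args) throws Exception {
        // Spout: read purchase events from their dedicated Kafka topic.
        SpoutConfig spoutConfig = new SpoutConfig(
                new ZkHosts("zookeeper:2181"), "purchase", "/purchase-spout", "purchase-spout-id");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Bolt: stream the tuples into a Hive table.
        // StringScheme emits each Kafka message as a single tuple field named "str".
        JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
                .withColumnFields(new Fields("str"));
        HiveOptions hiveOptions =
                new HiveOptions("thrift://hive-metastore:9083", "risk", "purchase", mapper);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig));
        builder.setBolt("hive-bolt", new HiveBolt(hiveOptions)).shuffleGrouping("kafka-spout");

        StormSubmitter.submitTopology("purchase-topology", new Config(), builder.createTopology());
    }
}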

Storm allows deploying and configuring topologies independently. This makes it possible for us to add more data sources in the future, or change existing ones, without much hassle. It also lets us allocate resources to topologies based on data volume and processing complexity.

[Figure: system architecture of the data pipeline]

Storm cluster setup and parallelism

We set up a high-availability Storm cluster. There are two Nimbus nodes, one of which acts as the leader. We have two supervisor nodes, each with a 2-core Intel® Xeon® CPU E5-2670 0 @ 2.60GHz.

Each topology is configured with 2 workers, 4 executors and 4 tasks. An executor is a thread spawned by a worker (JVM process). Together these settings define Storm’s parallelism, and changing any of them is an easy way to adjust it.
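A sketch of how those numbers map onto the topology configuration, reusing the spoutConfig and hiveOptions names from the topology sketch above; the component names are illustrative.

// 2 workers (JVM processes), 4 executors (threads), 4 tasks in total.
Config conf = new Config();
conf.setNumWorkers(2);

TopologyBuilder builder = new TopologyBuilder();
// The parallelism hint sets the number of executors; tasks default to one per executor.
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 2).setNumTasks(2);
builder.setBolt("hive-bolt", new HiveBolt(hiveOptions), 2)
       .setNumTasks(2)
       .shuffleGrouping("kafka-spout");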

[Figure: topology parallelism]

Storm message guarantee setup

Storm’s basic abstraction provides an at-least-once processing guarantee: messages are replayed only in case of failure. These message semantics are ideal for our use case. The only problem was that we didn’t want to keep retrying a failed message indefinitely. We added the following settings to our SpoutConfig to stop retrying a failed message once at least five messages after it have been processed successfully. We also increased the elapsed time between retries of a failed message.

spoutConfig.maxOffsetBehind = 5;                    // give up on a failed message once it is 5 offsets behind the latest ack
spoutConfig.retryInitialDelayMs = 1000;             // wait 1 second before the first retry
spoutConfig.retryDelayMultiplier = 60.0;            // back off by a factor of 60 on each subsequent retry
spoutConfig.retryDelayMaxMs = 24 * 60 * 60 * 1000;  // never wait more than 24 hours between retries

Results

Our data pipeline is currently processing payment transaction, browser, refund and customer service data at a rate of more than 100 tuples/min. The biggest convenience of this architecture has been the staggered release of topologies: it is very easy to add a new topology in Storm without disturbing the ones already running, and the time taken to release a new topology is less than a day. The architecture has proved very reliable, and we haven’t experienced any issues or data loss so far.
