Data Stream Prediction using WSO2 CEP and H2O.ai
In the real world, data arrives in many different ways. When it comes to analyzing that data, we can use batch analytics, real-time analytics, interactive analytics, or predictive analytics. This article will guide you through how predictive analysis can be done using WSO2 CEP.
Predictive analytics goes through several phases. First, we collect the data relevant to the scenario we are going to process. Next, we analyze that data to understand its behavioral patterns. Then we model the scenario with the cleaned data. Once the modeling is done, we can move on to the prediction phase as new data arrives.
The following guide describes how to model a scenario and how real-time predictions can be made using WSO2 products.
Prerequisites
- Pandas
- H2O.ai
- WSO2 CEP
- Dataset
Setting up environment
- Download WSO2 CEP and extract it.
- Install the h2o package in your Python environment.
- Place h2o_genmodel.jar in <CEP_HOME>/repository/components/lib.
- Place featureeng-1.0-SNAPSHOT.jar in <CEP_HOME>/repository/components/dropins.
- Place h2opojo-1.0-SNAPSHOT.jar in <CEP_HOME>/repository/components/dropins.
- Install pandas in your Python environment.
- Add the featureeng and h2omodelext packages to your Python project (a quick import check is sketched after this list).
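To make sure the Python side of the environment is ready before you start, you can run a quick import check like the one below. This is only an illustrative sanity check, not part of the original setup steps.

# Quick sanity check that the required Python packages are importable
import pandas as pd
import h2o

# featureeng and h2omodelext are the helper packages used throughout this guide;
# if these imports fail, add the packages to your Python project first.
import featureeng
import h2omodelext

print('pandas ' + pd.__version__)
print('h2o ' + h2o.__version__)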
1) Model the Scenario
The sample data set we are going to discuss is the Combined Cycle Power Plant data set. It is available at
https://archive.ics.uci.edu/ml/datasets/Combined+Cycle+Power+Plant
Before training a machine learning model, it is sometimes necessary to apply feature engineering to the given data set. The generated features help machine learning algorithms identify the patterns hidden beneath the data. Use the featureeng package to generate more features for your data set.
Here we have calculated the moving average for every feature.
import pandas as pd
from featureeng import Frame
# Load csv to pandas frame
data = pd.read_csv('ccpp.csv')
# Create feature processor frame
frame = Frame(data)
columns = ['AT', 'V', 'AP', 'RH']
# Apply feature engineering for each column
for column in columns:
    frame.apply_moving_average(input_column=column, window=5)
# Reorder columns in the dataset
column_order = [ 'AT', 'V', 'AP', 'RH', 'AT_ma_5', 'V_ma_5', 'AP_ma_5', 'RH_ma_5', 'PE']
frame.order_columns(column_names=column_order)
# Output modified data frame
frame.save_file(file_name='modified_ccpp.csv')
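As an aside, the 5-point moving average that apply_moving_average adds is conceptually just a rolling mean over recent values. For intuition only, a rough pandas equivalent is shown below; this is not the featureeng implementation, and its handling of the first few rows may differ.

# Illustration only: an approximate pandas equivalent of a 5-point moving average feature
import pandas as pd

data = pd.read_csv('ccpp.csv')
data['AT_ma_5'] = data['AT'].rolling(window=5, min_periods=1).mean()
print(data[['AT', 'AT_ma_5']].head(10))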
At the end of the program, the modified data set is saved as 'modified_ccpp.csv'. Now your data set is ready for training. Here we are using H2O.ai to train the model; the code snippet for the model-generation part is given below.
import h2o
from h2o.estimators import H2ORandomForestEstimator
from h2omodelext import ModelWrapper
# Initialize h2o instance
h2o.init()
# Read csv file
data = h2o.import_file('modified_ccpp.csv')
# Split data into train and test
train, test = data.split_frame(ratios=[0.8])
# Define model
model = H2ORandomForestEstimator(ntrees=50, max_depth=20, nbins=100)
# Define input and response columns
input_columns = ['AT', 'V', 'AP', 'RH', 'AT_ma_5', 'V_ma_5', 'AP_ma_5', 'RH_ma_5']
response_column = 'PE'
# Train model
model.train(x=input_columns, y=response_column, training_frame=train)
print(model.model_performance(test_data=test))
# Save model
ModelWrapper.save_model(path='/home/wso2123/PycharmProjects/FeatureProcessor', model=model)
The save_model() method compiles the generated Java source (POJO) for the model and saves the resulting class files in a folder named after the model.
Note: MODEL_PATH = the location of this model folder
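The h2omodelext package comes with this article's companion code, so its internals are not shown here. As a rough orientation only, exporting and compiling a model POJO manually with the standard h2o API would look something like the sketch below. It assumes javac is available on the machine and is not the actual ModelWrapper implementation.

# Sketch: manual POJO export and compilation with the standard h2o API (illustrative only)
import subprocess
import h2o

def export_and_compile_pojo(model, path):
    # Download the model POJO (.java) and h2o-genmodel.jar into `path`
    h2o.download_pojo(model, path=path, get_jar=True)

    # The POJO class is named after the model id, e.g. DRF_model_python_1479702792496_1.java
    java_file = model.model_id + '.java'

    # Compile the POJO against h2o-genmodel.jar so the class files can be loaded later
    subprocess.check_call(['javac', '-cp', 'h2o-genmodel.jar', '-J-Xmx2g', java_file], cwd=path)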
Now we have completed the first phase of the flow. It is time to deal with real-time data streams. Start the WSO2 CEP server.
2) Real Time Prediction
Setting up Streams
1) data_input stream
The data source contains 4 different values: temperature (T), vacuum (V), ambient pressure (AP) and relative humidity (RH). We need a stream with these 4 attributes to carry the data into the system.
Streams > Add Event Stream
Define each attribute that your payload contains.
Finally, press the `Add Event Stream` button to complete the stream creation.
2) feature_engineered stream
The model we have trained requires 8 input fields to generate a prediction, so we need to create a stream that can hold 8 attributes.
3) prediction_out stream
Finally, we have to output the prediction result. The prediction result is usually passed as a string. Along with the prediction, you can decide which other attributes to publish.
To see what you are outputting, attach a logger publisher to the prediction_out stream. It will display the results in your console.
Now you have configured the baseline of your execution environment. It is time to create the execution plans. Let's first create an execution plan for feature processing, and then another one for prediction.
Feature Processing Execution Plan
/* Enter a unique ExecutionPlan */
@Plan:name('FeatureProcessing')

/* Enter a unique description for ExecutionPlan */
-- @Plan:description('ExecutionPlan')

/* define streams/tables and write queries here ... */
@Import('data_input:1.0.0')
define stream data_input (T double, V double, AP double, RH double);

@Export('feature_engineered:1.0.0')
define stream data_output (T double, V double, AP double, RH double, T_5 double, V_5 double, AP_5 double, RH_5 double);

from data_input#window.length(5)
select T, V, AP, RH, featureeng:movavg(5, T) as T_5, featureeng:movavg(5, V) as V_5, featureeng:movavg(5, AP) as AP_5, featureeng:movavg(5, RH) as RH_5
insert into data_output;
Prediction Engine Execution Plan
'ccpp/DRF_model_python_1479702792496_1' is the path where the model is located.
/* Enter a unique ExecutionPlan */
@Plan:name('PredictionEngine')

/* Enter a unique description for ExecutionPlan */
-- @Plan:description('ExecutionPlan')

/* define streams/tables and write queries here ... */
@Import('feature_engineered:1.0.0')
define stream data_input (T double, V double, AP double, RH double, T_5 double, V_5 double, AP_5 double, RH_5 double);

@Export('prediction_out:1.0.0')
define stream data_output (T double, V double, AP double, RH double, prediction string);

from data_input#h2opojo:predict('ccpp/DRF_model_python_1479702792496_1')
select T, V, AP, RH, prediction
insert into data_output;
After adding those execution plans, the final flow will look like this.