Analyze your web traffic in near real-time with AWS Kinesis

Sam Joshva Baskar Jesudasan
Ankercloud Engineering
9 min read · Nov 24, 2019

Business analysts often need to track the geolocation of their site visitors. Tracking this in real time helps them make better-fitting recommendations for site improvement, which in turn helps grow and promote site traffic. Third-party analytics services for e-commerce and other websites are available, but sending data to external providers may not be in the interest of the stakeholders or architects. Hence the need to implement real-time analytics with tools that are available in-house.

In the AWS world, the entire process is simplified by AWS Kinesis. This blog is a walk-through of how we used AWS Kinesis to demonstrate near real-time tracking of user geolocation on a website hosted on an Apache server.

Overall Process:

  • Install the GeoIP module on Apache to track the user's location.
  • Create a custom access log format that logs user locations.
  • Install the Kinesis Agent on all web servers to stream the access log to a Kinesis data stream.
  • Kinesis Analytics collects user country counts in near real time over a 10-second window.
  • Kinesis Firehose pushes the Kinesis Analytics results to S3.
  • Athena queries the data in S3.
  • QuickSight visualizes the Athena query results.

Architecture:

[Architecture diagram: Apache web servers with the Kinesis Agent → Kinesis Data Stream → Kinesis Analytics → Kinesis Firehose → S3 → Athena → QuickSight]

Enable GeoIP in the Apache access log:

First, we enable the GeoIP module and create a custom log format.

sudo apt-get install libapache2-mod-geoip

sudo a2enmod remoteip

sudo vi /etc/apache2/mods-available/geoip.conf

<IfModule mod_geoip.c>
    GeoIPEnable On
    GeoIPDBFile /usr/share/GeoIP/GeoIP.dat
</IfModule>
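
On some systems the package enables the module automatically; if not, enable it the same way as remoteip (a small sketch, assuming the module name geoip shipped by libapache2-mod-geoip):

# Enable the GeoIP module if the package did not do it already
sudo a2enmod geoip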

Add the lines below to the apache2.conf file to set up custom access logs.

sudo vi /etc/apache2/apache2.conf

-- Comment out or remove the line below

LogFormat "%h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\"" combined

-- Add these lines

RemoteIPHeader X-Forwarded-For

LogFormat "%{GEOIP_COUNTRY_CODE}e|%a|%l|%u|%t|\"%r\"|%>s|%O|\"%{Referer}i\"|\"%{User-Agent}i\"" combined

To apply the above changes, restart Apache.

service apache2 restart
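
Optionally, confirm that both modules are loaded (a quick sanity check; the exact output varies by Apache version):

# List loaded modules and filter for the two we enabled
apache2ctl -M | egrep 'geoip|remoteip'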

Check the access log and verify that the geolocation is being added.

tail /var/log/apache2/access.log

US|162.243.6.123|-|-|[18/Dec/2018:16:21:59 +0000]|"GET /feed/ HTTP/1.1"|200|3125|"-"|"Mozilla/5.0 (X11; U; Linux i686; pl-PL; rv:1.8.1.10) Gecko/20071213 Fedora/2.0.0.10-3.fc8 Firefox/2.0.0.10"

We used a pipe as the delimiter because spaces cause parsing issues when Kinesis Analytics queries the streamed data, so a pipe or comma is recommended instead.

Create a Kinesis Data Stream:

Go to AWS Console → Kinesis → Data Streams

Click on Create Kinesis stream and configure it.

  • Kinesis stream name: real-time-analysis
  • Shards: 1

Note: a single shard can handle:

Read: 2 MB/sec

Write: 1 MB/sec, or 1,000 records/sec

Depending on how fast your logs are generated, you can scale up to 500 shards (the default limit).
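
If you prefer the CLI over the console, the same stream can be created with two commands (a sketch, assuming your AWS credentials and region are already configured):

# Create the data stream with a single shard
aws kinesis create-stream --stream-name real-time-analysis --shard-count 1

# Block until the stream is ACTIVE
aws kinesis wait stream-exists --stream-name real-time-analysis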

Install and configure Kinesis Agent:

Note: at the time of writing, the latest commit of this agent does not work on Ubuntu, so we use an earlier commit of the Kinesis Agent.

-- Install Java

sudo add-apt-repository ppa:webupd8team/java

sudo apt update

sudo apt install oracle-java8-installer

-- Install git

sudo apt install git

cd /opt

git clone https://github.com/awslabs/amazon-kinesis-agent.git

-- Install Kinesis agent

cd /opt/amazon-kinesis-agent

git reset --hard c29662e

sudo ./setup --install

The agent needs permission to put records into Kinesis. Either use an EC2 instance role that has access to Kinesis, or create an IAM user with access to the Kinesis data stream and use its keys to authenticate.
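
If you go the IAM user route, a minimal policy needs only the put APIs on this stream (a hedged sketch rather than full access; the account ID 111122223333 is a placeholder, and the agent additionally needs cloudwatch:PutMetricData if metric emission is enabled):

cat <<'EOF' > kinesis-agent-policy.json
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": ["kinesis:PutRecord", "kinesis:PutRecords", "kinesis:DescribeStream"],
    "Resource": "arn:aws:kinesis:us-east-1:111122223333:stream/real-time-analysis"
  }]
}
EOF

aws iam create-policy --policy-name kinesis-agent-put --policy-document file://kinesis-agent-policy.json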

In our case,

  • Kinesis Region: Virginia (us-east-1)
  • CloudWatch region: Virginia (us-east-1)
  • Stream Name: real-time-analysis
  • Where to Stream Access logs: Start streaming from the last line of the file.

vi /etc/aws-kinesis/agent.json

{
    "awsAccessKeyId": "XXXXXXXXXXXXXXXX",
    "awsSecretAccessKey": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
    "cloudwatch.emitMetrics": true,
    "kinesis.endpoint": "kinesis.us-east-1.amazonaws.com",
    "flows": [{
        "filePattern": "/var/log/apache2/access.log",
        "kinesisStream": "real-time-analysis",
        "initialPosition": "END_OF_FILE",
        "maxBufferAgeMillis": 2000
    }]
}

Grant the Kinesis agent permission to read the access log:

sudo setfacl -m u:aws-kinesis-agent-user:rwx /var/log/apache2/
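
Note that rotated log files are created fresh and will not inherit this ACL; a default ACL on the directory covers files created later (a hedged sketch):

# Default ACL: files created in this directory become readable by the agent user
sudo setfacl -dm u:aws-kinesis-agent-user:r /var/log/apache2/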

Start streaming the data:

service aws-kinesis-agent start

service aws-kinesis-agent status

● aws-kinesis-agent.service - LSB: Daemon for Amazon Kinesis Agent.

Loaded: loaded (/etc/init.d/aws-kinesis-agent; bad; vendor preset: enabled)

Active: active (running) since Tue 2018-12-18 16:37:19 UTC; 3s ago

Docs: man:systemd-sysv-generator(8)

.....

.....

Dec 18 16:37:19 ip-172-31-12-105 systemd[1]: Started LSB: Daemon for Amazon Kinesis Agent..

Check the agent log to verify that the data is reaching the Kinesis data stream.

tail /var/log/aws-kinesis-agent/aws-kinesis-agent.log

2018-12-17 15:49:30.220+0000 (FileTailer[kinesis:realtime-traffic:/var/log/apache2/access.log].MetricsEmitter RUNNING) com.amazon.kinesis.streaming.agent.tailing.FileTailer [INFO] FileTailer[kinesis:realtime-traffic:/var/log/apache2/access.log]: Tailer Progress: Tailer has parsed 2 records (1440633 bytes), transformed 0 records, skipped 0 records, and has successfully sent 2 records to destination.

2018-12-17 15:49:30.223+0000 (Agent.MetricsEmitter RUNNING) com.amazon.kinesis.streaming.agent.Agent [INFO] Agent: Progress: 2 records parsed (1440633 bytes), and 2 records sent successfully to destinations. Uptime: 90072ms
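
You can also pull a few records straight off the stream with the AWS CLI to confirm the pipeline end to end (a sketch; the shard ID assumes the single-shard stream created earlier, and the returned Data field is Base64-encoded):

# Get an iterator for the only shard, then read a handful of records
ITERATOR=$(aws kinesis get-shard-iterator \
    --stream-name real-time-analysis \
    --shard-id shardId-000000000000 \
    --shard-iterator-type TRIM_HORIZON \
    --query ShardIterator --output text)

aws kinesis get-records --shard-iterator "$ITERATOR" --limit 5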

Query streamed data in real-time with Kinesis Analytics:

Go to AWS Console → Kinesis → Kinesis Analytics

Click on Create Application.

  • Application name: realtime-analytics
  • Runtime: SQL

The connected streaming data can be found under the Source section.

Select Choose Source.

  • Source: Kinesis stream
  • Kinesis stream: real-time-analytics
  • Access permissions: Create/update IAM role kinesis-analytics-test1-us-east-1 (if you already have an IAM role with the necessary permissions, you can use it; otherwise we recommend letting the console create a new role with all the necessary permissions.)
  • Schema: Kinesis Analytics can usually detect the schema of the streamed data automatically. In the worst case, or if the data is unstructured, the detected schema will be wrong.

To structure the data, click Edit schema, make the necessary changes, and save.

Kinesis Analytics is ready. Let’s see the real-time aggregated data.

Under RealTime Analytics, Click on Go to SQL Results.

In the query window, type the below query.

(Note: as mentioned, in this blog we are capturing user locations, so the query collects only the country column. If you want to do further analytics, create the stream with whatever columns you need.)

-- From the stream we need only the country, and the number of requests from each country.

-- Create a stream holding only the country and its request count.

CREATE OR REPLACE STREAM "access_log_stream" (
    "country" VARCHAR(10),
    "total" INTEGER
);

-- The pump continuously inserts data into the above stream at a 10-second interval.

CREATE OR REPLACE PUMP "access_log_pump" AS
INSERT INTO "access_log_stream"
SELECT STREAM "country", COUNT(*) AS "total"
FROM "SOURCE_SQL_STREAM_001"
GROUP BY
    "country",
    FLOOR(
        ("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND
    );

Run the query; it starts emitting aggregated results.

This window shows the aggregated results from the stream. To visualize them, you could integrate Kinesis Analytics into your own application; here, however, we integrate it with QuickSight. QuickSight does not support Kinesis directly, so we first push the query results to S3, then use Athena to query them and QuickSight to visualize the results. QuickSight could also read the S3 data directly, but Athena is a good choice for analysing historic data on sites with heavy traffic.

Create a Kinesis Firehose to push data to S3:

Kinesis Analytics cannot write the data to S3 itself; instead, we push the results to Kinesis Firehose, which delivers the data to S3.

Go to AWS Console → Kinesis → Kinesis Firehose.

Click on the Create Delivery Stream button.

  • Delivery stream name: real-time-output
  • Source: Direct PUT or other sources

Click the Next Button.

  • Record transformation: Disabled
  • Record format conversion: Disabled

Click the Next Button.

  • Destination: Amazon S3
  • S3 bucket: serverless-datalake (our bucket name; choose your own)
  • Prefix: kinesis-realtime-output/

Click the Next button to view the configuration of the Firehose.

The data will be pushed to S3 when the query result set reaches 1MB or every 60 seconds.

  • Buffer size: 1MB
  • Buffer interval: 60 seconds

The rest of the options are left as default.

  • IAM role: Create/Choose role.

Firehose will select the default role and automatically attach the necessary permissions. Just click the Allow button.

Click Next and click on the Create Delivery Stream button.

Wait a few seconds and the delivery stream will become available.
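
The delivery stream status can also be checked from the CLI (assuming the name used above):

# DeliveryStreamStatus should read ACTIVE once it is ready
aws firehose describe-delivery-stream \
    --delivery-stream-name real-time-output \
    --query DeliveryStreamDescription.DeliveryStreamStatus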

Push Kinesis Analytics query results to S3:

Go to Kinesis → Kinesis Analytics.

Under Destination, click on Connect to a destination.

  • Destination: Kinesis Firehose delivery stream
  • Kinesis Firehose delivery stream: real-time-output
  • Connect in-application stream: Choose an existing in-application stream
  • In-application stream name: access_log_stream (this is the stream we created in the SQL query)
  • Output format: CSV
  • Access permissions: Create / update IAM role kinesis-analytics-realtime-analysis-us-east-1

Click the Save and Continue button.

After a minute or so, you will see the results in S3, organized into date-based subfolders (yyyy/mm/dd/).
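
A quick listing shows the date-partitioned layout (using the bucket and prefix from this walk-through):

# Firehose writes objects under <prefix>/yyyy/mm/dd/
aws s3 ls s3://serverless-datalake/kinesis-realtime-output/ --recursive | head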

Create an external table in Athena:

To query these S3 files, we create an external table in Athena.

Go to AWS Console → Athena

Run the query below to create the table.

CREATE EXTERNAL TABLE traffic_analysis (
    country string,
    count int
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    "separatorChar" = ",",
    "escapeChar" = "\\"
)
LOCATION 's3://serverless-datalake/kinesis-realtime-output/'

Get the results from Athena:

select country, sum(count) as count
from traffic_analysis
where country != '-'
group by country
order by count desc
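
The same query can also be launched from the CLI if you want to script it (a hedged sketch; the database name default and the results location are assumptions, so adjust them to your setup):

aws athena start-query-execution \
    --query-string "select country, sum(count) as count from traffic_analysis where country != '-' group by country order by count desc" \
    --query-execution-context Database=default \
    --result-configuration OutputLocation=s3://serverless-datalake/athena-results/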

Create QuickSight Dashboard:

AWS Console → QuickSight → New analysis → New dataset

  • Select Athena as the dataset source.
  • Enter a data source name (e.g., Athena).
  • Click Validate connection.
  • Once it is validated, click the Create data source button.

Note: if an S3 access denied error occurs, it is likely because QuickSight lacks permission to access the S3 bucket.

Grant QuickSight access to the buckets:

  • Click the account icon → Manage QuickSight → Account settings → Manage QuickSight permissions.
  • This shows a list of buckets. Grant access to all buckets, or select only the bucket used by Athena (the bucket details can also be found in the Athena console).

Once the dataset has been created, it will ask you to select the database and table from Athena.

Based on your QuickSight plan (Standard/Enterprise), it will show the SPICE allocation.

Click the Visualize button.

From the field list, select country and count.

Select the graph type.

That is the end of this walk-through. We hope this brief introduction showcases how simple it is to set up real-time analytics with AWS Kinesis. As a cost-reduction measure, you could eliminate the data stream and analytics and use Firehose alone to push the data to S3. That also works, but there will be some delay before the data can be queried and visualized. Kinesis Analytics avoids this delay and gives an overall better experience and a more streamlined workflow.

Cleanup:

  • Stop the Kinesis Agent on the EC2 instance (a CLI sketch covering several of these steps follows this list).
  • Uninstall Kinesis Agent.
  • Delete Kinesis DataStream.
  • Delete Kinesis Analytics.
  • Delete Kinesis Firehose.
  • Delete S3 Bucket.
  • Delete Athena Tables.
  • Delete DataSets and analysis.
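
A hedged sketch of the CLI side of the cleanup; resource names assume the ones created in this walk-through, and the Analytics application, Athena table, and QuickSight assets are easiest to remove from their respective consoles:

sudo service aws-kinesis-agent stop

aws kinesis delete-stream --stream-name real-time-analysis
aws firehose delete-delivery-stream --delivery-stream-name real-time-output

# Empties and removes the bucket; this is irreversible
aws s3 rb s3://serverless-datalake --force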

Happy Streaming
