Build a Scalable Data Pipeline with AWS Kinesis, AWS Lambda, and Google BigQuery

Heather Fan · The Startup · Apr 18, 2020

This blog details how to handle large amounts of event-triggered data for real-time backend analysis with AWS Kinesis, AWS Lambda, and Google BigQuery.

Design

Amazon Kinesis Data Streams (KDS) is a massively scalable and durable real-time data streaming service.

Kinesis is a reliable tool for large-scale data streaming, with competitive pricing compared to other message queue services. With Kinesis in the picture, AWS Lambda is a natural choice on the consumer side because of its easy integration with Kinesis.

One thing to note about the Kinesis + Lambda pipeline is error handling. The auto-retry this combination provides is useful, but when a corrupted record shows up, the whole pipeline gets stuck on the chunk of data that contains the "bad record". The retries keep going until the record expires (by default after 24 hours) and is removed from the Kinesis stream.

To solve this, an SQS queue is added as a dead-letter queue (DLQ) for the corrupted records. The maximum number of retries and the option to split a failed batch are also configurable in the event source mapping for the Lambda function.

Set Up Kinesis

Kinesis is fairly easy to set up as it doesn’t require much configuration aside from the number of shards.

Streams are:

  • Made up of shards
  • Each shard ingests data at up to 1 MB/sec
  • Each shard emits data at up to 2 MB/sec

The number of shards needed for a stream can be computed with the formula provided by AWS:

number of shards = max(incoming_write_bandwidth_in_KB/1000, outgoing_read_bandwidth_in_KB / 2000)
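
For example (hypothetical numbers): with producers writing 3 MB/sec (3,000 KB/sec) and consumers reading 4 MB/sec (4,000 KB/sec), the stream needs max(3000/1000, 4000/2000) = max(3, 2) = 3 shards.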

After deciding on a stream name and the number of shards, we can create the data stream either in the console or through the AWS CLI:

aws kinesis create-stream --stream-name <name> --shard-count <number_of_shards> --region <AWS_Region>
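
For instance, creating a stream for the three-shard example above (the stream name and region are placeholders):

aws kinesis create-stream --stream-name transactions --shard-count 3 --region us-east-1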

Create Lambda Consumer


After setting up the pipe, it's time to create the consumer. Even though Node.js is the most commonly recommended language for Lambda, Java is my main coding language and the one I'm most confident in, so I decided to stick with it (there are already LOTS of uncertainties and learnings in the whole infrastructure setup).

The simple handler function looks like this:

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, String> {

    private static final BigQueryService bigQueryService = new BigQueryService();
    private static final Gson gson = new GsonBuilder().setPrettyPrinting().create();
    private static final ObjectMapper mapper = new ObjectMapper();

    @Override
    public String handleRequest(KinesisEvent event, Context context) {
        LambdaLogger logger = context.getLogger();
        // log execution details
        logger.log("ENVIRONMENT VARIABLES: " + gson.toJson(System.getenv()));
        logger.log("CONTEXT: " + gson.toJson(context));
        // process event
        logger.log("EVENT: " + gson.toJson(event));
        Map<String, Map<String, Object>> rows = new HashMap<>();
        for (KinesisEventRecord rec : event.getRecords()) {
            // extract data
            KinesisEvent.Record record = rec.getKinesis();
            String id = record.getPartitionKey() + record.getSequenceNumber();
            Charset charset = StandardCharsets.UTF_8;
            String json = charset.decode(record.getData()).toString();
            String recordLog = String.format("RECORD: RECORD ID: %s, DATA: %s", id, json);
            logger.log(recordLog);
            try {
                Map<String, Object> row = mapper.readValue(json, Map.class);
                rows.put(id, row);
            } catch (Exception e) {
                logger.log(String.format("ERROR: PARSING ERROR, RECORD: %s, ERROR: %s, %s", recordLog, e.getMessage(), e.getCause()));
                throw new ParsingException(e.getMessage(), e.getCause());
            }
        }
        // insert into BigQuery
        try {
            bigQueryService.insertTransactionData(rows);
        } catch (Exception e) {
            logger.log(String.format("ERROR: BQ ERROR, EVENT: %s, ERROR: %s, %s", gson.toJson(event), e.getMessage(), e.getCause()));
            throw new BigQueryException(e.getMessage(), e.getCause());
        }
        return "200 OK";
    }
}

A few notes:

  1. Exceptions thrown in the handler are handled by Lambda's error handling mechanism and count toward the number of retries (see the exception sketch after this list).
  2. The combination of partition key and sequence number is used as the insert ID for the BigQuery insertion, to enable de-duplication. The sequence number is only unique within a shard, so the partition key (which Kinesis uses to decide which shard the data goes to) is needed as well.
  3. Records are collected per invocation and inserted into BigQuery in bulk.
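
As mentioned in note 1, the handler throws two custom unchecked exceptions, ParsingException and BigQueryException, whose definitions are not shown in this post. A minimal sketch of what they could look like (the package and details are assumed, one class per file):

// Hypothetical definition; the real class is not shown in this post.
public class ParsingException extends RuntimeException {
    public ParsingException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical definition; the real class is not shown in this post.
public class BigQueryException extends RuntimeException {
    public BigQueryException(String message, Throwable cause) {
        super(message, cause);
    }
}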

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>example</groupId>
    <artifactId>consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.2</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="com.github.edwgiz.maven_shade_plugin.log4j2_cache_transformer.PluginsCacheFileTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
                <dependencies>
                    <dependency>
                        <groupId>com.github.edwgiz</groupId>
                        <artifactId>maven-shade-plugin.log4j2-cachefile-transformer</artifactId>
                        <version>2.13.0</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>
    </build>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.google.cloud</groupId>
                <artifactId>libraries-bom</artifactId>
                <version>3.3.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- Big Query -->
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-bigquery</artifactId>
        </dependency>
        <!-- Lambda -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-lambda-java-core</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-lambda-java-events</artifactId>
            <version>2.2.7</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-lambda-java-log4j2</artifactId>
            <version>1.1.0</version>
        </dependency>
        <!-- Kinesis -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-kinesis</artifactId>
            <version>1.11.762</version>
        </dependency>
        <!-- Jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.10.3</version>
        </dependency>
    </dependencies>

</project>
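
With this pom, building the deployment artifact is a single command; the shade plugin bound to the package phase produces a fat jar under target/ (here named consumer-1.0-SNAPSHOT.jar, from the artifactId and version above), which is the jar we'll upload to Lambda in a later step.

mvn clean package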

Integrate with BigQuery


Some examples of streaming data into BigQuery can be found in the Google Cloud documentation.

Before getting started, create a service account for your Lambda function in Google Cloud Platform and add a copy of its credentials.json to your project directory; it will be needed to access BigQuery.

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import com.google.api.services.bigquery.BigqueryScopes;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;

public class BigQueryService {

    private BigQuery bigQuery = null;

    public BigQuery getBigQuery() throws IOException {
        if (bigQuery == null) {
            InputStream in = BigQueryService.class.getResourceAsStream(
                    BigQueryConsts.CREDENTIALS_FILE_PATH);
            GoogleCredentials cred = ServiceAccountCredentials.fromStream(in)
                    .createScoped(BigqueryScopes.all());
            bigQuery = BigQueryOptions.newBuilder().setCredentials(cred).build().getService();
        }
        return bigQuery;
    }

    public void insertTransactionData(Map<String, Map<String, Object>> rows) throws IOException {
        // build bulk insert request
        TableId table = TableId.of(BigQueryConsts.PROJECT, BigQueryConsts.DATASET, BigQueryConsts.TABLE);
        InsertAllRequest.Builder insertRequestBuilder = InsertAllRequest.newBuilder(table);
        rows.forEach(insertRequestBuilder::addRow);
        InsertAllRequest request = insertRequestBuilder.build();
        // make request
        BigQuery bigQuery = getBigQuery();
        InsertAllResponse insertResp = bigQuery.insertAll(request);
        if (insertResp.hasErrors()) {
            // getInsertErrors() returns a Map of row errors, not a Throwable, so report it in the message
            throw new IOException("BigQuery insert errors: " + insertResp.getInsertErrors());
        }
    }
}
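
The service reads the credentials file path and table coordinates from a BigQueryConsts class that is not shown in the post. A minimal sketch of what it might contain (all values are placeholders to replace with your own):

// Hypothetical constants class; substitute your own project, dataset, table, and credentials path.
public final class BigQueryConsts {
    public static final String CREDENTIALS_FILE_PATH = "/credentials.json";
    public static final String PROJECT = "my-gcp-project";
    public static final String DATASET = "my_dataset";
    public static final String TABLE = "transactions";

    private BigQueryConsts() {
    }
}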

Upload to Lambda

Finally, it's time to compile the code and upload it to Lambda. 🍻

Before we create the function, an AWS Lambda execution role is required, with read access to Kinesis, write access to SQS (for the DLQ), and write access to CloudWatch Logs.

Creating the Lambda function can be as simple as:

aws lambda create-function --region <AWS_REGION> --function-name <FUNCTION> --zip-file fileb://<YOUR_JAR> --role <EXECUTION_ROLE_ARN> --handler ProcessKinesisRecords --runtime java11

The Lambda function must be in the same region as the Kinesis data stream set up earlier. The handler is the name of the class that contains the handler method.

Create DLQ

The event source mapping for Kinesis-to-Lambda supports either an SNS topic or an SQS queue as the on-failure destination; here we'll use an SQS queue as the DLQ (Amazon SQS dead-letter queues, Amazon Simple Queue Service).

Command to create the queue:

aws sqs create-queue --queue-name <NAME>
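
Note that create-queue returns the queue URL, while the event source mapping below needs the queue ARN; one way to look it up (the queue URL is a placeholder):

aws sqs get-queue-attributes --queue-url <QUEUE_URL> --attribute-names QueueArn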

Configure Event Source Mapping


After everything is up, it's time to put all the pieces together and create the event source mapping from Kinesis to Lambda, with SQS as the DLQ.

aws lambda create-event-source-mapping --function-name <FUNCTION> \
--batch-size 500 --starting-position TRIM_HORIZON \
--event-source-arn arn:aws:kinesis:xxxx \
--maximum-retry-attempts 3 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:xxxxxxxx"}}' \
--bisect-batch-on-function-error

The above command adds Kinesis as the event source and SQS as the DLQ, with a batch size of 500, a retry limit of 3, and bisection of the batch into two on error (bisected retries are not counted toward the total retries).
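
Once the mapping is live, metadata about batches that exhaust their retries lands in the DLQ; a quick way to peek at it (the queue URL is a placeholder):

aws sqs receive-message --queue-url <QUEUE_URL>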

Sit Back and Watch the Data Flow

Congrats! Now that we've finished setting up the infrastructure of the data pipeline with Kinesis and Lambda, it's time to enjoy the live stream of data into your BigQuery data warehouse.

Happy hacking!
