Confluent Cloud integration with Logz.io

Sameer Deshmukh
Published in Globant
5 min read · May 4, 2022

Introduction

Apache Kafka is a real-time streaming platform that streams data between systems or applications. Confluent Cloud is a cloud-native service for data in motion built around Apache Kafka, offering simple, scalable, resilient, and secure event streaming.

The problem we originally set out to solve with Confluent Cloud was low-latency ingestion of large amounts of event data from the application into real-time event processing systems.

The key is the "real-time" processing. Kafka acts as a buffer between the log collection framework and the ELK stack. It can also fan the logs out to multiple sinks, such as ELK and a SIEM tool.

Logz.io is essentially a managed ELK stack for log management, used here for extended retention and historical time-series analysis. From an observability point of view, it is important to monitor the logs of Confluent Cloud Kafka resources in real time, so that we can react quickly to a situation, change an outcome, or prevent a poor result.

The S3 sink connector is a plugin offered by Confluent that lets us integrate Apache Kafka® with other apps and data systems. In our case, it exports the Confluent audit and event logs to an external system, an S3 bucket, for later use. By default, Confluent Cloud keeps these logs in its internal, independent audit log cluster for only 7 days.

Problem statement

Confluent Cloud stores the audit logs and event logs of customer Kafka clusters in its internal, independent audit log cluster. For security reasons, there is no direct integration between Confluent Cloud and Logz.io that captures these audit logs. However, Confluent Cloud exposes API endpoints from which customers can consume the audit logs of their own account for security audits and compliance management.

Confluent Cloud provides a managed S3 sink connector, but, as mentioned above, it cannot be deployed against the internal, independent audit log cluster for security reasons.

To work around this, we implement a self-managed S3 sink connector and use it to consume the Confluent Cloud event logs and audit logs.

This article focuses on how to export Confluent Cloud (SaaS) audit logs and event logs to Logz.io.

High Level Overview

This article covers the following points:

1. Prerequisites

2. Self-managed S3 sink connector configuration and implementation

3. Consuming audit and event logs from the Confluent Cloud internal audit log cluster into an AWS S3 bucket

4. Exporting audit logs from the Amazon S3 bucket to external logging systems like Logz.io

5. Summary

6. References

Prerequisites

1.1 Confluent Cloud account and audit log cluster details.

1.2 Logz.io account with a token to import logs.

1.3 Linux VM instance with Confluent Platform and Java 1.8.0 installed, and with access to the Internet.

Use the commands below to install Confluent Platform and Java 1.8.0.

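# The repository path (4.0 here) selects the Confluent Platform release; adjust it to the release you want. Package names can differ between releases.
wget -qO - https://packages.confluent.io/deb/4.0/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/4.0 stable main"
sudo apt-get update && sudo apt-get install -y confluent-platform
sudo apt-get install -y openjdk-8-jdk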

1.4 AWS access: an IAM user, an IAM user policy, and an S3 bucket. The Confluent Cloud internal audit log cluster and the S3 bucket must be in the same region.

Attach the following IAM policy to the IAM user that the connector uses.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListAllMyBuckets"
      ],
      "Resource": "arn:aws:s3:::*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": "arn:aws:s3:::logexporter-ccloud"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts",
        "s3:ListBucketMultipartUploads"
      ],
      "Resource": "arn:aws:s3:::logexporter-ccloud/*"
    }
  ]
}
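
One way to attach this policy is as an inline policy through the AWS CLI. The following is a minimal sketch, not a required procedure: the function name attach_connector_policy, the policy name logexporter-ccloud-s3-sink, and the user and file names in the usage note are assumptions made for illustration. It also assumes the AWS CLI is installed and configured with permission to manage IAM.

```shell
# Hypothetical helper: attach the JSON policy above to an IAM user as an
# inline policy. The policy name below is an illustrative choice.
attach_connector_policy() {
  iam_user="$1"      # IAM user whose access keys the connector will use
  policy_file="$2"   # local file that contains the JSON policy shown above
  aws iam put-user-policy \
    --user-name "$iam_user" \
    --policy-name logexporter-ccloud-s3-sink \
    --policy-document "file://$policy_file"
}
```

For example, with the policy saved as policy.json and a hypothetical IAM user named confluent-s3-sink, the call would be: attach_connector_policy confluent-s3-sink policy.json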

Self-managed S3 sink connector configuration

There are two modes for running Kafka Connect workers: standalone mode and distributed mode. Standalone mode is used for environments that typically run a single agent.

2.1 Standalone mode configuration

In our case we use standalone mode.

On the Confluent VM, open the connect-standalone.properties file under /etc/kafka/, add the configuration below, and save it.

vim /etc/kafka/connect-standalone.properties

bootstrap.servers=<audit-log-cluster-endpoint>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<AUDITLOG_CLUSTER_API_KEY>" password="<AUDITLOG_CLUSTER_SECRET_KEY>";
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java,/etc/kafka-connect-s3
consumer.bootstrap.servers=<audit-log-cluster-endpoint>
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<AUDITLOG_CLUSTER_API_KEY>" password="<AUDITLOG_CLUSTER_SECRET_KEY>";

In the configuration above, bootstrap.servers is the endpoint of the Confluent Cloud independent audit log cluster, and the API key and secret used in connect-standalone.properties must belong to that same audit log cluster.
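
Because the worker cannot connect while the endpoint or the API key and secret are still template values, a quick check before starting it can save a debugging round. The sketch below is a hypothetical helper, not part of Confluent Platform; it assumes that placeholders keep the <NAME> form used in this article.

```shell
# Hypothetical helper: fail if a properties file still contains an
# unreplaced <PLACEHOLDER> such as <audit-log-cluster-endpoint>.
check_placeholders() {
  file="$1"
  # grep -n prints each matching line with its line number and
  # exits with status 0 only when at least one match was found.
  if grep -nE '<[A-Za-z_-]+>' "$file"; then
    echo "unreplaced placeholders found in $file" >&2
    return 1
  fi
  echo "ok: $file"
}
```

A typical call would be check_placeholders /etc/kafka/connect-standalone.properties, which lists any offending lines and returns a non-zero status if placeholders remain.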

Consume audit and event logs from Confluent Cloud internal audit log cluster to AWS S3 bucket

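Next, configure the S3 sink connector under /etc/kafka-connect-s3/. The connector also needs AWS credentials for the IAM user from the prerequisites; by default it looks them up through the AWS default credentials provider chain (for example, the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables or the ~/.aws/credentials file).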

vim /etc/kafka-connect-s3/s3-sink-connector.properties

name=auditlog-connector-s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=confluent-audit-log-events
s3.region=us-west-2
s3.bucket.name=logexporter-ccloud
s3.part.size=5242880
flush.size=3
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE

Save all configuration files; the connector is started with the connect-standalone command.

To get more detail on errors while the connector runs, enable debug logging for the S3 sink connector by changing the log4j.rootLogger value from INFO to DEBUG in the /etc/kafka/connect-log4j.properties file.

vim /etc/kafka/connect-log4j.properties

log4j.rootLogger=DEBUG, stdout

Then run the command below to start the connector in standalone mode.

connect-standalone -daemon /etc/kafka/connect-standalone.properties /etc/kafka-connect-s3/s3-sink-connector.properties

Once the command runs without errors, the logs start to appear in our logexporter-ccloud S3 bucket.
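
To know what to look for in the bucket, it helps to work out the object names in advance. The sketch below prints the keys we would expect for the first few objects. It assumes the connector defaults that the configuration above does not override: a topics.dir of topics and start offsets zero-padded to 10 digits. The helper name expected_s3_key is hypothetical.

```shell
# Hypothetical helper: print the object key expected for a given topic,
# partition, and starting offset, assuming the default topics.dir (topics),
# the DefaultPartitioner layout, and 10-digit zero-padded offsets.
expected_s3_key() {
  topic="$1"; partition="$2"; start_offset="$3"
  printf 'topics/%s/partition=%d/%s+%d+%010d.json\n' \
    "$topic" "$partition" "$topic" "$partition" "$start_offset"
}

# With flush.size=3, a new object starts every three records,
# so partition 0 should produce objects starting at offsets 0, 3, 6, ...
for offset in 0 3 6; do
  expected_s3_key confluent-audit-log-events 0 "$offset"
done
```

The printed keys can be compared with the output of aws s3 ls s3://logexporter-ccloud/topics/confluent-audit-log-events/ --recursive once the connector has flushed its first records.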

Exporting audit logs from Amazon S3 bucket to external logging systems like Logz.io

The third step is to ship the logs to Logz.io. First, log in to the Logz.io account to which you want to export these logs.

On the Kibana dashboard, select the Logs icon and then the MANAGE DATA section.

Under MANAGE DATA, click Send your logs and select Amazon S3 bucket as the log source.

Follow the steps below to set up log shipping from S3 to Logz.io.

1. Add a new S3 bucket using the dedicated Logz.io configuration wizard: click + Add a bucket.

2. Provide the S3 bucket name.

3. Select the hosting region from the dropdown list.

4. Select your preferred method of authentication, either an IAM role or access keys. The configuration wizard will open.

S3 bucket name: logexporter-ccloud
Region: <bucket region>
AWS access key: <access key>
AWS secret key: <secret key>

Save your information.

5. Give your logs some time to get from your system to Logz.io, and then open Kibana.

Custom service for the S3 sink connector

To keep the connector running after failures and reboots, we wrap it in a systemd service.

vim /etc/systemd/system/s3-auditlog-sink.service

[Unit]
Description=s3-sink-connector for exporting confluent cloud auditlog from internal audit log cluster to logz.io service
After=network.target
StartLimitIntervalSec=0

[Service]
Type=simple
Restart=always
RestartSec=1
User=centos
# Run in the foreground (no -daemon) so that systemd can supervise the process with Type=simple.
# Absolute path; adjust it if connect-standalone is installed somewhere other than /usr/bin.
ExecStart=/usr/bin/connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka-connect-s3/s3-sink-connector.properties

[Install]
WantedBy=multi-user.target

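Save the service configuration, reload systemd so that it picks up the new unit file, and then start and enable the service.

systemctl daemon-reload
systemctl start s3-auditlog-sink.service
systemctl enable s3-auditlog-sink.service
systemctl status s3-auditlog-sink.service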
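
systemctl status only tells us that the worker process is alive. The state of the connector and its task can be read from the Kafka Connect REST interface, which a standalone worker serves on port 8083 unless configured otherwise. The sketch below is a hypothetical helper that reads the status JSON from standard input; it assumes the compact JSON that the REST interface returns and treats anything other than a RUNNING state with no FAILED state as unhealthy.

```shell
# Hypothetical helper: read a connector status document from stdin and
# succeed only if it reports a RUNNING state and no FAILED state.
status_is_healthy() {
  status_json=$(cat)
  case "$status_json" in
    *'"state":"FAILED"'*)  return 1 ;;  # connector or a task has failed
    *'"state":"RUNNING"'*) return 0 ;;  # running, and nothing failed
    *)                     return 1 ;;  # paused, unassigned, or no status
  esac
}
```

Against a running worker, this could be used as: curl -s http://localhost:8083/connectors/auditlog-connector-s3-sink/status | status_is_healthy && echo healthy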

Summary

In this article, we consumed the Confluent audit logs from the Confluent Cloud internal, independent audit log cluster, exported them through an S3 bucket to Logz.io, and made them available for visualization and observability.

References

https://docs.confluent.io/cloud/current/connectors/cc-s3-sink.html#cc-s3-connect-sink

https://docs.logz.io/shipping/log-sources/s3-bucket.html
