[TUTO] Créer un datapipeline en 20 minutes à l’aide de Kafka Connect

Jocelyn Dréan
iAdvize Engineering
14 min readDec 9, 2016
Data pipeline rocks

Objectifs :

L’objectif de cet article est de présenter pas à pas un exemple de création de datapipeline en moins de 20 minutes grâce à la puissance de Kafka Connect. On pourra ainsi découvrir Amazon EMR, Kafka, Kafka Connect, S3, Avro, Parquet, Hive, Presto.

Dans un souci de simplicité et de rapidité, ce tutoriel se base sur des services d’AWS (EMR + S3). Si vous ne souhaitez pas utiliser AWS, vous pouvez remplacer S3 par un clone et remplacer EMR par Hive + Presto en local.

Use-Case :

On part sur un use-case simple: on récolte des données visiteurs (click, device, user_agent, etc…) avec une très forte volumétrie (plusieurs millions d’évènement par jour). On va écrire ces évènements visiteurs dans un topic Kafka. Notre souhait est de pouvoir analyser ces données visiteurs en temps réel, mais également de manière asynchrone. Pour cela, nous allons les archiver sur Amazon S3. Ensuite grâce à Hive + Presto, on va être capable de requêter nos données facilement en SQL.

Schéma de notre futur Pipeline

Mots clés :

Amazon EMR, Kafka, Kafka Connect, Streamx, Schema Registry, Apache Avro, Apache Parquet, Amazon S3, Hive, Presto, Hadoop, Streaming

Avant de commencer/Pré-requis :

Avant de commencer, il est nécessaire d’avoir un compte AWS et un serveur Kafka setup (Pour setup un serveur Kafka, vous pouvez trouver facilement de nombreux tutoriel sur internet. Dans ce cas, je vous conseille d’attendre de finir l’étape 3 de ce présent tuto et ainsi setup Kafka directement sur le cluster EMR via quick start Kafka).

1. Setup awscli

Si vous n’avez pas setup le client pour AWS (aws-cli), vous pouvez l’installer facilement comme ceci:

$ sudo apt-get update
$ sudo apt-get -y install \
python-pip \
python-virtualenv
$ virtualenv amazon
$ source amazon/bin/activate
$ pip install awscli

Ensuite, vous pouvez entrer vos identifiants AWS (votre access_key et votre secret_access_key)

$ read AWS_ACCESS_KEY_ID
$ read AWS_SECRET_ACCESS_KEY
$ export AWS_ACCESS_KEY_ID
$ export AWS_SECRET_ACCESS_KEY

2. Lancer un cluster Amazon EMR

Amazon EMR, qu’est-ce c’est ?

Amazon EMR fournit une infrastructure Hadoop qui permet de traiter de manière simple, rapide et rentable de grandes quantités de données sur des instances Amazon EC2 dynamiquement évolutives. Vous pouvez également exécuter d’autres frameworks distribués courants tels qu’Apache Spark, HBase, Presto et Flink dans Amazon EMR et interagir avec le contenu d’autres magasins de données AWS comme Amazon S3 et Amazon DynamoDB. (Source: Amazon AWS website — https://aws.amazon.com/fr/emr/)

En gros, ca permet d’avoir un cluster Hadoop as a service, disponible en quelques clics. Si vous n’avez jamais lancé de cluster EMR, je vous conseille d’utiliser la console AWS. Penser à utiliser les “options avancées” pour sélectionner les types d’instance que vous souhaitez. Personnellement, j’utilise en général 3 instances m3.xlarge. Dans la section “Software Configuration”, sélectionnez au moins : Hadoop + Hive + Presto, puisque nous les utiliserons plus tard.

Lorsque vous utilisez l’assistant EMR, il créera les rôles et les groupes de sécurité nécessaires par défaut pour que vos nœuds de cluster communiquent entre eux.

Vous pouvez sinon lancer un cluster EMR directement via le client aws-cli. Avant d’exécuter la commande ci-dessous, générez une key “emr_key”. Téléchargez et stockez le fichier emr_key.pem dans le dossier ~/.ssh/. Ce fichier est votre clé privée SSH qui vous permettra d’accéder à votre cluster EMR via SSH.

$ aws emr create-cluster \
--applications \
Name=Hadoop \
Name=Hive \
Name=Presto \
--ec2-attributes '{
"KeyName": "emr_key",
"InstanceProfile": "EMR_EC2_DefaultRole",
"AvailabilityZone": "eu-west-1a",
"EmrManagedSlaveSecurityGroup": "sg-*******",
"EmrManagedMasterSecurityGroup": "sg-******"
}' \
--service-role EMR_DefaultRole \
--release-label emr-5.2.0 \
--log-uri 's3n://aws-logs-591231097547e1897133-eu-west-1/elasticmapreduce/' \
--name 'Tuto Pipeline Kafka Connect' \
--instance-groups '[
{
"InstanceCount":1,
"InstanceGroupType":"MASTER",
"InstanceType":"m3.xlarge",
"Name":"Master - 1"
},
{
"InstanceCount":2,
"InstanceGroupType":"CORE",
"InstanceType":"m3.xlarge",
"Name":"Core - 2"
}
]' \
--region eu-west-1

Une fois le cluster lancé et prêt, n’oubliez pas d’ajouter votre adresse IP, sur le port 22, au groupe de sécurité ElasticMapReduce-master pour vous permettre de vous connecter en SSH sur le master node.

Le cluster peut prendre jusqu’à 20 minutes pour être UP. Ca vous laisse le temps d’aller prendre un café avant d’entamer réellement ce tuto (qui là vous prendra seulement 20 minutes, promis !)

Pour vous connecter au cluster EMR en ssh:

$ ssh -i emr_key.pem hadoop@your-host-aws.eu-central-1.compute.amazonaws.com

Vous devriez obtenir quelque chose qui ressemble à ceci, si tout s’est bien passé :

__|  __|_  )
_| ( / Amazon Linux AMI
___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2015.09-release-notes/
29 package(s) needed for security, out of 44 available
Run "sudo yum update" to apply all updates.

EEEEEEEEEEEEEEEEEEEE MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M M::::::::M R:::::RRRRRR:::::R
E::::E EEEEE M:::::::::M M:::::::::M RR::::R R::::R
E::::E M::::::M:::M M:::M::::::M R:::R R::::R
E:::::EEEEEEEEEE M:::::M M:::M M:::M M:::::M R:::RRRRRR:::::R
E::::::::::::::E M:::::M M:::M:::M M:::::M R:::::::::::RR
E:::::EEEEEEEEEE M:::::M M:::::M M:::::M R:::RRRRRR::::R
E::::E M:::::M M:::M M:::::M R:::R R::::R
E::::E EEEEE M:::::M MMM M:::::M R:::R R::::R
EE:::::EEEEEEEE::::E M:::::M M:::::M R:::R R::::R
E::::::::::::::::::E M:::::M M:::::M RR::::R R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM MMMMMMM RRRRRRR RRRRRR

[hadoop@ip-10-91-26-11 ~]$

3. Télécharger les sources de Kafka Connect depuis la plateforme Confluent

The free, open-source streaming platform based on Apache Kafka. Confluent Platform is the best way to get started with real-time data streams.

Vous pouvez télécharger les sources de la plateforme Confluent depuis l’url http://confluent.io/downloads/ ou vous pouvez directement récupérer le fichier .tar.gz

[hadoop@ip-10-91-26-11 ~]$ curl -sSLO http://packages.confluent.io/archive/3.0/confluent-3.0.1-2.11.tar.gz
[hadoop@ip-10-91-26-11 ~]$ tar xzf confluent-3.0.1-2.11.tar.gz
[hadoop@ip-10-91-26-11 ~]$ ls ./confluent-3.0.1
README.archive bin etc logs share src

La plateforme Confluent contient entre autre les source des projets: Kafka, Kafka Connect, Kafka Stream et Schema Registry.

4. Lancer le Schema Registry

Un Schema Registry, qu’est-ce que c’est ?

Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving Avro schemas. It stores a versioned history of all schemas, provides multiple compatibility settings and allows evolution of schemas according to the configured compatibility setting. It provides serializers that plug into Kafka clients that handle schema storage and retrieval for Kafka messages that are sent in the Avro format. (Source: Confluent — http://docs.confluent.io/1.0/schema-registry/docs/intro.html)

Le Schema Registry va donc vous permettre de stocker les schemas Avro de vos futures données. Et ça c’est cool ! (Si, si !!)

Du coup, qu’est-ce qu’Avro ?

Apache Avro est un système de sérialisation de données indépendant de la langue. Avro est très utile, car il traite avec des formats de données qui peuvent être traitées par de multiples langues. Avro est un outil privilégié pour la sérialisation des données dans Hadoop. Avro dispose d’un système basé sur un schéma. Un schéma indépendant de la langue est associée à ses lecture et d’écriture. Avro sérialise les données qui présente un schéma intégré. Avro sérialise les données dans un format binaire compact, qui peut être désérialisé par aucune application. (Source: http://www.w3ii.com/fr/avro/avro_overview.html)

Si vous ne connaissais pas bien Avro, je vous invite à lire cet article : http://www.w3ii.com/fr/avro/avro_overview.html

Le Schema Registry se base sur un serveur Kafka (pour aller stocker ses données et donc vos schémas). Avant de lancer le Schema Registry, il est nécessaire d’aller modifier un fichier de configuration “confluent-3.0.1/etc/schema-registry/schema-registry.properties

[hadoop@ip-10-91-26-11 ~]$ vi confluent-3.0.1/etc/schema-registry/schema-registry.properties

Indiquer le lien vers votre serveur Kafka en modifiant la propriété kafkastore.connection.url
Exemple:

listeners=http://0.0.0.0:8081
kafkastore.connection.url=10.91.26.11:2181
kafkastore.topic=_schemas
debug=false

Ensuite, on peut lancer le schema registry:

[hadoop@ip-10-91-26-11 ~]$ cd ./confluent-3.0.1
[hadoop@ip-10-91-26-11 confluent-3.0.1]$ bin/schema-registry-start etc/schema-registry/schema-registry.properties

5. Création et ajout du schema AVRO

Nous allons maintenant créer un schéma qui correspond à nos données. Pour ce tuto, nous allons générer des fausses données qui correspondent à des informations visiteurs : visiteur id, page type, timestamp, user agent, device, etc…

Schéma Avro:

{
"namespace": "io.confluent.connect.avro",
"type": "record",
"name": "test_event_record_avro",
"fields": [
{ "name": "visitorId", "type": "int"},
{ "name": "pageViewId", "type": "int"},
{ "name": "timestamp", "type": "int"},
{ "name": "referer", "type": "string"},
{ "name": "pageType", "type": "string"},
{ "name": "userAgent", "type": "string"},
{ "name": "device", "type": "int"},
{ "name": "locale", "type": "string"},
{ "name": "eventType", "type":"string","default": "unknown"}
]
}

On va enregistrer ce schema Avro sur le schema registry que nous avons lancé précédemment (Retrouvez plus d’information dans la doc de confluent.io).

curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"namespace\": \"io.confluent.connect.avro\",\"type\": \"record\",\"name\": \"test_event_record_avro\",\"fields\": [{ \"name\": \"visitorId\", \"type\": \"int\"}, { \"name\": \"pageViewId\", \"type\": \"int\"}, { \"name\": \"timestamp\", \"type\": \"int\"}, { \"name\": \"referer\", \"type\": \"string\"}, { \"name\": \"pageType\", \"type\": \"string\"}, { \"name\": \"userAgent\", \"type\": \"string\"}, { \"name\": \"device\", \"type\": \"int\"}, { \"name\": \"locale\", \"type\": \"string\"}, { \"name\": \"eventType\",\"type\": \"string\", \"default\": \"unknown\" }]}"}' http://schema.example.com:8081/subjects/test_event_record_avro-value/versions

Si tout à bien fonctionné, vous devriez être capable de le vérifier via une requête GET à http://schema.example.com:8081/subjects/test_event_record_avro-value/versions

Par convention, nous avons choisi d’appeler notre subject “test_event_record_avro-value”. En effet:

The subjects resource provides a list of all registered subjects in your schema registry. A subject refers to the name under which the schema is registered. If you are using the schema registry for Kafka, then a subject refers to either a “<topic>-key” or “<topic>-value” depending on whether you are registering the key schema for that topic or the value schema. (Source: http://docs.confluent.io/3.0.0/schema-registry/docs/api.html)

6. Ecrire nos données de test dans Kafka

Nous allons générer des données. Pour cela, j’ai créé un petit script Python (développé très rapidement!), disponible sur Github ici :

[hadoop@ip-10-91-26-11 ~]$ sudo yum install git
[hadoop@ip-10-91-26-11 ~]$ git clone https://github.com/jocelyndrean/generate_avro_message_to_kafka.git

Modifier le fichier python generate_avro_message_to_kafka/generator.py pour modifier la configuration :

SCHEMA_REGISTRY_URL = 'http://schema.registry.com:8081'
TOPIC = 'test_event_record_avro'
KAFKA_SERVER_URL = 'kafka.url.com:9092'

Lancer le script:

[hadoop@ip-10-91-26-11 ~]$ python generator.py

(Si vous vous sentez d’humeur à optimiser le script, surtout n’hésitez pas à faire une PR)

7. Configurer Kafka Connect

Kafka Connect, qu’est-ce que c’est ?

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other data systems. It makes it simple to quickly define connectors that move large data sets into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency (Source: http://docs.confluent.io/2.0.0/connect/intro.html)

En gros, Kafka Connect vous permet de transporter facilement vos données depuis ou vers Kafka. Dans notre use-case, on a des données au format Avro dans un topic Kafka, et on aimerait bien pouvoir les archiver sur Amazon S3 et pouvoir ensuite requêter facilement ces données via Hive et/ou Presto. Kafka Connect vient avec une liste de connecteurs (Liste non exhaustif de connecteurs: https://www.confluent.io/product/connectors/). Si vous ne trouvez pas un connecteur qui correspond à votre besoin dans cette liste, vous pouvez développer facilement le votre (Docs dispo sur le site de confluent). Nous allons utiliser le connecteur streamx, développé par qubole.

Commençons par installer Maven:

[hadoop@ip-10-91-26-11 ~]$ curl -sSLO  http://www-eu.apache.org/dist/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz
[hadoop@ip-10-91-26-11 ~]$ tar xzvf apache-maven-3.3.9-bin.tar.gz
[hadoop@ip-10-91-26-11 ~]$ export PATH=/home/hadoop/apache-maven-3.3.9/bin:$PATH

Téléchargeons streamx et on va build le projet avec Maven:

[hadoop@ip-10-91-26-11 ~]$ git clone https://github.com/qubole/streamx.git
[hadoop@ip-10-91-26-11 ~]$ cd streamx
[hadoop@ip-10-91-26-11 streamx]$ mvn -DskipTests package

Quand le build sera terminé, les packages StreamX nécessitent les jars dans le dossier target/streamx-0.1.0-SNAPSHOT-development/share/java/streamx/* . Il faut donc ajouter ce dossier au classpath.

[hadoop@ip-10-91-26-11 streamx]$ export CLASSPATH=$CLASSPATH:`pwd`/target/streamx-0.1.0-SNAPSHOT-development/share/java/streamx/*

Nous allons maintenant configurer streamx. La première étape est de modifier le fichier /home/hadoop/streamx/config/hadoop-conf/hdfs-site.xml en y ajoutant vos access_key et secret_key AWS.

Ensuite, la deuxième étape est de modifier le fichier /home/hadoop/streamx/config/quickstart-s3.properties. Voici un exemple de configuration (penser à modifier le lien vers votre bucket S3):

name=s3-sink
connector.class=com.qubole.streamx.s3.S3SinkConnector
tasks.max=3
topics=test_event_record_avro
flush.size=10000
s3.url=s3://my-bucket-s3/demo/
hadoop.conf.dir=/home/hadoop/streamx/config/hadoop-conf
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.HourlyPartitioner
rotate.interval.ms=60000
partition.duration.ms=3600000
path.format:'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/
locale=en
timezone=GMT
hive.metastore.uris=thrift://localhost:9083
hive.integration=true
schema.compatibility=BACKWARD

Le connecteur S3 est un fork du connecteur HDFS développé par confluent.io. On peut donc retrouver plus d’information sur les configurations sur leur doc. Leur documentation est vraiment bien faite, alors je vous invite à la parcourir pour mieux comprendre la configuration que je vous ai donné en exemple: http://docs.confluent.io/2.0.0/connect/connect-hdfs/docs/index.html#configuration

Notre configuration va créer une nouvelle partition toutes les 3600000 ms (1 heure). Les dossiers ressembleront à : “s3://my-bucket-s3/demo/topics/clickstream/year=2016/month=09/day=21/hour=22/

Ensuite, nous allons modifier le fichier suivant “/home/hadoop/confluent-3.0.1/etc/schema-registry/connect-avro-standalone.properties”, comme ceci:

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=your-kafka-brocker-ip:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://your.schema.registry.com:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://your.schema.registry.com:8081

Remplacer la bonne IP de votre brocker Kafka et ensuite indiquer bien l’url vers votre schema registry pour les propriétés:
key.converter.schema.registry.url
value.converter.schema.registry.url

8. Pourquoi utiliser Apache Parquet ?

Qu’est-ce qu’Apache Parquet ?

Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language. (Source: https://parquet.apache.org/)

On pourrait décider de stocker nos évènements directement au format Avro (Avro en entrée => Avro en sortie). Il y a en fait trois raisons de stocker au format Parquet plutôt Avro dans notre use-case. Déjà, ça nous permet de découvrir ensemble le format Parquet. Ensuite Presto, que l’on découvrira ensuite, ne supporte pas encore Avro (dans la version disponible avec EMR). Enfin, Parquet permet des performances supérieurs en Map-Redure qu’Avro. Je vous invite à lire cet excellent benchmark sur Avro VS Parquet VS ORC: https://www.svds.com/how-to-choose-a-data-format/

Apache Parquet nécessite un schéma pour encoder les données, mais ca tombe bien, on a déjà fait ça avec Avro précédemment. Du coup, Kafka Connect va être capable d’aller écrire les données en Parquet tout seul comme un grand. Job done.

9. Lancer Kafka Connect

Pour lancer Kafka Connect, il suffit ensuite d’exécuter cette ligne:

[hadoop@ip-10-91-26-11 ~]$ cd confluent-3.0.1
[hadoop@ip-10-91-26-11 confluent-3.0.1]$ ./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties /home/hadoop/streamx/config/quickstart-s3.properties

Kafka Connect va alors commencer à consommer les messages de votre topic Kafka et les écrire au format parquet sur S3 ! \o/

Fichiers Parquet qui contiennent nos données

10. Explorer nos données via Hive

Pendant que Kafka Connect s’occupe de consommer en streaming vos données pour les archiver sur S3. Nous allons tenter de requêter nos données via Hive.

Qu’est-ce que Hive ?

The Apache Hive ™ data warehouse software facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. Structure can be projected onto data already in storage. A command line tool and JDBC driver are provided to connect users to Hive. (Source: https://hive.apache.org/)

Dans le fichier de configuration, nous avions activé l’option hive.integration. Kafka Connect va donc utiliser notre schema Avro pour aller créer une table dans Hive et lui indiquer que les données se trouvent sur S3.

[hadoop@ip-10-91-26-11 ~]$ hive
Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: false
hive>
hive>show tables;
OK
test_event_record_avro
Time taken: 1.547 seconds, Fetched: 1 row(s)

On retrouve bien notre table test_event_record_avro qui correspond à notre topic. Tentons de compter le nombre de ligne que nous avons.

hive> select count(*) from test_event_record_avro;
OK
2300002
Time taken: 46.064 seconds, Fetched: 1 row(s)

Si on relance plusieurs fois cette requête, on constate bien que grâce à Kafka Connect, les insertions de nos données se déroulent en streaming. Par contre, les performances au niveau requête ne sont pas très bonnes avec Hive. On va tenter d’améliorer ça en utilisant Presto.

11. Explorer nos données via Presto

Qu’est-ce que Presto ?

Presto is an open source distributed SQL query engine for running interactive analytic queries against data sources of all sizes ranging from gigabytes to petabytes. Presto was designed and written from the ground up for interactive analytics and approaches the speed of commercial data warehouses while scaling to the size of organizations like Facebook (Source: https://prestodb.io/)

Quittez Hive et lancer Presto

$ presto-cli --catalog hive --schema default
presto:default>
presto:default> show tables;
Table
------------------------
test_event_record_avro
(1 row)
presto:default> select count(*) from test_event_record_avro;
9700002
(1 row)
Query 20161208_164218_00003_9md97, FINISHED, 2 nodes
Splits: 100 total, 100 done (100.00%)
0:14 [9.7M rows, 274MB] [710K rows/s, 20.1MB/s]

On constate que j’ai déjà pu absorber 9 700 002 events. Le format Parquet (columnar storage) permet également de ne parcourir que les colonnes dont nous avons besoin. Dans la requête précédente, nous parcourons uniquement 9,7Mo sur 274.
Tentons maintenant d’analyser un peu nos données avec Presto:

presto:default> select eventType, count(*) as nb_events from test_event_record_avro group by eventType;      eventType      | nb_events
---------------------+-----------
checkCart | 2678598
removeProductToCart | 2679954
clic | 2680305
search | 2679920
addProductToCart | 2681225
(5 rows)
Query 20161208_164418_00005_9md97, FINISHED, 2 nodes
Splits: 139 total, 139 done (100.00%)
0:13 [13.4M rows, 379MB] [1.05M rows/s, 29.8MB/s]
presto:default> select locale, count(*) as nb_events from test_event_record_avro where device=1 group by locale;
locale | nb_events
--------+-----------
el_VN | 2614
zh_GW | 2583
pt_ZM | 2532
ru_DM | 2595
es_UY | 2614
zh_DJ | 2582
....
...
Query 20161208_164708_00007_9md97, FINISHED, 2 nodes
Splits: 144 total, 144 done (100.00%)
0:43 [13.6M rows, 385MB] [316K rows/s, 8.95MB/s]
presto:default> select userAgent, count(*) as nb_events from test_event_record_avro where eventType = 'addProductToCart' group by userAgent order by nb_events desc limit 3;userAgent | nb_events
------------------------------------------------------+-----------
Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.2... | 1685
Mozilla/5.0 (compatible; MSIE 8.0; Windows 95 ... | 1677
Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.1... | 1677
Query 20161208_165502_00014_9md97, FINISHED, 2 nodes
Splits: 160 total, 160 done (100.00%)
0:19 [14M rows, 397MB] [728K rows/s, 20.6MB/s]

Bon… 14Millions de lignes soit 400 Mo de données, je vous l’accorde on n’est pas encore sur du Big Data. Si vous souhaitez tester avec un plus gros data-set, n’hésitez pas (Vous pouvez trouver une liste de dataset libre ici : https://github.com/caesar0301/awesome-public-datasets). Ou alors faîtes tourner mon script Python longtemps… très longtemps…

Si vous souhaitez requêter vos données depuis votre poste en local, via un script Python par exemple, vous devez ouvrir un tunnel SSH avec votre cluster EMR:

$ ssh -o ServerAliveInterval=10 -i  emr_key.pem -N -L 10000:localhost:10000 hadoop@ec2-your-master-node.eu-central-1.compute.amazonaws.com

Presto propose un connecteur JDBC et on peut requêter nos données très facilement depuis Tableau Software, Python, Java, PHP, etc..

12. Conclusion et aller plus loin

Kafka Connect rocks !

En conclusion, nous avons réussi à créer un data pipeline très simplement. Nous n’avons pas écrit de ligne de code. Dans notre use-case, nous n’avons effectué que de la configuration. Pourtant, notre data pipeline est parfaitement scalable et très performant. Et nous sommes capable d’analyser un large dataset via Presto.

Kafka Connect est donc un outil très puissant pour transporter facilement de la données depuis ou vers Kafka, et tout ça en streaming.

On peut imaginer d’autres use-case:
- Archivage de logs d’application sur HDFS/S3 (ie: Kafka->HDFS)
- Analyse de logs d’une base de données (ie: MySQL -> Kafka)
- Insertion d’évènements dans Elastic Search (ie: Kafka -> ES)
- …

--

--

Jocelyn Dréan
iAdvize Engineering

Data Geek - Dude, it's easy, all you need is Data... Yep, I'm also working at iAdvize as Data Engineer, check my LinkedIn profil if you can...