การใช้งาน Apache Atlas ก่อนจะเบื้องต้น

Pongthep Vijite
DAMAGeek
Published in
9 min readMar 24, 2020

หมายเหตุ ผู้อ่านสามารถดู table of contents ของ Data Engineering from Noob to Newbie ได้ที่ http://bit.ly/2P7isEw

apache atlas เป็น 1 project ของโครงการ apache ที่อยู่ในหมวดหมู่ของการทำงานเกี่ยวกับ data แต่อาจจะไม่มีชื่อเสียงโด่งดังมากนักถ้าเทียบกับ apache hadoop หรือ apache spark โดย apache atlas หรือ atlas ถูกใช้สำหรับทำ data governance ภายในองค์กร ช่วยในการจัดการ data ของ data (meta data) โดยมันสามารถบอกได้ว่า data นี้เป็นของใคร ถูกประมวลผลโดยใคร และจัดเก็บที่ไหน เรียกได้ว่าเป็นเสมือนศูนย์กลางของความรู้เกี่ยวกับข้อมูลที่วิ่งอยู่ในองค์กร

ขั้นตอนการติดตั้ง

ก่อนทำการติดตั้ง atlas จะมี software พื้นฐานที่ควรจะติดตั้งภายในเครื่องดังนี้

  • java8
  • maven
  • python2

อีกทั้งจะต้องมีพื้นที่ว่างขั้นตํ่า 20GB เมื่อเรามีครบตามข้างต้นแล้วต่อไปก็ทำการ download atlas จากเว็บกันเลย (https://atlas.apache.org/#/Downloads) โดย version ล่าสุด ณ ตอนที่เขียนบทความนี้คือ 2.0.0 หลังจาก download เรียบร้อยแล้วก็ทำการ extract file

tar xvfz apache-atlas-2.0.0-sources.tar.gz
cd apache-atlas-sources-2.0.0/
export MAVEN_OPTS="-Xms2g -Xmx2g"

จากนั้นเราต้องทำการแก้ไข file pom.xml ใน project โดยทำการเปลี่ยนข้อมูลในส่วนของ maven repo จาก http://repo1.maven.org/maven2 เป็น https://repo1.maven.org/maven2 (อ้างอิงจาก https://stackoverflow.com/a/59764763/1037159) หลังจากนั้นทำการ build project

mvn clean -DskipTests install

หลังจาก build project สมบูรณ์แล้วจากนั้น เราก็จะทำการ pack project โดยตัว atlas พื้นฐานแล้วจะใช้ apache hbase และ apache solr เป็นตัวจัดเก็บข้อมูลต่างๆ เพื่อความง่ายต่อการใช้งานเบื้องต้น เราจะทำการ pack hbase และ solr เข้ากับตัว atlas แล้วทำงานในรูปแบบ standalone คือ set up ทั้ง 3 applications ให้ทำงานอยู่บนเครื่องเดียวกัน

mvn clean -DskipTests package -Pdist,embedded-hbase-solr

เมื่อทำการ pack เสร็จเรียบร้อยแล้ว เราจะสามารถหาตัว application ได้ใน distro/target/apache-atlas-2.0.0-server แต่ก่อนที่เราจะทำการ run apache atlas กันเราจำเป็นต้อง setup environment variables บ้างตัวเสียก่อนดังนี้

export MANAGE_LOCAL_HBASE=true
export MANAGE_LOCAL_SOLR=true
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

ขั้นตอนต่อไปทำการแก้ไข file atlas-application.properties ใน apache-atlas-2.0.0-server/conf โดย uncomment บรรทัดของ atlas.DeleteHandlerV1.impl และเปลี่ยนค่าจาก org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1 เป็น org.apache.atlas.repository.store.graph.v1.HardDeleteHandlerV1 ดังนี้

ทำการ set hard delete

โดย default แล้วการ delete object ต่างๆใน atlas จะเป็นแบบ soft delete หมายความว่าถึงแม้ว่าเราจะสั่งลบแล้วแต่จำนวนตัวเลขของ entity ใน type นั้นก็ยังเท่าเดิมและหากในหน้า gui เรา check ในช่อง `Show historical entites` ก็ยังสามารถเห็น object นั้นได้อยู่ดังรูปด้านล่างนี้ ซึ่งส่วนตัวแล้วคิดว่าสิ่งที่เราต้องการ delete ก็อยากให้มันหายไปจากระบบเพิ่มความ clean มากกว่าดังนั้นจึงได้ set การลบเป็น hard delete

ตัวอย่างการ set soft delete

จากนั้นทำการ run atlas โดย file script สำหรับ run จะอยู่ใน apache-atlas-2.0.0-server/bin ให้ทำการ run file ที่ชื่อ atlas_start.py

./atlas_start.py

โดยตัว script จะทำการ start hbase, solr และ atlas ตามลำดับ โดย deafult atlas web server จะทำงานที่ port 21000 ซึ่งเราจำเป็นต้องทำการ login จึงจะสามารถเข้าใช้งานได้ โดย default แล้ว user ที่มีมาให้นั้นคือ admin ซึ่งจะมาพร้อมกับ password ที่ปลอดภัยที่สุดนั้นคือ admin (ล้อเล่นนะครับ หวังว่าจะไม่มีใครเอาไปตั้งเป็น password ของ admin บน production ^_^)

start atlas
ตรวจสอบว่า hbase, solr และ atlas ทำงานอยู่
หน้า login ของ atlas
หน้า index หลังจากทำการ login

หลังจาก login เข้ามาในหน้าข้อมูลของ atlas จะว่างเปล่า เพราะเรายังไม่มีข้อมูลใดๆ ซึ่งใน atlas ตัว component ที่ใช้สำหรับเก็บข้อมูลจะประกอบไปด้วย 3 ส่วนหลักๆคือ Type, Entities และ Attributes (https://atlas.apache.org/#/TypeSystem) โดย atlas จะมี Type พื้นฐานมาให้ซึ่งสามารถกดดูได้จาก dropdown `Search By Type` ทางด้านซ้านของจอ โดยทั่วไปการใช้งาน atlas จะเริ่มจากการสร้าง Entity จาก Type และกำหนดความสัมพันธ์ของ Entities จากนั้นก็ทำงานจัดหมวดหมู่ของ Entities

Type ทั้งหมดที่มีใน atlas

ตัวเว็บของ atlas มี feature ในการสร้าง Entity มาให้ แต่เหมือนจะมี bug ทำให้ Type เดียวที่สามารถสร้าง Entity จาก gui ได้ก็คือ hdfs_path ซึ่งผมได้ลองกับ web browsers 2–3 เจ้าแล้วก็ยังคงเป็นเหมือนเดิม

bug ใน web gui ของ atlas

ทีนี้ก็เหลืออีกทางเลือกสำหรับการ insert, update และ delete ข้อมูลต่างๆใน atlas นั้นคือผ่านทาง web api และละ web gui ไว้ใช้งานสำหรับการแสดงผลข้อมูลเท่านั้น แต่ก่อนจะมาเริ่มใส่ข้อมูล ผมอยากจำลอง case เล็กๆของการทำระบบ ingestion เพื่อทำให้ผู้อ่านได้เห็นภาพการใช้งานได้ดียิ่งขึ้น

case ตัวอย่าง

จากรูปข้างบนคือตัวอย่างระบบ ingestion ที่เราจะมาทำการสร้าง flow ของ data ภายใน atlas กัน โดยเริ่มจาก ftp server ไว้สำหรับให้ external user ทำการวาง file วันละครั้ง จากนั้นจะมี airflow job ไว้ให้ทำการ run script ตามเวลาที่กำหนด โดย script ดังกล่าวจะทำการอ่าน file ใน ftp server และ ingest เข้าสู่ kafka จากนั้นเราจะมี kafka connect(sink) นำข้อมูลลงใน database โดยข้อมูลที่จะทำการ ingest ในตัวอย่างนี้คือข้อมูล user (user detail) ประกอบไปด้วย user id, name และ gender

ขั้นแรกเราจะมาเริ่มทำการสร้าง entity สำหรับ type ftp server กัน แต่เนื่องด้วยใน atlas ไม่มี type ที่เป็น ftp server มาให้เราจึงจำเป็นต้องสร้าง type ดังกล่าวเอง

curl -X POST --header 'Content-Type: application/json;charset=UTF-8' --header 'Authorization: Basic YWRtaW46YWRtaW4=' \
--header 'Accept: application/json' -d '{ \
"enumDefs": [], \
"structDefs": [], \
"classificationDefs": [], \
"entityDefs": [ \
{ \
"category": "ENTITY", \
"createdBy": "pongthepv", \
"updatedBy": "pongthepv", \
"createTime": 1584970434941, \
"updateTime": 1584970700801, \
"version": 1, \
"name": "ftp_instance", \
"description": "FTP Server", \
"typeVersion": "1.1", \
"serviceType": "file_system", \
"attributeDefs": [ \
{ \
"name": "name", \
"typeName": "string", \
"isOptional": false, \
"cardinality": "SINGLE", \
"isUnique": true, \
"isIndexable": true, \
"includeInNotification": false \
}, \
{ \
"name": "host", \
"typeName": "string", \
"isOptional": false, \
"cardinality": "SINGLE", \
"isUnique": false, \
"isIndexable": false, \
"includeInNotification": false \
}, \
{ \
"name": "path", \
"typeName": "string", \
"isOptional": false, \
"cardinality": "SINGLE", \
"isUnique": false, \
"isIndexable": false, \
"includeInNotification": false \
} \
], \
"superTypes": [ \
"DataSet" \
], \
"subTypes": [], \
"relationshipAttributeDefs": [] \
} \
], \
"relationshipDefs": [] \
}' 'https://localhost:21000/api/atlas/v2/types/typedefs'

หลังจากยิง http post ตัว atlas ก็จะ return json กลับมาโดย มีรูปร่างหน้าตาคล้ายกับ json ที่เรา post เข้าไป เว้นแต่ 1 จุดสำคัญนั้นคือมันจะมีค่า `guid` กลับมาด้วย ซึ่งค่าดังกล่าวเปรียบเสมือน id ที่ใช้อ้างอิงถึง object ภายใน atlas

จากนั้นเราจำเป็นต้องสร้าง type อีก 2 ชนิดนั้นคือ ingestion script และ kafka connect ดังนี้

ingestion script

curl -X POST --header 'Content-Type: application/json;charset=UTF-8' --header 'Authorization: Basic YWRtaW46YWRtaW4=' \
--header 'Accept: application/json' -d '{ \
"enumDefs": [ \
{ \
"category": "ENUM", \
"createdBy": "pongthepv", \
"updatedBy": "pongthepv", \
"createTime": 1584970700801, \
"updateTime": 1584970700801, \
"version": 1, \
"name": "programing_language", \
"description": "Programing language", \
"typeVersion": "1.1", \
"serviceType": "atlas_core", \
"elementDefs": [ \
{ \
"value": "Scala", \
"ordinal": 1 \
}, \
{ \
"value": "Python", \
"ordinal": 2 \
} \
] \
}, \
{ \
"category": "ENUM", \
"createdBy": "pongthepv", \
"updatedBy": "pongthepv", \
"createTime": 1584970700801, \
"updateTime": 1584970700801, \
"version": 1, \
"name": "processing_platform", \
"description": "Platform that be used for process data", \
"typeVersion": "1.1", \
"serviceType": "atlas_core", \
"elementDefs": [ \
{ \
"value": "StandaloneInstance", \
"ordinal": 1 \
}, \
{ \
"value": "AWS_ECS", \
"ordinal": 2 \
}, \
{ \
"value": "Kubernetes", \
"ordinal": 3 \
} \
] \
}, \
{ \
"category": "ENUM", \
"createdBy": "pongthepv", \
"updatedBy": "pongthepv", \
"createTime": 1584970700801, \
"updateTime": 1584970700801, \
"version": 1, \
"name": "job_scheduler", \
"description": "Job scheduler platform", \
"typeVersion": "1.1", \
"serviceType": "atlas_core", \
"elementDefs": [ \
{ \
"value": "Cronjob", \
"ordinal": 1 \
}, \
{ \
"value": "Airflow", \
"ordinal": 2 \
} \
] \
} \
], \
"structDefs": [], \
"classificationDefs": [], \
"entityDefs": [ \
{ \
"category": "ENTITY", \
"createdBy": "pongthepv", \
"updatedBy": "pongthepv", \
"createTime": 1584970434941, \
"updateTime": 1584970700801, \
"version": 1, \
"name": "ingestion_script", \
"description": "Data Ingestion Script", \
"typeVersion": "1.1", \
"serviceType": "atlas_core", \
"attributeDefs": [ \
{ \
"name": "name", \
"typeName": "string", \
"isOptional": false, \
"cardinality": "SINGLE", \
"isUnique": true, \
"isIndexable": true, \
"includeInNotification": false \
}, \
{ \
"name": "language", \
"typeName": "programing_language", \
"isOptional": false, \
"cardinality": "SINGLE", \
"isUnique": false, \
"isIndexable": false, \
"includeInNotification": false \
}, \
{ \
"name": "platform", \
"typeName": "processing_platform", \
"isOptional": false, \
"cardinality": "SINGLE", \
"isUnique": false, \
"isIndexable": false, \
"includeInNotification": false \
}, \
{ \
"name": "trigger_by", \
"typeName": "job_scheduler", \
"isOptional": false, \
"cardinality": "SINGLE", \
"isUnique": false, \
"isIndexable": false, \
"includeInNotification": false \
} \
], \
"superTypes": [ \
"Process" \
], \
"subTypes": [], \
"relationshipAttributeDefs": [] \
} \
], \
"relationshipDefs": [] \
}' 'https://localhost:21000/api/atlas/v2/types/typedefs'

kafka connect

curl -X POST --header 'Content-Type: application/json;charset=UTF-8' --header 'Authorization: Basic YWRtaW46YWRtaW4=' \
--header 'Accept: application/json' -d '{ \
"enumDefs": [ \
{ \
"category": "ENUM", \
"createdBy": "pongthepv", \
"updatedBy": "pongthepv", \
"createTime": 1584970700801, \
"updateTime": 1584970700801, \
"version": 1, \
"name": "kafka_connect_direction", \
"description": "Kafka connect direction", \
"typeVersion": "1.1", \
"serviceType": "atlas_core", \
"elementDefs": [ \
{ \
"value": "Source", \
"ordinal": 1 \
}, \
{ \
"value": "Sink", \
"ordinal": 2 \
} \
] \
}, \
{ \
"category": "ENUM", \
"createdBy": "pongthepv", \
"updatedBy": "pongthepv", \
"createTime": 1584970700801, \
"updateTime": 1584970700801, \
"version": 1, \
"name": "kafka_connect_type", \
"description": "Kafka connect type", \
"typeVersion": "1.1", \
"serviceType": "atlas_core", \
"elementDefs": [ \
{ \
"value": "Jdbc", \
"ordinal": 1 \
}, \
{ \
"value": "AWS_S3", \
"ordinal": 2 \
}, \
{ \
"value": "File", \
"ordinal": 3 \
} \
] \
} \
], \
"structDefs": [], \
"classificationDefs": [], \
"entityDefs": [ \
{ \
"category": "ENTITY", \
"createdBy": "pongthepv", \
"updatedBy": "pongthepv", \
"createTime": 1584970434941, \
"updateTime": 1584970700801, \
"version": 1, \
"name": "kafka_connect", \
"description": "Kafak Connect", \
"typeVersion": "1.1", \
"serviceType": "kafka", \
"attributeDefs": [ \
{ \
"name": "name", \
"typeName": "string", \
"isOptional": false, \
"cardinality": "SINGLE", \
"isUnique": true, \
"isIndexable": true, \
"includeInNotification": false \
}, \
{ \
"name": "direction", \
"typeName": "kafka_connect_direction", \
"isOptional": false, \
"cardinality": "SINGLE", \
"isUnique": false, \
"isIndexable": false, \
"includeInNotification": false \
}, \
{ \
"name": "type", \
"typeName": "kafka_connect_type", \
"isOptional": false, \
"cardinality": "SINGLE", \
"isUnique": false, \
"isIndexable": false, \
"includeInNotification": false \
} \
], \
"superTypes": [ \
"Process" \
], \
"subTypes": [], \
"relationshipAttributeDefs": [] \
} \
], \
"relationshipDefs": [] \
}' 'https://localhost:21000/api/atlas/v2/types/typedefs'

จาก http post ด้านบนผู้อ่านอาจจะสังเกตบ้างอย่างแปลกตาไปจากการสร้าง entity type ของ ftp_instance นั้นคือเรามีการสร้าง enum เพื่อใช้ในการกำหนด entity type ด้วยตัวอย่างเช่นใน type kafka connect เรามี 3 attributes ประกอบไปด้วย name, direction และ type ตัว name เป็น string ที่สามารถกรอกได้อย่างอิสระ แต่ในส่วนของ direction และ type เราต้องการกำหนดตายตัวว่าค่าในส่วนนี้จะสามารถเป็นอะไรได้บ้าง ด้วยเหตุผลนี้เราจึงจำเป็นต้องสร้าง enum ขึ้นมาใช้ เวลาเราสร้าง entity ในส่วนของ direction เราสามารถกำหนดได้เพียง 2 ค่าเท่านั้นนั้นคือ ไม่ เป็น source ก็ต้องเป็น sink

ในส่วนถัดไปเราจะมาเริ่มสร้าง entity object กันนะครับ โดยเริ่มจาก ftp_instance ซึ่งเราทำการกำหนดให้ instance นี้ชื่อว่า ftp_customer_team เพื่อจำลองว่าทุกๆว่า team ที่ชื่อว่า customer จะต้องนำ file มาวางที่เครื่อง customer.mycompany.com ใน path /daily_new_users

curl -X POST --header 'Content-Type: application/json;charset=UTF-8'
--header 'Authorization: Basic YWRtaW46YWRtaW4=' \
--header 'Accept: application/json' -d '{ \
"entity": { \
"typeName": "ftp_instance", \
"attributes": { \
"modifiedTime": 1584970700801, \
"qualifiedName": "ftp_customer_team", \
"description": "Customer team%27s FTP server", \
"createTime": 1584970700801, \
"name": "ftp_customer_team", \
"host": "customer.mycompany.com", \
"path": "/daily_new_users" \
}, \
"status": "ACTIVE", \
"createdBy": "pongthepv", \
"updatedBy": "pongthepv", \
"createTime": 1584970700801, \
"updateTime": 1584970700801, \
"relationshipAttributes": {} \
} \
}' 'https://localhost:21000/api/atlas/v2/entity'

หลังจาก http post เรียบร้อยแล้วถ้าเราเข้าไปยังหน้าเว็บของ atlas ก็จะพบว่าตอนนี้ใน entity type `ftp_instance` มี entity ถูกสร้างขึ้นมาแล้วจำนวน 1 ตัว

หน้าเว็บหลังจากการเพิ่ม ftp_customer_team

customer_ftp_ingestion_script

curl -X POST --header 'Content-Type: application/json;charset=UTF-8' --header 'Authorization: Basic YWRtaW46YWRtaW4=' \
--header 'Accept: application/json' -d '{ \
"entity": { \
"typeName": "ingestion_script", \
"attributes": { \
"modifiedTime": 1584970700801, \
"qualifiedName": "customer_ftp_ingestion_script", \
"description": "Script for ingest customer data from FTP", \
"createTime": 1584970700801, \
"name": "customer_ftp_ingestion_script", \
"language":"Scala", \
"platform":"Kubernetes", \
"trigger_by":"Airflow" \
}, \
"status": "ACTIVE", \
"createdBy": "pongthepv", \
"updatedBy": "pongthepv", \
"createTime": 1584970700801, \
"updateTime": 1584970700801, \
"relationshipAttributes": { \
} \
} \
}' 'https://localhost:21000/api/atlas/v2/entity'

ในตอนนี้เราก็มี 2 entity ใน atlas เรียบร้อยขั้นตอนต่อไปเราจะทำการเชื่อมทั้ง 2 ตัวเข้าด้วยกันโดยทำการ update `customer_ftp_ingestion_script` โดยกำหนดว่ามี input ที่เข้ามาจาก `ftp_customer_team` ซึ่งการ update สามารถทำได้แค่เปลี่ยน body ของ http post ที่เราใช้ในการสร้าง ซึ่ง atlas จะดูจาก `qualifiedName` โดยถ้าเป็นชื่อที่มีอยู่แล้วก็จะทำการ update ซึ่งในส่วนที่เราจะเริ่มเข้ามานั้นคือ input เพื่อบอกว่า `customer_ftp_ingestion_script` รับ input มาจาก entity ตัวไหน

curl -X POST --header 'Content-Type: application/json;charset=UTF-8' --header 'Accept: application/json' -d '{ \ 
"entity": { \
"typeName": "ingestion_script", \
"attributes": { \
"modifiedTime": 1584970700801, \
"qualifiedName": "customer_ftp_ingestion_script", \
"description": "Script for ingest customer data from FTP", \
"createTime": 1584970700801, \
"name": "customer_ftp_ingestion_script", \
"language": "Scala", \
"platform": "Kubernetes", \
"trigger_by": "Airflow", \
"inputs": [ \
{ \
"guid": "1b4706d9-02c2-40ac-bae1-eccca299e0ba", \
"typeName": "ftp_instance" \
} \
] \

}, \
"status": "ACTIVE", \
"createdBy": "pongthepv", \
"updatedBy": "pongthepv", \
"createTime": 1584970700801, \
"updateTime": 1584970700801, \
"relationshipAttributes": {} \
} \
}' 'https://localhost:21000/api/atlas/v2/entity'

เราสามารถตรวจดูผลการเชื่อมความสัมพันธ์ในหน้าเว็บโดยการเลือกที่ entity ` customer_ftp_ingestion_script` จากนั้นไปที่ช่อง Lineage ก็จะเห็นความเชื่อมโยงระหว่าง 2 entity ตามภาพด้านล่าง

การเชื่อม entity

ณ จุดนี้ผมจะของ skip การสร้าง entity และความสัมพันธ์ของส่วนที่เหลือใน case example นะครับ เพื่อคุ้มความยาวของบทความซึ่งตอนนี้รูปภาพและตัวอย่าง http post เริ่มจะเยอะจนอาจจะลายตาแล้ว

เมื่อเราทำการใส่ entity ทุกตัวลงใน atlas รวมถึงกำหนดความสัมพันธ์เรียบร้อยแล้วก็จะได้ข้อมูลดังรูปด้านล่างซึ่งวงกลมสีแดงแสดงว่าขณะนี้เรากำลังอยู่ที่ node ไหน โดยจากรูปจะเห็นถึงเส้นทางการวิ่งของข้อมูลจาก ftp server จนลงใน table ของ rdbms

ตัวอย่าง Lineage ของ customer table

นอกจากจะสามารถดูเส้นทางของข้อมูลแล้วยังมี table ชื่อว่า Relationships ไว้ดูว่า entity ที่เราดูอยู่มีสัมพันธ์เชิงเกี่ยวข้องกับใครบ้างซึ่งในกรณีของ rdbms table ก็ต้องมีความสัมพันธ์กับ entity `customer_db` ซึ่งเป็น entity ประเภท rdbms_database

ตัวอย่าง Relationships

มาถึงจุดนี้หวังว่าผ้อ่านคงจะพอมองเห็นภาพถึงประโยชน์ของการทำ data governance ในเบื้องต้นกันบ้างแล้ว แต่ feature ของตัว atlas ยังมีอีกมากทั้งในการของการทำ จัดหมวดหมู่ของ data หรือแม้แต่การกำหนดว่าข้อมูลดังกล่าวเป็น PII (Personally identifiable information) หรือไม่ ซึ่งถ้าองค์กรไหนไม่มีการจัดเก็บข้อมูลดังกล่าวไว้บอกเลยว่าถ้าต้องทำ PDPA ขึ้นมา วิ่งค้นกันมันแน่นอน แต่ด้วย atlas สามารถค้นหาข้อมูลต่างๆได้ด้วยการคลิกเท่านั้น

สุดท้ายนี้ถ้าท่านใดมีข้อสงสัยหรือคำชี้แนะใดๆสามารถฝากข้อความได้ที่ https://www.facebook.com/coeffest/ นะครับ ขอบคุณมากครับที่ติดตาม

อ้างอิง

https://atlas.apache.org/#/BuildInstallation

https://atlas.apache.org/1.0.0/InstallationSteps.html

https://stackoverflow.com/a/59764763/1037159

https://atlas.apache.org/#/TypeSystem

http://atlas.apache.org/api/v2/index.html

--

--