Kafka 메시지에 스키마를 정의해 보자 : Apache Avro

Joon
Techeer Tech Blog
Published in
11 min readJun 12, 2023

Apache Avro

아파치 아브로는 데이터 직렬화 시스템으로서, 대량의 데이터를 효율적으로 저장하고 전송하기 위한 개방형 데이터 직렬화 프레임워크이다. 아파치 소프트웨어 재단에서 개발하고 관리하며, 고성능이며 확장 가능한 데이터 직렬화 형식으로 널리 사용된다.

아브로는 특히 대용량 데이터 처리 시스템에서 사용되는데, 이는 데이터 스키마의 유연성과 직렬화된 데이터의 압축 효율성으로 인해 가능하다. 아브로는 이진 형식으로 데이터를 직렬화하며, 데이터 스키마를 사용하여 데이터의 구조와 유형을 정의한다. 데이터 스키마는 JSON 형식으로 표현되어 가독성이 높고 직관적이다. JSON과 비교하더라도 타입을 명시할 수 있어서 데이터를 사용하기 편하다는 장점이 있다.

하지만 바이너리 형태로 직렬화되어 데이터를 쉽게 들여다 보기 어렵다는 단점도 동시에 존재한다.

Avro의 Record 자료형

아브로에는 null, boolean, int, long, float, double, bytes, string 의 원시 타입과 record, enums, arrays, maps, unions 등의 복합 타입이 존재한다. 그 중 가장 많이 사용하는 record 타입은 아래와 같은 구성 요소를 가진다.

{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
  • name — 문자열 : 스키마 이름 (필수)
  • namespace — 문자열 : 패키지 (선택)
  • doc — 문자열 : 스키마에 대한 설명 (선택)
  • aliases — 배열 : 이 레코드에 대한 대체 이름 (선택)
    이름을 바꾼 경우 여기에 구버전 이름을 기입하여 사용할 수 있다.
  • fields — 배열 : 필드에 대한 리스트 (필수)

fields 하위 요소

  • name — 문자열 : 필드의 이름 (필수)
  • doc — 문자열 : 필드에 대한 설명 (선택)
  • type — 스키마 : 필드의 타입, 타입 혹은 스키마를 여러개 기입할 수 있다 (필수)
  • default : 필드의 기본 값 (선택)
  • order : 데이터를 정렬할 방향을 정한다.
    ascending이 기본이고, descending, ignore 옵션이 있다. 직렬화 될 때 정렬이 수행된다. (선택)

이외 다른 자료형은 아래 링크 참고하면 좋다.

카프카의 직렬처리기

카프카 프로듀서의 필수 구성에는 직렬처리기가 포함된다. String, Integer, ByteArray 직렬처리기가 있지만 이것들로 모든 데이터의 직렬화를 충족시킬 수 없으므로 다른 직렬 처리기가 필요하다. 그래서 커스텀 직렬처리기를 사용할 수 있지만, 타입의 변화가 있거나 필드가 추가 혹은 삭제된다면 기존 메시지와 새로운 메시지 사이에 호환에 문제가 생긴다.

이런 이유로 JSON, Apache Avro, Thrift, Protobuf 같은 범용 직렬처리기와 역직렬처리기 사용을 권하고 있다. 범용 직렬처리기를 사용하면 데이터를 읽는 모든 애플리케이션을 변경하지 않고 스키마를 변경하더라도 어떤 에러도 발생하지 않으며, 기존 데이터를 변경할 필요도 없다.

스키마 레지스트리

카프카를 사용하여 구성한 Pub/Sub (발행/구독) 시스템은 시스템이 복잡해지는 것에 장점이 있다. 카프카를 기준으로 데이터를 발행하는 발행자가 늘어나거나 데이터를 구독하는 구독자가 늘어나는 것에 자유롭기 때문이다. 하지만 이러한 시스템에서 각 발행자/구독자 간에 Avro 스키마를 직접 공유하기는 어려울 것이다. 어떤 서비스에서 구독하고 있는지 발행자 입장에서는 알 수 없기 때문이다.

데이터 파일에 스키마 전체를 저장하는 Avro 파일과 다르게, 데이터를 갖는 각 메시지에 스키마 전체를 포함하여 발행 한다면 메시지의 크기가 두배 가까이 증가할 것이다. 따라서 사용하는 것이 스키마 레지스트리를 많이 사용한다.

발행자는 스키마는 스키마 레지스트리에 저장하고 스키마의 id를 메시지에 첨부하여 보내면, 구독자는 메시지에 첨부된 스키마의 id를 바탕으로 스키마 레지스트리에서 스키마를 전달받아 메시지를 처리하는 방식이다.

Confluent에서 만든 스키마 레지스트리가 가장 많이 사용되며 다른 스키마 레지스트리도 있지만, 직접 구현해서 사용해도 될 것 같다.

카프카에 Avro 메시지를 프로듀싱 하기

다음은 파이썬에서 Avro를 활용하여 카프카에 프로듀싱하는 예제이다. 일반적인 프로듀싱 방법에서 인코딩하는 절차만 추가된 것으로 볼 수 있다. 스키마 레지스트리는 여기서 사용되지 않았다. 만약 스키마 레지스트리를 사용한다면 스키마를 조회해오는 구문만 추가하면 될 것이다. 그리고 data의 값이 Avro 스키마와 일치하지 않는다면 Avro 타입 에러가 발생한다.

from io import BytesIO

import avro
from avro.io import DatumWriter
from avro.schema import parse as parse_schema
from kafka import KafkaProducer

# Kafka producer 설정
kafka_bootstrap_servers = "localhost:19092,localhost:29092,localhost:39092" # Kafka 브로커 서버 주소
kafka_topic = "your_topic_name" # 프로듀싱할 토픽 이름

# Kafka producer 생성
producer = KafkaProducer(
bootstrap_servers=kafka_bootstrap_servers,
value_serializer=lambda x: x
)

# 프로듀싱할 데이터 생성
data = {
"ip": "127.0.0.1",
"timestamp": "2023-06-12 12:00:00",
"method": "GET",
"url": "<http://example.com>",
"http_version": "HTTP/1.1",
"status": "200",
"byte": "1024"
}

# Avro 스키마 설정
avroSchema = """
{
"type": "record",
"name": "LogRecord",
"fields": [
{"name": "ip", "type": "string"},
{"name": "timestamp", "type": "string"},
{"name": "method", "type": "string"},
{"name": "url", "type": "string"},
{"name": "http_version", "type": "string"},
{"name": "status", "type": "string"},
{"name": "byte", "type": "string"}
]
}
"""
avro_schema = parse_schema(avroSchema)

# Avro 데이터 직렬화
bytes_writer = BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
writer = DatumWriter(avro_schema)
writer.write(data, encoder)
bytes_data = bytes_writer.getvalue()

# 데이터 프로듀싱
producer.send(kafka_topic, value=bytes_data)
producer.flush()

# Kafka producer 종료
producer.close()

Avro 스키마로 직렬화된 바이트 데이터를 출력하면 다음과 같이 출력된다. 어떤 값을 갖고 있는지 알아보기 조금 힘든 것을 볼 수 있다. 하지만 JSON으로 데이터를 전송하는 것 보다는 길이가 짧은 것을 알 수 있다.

b'\\x12127.0.0.1&2023-06-12 12:00:00\\x06GET$http://example.com\\x10HTTP/1.1\\x06200\\x081024'

카프카의 Avro 메시지를 컨슈밍 하기

이번에는 방금 퍼블리싱한 메시지를 컨슈밍하고 역직렬화하여 데이터를 사용하는 예제이다. 여기서도 마찬가지로 일반적인 컨슈머 코드에서 역직렬화하는 부분만 추가된 것을 볼 수 있다.

from io import BytesIO

import avro
from avro.io import DatumReader
from avro.schema import parse as parse_schema
from kafka import KafkaConsumer

# Kafka consumer 설정
kafka_bootstrap_servers = "localhost:19092,localhost:29092,localhost:39092" # Kafka 브로커 서버 주소
kafka_topic = "your_topic_name" # 컨슈밍할 토픽 이름
kafka_group_id = 'temp-consumer'

# Avro 스키마 설정
avroSchema = """
{
"type": "record",
"name": "LogRecord",
"fields": [
{"name": "ip", "type": "string"},
{"name": "timestamp", "type": "string"},
{"name": "method", "type": "string"},
{"name": "url", "type": "string"},
{"name": "http_version", "type": "string"},
{"name": "status", "type": "string"},
{"name": "byte", "type": "string"}
]
}
"""
avro_schema = parse_schema(avroSchema)

# Kafka consumer 생성
consumer = KafkaConsumer(
kafka_topic,
bootstrap_servers=kafka_bootstrap_servers,
group_id=kafka_group_id,
value_deserializer=lambda x: x,
enable_auto_commit=False # 수동으로 오프셋 커밋 설정
)

# 메시지 컨슈밍
for message in consumer:
value = message.value

# Avro 데이터 역직렬화
reader = DatumReader(avro_schema)
bytes_reader = BytesIO(value)
decoder = avro.io.BinaryDecoder(bytes_reader)
data = reader.read(decoder)

# 오프셋 커밋
consumer.commit()

여기서도 Avro를 역직렬화한 데이터를 출력해본다면 아래와 같이 잘 변환 된 것을 볼 수 있다.

{'ip': '127.0.0.1', 'timestamp': '2023-06-12 12:00:00', 'method': 'GET', 'url': '[<http://example.com>](<http://example.com/>)', 'http_version': 'HTTP/1.1', 'status': '200', 'byte': '1024'}

마무리

대규모 시스템을 구축하기 위해 카프카를 필수적으로 사용하고 있다. 따라서 카프카를 공부하는 개발자가 많은데, Avro는 쉽게 사용할 수 있으니 꼭 활용해보면 좋을 것 같다.

참고 자료

책 : 카프카 핵심 가이드

--

--

Joon
Techeer Tech Blog

엉뚱한 구석이 있는 데이터 엔지니어입니다.