29CM 로그 수집 시스템 소개

29CM
29CM TEAM
Published in
23 min readJul 9, 2021

29CM에서는 기존의 로그 시스템, 신규 구축한 로그 시스템 두가지를 함께 운영중입니다.

기존의 로그 시스템은 Python + Django 기반의 모놀리틱 메인 서비스에서 기존대로 활용하고, 신규 로그 시스템은 Java + Spring 기반의 MSA 서비스 중심으로 사용하고 있었습니다. 그 후 최근에는 Django 기반의 메인 서비스도 함께 적용하여 운영중입니다.

29CM 로그 시스템의 주요 오픈 소스 (ELK + kafka)

ElasticSearch 장애 발생

불타는 금요일 오후 10시30분
여러 채널에서 알람이 발생하기 시작합니다.

기존 로그 시스템의 로그 유실 알람

위 로그는 Open Distro 를 활용한 장애 탐지에서 추가한 얼럿입니다.

신규 로그 시스템에서 카프카 지연 발생 알람
휴먼 알람..

장애 원인

장애 원인은 ElasticSearch (이하 ES)의 Disk 부족으로 인한 로그 적재 실패였습니다.

기존의 로그 시스템으로 쌓이는 신규 로그가 추가 되면서 기존보다 많은 정보를 담게 되었고, 이로 인해 처음의 예상보다 더 많은 Disk 를 사용하게 되었습니다. 그로 인해 Disk 용량이 부족해지면서 장애가 발생했습니다.

ES의 인덱스를 정리하여 Disk 확보하여 대응 했으나 장애 시간 동안의 로그가 유실되는 현상이 발생 했습니다.

기존 로그 시스템의 메인 서비스 로그 유실

하지만, 신규 로그 시스템에서는 로그 유실이 없었습니다.

동일 시간의 신규 로그 시스템의 메인 서비스 로그

29CM의 로그 유실0% 를 달성한 신규 로그 시스템을 소개하려 합니다.

기존 로그 시스템 소개

기존 29CM 에는 Kubernetes 환경에 맞게 Fluentd 를 활용한 EFK 기반의 로그 파이프라인이 기존에 존재했습니다.

쉽게 구성할 수 있고 흔히 볼 수 있는 심플한 파이프라인 아키텍쳐입니다.
Kubernetes 환경에 맞게 가벼운 Fluentd 를 사용했고 Fluentd 컨테이너들은 각각의 서비스에 사이드카로 주입되어 배포됩니다.

사이드카로 주입된 각각의 Fluentd 컨테이너들은 Fluentd Aggretator 에게 로그데이터를 송신하고,Aggregator는 모든 로그 데이터를 집계하여 ES 에 인덱싱합니다.

하지만 기존의 로그 파이프 라인에는 몇 가지 문제가 있었습니다.

로그 데이터의 가시성 부족

기존 로그 시스템의 ES 에는 인덱스 매핑이 strict 하게 설정되어 있었고 Fluentd 내부 파이프라인에 parser 와 filter 를 이용해 body 에서 정규식을 이용해 값을 꺼내고 있었습니다.

그 결과 로그 데이터의 컬럼 확장이 매우 어려운 상태 였습니다.

초반에는 API 요청에 대한 body 데이터를 남기는 것이 없었고, 로그에 대한 컬럼 확장도 어렵다보니 가장 기초적인 인풋 / 아웃풋에 대한 데이터조차 로그를 남기지 못하는 상황이었습니다. 추후에 인풋 / 아웃풋을 남기기 시작했으나 동적으로 컬럼 확장이 막혀 있었기 때문에 body 전체를 base64 로 인코딩하여 우회적으로 로그를 남기는 식으로 일부 개선을 진행했었습니다.

로그 데이터의 유실 가능성과 버퍼 부실

기존 파이프 라인의 경우 로그 유실 가능성도 있었습니다.

Disk 용량 부족으로 인해 ES 에 로그가 적재되지 못하게 되면 Disk 용량을 확보하고 ES 복구하기 전까지는 로그 유실을 피할 수 없었습니다.

또한 ES 의 인덱싱 속도가 잠시 느려지거나 일시적인 장애 발생 시 Fluentd Aggregator 가 버퍼 역할을 해줘야 하는데 해당 버퍼는 너무 단순하고 대량의 데이터를 쓰기에는 적절하지 못했습니다.

Fluentd Aggregator 의 파드에 PV 를 연결해 파일로 데이터를 쓰거나 Memory 에 올려야하는데, PV를 연결하기엔 Fluentd 의 장점인 수평확장에 제약이 생기게 되고 Memory를 사용하기엔 너무 많은 리소스를 사용하게 될수도 있어서 아주 긴 시간동안의 장애는 여전히 대응하기 어렵다고 판단했습니다.

기존 로그 시스템은 Memory 와 File 두 가지 유형을 버퍼로 사용중이었고 PV 는 연결하지 않았습니다. 이런 경우 Kubernetes 의 pod 볼륨에 File 을 기록해놓더라도, 해당 pod 이 재시작되는 경우 pod 볼륨이 휘발되어 로그 데이터가 유실되는 이슈가 발생하게 됩니다.

실제로 장애시간대의 메트릭을 살펴보면 Aggregator 의 메모리가 치솟았고 OOM Killed 가 자주 발생했으며, 해당 시그널이 발생하는 시점 이전의 모든 로그들은 한번씩 유실이 되었습니다.

기존 로그 시스템에서는 의미있는 로그를 남기지 않고 있었고, 그런 로그조차 특정 시점마다 유실이 발생하는 상황이 지속되었습니다.

신규 파이프라인 소개

위와 같은 문제점을 해결하고자 신규 로그 시스템을 구축하기 시작했습니다.

기존 로그 시스템과 파이프라인은 단계별로 타이트하게 연결되어 있기 때문에 변경이 쉽지 않았고, 해당 로그를 기반으로 운영 중인 서비스도 일부 존재했기 때문에 기존 로그 시스템은 유지하면서 새로운 로그 시스템을 구축하는 방향으로 의사 결정 하였습니다.

아키텍쳐

아키텍쳐는 기존과 거의 동일합니다.
다른점은 로그를 송신하는 구간과 수집하는 구간 사이에 Kafka 를 버퍼로 두었다는 것입니다.

Kafka 도입 근거

Kafka 를 로그 수집 구간의 중간에 두어 ES 의 장애와 상관없이 Kafka 에 기록된 로그 자체가 유실되지 않도록 버퍼 역할을 하게끔 하였습니다.

29CM 의 신규 로그 시스템에서는 Kafka 의 retention 을 3일(24 * 3 hours) 로 지정하여, 주말을 포함하여 ES 의 장애가 발생하더라도 ES 가 다시 복구 되는 즉시 Kafka 에 쌓인 모든 로그 데이터를 ES에 인덱싱 할 수 있게 하였습니다.

로그 데이터의 경우 비즈니스 로직과는 분리되는 데이터로 실시간적인 로그 적재가 중요한 요소라고 생각하지는 않았습니다. 그보다는 어떤 경우에서도 로그 데이터가 유실되지 않는 것이 중요하다고 생각했고 Kafka 를 통한 로그 적재 구간에 버퍼를 두어 유실 없는 로그 시스템을 만들었습니다.

다이나믹 매핑 적용

신규 로그 시스템의 ES 에는 다이나믹 매핑을 적용하여 로그 내에 특정 값을 신규로 추가하면 자동적으로 ES 에 매핑이 되도록 하였습니다. 신규 로그 시스템에서 남기고 싶은 데이터를 파이프라인에 밀어넣기만 하면 ES 에서 인덱싱까지 동적으로 완료됩니다.

다이나믹 매핑 방식으로 변경했기 때문에 전송되는 모든 로그 데이터가 로그 형태 그대로 적재될 수 있게 되었고, 로그를 남기는 서비스에서는 필요한 로그 데이터를 마음껏 추가하고 삭제할 수 있게 되었습니다.

Fluentd 를 Logstash 로 변경

로그를 수집하고 적재하는 역할에 있어서 기존 Fluentd 를 Logstash 로 변경하였습니다.

신규 로그 시스템에서는 Kafka 를 통한 버퍼 구간을 도입하는게 중요했는데 Fluentd 의 기본 베이스 이미지에서는 Kafka 의 플러그인이 미설치 되어 있었고 Logstash 에서는 Kafka 플러그인이 기본으로 제공되고 있었습니다.

로그 수집 스택에 있어서 ELK 스택 (ElasticSearch + Logstash + Kibana) 가 오랜 시간 검증된 것도 있다고 생각했기 때문에 Fluentd 를 Logstash 로 변경하였습니다.

Kafka 운영 도구 - Strimzi 선택

Kubernetes 환경에서 Kafka 를 쉽게 프로비저닝 하기 위해 여러가지 선택지가 존재합니다.

29CM 은 대체로 helm 을 활용하여 기반 인프라를 운영하고 있는데 Kafka 만큼은 Strimzi 를 이용해 프로비저닝 하고 있습니다.

Strimzi는 Kubenetes 의 Custom Resource Definition (이하 CRD) 를 통해 Kafka 클러스터를 프로비저닝 할 수 있습니다. 단순한 매니페스트 설정만으로도 클러스터 프로비저닝부터 jbod, topic 설정 등 다양한 기능을 제공하고 있습니다.

아래는 Kafka 클러스터의 프로비저닝을 담당하는 매니페스트 예시입니다.
(상세 설정은 임의로 삭제 및 변경 했습니다.)

# kafka-cluster.yamlapiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
name: kafka-cluster
namespace: platform
spec:
kafka:
version: 2.6.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
auto.create.topics.enable: "false"
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
log.message.format.version: '2.6'
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 100Gi
deleteClaim: false
zookeeper:
replicas: 3
storage:
type: persistent-claim
size: 2Gi
deleteClaim: false
kafkaExporter:
template:
pod:
metadata:
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9404"

아래는 Kafka 의 토픽 생성에 대한 예시 매니페스트입니다.
예시에서는 logging 이라는 토픽을 생성하고 retention 은 6시간, partition 은 60개, replica 는 2개로 설정합니다.

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
name: service-log
namespace: devops
labels:
strimzi.io/cluster: logging-kafka
spec:
topicName: logging
partitions: 60
replicas: 2
config:
retention.ms: 21600000
segment.bytes: 1073741824

Strimzi 에 대한 더 자세한 내용은 공식홈페이지(https://strimzi.io/) 를 참고해주세요.

HTTP 기반의 전송 프로토콜 선택

29CM 내부에서는 TCP 프로토콜의 사용이 어려운 인프라 운영환경이 존재하고, 추후 카프카의 Rest Connector로 변경할 가능성까지 고려하여 http 프로토콜 기반의 로그 전송 프로토콜을 사용하는 것으로 의사결정을 했습니다

각 레이어 별 사용된 프로토콜

코드 작업

Spring

신규 MSA 구조 안에서 구현되는 스프링 프로젝트에서는 로깅에 Slf4j를 이용하고 있고, logback 설정을 통해 간단히 Appender를 추가하여 사용하고 있습니다.

현재 사용중인 HttpAppender 를 간단히 작성하여 공유하면 아래와 같습니다.

public class HttpAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {

private String protocol = "http";
private String host = "localhost";
private int port = 80;
private String path = "/";
private int connectionTimeout = 30_000;
private int readTimeout = 30_000;
private Encoder<ILoggingEvent> encoder = new LoggingEventCompositeJsonEncoder();

@Override
protected void append(ILoggingEvent event) {
var bytes = encoder.encode(event);

var uri = String.format("%s://%s:%d%s", protocol, host, port, path);
try {
sendHttp(uri, "application/json", bytes);
} catch (IOException e) {
e.printStackTrace();
addError("Logging Error..", e);
}
}


private void sendHttp(String uri, String contentType, byte[] bytes) throws IOException {
final HttpURLConnection conn = (HttpURLConnection) new URL(uri).openConnection();
conn.setConnectTimeout(connectionTimeout);
conn.setReadTimeout(readTimeout);
conn.setDoOutput(true);
conn.setRequestMethod("POST");
conn.setFixedLengthStreamingMode(bytes.length);
conn.setRequestProperty("Content-Type", contentType);

final OutputStream os = conn.getOutputStream();
os.write(bytes);

os.flush();
os.close();
}

public int getConnectionTimeout() {
return connectionTimeout;
}

public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}

public int getReadTimeout() {
return readTimeout;
}

public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}

public String getProtocol() {
return protocol;
}

public void setProtocol(String protocol) {
this.protocol = protocol;
}

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
}

public int getPort() {
return port;
}

public void setPort(int port) {
this.port = port;
}

public Encoder<ILoggingEvent> getEncoder() {
return encoder;
}

public void setEncoder(Encoder<ILoggingEvent> encoder) {
this.encoder = encoder;
}

public String getPath() {
return path;
}

public void setPath(String path) {
this.path = path;
}
}

logstash-logback-encoder 라는 라이브러리의 디펜던시가 존재합니다.

Gradle

implementation 'net.logstash.logback:logstash-logback-encoder:6.6'

logback

<appender name="HTTP" class="kr.co._29cm.common.logging.HttpAppender">
<protocol>${HTTP_LOGSTASH_PROTOCOL}</protocol>
<host>${HTTP_LOGSTASH_HOST}</host>
<port>${HTTP_LOGSTASH_PORT}</port>
<path>${HTTP_LOGSTASH_PATH}</path>
<connectionTimeout>${HTTP_LOGSTASH_CONNECTION_TIMEOUT}</connectionTimeout>
<readTimeout>${HTTP_LOGSTASH_READ_TIMEOUT}</readTimeout>
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<pattern>
<pattern>
{
"@timestamp":"%d{yyyy-MM-dd'T'HH:mm:ss.SSSZ}",
"service":"${SPRING_APPLICATION_NAME}",
"phase":"${SPRING_PROFILES_ACTIVE}",
"traceId": "%X{X-B3-TraceId:-}",
"spanId": "%X{X-B3-SpanId:-}",
"level":"%level",
"thread":"%thread",
"logger":"%logger",
"exception": "%ex{2}",
"payload": "#asJson{%msg}"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>

<appender name="ASYNC-HTTP" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="HTTP"/>
<queueSize>${HTTP_ASYNC_QUEUE_SIZE}</queueSize>
</appender>

Spring의 로그 전송 구간에서 http 전송 실패에 대한 retry 로직이 현재는 빠져있습니다. 전송만 하면 유실이 없지만 전송 구간에서의 유실은 현재 대응되어 있지 않고, 추후 Async기반의 appender 로 리팩터링 하여 전송구간 유실도 대응하도록 할 예정입니다.

Django

기존 모놀리틱 구조에서의 Django 서버에서는 2개의 로그 파이프라인을 사용하여 로그를 수집하고 있습니다.

  • 기존 로깅 파이프라인의 경우, 파이썬 표준 로그 모듈인 logging 라이브러리를 사용하여 파일로 로깅한 후, 사이드카로 주입한 Fluentd 컨테이너가 Tailing하며 Aggregator로 전송하는 구조였습니다.
  • 신규 로깅 파이프라인의 경우, 네트워크 상의 제약사항으로 인해 http 프로토콜을 사용해 로그를 Logstash 로 전송해야 했습니다. 표준 로그 모듈에서 http 프로토콜을 통해 로그 데이터를 전송할 수 있도록 HttpHandler 클래스를 제공하고 있었으나 이를 그대로 쓰는 것은 아래와 같은 이슈가 있어 사용하기 어려웠습니다.

(1) 표준 로그 모듈에서 제공하는 HttpHandler 클래스의 경우 Sync 통신을 사용한다.

#!/bin/python3class HttpHandler(logging.Handler):
...
def emit(self, record):
try:
import http.client, urllib.parse
...
h = http.client.HTTPConnection(host)
h.putrequest(self.method, url)
h.endheaders()

...
if self.method == "POST":
h.send(data.encode('utf-8'))
h.getresponse() # HTTP Sync 통신 사용
except Exception:
self.handleError(record)
...

(2) 해당 클래스를 사용할 경우, 로그를 남길 때 마다 LogStash 와 Sync 통신을 하게 되고, 로그를 남기는 횟수에 비례하여 요청 처리 시간에 누적된다.

(3) logging 모듈은 로그를 남길 때, 스레드 락을 획득한 후 로그를 쓴다.

#!/bin/python3class Handler(Filterer):
...
def handle(self, record):
rv = self.filter(record)
if rv:
self.acquire()
try:
self.emit(record) # 로그 데이터를 처리하는 구간
finally:
self.release()
return rv
...

(4) 여러 요청 스레드가 동시에 로그를 남기는 상황에서 (3)의 특성으로 인해 먼저 진입한 스레드가 LogStash와 Sync 통신을 완료할 때까지 다른 스레드들은 wait 상태가 되고, 이 시간이 요청 처리 시간에 누적된다.

위의 문제점을 해결하기 위해 pypi 에서 제공하는 python-logstash-async라는 3rd party 라이브러리를 검토하였습니다. 해당 라이브러리에는 AsynchronousLogstashHandler 클래스를 제공하고 있는데 해당 클래스는 클라이언트 요청을 처리 중인 스레드가 Logstash와 직접 통신하게 하지 않고 로그 데이터를 Python Queue 객체에 넣도록 하고있으며 별도의 Worker 스레드가 해당 Queue를 Non-blocking 하게 가져와서 bulk로 로그 데이터를 처리하게 끔 구현되어있습니다. (자세한 사용 방법은 링크로 대체합니다)

해당 라이브러리를 가지고 실 운영하다보니 그대로 쓰기에 몇가지 제약사항이 있었으며, 29CM에서는 운영환경에 맞게 라이브러리를 확장하여 사용하였습니다.

(1) 해당 라이브러리는 개발자의 로그데이터를 Python Queue 객체에 넣기 전에 직렬화합니다. 개발자가 찍은 로그 데이터가 1MB 넘는 대용량 데이터일 경우 직렬화하는 비용이 클라이언트 요청을 처리중인 스레드에 부과되기 때문에, 이를 Worker 스레드가 담당하도록 동작을 수정하였고, 운영중인 카프카 브로커가 max.message.size=1MB 설정을 사용중이기 때문에, 로그 데이터를 Python Queue 객체에서 꺼낸 뒤 직렬화 하는 과정에서 크기를 압축하여 최대한 1MB 를 안넘도록 메시지 크기를 최적화 하였습니다.

(2) 해당 라이브러리는 Logstash 와 통신시, Exception 이 발생할 경우 Queue에 re-queue 하도록 동작합니다. 그러나 ReadTimeout 의 경우, 서버로부터 응답을 못받았을 뿐 메시지 Consume 은 될 수 있기 때문에, 이를 확장하여 Queue에 re-queue 하지 않도록 기능을 확장함으로써 로그 데이터가 중복으로 Produce 되지 않도록 처리했습니다.

(3) 또한, 로그데이터에 고객 정보와 같은 민감 정보가 그대로 수집되면 안되기 때문에 Masking 을 할 수 있도록 기능을 확장하였습니다.

추후 Kafka topic 튜닝 포인트

로그 시스템의 Kafka 에 튜닝을 고려하는 대부분의 경우는 1) 어떻게 처리량을 높이고 2) 가용성을 늘릴 수 있을지가 대부분일거라 생각합니다.

Kafka 특성상 파티션 1개당 하나의 File 이 생성되고 파티션이 일종의 병렬처리 단위라고 볼 수 있기 때문에, 파티션이 늘어나면 처리량이 많아진다고 볼 수 있습니다. 컨슈머의 개수도 파티션 하나당 하나의 컨슈머 스레드가 할당되며 파티션이 많을수록 컨슈밍도 병렬로 처리된다고 볼 수 있습니다.

하지만 파티션 수가 많아지면 File Descriptor (이하 FD)가 증가하게 됩니다(파티션 한개당 FD가 하나씩 생기니). 그렇다보니 동일 노드에서 생성할 수 있는 총 파티션의 개수는 어느정도 정해져있다고 볼 수 있습니다.

또한 파티션 수가 많아질수록 가용성이 떨어질 수 있는데요. 복제 설정과도 연관이 있습니다.

파티션은 크게 Leader와 Follower로 나눠집니다.
메세지의 쓰기는 Leader 만 가능하고 Leader 파티션을 가진 노드가 장애가 발생한다면 Leader 선출 전까진 해당 파티션에는 데이터를 쓰지 못하는 상태가 됩니다.

파티션 Leader 선출까지 2ms 가 소요된다고 가정해보겠습니다.
1,000개의 파티션이 재선출이 필요한 경우, 총 2초의 시간이 소요되게 됩니다. 총 비가용 시간은 2초 + 장애 발견 시간이 되고, 이렇게 가용성이 떨어지는 결과를 만들게 됩니다.

위 케이스에서 조금 더 최악의 경우는 해당 Broker 가 Controller 일 경우 새로운 브로커로 Controller 가 장애조치가 되기 전까지 Leader 선출이 시작되지 않습니다. 클러스터에 총 10,000개의 파티션이 존재하고 Zookeeper 에서 Metadata 를 초기화하는데 파티션 당 2ms 가 소요된다고 가정한다면 비가용 시간은 20초가 추가되게 됩니다.

즉, Controller broker에 장애가 발생하는 경우 20초 + 2초 의 비가용시간이 발생하게 됩니다.

위 내용들을 숙지하고나서 파티션수의 조정을 진행하는데 1개 파티션의 최대 처리량을 측정한 뒤, 목표 처리량과 최대 비가용 시간을 맞춰 파티션을 조정하면 되겠습니다.

글쓴이

강호길 / 윤정오

함께 성장할 동료를 찾습니다

29CM ((주)무신사)는 3년 연속 거래액 2배의 성장을 이루었습니다.
이제 더 큰 성장을 위해 기존 모놀리틱 서비스 구조를 마이크로서비스 구조로 전환하고, 앵귤러 기반 프론트엔드 코드를 리액트로 전환하는 등의 기술적인 시도를 진행하고 있습니다. 모바일 앱 내부 구조도 모듈러 아키텍처로 개선하는 과정에 있습니다. 함께 성장하고 유저 가치를 만들어낼 동료 개발자분들을 찾습니다.

🚀 29CM 채용 페이지 : https://www.29cmcareers.co.kr/

--

--

29CM
29CM TEAM

Guide to Better Tech, 29CM의 개발블로그입니다