Kafka — для самых маленьких

Kirill Sereda
14 min readMay 10, 2023

--

В интырнете тыщщи миллионов статей на тему Kafka. И даже не смотря на это я все равно хочу поделиться своими знаниями в области этой замечательной штуки.

Сейчас ни одно интервью на позицию backend developer не обходится без знаний брокеров сообщений, и в большинстве случаев это Kafka.

Давайте разберем основные моменты с самого нуля для тех, кто никогда не работал и не был знаком с Kafka, а затем углубимся более детальней.

А уже в следующей статье поговорим про Kafka Streams.

Что такое Kafka?

Для начала максимально кратко, а потом уже остановимся на каждой детали и все будет понятно.

Поехали!

Kafka — это распределенный брокер сообщений, который работает по принципу Издатель-Подписчик.

Kafka нужна для организации потоковой обработки данных.

Состоит из двух типов компонентов:

  • кластер zookeeper (zookeeper встраивается в брокер в последних версиях)
  • кластер брокеров

Забегая наперед:

Очередь в Kafka всегда однонаправленная! Нельзя сделать двунаправленную!

Данные в Kafka представлены в виде пар ключ-значение.

Kafka гарантирует, что все сообщения будут упорядочены именно в той последовательности, в которой поступили.

Kafka хранит прочитанные сообщения определенный период времени (не удаляет их после прочтения — по умолчанию хранит 1 неделю).

Kafka хранит свои записи на диске и ничего не держит в оперативной памяти.

Особенности:

  • Конкретной партицией владеет один брокер-лидер (остальные брокеры, у которых также размещена эта партиция — реплики).
  • По умолчанию создается одна партиция на топик.
  • Сообщения в партициях строго упорядочены, но не упорядочены между партициями одного топика, потому что запись сообщений в партиции происходит параллельно.
  • Сообщения в партиции сохраняются до накопления определенного объема или периода хранения.
  • Если consumer объединены в consumer group, то в каждой consumer group каждая партиция принадлежит только одному consumer (т.е. consumer может читать несколько партиций, но одна партиция не может читаться несколькими consumer). Если количество consumer больше количества партиций, то часть consumer будет простаивать.
  • Если consumers не объединены в consumer group, они читают независимо (т.е. каждый consumer, не объединенный в consumer group, читает из всех партиций).
  • Consumer может подписываться на топики по regular expression, в этом случае при создании соответствующего топика произойдет rebalance и consumer начнет читать также этот топик.
  • Consumer может читать только после того, как сообщение запишется на все не отстающие реплики, для того чтобы гарантировать, что прочитанные сообщения не пропадут из Kafka при сбое (из-за выхода из строя лидера, если сообщения есть только на нем) и смогут быть прочитаны всеми consumers. Чем больше реплике позволено отставать и при этом все еще считаться “не отстающей”, тем больше может быть пауза между записью producer и чтением consumer (потому что consumer сможет читать только после записи сообщения на самую отстающую реплику, которая все еще считается “не отстающей”).
  • Для того, чтобы при выходе consumer из строя было известно, какие сообщения он успел прочитать, consumer делает commit offset — записывает оффсет последнего записанного сообщения. Раньше consumer записывал оффсеты прямо в zookeeper.
  • Consumer может вручную подписаться на определенные партиции топика (например все), не входя в consumer group, но в этом случае он должен периодически уточнять, не появились ли новые партиции, потому что в случае ручной подписки добавление партиций не приведет к ребалансировке.
  • Порядок внутри партиции: если сообщение А записано в партицию после сообщения Б, то они будут прочитаны из этой партиции в том же порядке и сообщение А будет иметь меньший оффсет.
  • Поддерживается атомарная запись в несколько топиков в рамках одной транзакции.

Consumer может читать в режимах:

  • read_committed — в этом режиме consumer прочтет сообщения, записанные в рамках транзакции, только после коммита транзакции.
  • read_uncommitted — в этом режиме consumer прочтет сообщения, записанные в рамках транзакции сразу после их записи, не дожидаясь коммита транзакции.

Принцип работы

  • Издатель посылает сообщение в Kafka

Эти сообщения обрабатываются другими приложениями (Consumer = Потребителями).

  • Сообщения сохраняются в Topic
  • Потребители сами опрашивают Kafka, не появилось ли у него новых сообщений, и указывают, какие записи им нужно прочесть.

Очередь может работать в 2 режимах: push и pull.

Push — когда Topic сам рассылает сообщения всем, кто на него подписан.

Pull — когда Потребители сами опрашивают Topic в надежде получить новое сообщение. Потребители подписываются на тему чтобы получить новые сообщения.

Также можно построить систему, в которой будет 1 главный (ведущий) брокер Kafka и также будут дополнительные (запасные) брокеры. В случае неработоспособности главного брокера, дополнительные будут работать. Все взаимодействие происходит через Zookeeper (аналоги Eureka, Consul).

Как происходит добавление брокера ?

Дополнительные брокеры добавляются в список ISR и контроллер начинает производить балансировку этих брокеров (например на одном 4 partitions, на другом 2 или 3. В этом случае Kafka второму добавит пару штук чтобы разбалансировать нагрузку).

Topic

Теперь более детально и по порядку, чтобы все понять.

Topic — это поток сообщений (неограниченная последовательность key-value пар). Мысленно можете представить что это труба, по которой текут данные.

Ключи и значения — обычные массивы байтов, т.е. <byte[], byte[]>.

  • имеет имя
  • можно создавать сколько угодно топиков
  • данные в топике хранятся 1 неделю (по дефолту)
  • нельзя удалить одно сообщение из топика (в таком случае лучше сделать новый топик, скопировать все сообщения из старого топика кроме того, которое хотим удалить, и затем просто удалить старый топик)

Данные в Topic хранятся в Log-файлах! В файловой системе Kafka есть папка logs -> в ней есть папки для каждой partitions. В каждой папке будет 3 файла: .log, .index, .timeindex.

.log -> тут сами данные в формате: offset (номер сообщения в partition), position (смещение в байтах в файле), timestamp, message.

Лимит log файла = 1 Гб (при достижении лимита старый останавливается и больше не изменяется и создается новый).

.index -> маппинг offset на позицию.

.timeindex -> маппинг timestamp на offset.

Compacted topic

Это значит, что для такого Topic для каждого ключа нам важно знать только самое последнее значение!

т.е. если есть несколько значений с одним и тем же ключом, то Kafka их выкинет и оставит только самые свежие значение для этого ключа.

Если Topic помечен compacted, то время от времени Kafka будет заниматься его компактификацией, т.е. будет удалять те значения, которые уже переписаны более новым.

Никаких гарантий нет — Kafka будет делать это когда ей вздумается (как GC в Java).

Работает в параллельном потоке.

Partition

Topic делится на партиции.

Когда мы кидаем сообщение в Topic, то на самом деле это сообщение попадает в Partition внутри Topic.

Мы представляли что Topic — это труба. Так вот представьте теперь что Partition — это маленькая трубка, которая находится внутри большой трубы (Topic).

  • Каждый Partition сохраняет порядок сообщений (сообщения упорядочены только в рамках одной partition)
  • Каждое сообщение в Partition получает номер или offset (номер сообщения)

Broker

Это сервер. У него есть имя. Каждый Broker хранит несколько Partitions.

Kafka состоит из нескольких Broker.

Например, если у нас в кластере есть 3 broker, есть 1 topic и в нем 3 partitions -> то можно сказать что каждый partition будет храниться на своем broker.

Когда Kafka соединяется с broker, это означает что Kafka соединяется сразу со всем кластером!

Визуально вот так представляйте себе

А если чуть более детальней, то вот так

Заметили что добавилось ? Внутри Broker мы видим Topics.

Ну а если еще детальней и наглядней, то вот так это выглядит

Внутри Broker мы видим Topics, а внутри Topics лежат наши Partitions.

Думаю теперь стало понятней. Идем дальше.

Topic replication

Итак, в итоге наша картинка имеет вид

Что такое replication?

Это копирование данных с одного partition одного broker на другой для надежности.

Если у нас есть несколько одинаковых копий для partition, то среди них есть главная — Leader.

Producer записывает сообщения именно в Leader какого-либо partition, остальные partition только копируют данные из Leader (синхронизируются с ним)!

Leader-ом может быть только одна копия!

ISR (in-sync replicas)

Когда пишем сообщения в leader, то также синхронно пишем и в ISR-follower-ы, для надежности, если вдруг leader упадет.

Когда leader падает, Kafka переизбирает leader (при этом ВСЕГДА будет потеря данных!)

Producer

Producer пишет сообщения в Topic в partition в порядке отправки.

Producer сам определяет в какие Partitions он будет писать.

НЕ Kafka определяет, а он сам!

Он реализован как клиент, который работает отдельно от Kafka.

Он автоматически понимает в какой partition и broker (leader) писать.

При коннекте к broker он узнает состояние всего Kafka кластера => например если один broker упал, то producer подключится к другому и т.д.

При отправке сообщения он идет в Broker, а Broker идет в ZooKeeper чтобы узнать, какие реплики являются leader чтобы в них писать (ведь Producer пишет ТОЛЬКО в leader!)

Если вдруг broker, в который producer пишет сообщение, упал, то Kafka автоматически исправит ситуацию: нагрузка балансируется по разным broker и partition.

Например если надо записать миллионы сообщений в секунду в один Topic, то это легко: данные будут писаться в разные partition, которые физически располагаются на разных брокерах.

Отправка:

Когда Producer шлет сообщение в Topic происходит следующее:

  1. fetch metadata (происходит синхронно!)

Из чего состоит кластер, какие реплики leader и где они находятся и т.д.

2. сериализация сообщения в нужный формат

Параметры: key.serializer и value.serializer

3. выбор partition в который будем слать сообщение

  • можно указать в какую конкретную partition
  • или указать чтобы Kafka выбрала сама (параметр round robin)
  • partition определяется по ключу (если мы отправляем сообщение с одним и тем же ключом, то оно всегда будет попадать в одну и ту же partition)

4. компрессия сообщения с помощью кодеков

5. accumulate batch для повышения производительности

batch.size — по дефолту 16 кб. Если превысили размер 16 кб то сообщения отправляются батчем в брокер

linger.ms — если мы копим батч продолжительное время, которое превышает заданный параметр, то сообщения отправляются батчем в брокер

Если же batch.size и linger.ms не превышены, то мы все равно можем отправить сообщения в брокер.

Например у нас есть Broker и у него 2 partitions. Мы накапливаем 2 батча данных для отправки и если же эти 2 батча суммарно превышают batch.size, то сообщения отправляются этими батчами в брокер, даже если эти батчи еще не заполнены до конца.

6. отправка сообщений батчем в брокер

Режимы подтверждения записи сообщений:

В конфиг файле есть параметр acks

  • acks=0 не ждем подтверждения
  • acks=1 ждем от лидера
  • acks=all ждем от всех (нет потерь 100%)

Message key

Key — часть сообщения (может быть = null)

  • если НЕ null -> то все сообщения с этим key будут писаться в один и тот же partition
  • если null -> то выбирается partition по round robin

Consumer

Читает сообщения по порядку из topic из partition.

Kafka сама определяет из каких Partitions будет читать Consumer! (НЕ Consumer, а Kafka)! НО в случае с Producer, как вы помните, сам Producer определяет, куда писать, не Kafka!

т.е. Producer сам решает, куда писать и Kafka сама решает, откуда Consumer будет читать!

Также как и Producer, Consumer узнает о всех Broker в кластере, когда подключается к Broker.

Consumer знает из какого Broker (leader) читать сообщения.

При падении Broker, Consumer переключится на другой.

Сообщения из partition читаются по правилу FIFO (First in First out — первый пришел, первый обработан).

Данные из разных partition в рамках одного Consumer могут перемешиваться любым способом.

Consumer должен коммитить свой offset (он получает сообщение и подтверждает, что оно прочитано).

Если Consumer упал после того, как прочитал сообщение, но перед тем, как отправил подтверждение, то Kafka отправит это сообщение еще раз либо этому же Consumer либо другому Consumer в этой же группе, т.к. Kafka следит за offset.

Если Consumer упал, то когда поднимется, он может продолжить читать с того сообщения, на котором упал, зная offset!

Прием сообщений:

Читаем не по одному сообщению, а сразу пачку сообщений!

  1. fetch metadata
  2. устанавливается коннект на необходимый Broker-Topic-partition для чтения данных (подключение к leader репликам!)

Гарантия доставки:

  • at most once: коммит сразу после получения (но если Consumer упадет после получения, но перед обработкой сообщения, то сообщение будет не обработано и потеряно)
  • at least once: коммит сразу после обработки:

Если Consumer упадет во время обработки, то Kafka отправит это же сообщение еще раз или другому Consumer из группы.

Если Consumer упадет сразу после обработки, но до отправки уведомления, то Kafka отправит это же сообщение уже другому Consumer из группы.

Может быть проблема, когда сообщение может быть отправлено несколько раз.

Нужен идемпотентный Consumer, чтобы он мог дважды присланный один и тот же запрос запрос обработать 1 раз!

  • exactly once: коммит сразу после обработки (доставка гарантируется) + получатель получит сообщение ровно 1 раз

Внутри системы exactly once гарантировать можно (что мы не обработаем одни и те же данные 2 раза)!

Но на входе в топик и на выходе из топика это гарантировать невозможно!

т.е. мы даем гарантию ТОЛЬКО ВНУТРИ самой Kafka, но не за ее пределами!

Consumer Group

Это несколько Consumer, которые объединены в группу.

Каждый Consumer из своей группы читает из собственного поднабора partition этого Topic.

Если много Producer-ов будут слать сообщения в Kafka, то есть вероятность, что 1 Consumer может не справиться с такой нагрузкой.

Если для Consumer в группе не хватило partition, то он будет неактивный.

Kafka сохраняет offset для каждой группы для каждой partition — это указатель на то, какое сообщение читать дальше.

  • Если несколько Consumer в одной группе, то каждому из них будет назначен свой partition!
  • Если несколько Consumer в разных группах, то они будут читать одновременно и Kafka для каждого из них будет сохранять свой offset!

Offset

Например в каком-то определенном Topic в partition лежит 5 сообщений.

Допустим Consumer прочитал из Topic из partition 2 сообщения и упал, то другой Consumer из этой же группы начнет читать эти сообщения из этого же Topic из этой же partition.

Но это лишняя работа, т.к. мы уже прочитали первые 2 сообщения, зачем их еще раз читать.

Тот Consumer, который прочитал 2 сообщений и упал, после чтения второго сообщения сделал commit и указал offset = 2.

В Kafka есть специальный Topic с названием “__consumer_offsets”, который хранит offset для каждой группы для каждой partition.

В этом топике хранится сообщение в виде Field-Value

И теперь другой Consumer из этой же группы обратится к Topic “__consumer_offsets” за информацией и будет читать с partition с 3-го сообщения.

виды commit:

  • auto commit:

at most once: коммит сразу после получения (но если Consumer упадет после получения, но перед обработкой сообщения, то сообщение будет не обработано и потеряно)

  • manual commit:

at least once: коммит сразу после обработки:

Если Consumer упадет во время обработки, то Kafka отправит это же сообщение еще раз или другому Consumer из группы

Если Consumer упадет сразу после обработки, но до отправки уведомления, то Kafka отправит это же сообщение уже другому Consumer из группы

Может быть проблема, когда сообщение может быть отправлено несколько раз.

Нужен идемпотентный Consumer, чтобы он мог дважды присланный один и тот же запрос запрос обработать 1 раз!

Retention

Kafka пишет логи на диск.

Retention помогает указать какое кол-во логов мы сохраняем, а какое удаляем.

Например записи недельной давности уже удаляем или все что больше 100 Гб не храним.

Когда Kafka пишет данные на диск, она пишет их в файловый сегмент (по дефолту 4 Гб). Когда этот сегмент переполняется, он начинает писать во второй сегмент (первый сегмент закрывается на запись и его всегда можно прочитать). Когда второй переполняется, то аналогично начинает писать в третий и т.д.

Пишет в конец!

И потом Kafka понимает, что например 1 сегмент уже не актуален по лимиту места или времени и удаляет его.

Kafka Broker Discovery

Каждый Broker — это bootstrap сервер.

Каждый Broker знает обо всех остальных в кластере, их Topic и partitions.

Bootstraping — это когда клиент подключается к одному из них и узнает о всех остальных.

ZooKeeper

Это часть кластера Kafka.

Это распределенный кластер, который обеспечивает хранение конфигураций и уведомлений о том, что эта конфигурация изменилась.

Обычно должен состоять из нечетного кол-ва узлов (3,5,7 узлов).

Содержит актуальный список всех Broker, topics, partitions.

Kafka vs RabbitMQ

Теперь давайте сравним Kafka и наиболее популярный аналог RabbitMQ.

  • Kafka более масштабируема, нежели RabbitMQ
  • Другой подход к работе

RabbitMQ:

1) Publisher отправляет сообщение на exchange

2) exchange отправляет сообщение в Очередь

3) RabbitMQ отправляет подтверждения паблишерам при получении сообщения

4) Сonsumers поддерживают постоянные TCP-соединения с RabbitMQ

5) RabbitMQ проталкивает (push) сообщения получателям

6) Получатели отправляют подтверждения успеха/ошибки

7) После успешного получения, сообщения удаляются из очередей

Kafka:

  1. Producer публикует сообщение в Broker
  2. Сообщение сохраняется в Topic -> Partition

3) Consumer подписывается на Topic для получения сообщений

4) Consumer запрашивает у Kafka новые сообщения и указывает, какие записи ему нужно прочитать.

- В Kafka легко добавить еще один Broker в систему (т.к. Kafka брокер — это кластер), в rabbitMQ это сделать сложнее.

- В Kafka Topics с сообщениями можно разбить на разделы (partition) и распределить внутри кластера (внутри брокера) и сделать реплику.

Получается что несколько брокеров обслуживают 1 Topic. Если 1 брокер умрет, то данные не будут потеряны.

Другими словами Kafka умеет дробить свои очереди на части и распределять по кластеру.

  • Kafka работает в памяти (по умолчанию), RabbitMQ с диском
  • Kafka горизонтально масштабируема, RabbitMQ только вертикально
  • В Kafka есть возможность реализовывать что-то вроде шардирования за счет consumer groups, которые гарантируют что одна partition (считай шард) будет читаться только одним Consumer-ом

Масштабирование

Как масштабировать?

1) Больше Brokers

2) Больше partitions

3) Больше replication factor (т.е. сколько реплик в каждой partition) — для надежности

Норма — это 3.

replication factor — позволяет указать сколько падений реплик partition мы можем пережить.

Если factor = N, то сможем пережить N-1 падений брокеров одновременно.

Плюсы

  • позволяет хранить большой объем данных (большая пропускная способность)
  • высокодоступность

Выход из строя одного узла не нарушает доступ к данным

  • отказоустойчивость
  • распределенность

Это множество сервером, объединенных вместе

  • надежность и согласованность данных

Поддерживает C и A из CAP теоремы:

  • согласованность (consistency) — во всех вычислительных узлах в один момент времени данные не противоречат друг другу
  • доступность (availability) — любой запрос к распределённой системе завершается корректным откликом, однако без гарантии, что ответы всех узлов системы совпадают
  • высокая производительность
  • горизонтальное масштабирование
  • интегрируемость с различными системами и БД

Выводы

  1. Пока количество partition остается постоянным, то один и тот же ключ пишется в один и тот же replication factor.
  2. Удалить данные из Topic нельзя!

В таком случае лучше сделать новый Topic, скопировать все сообщения из старого Topic кроме того, которое хотим удалить, и затем просто удалить старый Topic.

3. Поддерживается автоматическое удаление данных по TTL (time-to-live):

  • удаляются целиком сегменты партиций (не отдельные сообщения)

4. Вы можете начать обработку ваших данных с использованием Kafka, затем продолжить работу с Kafka Streams, а затем опять вернуться к Kafka.

Независимо от того, используете вы Kafka Streams или просто Kafka, благодаря Kafka вы получите гибкую масштабируемую и отказоустойчивую распределенную потоковую обработку данных, которая работает везде (в docker контейнераз, на виртуальных машинах, локально, удаленно, в облаках и т.д.).

Если вы нашли неточности в описании данной статьи, вы можете написать мне на email и я с радостью вам отвечу.

Kirill Sereda

email: kirill.serada@gmail.com

skype: kirill-sereda

linkedin: www.linkedin.com/in/ksereda

--

--