Флюссоник: чтение UDP MPEG-TS

Vladimir Gordeev
6 min readMay 18, 2015

--

Ещё одна статья в жанре одного из моих предыдущих постов, но на этот раз про чтение, а не раздачу. Решение проблемы по шагам.

Речь идёт о полировке фич нашего видеостримингового сервера Flussonic.

Хо-хо, Макс Лапшин, мой ментор и начальник, подкинул мне ещё одну интересную задачу. Итак, вот она:

Проблема

Флюссоник умеет захватывать MPEG-TS поток по UDP. Так как он написан на Erlang, то логично для этого использовать gen_udp. Опытным путём было установлено, что захват UDP видеопотока заметно нагружает CPU. (больше чем всё остальное)

На самом деле нетрудно понять в чём причина: gen_udp работает как порт-драйвер, и каждую пришедшую датаграмму отправляет как сообщение, а это хоть и дешёвая операция, но на больших объёмах даёт о себе знать.

MPEG-TS пакет занимает 188 байт, в MTU влезает 7 пакетов: 1316 байт, значит в идеале максимальный размер датаграммы 1316 байт. Возьмём 2-х мегабитный поток, поделим на размер датаграммы, получим 2000000/(1316*8) ≈ 190 сообщений в секунду.

Если мы возьмём 10-ти мегабитный поток (совсем не редкость), то всё становится сильно хуже: 10000000/(1316*8) ≈ 950 сообщений в секунду.

Макс написал на Си свой собственный порт-драйвер, который занимался тем же что и gen_udp, только склеивал датаграммы, накапливал буфер, и лишь затем отсылал процессу. Очевидное решение проблемы.

Этот порт даёт примерно троекратную экономию CPU, но в некоторых условиях работает нестабильно. Проблема которую мне необходимо решить — переписать этот порт чтобы он хорошо работал.

Ну а для этого мне нужно полностью погрузиться в проблему — увидеть много всего интересного.

Первые идеи

Первая мысль — неужели создатели Erlang не подумали о таком варианте событий? Неужели нельзя читать UDP как-то иначе?

Я увидел функцию gen_udp:recv/2, которая должна читать датаграммы в блокирующем режиме. Вроде бы то что нужно, без отправки сообщений! Однако если посмотреть исходный код, то можно заметить следующее:

Так, вызывается одноимённая функция из другого модуля. Похоже это разделение на код работающий по IPv6 и IPv4.

Там же можно увидеть два файла inet_udp.erl и inet6_udp.erl. Пусть будет IPv4, смотрим дальше:

Так, снова отфутболили, ищем заветный prim_inet и находим его:

И видим по-сути то же самое от чего пытались убежать.

Хорошо, раз в стандартной библиотеке нельзя просто получить данные сделав блокирующий вызов recv, то почему бы не сделать порт-драйвер, который бы предоставлял возможность таких вызовов, к примеру через erlang:port_control/3.

Однако такой способ не подойдёт — нативные вызовы (через порты или NIF-ы) в идеале должны занимать не более 1–3 миллисекунд. Если больше, то планировщик Erlang совсем потеряется в временном пространстве, сойдёт с ума, и всему приложению станет плохо.

Похоже что отправка сообщений из порта и получение их процессом единственный подходящий способ передать данные.

Тогда действительно, нужно накапливать буфер из датаграмм и отправлять дальше одним большим куском.

Проблемы с существующем порт-драйвером

Наш драйвер открывает сокет с параметром O_NONBLOCK, далее мы используем вызов driver_select.

Мне удалось воспроизвести ситуацию, когда порт-драйвер терял часть трафика, при этом стандартный gen_udp отдавал всё чётко. Кроме того, это происходило строго при наличии нагрузки на сеть. Казалось, что лажал именно driver_select — будто ready_input коллбэк просто не вызывался когда следовало.

По-идее код gen_udp должен работать по той же схеме. Первым делом полез разбираться как там открывается сокет, какие опции применяются.

Поковыряться в исходниках стандартной библиотеки Erlang мы всегда успеем, а сперва можно попробовать посмотреть на вывод strace для двух вариантов работы — с нашим портом и с gen_udp.

Смотрим через strace

strace — это клёвая тулза которая ведёт лог вызова системных вызовов любой пользовательской программы.

Открытие сокета, включение каких-то опций, подписка на события по сокету (select) — всё это системные вызовы. Можно будет детально посмотреть чем отличается код gen_udp от нашего порта, не влезая в исходники стандартной библиотеки.

Мне лень выдирать конкретную команду которая запускает наш флюссоник (вида erl -pa …), я просто натравил strace на сам make и всего дочерние процессы.

Эта команда выполнит make run, и для каждого созданного процесса создаст отдельный файл-лог вида strace/log.PID.

У меня таких файлов сгенерилось много — аж 82. Значит за время выполнения этой команды было создано 82 процесса.

Найти нужные нам процессы довольно просто, нужно просто поискать по ключевому слову IPPROTO_UDP, это параметр передаваемый при создании UDP сокета.

Можно увидеть множество таких вызовов. Похоже что общение между Erlang-нодами тоже происходит по UDP, Erlang рантайм делает вызовы socket() даже если мы не запускаем никакого захвата.

Среди множества этих вызовов нам нужно найти такие, где выполняется bind() именно по тому адресу, который пытаемся захватить. Я делал захват по 239.0.0.1:1234.

Итак, чем отличается открытие сокета у gen_udp и у mpegts_udp (наш порт-драйвер?). Вот:

Можно увидеть, что устанавливается больший размер буфера (стандартный 8 кб, здесь 2 мб). Кроме того, указываются дополнительные опции для задания приоритета этому сокету.

Вообще эти повторяющиеся вызовы с SO_PRIORITY и IP_TOS выглядят странно, но если поковыряться в файле inet_drv.c, то можно найти пояснение от одного из разработчиков Erlang:

Ага, значит любая попытка установить какую-то опцию сокету обрамляется в двумя get-ами и set-ами для SO_PRIORITY и IP_TOS.

Итак, указали все те же опции для нашего сокета что и gen_udp. Картинка стала намного лучше, большей частью из-за явно указанного буфера (SO_RCVBUF).

Повреждение картинки

Но записав одновременно один и тот же кусок видео через gen_udp и через mpegts_udp я вижу рассыпания из-за нашего порт-драйвера.

Поймал рассыпание кадра:

mpegts_udp
gen_udp

Вообще, рассыпание картинки может быть по-двум причинам:

  1. Потеря пакетов. Если пакеты прилетевшие в буфер по сети вовремя не считать, то их может смыть свежими данными и они будут потеряны.
  2. Повреждение данных уже после получения. Возможно у нас в коде драйвера мы где-то допустили ошибку, как-то небрежно копируем какие-то данные, по каким-то причинам повреждаем данные, получаем какой-то мусор.

Если разобрать (через contrib/ts_reader.erl) эти два куска видео, а потом сделать diff, то можно увидеть следующее:

Как можно видеть, потерянных кадров нет, однако те кадры что есть отличаются в размере. Кроме того, один из кусков оказался длинее другого, можно видеть в конце шесть кадров, которые оказались только у него.

Можно заметить, что для одних и тех же кадров размер в первом файле всегда больше чем во втором (указан в скобках). Если посчитать разницу в размере для каждой пары кадров, то получим это:

1599, 1472, 1472, 1472, 1472, 1104

Интересно. Кадр разрезается на несколько MPEG-TS пакетов. Размер пакета — 188 байт. Однако количество полезной информации (payload) которую можно поместить в пакет меньше: 184 байта.

Если мы потеряли несколько пакетов подряд в середине одного кадра, то эти потери будут кратны 184. Если потеряли конец кадра, то не будет кратен. Если мы поделим 1472 на 184, то получим ровно 8 MPEG-TS пакетов. Т.е. на самом деле мы потеряли 188*8 = 1504 байта (несколько раз).

Значит никакого повреждения данных нет, по каким-то причинам мы не успеваем считать данные из сокета до того как их смоет новыми данными.

Неаккуратная работа с памятью

По всей видимости проблема в том как мы работаем с буфером нашего порт-драйвера.

Наш драйвер время от времени делает flush —проверяет сколько данных уже накопилось, и если что-то есть отправляет содержимое эрланг-процессу.

Кроме того, если в процессе считывания с сокета мы накопили уже много данных, и скоро в буфер может не влезть, то тоже нужно делать flush().

Стыд и срам, но ошибка оказалась очень банальной. Мы пытались пользоваться памятью после того как уже её освободили и занулили. В функции flush() есть такие строчки:

Обращаю внимание на последние две. Во flush() вы отправляли данные эрланг-процессу, и освобождали буфер. Если flush() выполняется в середине цикла recvfrom и ещё есть куча данных, а буфер уже освобождён и ему присвоен нулевой адрес, то произойдёт беда.

Мы берём адрес buf, читаем оттуда адрес (его значение оказалось 8), и по нему пишем.

Т.е. по-идее recvfrom() должен был попытаться записать данные по адресу 8 и всё должно было упасть, но почему-то этого не произошло.

Внутри recvfrom() производится проверка на возможность записи, и возвращается ошибка в случае недоступности. А мы по-глупости просто не проверяли ошибку.

На этом всё.

--

--