Let’s Dump

Видеозапись доклада на Minsk Python Meetup

Intro

Не так давно я прошел на Coursera известный курс по машинному обучению от Andrew Ng, и курс этот оказался прекрасен еще и тем, что сразу захотелось попробовать применить машинное обучение к каким-то реальным, не синтетическим данным. Тогда я еще не знал про Kaggle и долго в фоновом режиме думал, что бы такого интересного сделать.

Однажды я просматривал свою статистику в World of Tanks и “внезапно” обнаружил, что проценты побед на разных танках довольно сильно отличаются друг от друга, и подумал: а что если попробовать взять эту информацию по всем игрокам и извлечь из нее что-нибудь полезное?

Скачать бесплатно и без СМС

А чтобы извлечь из нее что-нибудь полезное, нужно сначала эту информацию достать. И здесь нам на помощь приходит Wargaming Public API: account/tanks. В API есть метод, который позволяет по ID аккаунта получить весь список танков игрока, количество боев и количество побед на каждом из них.

Это был мой первый опыт взаимодействия с ним, и я даже не сразу заметил, что в одном запросе можно передать до 100 разных account ID, и самый первый вариант скрипта запрашивал информацию в цикле в один поток по одному account ID в запросе. Ожидаемое время скачивания информации по 20 миллионам игроков было что-то около 2–3 месяцев. ☺

Тогда я пошел вчитываться в документацию и, кроме того, что стал запрашивать статистику группами по 100 аккаунтов, стал использовать Keep-Alive, чтобы не тратить время на установку соединения при каждом запросе. Если вы используете requests, то сделать это очень просто: достаточно один раз создать объект сессии requests.Session() и делать запросы через него:

$ python -m timeit -s "import requests" -- "requests.get('http://google.com')"
10 loops, best of 3: 543 msec per loop
$ python -m timeit -s "import requests; s = requests.Session()" -- "s.get('http://google.com')"
10 loops, best of 3: 313 msec per loop

После этого мне удалось скачать статистику всех игроков за чуть более, чем двое суток. Это уже хорошо, но все еще не так хорошо, как хотелось бы: скрипт задействует далеко не всю ширину сетевого канала, а частота запросов еще очень далека от ограничения, установленного в API. Нужно запрашивать статистику в несколько потоков.

Я переписал скрипт так, чтобы запросы выполнялись через ThreadPoolExecutor из пакета concurrent.futures, и обнаружил что скрипт очень плохо реагирует на Ctrl+C, непонятно, в какой момент его главный поток завершается, и не будет ли дамп “обрываться” при таком завершении. И отправил этот скрипт в небытие.

На этом моменте я вспомнил, что в стандартной библиотеке Python версии 3.4 появился новый модуль asyncio (ранее известный как tulip), который должен предоставить инфраструктуру для написания конкурентного однопоточного кода. В частности, event loop, coroutines и поддержку TCP “из коробки”. Но мы воспользуемся не самим модулем asyncio, а построенной на его базе библиотекой aiohttp.

Итак, у меня был однопоточный код, написанный для requests, выполняющий последовательно в цикле по одному запросу, и мне нужно было переделать его под aiohttp.

Для начала, заметим, что нельзя просто так взять и выполнить конкурентный код — он должен выполняться в event loop’е:

import asyncio

import aiohttp


@asyncio.coroutine
def func():
response = yield from aiohttp.request("GET", "http://ya.ru")
text = yield from response.text()
print(text)


asyncio.get_event_loop().run_until_complete(func())

Здесь достаточно понимать, что coroutine это функция, выполнение которой можно приостановить, а затем передать в нее какие-то данные и продолжить выполнение с того места, в котором она была приостановлена. Собственно, event loop этим и занимается: он ожидает наступления какого-либо события и возвращает результат в то место функции, где с помощью yield from приостановлено ее выполнение.

Функции request и text являются coroutines и возвращают генераторы объектов типа asyncio.Future. Соответственно, наша функция также становится coroutine и возвращает генератор. А потом мы говорим event loop’у, что он должен выполнять нашу coroutine, пока она не завершится. То есть, фактически, пока не закончатся элементы в генераторе.

А теперь достаточно вернуться к исходному коду на requests и заменить requests.get(…) на yield from aiohttp.request(“GET”, …), response.json() на yield from response.json():

response = yield from aiohttp.request(
"GET",
"http://api.worldoftanks.ru/wot/account/tanks/",
params=params,
)
if response.status == http.client.OK:
json = yield from response.json()
if json["status"] == "ok":
return json["data"]
logging.warning("API error: %s", json["error"]["message"])
else:
logging.warning("HTTP status: %d", response.status_code)

Keep-Alive на базе aiohttp реализуется через создание коннектора, который умеет повторно использовать уже открытые соединения:

connector = aiohttp.TCPConnector()
# …
response = yield from aiohttp.request('get', 'http://python.org', connector=connector)

Для задержки перед повторной отправкой запроса делаем yield from asyncio.sleep(…), а для установки таймаута ответа сервера оборачиваем request(…) в asyncio.wait_for:

asyncio.get_event_loop().run_until_complete(
asyncio.wait_for(
aiohttp.request("GET", "http://ya.ru"),
0.01,
))

Отлично. Теперь у нас есть однопоточный конкурентный код, который скачивает статистику игроков:

def get(app_id, start_id, end_id, output):
"""Get account statistics dump."""
api = Api(app_id)
start_time, account_count, tank_count = time(), 0, 0
for account_ids in chop(range(start_id, end_id + 1), 100):
# Sort by account ID.
account_tanks = sorted(
(yield from api.account_tanks(account_ids)),
key=itemgetter(0),
)
# Write account stats.
for account_id, tanks in account_tanks:
write_account_stats(account_id, tanks, output)
tank_count += len(tanks)
account_count += len(account_tanks)
# Print statistics.
aps = (account_ids[-1] - start_id) / (time() - start_time)
logging.info(
"#%d (%d) tanks: %d | aps: %.1f | apd: %.0f",
account_ids[-1], account_count, tank_count, aps, aps * 86400.0,
)

Но он по-прежнему ждет завершения и обработки результатов одного запроса, прежде чем отправить следующий, и поэтому никакого выигрыша в общей скорости скачивания статистики нет. Сейчас мы исправим этот недостаток.

Давайте поступим следующим образом. Вместо того, чтобы содержать “штат” потоков, которые по очереди получают задачи, будем хранить множество выполняющихся на данный момент запросов. На каждой итерации будем запускать новый запрос с помощью asyncio.async и, не дожидаясь его завершения, проверять, как много их в данный момент выполняется. Если окажется, что достаточно— будем ждать завершения первого попавшегося запроса с помощью asyncio.wait и обрабатывать его результат. Кроме того, нужно не забыть дождаться завершения самых последних запросов, когда цикл по account ID уже закончился. Получаем примерно такой код:

consumer = AccountTanksConsumer(start_id, output)
pending = set()
start_time = time()
# Main loop.
for account_ids in chop(range(start_id, end_id + 1), 100):
pending.add(asyncio.async(api.account_tanks(account_ids)))
if len(pending) < 8:
continue
done, pending = yield from asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
consumer.consume_all(done)
# Print runtime statistics.
aps = (consumer.expected_id - start_id) / (time() - start_time)
logging.info(
"#%d (%d) buffered: %d | tanks: %d | aps: %.1f | apd: %.0f",
consumer.expected_id, consumer.account_count, len(consumer.buffer), consumer.tank_count, aps, aps * 86400.0,
)

Когда мы вызываем asyncio.async, мы планируем выполнение нашего запроса в текущем event loop’е. Запрос уже начинает выполняться, но не блокирует наш главный поток!

asyncio.wait возвращает два множества: множество выполненных задач, и множество задач, ожидающих выполнения. Мы забираем результаты из первого множества, а второе оставляем на следующую итерацию.

А теперь мне хочется, чтобы данные записывались в дамп упорядоченными по account ID и tank ID. И если со вторым проблем нет (просто отсортируем ответ сервера), то конкурентные запросы с разными account ID могут, в общем случае, завершиться в любом порядке. Именно поэтому я вынес логику записи результатов в отдельный объект consumer. Давайте посмотрим, как он работает.

def consume_all(self, tasks):
for task in tasks:
self.consume(task.result())

def consume(self, result):
"""Consumes request result."""
# Buffer account stats.
for account_id, tanks in result:
self.buffer[account_id] = tanks
# Dump stored results.
while self.expected_id in self.buffer:
# Pop expected result.
tanks = self.buffer.pop(self.expected_id)
# Write account stats.
if tanks:
write_account_stats(self.expected_id, tanks, self.output)
# Update stats.
self.account_count += 1
self.tank_count += len(tanks)
# Expect next account ID.
self.expected_id += 1

Логика работы у него простая: храним текущий expected account ID, а все данные по умолчанию буферизуем — складываем в map. Затем пытаемся вытолкнуть из буфера в дамп максимально возможное количество последовательных account ID, начиная с текущего.

Размер буфера нужно ограничить на тот случай, если какой-нибудь запрос “зависнет”, или скрипт сделает длинную паузу перед повторной отправкой запроса. Тогда буфер станет неограниченно расти, и моя маленькая виртуалка на DigitalOcean истечет памятью. ☺

Поправим код так, что если на определенной итерации окажется, что буфер уже заполнен, то дождемся завершения всех запущенных запросов. В противном случае, как и прежде, будем дожидаться только первого завершенного запроса:

if len(consumer.buffer) < MAX_BUFFER_SIZE:
done, pending = yield from asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
else:
logging.warning("Maximum buffer size is reached.")
done, pending = yield from asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)

Остался последний момент, о котором я хотел бы рассказать. API ограничивает частоту запросов, а я ленивый и не хочу определять и задавать предельно допустимое количество конкурентных запросов вручную.

Давайте подсчитывать количество ошибок REQUEST_LIMIT_EXCEEDED среди каждых 150 запросов, и если оно окажется равным нулю, то увеличивать текущее максимально допустимое количество запросов на один, а если ошибок становится слишком много, то уменьшать. Число 150 выбрано экспериментально. ☺

if api.request_limit_exceeded_count > max_pending_count:
max_pending_count = max(max_pending_count - 1, MIN_PENDING_COUNT)
api.reset_error_rate()
elif api.request_count >= AUTO_ADAPT_REQUEST_COUNT:
if api.request_limit_exceeded_count == 0:
max_pending_count = min(max_pending_count + 1, MAX_PENDING_COUNT)
api.reset_error_rate()

Теперь дамп статистики (до account ID равного 40000000) скачивается за чуть более, чем 11 часов. Это, кстати, примерно на 5 часов быстрее, чем написанный мною ранее многопоточный скрипт с вручную подобранным количеством потоков.

Напоследок — занимательная статистика. Последний ID на текущий момент: 35237559, игроков: 18102100, а всего танков у всех игроков: 567710204, это в среднем 31.4 танка на один аккаунт. Все числа в дампе я сохранял в формате Protocol Buffers (без указания тегов и типов, просто последовательные UVarint’ы), получился дамп размером 2466MiB — это по 4.6B на танк и по 143B на аккаунт.