Параллельные реплики
Введение
ClickHouse обрабатывает запросы крайне быстро, но как эти запросы распределяются и параллелизируются по нескольким серверам?
В этом руководстве мы сначала обсудим, как ClickHouse распределяет запрос по нескольким шардам через распределённые таблицы, а затем как запрос может использовать несколько реплик для выполнения.
Шардированная архитектура
В архитектуре shared-nothing кластеры обычно разделяются на несколько шардов, каждый из которых содержит подмножество общих данных. Распределённая таблица находится поверх этих шардов, предоставляя единое представление всех данных.
Запросы могут быть отправлены в локальную таблицу. Выполнение запроса будет происходить только на указанном шарде или он может быть отправлен в распределённую таблицу, и в этом случае каждый шард выполнит данные запросы. Сервер, на котором выполнялся запрос к распределённой таблице, агрегирует данные и отвечает клиенту:

В приведённой выше схеме показано, что происходит, когда клиент выполняет запрос к распределённой таблице:
Select-запрос отправляется в распределённую таблицу на узле случайным образом (через метод round-robin или после маршрутизации на определённый сервер балансировщиком нагрузки). Этот узел теперь будет действовать как координатор.
Узел определяет каждый шард, который должен выполнить запрос, через информацию, указанную распределённой таблицей, и запрос отправляется на каждый шард.
Каждый шард локально читает, фильтрует и агрегирует данные, а затем отправляет состояние, которое можно объединить, обратно координатору.
Координирующий узел объединяет данные и затем отправляет ответ клиенту.
Когда мы добавляем реплики в смесь, процесс довольно схож, с той лишь разницей, что только одна реплика из каждого шарда выполнит запрос. Это означает, что большее количество запросов может быть обработано параллельно.
Нешардированная архитектура
ClickHouse Cloud имеет совершенно другую архитектуру по сравнению с представленной выше. (См. "Архитектуру ClickHouse Cloud" для получения дополнительной информации). С разделением вычислений и хранения, а также с практически бесконечным объёмом хранения, потребность в шардах становится менее значительной.
На рисунке ниже показана архитектура ClickHouse Cloud:

Эта архитектура позволяет нам добавлять и удалять реплики практически мгновенно, обеспечивая очень высокую масштабируемость кластера. Кластер ClickHouse Keeper (показан справа) обеспечивает единый источник истинных метаданных. Реплики могут получать метаданные из кластера ClickHouse Keeper и все поддерживают одинаковые данные. Сами данные хранятся в объектном хранилище, и кеш SSD позволяет ускорить запросы.
Но как теперь можно распределять выполнение запросов по нескольким серверам? В шардированной архитектуре это было довольно очевидно, учитывая, что каждый шард мог фактически выполнять запрос на подмножестве данных. Как это работает, если шардирование отсутствует?
Введение параллельных реплик
Чтобы параллелизировать выполнение запросов через несколько серверов, сначала нам нужно назначить один из наших серверов координатором. Координатор — это тот, кто создаёт список задач, которые нужно выполнить, обеспечивает их выполнение, агрегацию и возвращение результата клиенту. Как и во многих распределённых системах, это будет роль узла, который получает изначальный запрос. Нам также нужно определить единицу работы. В шардированной архитектуре единица работы — это шард, подмножество данных. С параллельными репликами мы будем использовать небольшую часть таблицы, называемую гранулы, в качестве единицы работы.
Теперь давайте посмотрим, как это работает на практике, с помощью рисунка ниже:

С параллельными репликами:
Запрос от клиента отправляется на один узел после прохождения через балансировщик нагрузки. Этот узел становится координатором для этого запроса.
Узел анализирует индекс каждой части и выбирает правильные части и гранулы для обработки.
Координатор разделяет рабочую нагрузку на набор гранул, которые могут быть назначены различным репликам.
Каждый набор гранул обрабатывается соответствующими репликами, и в случае завершения обработки координатору отправляется состояние, которое можно объединить.
Наконец, координатор объединяет все результаты от реплик и затем возвращает ответ клиенту.
Вышеописанные шаги описывают, как работают параллельные реплики в теории. Однако на практике есть множество факторов, которые могут помешать данной логике работать идеально:
Некоторые реплики могут быть недоступны.
Репликация в ClickHouse является асинхронной, на момент времени у некоторых реплик могут отсутствовать одинаковые части.
"Хвостовая" задержка между репликами требует какого-то решения.
Кэш файловой системы различается от реплики к реплике в зависимости от активности на каждой реплике, что означает, что случайное назначение задач может привести к менее оптимальной производительности с учетом локальности кэша.
Мы исследуем, как эти факторы преодолеваются в следующих разделах.
Объявления
Для решения вопросов (1) и (2) из приведённого выше списка мы ввели концепцию объявления. Давайте посмотрим, как это работает, используя рисунок ниже:

Запрос от клиента отправляется на один узел после прохождения через балансировщик нагрузки. Узел становится координатором для этого запроса.
Координирующий узел отправляет запрос, чтобы получить объявления от всех реплик в кластере. У реплик могут быть немного разные представления текущего набора частей для таблицы. В результате нам необходимо собирать эту информацию, чтобы избежать некорректных решений о планировании.
Координирующий узел затем использует объявления для определения набора гранул, которые могут быть назначены различным репликам. Здесь, к примеру, видно, что ни одна из гранул части 3 не была назначена реплике 2 потому что эта реплика не предоставила эту часть в своём объявлении. Также обратите внимание, что задачи не были назначены реплике 3 потому что реплика не предоставила объявление.
После того, как каждая реплика обработала запрос на своём подмножестве гранул и состояние, которое можно объединить, было отправлено обратно координирующему узлу, координатор объединяет результаты и ответ отправляется клиенту.
Динамическая координация
Для решения проблемы хвостовой задержки мы добавили динамическую координацию. Это означает, что все гранулы не отправляются реплике в одном запросе, но каждая реплика может запросить новую задачу (набор гранул для обработки) у координирующего узла. Координатор предоставит реплике набор гранул, основываясь на полученном объявлении.
Предположим, что мы находимся на этапе процесса, когда все реплики отправили объявление со всеми частями.
Рисунок ниже визуализирует, как работает динамическая координация:

Реплики сообщают координирующему узлу, что они могут обрабатывать задачи, они также могут указать, сколько работы они могут выполнить.
Координатор назначает задачи репликам.

Реплика 1 и 2 завершают свои задачи очень быстро. Они запросят другую задачу у координирующего узла.
Координатор назначает новые задачи репликам 1 и 2.

Все реплики завершили обработку своих задач. Они запрашивают больше задач.
Координатор, используя объявления, проверяет, какие задачи остаются для обработки, но никаких оставшихся задач нет.
Координатор даёт знать репликам, что всё было обработано. Теперь он объединит все состояния, которые можно объединить, и ответит на запрос.
Управление локальностью кеша
Последняя оставшаяся потенциальная проблема — это как мы управляем локальностью кеша. Если запрос выполняется несколько раз, как мы можем обеспечить, чтобы одна и та же задача направлялась на ту же реплику? В предыдущем примере у нас были назначены следующие задачи:
Реплика 1 | Реплика 2 | Реплика 3 | |
---|---|---|---|
Часть 1 | g1, g6, g7 | g2, g4, g5 | g3 |
Часть 2 | g1 | g2, g4, g5 | g3 |
Часть 3 | g1, g6 | g2, g4, g5 | g3 |
Чтобы гарантировать, что те же задачи назначаются тем же репликам и могут использовать преимущества кеша, происходит два действия. Рассчитывается хеш от части + набора гранул (задача). Применяется модуль по количеству реплик для назначения задачи.
На бумаге это звучит хорошо, но на практике, внезапная нагрузка на одну реплику или
ухудшение работы сети, могут привести к "хвостовой" задержке, если одна и та же реплика
постоянно используется для выполнения определённых задач. Если max_parallel_replicas
меньше
чем количество реплик, то для выполнения запроса выбираются случайные реплики.
Кража задач
если какая-то реплика обрабатывает задачи медленнее, чем другие, другие реплики попытаются "украсть" задачи, которые, в принципе, принадлежат этой реплике по хешу, чтобы уменьшить "хвостовую" задержку.
Ограничения
Эта функция имеет известные ограничения, о которых основные из них задокументированы в этом разделе.
Если вы нашли проблему, которая не относится к приведённым ниже ограничениям, и
подозреваете, что причиной может быть параллельная реплика, пожалуйста, откройте тикет на GitHub, используя
метку comp-parallel-replicas
.
Ограничение | Описание |
---|---|
Сложные запросы | В настоящее время параллельная реплика работает довольно хорошо для простых запросов. Сложные слои, такие как CTE, подзапросы, JOIN, неструктурированное выполнение запроса и т.д., могут негативно сказаться на производительности запроса. |
Небольшие запросы | Если вы выполняете запрос, который не обрабатывает много строк, выполнение его на нескольких репликах может не привести к лучшему времени выполнения, учитывая, что сетевое время для координации между репликами может привести к дополнительным временным интервалам выполнения запроса. Вы можете ограничить эти проблемы, используя настройку: parallel_replicas_min_number_of_rows_per_replica . |
Параллельные реплики отключены с FINAL | |
Высокая кардинальность данных и сложная агрегация | Высокая кардинальность агрегации, которая нуждается в отправке большого объема данных, может значительно замедлить ваши запросы. |
Совместимость с новым анализатором | Новый анализатор может значительно замедлить или ускорить выполнение запросов в определённых сценариях. |
Параметры, связанные с параллельными репликами
Настройка | Описание |
---|---|
enable_parallel_replicas | 0 : отключено1 : включено 2 : Форсировать использование параллельной реплики, вызовет исключение, если не используется. |
cluster_for_parallel_replicas | Имя кластера, используемое для параллельной репликации; если вы используете ClickHouse Cloud, используйте default . |
max_parallel_replicas | Максимальное количество реплик, используемых для выполнения запроса на нескольких репликах, если указано количество, меньшее чем количество реплик в кластере, узлы будут выбраны случайным образом. Это значение также может быть превышено для учёта горизонтального масштабирования. |
parallel_replicas_min_number_of_rows_per_replica | Помогает ограничить количество реплик, используемых на основе количества строк, которые нужно обработать. Количество реплик определяется: оценочное количество строк для чтения / min_number_of_rows_per_replica . |
allow_experimental_analyzer | 0 : использовать старый анализатор1 : использовать новый анализатор. Поведение параллельных реплик может измениться в зависимости от используемого анализатора. |
Исследование проблем с параллельными репликами
Вы можете проверить, какие настройки используются для каждого запроса в таблице system.query_log
. Вы также можете взглянуть на таблицу system.events
, чтобы увидеть все события, которые произошли на сервере, и вы можете использовать табличную функцию clusterAllReplicas
, чтобы увидеть таблицы на всех репликах (если вы облачный пользователь, используйте default
).
Ответ
Таблица system.text_log
также содержит информацию о выполнении запросов с использованием параллельных реплик:
Ответ
Наконец, вы также можете использовать EXPLAIN PIPELINE
. Это подчеркнёт, как ClickHouse собирается выполнять запрос и какие ресурсы будут использоваться для выполнения запроса. Давайте рассмотрим следующий пример запроса:
Посмотрим на конвейер запроса без параллельной реплики:

А теперь с параллельной репликой:
