Оптимизация соединений в Hive
Перевод: Саянкин А.А.
Содержание
- 1 Введение
- 2 Shuffle/Common Join (Общее соединение)
- 3 Map Join / Broardcast Join (Соединение только в фазе отображения)
- 4 Bucket Map Join (Сегментное соединение только в фазе отображения)
- 5 Sort Merge Bucket(SMB) Map Join (Сортирующе-объединяющее сегментное соединение только в фазе отображения)
- 6 Skew Join (Ассиметричные соединения)
- 7 Перечень использованных ссылок
Предисловие переводчика
Данная статья представляет перевод оригинальных публикаций авторов проекта OpenKB, а также материалы презентации Лиин Танг (Liyin Tang) и Намит Джейн (Namit Jain).
Введение
В своих последних версиях Hive использует оптимизатор по стоимости (CBO — cost based optimizer). Как известно, соединение двух таблиц — важный аспект для оптимизации. Понимание практического опыта использования соединений — ключевой фактор в настройке производительности Hive. В данной статье описаны разновидности оптимизации соединений двух таблиц и показаны планы выполнения для этих соединений.
Примечание: все примеры выполнены для Hive 0.13.
Shuffle/Common Join (Общее соединение)
Принцип работы:
Общее соединение выполняется по умолчанию и включает как фазу отображения (map) так и фазу свёртки (reduce):
- Mapper (процесс отображения): читает таблицы и выводит пары значений для соединения в промежуточный файл.
- Shuffle (процесс тасовки): пары сортируются и объединяются.
- Reducer (процесс свёртки): получает сортированный список пар для соединения и непосредственно выполняет соединение.
Условия использования:
- Работает для таблиц произвольных размеров, в особенности, когда другие разновидности оптимизации соединений не могут быть использованы, например, при выполнении полного внешнего соединения.
Недостатки:
- Наиболее ресурсоёмкая операция, поскольку тасовка (Shuffle) занимает значительное время.
Пример:
hive> explain select a.* from passwords a, passwords2 b where a.col0=b.col1;
OK
STAGE DEPENDENCIES:
Stage-5 is a root stage , consists of Stage-1
Stage-1
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-5
Conditional Operator
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: b
Statistics: Num rows: 9961472 Data size: 477102080 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: col1 (type: string)
sort order: +
Map-reduce partition columns: col1 (type: string)
Statistics: Num rows: 9961472 Data size: 477102080 Basic stats: COMPLETE Column stats: NONE
value expressions: col1 (type: string)
TableScan
alias: a
Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: col0 (type: string)
sort order: +
Map-reduce partition columns: col0 (type: string)
Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
value expressions: col0 (type: string), col1 (type: string), col2 (type: string), col3 (type: string), col4 (type: string), col5 (type: string), col6 (type: string)
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {VALUE._col0} {VALUE._col1} {VALUE._col2} {VALUE._col3} {VALUE._col4} {VALUE._col5} {VALUE._col6}
1 {VALUE._col1}
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col10
Statistics: Num rows: 10960295 Data size: 524940416 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (_col0 = _col10) (type: boolean)
Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Time taken: 1.707 seconds, Fetched: 58 row(s)
Советы при использовании:
- БОльшая по размеру таблица должна быть расположена правее в соединении, поскольку она будет использоваться как потоковая таблица (процесс чтения данных из неё буде представлен в виде потока). Тем не менее вы можете использовать подсказку оптимизатора "STREAMTABLE" для назначения определённой таблицы в качестве потоковой.
select /*+ STREAMTABLE(a) */ a.* from passwords a, passwords2 b, passwords3 c
where a.col0=b.col0 and b.col0=c.col0;
Map Join / Broardcast Join (Соединение только в фазе отображения)
Принцип работы:
Если одна или несколько таблиц достаточно малы, чтобы разместиться в памяти, то mapper сканирует большую таблицу и выполняет соединение. Стадии тасовки (Shuffle) и свёртки (reduce) отсутствуют.
Условия использования:
- Маленькая таблица (таблица размерностей) соединяется с большой таблицей (таблица фактов). Соединение происходит очень быстро, поскольку отсутствуют стадии тасовки (Shuffle) и свёртки (reduce).
Недостатки:
- Необходимо, чтобы как минимум одна таблица была достаточно мала.
- Правое и полное внешнее соединение не поддерживаются в этой разновидности оптимизации.
Пример:
В данном примере таблица passwords3 очень мала по сравнению с passwords.
hive> explain select a.* from passwords a,passwords3 b where a.col0=b.col0;
OK
STAGE DEPENDENCIES:
Stage-4 is a root stage
Stage-3 depends on stages: Stage-4
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-4
Map Reduce Local Work
Alias -> Map Local Tables:
b
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
b
TableScan
alias: b
Statistics: Num rows: 1 Data size: 31 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
condition expressions:
0 {col0} {col1} {col2} {col3} {col4} {col5} {col6}
1 {col0}
keys:
0 col0 (type: string)
1 col0 (type: string)
Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
alias: a
Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {col0} {col1} {col2} {col3} {col4} {col5} {col6}
1 {col0}
keys:
0 col0 (type: string)
1 col0 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col9
Statistics: Num rows: 10960295 Data size: 524940416 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (_col0 = _col9) (type: boolean)
Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Local Work:
Map Reduce Local Work
Stage: Stage-0
Fetch Operator
limit: -1
Time taken: 0.1 seconds, Fetched: 63 row(s)
Советы при использовании:
1. Автоматическое преобразование общих соединений (Shuffle/Common Join) в соединения только в фазе отображения (Map Join)
Для этого могут быть использованы три параметра:
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;
Начиная с Hive 0.11, hive.auto.convert.join=true по умолчанию. Вы можете отключить эту опцию, установив hive.auto.convert.join=false. В случае, когда hive.auto.convert.join.noconditionaltask=true, если оценочный размер малых таблиц меньше, чем hive.auto.convert.join.noconditionaltask.size (по умолчанию 10MB), то общее соединение (Shuffle/Common Join) может быть преобразовано в соединение только в фазе отображения (Map Join) автоматически.
Из плана SQL запроса выше мы знаем, что оценочный размер таблицы b равен 31 согласно статистике по таблице. Если выполнить в командой строке Hive:
hive> set hive.auto.convert.join.noconditionaltask.size = 32;
то план запроса покажет, что будет использоваться соединение только в фазе отображения (Map Join): см. ключевое слово Map Join Operator в плане выполнения запроса.
Если выполнить в командой строке Hive:
hive> set hive.auto.convert.join.noconditionaltask.size = 32;
то план запроса покажет, что будет использоваться общеее соединения (Shuffle/Common Join): см. ключевое слово Join Operator в плане выполнения запроса.
2. Подсказка оптимизатору "MAPJOIN" может быть использована для принудительного преобразования общего соединения (Shuffle/Common Join) в соединения только в фазе отображения (Map Join).
Bucket Map Join (Сегментное соединение только в фазе отображения)
Принцип работы:
Соединение выполняется только в фазе отображения. Процесс отображения (mapper), обрабатывающий сегмент (bucket) 1 для таблицы А, будет считывать только сегмент (bucket) 1 таблицы B.
Условия использования:
Используется, когда все таблицы:
- Большие.
- Сегментированы по тем колонкам, по которым происходит соединение.
- Число сегментов одной таблицы кратно числу сегментов другой таблицы.
- Содержат не сортированные данные.
Недостатки:
- Таблицы должны быть сегментированы по тем колонкам, которые будут использоваться в SQL соединении, и поэтому для других соединений данная разновидность оптимизации не сработает.
Советы при использовании:
1. Таблица должна быть создана с использованием сегментов (bucket), причём сегментирование должно быть выполнено по тем колонкам, по которым будет выполнено соединение. Также данные должны быть сегментированы при вставке. Один из способов достичь этого — установить hive.enforce.bucketing=true перед вставкой данных.
Пример:
create table b1(col0 string,col1 string,col2 string,col3 string,col4 string,col5 string,col6 string) clustered by (col0) into 32 buckets;
create table b2(col0 string,col1 string,col2 string,col3 string,col4 string,col5 string,col6 string)
clustered by (col0) into 8 buckets;
set hive.enforce.bucketing = true;
From passwords insert OVERWRITE table b1 select * limit 10000;
From passwords insert OVERWRITE table b2 select * limit 10000;
2. Свойство hive.optimize.bucketmapjoin должно быть установлено в true.
Пример:
set hive.optimize.bucketmapjoin=true;
select /*+ MAPJOIN(b2) */ b1.* from b1,b2 where b1.col0=b2.col0 ;
Sort Merge Bucket(SMB) Map Join (Сортирующе-объединяющее сегментное соединение только в фазе отображения)
Соединение выполняется только в фазе отображения. Соответствующие сегменты (buckets) соединяются друг с другом в процессе отображения (mapper).
Условия использования:
Используется, когда все таблицы:
- Большие.
- Сегментированы по тем колонкам, по которым происходит соединение.
- Содержат сортированные данные в тех колонках, по которым происходит соединение.
- Все таблицы имеют одинаковое число сегментов.
Недостатки:
- Таблицы должны быть сегментированы по тем колонкам, которые будут использоваться в SQL соединении, и поэтому для других соединений данная разновидность оптимизации не сработает.
- Работа секционированных (Partition) таблиц может замедлиться.
Пример:
hive> explain select c1.* from c1,c2 where c1.col0=c2.col0;
OK
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: c1
Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
Sorted Merge Bucket Map Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {col0} {col1} {col2} {col3} {col4} {col5} {col6}
1 {col0}
keys:
0 col0 (type: string)
1 col0 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col9
Filter Operator
predicate: (_col0 = _col9) (type: boolean)
Select Operator
expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Time taken: 0.134 seconds, Fetched: 37 row(s)
Советы при использовании:
1. Таблица должна быть создана с использованием сегментов (bucket), причём сегментирование должно быть выполнено по тем колонкам, по которым будет выполнено соединение. Также данные должны быть сегментированы при вставке. Один из способов достичь этого — установить hive.enforce.bucketing=true перед вставкой данных.
Пример:
create table c1(col0 string,col1 string,col2 string,col3 string,col4 string,col5 string,col6 string) clustered by (col0) sorted by (col0) into 32 buckets;
create table c2(col0 string,col1 string,col2 string,col3 string,col4 string,col5 string,col6 string) clustered by (col0) sorted by (col0) into 32 buckets;
set hive.enforce.bucketing = true;
From passwords insert OVERWRITE table c1 select * order by col0;
From passwords insert OVERWRITE table c2 select * order by col0;
2. Указанные ниже параметры должны быть установлены для преобразования SMB соединений в SMB соединения только в фазе отображения.
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
3. Параметр hive.auto.convert.sortmerge.join.bigtable.selection.policy определяет какая таблица будет использована только в качестве потоковой таблицы. Он имеет три значения:
org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ (default)
org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ
org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ
4. Подсказка оптимизатору "MAPJOIN" определяет какая из таблиц является маленькой и может быть загружена в память.
5. Маленькие таблицы загружаются по требованию, что означает он не хранятся в памяти все время.
6. Внешние соединения не поддерживаются.
Skew Join (Ассиметричные соединения)
Принцип работы:
Предположим, что таблица A соединяется с B, и таблица A имеет ассиметрию в данных по соединяемой колонке: значение «1» встречается намного чаще других. В начале мы читаем таблицу B и сохраняем строки с ключом «1» в хеш таблице, находящейся в памяти. Затем запускаем набор процессов отображения (mapper) для чтения таблицы A и выполняем следующее:
- Если значение в таблице A в соединяемой колонке равно «1», то мы используем хеш таблицу на основе таблицы B, полученную ранее.
- Для всех остальных ключей мы передаём его процессу свёртки (reducer), который и выполняет соединение. Этот же процесс свёртки (reducer) получает строки таблицы B также из процесса отображения (mapper).
Поступая таким образом, мы читаем таблицу B дважды. Ассиметричне данные (те, которые встречаются чаще других) из таблицы A читаются и обрабатываются только в процессе отображения (Mapper) и не передаются процессу свёртки (reducer). Остальные ключи из таблицы A обрабатываются в один раз в фазе отображения (Map) и свёртки (Reduce).
Для использования данного типа оптимизации соединения мы предполагаем, что таблица B содержит относительно не много строк с ключами, часто встречающимися в A. Так что эти строки могут быть загружены в память.
Условия использования:
- Одна из таблиц содержит множество одинаковых значений в колонках соединения.
Недостатки:
- Одна таблица читается дважды.
- Пользователи должны знать об ассиметрии данных.
Пример:
hive> explain select a.* from passwords a, passwords2 b where a.col0=b.col1;
OK
STAGE DEPENDENCIES:
Stage-7 is a root stage , consists of Stage-1
Stage-1
Stage-4 depends on stages: Stage-1 , consists of Stage-8
Stage-8
Stage-3 depends on stages: Stage-8
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-7
Conditional Operator
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: b
Statistics: Num rows: 9961472 Data size: 477102080 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: col1 (type: string)
sort order: +
Map-reduce partition columns: col1 (type: string)
Statistics: Num rows: 9961472 Data size: 477102080 Basic stats: COMPLETE Column stats: NONE
value expressions: col1 (type: string)
TableScan
alias: a
Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: col0 (type: string)
sort order: +
Map-reduce partition columns: col0 (type: string)
Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
value expressions: col0 (type: string), col1 (type: string), col2 (type: string), col3 (type: string), col4 (type: string), col5 (type: string), col6 (type: string)
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {VALUE._col0} {VALUE._col1} {VALUE._col2} {VALUE._col3} {VALUE._col4} {VALUE._col5} {VALUE._col6}
1 {VALUE._col1}
handleSkewJoin: true
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col10
Statistics: Num rows: 10960295 Data size: 524940416 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (_col0 = _col10) (type: boolean)
Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-4
Conditional Operator
Stage: Stage-8
Map Reduce Local Work
Alias -> Map Local Tables:
1
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
1
TableScan
HashTable Sink Operator
condition expressions:
0 {0_VALUE_0} {0_VALUE_1} {0_VALUE_2} {0_VALUE_3} {0_VALUE_4} {0_VALUE_5} {0_VALUE_6}
1 {1_VALUE_0}
keys:
0 joinkey0 (type: string)
1 joinkey0 (type: string)
Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
Map Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {0_VALUE_0} {0_VALUE_1} {0_VALUE_2} {0_VALUE_3} {0_VALUE_4} {0_VALUE_5} {0_VALUE_6}
1 {1_VALUE_0}
keys:
0 joinkey0 (type: string)
1 joinkey0 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col10
Filter Operator
predicate: (_col0 = _col10) (type: boolean)
Select Operator
expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Local Work:
Map Reduce Local Work
Stage: Stage-0
Fetch Operator
limit: -1
Time taken: 0.331 seconds, Fetched: 110 row(s)
Как показано выше, в запросе использовано два оператора соединения: одно общее соединение, другое - соединение только в фазе отображения. Также показано "handleSkewJoin: true".
Советы при использовании:
1. Для использования ассиметричных соединений должен быть установлен следующий параметр.
set hive.optimize.skewjoin=true;
2. Следующий параметр определяет в каком случае мы будем считать данные ассиметричными. Если мы видим, что число повторяющихся значений в колонке, по которой происходит соединение, превосходит значение, указанное в параметре, что мы считаем, что это ассиметричное соединение.
set hive.skewjoin.key=100000;