Оптимизация соединений в Hive

Материал из JavaCogito
Перейти к навигации Перейти к поиску

Перевод: Саянкин А.А.





Предисловие переводчика


Данная статья представляет перевод оригинальных публикаций авторов проекта OpenKB, а также материалы презентации Лиин Танг (Liyin Tang) и Намит Джейн (Namit Jain).





Введение

В своих последних версиях Hive использует оптимизатор по стоимости (CBO — cost based optimizer). Как известно, соединение двух таблиц — важный аспект для оптимизации. Понимание практического опыта использования соединений — ключевой фактор в настройке производительности Hive. В данной статье описаны разновидности оптимизации соединений двух таблиц и показаны планы выполнения для этих соединений.

Примечание: все примеры выполнены для Hive 0.13.





Shuffle/Common Join (Общее соединение)

Принцип работы:

Общее соединение выполняется по умолчанию и включает как фазу отображения (map) так и фазу свёртки (reduce):

  • Mapper (процесс отображения): читает таблицы и выводит пары значений для соединения в промежуточный файл.
  • Shuffle (процесс тасовки): пары сортируются и объединяются.
  • Reducer (процесс свёртки): получает сортированный список пар для соединения и непосредственно выполняет соединение.


Shuffle/Common Join (Общее соединение)

Условия использования:

  • Работает для таблиц произвольных размеров, в особенности, когда другие разновидности оптимизации соединений не могут быть использованы, например, при выполнении полного внешнего соединения.

Недостатки:

  • Наиболее ресурсоёмкая операция, поскольку тасовка (Shuffle) занимает значительное время.

Пример:

hive> explain select a.* from passwords a, passwords2 b where a.col0=b.col1;
OK
STAGE DEPENDENCIES:
  Stage-5 is a root stage , consists of Stage-1
  Stage-1
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-5
    Conditional Operator

  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 9961472 Data size: 477102080 Basic stats: COMPLETE Column stats: NONE
            Reduce Output Operator
              key expressions: col1 (type: string)
              sort order: +
              Map-reduce partition columns: col1 (type: string)
              Statistics: Num rows: 9961472 Data size: 477102080 Basic stats: COMPLETE Column stats: NONE
              value expressions: col1 (type: string)
          TableScan
            alias: a
            Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
            Reduce Output Operator
              key expressions: col0 (type: string)
              sort order: +
              Map-reduce partition columns: col0 (type: string)
              Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
              value expressions: col0 (type: string), col1 (type: string), col2 (type: string), col3 (type: string), col4 (type: string), col5 (type: string), col6 (type: string)
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          condition expressions:
            0 {VALUE._col0} {VALUE._col1} {VALUE._col2} {VALUE._col3} {VALUE._col4} {VALUE._col5} {VALUE._col6}
            1 {VALUE._col1}
          outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col10
          Statistics: Num rows: 10960295 Data size: 524940416 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
            predicate: (_col0 = _col10) (type: boolean)
            Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string)
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
              Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
              File Output Operator
                compressed: false
                Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1

Time taken: 1.707 seconds, Fetched: 58 row(s)

Советы при использовании:

  • БОльшая по размеру таблица должна быть расположена правее в соединении, поскольку она будет использоваться как потоковая таблица (процесс чтения данных из неё буде представлен в виде потока). Тем не менее вы можете использовать подсказку оптимизатора "STREAMTABLE" для назначения определённой таблицы в качестве потоковой.


select /*+ STREAMTABLE(a) */ a.* from passwords a, passwords2 b, passwords3 c
where a.col0=b.col0 and b.col0=c.col0;





Map Join / Broardcast Join (Соединение только в фазе отображения)

Принцип работы:

Если одна или несколько таблиц достаточно малы, чтобы разместиться в памяти, то mapper сканирует большую таблицу и выполняет соединение. Стадии тасовки (Shuffle) и свёртки (reduce) отсутствуют.


Map Join / Broardcast Join (Соединение только в фазе отображения)

Условия использования:

  • Маленькая таблица (таблица размерностей) соединяется с большой таблицей (таблица фактов). Соединение происходит очень быстро, поскольку отсутствуют стадии тасовки (Shuffle) и свёртки (reduce).

Недостатки:

  • Необходимо, чтобы как минимум одна таблица была достаточно мала.
  • Правое и полное внешнее соединение не поддерживаются в этой разновидности оптимизации.


Пример:

В данном примере таблица passwords3 очень мала по сравнению с passwords.

hive> explain select a.* from passwords a,passwords3 b where a.col0=b.col0;
OK
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        b 
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        b 
          TableScan
            alias: b
            Statistics: Num rows: 1 Data size: 31 Basic stats: COMPLETE Column stats: NONE
            HashTable Sink Operator
              condition expressions:
                0 {col0} {col1} {col2} {col3} {col4} {col5} {col6}
                1 {col0}
              keys:
                0 col0 (type: string)
                1 col0 (type: string)

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: a
            Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
            Map Join Operator
              condition map:
                   Inner Join 0 to 1
              condition expressions:
                0 {col0} {col1} {col2} {col3} {col4} {col5} {col6}
                1 {col0}
              keys:
                0 col0 (type: string)
                1 col0 (type: string)
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col9
              Statistics: Num rows: 10960295 Data size: 524940416 Basic stats: COMPLETE Column stats: NONE
              Filter Operator
                predicate: (_col0 = _col9) (type: boolean)
                Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string)
                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
                  Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
                  File Output Operator
                    compressed: false
                    Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
                    table:
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1

Time taken: 0.1 seconds, Fetched: 63 row(s)

Советы при использовании:

1. Автоматическое преобразование общих соединений (Shuffle/Common Join) в соединения только в фазе отображения (Map Join)

Для этого могут быть использованы три параметра:

set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;

Начиная с Hive 0.11, hive.auto.convert.join=true по умолчанию. Вы можете отключить эту опцию, установив hive.auto.convert.join=false. В случае, когда hive.auto.convert.join.noconditionaltask=true, если оценочный размер малых таблиц меньше, чем hive.auto.convert.join.noconditionaltask.size (по умолчанию 10MB), то общее соединение (Shuffle/Common Join) может быть преобразовано в соединение только в фазе отображения (Map Join) автоматически.

Из плана SQL запроса выше мы знаем, что оценочный размер таблицы b равен 31 согласно статистике по таблице. Если выполнить в командой строке Hive:

hive> set hive.auto.convert.join.noconditionaltask.size = 32;

то план запроса покажет, что будет использоваться соединение только в фазе отображения (Map Join): см. ключевое слово Map Join Operator в плане выполнения запроса.

Если выполнить в командой строке Hive:

hive> set hive.auto.convert.join.noconditionaltask.size = 32;

то план запроса покажет, что будет использоваться общеее соединения (Shuffle/Common Join): см. ключевое слово Join Operator в плане выполнения запроса.

2. Подсказка оптимизатору "MAPJOIN" может быть использована для принудительного преобразования общего соединения (Shuffle/Common Join) в соединения только в фазе отображения (Map Join).





Bucket Map Join (Сегментное соединение только в фазе отображения)

Принцип работы:

Соединение выполняется только в фазе отображения. Процесс отображения (mapper), обрабатывающий сегмент (bucket) 1 для таблицы А, будет считывать только сегмент (bucket) 1 таблицы B.


Bucket Map Join (Сегментное соединение только в фазе отображения)

Условия использования:

Используется, когда все таблицы:

  • Большие.
  • Сегментированы по тем колонкам, по которым происходит соединение.
  • Число сегментов одной таблицы кратно числу сегментов другой таблицы.
  • Содержат не сортированные данные.

Недостатки:

  • Таблицы должны быть сегментированы по тем колонкам, которые будут использоваться в SQL соединении, и поэтому для других соединений данная разновидность оптимизации не сработает.

Советы при использовании:

1. Таблица должна быть создана с использованием сегментов (bucket), причём сегментирование должно быть выполнено по тем колонкам, по которым будет выполнено соединение. Также данные должны быть сегментированы при вставке. Один из способов достичь этого — установить hive.enforce.bucketing=true перед вставкой данных.

Пример:

create table b1(col0 string,col1 string,col2 string,col3 string,col4 string,col5 string,col6 string) clustered by (col0) into 32 buckets; 
create table b2(col0 string,col1 string,col2 string,col3 string,col4 string,col5 string,col6 string)
clustered by (col0) into 8 buckets;
set hive.enforce.bucketing = true;
From passwords insert OVERWRITE table b1 select * limit 10000;
From passwords insert OVERWRITE table b2 select * limit 10000;

2. Свойство hive.optimize.bucketmapjoin должно быть установлено в true.

Пример:

set hive.optimize.bucketmapjoin=true;
select /*+ MAPJOIN(b2) */ b1.* from b1,b2 where b1.col0=b2.col0 ;





Sort Merge Bucket(SMB) Map Join (Сортирующе-объединяющее сегментное соединение только в фазе отображения)

Соединение выполняется только в фазе отображения. Соответствующие сегменты (buckets) соединяются друг с другом в процессе отображения (mapper).


Sort Merge Bucket(SMB) Map Join

Условия использования:

Используется, когда все таблицы:

  • Большие.
  • Сегментированы по тем колонкам, по которым происходит соединение.
  • Содержат сортированные данные в тех колонках, по которым происходит соединение.
  • Все таблицы имеют одинаковое число сегментов.

Недостатки:

  • Таблицы должны быть сегментированы по тем колонкам, которые будут использоваться в SQL соединении, и поэтому для других соединений данная разновидность оптимизации не сработает.
  • Работа секционированных (Partition) таблиц может замедлиться.

Пример:

hive> explain select  c1.* from c1,c2 where c1.col0=c2.col0;       
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: c1
            Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
            Sorted Merge Bucket Map Join Operator
              condition map:
                   Inner Join 0 to 1
              condition expressions:
                0 {col0} {col1} {col2} {col3} {col4} {col5} {col6}
                1 {col0}
              keys:
                0 col0 (type: string)
                1 col0 (type: string)
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col9
              Filter Operator
                predicate: (_col0 = _col9) (type: boolean)
                Select Operator
                  expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string)
                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
                  File Output Operator
                    compressed: false
                    table:
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1

Time taken: 0.134 seconds, Fetched: 37 row(s)

Советы при использовании:

1. Таблица должна быть создана с использованием сегментов (bucket), причём сегментирование должно быть выполнено по тем колонкам, по которым будет выполнено соединение. Также данные должны быть сегментированы при вставке. Один из способов достичь этого — установить hive.enforce.bucketing=true перед вставкой данных.

Пример:

create table c1(col0 string,col1 string,col2 string,col3 string,col4 string,col5 string,col6 string) clustered by (col0) sorted by (col0) into 32 buckets;
create table c2(col0 string,col1 string,col2 string,col3 string,col4 string,col5 string,col6 string) clustered by (col0) sorted by (col0) into 32 buckets;
set hive.enforce.bucketing = true;
From passwords insert OVERWRITE table c1 select * order by col0;
From passwords insert OVERWRITE table c2 select * order by col0;

2. Указанные ниже параметры должны быть установлены для преобразования SMB соединений в SMB соединения только в фазе отображения.

set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;

3. Параметр hive.auto.convert.sortmerge.join.bigtable.selection.policy определяет какая таблица будет использована только в качестве потоковой таблицы. Он имеет три значения:

org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ (default)
org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ
org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ

4. Подсказка оптимизатору "MAPJOIN" определяет какая из таблиц является маленькой и может быть загружена в память.

5. Маленькие таблицы загружаются по требованию, что означает он не хранятся в памяти все время.

6. Внешние соединения не поддерживаются.





Skew Join (Ассиметричные соединения)

Принцип работы:

Предположим, что таблица A соединяется с B, и таблица A имеет ассиметрию в данных по соединяемой колонке: значение «1» встречается намного чаще других. В начале мы читаем таблицу B и сохраняем строки с ключом «1» в хеш таблице, находящейся в памяти. Затем запускаем набор процессов отображения (mapper) для чтения таблицы A и выполняем следующее:

  • Если значение в таблице A в соединяемой колонке равно «1», то мы используем хеш таблицу на основе таблицы B, полученную ранее.
  • Для всех остальных ключей мы передаём его процессу свёртки (reducer), который и выполняет соединение. Этот же процесс свёртки (reducer) получает строки таблицы B также из процесса отображения (mapper).

Поступая таким образом, мы читаем таблицу B дважды. Ассиметричне данные (те, которые встречаются чаще других) из таблицы A читаются и обрабатываются только в процессе отображения (Mapper) и не передаются процессу свёртки (reducer). Остальные ключи из таблицы A обрабатываются в один раз в фазе отображения (Map) и свёртки (Reduce).

Для использования данного типа оптимизации соединения мы предполагаем, что таблица B содержит относительно не много строк с ключами, часто встречающимися в A. Так что эти строки могут быть загружены в память.


Skew Join (Ассиметричные соединения)

Условия использования:

  • Одна из таблиц содержит множество одинаковых значений в колонках соединения.

Недостатки:

  • Одна таблица читается дважды.
  • Пользователи должны знать об ассиметрии данных.


Пример:

hive> explain select a.* from passwords a, passwords2 b where a.col0=b.col1;  
OK
STAGE DEPENDENCIES:
  Stage-7 is a root stage , consists of Stage-1
  Stage-1
  Stage-4 depends on stages: Stage-1 , consists of Stage-8
  Stage-8
  Stage-3 depends on stages: Stage-8
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-7
    Conditional Operator

  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 9961472 Data size: 477102080 Basic stats: COMPLETE Column stats: NONE
            Reduce Output Operator
              key expressions: col1 (type: string)
              sort order: +
              Map-reduce partition columns: col1 (type: string)
              Statistics: Num rows: 9961472 Data size: 477102080 Basic stats: COMPLETE Column stats: NONE
              value expressions: col1 (type: string)
          TableScan
            alias: a
            Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
            Reduce Output Operator
              key expressions: col0 (type: string)
              sort order: +
              Map-reduce partition columns: col0 (type: string)
              Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
              value expressions: col0 (type: string), col1 (type: string), col2 (type: string), col3 (type: string), col4 (type: string), col5 (type: string), col6 (type: string)
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          condition expressions:
            0 {VALUE._col0} {VALUE._col1} {VALUE._col2} {VALUE._col3} {VALUE._col4} {VALUE._col5} {VALUE._col6}
            1 {VALUE._col1}
          handleSkewJoin: true
          outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col10
          Statistics: Num rows: 10960295 Data size: 524940416 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
            predicate: (_col0 = _col10) (type: boolean)
            Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string)
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
              Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
              File Output Operator
                compressed: false
                Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-4
    Conditional Operator

  Stage: Stage-8
    Map Reduce Local Work
      Alias -> Map Local Tables:
        1 
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        1 
          TableScan
            HashTable Sink Operator
              condition expressions:
                0 {0_VALUE_0} {0_VALUE_1} {0_VALUE_2} {0_VALUE_3} {0_VALUE_4} {0_VALUE_5} {0_VALUE_6}
                1 {1_VALUE_0}
              keys:
                0 joinkey0 (type: string)
                1 joinkey0 (type: string)

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            Map Join Operator
              condition map:
                   Inner Join 0 to 1
              condition expressions:
                0 {0_VALUE_0} {0_VALUE_1} {0_VALUE_2} {0_VALUE_3} {0_VALUE_4} {0_VALUE_5} {0_VALUE_6}
                1 {1_VALUE_0}
              keys:
                0 joinkey0 (type: string)
                1 joinkey0 (type: string)
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col10
              Filter Operator
                predicate: (_col0 = _col10) (type: boolean)
                Select Operator
                  expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string)
                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
                  File Output Operator
                    compressed: false
                    table:
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1

Time taken: 0.331 seconds, Fetched: 110 row(s)

Как показано выше, в запросе использовано два оператора соединения: одно общее соединение, другое - соединение только в фазе отображения. Также показано "handleSkewJoin: true".

Советы при использовании:

1. Для использования ассиметричных соединений должен быть установлен следующий параметр.

set hive.optimize.skewjoin=true;

2. Следующий параметр определяет в каком случае мы будем считать данные ассиметричными. Если мы видим, что число повторяющихся значений в колонке, по которой происходит соединение, превосходит значение, указанное в параметре, что мы считаем, что это ассиметричное соединение.

set hive.skewjoin.key=100000;





Перечень использованных ссылок

  1. http://www.openkb.info/2014/11/understanding-hive-joins-in-explain.html