Introdução ao Spark — Parte 2

Tiago Diniz
How Kovi Work
Published in
7 min readDec 22, 2020

No post anterior falei um pouco sobre alguns pontos que acho crucial para utilizar o Spark, abaixo vamos continuar com mais alguns pontos.

Photo by Tobias Fischer on Unsplash

Lidar com dados embaralhados e uso de distribuição

O embaralhamento de dados é o problema comum que pode afetar os trabalhos do aplicativo Spark. A causa do problema de distorção de dados é a distribuição desigual dos dados subjacentes, o particionamento desigual às vezes é inevitável no layout geral dos dados ou na natureza da consulta.

Então, como você descobrirá se os dados estão distorcidos ou não? Não é tão difícil de encontrar. No Spark UI, você observará que às vezes algumas tarefas demoram mais em um executor específico em comparação com outras, isso nos diz que a partição neste executor é enorme e as tarefas estão sendo executadas muito lentamente por causa disso. Este é um problema de distorção de dados.

Existem várias técnicas usadas para lidar com dados distorcidos. Você também pode reparticionar os dados, mas essa operação é cara. Portanto, você precisa ter cuidado ao usar esta opção.

Se você estiver fazendo uma operação de junção em um conjunto de dados distorcido, um dos truques é aumentar o spark.sql.autoBroadcastJoinThresholdvalor para que tabelas menores sejam transmitidas.

Outra maneira popular de lidar com dados distorcidos é Salting. Em uma operação de junção SQL, a chave de junção pode ser alterada para redistribuir dados de maneira uniforme, de forma que o processamento de uma partição não leve mais tempo. Se 70–80% dos dados pertencerem à mesma chave, gere várias chaves adicionando valores aleatórios à chave original. Você pode usar monotonically_increasing_id() para gerar valores aleatórios. Isso ajudará a particionar bem os dados durante a operação de junção.

Broadcast

Às vezes, as operações de junção no Spark são a maior fonte de problemas de desempenho. Existem várias maneiras de unir tabelas no Spark -

ShuffleHashJoin — um ShuffleHashJoin é a maneira mais básica de unir tabelas no Spark. Este é o mesmo processo Hadoop Map Reduce que envolve 3 etapas — Map, Shuffle e Reduce. Ele mapeia as duas tabelas, embaralha os dados com base na chave de saída (junção) e, em seguida, reduz os dados e fornece a saída de junção (Junção de hash aleatória funciona melhor).

Quando os dados são distribuídos uniformemente com a chave à qual você está ingressando, indico criações de novas chaves para paralelismo.

Por exemplo — se você tiver 2 milhões de linhas em uma das tabelas, mas apenas 200 chaves, isso não funcionará, pois obterá paralelismo de mais de 200.

BroadcastHashJoin — BroadcastHashJoin também é uma forma comum de unir tabelas, mas, neste caso, uma das tabelas deve ser pequena o suficiente para caber em uma única máquina. Na junção Broadcast Hash, uma pequena tabela é copiada para cada data-node no cluster e os executores usam a cópia local desta tabela para juntar-se à tabela maior. Portanto, não precisamos da fase de reprodução aleatória nesta junção, isso ajuda a melhorar o desempenho. Essa configuração é automática para muitos formatos de dados como parquet (se uma das tabelas for pequena), mas pode precisar fornecer uma dica para forçar a junção de hash de transmissão usando a palavra-chave — broadcast() para outros tipos.

Evite embaralhamento de dados

Durante as operações rápidas, muitos dados se misturam entre nós, o que é uma operação muito cara. Seu objetivo seria reduzir a quantidade de dados embaralhados sempre que possível. Existem certas transformações que você pode evitar para reduzir o embaralhamento de dados, aqui estão algumas recomendações que você pode seguir:

Use ReduceByKey() ou aggregateByKey() em vez de GroupByKey(), pois groupByKey() embaralha todas as chaves, ao passo que, em reduceByKey , os dados são combinados em cada partição e apenas uma saída para uma chave em cada partição é enviada pela rede.

Se você quiser diminuir o número de partições, use coalesce() em vez de repartition(), pois a repartição faz um embaralhamento completo de dados, enquanto que o coalesce tenta evitar o embaralhamento. As partições de saída podem ter tamanhos desiguais no caso de Coalesce. No entanto, use repartition() quando quiser aumentar o número de partições.

Repartition

Coalesce

Uso adequado de cache / persist

O Spark fornece um mecanismo de otimização para armazenar os cálculos intermediários de um dataframe do Spark para que possa ser reutilizado em ações subsequentes. Podemos armazenar os resultados intermediários de RDD / Dataframe usando cache() e persist(). Então, por que precisamos disso em primeiro lugar? O motivo está na senteça, aplicamos transformações e criamos diferentes RDDs a cada etapa. Esses RDDs são avaliados apenas quando uma ação é chamada. Suponha que você tenha chamado uma ação, mas depois percebeu que precisa usar uma ação diferente no último RDD transformado, neste caso, a segunda ação terá que recalcular o rdd desde o início e aplicar toda a transformação novamente. Isso aumentará o custo e o tempo. Logo, é melhor salvar um estado intermediário de um rdd (que foi calculado por meio de várias transformações) e, em seguida, usar o rdd persistido para a segunda ação.

Existem diferentes níveis de armazenamento fornecidos para persist() que podem ser usados ​​dependendo do caso de uso, enquanto o cache() usa apenas o nível de armazenamento padrão, ou seja, MEMORY_ONLY. persist() níveis de armazenamento

MEMORY_ONLY
MEMORY_AND_DISK
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
DISK_ONLY

Quando for fazer o cache usando um formato colunar na memória , ajustando a propriedade batchSize , você pode melhorar ainda mais o desempenho do Spark com essas configurações:

spark.conf.set(“spark.sql.inMemoryColumnarStorage.compressed”, true)
spark.conf.set(“spark.sql.inMemoryColumnarStorage.batchSize”,10000)

Grau de paralelismo

O Spark tem tudo a ver com cálculos paralelos. Portanto, precisamos definir quantos níveis de paralelismo queremos no processamento dos dados. Paralelismo muito baixo significa que o trabalho será executado por mais tempo e pode acabar gerando exceção de falta de memória, enquanto o paralelismo muito alto exigiria um grande número de recursos (núcleos), o que pode ser uma operação cara. Por isso, a definição do grau de paralelismo depende do número de núcleos disponíveis no cluster. Idealmente, deveríamos fazer RDD com o número de partição igual ao número de núcleos no cluster, por isso todos os particionados serão processados ​​em paralelo e recursos iguais serão utilizados.

Existem 2 propriedades que podem ser usadas para aumentar o nível de paralelismo

spark.default.parallelism
spark.sql.shuffle.partition (é usado quando você está lidando com Spark SQL ou API de dataframe.)

Ajustando Garbage Collection

O Spark permite que os usuários armazenem dados em cache persistentemente para que possam ser reutilizados posteriormente para evitar a sobrecarga causada por cálculos repetidos. O executor do Spark divide o espaço de heap JVM em 2 frações:

Armazenamento de RDD como resultado da ação Persist / Cache.
Consumo de memória durante a transformação RDD.

Quanto menos espaço de memória o RDD ocupa, mais espaço de heap é deixado para a execução do programa, o que aumenta a eficiência do GC, caso contrário, o desempenho será afetado significativamente devido ao grande número de objetos em buffer nas gerações anteriores. Quando o GC é observado como muito frequente ou de longa duração, isso pode indicar que o espaço de memória não é usado de forma eficiente pelo processo ou aplicativo Spark. Você pode melhorar o desempenho limpando explicitamente os RDDs armazenados em cache depois que eles não forem mais necessários.

Garbage-First GC (G1 GC) é o coletor de lixo recomendado. O coletor G1 foi planejado pela Oracle como uma substituição de longo prazo para o CMS GC . Mais importante ainda, com respeito ao CMS, o coletor G1 visa atingir alto rendimento e baixa latência .

Bonus

Evite cálculos desnecessários

Quando você tem inicializações pesadas, como inicializar classes, as conexões de banco de dados vão para mapPartitions() em vez de map(), já que mapPartitions() faz isso uma vez para cada partição em vez de fazer isso em cada linha do DataFrame.

Use vetorização

Semelhante ao Hive, também podemos usar a vetorização no Spark. A vetorização funciona apenas com o formato de dados Colunar . Ele processa um lote de linhas por vez, em vez de uma única linha.

Use Bucketing

Bucketing é um conceito útil que não só é usado no Hive, mas também pode ser usado no Spark. Com o bucketing, os dados já estão embaralhados e classificados, portanto, as junções de bucket evitam significativamente as operações de embaralhamento de dados.

Na Kovi usamos o Spark “artilharia pesada” apenas para reprocessamento de dados pois nosso Data Lake é RealTime dessa forma nosso grande volume de dados está no final da cadeia, que se acumula com o passar do tempo.

Essas são algumas dicas que consigo passar depois de alguns anos trabalhando com essa tecnologia, algumas noites de sono perdidas e muito dado processado.

--

--