Handling Skewed Data in Apache Spark

Optimizing Apache Spark performance by tackling skewed data with techniques like salting, co-partitioning, and DAG management to enhance efficiency and reduce processing times.

1. Introduction

Apache Spark is a powerful distributed computing engine designed for processing large datasets. It's widely used for its ability to perform complex data transformations and analytics efficiently. However, one of the common challenges when working with Spark is handling skewed data, especially during join operations. This blog post explores what skewed data is, the issues it can cause, and strategies to mitigate these issues.

2. Understanding Skewed Data

What is Skewed Data?

Skewed data occurs when one or more keys have a disproportionately large number of values compared to other keys. This imbalance can lead to partitions of varying sizes, with some being significantly larger than others. In Spark, data is partitioned across the cluster for processing. When partitions are uneven, it can result in inefficient processing and longer execution times.
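A quick way to check for skew is to count rows per key and look at the heaviest keys. The sketch below is illustrative and assumes a DataFrame df with a hypothetical join column join_key:

```python
from pyspark.sql import functions as F

# Rows per key, heaviest first. If a handful of keys hold most of the rows,
# any shuffle on this column will produce a few oversized partitions.
(df.groupBy("join_key")
   .count()
   .orderBy(F.desc("count"))
   .show(20, truncate=False))
```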

Issues Caused by Skewed Data

1. Imbalanced Workload: Uneven partitions lead to an imbalanced workload, where some tasks take much longer to complete, slowing down the entire job.

2. Out of Memory Errors: Large partitions may exceed memory limits, causing out-of-memory errors, particularly if data is cached for iterative processing.

3. Uneven Resource Usage: Disproportionate data distribution can lead to inefficient use of resources like CPU and memory.

4. Slow Processing Times: Operations such as joins and aggregations, which require data shuffling, can be particularly slow with skewed data.

5. Job Failures: In extreme cases, skewed data can cause job failures due to memory errors or tasks exceeding their execution time limits.

Mitigating Skewed Data Issues

To address the challenges posed by skewed data, consider the following strategies:

1. Salting

Salting involves adding a random value to the key to distribute the data more evenly across partitions. This can help balance the workload by ensuring that no single key causes a disproportionate load.
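As a rough sketch of the idea (facts, dims, and user_id are placeholder names, SALT_BUCKETS is a tuning knob you would choose based on the observed skew, and spark is an existing SparkSession), the large side gets a random salt and the small side is expanded with every salt value so the join still matches:

```python
from pyspark.sql import functions as F

SALT_BUCKETS = 10  # hypothetical fan-out; increase for heavier skew

# Large (skewed) side: append a random salt so one hot key spreads across many partitions.
facts_salted = facts.withColumn(
    "salted_key",
    F.concat_ws("_", F.col("user_id"), (F.rand() * SALT_BUCKETS).cast("int")),
)

# Small side: replicate each row once per salt value so every salted key can match.
salts = spark.range(SALT_BUCKETS).withColumnRenamed("id", "salt")
dims_salted = dims.crossJoin(salts).withColumn(
    "salted_key",
    F.concat_ws("_", F.col("user_id"), F.col("salt")),
)

joined = facts_salted.join(dims_salted, on="salted_key", how="inner")
```

The trade-off is that the small side is replicated SALT_BUCKETS times, so salting pays off when the skew is severe enough to dominate the job.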

2. Co-partitioning

Ensure that datasets being joined are co-partitioned on the join key. This reduces the amount of data shuffled across the network, improving performance.
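Within a single job this can be approximated by repartitioning both sides on the join key with the same number of partitions; across jobs, bucketing the tables on the join key achieves the same effect persistently. A minimal sketch (left_df, right_df, and join_key are placeholders):

```python
# Hash-partition both sides identically on the join key so matching keys
# end up in the same partitions and the join needs less shuffling.
left_parts = left_df.repartition(200, "join_key")
right_parts = right_df.repartition(200, "join_key")
joined = left_parts.join(right_parts, on="join_key")

# Persistent alternative: bucket the tables once at write time.
# left_df.write.bucketBy(200, "join_key").sortBy("join_key").saveAsTable("left_bucketed")
```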

3. Skew Join Optimization

Spark provides built-in optimizations for handling skewed joins. These optimizations detect skewed keys and apply techniques to balance the load.
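In Spark 3.x these optimizations are part of Adaptive Query Execution (AQE), which can split oversized shuffle partitions at runtime. The threshold values below are illustrative, not recommendations:

```python
# Enable AQE and its skew-join handling.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# A partition is treated as skewed if it is this many times larger than the median ...
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
# ... and also larger than this absolute size.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
```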

4. Tuning Spark Configuration

Adjusting Spark configuration parameters, such as increasing the number of partitions or allocating more memory and CPU resources, can help manage skewed data more effectively.
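For example (the values are illustrative and depend on the data volume and the cluster):

```python
# More, smaller shuffle partitions so each task handles less data (default is 200).
spark.conf.set("spark.sql.shuffle.partitions", "800")

# Executor memory and cores cannot be changed from inside a running application; they
# are set when the application is launched, e.g.:
#   spark-submit --executor-memory 8g --executor-cores 4 --num-executors 20 ...
```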

3. Case Study: Improving Performance in Batch Processing Jobs

Key Observations

1. Long Processing Times: Certain jobs exhibit long processing times due to skewed keys.

2. Complex Joins: The jobs involve multiple consecutive joins, contributing to the slowdown.

3. Large Tables: The tables in question are large, with high traffic exacerbating the issue.

4. Single Partition: Some inputs are read into a single partition, and the current partitioning strategy may not be sufficient.

Action Items

1. Refactor Job Workflow: Reduce the number of joins and optimize join strategies. Consider using broadcast joins to minimize shuffle operations (see the sketch after this list).

2. Analyze Join Necessity: Evaluate each join's necessity and eliminate or combine where possible to streamline the process.

3. Optimize Data Partitioning: Explore alternative partitioning strategies to distribute data more evenly.

4. Consult with Original Developers: Understand the rationale behind the current job structure to identify potential improvements.
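For the broadcast-join idea mentioned above, a minimal sketch (large_df, small_df, and join_key are placeholders): broadcasting the small table to every executor means the large, skewed table never has to be shuffled for the join.

```python
from pyspark.sql import functions as F

# Ship the small table to every executor; the large table stays where it is.
joined = large_df.join(F.broadcast(small_df), on="join_key", how="left")

# Optionally raise the automatic broadcast threshold (default 10 MB) so the optimizer
# chooses broadcast joins for somewhat larger tables on its own.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))
```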

Handling Non-Splittable Compressed Files

When working with non-splittable compressed files such as gzip (.gz) files (Figure-1), Spark defaults to creating a single partition per file (Figure-2), limiting parallelism. By using the SplittableGzipCodec, you can increase parallelism by allowing multiple tasks to process different parts of the same file simultaneously, which significantly reduces processing time.

Figure-1: GZ files

Figure-2: Spark defaults to creating a single partition per file

The solution I've used is the decompression codec SplittableGzipCodec by Niels Basjes. This codec feeds the same file to multiple Spark tasks. Because a gzip stream cannot be decompressed from an arbitrary offset, each task 'fast forwards' by decompressing from the start of the file and discarding everything before its assigned split, then processes its own portion. Running multiple tasks on the same gzip file significantly decreases the wall-clock time and makes it more likely the decompression completes successfully, at the small cost of increasing the total core hours used.

The Spark solution is described in Rahul Singha's post here.

The codec supports CSV/TSV files compressed with Gzip, allowing for faster data processing with increased parallelism in Databricks.

To use the codec in a Databricks notebook, the library must first be installed on the cluster. Go to cluster details > Libraries tab > Install new > Maven > Search Packages, enter the package name “splittablegzip”, and install it (Figure-3 and Figure-4).

Figure-3: search for splittablegzip

Figure-4: install splittablegzip
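Once the library is installed, the reader is pointed at the codec. The sketch below follows the usage described in the library's documentation; the input path and CSV options are placeholders:

```python
# Tell the CSV reader to decode .gz files with the splittable codec so a single
# file can be processed by several tasks in parallel.
df = (
    spark.read
         .option("io.compression.codecs", "nl.basjes.hadoop.io.compress.SplittableGzipCodec")
         .option("header", "true")
         .option("sep", "\t")                    # TSV in this example
         .csv("dbfs:/path/to/input/*.gz")        # hypothetical input path
)
```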

Figure-5: The parallelism has been increased

After increasing parallelism (see Figure 5), we observed that writing remained slow due to limited partitioning. As shown in Figure 6, Stage 251 exhibited a long-tail task. A deeper investigation using the Spark UI revealed that this particular task was handling significantly more data than the others (refer to Figure 7).

Figure-6: Long tail task
Figure-7: Data Skew (Spark UI)
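Besides the Spark UI, the per-partition row counts can be checked directly; one partition that is orders of magnitude larger than the rest confirms the skew (df here stands for the DataFrame feeding the slow stage):

```python
from pyspark.sql import functions as F

# Row count per partition, largest first.
(df.groupBy(F.spark_partition_id().alias("partition_id"))
   .count()
   .orderBy(F.desc("count"))
   .show())
```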

Cutting Off DAG Lineage

Upon further investigation, it was found that the Spark DAG (Directed Acyclic Graph) lineage for this join operation was excessively long, with 18 joins chained together. As illustrated in Figure 8, the lineage is so extensive that I had to zoom out my browser to 30% to capture a screenshot of the entire DAG.

Figure-8: DAG BEFORE optimization

This complexity was a major contributor to the performance issues observed. In Spark, cutting off the DAG lineage after each join can optimize memory usage and improve performance. This is achieved by materializing intermediate results, for example by checkpointing the intermediate DataFrames or persisting them to disk with the DISK_ONLY storage level. Below is a modification to the perform_joins function that demonstrates how to cut off the DAG lineage after each join (Figure-9).

Figure-9: PySpark Code - Cut off Lineage
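The actual code is shown in Figure 9; the sketch below illustrates the same idea, assuming a perform_joins helper that chains several left joins (the function and parameter names are placeholders):

```python
from pyspark import StorageLevel

def perform_joins(base_df, dfs_to_join, join_key):
    result = base_df
    for other_df in dfs_to_join:
        result = result.join(other_df, on=join_key, how="left")
        # Materialize the intermediate result so downstream stages read it back
        # instead of replaying the ever-growing lineage of previous joins.
        result = result.persist(StorageLevel.DISK_ONLY)
        result.count()  # force evaluation
        # Alternative: result = result.checkpoint()
        # (requires spark.sparkContext.setCheckpointDir(...) to be set first)
    return result
```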

As shown in Figure 10, the resulting DAG is significantly shorter, reflecting improved efficiency.

Figure-10: DAG AFTER optimization

Remarkably, this optimization reduced the runtime from 53 minutes to just 11 seconds.

4. Conclusion

Handling skewed data in Apache Spark is crucial for optimizing performance and resource utilization. By applying techniques such as salting, co-partitioning, and skew join optimization, and by tuning Spark configurations, you can effectively manage skewed data and improve the efficiency of your Spark jobs.

In addition to these strategies, it is important to continuously monitor and analyze your Spark applications using tools like the Spark UI. This will help you identify performance bottlenecks and understand the impact of data skew on your jobs. Implementing solutions such as cutting off the DAG lineage after complex operations can significantly reduce processing times, as demonstrated by reducing runtime from 53 minutes to 11 seconds in our case study.

Ultimately, understanding the nature of your data and the structure of your Spark jobs allows you to make informed decisions about optimization techniques. By proactively addressing data skew, you can ensure that your Spark applications run smoothly and efficiently, maximizing the potential of your data processing pipelines.