Managing HUGE Datasets with Scala Spark
Some tips and tricks for working with large datasets in Scala Spark
Spark is awesome! It’s scalable and fast, especially when you write “native Spark” and avoid custom UDFs. But when you work with large data frames, there are some tips you can use to avoid OOM errors and speed up the whole computation.
Here is a short list of things I learned from personal experience.
Using configuration suited for the task
It’s always a good idea to start with proper configuration.
In my opinion, Spark has awesome documentation, and I highly recommend starting with it.
Depending on how you use Spark, inside a cluster or in standalone mode, your configuration will be different. I’m using Spark mostly in standalone mode, so here are my examples:
1. Driver memory and driver maxResult:
When you are working with a large dataset, you need to increase the default driver memory allocation and the maxResultSize value.
val spark = SparkSession.builder
.config("spark.driver.maxResultSize", "{YOUR-VALUE}")
.config("spark.driver.memory", "{YOUR-VALUE}")
2. Broadcast timeout, network timeout, and heartbeat
When you try to save a large data frame to a database or some bucket, tasks might sometimes fail just because the default timeout thresholds are too small.
.config("spark.sql.broadcastTimeout", "{YOUR-VALUE}")
.config("spark.sql.debug.maxToStringFields", "{YOUR-VALUE}")
.config("spark.network.timeout", "{YOUR-VALUE}")
.config("spark.executor.heartbeatInterval", "{YOUR-VALUE}")
3. Garbage collection
You can also tune the garbage collector provided by the JVM through the executor Java options, for example by switching to G1GC:
.config("spark.executor.extraJavaOptions",
"-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'")
Using caching in the right place:
Caching is an essential thing in Spark, and by using it in the correct places you can seriously reduce execution time. Sometimes you might consider .persist instead of caching; you can read more about the difference in the Spark documentation.
// cache your DataFrame after expensive operations
dataFrame
.select(...)
.filter(...)
.groupBy(...)
.agg(...)
.cache
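If the cached data might not fit in memory, you can persist it with an explicit storage level instead. Here is a minimal sketch (the DataFrame and column names are placeholders I made up) using MEMORY_AND_DISK, which spills partitions to disk rather than recomputing them:
import org.apache.spark.sql.functions.{col, sum}
import org.apache.spark.storage.StorageLevel

// Spill partitions to disk when they do not fit in memory
val expensiveResult = dataFrame
  .filter(col("amount") > 0)
  .groupBy("userId")
  .agg(sum("amount").as("total"))
  .persist(StorageLevel.MEMORY_AND_DISK)

// The first action materializes the cache; later actions reuse it
expensiveResult.count()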
Repartition before joining:
Joins are a pretty expensive operation to run. There are several tricks you can use, but the one that I found the most useful is repartitioning before joining. It helps because by the time you get to the join you have usually already performed some operations on your datasets, and the partitions might be skewed. And skewed partitions will seriously affect join execution time.
val leftRepartitioned = left.repartition(256, col("YOUR COLUMN"))
val rightRepartitioned = right.repartition(256, col("YOUR COLUMN"))
val joined = leftRepartitioned.join(rightRepartitioned, ...)
Repartition after groupBy:
The same goes for groupBy; it usually helps a lot:
val groupedDataset = foo
  .groupBy("bar")
  .agg(...) // your aggregations and other operations
  .repartition(256, col("YOUR COLUMN"))
Data Skew
In order to get the best performance from Spark, you need to pay attention to partition skew. There are many great articles about it (1, 2), so I will not repeat them here. But just keep in mind that sometimes the repartition trick might not work if the column that you chose for partitioning is skewed. If you are not sure which column to choose, you can always use the salting trick.
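Just as an illustration (the DataFrames left and right and the key column are made-up placeholders, not code from a real job), here is a minimal salting sketch: the skewed side gets a random salt appended to the join key, the other side is replicated once per salt value, and the join runs on the salted key:
import org.apache.spark.sql.functions._

val saltBuckets = 16

// Skewed side: append a random salt in [0, saltBuckets) to the join key
val leftSalted = left
  .withColumn("salt", (rand() * saltBuckets).cast("int"))
  .withColumn("saltedKey", concat(col("key"), lit("_"), col("salt").cast("string")))

// Other side: replicate each row once per salt value so every salted key can match
val rightSalted = right
  .withColumn("salt", explode(array((0 until saltBuckets).map(lit): _*)))
  .withColumn("saltedKey", concat(col("key"), lit("_"), col("salt").cast("string")))

val joined = leftSalted.join(rightSalted, "saltedKey")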
UDFs
I should also mention the elephant in the room — User Defined Functions. Because they give you so much freedom, it’s sometimes tempting to use them more often than you actually need them.
Every time you want to implement something custom, I recommend double-checking the built-in Spark functions in org.apache.spark.sql.functions first. Maybe you can solve your problem using expr :).
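As a made-up example (the DataFrame df and the column raw are just placeholders), a UDF that trims and uppercases a string can usually be replaced with built-in functions or expr, which keeps the query transparent to the Catalyst optimizer:
import org.apache.spark.sql.functions._

// Tempting, but opaque to the optimizer
val cleanUdf = udf((s: String) => if (s == null) null else s.trim.toUpperCase)
val withUdf = df.withColumn("clean", cleanUdf(col("raw")))

// The same result with built-in functions, or with expr
val withBuiltins = df.withColumn("clean", upper(trim(col("raw"))))
val withExpr = df.withColumn("clean", expr("upper(trim(raw))"))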
I also recommend checking out spark-daria. It’s a collection of useful methods that expand Spark’s capabilities.
Deleting cached datasets after you are done with them
If you cached some data frames using .cache, you can call .unpersist to remove them from memory.
val dataFrameCached = dataframe.cache
// some more code
dataFrameCached.unpersist
Or you can flush memory completely using
sqlContext.clearCache()
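In more recent Spark versions, the same thing is available on the session’s catalog:
// Removes all cached tables and DataFrames from the in-memory cache
spark.catalog.clearCache()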