Tips and Tricks for Running Spark on Hadoop, Part 4: Memory Settings

March 3rd, 2016 | Analytics, Big Data, Hadoop

In Part 3 of this blog series, we discussed how to configure RDDs to optimize Spark performance. We outlined various storage persistence options for RDDs and how to size memory-only RDDs. In this blog, Part 4 of the series, we’ll discuss the memory requirements of other Spark application components. Although Spark does a number of things very well, it will not, unfortunately, intelligently configure memory settings on your behalf. So, we’ll outline how to determine how much memory is available for your RDDs and data so that you can adjust the command line parameters and configuration when you launch your Spark jobs.

We’ll begin with a general discussion of Spark on YARN, and then talk about the individual major components, such as the Spark driver and Spark executor. Next, we’ll explain a bit more about how RDDs and values are actually passed over the network and cached between the driver and the executors. This will give you a better understanding of the purpose of the BlockManager, a core Spark component.

Spark on YARN

With Spark running on top of YARN, Spark resources are constrained by the Resource Manager (RM) and the containers spawned by the Node Managers. If, for example, you specify the following in your Spark command:

--master yarn
--deploy-mode client
--driver-memory 1024M
--executor-memory 2048M
--num-executors 2
--conf spark.yarn.executor.memoryOverhead=512

You will see a total of three containers launched on the cluster: one for the Application Master (AM), and two YARN JVM containers in which the Spark executors will later run. You’ll also notice that the RM reports a total of 6GB used on your Hadoop cluster: the AM takes up 1GB (no driver overhead was specified), and each of the two executor containers takes up 2.5GB, which adds up to 6GB. In the next two sections of this article, we’ll discuss the logic behind these Spark allocations.
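Before digging into the details, here is a minimal sketch (not Spark code) of how that 6GB figure breaks down for the example above; the individual numbers, such as the 384MB overhead and the 512MB rounding increment, are derived in the sections below.

// Minimal sketch of the container sizes behind the RM's 6GB report.
val amContainerMb       = 1024               // 512 MB AM memory + 384 MB overhead, rounded up to the 512 MB YARN increment
val executorContainerMb = 2048 + 512         // --executor-memory + spark.yarn.executor.memoryOverhead
val totalMb             = amContainerMb + 2 * executorContainerMb
println(s"Total memory used on the cluster: $totalMb MB")   // 6144 MB = 6GB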

Memory for Spark Driver

Depending on how a Spark job is launched on YARN, the Spark driver can run either on the workbench (yarn-client mode) or as part of the AM (yarn-cluster mode). See https://www.altiscale.com/blog/spark-on-hadoop/ for more details. In both cases, the Spark framework reports Spark driver memory usage in the SparkContext log, spark.log. This is located on the workbench where the Spark job is launched, as specified by /etc/spark/log4j.properties. For Altiscale, this is at /home/$USER/Hadooplog/spark/logs/spark.log.

For our example from the previous section, which was launched in yarn-client mode, spark.log puts out the following lines of interest:

2015-08-14 15:07:58,921 INFO  org.apache.spark.deploy.yarn.Client (Logging.scala:logInfo(59)) - Will allocate AM container, with 896 MB memory including 384 MB overhead
 …
2015-08-14 15:07:59,456 INFO  org.apache.spark.storage.BlockManagerMasterEndpoint (Logging.scala:logInfo(59)) - Registering block manager 10.252.7.180:45300 with 530.3 MB RAM, BlockManagerId(driver, 10.252.7.180, 45300)

However, as we mentioned in the previous section, you’ll see in the RM UI that the AM for this particular Spark job is allocated as 1GB.

Summarizing what we have so far:

  1. We specified 1024MB for the driver from the command line.
  2. However, spark.log shows that the Spark driver/BlockManager is using only ~530MB of memory.
  3. The RM UI shows YARN has allocated 1GB for the AM. But spark.log indicates that Spark is actually requesting 896MB for the AM container to run the Spark driver, not the 1024MB that we specified.

What accounts for these differences? We will first analyze how the Spark framework came up with the ~530MB figure. If you review the BlockManager source code:

./core/src/main/scala/org/apache/spark/storage/BlockManager.scala

private def getMaxMemory(conf: SparkConf): Long = {
  val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
  val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
  (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}

You will note that the memory allocation is based on the formula Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction. With our command-line value, this is 1024MB x 0.6 x 0.9 = 552.96MB. The default values 0.6 and 0.9 come from the configuration settings spark.storage.memoryFraction and spark.storage.safetyFraction, respectively. However, 552.96MB is a little larger than the value shown in the log. This is because Runtime.getRuntime.maxMemory reports slightly less than the -Xmx setting due to JVM runtime overhead, usually around 3-7%. If you redo the calculation with 982MB (roughly 4% less than 1024MB), you get 982MB x 0.6 x 0.9 = 530.28MB, which matches the 530.3MB reported in the log file.
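You can sanity-check this on any JVM by evaluating the same expression Spark does. Here is a minimal sketch, assuming the default fractions and a JVM started with -Xmx1024m (the object name and setup are ours, not Spark's):

// Minimal sketch: reproduce the storage memory figure Spark logs for a driver
// JVM started with -Xmx1024m.
object MaxStorageMemory {
  def main(args: Array[String]): Unit = {
    val memoryFraction = 0.6   // default spark.storage.memoryFraction
    val safetyFraction = 0.9   // default spark.storage.safetyFraction

    val maxHeap = Runtime.getRuntime.maxMemory            // a few percent below -Xmx
    val storageBytes = (maxHeap * memoryFraction * safetyFraction).toLong

    println(f"Max heap reported:  ${maxHeap / 1024.0 / 1024.0}%.1f MB")
    println(f"Storage memory cap: ${storageBytes / 1024.0 / 1024.0}%.1f MB")   // ~530 MB
  }
}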

Now, let us examine why Spark requests 896MB for the AM. The logic for Spark to determine how much memory to allocate to the AM is found in:

./yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:

  def createApplicationSubmissionContext(
      ...
    capability.setMemory(args.amMemory + amMemoryOverhead)
    capability.setVirtualCores(args.amCores)
    appContext.setResource(capability)
    appContext
  }

amMemoryOverhead defaults to 384MB and is configurable via spark.yarn.driver.memoryOverhead. args.amMemory is fixed at 512MB by Spark when it’s running in yarn-client mode. The yarn-cluster mode can involve more size adjustments by Spark before it makes requests to YARN. In this case, however, the AM doesn’t need to be very large, as the actual Spark driver process is running on the client’s machine, not in the AM in the Hadoop cluster.

Adding 384MB of overhead to 512MB provides the 896MB figure requested by Spark. However, the RM ultimately allocates 1024MB since the YARN property yarn.scheduler.minimum-allocation-mb is defined as 512MB on this Hadoop cluster. This means the RM will allocate containers in increments of this size.
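A minimal sketch of that rounding step follows; the helper name is ours, not YARN's, and it assumes the 512MB minimum allocation configured on this cluster.

// Hypothetical helper mirroring how the RM rounds a container request up to
// the next multiple of yarn.scheduler.minimum-allocation-mb.
def roundToYarnIncrement(requestMb: Int, minAllocMb: Int = 512): Int =
  (math.ceil(requestMb.toDouble / minAllocMb) * minAllocMb).toInt

println(roundToYarnIncrement(512 + 384))    // AM: 896 MB requested -> 1024 MB allocated
println(roundToYarnIncrement(2048 + 512))   // executor: 2560 MB requested -> 2560 MB allocated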

Memory for Spark Executors

In our original example, we specified two executors, sized at 2GB each. If you recall, however, the YARN client API requested 2.5GB each. Here’s the relevant excerpt from the AM logs when it tries to request the two containers for the executors:

15/08/14 15:07:58 INFO yarn.YarnAllocator: Will request 2 executor containers, each with 1 cores and 2560 MB memory including 512 MB overhead
15/08/14 15:07:58 INFO yarn.YarnAllocator: Container request (host: Any, capability: <memory:2560, vCores:1>)
15/08/14 15:07:58 INFO yarn.YarnAllocator: Container request (host: Any, capability: <memory:2560, vCores:1>)

This logic is laid out in the following code snippets from Spark’s implementation of the YARN client API. (It may vary a bit between different Spark versions).

./yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala:

// Additional memory overhead
// 10% was arrived at experimentally. In the interest of minimizing memory waste while covering
// the common cases. Memory overhead tends to grow with container size.
val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN = 384
and

./yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala:

// Executor memory in MB.
protected val executorMemory = args.executorMemory

// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
  math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))

// Number of cores per executor.
protected val executorCores = args.executorCores

// Resource capability requested for each executors
private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)

As you can see, Spark adds a certain amount of memoryOverhead to the executorMemory setting when requesting containers from YARN. We’re actually specifying memoryOverhead via the spark.yarn.executor.memoryOverhead setting from the command line (512MB). Adding 512MB to the 2048MB of executor memory produces Spark’s request of 2.5GB per executor.
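Had we not set spark.yarn.executor.memoryOverhead explicitly, Spark would have fallen back to the default computed in the snippet above. A minimal sketch of that default, with the constants copied from YarnSparkHadoopUtil:

// Default executor memory overhead, per the constants shown above.
val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN    = 384

def defaultOverheadMb(executorMemoryMb: Int): Int =
  math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt, MEMORY_OVERHEAD_MIN)

println(defaultOverheadMb(2048))   // 384 -> the request would be 2432 MB (still rounded up to 2560 MB by YARN)
println(2048 + 512)                // 2560 -> the request with our explicit 512 MB override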

For the executor runtime, you will also see the following information in the AM log:

{{JAVA_HOME}}/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms2048m -Xmx2048m … org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url akka.tcp://sparkDriver@10.252.7.180:45055/user/CoarseGrainedScheduler --executor-id 1 --hostname 10-10-10-2.test.altiscale.com --cores 1 --app-id application_1439529549900_0028 …
...
{{JAVA_HOME}}/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms2048m -Xmx2048m … org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url akka.tcp://sparkDriver@10.252.7.180:45055/user/CoarseGrainedScheduler --executor-id 2 --hostname 10-10-10-3.test.altiscale.com --cores 1 --app-id application_1439529549900_0028 …

And, in each YARN container log, you will see that the Spark executor’s MemoryStore starts with a capacity of 1060.3MB and registers with the Spark driver running on 10.252.7.180.

15/08/14 15:08:05 INFO storage.MemoryStore: MemoryStore started with capacity 1060.3 MB

And in the Spark driver’s log from spark.log:

2015-08-14 15:08:05,594 INFO org.apache.spark.storage.BlockManagerMasterEndpoint (Logging.scala:logInfo(59)) - Registering block manager 10-10-10-2.test.altiscale.com:45300 with 1060.3 MB RAM, BlockManagerId(1, 10-10-10-2.test.altiscale.com, 45300)
2015-08-14 15:08:05,620 INFO org.apache.spark.storage.BlockManagerMasterEndpoint (Logging.scala:logInfo(59)) - Registering block manager 10-10-10-3.test.altiscale.com:45300 with 1060.3 MB RAM, BlockManagerId(2, 10-10-10-3.test.altiscale.com, 45300)

Remember the JVM runtime overhead discussed earlier ((1024 - 982) / 1024 ≈ 4.1%)? If we apply the same formula, we can derive the 1060.3MB value: 2048MB x (1 - 0.041) x 0.6 x 0.9 ≈ 1060.6MB, which is quite close.
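The same quick sanity check from the driver section applies, with the executor heap size plugged in; a minimal sketch:

// Storage memory cap for an executor launched with -Xms2048m -Xmx2048m, assuming
// the JVM reports roughly 4% less than -Xmx, as observed above.
val heapReportedMb = 2048 * (1 - 0.041)             // ~1964 MB
val storageCapMb   = heapReportedMb * 0.6 * 0.9     // memoryFraction x safetyFraction
println(f"$storageCapMb%.1f MB")                    // ~1060.6 MB, matching the MemoryStore log line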

BlockManagers and RDDs Memory Relationship

As you’ve probably noticed, we’ve referred several times to a Spark component called the BlockManager. In this section, we’ll dig a little deeper into the BlockManager (and its in-memory store, the MemoryStore) and how Spark uses it behind the scenes to manage memory and RDDs on the driver and executors.

The BlockManager is the main service in Spark responsible for managing programming variables (in Scala, Java, Python, etc.), data structures, RDDs, user/system defined values, and other memory elements so that they can be cached on the Spark executors. In other words, the BlockManager framework is responsible for how Spark caches RDDs and re-uses them appropriately to speed up computation and reduce data transfer between the HDFS and the executors.

The BlockManager runs within the Spark driver as well as within each Spark executor when a Spark job starts. Once running, the driver may ask the executors to cache a value to boost performance later in the application’s execution path. In that case, the value is broadcast to the executors working on the job, which cache it for further use; similarly, RDD partitions are cached on executors when one of the RDD persistence APIs (e.g. persist, cache) is invoked. After the application launches, the Spark driver logs fill with BlockManager messages indicating how values are being passed to executors. These messages can be copious. Fortunately, the Spark UI for the running job provides another way to view the same information.
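For example, here is a minimal sketch of the two caching mechanisms mentioned above, broadcasting a read-only value and persisting an RDD. It assumes an existing SparkContext named sc, and the file path and lookup table are placeholders:

import org.apache.spark.storage.StorageLevel

// Broadcast a small lookup table: the driver's BlockManager ships it to the
// executors, where it is cached for reuse across tasks.
val countryCodes = sc.broadcast(Map("US" -> 1, "DE" -> 49, "IN" -> 91))

// Persist an RDD: each executor's BlockManager caches the partitions it computes.
val calls = sc.textFile("hdfs:///data/calls.txt")        // placeholder path
  .map(line => (line.split(",")(0), 1))
  .persist(StorageLevel.MEMORY_ONLY)

// Both the broadcast value and the cached partitions appear as BlockManager
// entries in the driver log and in the Storage tab of the Spark UI.
println(calls.map { case (cc, n) => (countryCodes.value.getOrElse(cc, 0), n) }.count())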

[Figure 1: Spark UI view of BlockManager memory usage]

The UI provides a convenient way to see whether the driver or executors are under memory pressure and whether an out-of-memory (OOM) exception is likely if the job tries to cache more data than the BlockManagers can handle. The executors can also start spilling between memory and disk, which degrades performance. Entries like the following in the Spark driver logs:

Added rdd_12_3 in memory on 10-10-10-3.test.altiscale.com:45300 (size: 920.0 B, free: 1060.2 MB)

provide hints as to how much memory is consumed on the executor and how much is available. Armed with this information, you can assess how much to allocate for your executor (--executor-memory) and perhaps employ one of the performance tuning options listed below:

  1. Bump up executor memory (--executor-memory), and make sure you adjust the configuration spark.yarn.executor.memoryOverhead when you do so (see the sketch after this list).
  2. Adjust the memoryFraction for the driver or the executors to provide more headroom for the BlockManager service.
  3. Adjust the RDD persistence level of your application. See Part 3 of this blog series for more information on how to improve Spark performance through RDD persistence.
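As a rough illustration of the first two options, here is how those settings could be set programmatically when building the SparkConf. This is a minimal sketch: the same values can equally be passed as --conf flags to spark-submit, and the numbers are placeholders rather than recommendations.

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: raise executor memory together with its YARN overhead (option 1),
// and adjust the storage fraction used by the BlockManager/MemoryStore (option 2).
// Tune the actual values based on what your logs and the Spark UI show.
val conf = new SparkConf()
  .setAppName("memory-tuning-sketch")
  .set("spark.executor.memory", "3g")                  // was 2g in our example
  .set("spark.yarn.executor.memoryOverhead", "768")    // scale the overhead (in MB) up with the heap
  .set("spark.storage.memoryFraction", "0.5")          // fraction of the heap reserved for cached RDDs

val sc = new SparkContext(conf)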

As you can see, Spark memory usage as well as the various memory usage reports in spark.log and elsewhere can be confusing and even misleading. We hope this post has shed more light on how Spark manages and reports memory.