Some notes on using a Spark cluster
These notes are mainly for my personal use and refer to the Spark 1.6/YARN cluster that I have access to, but maybe they are helpful for you as well...
Upload to HDFS
By default, relative HDFS paths are resolved against your HDFS home directory (this is used implicitly by all HDFS operations). The home directory needs to be created first by the administrator!
While piping through SSH should work:
cat test.txt | ssh username@masternode "hdfs dfs -put - hadoopFoldername/"
it is reported to be slow. I never checked this, but as I only used rather small data anyway, I instead did an scp to the local file system of the master node and afterwards ran an hdfs put on the master node:
scp twitterSmall.csv username@masternode:
hdfs dfs -put twitterSmall.csv Twitter
Concatenate HDFS files (all inside an HDFS directory) and store in local file system (without sorting)
hdfs dfs -getmerge HdfFolderContainingSplitResultFiles LocalFileToBeCreated
Note that Spark does not overwrite output files in HDFS by default. When you re-run a job, either make sure that the previous output files have been moved or removed, or allow overwriting in the Spark conf of your program: conf.set("spark.hadoop.validateOutputSpecs","false")
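The two options above can be sketched as small command builders. This is only an illustration under assumptions: it needs Python 3.8+ for shlex.join, the output path Twitter/results and the script name myJob.py are hypothetical, and passing the setting via spark-submit --conf is assumed to be equivalent to the conf.set call above.

```python
import shlex

def cleanup_command(output_dir):
    """Build the HDFS command that removes a previous output directory."""
    # -r: recursive, -f: do not fail if the directory does not exist
    return ["hdfs", "dfs", "-rm", "-r", "-f", output_dir]

def overwrite_conf_args():
    """Build the spark-submit arguments that switch off the output check."""
    return ["--conf", "spark.hadoop.validateOutputSpecs=false"]

if __name__ == "__main__":
    # Hypothetical output directory and job script, for illustration only.
    print(shlex.join(cleanup_command("Twitter/results")))
    print(shlex.join(["spark-submit"] + overwrite_conf_args() + ["myJob.py"]))
```

The functions only build the command lines; to actually run them, the lists could be handed to subprocess.run on a machine where hdfs and spark-submit are installed.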
Debugging
- See http://spark.apache.org/docs/latest/running-on-yarn.html
- Use
spark-submit --verbose
- If executor processes are killed, this is mainly due to insufficient RAM (garbage collection takes too long, so timeouts occur, or there are plain out-of-memory/OOM exceptions). In this case, the driver log on the spark-submit console only shows "exit code 143"; the details have to be found in the logs of the nodes/executors. This may not be possible via the Web UI because the executor nodes are firewalled. In this case use:
yarn logs -applicationId application_1470137500465_0147
(Take the app ID from the ID column in the cluster Web UI. This works only for completed runs, not for the current run.) In these logs, you can then search for
java.lang.OutOfMemoryError: GC overhead limit exceeded
or
java.lang.OutOfMemoryError: Java heap space
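Searching the aggregated logs for these two messages can be sketched in a few lines of Python. This is a minimal sketch, not a full log parser: the function name count_oom_errors is made up for illustration, and it only looks for the two messages named above.

```python
import re
from collections import Counter

# The two error messages named in the notes above.
OOM_PATTERN = re.compile(
    r"java\.lang\.OutOfMemoryError: (GC overhead limit exceeded|Java heap space)"
)

def count_oom_errors(lines):
    """Count OutOfMemoryError occurrences per error kind in an iterable of log lines."""
    counts = Counter()
    for line in lines:
        match = OOM_PATTERN.search(line)
        if match:
            counts[match.group(1)] += 1
    return counts

# Possible usage, assuming the yarn logs output was first saved to a file app.log:
#   with open("app.log", errors="replace") as f:
#       print(count_oom_errors(f))
```

A plain grep for OutOfMemoryError on the yarn logs output does the same job; the Python version is only handy when the counts are needed for several runs.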
Performance tuning
- Note that, due to the HDFS block size of 128 MB, partitions of this size are created by default when reading data. To enforce a higher number of partitions (and thus higher parallelism), set the optional number-of-partitions parameter already at the file read stage (minPartitions in sc.textFile; many other RDD-creating operations support a similar parameter).
- Some introductions:
https://www.mapr.com/blog/resource-allocation-configuration-spark-yarn
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
(In particular: more than 5 cores per executor are said to lead to bad HDFS throughput. Note that "executor" is not identical to "node": instead of running one executor with 24 cores on a node, rather run 4 executors with 5 cores each or 8 executors with 3 cores each per node! Note, however, that the overall memory of a node then needs to be divided by the number of executors per node, e.g. 5 GB per executor with 8 executors per node on a node with 40 GB RAM.)
- Config for RAM-intensive jobs (only 1 core per executor and only 1 core per node, using 40 GB heap space and 2 GB overhead for Spark/YARN itself, so that on each of the 38 nodes only one core is used, which can thus make use of all available RAM); in addition, timeouts and message size are increased:
spark-submit --conf "spark.network.timeout=300s" --conf "spark.akka.frameSize=2000" --driver-memory 30g --num-executors 38 --executor-cores 1 --conf "yarn.nodemanager.resource.cpu-vcores=1" --executor-memory 40g --conf "spark.yarn.executor.memoryOverhead=2000" --conf "spark.driver.cores=4" --conf "spark.driver.maxResultSize=0"
(Note: I am not sure about the driver memory and cores: these seem to have no influence. Is it too late to set them here?)
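The arithmetic behind the partition and executor rules of thumb in the list above can be sketched as follows. This is a rough sketch under assumptions: the function names are made up, the partition count assumes one partition per HDFS block, and the per-executor memory is a plain division of the node's RAM that ignores what the operating system and other services need.

```python
import math

HDFS_BLOCK_MB = 128  # block size stated in the notes above

def default_partitions(file_size_mb, block_mb=HDFS_BLOCK_MB):
    """Approximate number of partitions when reading a file: one per HDFS block."""
    return max(1, math.ceil(file_size_mb / block_mb))

def executors_per_node(cores_per_node, cores_per_executor):
    """How many executors with the given core count fit on one node."""
    return cores_per_node // cores_per_executor

def memory_per_executor_gb(node_memory_gb, executors):
    """Memory share per executor; it has to cover heap plus the YARN memory overhead."""
    return node_memory_gb / executors

if __name__ == "__main__":
    # Numbers taken from the notes above: 24 cores and 40 GB RAM per node.
    for cores in (5, 3):
        n = executors_per_node(24, cores)
        print(cores, "cores/executor:", n, "executors,",
              memory_per_executor_gb(40, n), "GB each")
    # Hypothetical 1000 MB input file.
    print("partitions for 1000 MB:", default_partitions(1000))
```

If the estimated default partition count is lower than the total number of executor cores, that is a hint to pass a larger number of partitions at the file read stage.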