Some notes on using a Spark cluster

Helmut Neukirchen, 18. August 2016

The following notes are mainly for my personal use referring to the Spark 1.6/YARN cluster that I access, but maybe they are helpful for you as well...

Upload to HDFS

By default (i.e. implicitly in all HDFS operations), HDFS paths are relative to your HDFS home directory, which needs to be created first by the administrator!

While piping through SSH should work ( cat test.txt | ssh username@masternode "hdfs dfs -put - hadoopFoldername/" ), it is reported to be slow. I never checked this: as I used rather small data anyway, I instead did an scp to the local file system of the master node and afterwards an hdfs put there:
scp localFile username@masternode:
hdfs dfs -put twitterSmall.csv Twitter

Concatenate HDFS files (all inside an HDFS directory) and store in local file system (without sorting)

hdfs dfs -getmerge HdfFolderContainingSplitResultFiles LocalFileToBeCreated

Note that Spark does not overwrite output files in HDFS by default. Either make sure that the output files have been (re-)moved before you re-run a job, or allow overwriting in the Spark conf of your program: conf.set("spark.hadoop.validateOutputSpecs","false")


  1. See
  2. Use spark-submit --verbose
  3. If executor processes are killed, this is mainly due to insufficient RAM (garbage collection takes too long, thus timeouts occur, or plain out of memory/OOM exceptions). In this case, the log of the driver on the spark-submit console only shows "exit code 143"; the details need to be found in the logs of the nodes/executors. This may not be possible via the Web UI due to executor nodes being firewalled -- in this case use:
    yarn logs -applicationId application_1470137500465_0147
    (App ID to be taken from the ID column in the Cluster Web UI. Works only for completed runs, not the current run.) In these logs, you can then search for java.lang.OutOfMemoryError: GC overhead limit exceeded or java.lang.OutOfMemoryError: Java heap space

Performance tuning

  1. Note that due to the HDFS block size of 128 MB, partitions of this size are created by default when reading data. To enforce a higher number of partitions/higher parallelism, use the optional numberOfPartitions parameter already at the file read stage (many other RDD-creating operations support it as well).
  2. Some introduction
    (in particular: more than 5 cores per executor is said to lead to bad HDFS throughput. Note that “executor” is not identical to “node”, thus instead of running one executor with 24 cores on one node, rather run 4 executors with 5 cores each on one node, or 8 executors with 3 cores! Note that then, however, the overall memory of a node needs to be divided by the number of executors per node, e.g. 5 GB per executor with 8 executors per node on a 40 GB RAM node.)
  3. Config for RAM-intensive jobs (=1 core per executor only & 1 core per node only, using 40GB heap space and 2GB overhead for Spark/Yarn itself => on each of the 38 nodes only one core is used that thus can make use of all available RAM), in addition increase timeouts and message size:
    spark-submit --conf "" --conf "spark.akka.frameSize=2000" --driver-memory 30g --num-executors 38 --executor-cores 1 --conf "yarn.nodemanager.resource.cpu-vcores=1" --executor-memory 40g --conf "spark.yarn.executor.memoryOverhead=2000" --conf "spark.driver.cores=4" --conf "spark.driver.maxResultSize=0"
    (Note: not sure about the driver memory and cores: this seems to have no influence -- is it too late to set it here?)
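
The executor sizing rule from item 2 can be sketched as a small calculation. This is only an illustration under simple assumptions: the helper name executor_layout and the optional per-executor overhead parameter are made up here, and the function merely divides the resources of one node evenly; it does not ask YARN for anything.

```python
def executor_layout(node_cores, node_ram_gb, cores_per_executor, overhead_gb=0.0):
    """Return (executors per node, heap GB per executor) for one worker node.

    Hypothetical helper: it divides the node's cores and RAM evenly and
    subtracts an assumed per-executor overhead. It does not query YARN.
    """
    executors = node_cores // cores_per_executor
    if executors == 0:
        raise ValueError("cores_per_executor exceeds the cores of a node")
    heap_gb = node_ram_gb / executors - overhead_gb
    return executors, heap_gb


# Example from the text: a 24-core node with 40 GB RAM.
for cores in (5, 3):
    n, heap = executor_layout(24, 40, cores)
    print(f"{cores} cores/executor -> {n} executors/node, {heap:.1f} GB each")
```

The resulting values would then go into --executor-cores and --executor-memory, with --num-executors being the per-node count multiplied by the number of worker nodes.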

DBSCAN evaluation

Helmut Neukirchen, 17. August 2016

This post documents some DBSCAN command line parameters used in a DBSCAN implementation evaluation. Once a paper referencing it is published, it will go to EUDAT B2SHARE and thus get a persistent handle.

Conversion of HDF5 file into CSV/SSV

h5dump -d "/DBSCAN" -o out.txt twitterSmall.h5
Yields lines (12,0): 53.3243, -1.12341,
Remove Id in first column:
cut out.txt -d ':' -f 2 >out.txt2
Yields lines 53.3243, -1.12341,
Remove extra comma at end of line
cut out.txt2 -d ',' -f 1,2 >out.csv
Seems that the first line is just an empty line, remove using
tail -n +2 out.csv >twitterSmall.csv
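
The same conversion can also be sketched in one pass. This is a minimal Python sketch, assuming that every data line of the h5dump output has the "(row,col): x, y," shape shown above; the function names are made up for this illustration. Unlike the cut pipeline, it also strips the blanks around the values and skips all lines that are not data lines (not only the first one).

```python
def h5dump_line_to_csv(line):
    """Turn '(12,0): 53.3243, -1.12341,' into '53.3243,-1.12341'.

    Returns None for lines without the '(row,col):' prefix (e.g. empty lines).
    """
    prefix, sep, rest = line.partition(":")
    if not sep or not prefix.strip().startswith("("):
        return None
    # Keep the first two comma-separated values, dropping the trailing comma.
    values = [v.strip() for v in rest.split(",")[:2]]
    return ",".join(values)


def convert(lines):
    """Yield CSV lines for all data lines, skipping everything else."""
    for line in lines:
        csv_line = h5dump_line_to_csv(line)
        if csv_line is not None:
            yield csv_line


sample = ["", "   (12,0): 53.3243, -1.12341,", "   (13,0): 51.5, -0.13,"]
print(list(convert(sample)))
```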

The mraad DBSCAN implementation expects SSV format with IDs (i.e. remove brackets after h5dump run)
cut out.txt -c 5- > out_withoutleadingbracket.txt
cut out_withoutleadingbracket.txt -d ':' -f 1,2 --output-delimiter=',' > out3.txt
cut out3.txt -d ')' -f 1,2 --output-delimiter=',' > out4.txt
cut out4.txt -d ',' -f 1,4,5 --output-delimiter=',' > twitterBig_withIds.csv
cut -d ',' twitterBig_withIds.csv -f 1-3 --output-delimiter=' ' > twitterBig_withIds.ssv_with_doublespaces
cut -d ' ' -f 1,3,5 twitterBig_withIds.ssv_with_doublespaces >twitterBig_withIds.ssv
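
As an alternative to the chain of cut calls, the SSV-with-IDs format can be sketched with a single regular expression. This is only a sketch: it assumes the "(row,col): x, y," line shape of the h5dump output and, like the cut chain, uses the row index as ID; the names LINE and h5dump_line_to_ssv are made up for this illustration.

```python
import re

# '(row,col): x, y,' with optional leading blanks; the row index becomes the ID.
LINE = re.compile(r"^\s*\((\d+),\d+\):\s*([^,\s]+),\s*([^,\s]+)")


def h5dump_line_to_ssv(line):
    """Turn '(12,0): 53.3243, -1.12341,' into '12 53.3243 -1.12341'.

    Returns None for lines that do not look like a data line.
    """
    match = LINE.match(line)
    if match is None:
        return None
    return " ".join(match.groups())


print(h5dump_line_to_ssv("   (12,0): 53.3243, -1.12341,"))
```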

The Dianwei Han DBSCAN expects SSV without IDs: remove Id from mraad format:
cut twitterSmall.ssv -f 2,3 -d ' ' >twitterSmallNoIds.ssv

Running on Spark cluster

A single "assembly" jar needs to be created. In some cases, the sbt build file does not match the JVM/Scala/library versions and thus, minor adjustments were needed.

Find out optimal number of threads (cores) per executor for RDD-DBSCAN

(typically, 5 is recommended to avoid problems with parallel HDFS access by the same JVM): 3, 4, 6 or more(?). Note that we have 39*24 = 936 cores, but we should leave a few cores for the master and other stuff.

912 cores, 3 cores per executor process:
spark-submit --driver-memory 2g --driver-cores 3 --num-executors 304 --executor-cores 3 --executor-memory 5gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions 25000 0.01 40 : 12mins, 55sec with 912 partitions enforced in sc.textFile(src) and no repartition to a single file when saving result output (repartition to single file would crash process that has to do this "reduce" step).

932 cores, 4 cores per executor process:
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 233 --executor-cores 4 --executor-memory 5gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_233executors_4coreseach 25000 0.01 40 : 11mins, 3sec with 912 partitions enforced in sc.textFile(src) and no repartition to a single file when saving result output.

912 cores, 6 cores per executor process:
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 152 --executor-cores 6 --executor-memory 5gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_233executors_4coreseach 25000 0.01 40 : 11mins, 38sec
928 cores = 116 executors * 8 cores per executor process:
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 116 --executor-cores 8 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_116executors_8coreseach 25000 912 0.01 40 : 10mins, 26sec
912 cores, 76 executors * 12 cores per executor process:
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 76 --executor-cores 12 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_76executors_12coreseach 25000 912 0.01 40 : 12mins, 4sec
924 cores = 42 executors * 22 cores per executor process:
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 42 --executor-cores 22 --executor-memory 10gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_42executors_22coreseach 25000 912 0.01 40 : 13mins, 58sec
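
To compare the runs above at a glance, the reported numbers can be put into a small table. The following is just a sketch that copies the executor counts, cores per executor and runtimes from the commands above; the function name summarize is made up. Note that the runs are not fully comparable, as the executor memory differs between them (5gb, 8gb and 10gb).

```python
# (executors, cores per executor, minutes, seconds) as reported above.
runs = [
    (304, 3, 12, 55),
    (233, 4, 11, 3),
    (152, 6, 11, 38),
    (116, 8, 10, 26),
    (76, 12, 12, 4),
    (42, 22, 13, 58),
]


def summarize(runs):
    """Return rows of (cores per executor, total cores, runtime in seconds)."""
    return [(c, e * c, m * 60 + s) for e, c, m, s in runs]


rows = summarize(runs)
for cores, total, secs in rows:
    print(f"{cores:2d} cores/executor, {total} cores in total: {secs} s")

# Pick the row with the shortest runtime.
fastest = min(rows, key=lambda row: row[2])
print("fastest:", fastest[0], "cores per executor")
```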

Trying to find out optimal number of partitions for RDD-DBSCAN

spark-submit --driver-memory 2g --driver-cores 1 --num-executors 233 --executor-cores 4 --executor-memory 5gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_233executors_4coreseach 25000 0.01 40 : 11mins, 3sec with 912 partitions enforced in sc.textFile(src) and no repartition to a single file when saving result output.
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 233 --executor-cores 4 --executor-memory 5gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_456_partitions_233executors_4coreseach 25000 0.01 40 : 12mins, 16sec
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 233 --executor-cores 4 --executor-memory 5gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_228_partitions_233executors_4coreseach 25000 0.01 40 : 11mins, 17sec
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 233 --executor-cores 4 --executor-memory 5gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_114_partitions_233executors_4coreseach 25000 0.01 40 : 11mins, 57sec
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 233 --executor-cores 4 --executor-memory 5gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_57_partitions_233executors_4coreseach 25000 0.01 40 : 10mins, 22sec
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 233 --executor-cores 4 --executor-memory 5gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_28_partitions_233executors_4coreseach 25000 0.01 40 : 11mins, 29sec
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 10 --executor-cores 6 --executor-memory 5gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_57_partitions_10executors_6coreseach 25000 0.01 40 : 11mins, 49sec
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 2 --executor-cores 6 --executor-memory 5gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_12_partitions_2executors_6coreseach 25000 0.01 40 : 21mins, 20sec
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 4 --executor-cores 6 --executor-memory 5gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_24_partitions_24executors_6coreseach 25000 24 0.01 40 : 17mins, 11sec
Note that there is an imbalance in one of the stages (stage 3 at line 127): while the median of this task is 22s, the longest task takes 3.5 minutes! But with smaller partition size, this becomes maybe less imbalanced and thus slightly faster? (Or is this rather less overhead?)

Experiment with Partitions size (MaxPoints) parameter

Investigate whether this results in a different amount of noise being filtered out!

TwitterSmall: 3 704 351 points.
Lower bound of the partition size that is possible to achieve with the given eps is 25 000 (=each partition has less than 25 000 points, the majority much less, maybe about 9000). 3 704 351 / 25 000 ≈ 148.2 means that at least 149 partitions are created; with a partition size of 9000: 3 704 351 / 9 000 = more than 400 partitions.
Or: if we want to have 912 partitions (=slightly less than the available cores): 3 704 351 / 912 = 4061 points per partition would be best! TODO: try this value! (Note: if the resulting partitioning rectangle reaches size 2*eps, no smaller partitioning is possible and in this case, this rectangle will contain more points than specified.)
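
The arithmetic above can be sketched as follows. It is only a back-of-the-envelope calculation that assumes evenly filled partitions, which the spatial partitioning does not deliver in practice (see the note on rectangles of size 2*eps); the function names are made up for this illustration.

```python
import math

TOTAL_POINTS = 3_704_351  # number of points in TwitterSmall


def min_partitions(total_points, max_points):
    """Smallest number of partitions if none may exceed max_points."""
    return math.ceil(total_points / max_points)


def points_per_partition(total_points, partitions):
    """Points per partition if the points were spread evenly (rounded down)."""
    return total_points // partitions


print(min_partitions(TOTAL_POINTS, 25_000))
print(min_partitions(TOTAL_POINTS, 9_000))
print(points_per_partition(TOTAL_POINTS, 912))
```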
MaxPoints=4061 (a couple of "cannot split" messages occurred):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 233 --executor-cores 4 --executor-memory 6gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_233executors_4coreseach_4061_epsmaxPoints 4061 912 0.01 40 : 19mins, 17sec
MaxPoints=9000 (a couple of "cannot split" messages occurred):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 116 --executor-cores 8 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_116executors_8coreseach_9000maxPoints 9000 912 0.01 40 : 13mins, 43sec
MaxPoints=20000 (Can't split: (DBSCANRectangle(51.4999988488853,-0.13999999687075615,51.51999884843826,-0.11999999731779099) -> 21664) (maxSize: 20000)):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 116 --executor-cores 8 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_116executors_8coreseach_20000maxPoints 20000 912 0.01 40 : 14mins, 27sec
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 116 --executor-cores 8 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_116executors_8coreseach_25000maxPoints 25000 912 0.01 40 : 11mins, 15sec
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 233 --executor-cores 4 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_233executors_4coreseach_30000maxPoints 30000 912 0.01 40 : 9mins, 31sec
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 233 --executor-cores 4 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_233executors_4coreseach_32500maxPoints 32500 912 0.01 40 : 10mins, 53sec but second run: 9mins, 6sec third run: 9mins, 13sec, fourth run: 9mins, 20sec
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 233 --executor-cores 4 --executor-memory 6gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_233executors_4coreseach_35000_epsmaxPoints 35000 912 0.01 40 : 9mins, 42sec
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 116 --executor-cores 8 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_116executors_8coreseach_40000maxPoints 40000 912 0.01 40 : 10mins, 56sec
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 116 --executor-cores 8 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_116executors_8coreseach_50000maxPoints 50000 912 0.01 40 : 14mins, 6sec

Scaling test

Increase number of nodes or executors!

116 executors with 8 cores each (=928 cores):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 116 --executor-cores 8 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_116executors_8coreseach_25000maxPoints 25000 912 0.01 40 : 11mins, 15sec
116 executors with 4 cores each (=464 cores):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 116 --executor-cores 4 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_116executors_4coreseach_25000maxPoints 25000 912 0.01 40 : 10mins, 40sec
58 executors with 8 cores each (=464 cores):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 58 --executor-cores 8 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_58executors_8coreseach_25000maxPoints 25000 912 0.01 40 : 10mins, 24sec
With 464 cores not slower than with 928 cores (either: too much overhead with 928 or not enough partitioning)

29 executors with 8 cores each (=232 cores):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 29 --executor-cores 8 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_29executors_8coreseach_25000maxPoints 25000 912 0.01 40 : 11mins, 3sec
29 executors with 4 cores each (=116 cores):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 29 --executor-cores 4 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_29executors_4coreseach_25000maxPoints 25000 912 0.01 40 : 11mins, 47sec
29 executors with 2 cores each (=58 cores):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 29 --executor-cores 2 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_29executors_2coreseach_25000maxPoints 25000 912 0.01 40 : 10mins, 22sec
29 executors with 1 core each (=29 cores):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 29 --executor-cores 1 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_29executors_1coreseach_25000maxPoints 25000 912 0.01 40 : (13mins, 33sec is this value from the 14 executors run?)
14 executors with 1 core each (=14 cores):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 14 --executor-cores 1 --executor-memory 8gb --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_14executors_1coreseach_25000maxPoints 25000 912 0.01 40 : (13mins, 33sec is this value from the 29 executors run?) 12mins, 45sec
32 executors with 1 core each (=32 cores):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 32 --executor-cores 1 --executor-memory 50g --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_32executors_1coreseach_25000maxPoints 25000 912 0.01 40 : 13mins, 52sec
16 executors with 1 core each (=16 cores):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 16 --executor-cores 1 --executor-memory 50g --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_16executors_1coreseach_25000maxPoints 25000 912 0.01 40 : 16mins, 57sec and 14mins, 49sec and 14mins, 49sec

8 executors with 1 core each (=8 cores):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 8 --executor-cores 1 --executor-memory 50g --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_8executors_1coreseach_25000maxPoints 25000 912 0.01 40 : 20mins, 19sec

4 executors with 1 core each (=4 cores):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 4 --executor-cores 1 --executor-memory 50g --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_4executors_1coreseach_25000maxPoints 25000 912 0.01 40 : 33mins, 14sec
2 executors with 1 core each (=2 cores):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 2 --executor-cores 1 --executor-memory 50g --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_2executors_1coreseach_25000maxPoints 25000 912 0.01 40 : 58mins, 41sec

1 executor with 1 core (=1 core):
spark-submit --driver-memory 2g --driver-cores 1 --num-executors 1 --executor-cores 1 --executor-memory 50g --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterSmall.csv twitterSmall.out_with_912_partitions_1executors_1coreseach_25000maxPoints 25000 912 0.01 40 : 2hrs, 1mins, 51sec
Note: due to skewed data, single long-running tasks delay the whole job. Hence, even when not everything is processed in parallel (fewer executor threads/cores than tasks) and tasks are queued, the queued execution still finishes before the long-running task does.
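
To make the scaling behaviour easier to see, speedup and parallel efficiency can be computed relative to the 1-core run. This sketch only uses the runtimes reported above for the runs with 1 core per executor and 50g executor memory (for 16 cores, the value 14mins, 49sec that was measured twice); the helper names are made up for this illustration.

```python
def seconds(hours=0, minutes=0, secs=0):
    """Convert a reported runtime into seconds."""
    return hours * 3600 + minutes * 60 + secs


# Total cores -> runtime, taken from the runs with 1 core per executor above.
runtimes = {
    1: seconds(2, 1, 51),
    2: seconds(0, 58, 41),
    4: seconds(0, 33, 14),
    8: seconds(0, 20, 19),
    16: seconds(0, 14, 49),  # one of three runs took 16mins, 57sec
    32: seconds(0, 13, 52),
}


def speedup_table(runtimes):
    """Map cores -> (speedup, efficiency), both relative to the 1-core run."""
    base = runtimes[1]
    return {cores: (base / t, base / t / cores) for cores, t in runtimes.items()}


for cores, (speedup, efficiency) in speedup_table(runtimes).items():
    print(f"{cores:2d} cores: speedup {speedup:.2f}, efficiency {efficiency:.0%}")
```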

Twitter big

spark-submit --conf "spark.akka.timeout=300s" --conf "" --conf "spark.akka.frameSize=2000" --driver-memory 30g --num-executors 38 --executor-cores 1 --executor-memory 40g --conf "yarn.nodemanager.resource.cpu-vcores=1" --conf "spark.yarn.executor.memoryOverhead=2000" --conf "spark.driver.cores=20" --conf "spark.driver.maxResultSize=0" --class SampleRddDbscanCsv --master yarn /home/helmut/ScalaProjects/H5SparkRddDbscan/target/scala-2.10/H5SparkRddDbscan-assembly-0.1.jar Twitter/twitterBig.csv twitterBig.out_with_456_partitions_38executors_1coreseach 25000 456 0.01 40 : 1hrs, 28mins, 55sec

DBSCAN on Spark

By default: space-separated, only single-space separator supported (cut -d ' ' -f 1,4,6 twitterSmall.ssv > twitterSmall.ssv_no_extra_spaces to remove extra spaces in Twitter file)
Using property file (if run on JUDGE, paths refer to HDFS paths!)

To avoid needing to edit the properties files, we always output to the same directory and then do a rename after the run: hdfs dfs -mv twitterSmall.out_mraad twitterSmall.out_mraad116executors8cores

spark-submit --driver-memory 20g --num-executors 116 --executor-cores 8 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix : 3mins, 28sec
spark-submit --driver-memory 20g --num-executors 58 --executor-cores 8 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix : 3mins, 3sec
spark-submit --driver-memory 20g --num-executors 29 --executor-cores 8 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix : 2mins, 53sec
spark-submit --driver-memory 20g --num-executors 29 --executor-cores 4 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix : 2mins, 47sec
spark-submit --driver-memory 20g --num-executors 29 --executor-cores 2 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix : 2mins, 49sec
spark-submit --driver-memory 20g --num-executors 32 --executor-cores 1 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix : 2mins, 27sec
spark-submit --driver-memory 20g --num-executors 29 --executor-cores 1 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix : 3mins, 3sec
spark-submit --driver-memory 20g --num-executors 16 --executor-cores 1 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix : 2mins, 30sec
spark-submit --driver-memory 20g --num-executors 14 --executor-cores 1 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix : 3mins, 20sec
spark-submit --driver-memory 20g --num-executors 8 --executor-cores 1 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix : 2mins, 54sec
spark-submit --driver-memory 20g --num-executors 7 --executor-cores 1 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix : 3mins, 41sec
spark-submit --driver-memory 20g --num-executors 4 --executor-cores 1 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix : 5mins, 30sec
spark-submit --driver-memory 20g --num-executors 2 --executor-cores 1 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix : 9mins, 34sec
spark-submit --driver-memory 20g --num-executors 1 --executor-cores 1 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix : 18mins, 25sec

spark-submit --driver-memory 20g --num-executors 29 --executor-cores 2 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix_58partitions smaller boxes : 1mins, 38sec
spark-submit --driver-memory 20g --num-executors 29 --executor-cores 2 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix_116partitions0.01cellsize : 1mins, 24sec
spark-submit --driver-memory 20g --num-executors 29 --executor-cores 4 --executor-memory 12g --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterSmall.properties_hdfs_noprefix_116partitions0.01cellsize : 1mins, 20sec and 1mins, 37sec

TwitterBig runs

Seem to need a lot of RAM: only use 1 core on each worker so that this core can use full RAM (otherwise: abort due to timeouts which typically are due to crashes because of out of memory)
spark-submit --conf "spark.akka.timeout=300s" --conf "" --conf "spark.akka.frameSize=2000" --driver-memory 30g --num-executors 38 --executor-cores 1 --executor-memory 30g --conf "yarn.nodemanager.resource.cpu-vcores=1" --conf "spark.yarn.executor.memoryOverhead=12000" --conf "spark.driver.cores=20" --conf "spark.driver.maxResultSize=0" --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterBig.properties_hdfs_noprefix_14592partitions0.01cellsize : 1hrs, 3mins, 58sec
spark-submit --conf "spark.akka.timeout=300s" --conf "" --conf "spark.akka.frameSize=2000" --driver-memory 30g --num-executors 38 --executor-cores 1 --executor-memory 30g --conf "yarn.nodemanager.resource.cpu-vcores=1" --conf "spark.yarn.executor.memoryOverhead=12000" --conf "spark.driver.cores=20" --conf "spark.driver.maxResultSize=0" --master yarn /home/helmut/ScalaProjects/mraad_dbscan-spark/target/scala-2.10/dbscan-spark-assembly-0.1.jar /home/helmut/DataIn/twitterBig.properties_hdfs_noprefix_1824partitions0.01cellsize : 24mins, 51sec

Spark DBSCAN

Uses CSV

Needed to change the source code slightly (to avoid passing, and subsequent complaints concerning, a master URL consisting just of "YARN"). Furthermore, hardcoded 912 partitions in its IOHelper class (where the initial file is read). In addition, build.sbt had a problem using version numbers that are not available (lines marked < are my adjusted version, lines marked > the original):
< scalaVersion := "2.10.6"
< val sparkVersion = "1.6.1"
< // Added % "provided" so that it gets not included in assembly jar
< libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
< // Added % "provided" so that it gets not included in assembly jar
< // Elsewhere "spark-mllib_2.10" is used (the 2.10 might refer to Scala 2.10?)
< libraryDependencies += "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"
< // ScalaTest used by the tests provided by Spark MLlib
< libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" % "test"
> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0" % "provided"
> libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.1.3" % "test"

spark-submit --num-executors 116 --executor-cores 8 --executor-memory 12gb --class org.alitouka.spark.dbscan.DbscanDriver --master yarn /home/helmut/ScalaProjects/spark_dbscan/spark_dbscan-assembly-0.0.4.jar --ds-master yarn --ds-jar /home/helmut/ScalaProjects/spark_dbscan/spark_dbscan-assembly-0.0.4.jar --ds-input Twitter/twitterSmall.csv --ds-output twitterSmall_alitouka912initialpartitions4061npp --npp 4061 --eps 0.01 --numPts 40 : 53mins, 6sec

spark-submit --num-executors 116 --executor-cores 8 --executor-memory 12gb --class org.alitouka.spark.dbscan.DbscanDriver --master yarn /home/helmut/ScalaProjects/spark_dbscan/spark_dbscan-assembly-0.0.4.jar --ds-master yarn --ds-jar /home/helmut/ScalaProjects/spark_dbscan/spark_dbscan-assembly-0.0.4.jar --ds-input Twitter/twitterSmall.csv --ds-output twitterSmall_alitouka912initialpartitions25000npp --npp 25000 --eps 0.01 --numPts 40 : 40mins, 10sec


spark-submit --conf "spark.akka.timeout=300s" --conf "" --conf "spark.akka.frameSize=2000" --driver-memory 30g --num-executors 38 --executor-cores 1 --executor-memory 40g --conf "yarn.nodemanager.resource.cpu-vcores=1" --conf "spark.yarn.executor.memoryOverhead=2000" --conf "spark.driver.cores=20" --conf "spark.driver.maxResultSize=0" --class org.alitouka.spark.dbscan.DbscanDriver --master yarn /home/helmut/ScalaProjects/spark_dbscan/spark_dbscan-assembly-0.0.4.jar --ds-master yarn --ds-jar /home/helmut/ScalaProjects/spark_dbscan/spark_dbscan-assembly-0.0.4.jar --ds-input Twitter/twitterBig.csv --ds-output twitterBig_alitouka912initialpartitions25000npp --npp 25000 --eps 0.01 --numPts 40
fails with
java.lang.Exception: Box for point Point at (51.382, -2.3846); id = 6618196; box = 706; cluster = -2; neighbors = 0 was not found
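
When tuning the memory options in runs like the ones above, it can help to see how much memory is actually requested from YARN. The following is only a minimal sketch, assuming that each executor container is requested with the executor memory plus spark.yarn.executor.memoryOverhead (given in MB); the function names are made up for illustration, and the driver/application master as well as any rounding up by YARN to its allocation increment are ignored:

```python
# Sketch: memory requested from YARN for the executors of a Spark job.
# Assumption: container size = --executor-memory + spark.yarn.executor.memoryOverhead (MB).
# container_mb and total_executor_mb are hypothetical helper names.

def container_mb(executor_memory_gb, overhead_mb):
    """Memory (MB) requested for one executor container."""
    return executor_memory_gb * 1024 + overhead_mb


def total_executor_mb(num_executors, executor_memory_gb, overhead_mb):
    """Memory (MB) requested for all executor containers together."""
    return num_executors * container_mb(executor_memory_gb, overhead_mb)


# (num_executors, executor memory in GB, overhead in MB), taken from the commands above
runs = {
    "30g executors, 12000 MB overhead": (38, 30, 12000),
    "40g executors, 2000 MB overhead": (38, 40, 2000),
}

for name, (n, mem_gb, overhead) in runs.items():
    per_container = container_mb(mem_gb, overhead)
    total = total_executor_mb(n, mem_gb, overhead)
    print(f"{name}: {per_container} MB per container, {total} MB in total")
```

Under this assumption, both configurations ask for roughly the same amount per container; they only differ in how it is split between heap and overhead.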


time java -Xmx35G -jar elki-bundle-0.7.1.jar KDDCLIApplication /home/helmut/DataIn/twitterSmall.csv -db.index 'tree.metrical.covertree.SimplifiedCoverTree$Factory' -covertree.distancefunction minkowski.EuclideanDistanceFunction -algorithm clustering.DBSCAN -dbscan.epsilon 0.01 -dbscan.minpts 40 > twitterSmall.out_elki

time java -Xmx73G -jar elki-bundle-0.7.1.jar KDDCLIApplication /home/helmut/DataIn/twitterBig.csv -db.index 'tree.metrical.covertree.SimplifiedCoverTree$Factory' -covertree.distancefunction minkowski.EuclideanDistanceFunction -algorithm clustering.DBSCAN -dbscan.epsilon 0.01 -dbscan.minpts 40 > twitterBig.out_elki

Some statistics on development of number of students and professors and on student/teacher ratio

Helmut Neukirchen, 6. August 2013

As the German magazine "duz -- Unabhängige Deutsche Universitätszeitung" asked me for a statement on "demographic change and universities" in the Icelandic context, I investigated the number of students vs. the number of faculty members (=sum of the different levels of professors) in Iceland (and Germany for comparison). Only a fraction of my data will get published in "duz", so here is the full data (based on data from Statice and Destatis -- data for 2012 is not yet available):

       Iceland                                          Germany
Year  #Students Change  #Faculty Students/Faculty      #Students Change
2005    16 074             623    25.8                 1 985 765
2006    16 835   +4.73%    682    24.7                 1 979 043  -0.3%
2007    16 851   +0.09%    708    23.8                 1 941 405  -1.9%
2008    17 165   +1.86%    761    22.6                 2 025 307  +4.3%
2009    18 291   +6.55%    833    22.0                 2 121 178  +4.7%
2010    19 159   +4.74%    807    23.7                 2 217 294  +4.5%
2011    19 334   +0.91%    790    24.5                 2 380 974  +7.4%
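
For anyone who wants to recompute the derived columns, here is a small sketch of the arithmetic behind "Change" (year-over-year change in percent) and "Students/Faculty" for the Icelandic figures. The helper names are made up for illustration; the numbers are copied from the table above. Note that values rounded this way can differ from the table in the last digit (for example, the 2007 change comes out as about +0.10%), presumably because some table entries were truncated rather than rounded:

```python
# Icelandic figures copied from the table: year -> (students, faculty members)
iceland = {
    2005: (16074, 623),
    2006: (16835, 682),
    2007: (16851, 708),
    2008: (17165, 761),
    2009: (18291, 833),
    2010: (19159, 807),
    2011: (19334, 790),
}


def pct_change(old, new):
    """Year-over-year change in percent (hypothetical helper)."""
    return (new - old) / old * 100


def ratio(students, faculty):
    """Students per faculty member (hypothetical helper)."""
    return students / faculty


previous = None
for year in sorted(iceland):
    students, faculty = iceland[year]
    # The first year has no predecessor, so its change column stays empty
    change = "" if previous is None else f"{pct_change(previous, students):+.2f}%"
    print(f"{year} {students:>7} {change:>8} {faculty:>5} {ratio(students, faculty):6.1f}")
    previous = students
```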

The increase in Icelandic students in 2009 and 2010 is explained by the 2008 economic crisis in Iceland, which led to a high number of students starting to attend university.
As you can see, the student/professor ratio got worse in Iceland after the crisis (students flooded the universities while the number of faculty members was reduced due to decreased funding). For Germany, I did not calculate that ratio for all years, but only for 2011, together with more detailed data for both Iceland and Germany (note that the number of students in Germany in 2011 in the table below differs from the one in the table above: the time series above comes from a different Destatis source than the numbers below):

2011                           Iceland       Germany
Enrolled students               19 334     2 501 990
Population                     318 452    81 843 743
Students/Population                 6%            3%

Iceland                      Germany
Prófessorar           316    Professoren               42 924
Dósentar              213    Dozenten & Assistenten     3 899
Lektorar              261
Sum faculty members   790                              46 823

                               Iceland       Germany
Faculty members/Population      0.248%        0.057%
Students/Faculty member          24.47         53.44
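
The derived rows of the 2011 table can be reproduced with a few lines. This is only a sketch with made-up names (`data`, `summarize`); the input numbers are copied from the table above:

```python
# 2011 figures copied from the table; faculty lists hold the per-rank counts
data = {
    "Iceland": {"students": 19334, "population": 318452, "faculty": [316, 213, 261]},
    "Germany": {"students": 2501990, "population": 81843743, "faculty": [42924, 3899]},
}


def summarize(country):
    """Compute the derived rows for one country (hypothetical helper)."""
    faculty = sum(country["faculty"])
    return {
        "faculty": faculty,
        "students_per_population_pct": country["students"] / country["population"] * 100,
        "faculty_per_population_pct": faculty / country["population"] * 100,
        "students_per_faculty": country["students"] / faculty,
    }


for name, country in data.items():
    s = summarize(country)
    print(
        f"{name}: faculty={s['faculty']}, "
        f"students/population={s['students_per_population_pct']:.0f}%, "
        f"faculty/population={s['faculty_per_population_pct']:.3f}%, "
        f"students/faculty={s['students_per_faculty']:.2f}"
    )
```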

OECD collects data on governmental or public spending for tertiary education divided by GDP -- however, I could not find this data on the OECD statistics web page.
P.S.: I just stumbled over a visualisation of academic brain drain that also uses this GDP ratio (from the World Bank) as one input parameter of its polymetric visualisation.
P.P.S.: A related visualisation shows the number of researchers per inhabitant. However, the data there is from before the 2008 economic crisis.
P.P.P.S.: Here is finally the OECD data from Education at a Glance 2015 (DOI:10.1787/eag-2015-en).