Description
Spark is Hadoop MapReduce done in memory
Software Category: devel
For detailed information, visit the Spark
website.
Available Versions
To find the available versions and learn how to load them, run:
module spider spark
The output of the command shows the available Spark
module versions.
For detailed information about a particular Spark
module, including how to load the module, run the module spider
command with the module’s full version label. For example:
module spider spark/3.4.1
Module | Version |
Module Load Command |
spark | 3.4.1 |
module load spark/3.4.1
|
Using Spark interactively
There are three ways to use Spark interactively:
- Shell prompt
- Open OnDemand PySpark
- Open OnDemand Desktop
If you need the Web UI you must use the third method (OOD Desktop).
Shell prompt
First submit an ijob.
Scala/PySpark
To start up a Scala or PySpark shell prompt, run spark-shell
or pyspark
. For example:
$ spark-shell
...
Spark context Web UI available at http://udc-xxxx-xx:4040
Spark context available as 'sc' (master = local[*], app id = local-1633023285536).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 11.0.2)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
R
To start an R prompt, you must load R first. Then run sparkR
. If the R version is different from 4.1.0, you will see a warning message:
package ‘SparkR’ was built under R version 4.1.0
We recommend loading the closest available version.
Open OnDemand PySpark
Python users can run Spark in a JupyterLab interface via the PySpark Interactive App on Open OnDemand.
Open OnDemand Desktop
Spark provides a user interface (UI) for you to monitor your Spark job. If you intend to use the Web UI, you must request a Desktop session through Open OnDemand.
The URL is displayed upon launching Spark and is of the form http://udc-xxxx-xx:4040
where udc-xxxx-xx
is the hostname of the compute node. You can either right-click on the link and select “Open Link,” or enter localhost:4040
in the browser.
Jupyter notebook/lab
You can redirect pyspark
to open a Jupyter notebook/lab as follows. The jupyter
command is provided by the jupyterlab
module.
Set two environment variables:
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS=lab
If you’d prefer a notebook session, replace lab
with notebook
.
Navigate to your working directory and run:
pyspark
This will start up Jupyter inside a browser automatically. Use the “Python 3” kernel.
The example below estimates the value of pi in a PySpark session running on 16 cores, with the JupyterLab window on the left and the Spark Web UI event timeline on the right. Note that the SparkContext
object sc
is initialized automatically.
Slurm Script Templates for Batch Jobs
Local mode on a single node
#!/bin/bash
#SBATCH -p standard # partition
#SBATCH -A myaccount # your allocation
#SBATCH -N 1 # number of nodes
#SBATCH -c 10 # number of cores per node
#SBATCH -t 10:00:00 # time
module purge
module load spark
spark-submit script.py
You must initialize SparkContext
explicitly in your script, e.g.:
from pyspark import SparkContext
sc = SparkContext("local[*]")
The Spark log is written to slurm-<JOB_ID>.out
. After the job is finished, use the seff <JOB_ID>
command to verify that the cores are used effectively:
$ seff 1232109
...
Cores per node: 10
CPU Utilized: 01:17:16
CPU Efficiency: 82.20% of 01:34:00 core-walltime
...
If the CPU efficiency is much lower, please consider using fewer cores for your future jobs.
Standalone cluster mode using multiple nodes
As of 5/29/2024 this is no longer working. We will update as soon as we have a solution. For the time being please use the local mode up to 96 cores in the afton
partition.
We gratefully acknowledge the Pittsburg Supercomputing Center for giving us permission to use their Spark configuration and launch scripts.
Before using multiple nodes, please make sure that your job can use a full standard node effectively. When you request N nodes in the standalone cluster mode, one node is set aside as the master node and the remaining N-1 nodes are worker nodes. Thus, running on 2 nodes will have the same effect as running on 1 node.
#!/bin/bash
#SBATCH -p parallel # do not modify
#SBATCH --exclusive # do not modify
#SBATCH -A myaccount # your allocation
#SBATCH -N 3 # number of nodes
#SBATCH -c 40 # number of cores per node
#SBATCH -t 3:00:00 # time
module purge
module load spark
#---------------------------
# do not modify this section
export PARTITIONS=$(( (SLURM_NNODES-1) * SLURM_CPUS_PER_TASK ))
export MASTERSTRING="spark://$(hostname):7077"
$SPARK_HOME/scripts/spark-cluster-init.sh &
sleep 10
#---------------------------
spark-submit --master $MASTERSTRING script.py
In the above Slurm script template, note that:
-
Request parallel
nodes with exclusive access.
-
You may reduce the number of cores if the job needs more memory per core.
-
Your code should begin with:
from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
-
You may need to set the number of partitions explicitly, e.g. in the second argument of sc.parallelize()
:
sc.parallelize(..., os.environ['PARTITIONS'])
where the PARTITIONS
environment variable is defined as the total number of cores on worker nodes in the Slurm script for your convenience. Without doing so only one partition will be created on each node.
Benchmark
We used a code that estimates the value of pi as a benchmark. The following table illustrates good scaling performance across multiple nodes (40 cores per node) on the HPC system.
Nodes |
Worker nodes |
Time (s) |
1 |
1 |
134.4 |
3 |
2 |
71.3 |
5 |
4 |
39.6 |
9 |
8 |
23.6 |
Cleanup
Temporary files are created inside your scratch directory during a multinode Spark job. They have the form:
spark-mst3k-org.apache.spark.deploy.master.Master-1-udc-aw33-2c1.out
spark-8147c5b8-eb70-4b98-809e-19fdbcf3eafb
app-20211012113817-0000
blockmgr-b41a7c79-cbf4-49f0-b373-f6c6467e9d01
You may safely remove these files when your job is done by running:
find /scratch/$USER -maxdepth 1 -regextype sed \( -name "spark-$USER-*" -o -regex '.*/spark-[0-9a-z]\{8\}-.*' -o -regex '.*/app-[0-9]\{14\}-.*' -o -regex '.*/blockmgr-[0-9a-z]\{8\}-.*' \) -exec rm -rf {} \;
Make sure that you do not use this pattern to name other files!