User:EBernhardson/pyspark on SWAP
Utilizing dependencies inside pyspark is possible with some custom setup at the start of a notebook.
TL/DR
import findspark
findspark.init('/usr/lib/spark2')
from pyspark.sql import SparkSession
import os
master = 'yarn'
builder = (
SparkSession.builder
.appName('query understanding')
.config(
'spark.driver.extraJavaOptions',
' '.join('-D{}={}'.format(k, v) for k, v in {
'http.proxyHost': 'webproxy.eqiad.wmnet',
'http.proxyPort': '8080',
'https.proxyHost': 'webproxy.eqiad.wmnet',
'https.proxyPort': '8080',
}.items()))
.config('spark.jars.packages', 'graphframes:graphframes:0.6.0-spark2.3-s_2.11')
)
if master == 'yarn':
os.environ['PYSPARK_SUBMIT_ARGS'] = '--archives spark_venv.zip#venv pyspark-shell'
os.environ['PYSPARK_PYTHON'] = 'venv/bin/python'
builder = (
builder
.master('yarn')
.config('spark.sql.shuffle.partitions', 600)
.config('spark.dynamicAllocation.maxExecutors', 100)
.config('spark.executor.memory', '4g')
.config('spark.executor.cores', 2)
)
elif master == 'local':
builder = (
builder
.master('local[12]')
.config('spark.driver.memory', '8g')
)
else:
raise Exception()
spark = builder.getOrCreate()
spark.sparkContext.setCheckpointDir('/user/ebernhardson/spark-checkpoints')
Setting the foundations
Breaking this into a few pieces, first the findspark package must be installed to your SWAP instance.
!pip install findspark
findspark can then be called to bring the system spark 2.x libraries into your environment
import findspark
findspark.init('/usr/lib/spark2')
from pyspark.sql import SparkSession
import os
The SparkSession
builder is then used to give the spark app a name and specify some useful defaults. In this example we provide appropriate proxies to the spark driver to download the graphframes package. Other spark dependencies can be installed in a similar fashion. For python library dependencies, continue reading.
builder = (
SparkSession.builder
.appName('example spark application')
.config(
'spark.driver.extraJavaOptions',
' '.join('-D{}={}'.format(k, v) for k, v in {
'http.proxyHost': 'webproxy.eqiad.wmnet',
'http.proxyPort': '8080',
'https.proxyHost': 'webproxy.eqiad.wmnet',
'https.proxyPort': '8080',
}.items()))
.config('spark.jars.packages', 'graphframes:graphframes:0.6.0-spark2.3-s_2.11')
)
Application Master
Spark can be run either in local mode, where all execution happens on the machine the notebook is started from, or in yarn mode which is fully distributed.
Local
Local mode is especially convenient for initial notebook development. Local mode is typically more responsive than cluster mode, and allows prototyping code against small samples of the data before scaling up to full month or multi-month datasets. Dependencies are additionally easily handled in local mode, when running local nothing special has to be done. Dependencies can be installed with !pip install
and will be available in all executor processes. In local mode the number of java threads can be specified using brackets, such as local[12]
. It is recommended to provide more memory than the defaults when running in local mode, otherwise a dozen java executor threads might fight over a couple GB of memory.
(
builder
.master('local[12]')
.config('spark.driver.memory', '8g')
)
Yarn
Local mode has it's limits. When more compute is necessary set the master to yarn and specify some limits to the amount of compute to use. Spark will execute a task per core, so 100 executors with 2 cores will give 200 parallel tasks with 400G of total memory to work on the notebook.
(
builder
.master('yarn')
.config('spark.dynamicAllocation.maxExecutors', 100)
.config('spark.executor.memory', '3g')
.config('spark.executor.cores', 2)
)
Yarn with python dependencies
Yarn with dependencies is managed by shipping a virtualenv containing all the dependencies to each executor. There are two options for packaging up the virtualenv. The simplest is to use a notebook bang command to zip up the current virtualenv. For more advanced usage see venv-pack.
!cd venv; zip -qur ~/spark_venv.zip .
Two environment variables work together to tell spark to ship the virtualenv to the executors and run workers from the included python executable. First is PYSPARK_SUBMIT_ARGS
which must be provided an --archives
parameter. This parameter is a comma separated list of file paths. Each path can be suffixed with #name
to decompress the file into the working directory of the executor with the specified name. The final segment of PYSPARK_SUBMIT_ARGS
must always invoke pyspark-shell.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--archives spark_venv.zip#venv pyspark-shell'
In PYSPARK_SUBMIT_ARGS
we instructed spark to decompress a virtualenv into the executor working directory. In the next environment variable, PYSPARK_PYTHON
, we instruct spark to start executors using python provided in that virtualenv.
os.environ['PYSPARK_PYTHON'] = 'venv/bin/python'
Ready to Go
After the builder is fully configured spark can be started and utilized as normal.
spark = builder.getOrCreate()
Other working examples
An example notebook has been put together demonstrating loading the ELMo package from tensorflow hub and running it across many executors in the yarn cluster.