User:EBernhardson/pyspark on SWAP

From Wikitech


Using third-party dependencies (JVM packages and python libraries) inside pyspark is possible with some custom setup at the start of a notebook.

TL;DR

import findspark
findspark.init('/usr/lib/spark2')  # make the system spark 2.x install importable
from pyspark.sql import SparkSession
import os

master = 'yarn'  # or 'local'
builder = (
    SparkSession.builder
    .appName('query understanding')
    # route the driver through the web proxy so it can download spark.jars.packages
    .config(
        'spark.driver.extraJavaOptions',
        ' '.join('-D{}={}'.format(k, v) for k, v in {
            'http.proxyHost': 'webproxy.eqiad.wmnet',
            'http.proxyPort': '8080',
            'https.proxyHost': 'webproxy.eqiad.wmnet',
            'https.proxyPort': '8080',
        }.items()))
    .config('spark.jars.packages', 'graphframes:graphframes:0.6.0-spark2.3-s_2.11')
)
if master == 'yarn':
    # ship the zipped virtualenv to each executor and run python workers from it;
    # both variables must be set before the session is created
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--archives spark_venv.zip#venv pyspark-shell'
    os.environ['PYSPARK_PYTHON'] = 'venv/bin/python'
    builder = (
        builder
        .master('yarn')
        .config('spark.sql.shuffle.partitions', 600)
        .config('spark.dynamicAllocation.maxExecutors', 100)
        .config('spark.executor.memory', '4g')
        .config('spark.executor.cores', 2)
    )
elif master == 'local':
    builder = (
        builder
        .master('local[12]')  # 12 worker threads on the notebook host
        .config('spark.driver.memory', '8g')
    )
else:
    raise ValueError('unknown master: {}'.format(master))

spark = builder.getOrCreate()
spark.sparkContext.setCheckpointDir('/user/ebernhardson/spark-checkpoints')

Setting the foundations

Breaking this into pieces: first, the findspark package must be installed on your SWAP instance.

!pip install findspark

findspark can then be called to add the system spark 2.x libraries to your python path.

import findspark
findspark.init('/usr/lib/spark2')
from pyspark.sql import SparkSession
import os
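For the curious, the core of what findspark.init does can be approximated by hand. The sketch below is a simplified illustration of the idea, not findspark's actual code: it assumes the usual spark layout of $SPARK_HOME/python plus a bundled py4j-*-src.zip under $SPARK_HOME/python/lib, and the helper name add_spark_to_path is hypothetical.

```python
import glob
import os
import sys


def add_spark_to_path(spark_home='/usr/lib/spark2'):
    """Hypothetical, simplified stand-in for findspark.init().

    Sets SPARK_HOME and prepends spark's python sources and the bundled
    py4j zip to sys.path so that `import pyspark` resolves to the system
    spark install. Returns the entries that were added.
    """
    spark_python = os.path.join(spark_home, 'python')
    py4j_zips = sorted(glob.glob(os.path.join(spark_python, 'lib', 'py4j-*-src.zip')))
    if not py4j_zips:
        raise FileNotFoundError('no py4j zip found under {}'.format(spark_python))
    os.environ['SPARK_HOME'] = spark_home
    added = [spark_python, py4j_zips[-1]]
    # Prepend so these take precedence over any other pyspark on the path.
    sys.path[:0] = added
    return added
```

Unlike findspark, this sketch does not try to discover spark_home on its own; it only shows why importing pyspark works after the call.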

The SparkSession builder is then used to give the spark app a name and specify some useful defaults. In this example the spark driver is given the web proxy settings it needs to download the graphframes package. Other spark (JVM) packages can be added the same way through spark.jars.packages. For python library dependencies, continue reading.

builder = (
    SparkSession.builder
    .appName('example spark application')
    .config(
        'spark.driver.extraJavaOptions',
        ' '.join('-D{}={}'.format(k, v) for k, v in {
            'http.proxyHost': 'webproxy.eqiad.wmnet',
            'http.proxyPort': '8080',
            'https.proxyHost': 'webproxy.eqiad.wmnet',
            'https.proxyPort': '8080',
        }.items()))
    .config('spark.jars.packages', 'graphframes:graphframes:0.6.0-spark2.3-s_2.11')
)
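The inline generator expression above simply turns a dict of JVM system properties into one space-separated string of -Dkey=value flags. Pulled out into a small function (the name java_system_properties is hypothetical), it is easier to reuse and makes the exact string handed to spark.driver.extraJavaOptions easy to inspect.

```python
def java_system_properties(props):
    """Render a dict of JVM system properties as '-Dkey=value' flags joined by spaces.

    No quoting is done, so this assumes keys and values contain no spaces.
    """
    return ' '.join('-D{}={}'.format(k, v) for k, v in props.items())


# Proxy settings from the example above; http and https use the same proxy.
PROXY_PROPS = {
    'http.proxyHost': 'webproxy.eqiad.wmnet',
    'http.proxyPort': '8080',
    'https.proxyHost': 'webproxy.eqiad.wmnet',
    'https.proxyPort': '8080',
}

extra_java_options = java_system_properties(PROXY_PROPS)
```

It would then be passed as .config('spark.driver.extraJavaOptions', extra_java_options) on the builder.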

Spark Master

Spark can run either in local mode, where all execution happens on the machine the notebook is running on, or in yarn mode, which is fully distributed across the cluster.

Local

Local mode is especially convenient for initial notebook development. It is typically more responsive than yarn mode, and allows prototyping code against small samples of the data before scaling up to full month or multi-month datasets. Dependencies are also easy to handle: nothing special has to be done in local mode, since packages installed with !pip install are available to all of spark's python worker processes. The number of worker threads is given in brackets, such as local[12]. It is recommended to give the driver more memory than the default when running in local mode; otherwise a dozen executor threads might fight over a couple GB of memory.

builder = (
    builder
    .master('local[12]')  # 12 worker threads
    .config('spark.driver.memory', '8g')  # one heap shared by all threads
)
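In local mode every task thread shares the single driver JVM heap, so a back-of-the-envelope division shows why the extra driver memory matters. The helper below (hypothetical name) only does that division; it assumes sizes end in g or m and ignores spark's reserved and storage portions of the heap, so the real amount available to each task is lower. Spark's built-in default for spark.driver.memory is 1g, though a cluster's spark-defaults.conf may override it.

```python
def heap_per_thread_mb(master, driver_memory):
    """Rough driver heap (in MB) per task thread for a local[N] master.

    Assumes driver_memory is a spark size string ending in 'g' or 'm'
    (such as '8g' or '512m'). Ignores the reserved and storage parts of the
    heap, so the real figure available to each task is lower.
    """
    if not (master.startswith('local[') and master.endswith(']')):
        raise ValueError('expected local[N], got {!r}'.format(master))
    threads = int(master[len('local['):-1])
    factors = {'g': 1024, 'm': 1}
    unit = driver_memory[-1].lower()
    if unit not in factors:
        raise ValueError('unsupported size {!r}'.format(driver_memory))
    return int(driver_memory[:-1]) * factors[unit] // threads
```

With local[12], 8g works out to roughly 682MB per thread, while 1g leaves only about 85MB per thread.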

Yarn

Local mode has its limits. When more compute is needed, set the master to yarn and cap the amount of cluster resources the notebook may use. Spark executes one task per executor core, so 100 executors with 2 cores each give 200 parallel tasks; with spark.executor.memory at 4g, as in the TL;DR, that is 400G of total executor memory for the notebook.

builder = (
    builder
    .master('yarn')
    .config('spark.dynamicAllocation.maxExecutors', 100)  # upper bound, not a guarantee
    .config('spark.executor.memory', '4g')
    .config('spark.executor.cores', 2)
)
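The sizing arithmetic from the yarn paragraph can be captured in a tiny sketch (hypothetical helper). It assumes spark.task.cpus is left at its default of 1, so each core runs one task, and it counts only executor heap, not the extra per-container memory overhead that yarn also reserves.

```python
def yarn_capacity(max_executors, executor_cores, executor_memory_gb):
    """Upper bound on (parallel tasks, total executor heap in GB) for a yarn session.

    Parallelism is executors * cores because each core runs one task
    (assuming spark.task.cpus=1). Memory is heap only; container overhead
    is not included.
    """
    return max_executors * executor_cores, max_executors * executor_memory_gb
```

With dynamic allocation, maxExecutors is a ceiling: on a busy cluster the session may receive fewer executors, and correspondingly less parallelism and memory.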

Yarn with python dependencies

In yarn mode, python dependencies are handled by shipping a virtualenv containing all of them to each executor. There are two options for packaging up the virtualenv. The simplest is to zip up the current virtualenv with a notebook bang command; for more advanced usage see venv-pack.

!cd venv && zip -qur ~/spark_venv.zip .
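Because each executor extracts the archive into a directory named after its #alias and then runs venv/bin/python, the virtualenv's contents must sit at the root of the zip, which is why the command changes into venv before zipping. A quick sanity check with the standard zipfile module, assuming the archive lives at ~/spark_venv.zip as above (the helper name is hypothetical):

```python
import os
import zipfile


def venv_archive_ok(path='~/spark_venv.zip'):
    """Return True if the zip has the virtualenv's interpreter at bin/python.

    Executors extract the archive into a directory named after the #alias,
    so 'bin/python' at the zip root becomes '<alias>/bin/python' there.
    """
    with zipfile.ZipFile(os.path.expanduser(path)) as zf:
        # Tolerate a leading './' in entry names, just in case.
        names = {n[2:] if n.startswith('./') else n for n in zf.namelist()}
    return 'bin/python' in names
```

If this returns False, the archive was most likely built from the parent directory, leaving an extra venv/ level inside the zip.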

Two environment variables work together to tell spark to ship the virtualenv to the executors and to run the python workers with the interpreter it contains. The first is PYSPARK_SUBMIT_ARGS, which must include an --archives parameter. This parameter is a comma-separated list of file paths. Each path can be suffixed with #name to extract the archive into a directory with that name in the executor's working directory. The final argument of PYSPARK_SUBMIT_ARGS must always be pyspark-shell.

os.environ['PYSPARK_SUBMIT_ARGS'] = '--archives spark_venv.zip#venv pyspark-shell'
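When more than one archive needs shipping, building the value programmatically keeps the comma-separated path#alias list and the trailing pyspark-shell in order. A minimal sketch; the function name and the second archive (models.zip) are hypothetical:

```python
def pyspark_submit_args(archives):
    """Build a PYSPARK_SUBMIT_ARGS value from a {archive_path: alias} dict.

    Each archive becomes path#alias in a single comma-separated --archives
    list, and the value always ends with pyspark-shell.
    """
    if not archives:
        return 'pyspark-shell'
    spec = ','.join('{}#{}'.format(path, alias) for path, alias in archives.items())
    return '--archives {} pyspark-shell'.format(spec)
```

For example, pyspark_submit_args({'spark_venv.zip': 'venv'}) reproduces the value used above, and adding a hypothetical {'models.zip': 'models'} entry appends ,models.zip#models to the list.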

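PYSPARK_SUBMIT_ARGS tells spark to extract the virtualenv into each executor's working directory as venv. The second environment variable, PYSPARK_PYTHON, tells spark to start the executors' python workers with the interpreter inside that virtualenv. Both are read when the spark context starts, so they must be set before the first call to getOrCreate in the notebook.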

os.environ['PYSPARK_PYTHON'] = 'venv/bin/python'
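The alias after # in PYSPARK_SUBMIT_ARGS and the first directory of PYSPARK_PYTHON have to agree, and a typo in either only surfaces as executor failures. A small pre-flight check, run before getOrCreate, can catch that. It is a sketch with a hypothetical name, and it only recognises explicit #alias names:

```python
import shlex


def check_pyspark_env(env):
    """Return a list of problems with the venv-related pyspark settings in env."""
    problems = []
    args = shlex.split(env.get('PYSPARK_SUBMIT_ARGS', ''))
    if not args or args[-1] != 'pyspark-shell':
        problems.append('PYSPARK_SUBMIT_ARGS must end with pyspark-shell')
    aliases = set()
    for flag, value in zip(args, args[1:]):
        if flag == '--archives':
            for item in value.split(','):
                _, _, alias = item.partition('#')
                if alias:  # only explicit #alias names are recognised here
                    aliases.add(alias)
    python = env.get('PYSPARK_PYTHON', '')
    # A relative path like 'venv/bin/python' must start with a shipped alias.
    if python.split('/', 1)[0] not in aliases:
        problems.append('PYSPARK_PYTHON {!r} is not inside any --archives alias'.format(python))
    return problems
```

In the notebook it would be called as check_pyspark_env(os.environ), expecting an empty list.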

Ready to Go

Once the builder is fully configured, spark can be started and used as normal.

spark = builder.getOrCreate()
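To confirm that yarn executors really run the shipped virtualenv, a few tasks can report which interpreter they run under. A minimal sketch using only the standard RDD methods parallelize, map, distinct and collect; the helper names are hypothetical, and with the setup above the returned paths would be expected to end in venv/bin/python.

```python
import sys


def _worker_python(_):
    # Runs on the executor: report which interpreter this python worker uses.
    return sys.executable


def executor_pythons(spark, num_tasks=8):
    """Return the sorted, distinct interpreter paths seen across num_tasks tasks."""
    rdd = spark.sparkContext.parallelize(range(num_tasks), num_tasks)
    return sorted(rdd.map(_worker_python).distinct().collect())
```

Calling executor_pythons(spark) right after getOrCreate gives a quick signal; seeing the system python instead usually means the environment variables were set after the context had already started.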

Other working examples

An example notebook has been put together demonstrating loading the ELMo module from TensorFlow Hub and running it across many executors in the yarn cluster.