Analytics/Systems/Cluster/Spark


Spark is a set of libraries and tools available in Scala, Java, Python, and R that allow for general purpose distributed batch and real-time computing and processing.

Spark is available for use on the Analytics Hadoop cluster in YARN. Most external Spark documentation refers to the Spark executables without the '2' versioning. Ours are installed with '2' in the name to avoid clashing with the previously installed Spark 1.

The Spark 2 CLI executables are:

  • spark2-submit
  • spark2-shell
  • spark2R
  • spark2-sql
  • pyspark2
  • spark2-thriftserver

spark2R and spark2-sql are new in Spark 2. spark2-sql allows you to interact with Hive tables via the Spark SQL engine in a pure SQL REPL, rather than having to code in a programming language.

In the rest of this doc, the spark2 shell commands will be used, as Spark 2 is the preferred installation. Note that our spark2 configuration defaults pyspark2 to using python3 (and ipython3 for the driver).

How do I ...

Start a Spark shell in YARN

  • Scala
spark2-shell --master yarn --executor-memory 2G --executor-cores 1 --driver-memory 4G
  • Python
pyspark2 --master yarn --executor-memory 2G --executor-cores 1 --driver-memory 4G
  • R
spark2R --master yarn --executor-memory 2G --executor-cores 1 --driver-memory 4G
  • SQL
spark2-sql --master yarn --executor-memory 2G --executor-cores 1 --driver-memory 4G
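
Once a shell is up, the Spark session is already created for you. In pyspark2, for example, it is exposed as 'spark' (and the SparkContext as 'sc'); a minimal sanity check, assuming nothing beyond Hive access, could look like:

# Inside a pyspark2 shell: 'spark' is the pre-created SparkSession
df = spark.range(1000)        # small test DataFrame
print(df.count())             # should print 1000

# Hive is reachable through Spark SQL
spark.sql("SHOW DATABASES").show()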

See Spark logs on my local machine when using spark-submit

  • If you are running Spark in local mode, spark2-submit writes logs to your console by default.
  • How to get logs written to a file?
    • Spark uses log4j for logging, and the log4j config is usually at /etc/spark2/conf/log4j.properties
    • This uses a ConsoleAppender by default, and if you wanted to write to files, an example log4j properties file would be:
# Set everything to be logged to the file
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=/tmp/spark.log
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

This should write logs to /tmp/spark.log

  • On the analytics cluster (stat1007):
    • On the analytics cluster, running a spark job through spark-submit writes logs to the console too, in both YARN and local modes
    • To write to a file, create a log4j.properties file, similar to the one above that uses the FileAppender
    • Use the --files argument to upload your custom log4j.properties file, e.g. with spark2-shell:
spark2-shell --master yarn --executor-memory 2G --executor-cores 1 --driver-memory 4G --files /path/to/your/log4j.properties
  • While running a spark job through Oozie
    • The log4j file path now needs to be a location accessible by all drivers/executors running on different machines
    • Putting the file in a temp directory on HDFS and using an hdfs:// URL should do the trick
    • Note that the logs will be written on the machines where the driver/executors run, so you need access to those machines to go look at them

Monitor Spark shell job resources

If you run a more complicated Spark job in the shell and want to see how YARN is managing resources, have a look at https://yarn.wikimedia.org/cluster/scheduler.

Don't hesitate to poke people on #wikimedia-analytics for help!

Use Hive UDF with Spark SQL

Here is an example in R. On stat1007, start a SparkR shell with the path to the refinery-hive jar:

spark2R --master yarn --executor-memory 2G --executor-cores 1 --driver-memory 4G --jars /srv/deployment/analytics/refinery/artifacts/refinery-hive.jar

Then in the R session:

sql("CREATE TEMPORARY FUNCTION is_spider as 'org.wikimedia.analytics.refinery.hive.IsSpiderUDF'")
sql("Your query")

Spark and Jupyter Notebooks

SWAP

Spark is now supported in SWAP Jupyter Notebooks. See the documentation there for more information.

Custom virtual environment

If, however, you want to run a notebook with a specific Python virtual environment, the solution is to set up the environment and, from it, launch and connect to a notebook server configured to work with Spark:

# Connect to stat1007 through ssh (the remote machine that will host your notebooks)
ssh stat1007.eqiad.wmnet

# Create your python virtual environment (using the http proxy)
http_proxy=http://webproxy.eqiad.wmnet:8080 https_proxy=http://webproxy.eqiad.wmnet:8080 virtualenv -p python3 test_spark_venv

# Activate the newly created virtual environment
source test_spark_venv/bin/activate

# Install the minimal set of needed libraries, again using the proxy (ipython and jupyter, needed to start notebooks)
http_proxy=http://webproxy.eqiad.wmnet:8080 https_proxy=http://webproxy.eqiad.wmnet:8080 pip install ipython jupyter

# Configure pyspark to launch a notebook server when it starts
export PYSPARK_DRIVER_PYTHON=ipython
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --port 8123  --ip='*' --no-browser"

# start the pyspark job that will launch the notebook server
pyspark2 --master yarn --deploy-mode client --executor-memory 2g --conf spark.dynamicAllocation.maxExecutors=32

The Spark job is now started on the cluster, with its driver running inside a notebook server on stat1007. The terminal from which you launched the commands shows something like:

 To access the notebook, open this file in a browser:
        file:///srv/home/joal/.local/share/jupyter/runtime/nbserver-13696-open.html
    Or copy and paste one of these URLs:
        http://stat1007:8123/?token=BLAH
     or http://127.0.0.1:8123/?token=BLAH

We will need the last URL to connect to the notebook server, but first we need to set up an SSH tunnel allowing your local computer to access the notebook server on stat1007:

# From your local machine
ssh -N stat1007.eqiad.wmnet -L 8123:stat1007.eqiad.wmnet:8123

Now you can browse from your local machine to the last URL given by the notebook app, start a new notebook, and use the 'spark' variable to access the Spark session.
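
Inside a new notebook, a quick check that the session works; a minimal sketch, assuming only the pre-created 'spark' variable:

# The notebook kernel is the pyspark driver, so 'spark' and 'sc' already exist
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])
df.show()
print(df.count())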


pyspark and external packages

To use external packages like graphframes:

pyspark2 --packages graphframes:graphframes:0.3.0-spark2.0-s_2.11 --conf "spark.driver.extraJavaOptions=-Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080"

Use the proxy settings to avoid the dependency resolution stalling at:

resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
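
Once the shell has started with the graphframes package, a minimal sketch of using it (the toy graph below is made up):

# Vertices need an 'id' column, edges need 'src' and 'dst' columns
from graphframes import GraphFrame

v = spark.createDataFrame([("a",), ("b",), ("c",)], ["id"])
e = spark.createDataFrame([("a", "b"), ("b", "c")], ["src", "dst"])

g = GraphFrame(v, e)
g.inDegrees.show()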

Spark and Oozie

Oozie has a spark action, allowing you to launch Spark jobs as you'd do (almost ...) with spark-submit:

<spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${job_tracker}</job-tracker>
            <name-node>${name_node}</name-node>
            <configuration>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queue_name}</value>
                </property>
                <property>
                    <name>oozie.launcher.mapred.job.queue.name</name>
                    <value>${oozie_launcher_queue_name}</value>
                </property>
                <property>
                    <name>oozie.launcher.mapreduce.map.memory.mb</name>
                    <value>${oozie_launcher_memory}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>${spark_job_name}</name>
            <jar>${spark_code_path_jar_or_py}</jar>
             <spark-opts>--conf spark.yarn.jar=${spark_assembly_jar} --executor-memory ${spark_executor_memory} --driver-memory ${spark_driver_memory} --num-executors ${spark_number_executors} --queue ${queue_name} --conf spark.yarn.appMasterEnv.SPARK_HOME=/bogus --driver-class-path ${hive_lib_path} --driver-java-options "-Dspark.executor.extraClassPath=${hive_lib_path}" --files ${hive_site_xml}</spark-opts>
            <arg>--arg1_name</arg>
            <arg>arg1</arg>
            <arg>--arg2_name</arg>
            <arg>arg2</arg>
            ...
        </spark>

The tricky parts here are in the spark-opts element: Spark needs to be given specific configuration settings that are not automatically loaded as they are with spark-submit:

  • Core spark jar is needed in configuration:
--conf spark.yarn.jar=${spark_assembly_jar}
# on analytics-hadoop:
#    spark_assembly_jar = hdfs://analytics-hadoop/user/spark/share/lib/spark-assembly.jar
  • When using Python, you need to set the SPARK_HOME environment variable (to a dummy value, for instance):
--conf spark.yarn.appMasterEnv.SPARK_HOME=/bogus
  • If you want to use HiveContext in spark, you need to add the hive lib jars and hive-site.xml to spark (not done by default in our version):
--driver-class-path ${hive_lib_path} --driver-java-options "-Dspark.executor.extraClassPath=${hive_lib_path}" --files ${hive_site_xml}
# on analytics-hadoop: 
#   hive_lib_path = /usr/lib/hive/lib/*
#   hive_site_xml = hdfs://analytics-hadoop//util/hive/hive-site.xml
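
For a Python job launched through this action, the script referenced by ${spark_code_path_jar_or_py} builds its own session; a minimal sketch (the app name and query are placeholders, and depending on the Spark version you may use HiveContext instead):

from pyspark.sql import SparkSession

# Hive access relies on the hive-site.xml and hive lib jars passed via <spark-opts> above
spark = (SparkSession.builder
         .appName("my-oozie-spark-job")
         .enableHiveSupport()
         .getOrCreate())

spark.sql("SELECT 1").show()   # replace with your actual Hive query
spark.stop()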

SparkR in production (stat100* machines) examples

SparkR: Basic example

From stat100*, and with the latest {SparkR} installed:

library(SparkR)

# - set environmental variables
Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")

# - start SparkR api session
sparkR.session(master = "yarn", 
   appName = "SparkR", 
   sparkHome = "/usr/lib/spark2/", 
   sparkConfig = list(spark.driver.memory = "4g", 
                      spark.driver.cores = "1", 
                      spark.executor.memory = "2g", 
                      spark.shuffle.service.enabled = TRUE, 
                      spark.dynamicAllocation.enabled = TRUE)
)

# - a somewhat trivial example w. linear regression on iris 

# - iris becomes a SparkDataFrame
df <- createDataFrame(iris)

# - GLM w. family = "gaussian"
model <- spark.glm(data = df, Sepal_Length ~ Sepal_Width + Petal_Length + Petal_Width, family = "gaussian")

# - summary
summary(model)

# - end SparkR session
sparkR.stop()

SparkR: Large(er) file from HDFS

Also from stat100*, and with the latest {SparkR} installed:

### --- flights dataset Multinomial Logistic Regression
### --- SparkDataFrame from HDFS
### --- NOTE: in this example, 'flights.csv' is found in /home/goransm/testData on stat1007

setwd('/home/goransm/testData')
library(SparkR)
Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")

### --- Start SparkR session w. Hive Support enabled
sparkR.session(master = "yarn",
               appName = "SparkR",
               sparkHome = "/usr/lib/spark2/",
               sparkConfig = list(spark.driver.memory = "4g",
                                  spark.driver.cores = "2",
                                  spark.shuffle.service.enabled = TRUE,
                                  spark.dynamicAllocation.enabled = TRUE,
                                  spark.executor.instances = "8",
                                  spark.dynamicAllocation.minExecutors = "4",
                                  spark.executor.cores  = "2",
                                  spark.executor.memory = "4g",
                                  spark.rpc.message.maxSize = "512", # - probably not necessary
                                  spark.enableHiveSupport = TRUE
                                  )
              )

# - copy flights.csv to HDFS
system('hdfs dfs -put /home/goransm/testData/flights.csv hdfs://analytics-hadoop/user/goransm/flights.csv', 
       wait = T)

# - load flights
df <- read.df("flights.csv",
              "csv",
               header = "true",
               inferSchema = "true",
               na.strings = "NA")

# - structure
str(df)

# - dimensionality
dim(df)

# - clean up df from NA values
df <- filter(df, isNotNull(df$AIRLINE) & isNotNull(df$ARRIVAL_DELAY) & isNotNull(df$AIR_TIME) & isNotNull(df$TAXI_IN) & 
                 isNotNull(df$TAXI_OUT) & isNotNull(df$DISTANCE) & isNotNull(df$ELAPSED_TIME))

# - dimensionality
dim(df)

# - Generalized Linear Model w. family = "multinomial"
model <- spark.logit(data = df, 
                     formula = AIRLINE ~ ARRIVAL_DELAY + AIR_TIME + TAXI_IN + TAXI_OUT + DISTANCE + ELAPSED_TIME,
                     family = "multinomial")

# - Regression Coefficients
res <- summary(model)
res$coefficients

# - delete flights.csv from HDFS
system('hdfs dfs -rm hdfs://analytics-hadoop/user/goransm/flights.csv', wait = T)

# - close SparkR session
sparkR.stop()

Spark tuning for big jobs

Many Spark default settings are not optimal for large-scale jobs. This article from the Facebook technical team gives hints on how to better tune Spark in those cases. In this section we try to explain how the tuning helps.

Note: These tweaks may be useful if your job handles a large amount of data (TB+ across stages), and/or if your job has a very large number of tasks/stages (tens of thousands).

Scaling the driver

  • First, make sure your job uses dynamic allocation. It's enabled by default on the analytics-cluster, but can be turned off. This will ensure a better use of resources across the cluster. If your job fails because of errors at shuffle (due to the external shuffle service), the tuning below should help.
  • Allow for more consecutive attempts per stage (default is 4, 10 is suggested): spark.stage.maxConsecutiveAttempts = 10. This tweak helps deal with fetch failures, which usually happen when an executor is no longer available (dead because of OOM or cluster resource preemption, for instance). In such a case, other executors fail to fetch data, leading to failed stages. Bumping the number of possible consecutive attempts leaves more room for error recovery.
  • Increase the RPC server threads to prevent out-of-memory errors: spark.rpc.io.serverThreads = 64 (no clear information is available as to why this helps; presumably, since spark.rpc.connect.threads = 64, it is better to have the same number of server threads answering, but I have not found proper documentation).

Scaling the workers

  • Manually set spark.yarn.executor.memoryOverhead when using big executors or when using a lot of string values (interned strings are stored in the memory buffer). By default Spark allocates 0.1 * total-executor-memory for the buffer, which can be too small.
  • Increase the shuffle file buffer size to reduce the number of disk seeks and system calls made: spark.shuffle.file.buffer = 1 MB and spark.unsafe.sorter.spill.reader.buffer.size = 1 MB
  • Optimize spill-file merging by facilitating the merging of newly computed streams into existing files (useful when the job spills a lot): spark.file.transferTo = false, spark.shuffle.file.buffer = 1 MB and spark.shuffle.unsafe.file.output.buffer = 5 MB
  • Reduce spilled data size by increasing the compression block size: spark.io.compression.lz4.blockSize = 512KB
  • If needed: enable off-heap memory if GC pauses become problematic (not needed for analytics jobs so far): spark.memory.offHeap.enabled = true and spark.memory.offHeap.size = 3g (don't forget that the off-heap memory is part of the YARN container, therefore your container is of size: executor-memory + spark.memory.offHeap.size)

Scaling the external shuffle service

  • Speed up file retrieval by bumping the cache size available for the file index: spark.shuffle.service.index.cache.size = 2048
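
A sketch of how several of the settings above could be set when building a session from Python; the values are the ones suggested in this section, the memory overhead value is just an example, and whether any of this helps depends on the job (these settings can equally be passed as --conf flags to spark2-submit):

from pyspark.sql import SparkSession

# Example tuning profile for a large job; adjust values to your workload
spark = (SparkSession.builder
         .appName("big-batch-job")                               # placeholder name
         .config("spark.stage.maxConsecutiveAttempts", "10")
         .config("spark.rpc.io.serverThreads", "64")
         .config("spark.yarn.executor.memoryOverhead", "4096")   # MB; example value only
         .config("spark.shuffle.file.buffer", "1m")
         .config("spark.unsafe.sorter.spill.reader.buffer.size", "1m")
         .config("spark.file.transferTo", "false")
         .config("spark.shuffle.unsafe.file.output.buffer", "5m")
         .config("spark.io.compression.lz4.blockSize", "512k")
         .getOrCreate())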