Analytics/Cluster/Spark/Administration


Upgrading Spark

Our production version of Spark is currently installed as part of conda-analytics; the pyspark packages are downloaded from Conda Forge as part of the build process for that environment.
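
To confirm which version of Spark a given conda-analytics environment provides, a quick sanity check from a Python shell in that environment (assuming pyspark is importable there) is:

# Print the pyspark version and install location bundled with the active environment.
import pyspark
print(pyspark.__version__)
print(pyspark.__file__)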

In addition, we have been working to enable Spark on Kubernetes, so we also build Spark ourselves in the production_images repo.

However, there are a number of gotchas to be careful about:

  • Spark Assembly (spark.yarn.archive)
  • Spark YARN Shuffle Service

These gotchas make a rolling upgrade in YARN difficult.

Read on for more information. Here is a general upgrade plan:

  1. Update spark.version in any pom.xml files (see the sketch after this list), then rebuild and deploy all Spark job jars.
  2. Stop all Spark jobs (Refine, Airflow, etc.)
  3. Upgrade the Spark .deb package everywhere
  4. Run Puppet everywhere
  5. Restart all hadoop-yarn-nodemanagers
  6. Update all Spark job jar versions to the ones built with the new spark.version
  7. Restart all Spark jobs (Refine, Airflow, etc.)
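
For step 1, spark.version is usually declared as a Maven property and referenced by the Spark dependencies. A minimal sketch (the group/artifact IDs and version number below are illustrative, not taken from any particular job's pom.xml):

  <properties>
    <spark.version>3.1.2</spark.version>
  </properties>

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>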

Spark Shuffle Service

YARN runs the Spark YarnShuffleService as an auxiliary service. Spark uses this to shuffle data between tasks. The spark-X.Y.Z-yarn-shuffle.jar is symlinked by Puppet into Hadoop's classpath, and then enabled in yarn-site.xml:

  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>
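
The property above registers the class; in the standard Spark-on-YARN setup, spark_shuffle also has to be listed among the aux-services themselves. A sketch of that companion property (the exact value list is an assumption, i.e. whatever our yarn-site.xml already declares plus spark_shuffle):

  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle,spark_shuffle</value>
  </property>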

The Spark YarnShuffleService runs inside of NodeManagers. After upgrading the Spark .deb package on NodeManager boxes, a Puppet run will cause the new spark-X.Y.Z-yarn-shuffle.jar to be symlinked into Hadoop's classpath. Once this is done, all NodeManagers need to be restarted. This is unlikely to work as a rolling upgrade; Spark tasks need to talk to a Spark YarnShuffleService of the same Spark version that they were launched with.

This makes it difficult to test Spark upgrades in YARN, but we don't have a better alternative.

Spark Assembly

Spark is configured with an 'assembly' archive file via the spark.yarn.archive config. The Spark assembly is a zip file of all of the jars Spark uses to run. If spark.yarn.archive is configured, a Spark job running in Hadoop will use these .jars to launch Spark processes. This allows for a faster startup time when running in YARN; the local Spark dependencies don't have to be shipped to Hadoop every time you launch Spark.

After upgrading the Spark .deb package on an analytics 'coordinator' node, Puppet should automatically upload the new /usr/lib/spark2/spark-X.Y.Z-assembly.zip file to HDFS in /user/spark/share/lib.
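
To verify the upload, the new archive should be visible in that directory, for example (assuming the hdfs CLI is available on the coordinator):

  hdfs dfs -ls /user/spark/share/lib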

The value of spark.yarn.archive is set by Puppet in spark-defaults.conf to the currently installed version of Spark. This means that a Spark job will use the Spark Assembly matching the Spark version installed on the host from which the job was launched.
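
The resulting line in spark-defaults.conf looks roughly like the following sketch; X.Y.Z is a placeholder, and the exact URI is whatever Puppet renders for the installed Spark:

  spark.yarn.archive    hdfs:///user/spark/share/lib/spark-X.Y.Z-assembly.zip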

Testing PySpark

# Print the python executable and the PYTHONPATH used on a YARN PySpark worker:
def python_interpreter():
    import sys, platform
    return "{}: {} {}".format(platform.node(), sys.executable, sys.path)
rdd = spark.sparkContext.parallelize(range(2), 2).mapPartitions(lambda p: [python_interpreter()])
rdd.collect()

# Test numpy on remote workers:
import numpy as np
rdd = spark.sparkContext.parallelize([np.array([1,2,3]), np.array([1,2,3])], numSlices=2)
rdd.reduce(lambda x,y: np.dot(x,y))

# Test pyarrow on remote workers:
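# Note: pandas_udf with PandasUDFType.GROUPED_MAP is the Spark 2.x-style API; it is deprecated in Spark 3,
# where df.groupby(...).applyInPandas(...) is preferred, so this test may need updating after an upgrade.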
import pyspark.sql.functions as F
df = spark.range(0, 1000).withColumn('id', (F.col('id') / 100).cast('integer')).withColumn('v', F.rand())

@F.pandas_udf(df.schema, F.PandasUDFType.GROUPED_MAP)
def pandas_subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df2 = df.groupby('id').apply(pandas_subtract_mean)
df2.show()


# Test Python requests library.
# Requires that CA certs are loaded properly, probably via REQUESTS_CA_BUNDLE env var.
# Also requires that http_proxy, https_proxy, and no_proxy settings are correct.

def http_request(url):
    import requests
    return f"{url}: " + str(requests.get(url).status_code)

def test_http_requests():
    external_url = "https://en.wikipedia.org"
    internal_url = "https://ms-fe.svc.eqiad.wmnet/wikipedia/commons/thumb/a/a8/Tour_Eiffel_Wikimedia_Commons.jpg/100px-Tour_Eiffel_Wikimedia_Commons.jpg"
    return [http_request(external_url), http_request(internal_url)]

# Test local driver requests
test_http_requests()

# Test executor requests
rdd = spark.sparkContext.parallelize(range(2), 2).mapPartitions(lambda p: test_http_requests())
rdd.collect()