User:EBernhardson/pyspark

From Wikitech

Pyspark can be used, albeit a bit oddly, to request computing resources on demand from the hadoop cluster rather than sharing compute on stat100[456] and hoping for the best.

A python shell on a remote machine

This isn't necessarily the most useful thing, but it's a reasonable demonstration of the concepts behind using pyspark as a method of requesting on-demand resources.

import time
import threading

def start_repl(*args):
    # Import inside the function so the module (shipped via --py-files)
    # is resolved on the executor, not the driver.
    import socketserverREPL

    server = socketserverREPL.ThreadedTCPServer(("0.0.0.0", 14328), socketserverREPL.RequestPythonREPL)
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.daemon = True
    server_thread.start()
    # Keep the spark task alive until a client asks the REPL to exit.
    while not socketserverREPL.should_exit:
        time.sleep(1)
    # Stop serving before closing the listening socket.
    server.shutdown()
    server.server_close()
    server_thread.join()


from pyspark import SparkContext
# A single-element RDD with a single partition pins the function to one executor.
SparkContext().parallelize([0], 1).foreachPartition(start_repl)
  • Run this with: spark-submit --py-files socketserverREPL.py --master yarn remote_repl.py
  • Figure out where this is running (spark has likely emitted log messages containing a host, like analytics1061) and telnet to the REPL
    • telnet analytics1061 14328
  • You should now have a python REPL running on the other machine.

Caveats

  • You need to know how much memory you want and request it with --conf spark.yarn.executor.memoryOverhead=<MB>. Yarn will ruthlessly kill executors that run over their allocation.
  • If you will be using multi-threaded operations you need to request more vCPUs from yarn with --executor-cores <N> combined with --conf spark.task.cpus=<N>. I don't think yarn monitors cpu usage, but for scheduling to work well you need to respect the amount of resources yarn was told you would use.
  • Individual files can be shipped over using the --files ... parameter
  • If you need complex python dependencies they can be shipped as well, but it's more complicated (see Python Dependencies below).
  • This is incredibly insecure; anyone can telnet into that port.
  • You aren't actually running as yourself on the other side; you are running as the yarn user. This could cause interesting issues.
  • You can access hdfs if you need to, using python subprocess and the hdfs dfs ... command to copy files to/from local disk. Temporary files should not be placed in /tmp, but instead in one of the temp directories provided by yarn in the comma-delimited LOCAL_DIRS environment variable.
  • Due to python version differences between the stat machines and the hadoop workers, binary packages (compiled code: numpy, tf, etc.) distributed via virtualenv often only work under 2.7 and throw errors on 3.x.
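The hdfs and LOCAL_DIRS caveats above can be sketched as a pair of small helpers. hdfs_get is a hypothetical wrapper name (the underlying hdfs dfs -get command is real but only exists on cluster machines), and the scratch-directory selection follows the LOCAL_DIRS convention described above:

```python
import os
import subprocess
import tempfile

def pick_scratch_dir(env=os.environ):
    """On a yarn executor, LOCAL_DIRS is a comma-delimited list of
    yarn-provided temp directories; use the first. Elsewhere fall
    back to the normal system temp dir instead of /tmp hardcoding."""
    local_dirs = env.get("LOCAL_DIRS")
    return local_dirs.split(",")[0] if local_dirs else tempfile.gettempdir()

def hdfs_get(hdfs_path, local_path):
    """Copy a file out of hdfs by shelling out to the hdfs CLI.
    Only works where `hdfs` is on the PATH (i.e. on cluster machines)."""
    subprocess.check_call(["hdfs", "dfs", "-get", hdfs_path, local_path])
```

Usage on an executor might look like hdfs_get("/some/hdfs/file", os.path.join(pick_scratch_dir(), "file")).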

Python Dependencies

More complex python dependencies from a virtualenv:

  • Create a virtualenv somewhere
$ mkdir ~/tmp
$ cd ~/tmp
$ virtualenv venv
  • Upgrade pip and wheel; on some older debian systems you can't download binary packages without this

$ venv/bin/pip install --upgrade pip wheel

  • Install the things you need

$ venv/bin/pip install keras

  • Package up the virtualenv so it can be shipped to the executors.
$ cd venv
$ zip -qr ../venv.zip .
$ cd ..
  • Call spark-submit with --archives 'venv.zip#venv', which tells spark to decompress venv.zip into a venv directory inside each executor's working directory.
  • Set the PYSPARK_PYTHON environment variable so it points at the virtualenv's python both locally and on the executors

$ PYSPARK_PYTHON=venv/bin/python spark-submit --archives 'venv.zip#venv' <myfile.py>
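The packaging step above (cd venv && zip -qr ../venv.zip .) can also be done from python. zip_virtualenv is a hypothetical helper, but it reproduces the one detail that matters: archiving the contents of the virtualenv rather than the directory itself, so that after spark extracts venv.zip into venv/, the interpreter sits at venv/bin/python where PYSPARK_PYTHON expects it.

```python
import os
import zipfile

def zip_virtualenv(venv_dir, out_zip):
    """Archive the *contents* of venv_dir (not the directory itself),
    equivalent to `cd venv && zip -qr ../venv.zip .`, so bin/python
    ends up at the root of the zip."""
    with zipfile.ZipFile(out_zip, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(venv_dir):
            for name in files:
                path = os.path.join(root, name)
                # Store paths relative to venv_dir so nothing is
                # nested under an extra venv/ prefix inside the zip.
                zf.write(path, os.path.relpath(path, venv_dir))
```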