Discovery/Analytics

Discovery uses the Analytics Cluster to support CirrusSearch. The source code is in the wikimedia/discovery/analytics repository. There is an additional repository at search/airflow which deploys the airflow service that much of the analytics code depends on for scheduling.

search/airflow

The search/airflow repository deploys all necessary code to run the airflow service. This includes dependencies needed by upstream airflow, along with dependencies for our own airflow plugins. It does not include the dag definitions or anything related to the specific tasks we run, beyond their runtime dependencies. The airflow web UI can be accessed via ssh tunnel: ssh -N -L8778:localhost:8778 an-airflow1001.eqiad.wmnet.

Updating dependencies

Update the requirements.txt file in the base of the repository to include any new or updated dependencies. The related requirements-frozen.txt file is maintained by the make_wheels.sh script. To avoid unrelated updates when building wheels, this process first installs requirements-frozen.txt, then installs requirements.txt on top of that. Because of this, removing a package requires deleting it from requirements-frozen.txt as well as requirements.txt. After the requirements files have been edited, make_wheels must be run (see the steps below) to collect the necessary artifacts.
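
For example (package name entirely hypothetical), dropping a dependency means deleting its line from both files before rebuilding:

 # remove a no-longer-needed package from both requirements files
 sed -i '/^some-unused-package/d' requirements.txt requirements-frozen.txt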

  1. Build the docker environment for make_wheels. Docker is used to ensure the build is compatible with Debian 10.2, which runs on an-airflow1001:
    docker build -t search-airflow-make-wheels:latest -f Dockerfile.make_wheels .
  2. Run make_wheels:
    docker run --rm -v $PWD:/src:rw --user $(id -u):$(id -g) search-airflow-make-wheels:latest
  3. Upload wheels to archiva:
    bin/upload_wheels.py artifacts/*.whl

Deploy

From deployment.eqiad.wmnet:

  1. cd /srv/deployment/search/airflow
    git fetch origin
    git log HEAD..origin/master
    git pull --ff-only origin master
    scap deploy 'a meaningful description'

Post-Deploy

Scap does not auto-restart the related airflow services (but it should!). After deployment you will need to log in to an-airflow1001 and restart the airflow-scheduler and airflow-webserver services.
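
A minimal sketch, assuming the standard systemd unit names described below and that your account has the required sudo rights on an-airflow1001:

 # restart the airflow services after a search/airflow deploy
 sudo systemctl restart airflow-scheduler airflow-webserver
 # confirm both services came back up
 systemctl status airflow-scheduler airflow-webserver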

Debugging

  • Individual job logs are available from the airflow web UI.
  • Typically those logs are only for a bit of control code, and report a yarn application ID for the real work.
    • The application ID looks like this: application_1601916545561_192397
    • It may not be present in the logs if the job fails very early.
  • sudo -u analytics-search kerberos-run-command analytics-search yarn logs -applicationId <app_id_from_logs> | less
    • Always redirect or page the output! These can be excessively verbose (see the example after this list).
  • Airflow jobs that previously finished can be re-triggered from the UI if necessary. From the dag tree view, select the earliest task in a dag run that should be re-run, and in the modal that pops up select Clear (keeping the defaults of downstream and recursive). This puts all downstream tasks back into a clean state and they will be re-run as before.
  • If you can't figure out what is going on with a particular failure, you can open a Phabricator ticket or get help from someone with more experience (currently probably Erik).
    • Team-internal projects that don't depend on up-to-the-minute updates, like Mjolnir and Glent, can probably wait for the usual Phab ticket process.
    • Jobs that run for other teams, like ORES, are typically more urgent. Ask for help to work on it (or triage it and decide it can be a Phab ticket).
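
For example, when digging through a verbose yarn log it is often easier to dump it to a file and search it; the application id and grep pattern here are only illustrative:

 sudo -u analytics-search kerberos-run-command analytics-search \
     yarn logs -applicationId application_1601916545561_192397 > /tmp/app.log
 grep -iE 'error|exception' /tmp/app.log | less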

wikimedia/discovery/analytics

The wikimedia/discovery/analytics repository contains airflow DAGs, various python scripts invoked by those DAGs, and custom python environments that airflow will use to run the scripts in.

WMF Dependencies

While many of the simpler tasks performed by this repository are implemented locally, more advanced functionality comes from other repositories.

Adding a new DAG

See Discovery/Analytics/Creating a new DAG for a tutorial.

Verifying code changes

To verify correctness of changes in the wikimedia/discovery/analytics repository, start a shell in the CI docker image:

docker run -it --rm -e XDG_CACHE_HOME=/src/.cache -v /etc/passwd:/etc/passwd:ro -v /etc/group:/etc/group:ro -v $PWD:/src:rw --user $(id -u):$(id -g) --entrypoint /bin/bash docker-registry.wikimedia.org/releng/tox-pyspark:0.7.0-s1

Then, from the shell inside the container, run:

tox

How to prepare a commit

Airflow fixtures

For many of the tasks run by airflow, the test suite executes the dags and records the commands that would have been run as test fixtures. This helps keep track of what exactly is changing when we deploy, and helps reviewers understand the effect of changes to the dags. These fixtures are rarely edited by hand; the standard workflow is to delete all fixtures and let the test suite rebuild them. Changes to the fixtures are then reviewed and added to the commit using standard git tooling (see the sketch at the end of this section).

Fixtures can be rebuilt using the same docker image used to run tests in CI. The below only works on a fairly simple Linux install where /etc/passwd is the source of truth (like a desktop install):

 find airflow/tests/fixtures -name \*.expected -exec rm '{}' ';'
 docker run -it --rm -e REBUILD_FIXTURES=yes -e XDG_CACHE_HOME=/src/.cache \
   -v /etc/passwd:/etc/passwd:ro -v /etc/group:/etc/group:ro -v $PWD:/src:rw --user $(id -u):$(id -g) \
   --entrypoint /usr/local/bin/tox \
   docker-registry.wikimedia.org/releng/tox-pyspark:0.7.0-s1 \
   -e airflow -- -e pytest

Command line explanation:

  • XDG_CACHE_HOME=/src/.cache - Gives pip a place to store files between runs, generally holds downloaded python packages
  • --user $(id -u):$(id -g) - By default the container runs as the nobody user. That user won't be able to write files in the repo, so assign the same uid/gid as the account executing docker.
  • -v /etc/passwd:/etc/passwd:ro -v /etc/group:/etc/group:ro - If the uid/gid passed don't exist in the container, the shell says 'I have no name!' everywhere the username should be, and attempting to run the tests will fail with something like getpwuid(): uid not found: 22524. Mounting /etc/passwd makes the uid exist. Since macOS uses Open Directory to manage users, its /etc/passwd does not contain regular users; you may fake it on the fly by replacing -v /etc/passwd with -v $(echo "$(id -un):*:$(id -u):$(id -g):$HOME:/bin/bash" > /tmp/passwd && echo /tmp/passwd)
  • --entrypoint /usr/local/bin/tox - The entrypoint to run; use /bin/bash instead if you prefer a shell and want to invoke tox yourself
  • -e airflow -- -e pytest - Invoke airflow environment of top level tox.ini, then invoke the pytest environment of airflow/tox.ini

A slightly more complicated version will work on instances built in WMF Cloud. At this time we don't have a way to run this from a macOS host machine.

 docker run -it --rm \
   -v $PWD:/src:rw --user 0:0 --entrypoint /bin/bash \
   docker-registry.wikimedia.org/releng/tox-pyspark:0.7.0-s1 \
   -c "adduser --no-create-home --disabled-password --uid $(id -u) --gecos '' someuser && ln -s /opt/spark* /opt/spark && su --command="'"'"cd /src && XDG_CACHE_HOME=/src/.cache SPARK_HOME=/opt/spark REBUILD_FIXTURES=yes tox"'"'" - someuser"

Command line explanation:

  • In cases where /etc/passwd can't be mounted, for example when user accounts are sourced from a network identity service, this workaround starts the container as root and creates a matching user inside the container before running the tests.
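
Once the fixtures have been rebuilt, review and stage the changes with standard git tooling, for example:

 # review what changed, then stage the regenerated fixtures
 git status airflow/tests/fixtures
 git diff airflow/tests/fixtures
 git add airflow/tests/fixtures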

Airflow Variables

Airflow variables are the primary method of parameterizing DAGs. Within the search platform deployment, variables are split into two classes. System variables describe facts of the specific deployment, such as the path on the airflow host to the repository, or the https_proxy used to access public endpoints. All system variables are codified into our airflow plugin as wmf_airflow.template.*. In addition to system variables, each DAG should have a single variable named {dag_id}_conf containing a dictionary of variables used by that DAG. This variable should define all locations the DAG will read from and write to, so that test deployments can easily identify and redirect these as necessary.

Variable values used in production are controlled by the airflow/config directory. Each file there must be a json dict containing variables to import; a file may define one or more variables. Convention is for each independent unit (whatever that is) to have its own .json file defining the set of related variables. Variables defined here will be imported to the production instance when scap deployment promotes a deployment to active. There is room for race conditions here: if variables are changing in a non-compatible way, the relevant dags must be paused while the deployment runs and then re-enabled (future work may stop the scheduler, or take some other approach, during deployment to avoid races).
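
For illustration only (the file name, variable name and values here are all hypothetical), a config file such as airflow/config/example_dag.json could look like:

 {
     "example_dag_conf": {
         "input_table": "event.some_input_table",
         "output_path": "hdfs://analytics-hadoop/tmp/example_dag/output"
     }
 }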

No affordance is provided for deleting unused variables from the production installation. An operator (typically human) must clean up the unused variables post-deployment.
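
One way to do that cleanup (a sketch; the variable name is hypothetical and the exact CLI syntax depends on the installed Airflow version) is via the airflow CLI on an-airflow1001, or via Admin / Variables in the web UI:

 # Airflow 1.10.x style
 airflow variables -x old_dag_conf
 # Airflow 2.x style
 airflow variables delete old_dag_conf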


How to prepare a deploy

Updating python scripts

Tasks defined as python scripts in the wikimedia/discovery/analytics repository require no special preparation prior to deployment. For the environments the scripts run within, see below.

Updating python environments

As of November 2020 none of the yarn client instances are running Debian 9.12, so environments built during deployment are not valid and must be built manually.

If any python environments have been updated the artifacts/ directory needs to be updated as well. On a Debian 9.12 instance (matching the yarn worker nodes):

 bin/make_wheels.sh
 bin/upload_wheels.py artifacts/*.whl
 git add artifacts/*.whl

The make_wheels script checks each directory in environments/ for a requirements.txt. If present, all necessary wheels are collected into artifacts/ and a requirements-frozen.txt containing exact versions is emitted.

The upload_wheels.py script will sync the wheels to archiva. If a wheel already exists in archiva for the same version but with a different sha1, your local version will be replaced. Wheel upload requires setting up maven with WMF's Archiva configuration. This step can be run from any instance; it does not have the strict versioning requirement of the previous step.

Updating java jars

Tasks defined as java jars source their jar from the artifacts/ directory of wikimedia/discovery/analytics. This repository uses git-fat to access the files: while we add jars to the repo, the only thing committed is a hash of the jar. To update a task to a new jar:

  1. Release a new version of the job jar to archiva (example).
  2. Copy {project}-{version}-jar-with-dependencies.jar from the job project's target/ directory, or download it directly from archiva, into the analytics project's artifacts/ directory.
  3. Remove (git rm) the old version of the jar from the artifacts/ directory.
  4. Grep the repository (likely the airflow/dags or airflow/config directories) for references to the old jar version and update them to the new version (see the sketch below).

Changes to the airflow dag will require rebuilding the fixtures. The jar copied into the repository will not be uploaded and must already exist in Archiva. The local copy of the file will be used to derive the hash included in the commit.
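
A sketch of steps 2-4, with an entirely hypothetical project name and versions:

 # copy in the new jar and drop the old one (names/versions hypothetical)
 cp ~/src/example-job/target/example-job-1.3.0-jar-with-dependencies.jar artifacts/
 git add artifacts/example-job-1.3.0-jar-with-dependencies.jar
 git rm artifacts/example-job-1.2.0-jar-with-dependencies.jar
 # find references to the old version that still need updating
 git grep -n 'example-job-1.2.0' -- airflow/dags airflow/config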

Verify artifacts in patch set are fetchable

There are no CI procedures to ensure a jar added in this way actually exists in archiva. To verify manually, create a new clone of the repository:

 git clone . /tmp/deleteme

pull the artifacts:

 cd /tmp/deleteme
 git fat init && git fat pull
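
If the pull succeeded, the files under artifacts/ should be real jar (zip) archives rather than the small text stubs git-fat stores in their place; a quick sanity check:

 # real jars are multi-megabyte zip archives; git-fat stubs are tiny text files
 ls -lh artifacts/*.jar
 file artifacts/*.jar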

Updating airflow dags

No special preparation is required when deploying dag updates. Newly created dags will start in the off position; updated dags will pick up their changes within a minute of deployment.

How to deploy

  1. Ssh to deployment.eqiad.wmnet
  2. Run:
    cd /srv/deployment/wikimedia/discovery/analytics
    git fetch
    git log HEAD..origin/master
    git pull --ff-only origin master
    scap deploy 'a logged description of deployment'

    This part brings the discovery analytics code from gerrit to stat1007 and an-airflow1001. It additionally builds the necessary virtualenvs. Airflow DAGs are automatically loaded on deployment, but new dags start in the off position.

Airflow Production Deployment

Airflow dags are scheduled from an-airflow1001. After scap deployment airflow will see the updated dags within one minute.

WARNING: If any python in the airflow/plugins directory is updated by the deployment the airflow-scheduler and airflow-webserver systemd services will need to be restarted to ensure all systems use the updated plugin code.

Airflow Test Deployment

The preferred method for testing airflow and its related tasks is through the search platform analytics integration environment. See the README there for more details on running the environment.

Alternatively, many airflow tasks amount to executing a shell command (spark-submit, hive, skein, etc.) and waiting for it to complete. When testing in production is necessary, prefer running those shell commands directly with modified command lines over performing airflow test deployments. Deeper or more complex dags (e.g. mjolnir) may require a test deployment due to the number of tasks involved.
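
A sketch of that approach, with every path and argument below purely hypothetical: copy the command recorded in the relevant test fixture (see Airflow fixtures above), point its outputs at a test location, and run it by hand under the analytics-search user:

 # run a single task's command by hand instead of a full airflow test deploy
 sudo -u analytics-search kerberos-run-command analytics-search \
     spark-submit --master yarn \
     /srv/deployment/wikimedia/discovery/analytics/path/to/some_script.py \
     --output-table some_test_db.some_test_table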

Yet another option for debugging/testing, when the context is our airflow plugins, is to get a REPL in airflow's context. To do this, first start the REPL:

   sudo -u analytics-search bash -c "source /srv/airflow-search/bin/airflow-search-profile.sh; /usr/lib/airflow/bin/python"

Then set some kerberos-related environment variables in python. From there, airflow hooks can be imported and invoked to determine where things are going awry.

from airflow.configuration import conf
import os

os.environ['KRB5CCNAME'] = conf.get('kerberos', 'ccache')
os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab')

Debugging

Web Based

Yarn

Yarn shows the status of currently running jobs on the hadoop cluster. If you are in the wmf LDAP group (all WMF employees/contractors) you can log in directly at yarn.wikimedia.org. Yarn is also accessible over a SOCKS proxy to analytics1001.eqiad.wmnet.

Currently running jobs are visible via the running applications page.

CLI

CLI commands are typically run by ssh'ing into stat1007.eqiad.wmnet.

Yarn

Yarn is the actual job runner.

List running spark jobs in Yarn
 yarn application -appTypes SPARK -list
Fetch application logs

The yarn application id can be used to fetch application logs:

 sudo -u analytics-search yarn logs -applicationId application_1454006297742_16752 | less
Yarn manager logs

When an executor is lost, it's typically because it ran over its memory allocation. You can verify this by looking at the yarn manager logs for the machine that lost an executor. To fetch the logs, pull them down from stat1007 and grep them for the numeric portion of the application id. For an executor lost on analytics1056 for application_1454006297742_27819:

   curl -s http://analytics1056.eqiad.wmnet:8042/logs/yarn-yarn-nodemanager-analytics1056.log | grep 1454006297742_27819 | grep WARN

It might output something like this:

   2016-02-07 06:05:06,480 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_e12_1454006297742_27819_01_000020
   2016-02-07 06:26:57,151 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Process tree for container: container_e12_1454006297742_27819_01_000011 has processes older than 1 iteration running over the configured limit. Limit=5368709120, current usage = 5420392448
   2016-02-07 06:26:57,152 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=112313,containerID=container_e12_1454006297742_27819_01_000011] is running beyond physical memory limits. Current usage: 5.0 GB of 5 GB physical memory used; 5.7 GB of 10.5 GB virtual memory used. Killing container.
   2016-02-07 06:26:57,160 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_e12_1454006297742_27819_01_000011 is : 143

Airflow Alerts

Airflow alerts are emitted to the discovery-alerts mailing list.

Sensor timeouts and SLAs

Sensors are dedicated tasks that wait for some data to be present before executing the next task in the DAG. Sensor tasks generally have wait in their name. When the data is not there within the time configured on the task, the sensor will emit one of two kinds of alerts depending on how it is configured:

  • SLA miss with for instance: [airflow] SLA miss on DAG=wcqs_streaming_updater_reconcile_hourly
  • Timeouts with for instance: Airflow alert: <TaskInstance: mediawiki_revision_recommendation_create_hourly.wait_for_data 2023-01-09T10:00:00+00:00 [up_for_retry]

When sensors are complaining, an investigation must be done to understand why the data is not there. DAG runs that failed because of this might have to be resumed by clearing the failed sensor task manually. To identify such tasks go to Browse / Task Instances, set a filter on state failed and sort by date. Select the failed tasks and clear them (select Clear in the With selected drop-down menu).

False positives

It is possible that airflow emits an alert that is a false positive; these can be identified by looking at the alert body:

   Try 0 out of 5
   Exception:
   Executor reports task instance finished (failed) although the task says its queued. Was the task killed externally?


If the task does not report further problems it is likely that this is a false positive. These false positives should be rare (a couple per week); if they happen more frequently this might be a symptom of something malfunctioning on the airflow instance. See phab:T325279 for more details.