Discovery/Analytics/Creating a new DAG

From Wikitech

This guide will explain how to create a new DAG that uses Pyspark for its tasks. It's not the only way - you can define JVM based tasks too - see airflow/dags/glent_weekly.py for reference.

Create a task definition

In general, Airflow is nothing more (or less) than a scheduling service - the actual applications ran do not differ from any standard application ran from a command line. That being said, there are things predefined that can help create tasks in analytics environment - like helper methods of accessing Hive table partitions. Dependencies on frequently used tools like pyspark are also configured for the project.

Implementation

In general, we follow this template when creating new tasks:

"""
Short description of the task defined here

Longer summary of the task

"""

# Import section, this is just an example
from argparse import ArgumentParser

import json
import logging
import requests
import sys
from typing import Optional, Sequence, Mapping

from wmf_spark import HivePartition

from pyspark.sql import DataFrame, SparkSession, functions as F, types as T

# Most of the tasks use ArgumentParser and have their parameters defined as a first function for readability
def arg_parser() -> ArgumentParser:
    parser = ArgumentParser()
    # add some parameters here...
    parser.add_argument('--search-satisfaction-partition', required=True, type=HivePartition.from_spec)

    return parser


# Functions defining the task itself, all should use type hints, both for arguments and return value
# ...


# Entry point to the task is defined here - please note the return code
def main(search_satisfaction_partition: HivePartition) -> int:
    spark = SparkSession.builder.getOrCreate()
    initial_searchsatisfaction_df = search_satisfaction_partition.read(spark)
    # ...
    
    
# There's almost no variations of the actual main block
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    args = arg_parser().parse_args()
    sys.exit(main(**dict(vars(args))))
It's convenient to prototype using Jupyter Notebook on analytics cluster - simply initiate your kerberos creds and tunnel port 8000 from any stat100x instance.

If you're accessing Spark dataframes, please note the initial_searchsatisfaction_df dataframe defined in main() method - this way works well with the partitions we have defined in analytics' Hive. Script argument has to be defined in a specific manner to be able to use that, but we'll get to that.

Tests

Script code isn't different from any other production code, so it needs to be tested.

Tests are located in spark/test General rules apply, but if you're defining a task based on Hive access and Spark, there's a useful method for creating dataframes directly from predefined json fixtures. example usage:

@pytest.fixture
def df_cirrussearch_request_events(get_df_fixture):
    return get_df_fixture('dataframes', 'cirrus_search_request.events')
    
    
def test_happy_path(df_cirrussearch_request_events):
    # Test code that uses df_cirrussearch_request_events dataframe

Fixture used by above needs to be located in spark/test/fixtures. See create_df_fixture in spark/test/conftest.py to create the fixtures files. Example:

cirrus_search_request.events.json
{
  "rows": [
    [
      "8271",
      [
        {"page_title": "first"},
        {"page_title": "second"},
        {"page_title": "third"},
        {"page_title": "fourth"},
        {"page_title": "fifth"},
        {"page_title": "sixth"},
        {"page_title": "seventh"},
        {"page_title": "eighth"},
        {"page_title": "ninth"},
        {"page_title": "tenth"}
      ],
      [
        {
          "query_type": "full_text",
          "indices": ["enwiki_content"],
          "hits_total": 30,
          "hits_returned": 21,
          "hits_offset": 0,
          "syntax": ["full_text", "bag_of_words"]
        }
      ]
    ],
    [
      "1121",
      [],
      [
        {
          "query_type": "full_text",
          "indices": ["enwiki_content"],
          "hits_total": 0,
          "hits_returned": 0,
          "hits_offset": 0,
          "syntax": ["full_text", "bag_of_words"]
        }
      ]
    ]
  ],
  "schema": {
    "type": "struct",
    "fields": [
      {
        "name": "search_id",
        "type": "string",
        "nullable": false,
        "metadata": {
        }
      },
      {
        "name":"hits",
        "type":{
          "type":"array",
          "elementType":{
            "type":"struct",
            "fields":[
              {
                "name":"page_title",
                "type":"string",
                "nullable":false,
                "metadata":{

                }
              }
            ]
          },
          "containsNull":true
        },
        "nullable":false,
        "metadata":{

        }
      },
      {
        "name": "elasticsearch_requests",
        "type": {
          "type": "array",
          "elementType": {
            "type": "struct",
            "fields": [
              {
                "name": "query_type",
                "type": "string",
                "nullable": false,
                "metadata": {
                }
              },
              {
                "name": "indices",
                "type": {
                  "type": "array",
                  "elementType": "string",
                  "containsNull": true
                },
                "nullable": false,
                "metadata": {
                }
              },
              {
                "name": "hits_total",
                "type": "long",
                "nullable": false,
                "metadata": {
                }
              },
              {
                "name": "hits_returned",
                "type": "long",
                "nullable": false,
                "metadata": {
                }
              },
              {
                "name": "hits_offset",
                "type": "long",
                "nullable": false,
                "metadata": {
                }
              },
              {
                "name": "syntax",
                "type": {
                  "type": "array",
                  "elementType": "string",
                  "containsNull": true
                },
                "nullable": false,
                "metadata": {
                }
              }
            ]
          },
          "containsNull": true
        },
        "nullable": false,
        "metadata": {
        }
      }
    ]
  }
}

Please see Discovery/Analytics#Verifying_code_changes for info on how to run the test.

In general, we are not limited to creating a single task - one of the main functionalities of Airflow is the ability to link tasks into DAGs - directed acyclic graphs. You can define number of tasks and link them in a DAG, and there are some tasks that are predefined, like NamedHivePartitionSensor - next part will explain those predefined tasks in more detail.

Create a DAG definition

Once the task is in place, we need to create a DAG definition so that Airflow can properly configure it and run it. In our case, our DAGs are defined in Python, with json configuration. There are some predefined automated tests we can use, too.

Implementation and configuration

This time we have here a complete, albeit a bit simplified, example of the DAG definition - please see the comments for explanation of the various parts.

"""Put here a short description of a DAG"""

# Most imports here are at least useful, if not necessary
from datetime import datetime, timedelta
# Note that DAG is imported from 'wmf_airflow' and not 'airflow'. This provides a few extra defaults to remove boilerplate.
from wmf_airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

from wmf_airflow.spark_submit import SparkSubmitOperator
from wmf_airflow.template import MEDIAWIKI_ACTIVE_DC, YMDH_PARTITION, REPO_PATH, DagConf
from airflow.sensors.named_hive_partition_sensor import NamedHivePartitionSensor

# argument here is an important one - configuration will look for a config file named name_of_the_config.json - more on that later
dag_conf = DagConf('name_of_the_config')

# currently only required parameter is 'start-date'. It's important only if we want to catch up with the old data
default_args = {
    'start_date': datetime(2020, 12, 1)
}

# This is how we access configuration entries
SEARCH_SATISFACTION_TABLE = dag_conf('table_search_satisfaction')

# This method defines one of the predefined sensors - NamedHivePartitionSensor
# It's a task that waits until a Hive partition is ready to be querie.
# It's quite often used for DAGs that do a batch analysis on Hive tables
def get_wait_sensor(table: str) -> NamedHivePartitionSensor:
    return NamedHivePartitionSensor(
        task_id='wait_for_data',
        # We send a failure email every 6 hours and keep trying for a full day.
        timeout=60 * 60 * 6,
        retries=4,
        sla=timedelta(hours=6),
        # Select single hourly partition
        partition_names=[
            '{}/datacenter={}/{}'.format(
                table, MEDIAWIKI_ACTIVE_DC, YMDH_PARTITION) 
        ])


with DAG(
        'dag_name', 
        default_args=default_args,
        # min hour day month dow
        schedule_interval='38 5 * * *',
        max_active_runs=1,
        catchup=True # this needs to be True if we want to process old data, too
) as dag:
    # This is how we'd wrap in airflow the task we created earlier
    example_task = SparkSubmitOperator(
        task_id='task name', 
        conf={
            'spark.yarn.maxAppAttempts': 1,
            'spark.dynamicAllocation.maxExecutors': 10,
        },
        spark_submit_env_vars={
            'PYSPARK_PYTHON': 'python3.7',
        },
        env_vars={
            'REQUESTS_CA_BUNDLE': '/etc/ssl/certs/ca-certificates.crt', # only required if you're doing ssl calls
        },
        jars=REPO_PATH + '/artifacts/elasticsearch-hadoop-6.5.4.jar', # comma seperated list of additional jars
        py_files=REPO_PATH + '/spark/wmf_spark.py',   # comma seperated list of additional py files
        application=REPO_PATH + '/spark/export_queries_to_relforge.py',
        # This is how we pass arguments into our task script - defined in the arg_parser() method
        application_args=[ 
            '--search-satisfaction-partition', SEARCH_SATISFACTION_TABLE + '/' + YMDH_PARTITION 
        ]

    )


    wait_for_search_satisfaction_data = get_wait_sensor(SEARCH_SATISFACTION_TABLE) 
    
    # DummyOperator is a noop - it exists mostly to easily mark the end of the DAG execution
    complete = DummyOperator(task_id='complete')

    # This is the actual definition of the DAG, with >> operator marking the edges (with the direction) between tasks
    # Any given task will only start once all the dependent tasks are done.
    # You can specify more than one depnedent task with an array, like [task1, task2] >> task3
    wait_for_search_satisfaction_data >> export_queries_to_relforge >> complete

As mentioned, dag also requires a configuration and it has to be placed in airflow/config directory, in a form of a conf_name.json, where conf_name matches the name specified as an argument for dag_conf in the code. Same goes for the encompassing object, like this:

{
  "name_of_the_config": {
    "table_search_satisfaction": "event.searchsatisfaction"
  }
}
If you wish to add jars into the project, please remember to first run git fat init so that only hash is commited, not the binary itself. This assumes that git fat is installed.

Testing

Apart from any potential tests that might test a specific logic of the dag creation (which I'd recommend adding in the first place), DAG id has to be added to all_dag_ids table in airflow/tests/conftest.py. After that, please follow this guide to regenerate fixtures for DAG arguments. The same thing can done inside test docker, if you export REBUILD_FIXTURES=yes env var before running tox.

There is a way of running tasks inside of a self started analytics integration environment. For this, please follow the guide there - https://gerrit.wikimedia.org/g/search/analytics-integration.

Deployment

Deployment happens after merging the code - it's done via standard scap procedure - please follow this guide.