GDI/Equity Landscape/Airflow

From Wikitech

Equity Landscape Airflow

DAGs

All the Airflow DAGs for Equity Landscape can be found under the Data Engineering repo.

The project currently has four types of DAGs:

  • equity_landscape_api_dag (API)
  • equity_landscape_csv_dag (CSV)
  • equity_landscape_hql_dag (HQL)
  • equity_landscape_app_dag (APP)

Scheduling

  • The DAGs are not scheduled; they are run on an ad-hoc basis.
  • This is because the pipeline pools data from various external sources, some of whose update schedules cannot be determined.

DAG execution

  • The order of execution is:
    • API and CSV can run independently.
    • HQL needs to run after both API and CSV have run.
    • APP can only run after HQL is run.
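
Since the DAGs are triggered by hand, the ordering above can be written down as a small dependency map and checked before a run. The following is a minimal sketch, not part of the project code: the UPSTREAM mapping and the run_order helper are hypothetical names, and it only uses the Python standard library (graphlib, Python 3.9+).

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical mapping: each DAG points to the DAGs that must finish first.
UPSTREAM = {
    "equity_landscape_api_dag": set(),
    "equity_landscape_csv_dag": set(),
    "equity_landscape_hql_dag": {
        "equity_landscape_api_dag",
        "equity_landscape_csv_dag",
    },
    "equity_landscape_app_dag": {"equity_landscape_hql_dag"},
}


def run_order(upstream):
    """Return one valid order in which the DAGs can be triggered manually."""
    return list(TopologicalSorter(upstream).static_order())


order = run_order(UPSTREAM)
print(order)
```

API and CSV have no dependency on each other, so either may come first in the returned order; HQL always follows both, and APP is always last.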

API DAG

  • Ideal location for loading any data that needs to be stored on HDFS via API calls.
  • Uses a jobs_config property to specify each task and its parameters.
    • The jobs_config property consists of:
      • task_name (dict), which contains the properties below
      • application: the Python program to execute (string)
      • args: the arguments that the Python program expects (dict)
        • Example DAG variable
        • Image of API DAG graph
        •           "jobs_config": {
                      "load_world_bank_data": {
                          "application": "lib/python3.7/site-packages/gdi_source/equity_landscape/loaders/load_wbgapi.py",
                          "args": {"--schema": "gdi"},
                          "loop_args": [
                              {"--db": 2, "--series": "IT.NET.USER.ZS"},
                              {"--db": 2, "--series": "IT.CEL.SETS.P2"},
                              {"--db": 2, "--series": "SP.POP.TOTL"},
                              {"--db": 2, "--series": "NY.GDP.PCAP.PP.CD"},
                              {"--db": 2, "--series": "NY.GDP.PCAP.PP.KD"},
                              {"--db": 2, "--series": "SP.POP.GROW"},
                              {"--db": 2, "--series": "SP.POP.TOTL.FE.IN"},
                              {"--db": 2, "--series": "PA.NUS.PPP"},
                              {"--db": 2, "--series": "PA.NUS.PPPC.RF"},
                              {"--db": 2, "--series": "PA.NUS.FCRF"},
                              {"--db": 2, "--series": "PA.NUS.ATLS"},
                              {"--db": 2, "--series": "FP.CPI.TOTL.ZG"},
                              {"--db": 3, "--series": "CC.EST"}
                          ]
                      }
                    }
          
  • The DAG consists of the following tasks:
    • load_lua (currently not in use; it has been removed from the code, but it is documented here should it be required again)
      • This uses load_lua_data from the project repo to load data into organizational_info_input_metrics. We aren't using this data currently, as it has not matured enough to be used as an alternative to Affiliate data.
      •           "jobs_config": {
                    "load_lua_data": {
                        "application": "lib/python3.7/site-packages/gdi_source/equity_landscape/loaders/load_lua_data.py",
                        "args": {"--schema": "gdi", "--current-year": 2021, "--end-year": 2021}
                       }
                   }
        
    • load_world_bank_data
      • This uses load_wbgapi from the project repo.
        • It takes the db number and series as arguments, uses the World Bank Python package to fetch the data, and loads it into Hive (world_bank_data_input_metrics).
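
The example variable for load_world_bank_data combines a shared args dictionary with a loop_args list. How the DAG consumes loop_args is not described on this page, so the sketch below rests on an assumption: each loop_args entry is merged with args to produce one run of the application. The expand_job helper is a hypothetical name, not a function from the project repo.

```python
def expand_job(job):
    """Build one argument list per loop_args entry.

    Assumption: every loop_args entry is merged with the shared args
    and results in a separate run of the application.
    """
    shared = job.get("args", {})
    # Without loop_args, the job runs once with the shared args only.
    loops = job.get("loop_args") or [{}]
    runs = []
    for extra in loops:
        merged = {**shared, **extra}
        argv = [job["application"]]
        for flag, value in merged.items():
            argv += [flag, str(value)]
        runs.append(argv)
    return runs


# Two of the series from the example DAG variable.
job = {
    "application": "lib/python3.7/site-packages/gdi_source/equity_landscape/loaders/load_wbgapi.py",
    "args": {"--schema": "gdi"},
    "loop_args": [
        {"--db": 2, "--series": "SP.POP.TOTL"},
        {"--db": 3, "--series": "CC.EST"},
    ],
}

runs = expand_job(job)
for argv in runs:
    print(" ".join(argv))
```

Under that assumption, a job such as load_lua_data, which has no loop_args, expands to a single run.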

CSV DAG

  • Responsible for loading CSV files into their respective Hive tables.
  • This also uses a jobs_config property, here a list of dictionaries. Each dictionary consists of:
    • table_name: name of the Hive table to insert into
    • file_location: file name of the CSV file (in the DAG, this value is appended to the base_csv_location variable, which can also be specified as part of the DAG variables)
    • args (optional): a dictionary used to specify additional parameters to load_csv
    • Example DAG variable
    • Image of CSV DAG graph
    •       {
              "schema": "gdi",
              "base_csv_location": "hdfs://analytics-hadoop/wmf/data/raw/gdi/equity_landscape/csv",
              "conda_env":"https://gitlab.wikimedia.org/repos/gdi/equity-landscape/gdi-source/-/package_files/1338/download",
              "csv_app": "lib/python3.7/site-packages/gdi_source/equity_landscape/loaders/load_csv.py",
              "partition_year": 2022,
              "jobs_config": [
                 {
                   "table_name": "country_meta_data",
                   "file_location": "country_meta_data.csv"
                 }, 
                 {
                   "table_name": "population_data_input_metrics",
                   "file_location": "population_data_2020.csv",
                   "args": { "--partition_columns": "year='2020'"}
                 }
                ]
            }
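
To illustrate how file_location relates to base_csv_location, the sketch below resolves the full CSV path for each entry in jobs_config. It is not the DAG's actual code: the csv_jobs helper is a hypothetical name, and joining the two values with a "/" separator is an assumption about how the concatenation is done.

```python
import posixpath


def csv_jobs(variable):
    """Resolve the full CSV path and optional args for each jobs_config entry."""
    base = variable["base_csv_location"]
    jobs = []
    for job in variable["jobs_config"]:
        jobs.append(
            {
                "table_name": job["table_name"],
                # Assumption: base location and file name are joined with "/".
                "path": posixpath.join(base, job["file_location"]),
                # args is optional, so default to an empty dictionary.
                "args": job.get("args", {}),
            }
        )
    return jobs


variable = {
    "schema": "gdi",
    "base_csv_location": "hdfs://analytics-hadoop/wmf/data/raw/gdi/equity_landscape/csv",
    "jobs_config": [
        {"table_name": "country_meta_data", "file_location": "country_meta_data.csv"},
        {
            "table_name": "population_data_input_metrics",
            "file_location": "population_data_2020.csv",
            "args": {"--partition_columns": "year='2020'"},
        },
    ],
}

resolved = csv_jobs(variable)
for job in resolved:
    print(job["table_name"], job["path"], job["args"])
```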
      

HQL DAG

DAG graph

Image of HQL DAG graph

  • This is responsible for the first part of the data transformations, which uses HQL files to load data into Hive tables.
  • The DAG uses TaskGroups to define the metrics that each set of HQL files represents.
    • geometrics
      • Image of Geometrics Task Group
      • editorship
        • Includes the following tasks:
          • geoeditor_input_metrics
            • Takes in data from canonical_data.wikis, wiki_db_map_input_metrics (loaded using the CSV DAG), and wmf.geoeditors_monthly, and loads it into geoeditor_input_metrics.
            • It is executed twice: it has the years: [] config, so it runs once for a_year_ago and once for two_years_ago.
          • geoeditor_input_metrics_pivot
            • Takes in data from geoeditor_input_metrics and pivots it.
          • geoeditor_online_input_metrics
            • Used for aggregating data from wmf.geoeditors_monthly and country_meta_data to calculate active editor data.
          • brief_projects_edited_metrics
            • Calculates the average active editors per wiki_db, project_label by country.
      • readership
        • georeadership_input_metrics
          • Used for storing the following partitions:
            • pageviews
            • unique_devices
            • yoy_metrics for the above metrics.
    • grants_leadership
      • grants_leadership_input_metrics
        • Takes in data from grants_input_metrics and calculates various grants metrics, including total_historical_grants_to_data and total_calendar_year_grants.
    • population
      • population_leadership_input_metrics
        • Uses data from population_data_input_metrics that was loaded using the CSV DAG.
      • access has been moved to the APP DAG.
  • Each metric is represented by a dictionary that contains a list of HQL tasks and properties.
    • Each HQL task contains its own properties, namely:
      • pos: used for ordering each task in the DAG list to ensure that it runs before/after another task.
      • hql_path: an HDFS location specifying where the HQL file is stored.
      • query_parameters: the parameters passed into the HQL file.
        • Note: year='-' is replaced by the a_year_ago parameter from the DAG's variables.
      • years: []: uses the a_year_ago and two_years_ago parameters to create the range of years that the HQL task needs to run for.
  • Example DAG variable
    •  {
       
        "schema": "gdi",
        "a_year_ago": 2022,
        "two_years_ago": 2021,
        "editorship_config": [
          {
            "hql_path": "hdfs://analytics-hadoop/wmf/refinery/current/hql/gdi/equity_landscape/geoeditor_input_metrics.hql",
            "pos": 0,
            "query_parameters": {
              "source_table": "wmf.geoeditors_monthly",
              "map_table": "gdi.wiki_db_map_input_metrics",
              "canonical_wiki": "canonical_data.wikis",
              "destination_table": "gdi.geoeditor_input_metrics"
            },
            "years": []
          },
          {
            "hql_path": "hdfs://analytics-hadoop/wmf/refinery/current/hql/gdi/equity_landscape/geoeditor_input_metrics_pivot_monthly_bins.hql",
            "pos": 1,
            "query_parameters": {
              "source_table": "gdi.geoeditor_input_metrics",
              "map_table": "gdi.wiki_db_map_input_metrics",
              "destination_table": "gdi.geoeditor_input_metrics_pivot",
              "metric": "monthly_bins"
            },
            "years": []
          },
          {
            "hql_path": "hdfs://analytics-hadoop/wmf/refinery/current/hql/gdi/equity_landscape/geoeditor_online_input_metrics.hql",
            "pos": 2,
            "query_parameters": {
              "source_table": "wmf.geoeditors_monthly",
              "geoeditor_metrics": "gdi.geoeditor_input_metrics",
              "country_data": "gdi.country_meta_data",
              "destination_table": "gdi.geoeditor_online_input_metrics",
              "year": "-"
            }
          }
        ],
        "readership_config": [
          {
            "hql_path": "hdfs://analytics-hadoop/wmf/refinery/current/hql/gdi/equity_landscape/georeadership_input_metrics_pageviews_yearly.hql",
            "pos": 1,
            "query_parameters": {
              "source_table": "wmf.pageview_hourly",
              "destination_table": "gdi.georeadership_input_metrics",
              "metric": "pageviews"
             },
            "years": []
          },
          {
            "hql_path": "hdfs://analytics-hadoop/wmf/refinery/current/hql/gdi/equity_landscape/georeadership_input_metrics_unique_devices.hql",
            "pos": 2,
            "query_parameters": {
              "source_table": "wmf.unique_devices_per_domain_monthly",
              "destination_table": "gdi.georeadership_input_metrics",
              "metric": "unique_devices"
            },
            "years": []
          }
        ],
        "grants_leadership_config": [
          {
            "hql_path": "hdfs://analytics-hadoop/user/ntsako/gdi/equity_landscape/hql/grants_leadership_input_metrics.hql",
            "pos": 1,
            "query_parameters": {
              "source_table": "gdi.grants_input_metrics",
              "destination_table": "gdi.grants_leadership_input_metrics"
            },
            "years": []
          }
        ],
        "population_config": [
          {
            "hql_path": "hdfs://analytics-hadoop/wmf/refinery/current/hql/gdi/equity_landscape/population_leadership_input_metrics.hql",
            "pos": 1,
            "query_parameters": {
              "source_table": "gdi.population_data_input_metrics",
              "metrics_table": "gdi.geoeditor_online_input_metrics",
              "country_data": "gdi.country_meta_data",
              "pivot_table": "gdi.geoeditor_input_metrics_pivot",
              "destination_table": "gdi.population_leadership_input_metrics",
              "year": "-"
            }
          }
        ]
       
       }
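
The pos, year and years properties described above can be sketched as a small expansion step. This is an illustration under stated assumptions, not the DAG's actual code: expand_hql_tasks is a hypothetical helper, the year range is assumed to run from two_years_ago to a_year_ago inclusive, and passing each year to the query as a year parameter is also an assumption.

```python
def expand_hql_tasks(variable, config_key):
    """Return (hql_path, query_parameters) pairs in execution order."""
    a_year_ago = variable["a_year_ago"]
    two_years_ago = variable["two_years_ago"]
    runs = []
    # pos decides the order of the tasks within the group.
    for task in sorted(variable[config_key], key=lambda t: t["pos"]):
        params = dict(task["query_parameters"])
        # year='-' is a placeholder for the a_year_ago DAG variable.
        if params.get("year") == "-":
            params["year"] = a_year_ago
        if "years" in task:
            # Assumption: one run per year, two_years_ago..a_year_ago inclusive,
            # with the year handed to the query as a "year" parameter.
            for year in range(two_years_ago, a_year_ago + 1):
                runs.append((task["hql_path"], {**params, "year": year}))
        else:
            runs.append((task["hql_path"], params))
    return runs


# Two tasks from the example variable, listed out of order on purpose.
variable = {
    "a_year_ago": 2022,
    "two_years_ago": 2021,
    "editorship_config": [
        {
            "hql_path": "hdfs://analytics-hadoop/wmf/refinery/current/hql/gdi/equity_landscape/geoeditor_online_input_metrics.hql",
            "pos": 2,
            "query_parameters": {"source_table": "wmf.geoeditors_monthly", "year": "-"},
        },
        {
            "hql_path": "hdfs://analytics-hadoop/wmf/refinery/current/hql/gdi/equity_landscape/geoeditor_input_metrics.hql",
            "pos": 0,
            "query_parameters": {"source_table": "wmf.geoeditors_monthly"},
            "years": [],
        },
    ],
}

runs = expand_hql_tasks(variable, "editorship_config")
for path, params in runs:
    print(path.rsplit("/", 1)[-1], params["year"])
```

With these inputs, geoeditor_input_metrics expands to two runs (one per year), which matches the "executed twice" behaviour described for that task, and geoeditor_online_input_metrics runs once with the placeholder replaced.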
      

APP DAG

  • The second and final part of the data transformations, which comprises two portions: the inputs and the outputs.
  • We use a jobs_config dictionary to define the tasks and their properties.
  • This DAG is responsible for the HDFS outputs, which are the final output metrics and dashboard files.
  • For the inputs we currently have:
    • access_input_metrics
    • affiliate_input_metrics
    • programs_and_events_input_metrics
    • freedom_leadership_input_metrics
  • For the outputs we currently have:
    • input_metrics
    • regional_input_metrics
    • output_metrics
    • dashboard_output
    • languages_output
  • dashboard_output and languages_output are tasks that generate files to be used by the dashboard.
  • Example DAG variable

Image of APP DAG graph

  •   {
       "conda_env": "https://gitlab.wikimedia.org/repos/gdi/equity-landscape/gdi-source/-/package_files/1146/download",
        "partition_year": 2022,
        "jobs_config":
        {
          "access_input_metrics":
            {
              "application": "lib/python3.7/site-packages/gdi_source/equity_landscape/input_metrics/access_input_metrics.py",
              "args":
                {
                  "--schema": "gdi",
                  "--csv-path": "hdfs://analytics-hadoop/wmf/data/raw/gdi/equity_landscape/csv/MCI_Data_2022.csv",
                  "--world-bank-year": 2022,
                  "--score-year": 2021,
                  "--spi-year": 2022
                }
            },
          "affiliate_input_metrics":
            {
              "application": "lib/python3.7/site-packages/gdi_source/equity_landscape/input_metrics/affiliate_input_metrics.py",
              "args": { "--schema": "gdi" }
            },
          "input_metrics":
            {
              "application": "lib/python3.7/site-packages/gdi_source/equity_landscape/input_metrics/input_metrics.py",
              "args":
                {
                  "--schema": "gdi",
                  "--output-path": "hdfs://analytics-hadoop//wmf/data/gdi/equity_landscape/output/input_metrics"
                }
            },
          "regional_input_metrics":
            {
              "application": "lib/python3.7/site-packages/gdi_source/equity_landscape/input_metrics/regional_input_metrics.py",
              "args":
                {
                  "--input-path": "hdfs://analytics-hadoop//wmf/data/gdi/equity_landscape/output/input_metrics",
                  "--output-path": "hdfs://analytics-hadoop//wmf/data/gdi/equity_landscape/output/regional_input_metrics"
                }
            },
          "output_metrics":
            {
              "application": "lib/python3.7/site-packages/gdi_source/equity_landscape/output_metrics/output_metrics.py",
              "args":
                {
                  "--output-path": "hdfs://analytics-hadoop//wmf/data/gdi/equity_landscape/output/output_metrics",
                  "--input-path": "hdfs://analytics-hadoop//wmf/data/gdi/equity_landscape/output/regional_input_metrics"
                }
            }
        }
       }
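
In the example variable, the output tasks are chained through their paths: the --output-path of one task is the --input-path of the next. The sketch below derives those links from a jobs_config dictionary, which can help when checking a variable before a run. The path_links helper is a hypothetical name and is not part of the project code.

```python
def path_links(jobs_config):
    """Return (producer, consumer) pairs where a task reads what another wrote."""
    producers = {}
    for name, job in jobs_config.items():
        out = job.get("args", {}).get("--output-path")
        if out:
            producers[out] = name
    links = []
    for name, job in jobs_config.items():
        src = job.get("args", {}).get("--input-path")
        if src in producers:
            links.append((producers[src], name))
    return links


# The args of the three output tasks, copied from the example variable.
base = "hdfs://analytics-hadoop//wmf/data/gdi/equity_landscape/output"
jobs_config = {
    "input_metrics": {
        "args": {"--schema": "gdi", "--output-path": base + "/input_metrics"}
    },
    "regional_input_metrics": {
        "args": {
            "--input-path": base + "/input_metrics",
            "--output-path": base + "/regional_input_metrics",
        }
    },
    "output_metrics": {
        "args": {
            "--output-path": base + "/output_metrics",
            "--input-path": base + "/regional_input_metrics",
        }
    },
}

links = path_links(jobs_config)
for producer, consumer in links:
    print(producer, "->", consumer)
```

A task whose --input-path matches no other task's --output-path simply produces no link, so a typo in either path shows up as a missing pair.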