Equity Landscape Airflow
All the Airflow DAGs for Equity Landscape can be found in the Data Engineering repo.
We currently have four types of DAGs for the project, namely:
- equity_landscape_api_dag (API)
- equity_landscape_csv_dag (CSV)
- equity_landscape_hql_dag (HQL)
- equity_landscape_app_dag (APP)
- The DAGs are not scheduled; they are run on an ad-hoc basis.
- This is because the pipeline pulls data from various external sources, some of whose update schedules cannot be determined.
DAG execution
- The order of execution is:
- API and CSV can run independently.
- HQL needs to run after both API and CSV have run.
- APP can only run after HQL is run.
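The ordering above can be sketched as a small dependency graph. This is only an illustration of the constraint, not how the DAGs are actually triggered: the `DEPENDENCIES` mapping and the `run_order` helper are hypothetical names, and the sketch assumes Python 3.9+ for `graphlib`.

```python
from graphlib import TopologicalSorter

# Map each DAG to the DAGs that must have finished before it is triggered.
DEPENDENCIES = {
    "equity_landscape_api_dag": set(),
    "equity_landscape_csv_dag": set(),
    "equity_landscape_hql_dag": {
        "equity_landscape_api_dag",
        "equity_landscape_csv_dag",
    },
    "equity_landscape_app_dag": {"equity_landscape_hql_dag"},
}


def run_order(dependencies):
    """Return one valid ad-hoc trigger order for the DAGs."""
    return list(TopologicalSorter(dependencies).static_order())


order = run_order(DEPENDENCIES)
print(order)
```

API and CSV may appear in either order at the front of the list; HQL always comes third and APP last.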
- The API DAG (equity_landscape_api_dag) is the ideal location for all data that needs to be stored on HDFS via API calls.
- It uses a job_config property (named jobs_config in the example variable) to specify each task and its parameters.
- The job_config property consists of:
- task_name (dict) which contains the below properties
- application : The python program to execute (string)
- args: The arguments that the python program expects (dict)
- Example DAG variable
"jobs_config": {
  "load_world_bank_data": {
    "application": "lib/python3.7/site-packages/gdi_source/equity_landscape/loaders/",
    "args": {"--schema": "gdi"},
    "loop_args": [
      {"--db": 2, "--series": "IT.NET.USER.ZS"},
      {"--db": 2, "--series": "IT.CEL.SETS.P2"},
      {"--db": 2, "--series": "SP.POP.TOTL"},
      {"--db": 2, "--series": "NY.GDP.PCAP.PP.CD"},
      {"--db": 2, "--series": "NY.GDP.PCAP.PP.KD"},
      {"--db": 2, "--series": "SP.POP.GROW"},
      {"--db": 2, "--series": "SP.POP.TOTL.FE.IN"},
      {"--db": 2, "--series": "PA.NUS.PPP"},
      {"--db": 2, "--series": "PA.NUS.PPPC.RF"},
      {"--db": 2, "--series": "PA.NUS.FCRF"},
      {"--db": 2, "--series": "PA.NUS.ATLS"},
      {"--db": 2, "--series": "FP.CPI.TOTL.ZG"},
      {"--db": 3, "--series": "CC.EST"}
    ]
  }
}
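The example variable also carries a loop_args list that the property description does not cover. A minimal sketch of one plausible reading follows, assuming each loop_args entry is merged with the shared args to produce one program invocation per entry. The `expand_job` helper is hypothetical and is not taken from the DAG code.

```python
def expand_job(job):
    """Yield one flat argument list per loop_args entry.

    Assumption: shared "args" are combined with each "loop_args" entry;
    a job without loop_args yields a single argument list.
    """
    base_args = job.get("args", {})
    for extra in job.get("loop_args") or [{}]:
        merged = {**base_args, **extra}
        argv = []
        for flag, value in merged.items():
            argv.extend([flag, str(value)])
        yield argv


job = {
    "args": {"--schema": "gdi"},
    "loop_args": [
        {"--db": 2, "--series": "SP.POP.TOTL"},
        {"--db": 3, "--series": "CC.EST"},
    ],
}

runs = list(expand_job(job))
for argv in runs:
    print(argv)
```

Under this reading, the thirteen loop_args entries in the example variable would result in thirteen runs of the loader, each with `--schema gdi` plus its own `--db` and `--series`.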
- The DAG consists of the following tasks:
- load_lua (currently not in use and removed from the code, but it remains available should it be required)
- This uses load_lua_data from the project repo to load data into organizational_info_input_metrics. We aren't using this data currently, as it has not matured enough to be used as an alternative to Affiliate data.
"jobs_config": {
  "load_lua_data": {
    "application": "lib/python3.7/site-packages/gdi_source/equity_landscape/loaders/",
    "args": {"--schema": "gdi", "--current-year": 2021, "--end-year": 2021}
  }
}
- load_world_bank_data
- This uses the load_wbgapi from the project repo
- It takes the db number and series as arguments, then uses the World Bank Python package to fetch the data and load it into Hive (world_bank_data_input_metrics).
- The CSV DAG (equity_landscape_csv_dag) is responsible for loading CSV files into their respective Hive tables.
- This also uses a job_config dictionary that consists of the following properties:
- A list of dictionaries that consist of:
- table_name - name of the Hive table to insert into
- file_location - filename of the CSV file (in the DAG, this value is concatenated to the base_csv_location variable, which can also be specified as part of the DAG variables).
- There is an optional args dictionary that is used to specify additional parameters to load_csv.
- Example DAG variable
{
  "schema": "gdi",
  "base_csv_location": "hdfs://analytics-hadoop/wmf/data/raw/gdi/equity_landscape/csv",
  "conda_env": "",
  "csv_app": "lib/python3.7/site-packages/gdi_source/equity_landscape/loaders/",
  "partition_year": 2022,
  "jobs_config": [
    {
      "table_name": "country_meta_data",
      "file_location": "country_meta_data.csv"
    },
    {
      "table_name": "population_data_input_metrics",
      "file_location": "population_data_2020.csv",
      "args": {"--partition_columns": "year='2020'"}
    }
  ]
}
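To illustrate how file_location is combined with base_csv_location, here is a minimal sketch that resolves each jobs_config entry to a table name, a full HDFS path, and its optional extra args. The `resolve_csv_jobs` helper is hypothetical; how the DAG actually passes these values to load_csv is not shown here.

```python
def resolve_csv_jobs(config):
    """Return (table_name, full_csv_path, extra_args) for each CSV job."""
    base = config["base_csv_location"].rstrip("/")
    return [
        (
            job["table_name"],
            f"{base}/{job['file_location']}",
            job.get("args", {}),  # args is optional per job
        )
        for job in config["jobs_config"]
    ]


config = {
    "base_csv_location": "hdfs://analytics-hadoop/wmf/data/raw/gdi/equity_landscape/csv",
    "jobs_config": [
        {"table_name": "country_meta_data", "file_location": "country_meta_data.csv"},
        {
            "table_name": "population_data_input_metrics",
            "file_location": "population_data_2020.csv",
            "args": {"--partition_columns": "year='2020'"},
        },
    ],
}

csv_jobs = resolve_csv_jobs(config)
for table, path, extra in csv_jobs:
    print(table, path, extra)
```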
DAG graph
- The HQL DAG (equity_landscape_hql_dag) is responsible for the first part of the data transformations, which uses HQL files to load data into Hive tables.
- The files are stored in the refinery repo.
- The DAG uses TaskGroups to group the HQL files by the set of metrics they represent:
- geometrics
- editorship
- Includes the following tasks:
- geoeditor_input_metrics
- Takes in data from canonical_data.wikis, from wiki_db_map_input_metrics (loaded by the CSV DAG), and from wmf.geoeditors_monthly, and loads it into geoeditor_input_metrics.
- It is executed twice (it has the years: [] config and uses the a_year_ago and two_years_ago variables).
- geoeditor_input_metrics_pivot
- Takes in data from geoeditor_input_metrics and pivots it.
- geoeditor_online_input_metrics
- Used for aggregating data from wmf.geoeditors_monthly and country_meta_data to calculate active editor data.
- brief_projects_edited_metrics
- Calculates the average active editors per wiki_db, project_label by country.
- readership
- georeadership_input_metrics
- Used for storing the following partitions:
- pageviews
- unique_devices
- yoy_metrics for the above metrics.
- grants_leadership
- grants_leadership_input_metrics
- Takes in data from grants_input_metrics and calculates various grants_metrics including total_historical_grants_to_data, total_calendar_year_grants amongst others.
- population
- Each metric is represented by a dictionary that contains a list of HQL tasks and properties.
- Each HQL task contains its own properties, namely:
- pos: used for ordering each task in the DAG list to ensure that it runs before or after another task.
- hql_path: an HDFS location specifying where the HQL file is stored.
- query_parameters: the parameters passed into the HQL file.
- Note: year='-' is replaced by the a_year_ago parameter from the DAG's variables.
- years: [] (optional): uses the a_year_ago and two_years_ago parameters to create the range of years that the HQL task needs to run for.
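The properties above can be sketched as a small planning function. This is a hedged illustration rather than the DAG's real logic: `plan_hql_tasks` is a hypothetical helper, the hql_path values are shortened placeholders, and passing the per-run year as a `year` query parameter is an assumption.

```python
def plan_hql_tasks(tasks, a_year_ago, two_years_ago):
    """Return (hql_path, query_parameters) pairs in execution order."""
    planned = []
    # pos decides the order of tasks within a metric's list.
    for task in sorted(tasks, key=lambda t: t["pos"]):
        params = dict(task["query_parameters"])
        if "years" in task:
            # One run per year in the range two_years_ago..a_year_ago.
            for year in range(two_years_ago, a_year_ago + 1):
                planned.append((task["hql_path"], {**params, "year": year}))
        else:
            # The "-" placeholder is swapped for a_year_ago.
            if params.get("year") == "-":
                params["year"] = a_year_ago
            planned.append((task["hql_path"], params))
    return planned


editorship_config = [
    {
        "hql_path": "geoeditor_online_input_metrics.hql",
        "pos": 2,
        "query_parameters": {"year": "-"},
    },
    {
        "hql_path": "geoeditor_input_metrics.hql",
        "pos": 0,
        "query_parameters": {"destination_table": "gdi.geoeditor_input_metrics"},
        "years": [],
    },
]

plan = plan_hql_tasks(editorship_config, a_year_ago=2022, two_years_ago=2021)
for hql_path, params in plan:
    print(hql_path, params)
```

With a_year_ago set to 2022 and two_years_ago set to 2021, the task carrying years: [] is planned twice (once per year), which matches the note that geoeditor_input_metrics is executed twice.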
- Example DAG variable
{
  "schema": "gdi",
  "a_year_ago": 2022,
  "two_years_ago": 2021,
  "editorship_config": [
    {
      "hql_path": "hdfs://analytics-hadoop/wmf/refinery/current/hql/gdi/equity_landscape/geoeditor_input_metrics.hql",
      "pos": 0,
      "query_parameters": {
        "source_table": "wmf.geoeditors_monthly",
        "map_table": "gdi.wiki_db_map_input_metrics",
        "canonical_wiki": "canonical_data.wikis",
        "destination_table": "gdi.geoeditor_input_metrics"
      },
      "years": []
    },
    {
      "hql_path": "hdfs://analytics-hadoop/wmf/refinery/current/hql/gdi/equity_landscape/geoeditor_input_metrics_pivot_monthly_bins.hql",
      "pos": 1,
      "query_parameters": {
        "source_table": "gdi.geoeditor_input_metrics",
        "map_table": "gdi.wiki_db_map_input_metrics",
        "destination_table": "gdi.geoeditor_input_metrics_pivot",
        "metric": "monthly_bins"
      },
      "years": []
    },
    {
      "hql_path": "hdfs://analytics-hadoop/wmf/refinery/current/hql/gdi/equity_landscape/geoeditor_online_input_metrics.hql",
      "pos": 2,
      "query_parameters": {
        "source_table": "wmf.geoeditors_monthly",
        "geoeditor_metrics": "gdi.geoeditor_input_metrics",
        "country_data": "gdi.country_meta_data",
        "destination_table": "gdi.geoeditor_online_input_metrics",
        "year": "-"
      }
    }
  ],
  "readership_config": [
    {
      "hql_path": "hdfs://analytics-hadoop/wmf/refinery/current/hql/gdi/equity_landscape/georeadership_input_metrics_pageviews_yearly.hql",
      "pos": 1,
      "query_parameters": {
        "source_table": "wmf.pageview_hourly",
        "destination_table": "gdi.georeadership_input_metrics",
        "metric": "pageviews"
      },
      "years": []
    },
    {
      "hql_path": "hdfs://analytics-hadoop/wmf/refinery/current/hql/gdi/equity_landscape/georeadership_input_metrics_unique_devices.hql",
      "pos": 2,
      "query_parameters": {
        "source_table": "wmf.unique_devices_per_domain_monthly",
        "destination_table": "gdi.georeadership_input_metrics",
        "metric": "unique_devices"
      },
      "years": []
    }
  ],
  "grants_leadership_config": [
    {
      "hql_path": "hdfs://analytics-hadoop/user/ntsako/gdi/equity_landscape/hql/grants_leadership_input_metrics.hql",
      "pos": 1,
      "query_parameters": {
        "source_table": "gdi.grants_input_metrics",
        "destination_table": "gdi.grants_leadership_input_metrics"
      },
      "years": []
    }
  ],
  "population_config": [
    {
      "hql_path": "hdfs://analytics-hadoop/wmf/refinery/current/hql/gdi/equity_landscape/population_leadership_input_metrics.hql",
      "pos": 1,
      "query_parameters": {
        "source_table": "gdi.population_data_input_metrics",
        "metrics_table": "gdi.geoeditor_online_input_metrics",
        "country_data": "gdi.country_meta_data",
        "pivot_table": "gdi.geoeditor_input_metrics_pivot",
        "destination_table": "gdi.population_leadership_input_metrics",
        "year": "-"
      }
    }
  ]
}
- The APP DAG (equity_landscape_app_dag) performs the second and final part of the data transformations, which comprises two portions: the inputs and the outputs.
- We use a job_config dictionary to define the tasks and their properties.
- This DAG is responsible for the HDFS outputs, which are the final output metrics and dashboard files.
- For the inputs we currently have:
- access_input_metrics
- affiliate_input_metrics
- programs_and_events_input_metrics
- freedom_leadership_input_metrics
- For the outputs we currently have:
- input_metrics
- regional_input_metrics
- output_metrics
- dashboard_output
- languages_output
- dashboard_output and languages_output are tasks that generate files to be used by the dashboard.
- Example DAG variable
{
  "conda_env": "",
  "partition_year": 2022,
  "jobs_config": {
    "access_input_metrics": {
      "application": "lib/python3.7/site-packages/gdi_source/equity_landscape/input_metrics/",
      "args": {
        "--schema": "gdi",
        "--csv-path": "hdfs://analytics-hadoop/wmf/data/raw/gdi/equity_landscape/csv/MCI_Data_2022.csv",
        "--world-bank-year": 2022,
        "--score-year": 2021,
        "--spi-year": 2022
      }
    },
    "affiliate_input_metrics": {
      "application": "lib/python3.7/site-packages/gdi_source/equity_landscape/input_metrics/",
      "args": {
        "--schema": "gdi"
      }
    },
    "input_metrics": {
      "application": "lib/python3.7/site-packages/gdi_source/equity_landscape/input_metrics/",
      "args": {
        "--schema": "gdi",
        "--output-path": "hdfs://analytics-hadoop//wmf/data/gdi/equity_landscape/output/input_metrics"
      }
    },
    "regional_input_metrics": {
      "application": "lib/python3.7/site-packages/gdi_source/equity_landscape/input_metrics/",
      "args": {
        "--input-path": "hdfs://analytics-hadoop//wmf/data/gdi/equity_landscape/output/input_metrics",
        "--output-path": "hdfs://analytics-hadoop//wmf/data/gdi/equity_landscape/output/regional_input_metrics"
      }
    },
    "output_metrics": {
      "application": "lib/python3.7/site-packages/gdi_source/equity_landscape/output_metrics/",
      "args": {
        "--output-path": "hdfs://analytics-hadoop//wmf/data/gdi/equity_landscape/output/output_metrics",
        "--input-path": "hdfs://analytics-hadoop//wmf/data/gdi/equity_landscape/output/regional_input_metrics"
      }
    }
  }
}
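In the example variable, each output task reads from the path the previous one wrote to. A minimal sketch of a sanity check for that chaining follows, assuming the tasks run in the order input_metrics, regional_input_metrics, output_metrics (inferred from the paths, not confirmed by the DAG code). The `check_path_chain` helper is hypothetical.

```python
def check_path_chain(jobs_config, order):
    """Return (producer, consumer) pairs whose paths do not line up."""
    mismatches = []
    for producer, consumer in zip(order, order[1:]):
        out_path = jobs_config[producer]["args"].get("--output-path")
        in_path = jobs_config[consumer]["args"].get("--input-path")
        if out_path != in_path:
            mismatches.append((producer, consumer))
    return mismatches


BASE = "hdfs://analytics-hadoop//wmf/data/gdi/equity_landscape/output"
jobs_config = {
    "input_metrics": {
        "args": {"--schema": "gdi", "--output-path": f"{BASE}/input_metrics"},
    },
    "regional_input_metrics": {
        "args": {
            "--input-path": f"{BASE}/input_metrics",
            "--output-path": f"{BASE}/regional_input_metrics",
        },
    },
    "output_metrics": {
        "args": {
            "--input-path": f"{BASE}/regional_input_metrics",
            "--output-path": f"{BASE}/output_metrics",
        },
    },
}

ORDER = ["input_metrics", "regional_input_metrics", "output_metrics"]
mismatches = check_path_chain(jobs_config, ORDER)
print(mismatches or "all paths line up")
```

Running a check like this before triggering the DAG can catch a mistyped path in the variable before any Spark job starts.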