Data Engineering/Data Quality

From Wikitech

This document documents the design and implementation of Data Quality (DQ) instrumentation stack for Wikimedia datasets.

Scope

The Data Quality instrumentation stack is based atop:

  • Amazon Deequ for metric generation and constraint verification (testing DQ).
  • Iceberg for centralizes storage metrics and generate alerts.
  • refinery-spark for Deequ-to-wikimedia converters. To APIs are provided to convert the output of Deequ to Wikimedia's data model, and persit it to Iceberg. Metrics and alerts storage are centralized in GA tables (e.g. all DQ jobs results share the same data model and schema.
  • Airflow for job orchestration and alert notification
  • Superset for data presentation.
  • Analytics alert inbox as our "alert platform"

Use Cases

As a first use case we implemented DQ instrumentation for the webrequest pipeline.

The instrumentation is built atop Amazon Deequ and refinery-spark tooling. Data Quality is performed via a refinery-job side cart Spark application, orchestrated by an Airflow dag.

References:

Things you should know

Known limitation, in scope for future work (see Phab backlog):

  • No python support yet.
  • Only Deequ numeric valued metrics can be persisted (e.g. no distributions / complex types).
  • Stateful metrics require handlign persistance of a deequ repo to HDFS.

Architecture & APIs

We can model quality checks as function composition of data transformations. An airflow pipeline P is a function that generates some Dataset.

Dataset can be passed to a Summary Statistic function that performs user defined post-processing. Its output is used to feed Dashboards (e.g. Superset) and forwarded to a Quality Check function that implements decision making logic wrt Dataset “quality”. Downstream Consumers can poll a Readiness Marker (e.g. _SUCCESS flag) that will block on bad quality.

Figure 1. Data Quality lifecycle (concept)

Data Quality: Lifecycle

Wikimedia airflow operators are thin layers that schedule Spark job. Support is provided for SQL, Scala and Python jobs. By design, operators must not contain any business logic. Data processing and quality checks will be delegated to a scheduled job.Standard Spark and HDFS Airflow operators from airflow_common can be used to orchestrate DQ jobs.

These operators can then be deployed as:

  1. As steps in primary pipelines.
  2. As sidecar jobs, triggered after the primary pipeline successfully terminates.

The two approaches are not mutually exclusive.

Data checks as sidecar jobs

Assuming refine_webrequest is the pipeline we want to instrument,  we can plug a post_check quality operators that generates statistics as follow:

[...]

       post_check = SparkSqlOperator(

           task_id="refine_webrequest_check_output",

           sql=var_props.get(

               "hql_gitlab_raw_path",

               "https://gitlab.wikimedia.org/-/snippets/100/raw/main/summarize.hql"

           ),

           query_parameters={

               "table": refined_webrequest_table,

               "year": "{{ data_interval_start.year }}",

               "month": "{{ data_interval_start.month }}",

               "day": "{{ data_interval_start.day }}",

               "hour": "{{ data_interval_start.hour }}"

           },

           launcher="skein",

       )

     quality_check = SparkOperator(

           task_id="refine_webrequest_check_output",

           jar="/path/to/quality.jar",

           config=...

           launcher="skein",

       )

[...]

       refine_webrequest >> post_check >> quality_check >> mark_done

The post_check hql could store its output in hdfs (the one linked doesn't). Additional operator could store the output of post_check a dashboarding system, and trigger a quality_check job.

Pros

  • We can extend existing pipelines and add instrumentation.
  • Easier to reason about backfills?

Cons

  • Requires modifying codebases. Clients must be able to update their pipeline if upstream dependencies change.
  • Failure / lateness of data quality will impact primary SLOs

Data checks as steps in primary pipelines

Quality checks will be implemented as secondary (sidecar) pipeline, that will trigger upon completion of the primary one. We can reuse the same  post_check and quality_check as previously defined.

We can imagine a contract where:

  • a user defines their dag.
  • a user programmatically defines a set of dataset specific metrics via an hql or python logic (that we can template) they care about.
  • under the hood, an data monitoring dag is instantiated and scheduled at each run.

The API boundary for a client would be implementing the semantics of "metrics" programmatically (hql, conda env, jar); airflow "platform" would just provide a harness for managing the lifecycle of said implementation. https://docs.astronomer.io/learn/airflow-datasets could help enforcing boundaries.

In this scenario we can provide default (basic) metrics generation, yet allow for user provided (=dataset specific) metrics reporting logic to be executed. Possibly we could present an API that follows a strategy pattern to blend default and custom checks.

Pros

  • Quality checks are orthogonal to primary pipelines.
  • We can roll out quality checks without impacting existing codecases.

Cons

  • We will need to maintain pipelines for clients.
  • Failure / lateness of data quality will impact primary SLOs. But we can trade data quality checks for delivery speed.

Summary of the approaches

Both approaches are viable. For webrequest we opted for the sidecar approach. This allows to instrument the pipeline without interrupting production flow. See https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/analytics/dags/webrequest/refine_webrequest_analyzer_hourly_dag.py?ref_type=heads for implementation details.

Dataset staging would require refactorign of exising pipeline codes / best practices (readynessn notification), that is orthogonal (and beyong) the scope of this work.

Data Quality: Implementation

DQ implemented with Amazon Deequ. We piggy back on the following features.

  • Metrics Computation: to analyze data and track key metrics.
  • Constraint Verification: to generat alerts when DQ checks fail.

Key concepts of the Deequ framework can be found in their VLDB paper: Automating Large-Scale Data Quality Verification.

refinery-source helper classes will be the public, generic, API boundary for Data Quality metric reporting. Pipeline implementers must use deequ to define their metrics and constraint checks, and refinery-source to uniformly instrument pipelines, store metrics and generate alerts.

We rely on existing Wikimedia Airflow operators to orchestrate data quality operation

Two public classes are provide in refinery-source that take deequ repositories as input and:

  • DeequAnalyzersToDataQualityMetrics persists metrics into a globally available wmf_data_ops.data_quality_metrics iceberg table.
  • DeequVerificationSuiteToDataQualityAlerts persists alerts (constraint check results) into a globally available wmf_data_ops.data_quality_alerts iceberg table, and generates an alert report in HDFS.

Both classes implement a writer interface to persist data in Iceberg. For an example of what his workflow looks like, see the Webrequest instrumentation pipeline MR.

Future iteration could be benefit from a DSL or integration with a dataset configuration system, to automate generation of simple metrics.

Implementation: Data Model

Follow up from https://phabricator.wikimedia.org/T349763#9346763 We built the Wikimedia's Data Quality data model atop the schema implemented by Deequ (Table 1). This data model is transformed and enriched by the DeequAnalyzersToDataQualityMetrics (and DeequVerificationSuiteToDataQualityAlerts) SerDe, mapping it to the model depicted in Table 2.

entity instance name value dataset_time date hour metrics_type
Dataset * Size 111.0 1700491167431 2023-11-07 00 WebrequestsSequenceValidation
Column difference Minimum 0.0 1700491167431 2023-11-07 00 WebrequestsSequenceValidation
Column user_agent ApproxCountDistinct 679719.0 1700489237985 2023-11-07 01 WebrequestsSummaryStats

Table 1: dataset metric table (webrequest). Generated by the stock deequ DataFrame deserializer.

Deequ provides the concept of a ResultKey with a timestamp for metric insertion into a repository as well as a map of user defined tags. For example:

val countsResultKey = ResultKey(

    System.currentTimeMillis(), // maps to dataset_time

    Map("source_table" -> params.sourceTable,

"metric_type" -> WebrequestSummaryStats,

"year" -> params.year, // read from opt

"month" -> params.month, // read from opt

"day" -> params.day, // read from opt

"hour" -> params.hour // read from opt)

In our first PoC we’ve used tags that intuitively make sense for the target dataset ( webrequest ). These provide partition info (year, month, day, hour) and dataset information (source_table and type of the metric being generated).

When using default Deequ SerDes, ResultKey is exploded into columns. This behaviour breaks assumptions on a centralized metric schema. Moving forward we might want to store ResultKey tags as a complex field in each metric row, and only project “generic” column. A minimal set of columns could be:

  • dataset_time: timestamp of metric insertion into the repository
  • source_table:  table this metric is computed on (alternative naming: dataset? dataset table?)
  • pipeline_run_id: identifier of the pipeline that generated the metric (could be an airflow dag, Spark app id, or other orchestrator identifier).
  • partition_id : identifier of the partition the metric is computed on (/webrequest_source=text/year=2023/month=11/day=7/hour=0) )
  • partition_ts: temporal information, derived from partition_id  (e.g the partition date time “2023-11-07T00:00:00Z” maps to 1699315200 epoch time)

For example metrics from Table 1, could be stored as follows:

entity instance name value dataset_time source_table pipeline_run_id tags partition_id partition_ts
Dataset * Size 111.0 1700491167431 webrequest 1234567abcdefg {“metric_type”: “WebrequestsSequenceValidation”} webrequest_source=text/year=2023/month=11/day=7/hour=0 1699315200

Table 2: Wikimedia Data Quality metrics schema.

A sample dataset is available in superset at gmodena.webrequest_metrics_v2.

Wikimedia SerDes could be an API boundary for dag implementers. Clients would be responsible for managing a deequ repository (with guidelines), and eventual data preparation steps. Wikimedia SerDe would be responsible for writing data in the target format in a global HDFS / Iceberg table.

Implementation: Deequ Metrics repository

We use Amazon Deequ to compute metrics on input datasets and declare data quality checks.

Metrics are calculated at each pipeline run, and stored into Deequ Metrics repository in Json format. We provide a SerDe that maps Deequ metrics into the Wikimedia Data Quality metrics schema depicted in table 2.

Metrics Repositories can be stored  in memory or persisted to HDFS. The latter is required to update stateful metrics (e.g. moving averages over previously computed metrics).

It would be trivial to implement a Wikimedia Data Quality metrics  to Deequ Metrics repository converter, effectively using the Wikimedia table as a Deequ repository backend.

Known limitations

In scope for future work:

  • No python support yet.
  • Only Deequ Double valued metrics can be persisted.
  • Stateful metrics require persisting a deequ repo to HDFS.

Implementation: alerting

Alerts are built atop Deequ's Verification suite. They are declared using its Constraint Verification DSL and converted to Wikimedia's data model via a DeequVerificationSuiteToDataQualityAlerts utility class. This is analogous to the SerDe we put in place to map metrics to a common Iceberg data model.

Developers need only to implement constraints checking with standard Deequ syntax, and then export results with:

val alerts = DeequVerificationSuiteToDataQualityAlerts(webrequestConstraints ++ sequenceNumberConstraints,

           config.hive_partition,

           config.run_id)(spark)

// Persist alerts to iceberg.

alertsWriter.iceberg.save()

// Persist alerts as plain text in HDFS. These can be picked up by Airflow, and sent by email.

alertsWriter.text.save()

Airflow dags can be configured to send email alerts using Wikimedia's HdfsEmailOperator (e.g. the same logic used by the anomaly detection job).

References