Data Platform/Data Lake/Content/Mediawiki content history v1

This table is currently under development and is NOT a production table. The current development table name is wmf_dumps.wikitext_raw_rc2.

wmf_dumps.mediawiki_content_history_v1 is a dataset available in the Data Lake that provides the full content of all revisions, past and present, from all Wikimedia wikis.

The 'main' content is stored as unparsed Wikitext, but Multi-Content Revisions content is also included. The schema of this table is similar to that of Mediawiki wikitext history. However, this table's source data comes primarily from event streams from the Event Platform, allowing us to update it on a daily basis.

Consuming this table is different from consuming snapshot-based tables like Mediawiki wikitext history. See the FAQ below for details.

Schema

Note: This is an Iceberg dataset.

col_name data_type comment
page_id bigint id of the page
page_namespace int namespace of the page
page_title string title of the page
page_redirect_title string title of the redirected-to page
user_id bigint id of the user that made the revision; null if anonymous, zero if old system user, and -1 when deleted or malformed XML was imported
user_text string text of the user that made the revision (either username or IP)
user_is_visible boolean Whether the revision editor information is visible
revision_id bigint id of the revision
revision_parent_id bigint id of the parent revision
revision_timestamp timestamp timestamp of the revision
revision_is_minor_edit boolean whether this revision is a minor edit or not
revision_comment string Comment made with revision
revision_comment_is_visible boolean Whether the comment of the revision is visible
revision_sha1 string Nested SHA1 hash of hashes of all content slots. See https://www.mediawiki.org/wiki/Manual:Revision_table#rev_sha1
revision_size bigint the sum of the content_size of all content slots
revision_content_slots
MAP<
  STRING,
  STRUCT<
    content_body:   STRING,
    content_format: STRING,
    content_model:  STRING,
    content_sha1:   STRING,
    content_size:   BIGINT
  >
>
a MAP containing all the content slots associated with this revision. Typically just the "main" slot, but also "mediainfo" for commonswiki.
revision_content_is_visible boolean Whether the revision content body is visible
wiki_db string the wiki project
errors
ARRAY<
  STRUCT<
    error_upstream_id:        STRING,
    error_upstream_timestamp: TIMESTAMP,
    error_producer:           STRING,
    error_message:            STRING,
    error_severity:           STRING
  >
>
an ARRAY, typically empty, that will contain any detected ingestion errors. Consumers may want to skip rows with errors depending on their severity (see the sketch after this table).
row_last_update timestamp the timestamp of the last content event or backfill that updated this row
row_visibility_last_update timestamp the timestamp of the last visibility event or backfill that updated this row
row_move_last_update timestamp the timestamp of the last move event or backfill that updated this row
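
As a usage sketch for the errors column above, rows carrying ingestion errors can be filtered out as follows. This assumes spark is a Spark session such as the wmfdata one shown in the FAQ below:

# Sketch: skip rows that carry any recorded ingestion error.
# Assumes `spark` is a Spark session like the wmfdata one in the FAQ below.
clean_rows = spark.sql("""
    SELECT revision_id,
           revision_timestamp,
           row_last_update
    FROM wmf_dumps.mediawiki_content_history_v1
    WHERE wiki_db = 'simplewiki'
      AND (errors IS NULL OR size(errors) = 0)
""")
clean_rows.show(10)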

Changes and known problems

Date        Phab Task  Details
2024-11-21  T358877    Release candidate of the table is available under the name wmf_dumps.wikitext_raw_rc2. The reconcile mechanism is still under development, and the table schema will change before production.

FAQ

How do I query this table if I am only interested in the wikitext?

This table includes the content of all of a revision's slots, as introduced by Multi-Content Revisions. If you are not interested in that and just want to access the 'main' wikitext, you can query it as in the following example:

SELECT 
  revision_id,
  revision_content_slots['main'].content_body AS wikitext
FROM wmf_dumps.mediawiki_content_history_v1
WHERE wiki_db = 'simplewiki'
LIMIT 10
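
The same pattern works for other slots: for example, on commonswiki the structured data lives under the 'mediainfo' key of revision_content_slots.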

This table doesn't seem to have Hive partitions, such as the usual 'snapshot' column. How do I consume it?

Correct: this table does not use a snapshot partition like other tables such as Mediawiki wikitext history. It uses a table format called Iceberg, which lets us update the table's content in place instead of rewriting all of the data on every run. The main benefit is that the table's content is refreshed on a daily cadence.
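
As an illustration of that cadence, here is a minimal sketch of checking how recently a wiki's rows were last touched, using the row_last_update column from the schema above (again assuming a Spark session as configured in the FAQ below):

# Sketch: inspect how recently rows were updated, using row_last_update.
# Assumes `spark` is a Spark session like the wmfdata one further below.
spark.sql("""
    SELECT max(row_last_update) AS latest_update
    FROM wmf_dumps.mediawiki_content_history_v1
    WHERE wiki_db = 'simplewiki'
""").show()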

If you are building a data pipeline and need an Airflow sensor to wait on this table's updates, then instead of waiting on Hive partitions, you have to define a RestExternalTaskSensor like so:

from wmf_airflow_common.sensors.rest_external_task import RestExternalTaskSensor

sensor = RestExternalTaskSensor(
  external_instance_uri="http://an-launcher1002.eqiad.wmnet:8600",
  task_id="wait_for_dumps_merge_events_to_mediawiki_content_history_v1_daily_dag",
  external_dag_id="dumps_merge_events_to_mediawiki_content_history_v1_daily",
  check_existence=True,
)

Waiting on this sensor guarantees that all revision events from the last 24 hours have been ingested, and that all detected reconciliation events from the last 24 hours have been ingested. Since the reconciliation process lags by 24 hours, the most recent 24 hours of data may still be missing updates. This is the fastest way to consume the table if your use case can tolerate eventually consistent data. If you require more precision, use the sensor as above, but always consume the data with a filter on the revision_timestamp column, like so:

SELECT 
  *
FROM wmf_dumps.mediawiki_content_history_v1
WHERE wiki_db = 'simplewiki'
  AND revision_timestamp <= TIMESTAMP '{{ data_interval_start | to_dt() }}'
LIMIT 10

Here, {{ data_interval_start | to_dt() }} is a Jinja template rendered by your Airflow DAG.

What minimal Spark configuration do I need to fully read this table?

To take advantage of Iceberg's performance improvements, this table is stored as Parquet. This format has historically been problematic at WMF when wikitext content is included, since some pages carry very large content payloads. We therefore ran basic performance benchmarks to make sure that a Spark job with reasonable resources can fully read the table, and found that a yarn-large configuration, as defined in the wmfdata helper library, is sufficient:

import wmfdata

spark = wmfdata.spark.create_custom_session(
    master="yarn",
    spark_config={
        "spark.driver.memory": "4g",
        "spark.dynamicAllocation.maxExecutors": 128,
        "spark.executor.memory": "8g",
        "spark.executor.cores": 4,
        "spark.sql.shuffle.partitions": 512,
    },
)
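
With such a session, a full read of one wiki's main-slot content might look like the following sketch (the final aggregation is illustrative only):

# Sketch: a full read of one wiki's main-slot wikitext with the session above.
df = spark.sql("""
    SELECT revision_id,
           revision_content_slots['main'].content_body AS wikitext
    FROM wmf_dumps.mediawiki_content_history_v1
    WHERE wiki_db = 'simplewiki'
""")
print(df.filter("wikitext IS NOT NULL").count())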

Pipeline

  1. EventBus generates page_change and page_visibility events. Events are materialized into HDFS.
  2. Events are ingested into the table daily via a Spark job centered on a MERGE INTO statement (see the sketch after this list).
  3. A consistency check is done between the last 24 hours of ingested data and the Analytics Replicas. Any inconsistencies are kept at wmf_dumps.wikitext_inconsistent_rows.
  4. A Spark job then queries the Analytics Replicas for any revision that was found inconsistent in the last 24 hours. Reconcile page_change events are generated.
  5. Reconciliation events from the preceding 24-hour window (shifted back 24 hours) are ingested into the table via another Spark job centered on a MERGE INTO statement.
  6. Every month, a full reconcile over all revisions is triggered to catch any historic inconsistencies.
  7. All of this is coordinated via a set of Airflow jobs.
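
For intuition, here is a heavily simplified sketch of what the MERGE INTO step in points 2 and 5 can look like. The real Spark jobs are more involved, and the source view name events_to_merge below is hypothetical:

# Hypothetical, simplified illustration of an Iceberg MERGE INTO step.
# `events_to_merge` is a made-up temporary view standing in for a day's
# worth of page_change events; the production job differs in detail.
spark.sql("""
    MERGE INTO wmf_dumps.mediawiki_content_history_v1 AS target
    USING events_to_merge AS source
    ON  target.wiki_db = source.wiki_db
    AND target.revision_id = source.revision_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")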

The data is stored in Parquet, but tests were done to confirm that Spark executors with 1 core and 16 GB of RAM can read the entire table.