Mediawiki content history v1
wmf_dumps.mediawiki_content_history_v1 is a dataset available in the Data Lake that provides the full content of all revisions, past and present, from all Wikimedia wikis.
The 'main' content is stored as unparsed Wikitext, but Multi-Content Revisions content is also included. The schema of this table is similar to that of Mediawiki wikitext history. However, this table's source data comes primarily from event streams from the Event Platform, allowing us to update it on a daily basis.
Consuming this table is different from consuming snapshot-based tables like Mediawiki wikitext history; see the FAQ below for details.
Schema
Note: This is an Iceberg dataset.
col_name | data_type | comment |
---|---|---|
page_id | bigint | id of the page |
page_namespace | int | namespace of the page |
page_title | string | title of the page |
page_redirect_title | string | title of the redirected-to page |
user_id | bigint | id of the user that made the revision; null if anonymous, zero if old system user, and -1 when deleted or malformed XML was imported |
user_text | string | text of the user that made the revision (either username or IP) |
user_is_visible | boolean | Whether the revision editor information is visible |
revision_id | bigint | id of the revision |
revision_parent_id | bigint | id of the parent revision |
revision_timestamp | timestamp | timestamp of the revision |
revision_is_minor_edit | boolean | whether this revision is a minor edit or not |
revision_comment | string | Comment made with revision |
revision_comment_is_visible | boolean | Whether the comment of the revision is visible |
revision_sha1 | string | Nested SHA1 hash of hashes of all content slots. See https://www.mediawiki.org/wiki/Manual:Revision_table#rev_sha1 |
revision_size | bigint | the sum of the content_size of all content slots |
revision_content_slots | MAP<STRING, STRUCT<content_body: STRING, content_format: STRING, content_model: STRING, content_sha1: STRING, content_size: BIGINT>> | a MAP containing all the content slots associated with this revision. Typically just the "main" slot, but also "mediainfo" for commonswiki. |
revision_content_is_visible | boolean | Whether the revision content body is visible |
wiki_db | string | the wiki project |
errors | ARRAY<STRUCT<error_upstream_id: STRING, error_upstream_timestamp: TIMESTAMP, error_producer: STRING, error_message: STRING, error_severity: STRING>> | an ARRAY, typically empty, that will contain any detected ingestion errors. Consumers may want to skip rows with errors depending on their severity. |
row_last_update | timestamp | the timestamp of the last content event or backfill that updated this row |
row_visibility_last_update | timestamp | the timestamp of the last visibility event or backfill that updated this row |
row_move_last_update | timestamp | the timestamp of the last move event or backfill that updated this row |
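Note that the visibility flags and the errors column affect how rows should be consumed. As a minimal sketch (the choice of wiki and the decision to drop every row with any error, regardless of severity, are illustrative assumptions rather than a recommendation), a consumer could select only visible, error-free content like this:
SELECT
    revision_id,
    revision_content_slots['main'].content_body AS wikitext
FROM wmf_dumps.mediawiki_content_history_v1
WHERE wiki_db = 'simplewiki'
    AND size(errors) = 0
    AND revision_content_is_visible
LIMIT 10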
Changes and known problems
Date | Phab Task | Details |
---|---|---|
2024-11-21 | task T358877 | Release candidate of the table is available under the name wmf_dumps.wikitext_raw_rc2. The reconcile mechanism is still under development. The table will undergo schema changes before production. |
FAQ
How do I query this table if I am only interested in the wikitext?
This table includes the content from all the slots of a revision, as introduced by Multi-Content Revisions. If you are not interested in that and just want the 'main' wikitext, you can query it as in the following example:
SELECT
revision_id,
revision_content_slots['main'].content_body AS wikitext
FROM wmf_dumps.mediawiki_content_history_v1
WHERE wiki_db = 'simplewiki'
LIMIT 10
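Other content slots are read the same way. For instance, this sketch (assuming you want the structured data stored in the "mediainfo" slot on Wikimedia Commons) pulls that slot where it exists:
SELECT
    revision_id,
    revision_content_slots['mediainfo'].content_body AS mediainfo_body
FROM wmf_dumps.mediawiki_content_history_v1
WHERE wiki_db = 'commonswiki'
    AND revision_content_slots['mediainfo'] IS NOT NULL
LIMIT 10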
This table doesn't seem to have Hive partitions, such as the usual 'snapshot' column. How do I consume it?
This table indeed does not use a snapshot partition the way other tables such as Mediawiki wikitext history do. We are using a table format called Iceberg. Instead of rewriting all the data as we have done before, Iceberg allows us to update the content of the table in place, which is what enables the daily update cadence.
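Because this is an Iceberg table, you can also check when it was last updated by querying Iceberg's metadata tables. This is only a sketch and assumes the Spark catalog you are connected to exposes the standard Iceberg history metadata table:
SELECT
    made_current_at,
    snapshot_id,
    is_current_ancestor
FROM wmf_dumps.mediawiki_content_history_v1.history
ORDER BY made_current_at DESC
LIMIT 5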
If you are building a data pipeline and need to define an Airflow sensor to wait on this table's updates, then instead of waiting on Hive partitions you have to define a RestExternalTaskSensor like so:
from wmf_airflow_common.sensors.rest_external_task import RestExternalTaskSensor

sensor = RestExternalTaskSensor(
    external_instance_uri="http://an-launcher1002.eqiad.wmnet:8600",
    task_id="wait_for_dumps_merge_events_to_mediawiki_content_history_v1_daily_dag",
    external_dag_id="dumps_merge_events_to_mediawiki_content_history_v1_daily",
    check_existence=True,
)
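In a DAG, downstream tasks are then set to depend on this sensor (for example sensor >> my_consuming_task, where my_consuming_task is a placeholder for whatever task reads the table), so that they only run once the daily load has completed.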
Waiting on this sensor guarantees the following: we have ingested all the revision events from the last 24 hours, and we have ingested all the detected reconciliation events from the last 24 hours. Since the reconciliation process lags by 24 hours, this means that the last 24 hours of data may be missing updates. This is the fastest way to consume this table if your use case can tolerate eventually consistent data.
If your use case requires more precision, you can use the sensor as above, but then always consume the data with a filter on the revision_timestamp column, like so:
SELECT
*
FROM wmf_dumps.mediawiki_content_history_v1
WHERE wiki_db = 'simplewiki'
AND revision_timestamp <= TIMESTAMP '{{ data_interval_start | to_dt() }}'
LIMIT 10
Where {{ data_interval_start | to_dt() }} is a Jinja template rendered by your Airflow DAG.
What minimal Spark configuration do I need to fully read this table?
To take advantage of Iceberg's performance improvements, this table's chosen file format is Parquet. This format has historically been problematic at WMF when wikitext content is included, as some pages have very large content payloads. We therefore ran some basic performance benchmarks to make sure that a Spark job with reasonable resources can fully read the table. We found that a yarn-large configuration, as defined in the wmfdata helper library, is sufficient:
import wmfdata

spark = wmfdata.spark.create_custom_session(
    master="yarn",
    spark_config={
        "spark.driver.memory": "4g",
        "spark.dynamicAllocation.maxExecutors": 128,
        "spark.executor.memory": "8g",
        "spark.executor.cores": 4,
        "spark.sql.shuffle.partitions": 512,
    },
)
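As a rough sanity check that such a session can scan the whole table, you can run a full-table aggregation through spark.sql(). The query below is only an illustration; the particular aggregation is arbitrary:
SELECT
    wiki_db,
    COUNT(1) AS revisions,
    SUM(revision_content_slots['main'].content_size) AS main_slot_bytes
FROM wmf_dumps.mediawiki_content_history_v1
GROUP BY wiki_db
ORDER BY main_slot_bytes DESC
LIMIT 20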
Pipeline
- EventBus generates page_change and page_visibility events. Events are materialized into HDFS.
- Events are ingested daily into the table via a Spark job whose core operation is a MERGE INTO statement (see the illustrative sketch after this list).
- A consistency check is done between the last 24 hours of ingested data and the Analytics Replicas. Any inconsistencies are kept in wmf_dumps.wikitext_inconsistent_rows.
- A Spark job then queries the Analytics Replicas for any revision that was found inconsistent in the last 24 hours. Reconcile page_change events are generated.
- Reconcile events from the last 24 hours, shifted back 24 hours, are ingested into the table via a Spark job whose core operation is a MERGE INTO statement.
- Every month, a full reconcile over all revisions is triggered to catch any historic inconsistencies.
- All of this is coordinated via a set of Airflow jobs.
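For illustration, the ingestion jobs follow the standard Iceberg upsert pattern. The sketch below is not the production statement: the source relation name and the match condition are simplified assumptions, shown only to convey the shape of the MERGE INTO.
MERGE INTO wmf_dumps.mediawiki_content_history_v1 AS target
USING staged_page_change_events AS source   -- hypothetical staging relation of new/updated revisions
    ON target.wiki_db = source.wiki_db
    AND target.revision_id = source.revision_id
WHEN MATCHED THEN
    UPDATE SET *
WHEN NOT MATCHED THEN
    INSERT *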
The data is stored in Parquet, but tests were done to confirm that Spark executors with 1 core and 16 GB of RAM can read the entire table.