Data Platform/Systems/Iceberg
Apache Iceberg adds a layer between Hadoop SQL engines and filesystems like HDFS and Ceph so that the semantics of data reads and writes are more familiar to users of relational databases. Iceberg implements a snapshot mechanism so that writes don't affect reads, and query planning is done locally without depending on external catalogs or walking filesystem trees. Iceberg also supports 'hidden partitioning', effectively decoupling the physical data layout from queries.
Iceberg is a library, not a service. This means that systems that want to read and write Iceberg tables must include Iceberg's library and some configuration to support it.
In our infrastructure, query engines that support Iceberg will continue to depend on the Hive Metastore to find tables in the datalake and to leverage the Metastore's locking mechanism for atomic writes.
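For illustration, here is a minimal sketch of what that configuration looks like for a PySpark session, assuming a Hive-Metastore-backed session catalog. The property keys are standard Iceberg ones, but the exact values (and how they are provisioned) on our clusters may differ:
from pyspark.sql import SparkSession

# Minimal sketch: make the Spark session catalog Iceberg-aware, backed by
# the Hive Metastore. Versions and values here are illustrative.
spark = (
    SparkSession.builder
    # Enable Iceberg's SQL extensions (CALL procedures, partition DDL, etc.)
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    # Delegate the built-in catalog to Iceberg, with Hive as its backend
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.iceberg.spark.SparkSessionCatalog",
    )
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    # The Iceberg runtime jar must also be on the classpath, e.g.:
    # org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.2.1
    .getOrCreate()
)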
Query Engine Support
| Engine | Version | Iceberg Read Support | Iceberg Write Support | Iceberg version | Comments |
|---|---|---|---|---|---|
| Spark | 3.1.2 | Yes | Yes | 1.2.1 | |
| Spark | 3.3.2+ | Yes | Yes | 1.2.1+ | We have limited Spark 3.3+ support for PySpark jobs. These jobs can choose to use Iceberg 1.2.1 up to 1.6.1. |
| Presto | 0.288.1 | Yes | No | 1.5.0 | Presto embeds its own Iceberg library. At WMF, Presto is set up for read-only access. Version 1.5.0 is backwards read compatible with 1.2.1. |
| Hive | 2.3.6 | No | No | N/A | Although we will be using the Hive Metastore for cataloging purposes (i.e. to discover tables), we do not intend to support querying in Hive. |
| Flink (Search) | 1.16 | Yes | Yes | Not installed | The Search team currently runs Flink 1.16. This version is supported by recent Iceberg releases. We have not installed the Iceberg jars on their cluster yet. |
| Flink (Events) | 1.16, 1.17 | Yes | Yes | Not installed | The Events team currently runs Flink 1.16 and 1.17. These versions are supported by recent Iceberg releases. We have not installed the Iceberg jars on their cluster yet. |
Accessing metadata tables
As of May 2024, we run Spark < 3.2, which does not support accessing metadata "sub-tables" using SQL. You can work around this by using Spark's DataFrameReader API.
For example, this will not work:
import wmfdata as wmf
# Raises `AnalysisException: The namespace in session catalog must have
# exactly one name part`
wmf.spark.run("""
SELECT *
FROM wmf_traffic.referrer_daily.files
""")
However, this will work:
import wmfdata as wmf
spark = wmf.spark.create_session()
(
spark
.read
.format("iceberg")
.load("wmf_traffic.referrer_daily.files")
.toPandas()
)
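The same DataFrameReader approach works for Iceberg's other metadata tables, such as snapshots, history, manifests, and partitions.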
Table Maintenance
For best performance, Iceberg tables require periodic maintenance. For example, consider an Iceberg table for temporal data that receives hourly writes, each producing, say, a ~4MB file. Over time, such a write pattern leads to the small files problem: roughly 720 small files per month. To ameliorate this, we can partition the table via the Iceberg transform months(timestamp). If we couple that partitioning strategy with monthly maintenance, we can rewrite each month's ~720 ~4MB files (about 2.8GB) into a couple dozen ~128MB files. This rewrite can be done by calling Iceberg's rewrite_data_files() procedure.
Atomic data rewrites are possible because each write to an Iceberg table creates its own snapshot. But snapshot creation also needs to be kept in check: going back to the hourly example, writing hourly over a month creates roughly 720 snapshots, and over a year that grows to ~8,760. Removing old snapshots (without removing the data) can be done by calling Iceberg's expire_snapshots() procedure.
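As a sketch, both procedures can be invoked as Spark SQL CALL statements. This assumes the session catalog is named spark_catalog and has Iceberg's SQL extensions enabled; the table name and parameter values are illustrative, not our defaults:
import wmfdata as wmf

# Compact small data files toward the table's target file size.
wmf.spark.run("""
    CALL spark_catalog.system.rewrite_data_files(
        table => 'wmf_traffic.referrer_daily'
    )
""")

# Remove snapshots older than an illustrative cutoff, keeping at least the
# 5 most recent. Data files no longer referenced by any remaining snapshot
# are deleted as well.
wmf.spark.run("""
    CALL spark_catalog.system.expire_snapshots(
        table => 'wmf_traffic.referrer_daily',
        older_than => TIMESTAMP '2024-05-01 00:00:00',
        retain_last => 5
    )
""")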
These are just two examples. There are more maintenance tasks that we want to do periodically, and that we describe below.
Maintenance by configuration
To avoid having to manually do the maintenance, we built a mechanism to automate it (T338065, T373693). To enable automated maintenance, a user has to declare their Iceberg table in our global datasets.yaml file.
To get the default maintenance, all that is needed is to declare your table like so:
iceberg_wmf_unique_devices_per_domain_daily:
  datastore: iceberg
  table_name: wmf_readership.unique_devices_per_domain_daily
  maintenance:
    enabled: True
  produced_by:
    airflow:
      instance: main
      dag_id: unique_devices_per_domain_daily
With this declaration, the table wmf_readership.unique_devices_per_domain_daily gets the following monthly maintenance tasks:
- remove_orphan_files(): removes any data files that are not tied to any Iceberg snapshot. These orphan files can occur when writes fail and leave files behind. More info at https://iceberg.apache.org/docs/1.6.1/spark-procedures/#remove_orphan_files
- expire_snapshots(): Iceberg keeps an immutable copy of the state of the table for each commit, so that we can 'go back in time'. This procedure removes older snapshots, and thus older files that are no longer needed. More info at https://iceberg.apache.org/docs/1.6.1/spark-procedures/#expire_snapshots
- rewrite_manifests(): over time, Iceberg tables may accumulate many manifest files that can slow down query planning. This procedure rewrites them so that reading is more efficient. More info at https://iceberg.apache.org/docs/1.6.1/spark-procedures/#rewrite_manifests
- rewrite_data_files(): Iceberg tracks each data file in a table. More data files means more metadata stored in manifest files, and small data files cause an unnecessary amount of metadata and less efficient queries due to file open costs. This procedure rewrites the files, trying to honor the table's settings. More info at https://iceberg.apache.org/docs/1.6.1/spark-procedures/#rewrite_data_files
Some Iceberg tables will require weekly or even daily maintenance rather than monthly. Others may need the default Spark resources for maintenance tasks bumped, or even periodic data deletion. All of these use cases are configurable as well, and you can find examples of these more complex scenarios by perusing the datasets.yaml file.
How is this implemented?
An Airflow DAG factory knows how to create 3 DAGs, one for each schedule: daily, weekly, or monthly. Each of our Airflow instances then declares a simple file that calls the DAG factory. Each created DAG then walks the datasets.yaml file looking for Iceberg datasets that match its cadence and the produced_by stanza shown in the example above. For the datasets that match, the maintenance tasks, which run as Spark jobs, are scheduled. Note that this must run on each individual Airflow instance, since they all create assets owned by different service users.
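As a hypothetical sketch of the matching logic (the real factory lives in our airflow-dags repository, and its function and key names may differ):
import yaml

# Hypothetical sketch: find the datasets.yaml entries that one of the
# generated maintenance DAGs would schedule tasks for. The 'cadence' key
# and its 'monthly' default are assumptions for illustration.
def matching_iceberg_datasets(datasets_yaml_path, cadence, airflow_instance):
    with open(datasets_yaml_path) as f:
        datasets = yaml.safe_load(f)
    for name, conf in datasets.items():
        maintenance = conf.get("maintenance") or {}
        airflow = (conf.get("produced_by") or {}).get("airflow") or {}
        if (
            conf.get("datastore") == "iceberg"
            and maintenance.get("enabled")
            and maintenance.get("cadence", "monthly") == cadence
            and airflow.get("instance") == airflow_instance
        ):
            yield name, conf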
Migration from Hive Tables
All new tables created in the datalake should be Iceberg tables. On a case-by-case basis, we may migrate supported datasets currently under the wmf Hive database to Iceberg. In the event of a migration, we intend to keep both versions (the old Hive table and the new Iceberg table) working in production for a reasonable time.
What to expect in terms of querying
Iceberg supports 'hidden partitioning', and we will leverage this mechanism so that we can evolve tables while being less disruptive to users. This means that the schema of the tables will change, with most of that change coming from the way we partition the data. Here is an example for the wmf.referrer_daily table. In its Hive version, we have the following schema:
CREATE EXTERNAL TABLE IF NOT EXISTS `referrer_daily` (
`country` string COMMENT 'Reader country per IP geolocation',
`lang` string COMMENT 'Wikipedia language -- e.g., en for English',
`browser_family` string COMMENT 'Browser family from user-agent',
`os_family` string COMMENT 'OS family from user-agent',
`search_engine` string COMMENT 'One of ~20 standard search engines (e.g., Google)',
`num_referrals` int COMMENT 'Number of pageviews from the referral source'
)
PARTITIONED BY (
`year` int COMMENT 'Unpadded year of request',
`month` int COMMENT 'Unpadded month of request',
`day` int COMMENT 'Unpadded day of request'
)
STORED AS PARQUET
LOCATION '/wmf/data/wmf/referrer/daily'
;
The respective Iceberg schema is as follows:
CREATE EXTERNAL TABLE IF NOT EXISTS `referrer_daily`(
`country` string COMMENT 'Reader country per IP geolocation',
`lang` string COMMENT 'Wikipedia language -- e.g., en for English',
`browser_family` string COMMENT 'Browser family from user-agent',
`os_family` string COMMENT 'OS family from user-agent',
`search_engine` string COMMENT 'One of ~20 standard search engines (e.g., Google)',
`num_referrals` int COMMENT 'Number of pageviews from the referral source',
`day` date COMMENT 'The date of the request'
)
USING ICEBERG
PARTITIONED BY (months(day))
LOCATION '/wmf/data/wmf_traffic/referrer/daily'
;
Notice the following:
- In the Hive table, partitioning is explicit, and `year`, `month`, and `day` become `INT` columns that you must include in your queries. For example, you may do: `SELECT * FROM wmf.referrer_daily WHERE year = 2020 AND month = 5 AND day = 1;`. If we ever changed the partitioning strategy, your queries would have to change accordingly.
- In the Iceberg table, the partitioning references the existing `day` column, which is of type `DATE`. Thus queries look like: `SELECT * FROM wmf_traffic.referrer_daily WHERE day = '2020-05-01';`. Since the partitioning refers to existing columns, if we ever changed the partitioning strategy, your queries would only need to change if you wanted to pick up the performance gains; otherwise they would continue working. This mechanism also allows Data Engineering to better control the number of files generated by any single table, which, when left unchecked, can be problematic.
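To make the partition-evolution point concrete, here is a sketch of changing the table's partitioning in place (assuming Iceberg's SQL extensions are enabled in the session; the switch to daily partitions is purely illustrative):
import wmfdata as wmf

# Illustrative only: evolve the table from monthly to daily partitions.
# Existing data keeps its old layout; new writes use the new one.
wmf.spark.run("""
    ALTER TABLE wmf_traffic.referrer_daily
    REPLACE PARTITION FIELD months(day) WITH days(day)
""")

# Queries like this one are unaffected by the change, since they only
# ever reference the `day` column:
wmf.spark.run("""
    SELECT * FROM wmf_traffic.referrer_daily WHERE day = '2020-05-01'
""")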
Changes to database names
As part of the Iceberg migration, we are also taking the opportunity to functionally decompose the 56 tables currently in the wmf database into distinct databases, which will hopefully aid data discovery. Please note that these proposed databases should only contain Iceberg tables, given that Presto has a limitation where it cannot query both Iceberg and Hive tables in the same session.
After some discussions, we settled on the following.
| proposed db | table | comments |
|---|---|---|
| wmf_contributors | edit_hourly | |
| wmf_contributors | editors_daily | |
| wmf_contributors | geoeditors_blacklist_country | |
| wmf_contributors | geoeditors_edits_monthly | |
| wmf_contributors | geoeditors_monthly | |
| wmf_contributors | geoeditors_public_monthly | |
| wmf_contributors | unique_editors_by_country_monthly | |
| wmf_data_ops | data_quality_stats | meta about other tables |
| wmf_data_ops | hdfs_usage | HDFS file system usage. |
| wmf_experiments | iceberg_wikitext_content | |
| wmf_mediawiki | mediawiki_history | |
| wmf_mediawiki | mediawiki_history_archive | |
| wmf_mediawiki | mediawiki_history_reduced | |
| wmf_mediawiki | mediawiki_metrics | |
| wmf_mediawiki | mediawiki_page_history | |
| wmf_mediawiki | mediawiki_page_history_archive | |
| wmf_mediawiki | mediawiki_user_history | |
| wmf_mediawiki | mediawiki_user_history_archive | |
| wmf_mediawiki | mediawiki_wikitext_current | |
| wmf_mediawiki | mediawiki_wikitext_history | |
| wmf_readership | disallowed_cassandra_articles | |
| wmf_readership | pageview_actor | |
| wmf_readership | pageview_allowlist | |
| wmf_readership | pageview_dataloss_202112_202201 | |
| wmf_readership | pageview_historical | |
| wmf_readership | pageview_hourly | |
| wmf_readership | pageview_unexpected_values | |
| wmf_readership | projectcounts_all_sites | |
| wmf_readership | projectcounts_raw | |
| wmf_readership | projectview_hourly | |
| wmf_readership | session_length_daily | |
| wmf_readership | unique_devices_per_domain_daily | |
| wmf_readership | unique_devices_per_domain_monthly | |
| wmf_readership | unique_devices_per_project_family_daily | |
| wmf_readership | unique_devices_per_project_family_monthly | |
| wmf_readership | virtualpageview_hourly | |
| wmf_traffic | anomaly_detection | |
| wmf_traffic | aqs_hourly | |
| wmf_traffic | browser_general | |
| wmf_traffic | domain_abbrev_map | |
| wmf_traffic | interlanguage_navigation | |
| wmf_traffic | mediacounts | |
| wmf_traffic | mediarequest | |
| wmf_traffic | officewiki_webrequest_daily | |
| wmf_traffic | referrer_daily | |
| wmf_traffic | traffic_anomaly_checked_countries | unsure |
| wmf_traffic | webrequest | |
| wmf_traffic | webrequest_actor_label_hourly | |
| wmf_traffic | webrequest_actor_metrics_hourly | |
| wmf_traffic | webrequest_actor_metrics_rollup_hourly | |
| wmf_traffic | webrequest_subset_tags | |
| wmf_wikidata | wikidata_entity | |
| wmf_wikidata | wikidata_item_page_link | |
| | mobilewebuiclicktracking_10742159_15423246 | appears to be deprecated, so will not be migrated. |
| | pagecounts_all_sites | appears to be deprecated, so will not be migrated. |
| | tmp_druid_load_virtualpageview_daily_20230427 | a vestigial temp table; temp tables should now be created under the `tmp` database. |
| | tmp_druid_load_virtualpageview_daily_20230428 | a vestigial temp table; temp tables should now be created under the `tmp` database. |
| | wdqs_extract | appears to be deprecated, so will not be migrated. |
It is a de facto standard that data for "production" tables should be saved in the /wmf/data directory of HDFS. However, there is no guideline on how that directory should be organized (task T367243). The file path for a table is typically broken down into more than one directory (for example, referrer_daily uses referrer/daily rather than referrer_daily).
Table migration check list
1. [DOC] List dependencies of the table to be migrated [Datasets, Dashboards]. Using the new Iceberg table will require the dataset pipelines' sensors and queries to be updated, and the dashboards to be updated as well to match the new Iceberg temporal partitioning mechanism.
2. [CODE] Prepare a code review with the new schema for the table. The new table will be very similar to the old one, except for 1) a timestamp field with timestamp type, and 2) timestamp-based hidden partitioning instead of year/month/day/hour partitioning (see the previous section).
3. [CODE] Prepare a code review updating the original table's Airflow job with a new step filling in the new Iceberg table.
4. [CODE] Prepare a code review with a one-off job to back-fill the old data into the new table (a sketch follows this list). This step is mostly needed to fill in the new timestamp field, which is not available in the original data of most tables.
5. [CODE-NOT_YET] Prepare a code review to add Iceberg maintenance jobs for the new table. This will be doable after T338065. For now we should manually run maintenance jobs regularly on existing Iceberg tables.
6. [OPS] Create the new Iceberg table, after step 2 has been reviewed and validated. Add the Iceberg tag to its DataHub documentation.
7. [OPS] Start the Airflow job updating the new table, after step 3 has been reviewed and validated.
8. [OPS] Back-fill the old data from the existing table, after step 4 has been reviewed and validated.
9. [CODE] Prepare code reviews to update dependent datasets to use the new table.
10. [OPS] Make the dependent datasets use the new table, after step 9 has been reviewed and validated.
11. [COM] Communicate about the future deprecation of the old table, particularly mentioning impacted dashboards (possibly flagging owners).
12. [CODE] Update the Airflow job to stop computing the old table.
13. [OPS] Stop the old table updates and drop the table, after step 12 has been validated.
14. [DOC] Update DataHub: tag the migrated table with the 'iceberg' tag.
15. [OPS] Validate data in the new table after one or two Airflow runs; data errors might occur in the regular jobs even if the back-fill went well (for instance, data duplication).
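As an example of the back-fill step (step 4), here is a sketch for a table shaped like referrer_daily, where the new day column is derived from the old table's year/month/day partition columns. The table names and column list follow the schema example above; a real job would also tune Spark resources and write ordering:
import wmfdata as wmf

# Sketch of a one-off back-fill: copy the Hive table's rows into the
# Iceberg table, deriving the new `day` DATE column from the old
# year/month/day INT partition columns.
wmf.spark.run("""
    INSERT INTO wmf_traffic.referrer_daily
    SELECT
        country,
        lang,
        browser_family,
        os_family,
        search_engine,
        num_referrals,
        make_date(year, month, day) AS day
    FROM wmf.referrer_daily
""")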
Which tables are migrated?
| Hive Table Name | Iceberg Table Name | Dependencies | Created? | Backfilled? | Pipeline on Airflow? | Documented? | Original Table Deprecated | Comments |
|---|---|---|---|---|---|---|---|---|
| wmf.referrer_daily | wmf_traffic.referrer_daily | Yes | Yes | Yes | Yes | Yes | Yes | https://phabricator.wikimedia.org/T335305 |
| wmf.unique_devices_per_domain_daily | wmf_readership.unique_devices_per_domain_daily | Yes | Yes | Yes | Yes | No | | https://phabricator.wikimedia.org/T347689 |
| wmf.unique_devices_per_domain_monthly | wmf_readership.unique_devices_per_domain_monthly | (above) | Yes | Yes | Yes | No | ||
| wmf.unique_devices_per_project_family_daily | wmf_readership.unique_devices_per_project_family_daily | (above) | Yes | Yes | Yes | No | ||
| wmf.unique_devices_per_project_family_monthly | wmf_readership.unique_devices_per_project_family_monthly | (above) | Yes | Yes | Yes | No | ||