Analytics/Cluster/Revision augmentation and denormalization

From Wikitech

This process is the third step of the Data Lake pipeline.

After this step, a fully denormalized version of the user, page and revision histories is available for querying in a single folder / Hive table on Hadoop. It is also available for loading into the serving layer(s), such as Druid. The data sources used to build this denormalized table are the revision and archive tables from the MediaWiki databases, loaded in the Data Loading step, plus the page and user histories reconstructed in the previous step. Documentation for the resulting dataset can be found in Analytics/Data Lake/Schemas/Mediawiki history.

Main functionalities of this step

  • Compute values that need complex joins, like revert information, byte-diff and delete time.
  • Historify fields that change over time, for example: page title, page namespace, user name, user groups, etc. A field is considered "historical" when it holds the value that was valid at the time of the given event; for example, the historical page title is the title the page had at that time. The opposite of historical fields are "latest" fields, which hold the value that is valid as of today.
  • Union and format all data (coming from 3 different schemas) in the same denormalized schema.
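
To make the "historical" versus "latest" distinction and the byte-diff concrete, here is a minimal single-machine sketch in plain Python. It is not the pipeline's code (which is Scala-Spark): the sample data, the function name `historify` and the output field names are all hypothetical, and the byte-diff is simplified to "difference with the previous revision of the same page in time order", which is an assumption.

```python
from bisect import bisect_right

# Hypothetical title history of one page: (start_timestamp, title), sorted by time.
title_history = [
    (100, "Foo"),
    (250, "Foo (disambiguation)"),
    (400, "Foobar"),
]

# Hypothetical revisions of that page: (revision_id, timestamp, text_bytes).
revisions = [
    (1, 120, 500),
    (2, 260, 450),
    (3, 410, 900),
]

def historify(revisions, title_history):
    """Attach historical title, latest title and byte-diff to each revision."""
    starts = [start for start, _ in title_history]
    latest_title = title_history[-1][1]
    rows = []
    previous_bytes = 0  # assumption: the first revision is diffed against 0 bytes
    for rev_id, ts, text_bytes in sorted(revisions, key=lambda r: r[1]):
        # Index of the last title state that started at or before this revision.
        idx = bisect_right(starts, ts) - 1
        rows.append({
            "revision_id": rev_id,
            "page_title_historical": title_history[idx][1] if idx >= 0 else None,
            "page_title_latest": latest_title,
            "byte_diff": text_bytes - previous_bytes,
        })
        previous_bytes = text_bytes
    return rows

rows = historify(revisions, title_history)
for row in rows:
    print(row)
```

Note that the sketch relies on the revisions being sorted by timestamp, which is the requirement that becomes expensive at scale.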

Performance challenges

To historify and populate all required fields, large portions of a huge dataset (for example all revisions of all projects, roughly 3 billion items) need to be sorted. This prevented the initial algorithm from processing large wikis.

Solution

The solution that worked around the performance problems was the "secondary sorting" trick, a classic distributed-systems pattern for sorting big datasets. The algorithm is written in Scala-Spark and uses Resilient Distributed Datasets (RDDs) to store and process big data in a distributed architecture. We tried to decouple, as much as possible, the code that solves scalability problems from the code that solves data-oriented problems.
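
The idea behind secondary sorting can be illustrated with a small single-machine sketch, written here in plain Python rather than Scala-Spark. Records are partitioned on the grouping key only, each partition is sorted on a composite key (grouping key, then timestamp), and the groups are then streamed in order without ever sorting the whole dataset at once. The event layout, `NUM_PARTITIONS` and the function names are assumptions made for illustration. In Spark's RDD API the comparable primitive is `repartitionAndSortWithinPartitions`; whether the actual job uses that exact call is not stated here.

```python
import zlib
from itertools import groupby

NUM_PARTITIONS = 2  # assumption: illustrative value only

# Hypothetical events: (wiki, page_id, timestamp, payload).
events = [
    ("enwiki", 1, 30, "c"),
    ("dewiki", 7, 10, "x"),
    ("enwiki", 1, 10, "a"),
    ("enwiki", 2, 5, "p"),
    ("enwiki", 1, 20, "b"),
    ("dewiki", 7, 5, "w"),
]

def partition_of(event):
    # Partition on the grouping key only (not the timestamp), so that all
    # events of a given page land in the same partition.
    wiki, page_id, _, _ = event
    return zlib.crc32(f"{wiki}:{page_id}".encode()) % NUM_PARTITIONS

def secondary_sort(events):
    partitions = [[] for _ in range(NUM_PARTITIONS)]
    for event in events:
        partitions[partition_of(event)].append(event)
    for partition in partitions:
        # Sort on the composite key: grouping key first, then timestamp.
        partition.sort(key=lambda e: (e[0], e[1], e[2]))
    return partitions

def ordered_groups(partitions):
    """Yield (key, payloads in time order) by streaming each sorted partition."""
    for partition in partitions:
        for key, group in groupby(partition, key=lambda e: (e[0], e[1])):
            yield key, [payload for _, _, _, payload in group]

result = dict(ordered_groups(secondary_sort(events)))
print(result[("enwiki", 1)])
```

Because each partition is sorted independently, the per-key logic (the data-oriented part) only needs to iterate over an already ordered stream, which mirrors the decoupling of scalability code from data-oriented code described above.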