Data Platform/Systems/Cluster
The Analytics cluster is the Hadoop cluster and related components that run the Analytics Data Lake. Its most important functions are providing:
- Webrequest computation (ingestion from caches, refinement, pageview extraction, computation of metrics, extraction into other systems, ad-hoc querying)
- MediaWiki history computation (ingestion from DB, history rebuilding, computation of metrics, extraction into other systems, ad-hoc querying)
Glossary
- Airflow
- A system that can be used to launch jobs on the Hadoop cluster according to a flexible schedule. The Data Engineering team provides a number of Airflow instances so that each team can manage its own jobs. Jobs are defined in terms of a DAG (a brief CLI sketch follows this glossary).
- Hadoop
- A collection of services for batch processing of large data. Core concepts are a distributed filesystem (HDFS) and MapReduce.
- HDFS
- Hadoop Distributed File System (HDFS). HDFS is a distributed, scalable, and portable filesystem written in Java for the Hadoop framework. A Hadoop cluster nominally has a single NameNode, plus a cluster of DataNodes that form the HDFS cluster.
- NameNode (Hadoop)
- Manages HDFS file metadata.
- DataNode (Hadoop)
- Responsible for storage of HDFS block data.
- YARN (Hadoop)
- Yet Another Resource Negotiator. Compute resource manager and API for distributed applications. MapReduce (v2) is a YARN application. Although not technically correct, YARN is sometimes referred to as MRv2. Also used to refer to WMF's corresponding (private) web interface at https://yarn.wikimedia.org/cluster/scheduler.
- ResourceManager (Hadoop)
- Handles YARN application (job) submission from clients.
- NodeManager (Hadoop)
- Runs on each compute node (and/or HDFS DataNode) and accepts jobs from the ResourceManager.
- MapReduce
- Programming model for parallelizing batch processing of large data sets. Good for counting things. Hadoop ships with a Java implementation of MapReduce.
- Hive
- A system that projects structure onto flat data (text or binary) in HDFS and allows this data to be queried using an SQL-like syntax. Hive provides two main components: the Hive Metastore, which catalogs all the datasets available in HDFS, and the Hive Query Engine, which translates SQL to executable code.
- Iceberg
- A high-performance format for huge analytic tables. Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for engines like Spark and Presto to safely work with the same tables, at the same time. Iceberg leverages the Hive Metastore for cataloging purposes.
- Spark
- General-purpose distributed runtime engine. Writing jobs in Spark is easier than with lower-level MapReduce. Also has a nice Hive / SQL interface. You'll use Spark over Hive when you need more flexibility than you can achieve with just SQL and predefined tables.
- Spark History Server
- User Interface that is used to monitor the metrics and performance of the completed Spark applications.
- Kafka
- Distributed pub/sub message queue. Useful as a big ol' reliable log buffer.
- Gobblin
- Kafka -> HDFS pipeline. Runs as a MapReduce job in Hadoop and consumes from Kafka into HDFS. See Analytics/Systems/Cluster/Gobblin.
- Zookeeper
- Highly reliable dynamic configuration store. Instead of keeping configuration and simple state in flat files or a database, Zookeeper allows applications to be notified of configuration changes on the fly.
- jmx
- JMX (Java Management Extensions) is a widely adopted JVM mechanism to expose VM and application metrics, state, and management features. Nearly all of the above-listed technologies provide JMX metrics; for example, Kafka brokers expose BytesIn, BytesOut, FailedFetchRequests, FailedProduceRequests, and MessagesIn, aggregated across all topics (as well as individually). Kraken uses jmxtrans to send relevant metrics to Grafana and elsewhere.
- Refinery
- Refinery is a code repository that contains scripts, artifacts, and configuration for the cluster.
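As a quick orientation for the Airflow entry above, here is a minimal sketch of interacting with an Airflow instance from the command line. The host name comes from the infrastructure table below, but the DAG id is hypothetical, and the exact environment setup (user, virtualenv, configuration) needed on WMF instances is omitted.
# On a host running an Airflow instance (environment setup omitted)
ssh an-airflow1002.eqiad.wmnet
# List the DAGs known to this instance
airflow dags list
# Trigger a manual run of a (hypothetical) DAG
airflow dags trigger example_team_dag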
Log Producers - varnishkafka
Frontend varnish cache servers use varnishkafka to send all webrequest logs to Kafka brokers in EQIAD.
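As a rough illustration (not an operational procedure), a single message from one of the webrequest topics can be peeked at with kcat (formerly kafkacat). The broker host comes from the infrastructure table below; the plaintext port and topic name are assumptions.
# Consume one message from the (assumed) webrequest_text topic, starting at the end of the log, then exit
kcat -C \
  -b kafka-jumbo1007.eqiad.wmnet:9092 \
  -t webrequest_text \
  -o end -c 1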
Log Buffering - Kafka
Kafka brokers store a 7-day buffer of data. This gives us time to consume the data into HDFS and to recover if there are any problems.
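Retention is a per-topic setting; as a sketch, it could be inspected with the stock Kafka tooling (broker host, port, and topic name are assumptions):
# Show topic-level config overrides (such as retention.ms) for one topic
kafka-configs.sh --bootstrap-server kafka-jumbo1007.eqiad.wmnet:9092 \
  --describe --entity-type topics --entity-name webrequest_text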
Batch Analysis - Hadoop
Raw webrequest logs are imported into HDFS using Apache Gobblin. (Raw log data is not world readable.) Analysis of these logs is primarily done by Spark and Presto, with reference to the Hive Metastore.
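For example, an ad-hoc aggregation over one hour of refined webrequest data might look like the following. This is a sketch assuming the spark3-sql wrapper available on the analytics clients and the wmf.webrequest Hive table; the partition values are placeholders and should be changed to a recent hour.
# Top requested hosts for one hour of text-cache traffic (run from a stat host)
spark3-sql --master yarn -e "
  SELECT uri_host, COUNT(1) AS requests
  FROM wmf.webrequest
  WHERE webrequest_source = 'text'
    AND year = 2024 AND month = 5 AND day = 1 AND hour = 0
  GROUP BY uri_host
  ORDER BY requests DESC
  LIMIT 10;
"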
Data pipelines
- Loading traffic logs
- Refinement and curation
- Datasets computation
- Pageviews / Projectviews
- Mediacounts
- Unique devices (last access method)
- Mobile uniques
The pipeline providing Edits data in the Data Lake has seven steps (a sample query against the resulting dataset follows the list):
- Loading - Import data from MySQL into Hadoop (1 Hadoop job per MySQL table per wiki, so a lot of jobs!)
- User and page history reconstruction - Spark jobs rebuilding user and page history that don't exist as-is in MySQL. See also: Page and user history reconstruction algorithm.
- Revision augmentation and denormalization - Enhance historical revisions with interesting fields and join with user and page history for a fully historified denormalized table.
- Check data quality against previous snapshot - To prevent data errors from creeping in, we have an automatic validation step that checks the fully denormalized dataset snapshot against a previously generated one (assumed correct).
- Metrics computation - Compute some standard metrics from the denormalized table and store them in Hive.
- Prepare Mediawiki History Reduced for Druid - Compute a reduced version (fewer details, some pre-aggregations) of the fully denormalized dataset, better suited to the Druid serving layer (see below). There is also a validation step after this job, but since it is almost identical to the one presented above (see step 4), we don't detail it here.
- Serving layer loading
- 2 years of the fully denormalized table in the Druid cluster, available internally to the Foundation
- Full reduced version of the dataset in the Druid cluster serving data for the AQS API, which powers Wikistats.
- Full version published on dumps
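Once a snapshot has landed, the fully denormalized table can be queried directly from the cluster. The sketch below assumes the wmf.mediawiki_history Hive table, the (assumed) spark3-sql wrapper, and a placeholder snapshot value that should be replaced with a recent one.
# Count history events by entity and type for one wiki in one monthly snapshot
spark3-sql --master yarn -e "
  SELECT event_entity, event_type, COUNT(1) AS events
  FROM wmf.mediawiki_history
  WHERE snapshot = '2024-04'
    AND wiki_db = 'simplewiki'
  GROUP BY event_entity, event_type
  ORDER BY events DESC;
"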
See also: edit history administration
More details
- Getting Access
- Hadoop
- Kafka
- Zookeeper
- Hive
- Dataset documentation
- Refinery
- Refinery-source
- Spark
- Webrequest partitions
- AMD GPU
Infrastructure
Roles
Role | Host | Cumin Alias |
---|---|---|
Hadoop primary NameNode | an-master1003 | hadoop-master |
Hadoop standby NameNode | an-master1004 | hadoop-standby |
Cluster coordinator (Hive, Presto coordinator) | an-coord[1003-1004] | hadoop-coordinator |
Hadoop workers (DataNodes) | an-worker[1078-1175], analytics[1070-1077] | hadoop-worker |
YARN web interfaces | an-tool1008 | hadoop-yarn |
Turnilo | an-tool1007 | |
Kafka Jumbo brokers | kafka-jumbo[1007-1015] | kafka-jumbo |
Druid Analytics nodes | an-druid[1001-1005] | druid-analytics |
Druid Public nodes (for AQS) | druid[1007-1011] | druid-public |
Presto Workers | an-presto[1001-1015] | presto-analytics |
Analytics Meta (MariaDB for Hive, Superset, Druid, DataHub) | an-mariadb[1001-1002] | |
PostgreSQL (for Airflow) | an-db[1001-1002] | analytics-psql |
Launcher (Systemd timers and Airflow analytics instance) | an-launcher1002 | analytics-launcher |
Analytics clients | stat[1004-1011] | stat |
Airflow instances | an-airflow[1002,1004-1007], an-launcher1002 | analytics-airflow |
Zookeeper | an-conf[1001-1003] | an-conf |
MariaDB private replicas | dbstore[1007-1009] | db-store |
Wikireplicas private sanitized replica (used by sqoop) | clouddb1021 | wikireplicas-dedicated |
Wikireplicas analytics sanitized replicas (exposed to WMCS) | clouddb[1017-1020] | wikireplicas-analytics |
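The Cumin aliases in the table can be used to run a command on a whole group of hosts at once from a cluster management host; a minimal sketch (the command run here is just an example):
# 'A:hadoop-worker' expands to all Hadoop worker hosts
sudo cumin 'A:hadoop-worker' 'uptime'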
Puppet
A number of Puppet modules are used to configure the Analytics Cluster.
FAQ
How do I...
retrieve a file from HDFS?
hdfs dfs -text /wmf/data/archive/browser/general/desktop_and_mobile_web-2015-9-27/*
list files on a directory?
hdfs dfs -ls /wmf/data/archive/browser/general/
calculate the size of a directory?
hdfs dfs -du -s -h /wmf/data/wmf/webrequest
recover files deleted by mistake using the hdfs CLI rm command?
If you have deleted a file using the HDFS CLI rm command, then you should have seen a message like the following:
18/04/11 16:52:09 INFO fs.TrashPolicyDefault: Moved: 'hdfs://analytics-hadoop/user/elukey/test'
to trash at: hdfs://analytics-hadoop/user/elukey/.Trash/Current/user/elukey/test
In this case, /user/elukey/test was not deleted but moved to a special .Trash directory in /user/elukey (it will be different for each user issuing the hdfs command, of course). By default, the HDFS Analytics file system keeps deleted files (only those removed via the CLI) for a month, as a precautionary measure to help in case of accidental deletes. For example, in the above case, what if I realize after a day that /user/elukey/test is needed? How would I recover it? The files under .Trash are organized in daily checkpoints: every day at midnight UTC a new directory is created under .Trash, and all the files deleted between the last checkpoint time and the current one are placed under it.
In my case:
elukey@stat1004:~$ date
Thu Apr 12 08:15:32 UTC 2018
elukey@stat1004:~$ hdfs dfs -ls /user/elukey/.Trash
Found 1 items
drwx------ - elukey elukey 0 2018-04-11 16:52 /user/elukey/.Trash/180412000000
elukey@stat1004:~$ hdfs dfs -ls /user/elukey/.Trash/180412000000/user/elukey
Found 1 items
-rw-r--r-- 3 elukey elukey 15 2018-04-11 16:51 /user/elukey/.Trash/180412000000/user/elukey/test
As you can see, the file /user/elukey/test, which was deleted on April 11th, ended up under the 180412000000 checkpoint. At this point the last thing to do is to move the file back to a safe place using the hdfs CLI, like this:
elukey@stat1004:~$ hdfs dfs -mv /user/elukey/.Trash/180412000000/user/elukey/test /user/elukey/test
elukey@stat1004:~$ hdfs dfs -ls /user/elukey/
Found 6 items
[..]
-rw-r--r-- 3 elukey elukey 15 2018-04-11 16:51 /user/elukey/test
And as you can see, the file is now back in its original place!
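Note that this safety net only covers deletions that go through the trash. As a cautionary sketch, the standard -skipTrash flag of the rm command bypasses it entirely, and files removed this way cannot be recovered with the procedure above:
# DANGER: removes the file immediately, without moving it to .Trash
hdfs dfs -rm -skipTrash /user/elukey/test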
See also
- Logging Solutions Overview - 2013 overview of the distributed logging solutions surveyed back then.
- Logging Solutions Recommendation - An in depth description of Kafka and why we like it (2013).
- Cluster Infrastructure TechTalk Slides (2014-07)
- Infrastructure Tech Talk (2014-07-15)
- Slide Deck: [1]