Analytics/Cluster/Data Format Experiments

webrequest

I am experimenting with different storage formats for webrequest data. The data in the wmf_raw.webrequest table is imported as JSON and stored in Snappy-compressed SequenceFiles. I want to experiment with storing this data as Parquet, using different Hive clustered buckets, to allow for more efficient querying and sampling.

I started writing a MapReduce job to convert the JSON data to Parquet, but then realized that doing the conversion in Hive is both easier and better: Hive can be instructed to cluster the files it outputs, which allows for better sampling.
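As a rough illustration of why clustering by ip helps sampling, here is a minimal Python sketch of hash bucketing. It is not how Hive implements it: `zlib.crc32` stands in for Hive's own bucketing hash (so real bucket assignments will differ), and the rows, the `bucket_for` and `sample_bucket` helpers, and the 10.0.x.x addresses are all made up for the example. The point is only that every request from a given IP lands in the same bucket, so reading one bucket out of 64 yields a sample that keeps each sampled client's requests together.

```python
import zlib

NUM_BUCKETS = 64  # matches "INTO 64 BUCKETS" in the table definitions


def bucket_for(ip: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Map an IP to a bucket index. crc32 is a stand-in for Hive's hash."""
    return zlib.crc32(ip.encode("utf-8")) % num_buckets


def sample_bucket(rows, bucket: int, num_buckets: int = NUM_BUCKETS):
    """Keep only the rows whose IP hashes into the given bucket."""
    return [row for row in rows if bucket_for(row["ip"], num_buckets) == bucket]


# Hypothetical data: 1000 distinct IPs with 3 requests each.
rows = [
    {"ip": f"10.0.{i // 256}.{i % 256}", "uri_path": f"/wiki/Page_{n}"}
    for i in range(1000)
    for n in range(3)
]

# Sample the bucket that 10.0.0.0 falls into, so the sample is never empty.
target = bucket_for("10.0.0.0")
sample = sample_bucket(rows, target)
sampled_ips = {row["ip"] for row in sample}

print(f"bucket {target}: {len(sample)} rows from {len(sampled_ips)} IPs")
```

In Hive itself, the equivalent of `sample_bucket` would be a `TABLESAMPLE (BUCKET x OUT OF y ON ip)` clause on the clustered table, which can read only the matching bucket files instead of scanning the whole partition.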

Load Data

Create two extra tables, one using Parquet clustered by IP, and another using Parquet clustered by IP and sorted by dt:

set parquet.compression=SNAPPY;
CREATE TABLE `webrequest_parquet_clustered_snappy`(
  `hostname` string COMMENT 'from deserializer',
  `sequence` bigint COMMENT 'from deserializer',
  `dt` string COMMENT 'from deserializer',
  `time_firstbyte` double COMMENT 'from deserializer',
  `ip` string COMMENT 'from deserializer',
  `cache_status` string COMMENT 'from deserializer',
  `http_status` string COMMENT 'from deserializer',
  `response_size` bigint COMMENT 'from deserializer',
  `http_method` string COMMENT 'from deserializer',
  `uri_host` string COMMENT 'from deserializer',
  `uri_path` string COMMENT 'from deserializer',
  `uri_query` string COMMENT 'from deserializer',
  `content_type` string COMMENT 'from deserializer',
  `referer` string COMMENT 'from deserializer',
  `x_forwarded_for` string COMMENT 'from deserializer',
  `user_agent` string COMMENT 'from deserializer',
  `accept_language` string COMMENT 'from deserializer',
  `x_analytics` string COMMENT 'from deserializer',
  `range` string COMMENT 'from deserializer')
PARTITIONED BY (
  `webrequest_source` string COMMENT 'Source cluster',
  `year` int COMMENT 'Unpadded year of request',
  `month` int COMMENT 'Unpadded month of request',
  `day` int COMMENT 'Unpadded day of request',
  `hour` int COMMENT 'Unpadded hour of request')
CLUSTERED BY(ip) INTO 64 BUCKETS 
ROW FORMAT SERDE
  'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
  INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
  OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
;

set parquet.compression=SNAPPY;
CREATE TABLE `webrequest_parquet_clustered_snappy_sorted`(
  `hostname` string COMMENT 'from deserializer',
  `sequence` bigint COMMENT 'from deserializer',
  `dt` string COMMENT 'from deserializer',
  `time_firstbyte` double COMMENT 'from deserializer',
  `ip` string COMMENT 'from deserializer',
  `cache_status` string COMMENT 'from deserializer',
  `http_status` string COMMENT 'from deserializer',
  `response_size` bigint COMMENT 'from deserializer',
  `http_method` string COMMENT 'from deserializer',
  `uri_host` string COMMENT 'from deserializer',
  `uri_path` string COMMENT 'from deserializer',
  `uri_query` string COMMENT 'from deserializer',
  `content_type` string COMMENT 'from deserializer',
  `referer` string COMMENT 'from deserializer',
  `x_forwarded_for` string COMMENT 'from deserializer',
  `user_agent` string COMMENT 'from deserializer',
  `accept_language` string COMMENT 'from deserializer',
  `x_analytics` string COMMENT 'from deserializer',
  `range` string COMMENT 'from deserializer')
PARTITIONED BY (
  `webrequest_source` string COMMENT 'Source cluster',
  `year` int COMMENT 'Unpadded year of request',
  `month` int COMMENT 'Unpadded month of request',
  `day` int COMMENT 'Unpadded day of request',
  `hour` int COMMENT 'Unpadded hour of request')
CLUSTERED BY(ip) SORTED BY (dt) INTO 64 BUCKETS 
ROW FORMAT SERDE
  'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
  INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
  OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
;

Insert into these tables from the wmf_raw.webrequest table:

SET parquet.compression=SNAPPY;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.enforce.bucketing = true;
INSERT INTO TABLE webrequest_parquet_clustered_snappy
PARTITION(webrequest_source, year, month, day, hour)
SELECT *
FROM wmf_raw.webrequest
WHERE webrequest_source='text' AND year=2014 AND month=12 AND day=30 and hour=7;

SET parquet.compression=SNAPPY;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.enforce.bucketing = true;
INSERT INTO TABLE webrequest_parquet_clustered_snappy_sorted
PARTITION(webrequest_source, year, month, day, hour)
SELECT *
FROM wmf_raw.webrequest
WHERE webrequest_source='text' AND year=2014 AND month=12 AND day=30 and hour=7;

Note: comparing insert times for the same hour of data, the sorted clustered table took slightly more cumulative CPU time than the unsorted one. I can't think of a good use case for having the data pre-sorted by some key (at least not at this time).

Experiments

We will use the mobile unique app id query for this experiment:

wmf_raw.webrequest (JSON)

CREATE VIEW app_uuid_view_daily_raw AS 
SELECT
CASE WHEN user_agent LIKE('%iPhone%') THEN 'iOS'
  ELSE 'Android' END AS platform,
parse_url(concat('http://bla.org/woo/', uri_query), 'QUERY', 'appInstallID') AS uuid
    FROM wmf_raw.webrequest
    WHERE user_agent LIKE('WikipediaApp%') 
        AND uri_query LIKE('%action=mobileview%')
        AND uri_query LIKE('%sections=0%')
        AND uri_query LIKE('%appInstallID%')
        AND webrequest_source='text'
        AND year=2014
        AND month=12
        AND day=30
        AND hour=7
;

SET hive.enforce.bucketing = true;
SELECT
    platform,
    CAST(COUNT(DISTINCT(uuid))  AS string)
FROM app_uuid_view_daily_raw
GROUP BY platform;

MapReduce Total cumulative CPU time: 23 minutes 31 seconds 560 msec
Ended Job = job_1415917009743_65718
MapReduce Jobs Launched:
Job 0: Map: 38  Reduce: 14   Cumulative CPU: 1411.56 sec   HDFS Read: 13248208127 HDFS Write: 19 SUCCESS
Total MapReduce CPU Time Spent: 23 minutes 31 seconds 560 msec
OK
platform        _c1
Android 2145
iOS     9
Time taken: 213.339 seconds, Fetched: 2 row(s)


webrequest_parquet_clustered_snappy (Clustered Parquet no sorting)

CREATE VIEW app_uuid_view_daily_parquet_clustered_snappy AS 
SELECT
CASE WHEN user_agent LIKE('%iPhone%') THEN 'iOS'
  ELSE 'Android' END AS platform,
parse_url(concat('http://bla.org/woo/', uri_query), 'QUERY', 'appInstallID') AS uuid
    FROM otto.webrequest_parquet_clustered_snappy
    WHERE user_agent LIKE('WikipediaApp%') 
        AND uri_query LIKE('%action=mobileview%')
        AND uri_query LIKE('%sections=0%')
        AND uri_query LIKE('%appInstallID%')
        AND webrequest_source='text'
        AND year=2014
        AND month=12
        AND day=30
        AND hour=7
;


SET hive.enforce.bucketing = true;
SELECT
    platform,
    CAST(COUNT(DISTINCT(uuid))  AS string)
FROM app_uuid_view_daily_parquet_clustered_snappy
GROUP BY platform;

MapReduce Total cumulative CPU time: 9 minutes 2 seconds 920 msec
Ended Job = job_1415917009743_65723
MapReduce Jobs Launched:
Job 0: Map: 25  Reduce: 7   Cumulative CPU: 542.92 sec   HDFS Read: 2517879043 HDFS Write: 19 SUCCESS
Total MapReduce CPU Time Spent: 9 minutes 2 seconds 920 msec
OK
platform        _c1
Android 2454
iOS     5
Time taken: 125.361 seconds, Fetched: 2 row(s)

webrequest_parquet_clustered_snappy_sorted (Clustered Parquet with sorting)

CREATE VIEW app_uuid_view_daily_parquet_clustered_snappy_sorted AS 
SELECT
CASE WHEN user_agent LIKE('%iPhone%') THEN 'iOS'
  ELSE 'Android' END AS platform,
parse_url(concat('http://bla.org/woo/', uri_query), 'QUERY', 'appInstallID') AS uuid
    FROM otto.webrequest_parquet_clustered_snappy_sorted
    WHERE user_agent LIKE('WikipediaApp%') 
        AND uri_query LIKE('%action=mobileview%')
        AND uri_query LIKE('%sections=0%')
        AND uri_query LIKE('%appInstallID%')
        AND webrequest_source='text'
        AND year=2014
        AND month=12
        AND day=30
        AND hour=7
;

SET hive.enforce.bucketing = true;
SELECT
    platform,
    CAST(COUNT(DISTINCT(uuid))  AS string)
FROM app_uuid_view_daily_parquet_clustered_snappy_sorted
GROUP BY platform;

MapReduce Total cumulative CPU time: 8 minutes 44 seconds 430 msec
Ended Job = job_1415917009743_65725
MapReduce Jobs Launched:
Job 0: Map: 25  Reduce: 7   Cumulative CPU: 524.43 sec   HDFS Read: 2510719930 HDFS Write: 19 SUCCESS
Total MapReduce CPU Time Spent: 8 minutes 44 seconds 430 msec
OK
platform        _c1
Android 2454
iOS     5
Time taken: 99.249 seconds, Fetched: 2 row(s)
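
The numbers compared below come from the `Job N: ...` summary lines in the outputs above. As a convenience, here is a small Python sketch that pulls those fields out of such a line. The regular expression is written only against the lines shown on this page, so treat the exact format as an assumption; `parse_job_line` is a hypothetical helper, not part of Hive.

```python
import re

# Pattern written against the job summary lines shown on this page.
JOB_LINE = re.compile(
    r"Job (?P<job>\d+): Map: (?P<maps>\d+)\s+Reduce: (?P<reduces>\d+)\s+"
    r"Cumulative CPU: (?P<cpu_sec>[\d.]+) sec\s+"
    r"HDFS Read: (?P<hdfs_read>\d+) HDFS Write: (?P<hdfs_write>\d+) "
    r"(?P<status>\w+)"
)


def parse_job_line(line: str) -> dict:
    """Extract mapper/reducer counts, CPU seconds and HDFS bytes from one line."""
    match = JOB_LINE.search(line)
    if match is None:
        raise ValueError(f"not a job summary line: {line!r}")
    fields = match.groupdict()
    hdfs_read = int(fields["hdfs_read"])
    return {
        "job": int(fields["job"]),
        "maps": int(fields["maps"]),
        "reduces": int(fields["reduces"]),
        "cpu_sec": float(fields["cpu_sec"]),
        "hdfs_read_bytes": hdfs_read,
        # Bytes converted to GiB (2**30 bytes), rounded to two decimals.
        "hdfs_read_gib": round(hdfs_read / 2**30, 2),
        "hdfs_write_bytes": int(fields["hdfs_write"]),
        "status": fields["status"],
    }


json_job = parse_job_line(
    "Job 0: Map: 38  Reduce: 14   Cumulative CPU: 1411.56 sec   "
    "HDFS Read: 13248208127 HDFS Write: 19 SUCCESS"
)
print(json_job)
```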

Results

All query columns are for the hourly app_uuid_view query.

| Format | Size of one hour partition | Time taken (sec) | Cumulative CPU time (sec) | # mappers | # reducers | HDFS bytes read |
|---|---|---|---|---|---|---|
| JSON Snappy SequenceFile | 12.7 G | 213.339 | 1411.56 | 38 | 14 | 12.34 G |
| Parquet Clustered Snappy | 6.5 G | 125.361 | 542.92 | 25 | 7 | 2.35 G |
| Parquet Clustered Snappy Sorted | 6.5 G | 99.249 | 524.43 | 25 | 7 | 2.34 G |

Also, running this query on a full day of data in Parquet format took less than 40 minutes. Nuria said that running it on the JSON data could take almost 10 hours.
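
To put the hourly results in relative terms, the following Python sketch divides the JSON baseline by each Parquet run. The figures are copied from the Hive outputs above; the `runs` dictionary and the `improvement` helper are just names chosen for this example. By this arithmetic, the unsorted Parquet table needs roughly 2.6x less cumulative CPU and about 5.3x fewer HDFS bytes read than the JSON table, while wall-clock time improves by about 1.7x.

```python
# Metrics copied from the Hive job output for the hourly app_uuid_view query.
runs = {
    "JSON Snappy SequenceFile": {
        "wall_sec": 213.339, "cpu_sec": 1411.56, "hdfs_read_bytes": 13248208127,
    },
    "Parquet Clustered Snappy": {
        "wall_sec": 125.361, "cpu_sec": 542.92, "hdfs_read_bytes": 2517879043,
    },
    "Parquet Clustered Snappy Sorted": {
        "wall_sec": 99.249, "cpu_sec": 524.43, "hdfs_read_bytes": 2510719930,
    },
}

baseline = runs["JSON Snappy SequenceFile"]


def improvement(run: dict, baseline: dict) -> dict:
    """Return baseline/run for every metric (values above 1 mean the run is cheaper)."""
    return {metric: baseline[metric] / run[metric] for metric in baseline}


for name, run in runs.items():
    ratios = improvement(run, baseline)
    summary = ", ".join(f"{metric} {ratio:.2f}x" for metric, ratio in ratios.items())
    print(f"{name}: {summary}")
```

These ratios describe a single hour of data from one run each, so they are indicative rather than a benchmark.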

revision xmldumps

These are imported from http://dumps.wikimedia.org/ and converted into Avro format.

The Avro schema is at https://gerrit.wikimedia.org/r/#/c/171056/5/refinery-core/src/main/avro/mediawiki/RevisionDocument.avsc

Data was imported using Wikihadoop, converted to Avro, and saved compressed both as bz2 and as Snappy. It was also converted to JSON using Halfak's dump2json script. The Avro-Snappy data was then converted to Parquet-Snappy via avro2parquet. Hive tables were mapped on top of this data as follows:

Create Hive tables

CREATE EXTERNAL TABLE `revision_simplewiki_json_bz2`(
  `id` int,
  `timestamp` string,
  `page` struct<id:int,namespace:int,title:string,redirect:struct<title:string>,restrictions:array<string>>,
  `contributor` struct<id:int,user_text:string>,
  `minor` boolean,
  `comment` string,
  `text` string,
  `bytes` int,
  `sha1` string,
  `parent_id` int,
  `model` string,
  `format` string)
ROW FORMAT SERDE
  'org.apache.hcatalog.data.JsonSerDe'
LOCATION
  'hdfs://analytics-hadoop/user/halfak/hadoopin/diffengine/simplewiki-20141122-pages-meta-history/json-bz2'
;

CREATE EXTERNAL TABLE `revision_simplewiki_json_uncompressed`(
  `id` int,
  `timestamp` string,
  `page` struct<id:int,namespace:int,title:string,redirect:struct<title:string>,restrictions:array<string>>,
  `contributor` struct<id:int,user_text:string>,
  `minor` boolean,
  `comment` string,
  `text` string,
  `bytes` int,
  `sha1` string,
  `parent_id` int,
  `model` string,
  `format` string)
ROW FORMAT SERDE
  'org.apache.hcatalog.data.JsonSerDe'
LOCATION
  'hdfs://analytics-hadoop/user/halfak/hadoopin/diffengine/simplewiki-20141122-pages-meta-history/json-uncompressed'
;


CREATE EXTERNAL TABLE revision_simplewiki_avro_bz2
  ROW FORMAT SERDE
    'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
  STORED AS INPUTFORMAT
    'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
  OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
  LOCATION
    '/user/halfak/hadoopin/diffengine/simplewiki-20141122-pages-meta-history/avro-bz2'
  TBLPROPERTIES (
    'avro.schema.url'='hdfs://analytics-hadoop/user/halfak/hadoopin/diffengine/RevisionDocument.avsc')
;

CREATE EXTERNAL TABLE revision_simplewiki_avro_snappy
  ROW FORMAT SERDE
    'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
  STORED AS INPUTFORMAT
    'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
  OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
  LOCATION
  '/user/halfak/hadoopin/diffengine/simplewiki-20141122-pages-meta-history/avro-snappy'
  TBLPROPERTIES (
    'avro.schema.url'='hdfs://analytics-hadoop/user/halfak/hadoopin/diffengine/RevisionDocument.avsc')
;

-- Steps to Create avro-parquet file.
-- Clone https://github.com/laserson/avro2parquet.
-- Modify pom.xml to use our version of hadoop and to generate shaded jar.
-- mvn clean package
-- scp jar to stat1002
--
-- Then run:
-- hadoop jar /home/otto/avro2parquet-0.1.0.jar \
-- com.cloudera.science.avro2parquet.Avro2Parquet \
-- hdfs://analytics-hadoop/user/halfak/hadoopin/diffengine/RevisionDocument.avsc \
-- hdfs://analytics-hadoop/user/halfak/hadoopin/diffengine/simplewiki-20141122-pages-meta-history/avro-snappy \
-- hdfs://analytics-hadoop/user/halfak/hadoopin/diffengine/simplewiki-20141122-pages-meta-history/avro-parquet-snappy


-- not sure if this add jar is needed
add jar /home/otto/parquet-hive-bundle-1.6.0rc4.jar;
CREATE EXTERNAL TABLE revision_simplewiki_avro_parquet_snappy(
  `id` int,
  `timestamp` string,
  `page` struct<id:int,namespace:int,title:string,redirect_title:string,restrictions:array<string>>,
  `contributor` struct<id:int,user_text:string>,
  `minor` boolean,
  `comment` string,
  `text` string,
  `bytes` int,
  `sha1` string,
  `parent_id` int,
  `model` string,
  `format` string
)
  ROW FORMAT SERDE
    'parquet.hive.serde.ParquetHiveSerDe'
  STORED AS
    INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
    OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
  LOCATION
  '/user/halfak/hadoopin/diffengine/simplewiki-20141122-pages-meta-history/avro-parquet-snappy'
;

Results

All times are in seconds.

| Format | Size | select count(*): time taken | select count(*): cumulative CPU time | select count(*): # mappers | streaming experiment ???: time taken | streaming experiment ???: cumulative CPU time | streaming experiment ???: # mappers/reducers |
|---|---|---|---|---|---|---|---|
| XML - bz2 | 3.9 G | | | | Crashed after 2 hours | 15106.390 (at time of crash) | 4/177 |
| JSON - uncompressed | 26 G | 47.869 | 771.39 | 104 | 196.940 | 2698.560 | 106/51 |
| JSON - bz2 compressed | 1.2 G | 327.367 | 1154.55 | 5 | 728.948 | 4355.400 | 6/51 |
| JSON - Snappy compressed | 14 G | | | | 543.782 | 2559.780 | 5/50 |
| Avro - uncompressed | 25 G ??? (I think this is wrong, still to do) | | | | 125.040 | 2775.530 | 101/51 |
| Avro - bz2 compressed | 5.7 G | 104.533 | 1944.37 | 25 | 76.118 | 5611.680 | 26/51 |
| Avro - Snappy compressed | 8.4 G | 38.683 | 471.67 | 33 | 124.563 | 3433.390 | 36/50 |
| Parquet+Avro - Snappy compressed | 7.0 G | 83.681 | 71.81 | 21 | | | |


select count(*) from revision_simplewiki_json_bz2;
...
MapReduce Total cumulative CPU time: 19 minutes 14 seconds 550 msec
Ended Job = job_1415917009743_38010
MapReduce Jobs Launched:
Job 0: Map: 5  Reduce: 1   Cumulative CPU: 1154.55 sec   HDFS Read: 1192999999 HDFS Write: 8 SUCCESS
Total MapReduce CPU Time Spent: 19 minutes 14 seconds 550 msec
OK
_c0
4557431
Time taken: 327.367 seconds, Fetched: 1 row(s)

select count(*) from revision_simplewiki_json_uncompressed;
...
MapReduce Total cumulative CPU time: 12 minutes 51 seconds 390 msec
Ended Job = job_1415917009743_38131
MapReduce Jobs Launched:
Job 0: Map: 104  Reduce: 1   Cumulative CPU: 771.39 sec   HDFS Read: 27854858701 HDFS Write: 8 SUCCESS
Total MapReduce CPU Time Spent: 12 minutes 51 seconds 390 msec
OK
_c0
4557431
Time taken: 47.869 seconds, Fetched: 1 row(s)

select count(*) from revision_simplewiki_avro_bz2;
...
MapReduce Total cumulative CPU time: 32 minutes 24 seconds 370 msec
Ended Job = job_1415917009743_38013
MapReduce Jobs Launched:
Job 0: Map: 25  Reduce: 1   Cumulative CPU: 1944.37 sec   HDFS Read: 6118212001 HDFS Write: 8 SUCCESS
Total MapReduce CPU Time Spent: 32 minutes 24 seconds 370 msec
OK
_c0
4557489
Time taken: 104.533 seconds, Fetched: 1 row(s)

select count(*) from revision_simplewiki_avro_snappy;
...
MapReduce Total cumulative CPU time: 7 minutes 51 seconds 670 msec
Ended Job = job_1415917009743_38012
MapReduce Jobs Launched:
Job 0: Map: 33  Reduce: 1   Cumulative CPU: 471.67 sec   HDFS Read: 8938238424 HDFS Write: 8 SUCCESS
Total MapReduce CPU Time Spent: 7 minutes 51 seconds 670 msec
OK
_c0
4557489
Time taken: 38.683 seconds, Fetched: 1 row(s)


select count(*) from revision_simplewiki_avro_parquet_snappy;
...
MapReduce Total cumulative CPU time: 1 minutes 11 seconds 810 msec
Ended Job = job_1415917009743_38069
MapReduce Jobs Launched:
Job 0: Map: 21  Reduce: 1   Cumulative CPU: 71.81 sec   HDFS Read: 120821 HDFS Write: 8 SUCCESS
Total MapReduce CPU Time Spent: 1 minutes 11 seconds 810 msec
OK
_c0
4557489
Time taken: 83.681 seconds, Fetched: 1 row(s)

Select and group by redirect title, Avro vs. Parquet:


select page.redirect_title, count(*) as cnt from revision_simplewiki_avro_snappy where page.redirect_title is not null group by page.redirect_title  order by cnt desc limit 10;
MapReduce Total cumulative CPU time: 27 seconds 90 msec
Ended Job = job_1415917009743_38096
MapReduce Jobs Launched:
Job 0: Map: 33  Reduce: 9   Cumulative CPU: 503.74 sec   HDFS Read: 8938238424 HDFS Write: 1111211 SUCCESS
Job 1: Map: 7  Reduce: 1   Cumulative CPU: 27.09 sec   HDFS Read: 1114206 HDFS Write: 241 SUCCESS
Total MapReduce CPU Time Spent: 8 minutes 50 seconds 830 msec
Time taken: 56.571 seconds, Fetched: 10 row(s)

select page.redirect_title, count(*) as cnt from revision_simplewiki_avro_parquet_snappy where page.redirect_title is not null group by page.redirect_title order by cnt desc limit 10;
MapReduce Total cumulative CPU time: 20 seconds 190 msec
Ended Job = job_1415917009743_38092
MapReduce Jobs Launched:
Job 0: Map: 21  Reduce: 8   Cumulative CPU: 114.66 sec   HDFS Read: 10404994 HDFS Write: 1111095 SUCCESS
Job 1: Map: 4  Reduce: 1   Cumulative CPU: 20.19 sec   HDFS Read: 1113435 HDFS Write: 241 SUCCESS
Total MapReduce CPU Time Spent: 2 minutes 14 seconds 850 msec
Time taken: 58.561 seconds, Fetched: 10 row(s)

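Avro uses much more CPU time than Parquet for this query (about 531 vs. 135 seconds of cumulative CPU across both jobs), even though the wall-clock time is nearly the same (56.6 vs. 58.6 seconds). Interesting!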
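
For anyone who wants to check the totals, here is a short Python sketch that converts Hive's "N minutes N seconds N msec" strings into seconds and compares them with the per-job CPU figures from the two runs above. The `duration_to_seconds` helper is hypothetical and only handles the units that appear on this page (minutes, seconds, msec).

```python
import re

# Only the units that appear in the Hive output on this page.
UNIT_SECONDS = {"minutes": 60.0, "seconds": 1.0, "msec": 0.001}


def duration_to_seconds(text: str) -> float:
    """Convert e.g. '8 minutes 50 seconds 830 msec' into seconds."""
    total = 0.0
    for value, unit in re.findall(r"(\d+) (minutes|seconds|msec)", text):
        total += int(value) * UNIT_SECONDS[unit]
    return total


# Per-job cumulative CPU seconds, copied from the two outputs above.
avro_jobs = [503.74, 27.09]
parquet_jobs = [114.66, 20.19]

avro_total = duration_to_seconds("8 minutes 50 seconds 830 msec")
parquet_total = duration_to_seconds("2 minutes 14 seconds 850 msec")

print(f"Avro:    {sum(avro_jobs):.2f} sec summed, {avro_total:.2f} sec reported")
print(f"Parquet: {sum(parquet_jobs):.2f} sec summed, {parquet_total:.2f} sec reported")
print(f"Avro / Parquet CPU ratio: {avro_total / parquet_total:.2f}")
```

The reported totals match the sum of the two jobs in each run, and the ratio works out to a little under 4x in Parquet's favor for this query.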