Data Platform/Evaluations/Data Format Experiments
webrequest
I am experimenting with different storage formats for webrequest data. The data in the wmf_raw.webrequest table is imported as JSON and stored in Snappy-compressed SequenceFiles. I want to experiment with storing this data as Parquet, and with different Hive clustered buckets, to allow for more efficient querying and sampling.
I started writing a MapReduce job to convert the JSON data to Parquet, but then realized that using Hive to do this is easier, and also better, because we can instruct Hive to cluster the files it outputs, which allows for better sampling.
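For reference, the kind of bucket-sampling query that the clustering is meant to make cheap would look something like the sketch below. It assumes the webrequest_parquet_clustered_snappy table created in the next section; whether Hive actually prunes the read down to a single bucket file depends on the Hive version and settings.
-- Sample roughly 1/64th of the hour by reading a single ip bucket
-- instead of scanning the whole partition.
SELECT uri_host, COUNT(*) AS requests
FROM webrequest_parquet_clustered_snappy TABLESAMPLE(BUCKET 1 OUT OF 64 ON ip)
WHERE webrequest_source='text' AND year=2014 AND month=12 AND day=30 AND hour=7
GROUP BY uri_host;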
Load Data
Create two extra tables, one using Parquet clustered by IP, and another using Parquet clustered by IP and sorted by dt:
set parquet.compression=SNAPPY;
CREATE TABLE `webrequest_parquet_clustered_snappy`(
`hostname` string COMMENT 'from deserializer',
`sequence` bigint COMMENT 'from deserializer',
`dt` string COMMENT 'from deserializer',
`time_firstbyte` double COMMENT 'from deserializer',
`ip` string COMMENT 'from deserializer',
`cache_status` string COMMENT 'from deserializer',
`http_status` string COMMENT 'from deserializer',
`response_size` bigint COMMENT 'from deserializer',
`http_method` string COMMENT 'from deserializer',
`uri_host` string COMMENT 'from deserializer',
`uri_path` string COMMENT 'from deserializer',
`uri_query` string COMMENT 'from deserializer',
`content_type` string COMMENT 'from deserializer',
`referer` string COMMENT 'from deserializer',
`x_forwarded_for` string COMMENT 'from deserializer',
`user_agent` string COMMENT 'from deserializer',
`accept_language` string COMMENT 'from deserializer',
`x_analytics` string COMMENT 'from deserializer',
`range` string COMMENT 'from deserializer')
PARTITIONED BY (
`webrequest_source` string COMMENT 'Source cluster',
`year` int COMMENT 'Unpadded year of request',
`month` int COMMENT 'Unpadded month of request',
`day` int COMMENT 'Unpadded day of request',
`hour` int COMMENT 'Unpadded hour of request')
CLUSTERED BY(ip) INTO 64 BUCKETS
ROW FORMAT SERDE
'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
;
set parquet.compression=SNAPPY;
CREATE TABLE `webrequest_parquet_clustered_snappy_sorted`(
`hostname` string COMMENT 'from deserializer',
`sequence` bigint COMMENT 'from deserializer',
`dt` string COMMENT 'from deserializer',
`time_firstbyte` double COMMENT 'from deserializer',
`ip` string COMMENT 'from deserializer',
`cache_status` string COMMENT 'from deserializer',
`http_status` string COMMENT 'from deserializer',
`response_size` bigint COMMENT 'from deserializer',
`http_method` string COMMENT 'from deserializer',
`uri_host` string COMMENT 'from deserializer',
`uri_path` string COMMENT 'from deserializer',
`uri_query` string COMMENT 'from deserializer',
`content_type` string COMMENT 'from deserializer',
`referer` string COMMENT 'from deserializer',
`x_forwarded_for` string COMMENT 'from deserializer',
`user_agent` string COMMENT 'from deserializer',
`accept_language` string COMMENT 'from deserializer',
`x_analytics` string COMMENT 'from deserializer',
`range` string COMMENT 'from deserializer')
PARTITIONED BY (
`webrequest_source` string COMMENT 'Source cluster',
`year` int COMMENT 'Unpadded year of request',
`month` int COMMENT 'Unpadded month of request',
`day` int COMMENT 'Unpadded day of request',
`hour` int COMMENT 'Unpadded hour of request')
CLUSTERED BY(ip) SORTED BY (dt) INTO 64 BUCKETS
ROW FORMAT SERDE
'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
;
Insert into these tables from the wmf_raw.webrequest table:
SET parquet.compression=SNAPPY;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.enforce.bucketing = true;
INSERT INTO TABLE webrequest_parquet_clustered_snappy
PARTITION(webrequest_source, year, month, day, hour)
SELECT *
FROM wmf_raw.webrequest
WHERE webrequest_source='text' AND year=2014 AND month=12 AND day=30 and hour=7;
SET parquet.compression=SNAPPY;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.enforce.bucketing = true;
INSERT INTO TABLE webrequest_parquet_clustered_snappy_sorted
PARTITION(webrequest_source, year, month, day, hour)
SELECT *
FROM wmf_raw.webrequest
WHERE webrequest_source='text' AND year=2014 AND month=12 AND day=30 and hour=7;
Note: comparing insert times between the sorted and non-sorted tables for the same hour of data, the sorted clustered insert took slightly more cumulative CPU time. I can't think of a good use case for having the data pre-sorted by some key (at least not at this time).
Experiments
We will use the mobile unique app ID query for this experiment:
wmf_raw.webrequest (JSON)
CREATE VIEW app_uuid_view_daily_raw AS
SELECT
CASE WHEN user_agent LIKE('%iPhone%') THEN 'iOS'
ELSE 'Android' END AS platform,
parse_url(concat('http://bla.org/woo/', uri_query), 'QUERY', 'appInstallID') AS uuid
FROM wmf_raw.webrequest
WHERE user_agent LIKE('WikipediaApp%')
AND uri_query LIKE('%action=mobileview%')
AND uri_query LIKE('%sections=0%')
AND uri_query LIKE('%appInstallID%')
AND webrequest_source='text'
AND year=2014
AND month=12
AND day=30
AND hour=7
;
SET hive.enforce.bucketing = true;
SELECT
platform,
CAST(COUNT(DISTINCT(uuid)) AS string)
FROM app_uuid_view_daily_raw
GROUP BY platform;
MapReduce Total cumulative CPU time: 23 minutes 31 seconds 560 msec
Ended Job = job_1415917009743_65718
MapReduce Jobs Launched:
Job 0: Map: 38 Reduce: 14 Cumulative CPU: 1411.56 sec HDFS Read: 13248208127 HDFS Write: 19 SUCCESS
Total MapReduce CPU Time Spent: 23 minutes 31 seconds 560 msec
OK
platform _c1
Android 2145
iOS 9
Time taken: 213.339 seconds, Fetched: 2 row(s)
webrequest_parquet_clustered_snappy (Clustered Parquet no sorting)
CREATE VIEW app_uuid_view_daily_parquet_clustered_snappy AS
SELECT
CASE WHEN user_agent LIKE('%iPhone%') THEN 'iOS'
ELSE 'Android' END AS platform,
parse_url(concat('http://bla.org/woo/', uri_query), 'QUERY', 'appInstallID') AS uuid
FROM otto.webrequest_parquet_clustered_snappy
WHERE user_agent LIKE('WikipediaApp%')
AND uri_query LIKE('%action=mobileview%')
AND uri_query LIKE('%sections=0%')
AND uri_query LIKE('%appInstallID%')
AND webrequest_source='text'
AND year=2014
AND month=12
AND day=30
AND hour=7
;
SET hive.enforce.bucketing = true;
SELECT
platform,
CAST(COUNT(DISTINCT(uuid)) AS string)
FROM app_uuid_view_daily_parquet_clustered_snappy
GROUP BY platform;
MapReduce Total cumulative CPU time: 9 minutes 2 seconds 920 msec
Ended Job = job_1415917009743_65723
MapReduce Jobs Launched:
Job 0: Map: 25 Reduce: 7 Cumulative CPU: 542.92 sec HDFS Read: 2517879043 HDFS Write: 19 SUCCESS
Total MapReduce CPU Time Spent: 9 minutes 2 seconds 920 msec
OK
platform _c1
Android 2454
iOS 5
Time taken: 125.361 seconds, Fetched: 2 row(s)
webrequest_parquet_clustered_snappy_sorted (Clustered Parquet with sorting)
CREATE VIEW app_uuid_view_daily_parquet_clustered_snappy_sorted AS
SELECT
CASE WHEN user_agent LIKE('%iPhone%') THEN 'iOS'
ELSE 'Android' END AS platform,
parse_url(concat('http://bla.org/woo/', uri_query), 'QUERY', 'appInstallID') AS uuid
FROM otto.webrequest_parquet_clustered_snappy_sorted
WHERE user_agent LIKE('WikipediaApp%')
AND uri_query LIKE('%action=mobileview%')
AND uri_query LIKE('%sections=0%')
AND uri_query LIKE('%appInstallID%')
AND webrequest_source='text'
AND year=2014
AND month=12
AND day=30
AND hour=7
;
SET hive.enforce.bucketing = true;
SELECT
platform,
CAST(COUNT(DISTINCT(uuid)) AS string)
FROM app_uuid_view_daily_parquet_clustered_snappy_sorted
GROUP BY platform;
MapReduce Total cumulative CPU time: 8 minutes 44 seconds 430 msec
Ended Job = job_1415917009743_65725
MapReduce Jobs Launched:
Job 0: Map: 25 Reduce: 7 Cumulative CPU: 524.43 sec HDFS Read: 2510719930 HDFS Write: 19 SUCCESS
Total MapReduce CPU Time Spent: 8 minutes 44 seconds 430 msec
OK
platform _c1
Android 2454
iOS 5
Time taken: 99.249 seconds, Fetched: 2 row(s)
Results
Format | Size of one hour partition | app_uuid_view hourly time taken (s) | app_uuid_view hourly cumulative CPU time (s) | app_uuid_view hourly # mappers | app_uuid_view hourly # reducers | app_uuid_view hourly HDFS bytes read |
---|---|---|---|---|---|---|
JSON Snappy SequenceFile | 12.7 G | 213.339 | 1411.56 | 38 | 14 | 12.34 G |
Parquet Clustered Snappy | 6.5 G | 125.361 | 542.92 | 25 | 7 | 2.35 G |
Parquet Clustered Snappy Sorted | 6.5 G | 99.249 | 524.43 | 25 | 7 | 2.34 G |
Also, running this query on a full day of data in Parquet format took less than 40 minutes. Nuria said that running it on the JSON data could take almost 10 hours.
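For the record, the full-day run uses the same query shape with the hour predicate dropped. A sketch is below; the view name is hypothetical, and it assumes the whole day has been inserted into the Parquet table.
-- Hypothetical daily view: identical to the hourly view above, minus the hour predicate.
CREATE VIEW app_uuid_view_daily_parquet_clustered_snappy_fullday AS
SELECT
CASE WHEN user_agent LIKE('%iPhone%') THEN 'iOS'
ELSE 'Android' END AS platform,
parse_url(concat('http://bla.org/woo/', uri_query), 'QUERY', 'appInstallID') AS uuid
FROM otto.webrequest_parquet_clustered_snappy
WHERE user_agent LIKE('WikipediaApp%')
AND uri_query LIKE('%action=mobileview%')
AND uri_query LIKE('%sections=0%')
AND uri_query LIKE('%appInstallID%')
AND webrequest_source='text'
AND year=2014
AND month=12
AND day=30
;
-- The COUNT(DISTINCT uuid) ... GROUP BY platform query is then run against this view, exactly as above.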
revision xmldumps
These are imported from http://dumps.wikimedia.org/ and converted into Avro format.
Using the schema at https://gerrit.wikimedia.org/r/#/c/171056/5/refinery-core/src/main/avro/mediawiki/RevisionDocument.avsc, the data was imported with Wikihadoop, converted to Avro, and saved compressed as both bz2 and Snappy. It was also converted to JSON using Halfak's dump2json script, and the Avro-Snappy data was converted to Parquet-Snappy via avro2parquet. Hive tables were then mapped on top of this data as follows:
Create Hive tables
CREATE EXTERNAL TABLE `revision_simplewiki_json_bz2`(
`id` int,
`timestamp` string,
`page` struct<id:int,namespace:int,title:string,redirect:struct<title:string>,restrictions:array<string>>,
`contributor` struct<id:int,user_text:string>,
`minor` boolean,
`comment` string,
`text` string,
`bytes` int,
`sha1` string,
`parent_id` int,
`model` string,
`format` string)
ROW FORMAT SERDE
'org.apache.hcatalog.data.JsonSerDe'
LOCATION
'hdfs://analytics-hadoop/user/halfak/hadoopin/diffengine/simplewiki-20141122-pages-meta-history/json-bz2'
;
CREATE EXTERNAL TABLE `revision_simplewiki_json_uncompressed`(
`id` int,
`timestamp` string,
`page` struct<id:int,namespace:int,title:string,redirect:struct<title:string>,restrictions:array<string>>,
`contributor` struct<id:int,user_text:string>,
`minor` boolean,
`comment` string,
`text` string,
`bytes` int,
`sha1` string,
`parent_id` int,
`model` string,
`format` string)
ROW FORMAT SERDE
'org.apache.hcatalog.data.JsonSerDe'
LOCATION
'hdfs://analytics-hadoop/user/halfak/hadoopin/diffengine/simplewiki-20141122-pages-meta-history/json-uncompressed'
;
CREATE EXTERNAL TABLE revision_simplewiki_avro_bz2
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION
'/user/halfak/hadoopin/diffengine/simplewiki-20141122-pages-meta-history/avro-bz2'
TBLPROPERTIES (
'avro.schema.url'='hdfs://analytics-hadoop/user/halfak/hadoopin/diffengine/RevisionDocument.avsc')
;
CREATE EXTERNAL TABLE revision_simplewiki_avro_snappy
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION
'/user/halfak/hadoopin/diffengine/simplewiki-20141122-pages-meta-history/avro-snappy'
TBLPROPERTIES (
'avro.schema.url'='hdfs://analytics-hadoop/user/halfak/hadoopin/diffengine/RevisionDocument.avsc')
;
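Note that the two Avro tables above declare no columns; Hive derives them from the schema file referenced by avro.schema.url. As a quick sanity check of what Hive inferred (output layout varies by Hive version):
-- Show the columns Hive derived from the Avro schema.
DESCRIBE revision_simplewiki_avro_snappy;
-- Include SerDe, location, and table property details as well.
DESCRIBE FORMATTED revision_simplewiki_avro_snappy;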
-- Steps to create the avro-parquet files:
-- Clone https://github.com/laserson/avro2parquet.
-- Modify pom.xml to use our version of Hadoop and to generate a shaded jar.
-- mvn clean package
-- scp the jar to stat1002
--
-- Then run:
-- hadoop jar /home/otto/avro2parquet-0.1.0.jar \
-- com.cloudera.science.avro2parquet.Avro2Parquet \
-- hdfs://analytics-hadoop/user/halfak/hadoopin/diffengine/RevisionDocument.avsc \
-- hdfs://analytics-hadoop/user/halfak/hadoopin/diffengine/simplewiki-20141122-pages-meta-history/avro-snappy \
-- hdfs://analytics-hadoop/user/halfak/hadoopin/diffengine/simplewiki-20141122-pages-meta-history/avro-parquet-snappy
-- not sure if this add jar is needed
add jar /home/otto/parquet-hive-bundle-1.6.0rc4.jar;
CREATE EXTERNAL TABLE revision_simplewiki_avro_parquet_snappy(
`id` int,
`timestamp` string,
`page` struct<id:int,namespace:int,title:string,redirect_title:string,restrictions:array<string>>,
`contributor` struct<id:int,user_text:string>,
`minor` boolean,
`comment` string,
`text` string,
`bytes` int,
`sha1` string,
`parent_id` int,
`model` string,
`format` string
)
ROW FORMAT SERDE
'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
LOCATION
'/user/halfak/hadoopin/diffengine/simplewiki-20141122-pages-meta-history/avro-parquet-snappy'
;
Results
Format | Size | select count(*) time taken (s) | select count(*) cumulative CPU time (s) | select count(*) # mappers | streaming experiment ??? time taken (s) | streaming experiment ??? cumulative CPU time (s) | streaming experiment ??? # mappers/reducers |
---|---|---|---|---|---|---|---|
XML - bz2 | 3.9 G | Crashed after 2 hours | 15106.390 (at time of crash) | 4/177 | | | |
JSON - uncompressed | 26 G | 47.869 | 771.39 | 104 | 196.940 | 2698.560 | 106/51 |
JSON - bz2 compressed | 1.2 G | 327.367 | 1154.55 | 5 | 728.948 | 4355.400 | 6/51 |
JSON - Snappy compressed | 14 G | 543.782 | 2559.780 | 5/50 | | | |
Avro - uncompressed | 25 G ??? (i think this is wrong, still to do) | 125.040 | 2775.530 | 101/51 | | | |
Avro - bz2 compressed | 5.7 G | 104.533 | 1944.37 | 25 | 76.118 | 5611.680 | 26/51 |
Avro - Snappy compressed | 8.4 G | 38.683 | 471.67 | 33 | 124.563 | 3433.390 | 36/50 |
Parquet+Avro - Snappy compressed | 7.0 G | 83.681 | 71.81 | 21 | | | |
select count(*) per format:
select count(*) from revision_simplewiki_json_bz2;
...
MapReduce Total cumulative CPU time: 19 minutes 14 seconds 550 msec
Ended Job = job_1415917009743_38010
MapReduce Jobs Launched:
Job 0: Map: 5 Reduce: 1 Cumulative CPU: 1154.55 sec HDFS Read: 1192999999 HDFS Write: 8 SUCCESS
Total MapReduce CPU Time Spent: 19 minutes 14 seconds 550 msec
OK
_c0
4557431
Time taken: 327.367 seconds, Fetched: 1 row(s)

select count(*) from revision_simplewiki_json_uncompressed;
...
MapReduce Total cumulative CPU time: 12 minutes 51 seconds 390 msec
Ended Job = job_1415917009743_38131
MapReduce Jobs Launched:
Job 0: Map: 104 Reduce: 1 Cumulative CPU: 771.39 sec HDFS Read: 27854858701 HDFS Write: 8 SUCCESS
Total MapReduce CPU Time Spent: 12 minutes 51 seconds 390 msec
OK
_c0
4557431
Time taken: 47.869 seconds, Fetched: 1 row(s)

select count(*) from revision_simplewiki_avro_bz2;
...
MapReduce Total cumulative CPU time: 32 minutes 24 seconds 370 msec
Ended Job = job_1415917009743_38013
MapReduce Jobs Launched:
Job 0: Map: 25 Reduce: 1 Cumulative CPU: 1944.37 sec HDFS Read: 6118212001 HDFS Write: 8 SUCCESS
Total MapReduce CPU Time Spent: 32 minutes 24 seconds 370 msec
OK
_c0
4557489
Time taken: 104.533 seconds, Fetched: 1 row(s)

select count(*) from revision_simplewiki_avro_snappy;
...
MapReduce Total cumulative CPU time: 7 minutes 51 seconds 670 msec
Ended Job = job_1415917009743_38012
MapReduce Jobs Launched:
Job 0: Map: 33 Reduce: 1 Cumulative CPU: 471.67 sec HDFS Read: 8938238424 HDFS Write: 8 SUCCESS
Total MapReduce CPU Time Spent: 7 minutes 51 seconds 670 msec
OK
_c0
4557489
Time taken: 38.683 seconds, Fetched: 1 row(s)

select count(*) from revision_simplewiki_avro_parquet_snappy;
...
MapReduce Total cumulative CPU time: 1 minutes 11 seconds 810 msec
Ended Job = job_1415917009743_38069
MapReduce Jobs Launched:
Job 0: Map: 21 Reduce: 1 Cumulative CPU: 71.81 sec HDFS Read: 120821 HDFS Write: 8 SUCCESS
Total MapReduce CPU Time Spent: 1 minutes 11 seconds 810 msec
OK
_c0
4557489
Time taken: 83.681 seconds, Fetched: 1 row(s)
select and group by redirect title, Avro vs Parquet:
select page.redirect_title, count(*) as cnt from revision_simplewiki_avro_snappy where page.redirect_title is not null group by page.redirect_title order by cnt desc limit 10;
MapReduce Total cumulative CPU time: 27 seconds 90 msec
Ended Job = job_1415917009743_38096
MapReduce Jobs Launched:
Job 0: Map: 33 Reduce: 9 Cumulative CPU: 503.74 sec HDFS Read: 8938238424 HDFS Write: 1111211 SUCCESS
Job 1: Map: 7 Reduce: 1 Cumulative CPU: 27.09 sec HDFS Read: 1114206 HDFS Write: 241 SUCCESS
Total MapReduce CPU Time Spent: 8 minutes 50 seconds 830 msec
Time taken: 56.571 seconds, Fetched: 10 row(s)

select page.redirect_title, count(*) as cnt from revision_simplewiki_avro_parquet_snappy where page.redirect_title is not null group by page.redirect_title order by cnt desc limit 10;
MapReduce Total cumulative CPU time: 20 seconds 190 msec
Ended Job = job_1415917009743_38092
MapReduce Jobs Launched:
Job 0: Map: 21 Reduce: 8 Cumulative CPU: 114.66 sec HDFS Read: 10404994 HDFS Write: 1111095 SUCCESS
Job 1: Map: 4 Reduce: 1 Cumulative CPU: 20.19 sec HDFS Read: 1113435 HDFS Write: 241 SUCCESS
Total MapReduce CPU Time Spent: 2 minutes 14 seconds 850 msec
Time taken: 58.561 seconds, Fetched: 10 row(s)
Avro uses much more CPU time than Parquet, interesting! That makes sense: Parquet is columnar, so the query only reads the page.redirect_title column (about 10 MB of HDFS reads), while the row-oriented Avro files have to be decoded in full, including the revision text (about 8.9 GB of HDFS reads).