Data Platform/Systems/Hive/Querying using UDFs

You can write user-defined functions (or UDFs) which encapsulate complex query logic for easy reuse within Hive.

We maintain a standard set of UDFs in analytics/refinery/source/refinery-hive/src/main/java/org/wikimedia/analytics/refinery/hive.

The latest compiled versions of these standard UDFs can be found on the stat machines at:

  • /srv/deployment/analytics/refinery/artifacts/refinery-hive-shaded.jar and
  • hdfs:///wmf/refinery/current/artifacts/refinery-hive-shaded.jar

Using an existing UDF

Here's an example of how to use our user agent UDF to parse user agents from request logs (note that this won't work on EventLogging tables, where the user agent field has already been parsed from a string into a struct).

First, you need to add the function to your session:

hive> ADD JAR /srv/deployment/analytics/refinery/artifacts/refinery-hive-shaded.jar;
hive> CREATE TEMPORARY FUNCTION ua as 'org.wikimedia.analytics.refinery.hive.GetUAPropertiesUDF';
Time taken: 0.046 seconds

Note: if you are using Hue, you will need to load from HDFS via ADD JAR hdfs:///wmf/refinery/current/artifacts/refinery-hive-shaded.jar; instead.

Once the function has been added, use it in a SELECT statement:

 hive> select ua(user_agent) from webrequest where year=2014 and month=9 and day=1 and hour=12;

Output will look like:

 {"browser_major":"3","os_family":"Android","os_major":"4","device_family":"Lenovo A3500-H","browser_family":"UC Browser","os_minor":"4"}        9
 {"browser_major":"30","os_family":"Android","os_major":"4","device_family":"LG-D631/D63110b","browser_family":"Chrome Mobile","os_minor":"4"}   260
 {"browser_major":"30","os_family":"Android","os_major":"4","device_family":"SM-N9008","browser_family":"Chrome Mobile","os_minor":"4"}  18
 {"browser_major":"31","os_family":"Android","os_major":"4","device_family":"SM-N9005","browser_family":"Chrome Mobile","os_minor":"4"}  708

Group by device_family:

select a.device_family, count(*) as cnt from (
    select ua(user_agent)['device_family'] as device_family
    from webrequest
    where webrequest_source='mobile' and year=2014 and month=10 and day=30 and hour=0
) a
group by a.device_family order by cnt desc limit 10;

iPhone	7773691
Other	2940052
iPad	2911523
Spider	770622
iPod	299841
Samsung GT-I9300	173951
Samsung GT-I9505	170575
Samsung SCH-I545	161654
Samsung SM-G900V	150833
HTC One	132639

Using Reflect UDF

The reflect UDF is a generic UDF that allows the user to use a Java class method or function without a specific UDF wrapper.[1] For example, we can use it to access the decode static method of the PercentDecoder class in refinery core:

ADD JAR /srv/deployment/analytics/refinery/artifacts/refinery-hive-shaded.jar;

SELECT reflect("org.wikimedia.analytics.refinery.core.PercentDecoder", "decode", "hello%20world%21") AS decoded_string;
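
To illustrate the idea behind reflect, here is a minimal Java sketch of looking up a static method by class name and method name at run time and invoking it. It is not Hive's implementation. The class ReflectSketch and the helper callStatic are hypothetical, and java.net.URLDecoder stands in for PercentDecoder so that the example runs without the refinery jar.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Sketch only: shows a reflective call to a static method, the mechanism
// the reflect UDF is built on. Names here are illustrative assumptions.
class ReflectSketch {

    // Hypothetical helper: calls a public static method whose parameters are all Strings.
    static Object callStatic(String className, String methodName, String... args) {
        try {
            Class<?>[] parameterTypes = new Class<?>[args.length];
            Arrays.fill(parameterTypes, String.class);
            Method method = Class.forName(className).getMethod(methodName, parameterTypes);
            return method.invoke(null, (Object[]) args); // null receiver: the method is static
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("reflective call failed", e);
        }
    }

    public static void main(String[] args) {
        // URLDecoder is a stand-in for PercentDecoder (assumption for this sketch).
        System.out.println(callStatic("java.net.URLDecoder", "decode", "hello%20world%21", "UTF-8"));
        // prints "hello world!"
    }
}
```

Because the class and method are resolved from strings at run time, a typo in either name only fails when the query runs, which is one reason to prefer a dedicated UDF for logic that is used often.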

Writing a UDF

Study existing UDFs and their unit tests. For example:

Use Get* naming for UDFs that calculate (e.g. GetUAPropertiesUDF) and Is* naming for UDFs that determine true/false (e.g. IsPageviewUDF).

It is very important to include unit tests with any additions you make to refinery core and/or refinery hive.
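
As a rough illustration of the Is* naming convention and of the kind of cases a unit test should cover, here is a dependency-free sketch. IsWikipediaHostUDF is a hypothetical name and not a class in refinery. A real Hive UDF would extend one of Hive's UDF base classes and be tested with the project's unit test setup, both of which are left out here so that the example runs with the JDK alone.

```java
import java.util.Locale;

// Hypothetical example following the Is* naming convention (true/false result).
// Not a real refinery class; the Hive base class is omitted on purpose.
class IsWikipediaHostUDF {

    // Returns true when the host is wikipedia.org or one of its subdomains.
    public boolean evaluate(String host) {
        if (host == null) {
            return false;
        }
        String h = host.toLowerCase(Locale.ROOT);
        return h.equals("wikipedia.org") || h.endsWith(".wikipedia.org");
    }

    // Stand-in for a unit test assertion: compares expected and actual results.
    static void check(boolean expected, boolean actual, String label) {
        if (expected != actual) {
            throw new AssertionError("failed: " + label);
        }
    }

    public static void main(String[] args) {
        IsWikipediaHostUDF udf = new IsWikipediaHostUDF();
        check(true, udf.evaluate("en.wikipedia.org"), "subdomain");
        check(true, udf.evaluate("EN.Wikipedia.Org"), "case-insensitive");
        check(false, udf.evaluate("notwikipedia.org"), "lookalike host");
        check(false, udf.evaluate(null), "null input");
        System.out.println("all checks passed");
    }
}
```

The cases worth copying are the edge cases: null input, unexpected casing, and values that look almost right, since webrequest fields contain all of these.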

IntelliJ IDEA is the recommended IDE for writing UDFs. Licenses are available to Foundation staff; see Office wiki.

Testing a UDF you just wrote

On Stat*

If the UDF is not yet merged, you need to build a jar that contains it. You can check out the code on stat1007 and build the jar there.

Build the jar with Maven (mvn):

mvn package

Once you have compiled your UDF, you will need to register it with Hive before using it.

hive> ADD JAR /some/path/refinery-hive-0.0.1.jar;
Added /some/path/refinery-hive-0.0.1.jar to class path
Added resource: /some/path/refinery-hive-0.0.1.jar
hive> CREATE TEMPORARY FUNCTION blah as 'org.wikimedia.analytics.refinery.hive.BlahUDF';
Time taken: 0.046 seconds

Then run a SELECT that uses the function:

hive> select blah(some_column) from webrequest where year=2014 and month=9 and day=1 and hour=12 limit 10;

On Hue

If you prefer to use the online GUI "Hue" (see access info), you must copy the jar to HDFS:

$ hdfs dfs -put /path/to/udf.jar /user/<your LDAP username>/

Then you can register it in your Hive query in Hue's query editor via:

ADD JAR hdfs:///user/<your LDAP username>/udf.jar;

Then follow the usage instructions in the previous section. The same JAR on HDFS can also be used from the Hive CLI, not just from Hue.

Testing changes to existing UDF

Build the jar just as you would for a new UDF. When testing, however, you need to override the path from which Hive loads some jars by default; otherwise Hive will run the existing code rather than your new code.

Leaving hive.aux.jars.path empty will do the trick.

 hive  --hiveconf hive.aux.jars.path= -f test-udf.hql


You can get more debugging information in your CLI by launching Hive with a special configuration option:

hive  --hiveconf hive.root.logger=INFO,console

Sampling Data

In these examples we'll get a user agent report for one month of data.

With predefined buckets

At creation, the webrequest table defines buckets, meaning its data is clustered into files based on the bucketing fields, namely hostname and sequence. This bucketing makes it possible to sample data efficiently at read time (less data to read), provided the sample uses the same bucketing fields:

SELECT
  user_agent,
  COUNT(1) AS c
FROM webrequest TABLESAMPLE(BUCKET 1 OUT OF 1024 ON hostname, sequence)
WHERE year=2018 AND month=3
  AND webrequest_source='text'
GROUP BY user_agent;

Note that the original bucketing is done over 64 buckets, so the number of buckets requested in TABLESAMPLE MUST be a multiple of 64 (in the example above, 1024 = 16 × 64).

In our example, the number of mappers (tasks reading data) is 1454. For the same request over a month of webrequest without sampling, the number of mappers goes up to 92722.
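
The following sketch illustrates why sampling on a multiple of 64 lets Hive read less data. It assumes that a row's original bucket is its hash modulo 64 and its sample bucket is its hash modulo the requested number of buckets. The hash function, the made-up hostnames, and the class name BucketSamplingSketch are illustrative assumptions; Hive uses its own hash function on the bucketing columns.

```java
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Sketch only: models bucket assignment with a stand-in hash to show which
// original buckets a sample has to touch.
class BucketSamplingSketch {
    static final int TABLE_BUCKETS = 64; // bucket count stated in the text above

    // Stand-in hash (assumption): not the hash function Hive actually uses.
    static int hash(String hostname, long sequence) {
        return Objects.hash(hostname, sequence) & Integer.MAX_VALUE; // clear the sign bit
    }

    // Which of the 64 original buckets hold rows belonging to the first sample bucket?
    static Set<Integer> originalBucketsTouched(int rows, int sampleBuckets) {
        Set<Integer> touched = new TreeSet<>();
        for (int i = 0; i < rows; i++) {
            int h = hash("host" + (i % 50), i); // made-up hostnames and sequence numbers
            if (h % sampleBuckets == 0) {       // row falls in sample bucket 1 (remainder 0)
                touched.add(h % TABLE_BUCKETS);
            }
        }
        return touched;
    }

    public static void main(String[] args) {
        System.out.println("1 out of 1024: " + originalBucketsTouched(1_000_000, 1024));
        System.out.println("1 out of 1000: " + originalBucketsTouched(1_000_000, 1000));
    }
}
```

With 1024 sample buckets, every sampled row has a hash divisible by 1024 and therefore by 64, so all sampled rows live in a single original bucket. With 1000, the sampled rows can be spread over several original buckets, so more files have to be read.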

Without predefined buckets

Hadoop holds about a month of data, which is a LOT of data. You do not need to access all of it to get a sufficiently precise user agent report. We get about 10,000 requests per second for mobile, so sampling 1 in 1000 over a 30-day month gives roughly 26 million records (10,000 × 86,400 × 30 / 1,000), which should be sufficient for a monthly report.

A hive query like the following does the sampling and the grouping using the UDF:

 ADD JAR /home/nuria/refinery-hive-0.0.1.jar;
 CREATE TEMPORARY FUNCTION ua as 'org.wikimedia.analytics.refinery.hive.UAParserUDF';
 use wmf_raw;

 SELECT a.useragent, COUNT(1)
 FROM
 (select ua(user_agent) as useragent
  from webrequest TABLESAMPLE(BUCKET 1 OUT OF 1000 ON rand())
  where year=2014 and webrequest_source="mobile") a
 GROUP BY a.useragent;

To execute (timing the output):

time hive -f select.sql > output.txt

Output will look like:

 {"browser_major":"3","os_family":"Android","os_major":"4","device_family":"Lenovo A3500-H","browser_family":"UC Browser","os_minor":"4"}        9
 {"browser_major":"30","os_family":"Android","os_major":"4","device_family":"LG-D631/D63110b","browser_family":"Chrome Mobile","os_minor":"4"}   260
 {"browser_major":"30","os_family":"Android","os_major":"4","device_family":"SM-N9008","browser_family":"Chrome Mobile","os_minor":"4"}  18
 {"browser_major":"31","os_family":"Android","os_major":"4","device_family":"SM-N9005","browser_family":"Chrome Mobile","os_minor":"4"}  708

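Note that this sampling still reads all of the selected webrequest data: because the buckets are computed from rand() at query time, every row has to be read before it can be kept or discarded.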
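
The cost of sampling on rand() can be pictured with a small simulation. This is a sketch under simple assumptions: java.util.Random stands in for Hive's rand(), and the class name RandSamplingSketch and the row counts are made up for illustration.

```java
import java.util.Random;

// Sketch only: simulates "BUCKET 1 OUT OF n ON rand()" to show that every row
// is read even though only a small fraction is kept.
class RandSamplingSketch {

    // Returns {rowsRead, rowsKept} for a scan over totalRows rows.
    static long[] sample(long totalRows, int buckets, long seed) {
        Random rng = new Random(seed); // stand-in for Hive's rand()
        long read = 0;
        long kept = 0;
        for (long i = 0; i < totalRows; i++) {
            read++;                            // the row is read before any decision is made
            if (rng.nextInt(buckets) == 0) {   // keep roughly 1 row in `buckets`
                kept++;
            }
        }
        return new long[] {read, kept};
    }

    public static void main(String[] args) {
        long[] result = sample(1_000_000L, 1000, 42L);
        System.out.println("rows read: " + result[0] + ", rows kept: " + result[1]);
    }
}
```

The number of rows kept shrinks with the sampling rate, but the number of rows read does not, which is the difference from sampling on the predefined buckets.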

Using Hive UDFs with Spark

About multi-threaded environment

Spark splits the work into tasks that are distributed to executors for execution. Each executor executes N tasks in N parallel threads, matching its N cores.

Each task will create an instance of the UDF and initialize it before using it on each row.
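
The per-task lifecycle described above can be sketched with a plain thread pool. This is not Spark code: the thread pool stands in for an executor's cores, and CountingUdf, runTasks, and PerTaskUdfSketch are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: each task builds and initializes its own UDF instance, so
// per-instance state is never shared between threads.
class PerTaskUdfSketch {

    // Hypothetical UDF with mutable per-instance state (deliberately not thread-safe).
    static class CountingUdf {
        private int rowsSeen = -1;
        void initialize() { rowsSeen = 0; }
        int evaluate(String row) { rowsSeen++; return row.length(); }
        int rowsSeen() { return rowsSeen; }
    }

    // Runs `tasks` tasks on `cores` threads and returns how many UDF instances were created.
    static int runTasks(int tasks, int cores, List<String> rows) {
        AtomicInteger instances = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(cores);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int t = 0; t < tasks; t++) {
                futures.add(executor.submit(() -> {
                    CountingUdf udf = new CountingUdf(); // one instance per task
                    instances.incrementAndGet();
                    udf.initialize();                    // initialized before the first row
                    for (String row : rows) {
                        udf.evaluate(row);
                    }
                    return udf.rowsSeen();
                }));
            }
            for (Future<Integer> future : futures) {
                if (future.get() != rows.size()) {
                    throw new IllegalStateException("a task saw rows from another task");
                }
            }
            return instances.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runTasks(8, 4, List.of("a", "bb", "ccc")) + " UDF instances for 8 tasks");
        // prints "8 UDF instances for 8 tasks"
    }
}
```

Instance fields are therefore safe to use as scratch state, whereas static fields are shared by all tasks running in the same executor and need the usual thread-safety care.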

About Serialization

Spark sends instructions (including UDFs) to executors by using Kryo serialization. Kryo does not need the class to implement Serializable.

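Some objects, such as a Caffeine cache, crash when Kryo deserializes them, so you may need to mark the corresponding instance variables as transient. Transient fields are not serialized, so they have to be re-created on the executor, for example when the UDF is initialized.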
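
A minimal sketch of the transient pattern follows. To stay dependency-free it uses Java's built-in serialization as a stand-in for Kryo (which is why the class implements Serializable here) and a HashMap as a stand-in for a Caffeine cache. CachingUdf, roundTrip, and TransientCacheSketch are hypothetical names. Kryo's default field-based serializer is also expected to skip transient fields, but check this against the Kryo version you use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Sketch only: a transient cache that is rebuilt lazily after deserialization.
class TransientCacheSketch {

    static class CachingUdf implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String prefix;                 // plain configuration, serialized as usual
        private transient Map<String, String> cache; // skipped by serialization, null after a round trip

        CachingUdf(String prefix) {
            this.prefix = prefix;
        }

        String evaluate(String value) {
            if (cache == null) {
                cache = new HashMap<>();             // re-created on first use on the receiving side
            }
            return cache.computeIfAbsent(value, v -> prefix + v);
        }

        boolean hasCache() {
            return cache != null;
        }
    }

    // Serializes and deserializes the UDF, as happens when it is shipped to an executor.
    static CachingUdf roundTrip(CachingUdf udf) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(udf);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CachingUdf) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CachingUdf original = new CachingUdf("x-");
        original.evaluate("a");                      // populates the cache on the sending side
        CachingUdf copy = roundTrip(original);
        System.out.println("cache present after round trip: " + copy.hasCache());
        System.out.println("result: " + copy.evaluate("a"));
    }
}
```

The cache never travels over the wire, so its contents are lost on each executor. That is usually acceptable for a cache, but the pattern is not suitable for state the UDF needs in order to produce correct results.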

See also


  1. https://cwiki.apache.org/confluence/display/Hive/ReflectUDF