User:Elukey/Analytics/PageViewDumps

Background

Analytics/Cluster/Oozie#Running a real oozie example

https://phabricator.wikimedia.org/T126464

Oozie configuration

Example:

elukey@stat1002:~/page_view_dumps$ pwd
/home/elukey/page_view_dumps

$ oozie job -config coordinator.properties \
  -Duser=elukey \
  -Dstart_time=2015-05-01T00:00Z -Dend_time=2015-05-01T03:00Z \
  -Dworkflow_file=hdfs://analytics-hadoop/tmp/elukey/pageviewdumps/workflow.xml \
  -Darchive_directory=hdfs://analytics-hadoop/tmp/elukey/pageviewdumps/archive \
  -Dcoordinator_file=hdfs://analytics-hadoop/tmp/elukey/pageviewdumps/coordinator.xml \
  -submit
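
Submitted with -submit, the coordinator is created in PREP state and has to be started explicitly. Oozie prints the coordinator id on submission; the id below is only a placeholder.

$ oozie job -start <coordinator-id>   # move the coordinator from PREP to RUNNING
$ oozie job -info <coordinator-id>    # show coordinator status and its materialized actions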

Custom HDFS files

  • /tmp/elukey/pageviewdumps/coordinator.properties
  • /tmp/elukey/pageviewdumps/workflow.xml
  • /tmp/elukey/pageviewdumps/coordinator.xml
  • /tmp/elukey/pageviewdumps/archive
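
A sketch of how the local copies can be pushed to HDFS, assuming they live in ~/page_view_dumps on stat1002:

$ hdfs dfs -mkdir -p /tmp/elukey/pageviewdumps/archive
$ hdfs dfs -put workflow.xml coordinator.xml coordinator.properties /tmp/elukey/pageviewdumps/
$ hdfs dfs -ls /tmp/elukey/pageviewdumps/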

coordinator.xml

Same as its counterpart in the main pageview hourly HDFS directory, but without any <input-events> block, so that the coordinator does not wait for webrequest datasets in HDFS before launching its actions.
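
One way to produce it is to pull the production file from the refinery deploy on HDFS, strip the <input-events> section locally, and upload the edited copy (a sketch; the source path follows the oozie_directory set in coordinator.properties below):

$ hdfs dfs -get /wmf/refinery/current/oozie/pageview/hourly/coordinator.xml .
$ # edit coordinator.xml locally and remove the <input-events> block
$ hdfs dfs -put -f coordinator.xml /tmp/elukey/pageviewdumps/coordinator.xml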

coordinator.properties

name_node                         = hdfs://analytics-hadoop
job_tracker                       = resourcemanager.analytics.eqiad.wmnet:8032
queue_name                        = default

user                              = hdfs

# Base path in HDFS to oozie files.
# Other files will be used relative to this path.
oozie_directory                   = ${name_node}/wmf/refinery/current/oozie

# HDFS path to the coordinator to run.
coordinator_file                  = ${name_node}/tmp/elukey/pageviewdumps/coordinator.xml

# HDFS path to workflow to run.
workflow_file                     = ${oozie_directory}/pageview/hourly/workflow.xml

# HDFS path to refine webrequest dataset definitions
webrequest_datasets_file          = ${oozie_directory}/webrequest/datasets.xml
webrequest_data_directory         = ${name_node}/wmf/data/wmf/webrequest

# HDFS path to pageview dataset definitions
pageview_datasets_file            = ${oozie_directory}/pageview/datasets.xml
pageview_data_directory           = ${name_node}/wmf/data/wmf/pageview

# Initial import time of the webrequest dataset.
start_time                        = 2015-05-01T00:00Z

# Time to stop running this coordinator.  Year 3000 == never!
stop_time                         = 3000-01-01T00:00Z

# HDFS path to workflow to mark a directory as done
mark_directory_done_workflow_file = ${oozie_directory}/util/mark_directory_done/workflow.xml
# HDFS path to workflow to archive output.
archive_job_output_workflow_file  = ${oozie_directory}/util/archive_job_output/workflow.xml

# Workflow to send an error email
send_error_email_workflow_file    = ${oozie_directory}/util/send_error_email/workflow.xml

# HDFS path to hive-site.xml file.  This is needed to run hive actions.
hive_site_xml                     = ${oozie_directory}/util/hive/hive-site.xml

# Fully qualified Hive table name.
webrequest_table                  = wmf.webrequest
pageview_table                    = wmf.pageview_hourly

# Tables for whitelist check
whitelist_table                   = wmf.pageview_whitelist
unexpected_values_table           = wmf.pageview_unexpected_values

# Record version to keep track of changes
record_version                    = 0.0.4


# Temporary directory for archiving
temporary_directory               = ${name_node}/tmp

# Archive base directory
archive_directory                 = ${name_node}/wmf/data/archive

# Archive directory for pageview_hourly_legacy_format
pageview_archive_directory        = ${archive_directory}/pageview/legacy/hourly

# Coordinator to start.
oozie.coord.application.path      = ${coordinator_file}
oozie.use.system.libpath          = true
oozie.action.external.stats.write = true
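
Before uploading, the workflow definition can be checked against the Oozie XML schema from the shell. This is only a basic sanity check (it does not catch wrong property names), and depending on the Oozie version the coordinator file may be validated the same way:

$ oozie validate workflow.xml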

workflow.xml

<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.4"
    name="pageview-hourly-dump-${webrequest_table}->${pageview_table}-${year},${month},${day},${hour}-wf">

       <parameters>

        <!-- Default values for inner oozie settings -->
        <property>
            <name>oozie_launcher_queue_name</name>
            <value>${queue_name}</value>
        </property>
        <property>
            <name>oozie_launcher_memory</name>
            <value>256</value>
        </property>

        <!-- Required properties -->
        <property><name>queue_name</name></property>
        <property><name>name_node</name></property>
        <property><name>job_tracker</name></property>


        <!-- Aggregation related configuration properties-->
        <property>
            <name>hive_pageview_script_aggregate</name>
            <!-- This is relative to the containing directory of this file. -->
            <value>pageview_hourly.hql</value>
            <description>Hive script to run.</description>
        </property>
        <property>
             <name>hive_whitelist_check_script</name>
             <!-- This is relative to the containing directory of this file. -->
             <value>pageview_whitelist_check.hql</value>
            <description>Hive pageview whitelist check script to run.</description>
        </property>

        <property>
            <name>hive_site_xml</name>
            <description>hive-site.xml file path in HDFS</description>
        </property>
        <property>
            <name>webrequest_table</name>
            <description>Hive table to refine</description>
        </property>
        <property>
            <name>pageview_table</name>
            <description>The destination table to store refined data in.</description>
        </property>
        <property>
            <name>whitelist_table</name>
            <description>Hive pageview whitelist table for check</description>
        </property>
        <property>
            <name>unexpected_values_table</name>
            <description>Hive pageview unexpected values from whitelist check</description>
        </property>

        <property>
            <name>year</name>
            <description>The partition's year</description>
        </property>
        <property>
            <name>month</name>
            <description>The partition's month</description>
        </property>
        <property>
            <name>day</name>
            <description>The partition's day</description>
        </property>
        <property>
            <name>hour</name>
            <description>The partition's hour</description>
        </property>
        <property>
            <name>mark_directory_done_workflow_file</name>
            <description>Workflow for marking a directory done</description>
        </property>
        <property>
            <name>pageview_dataset_directory</name>
            <description>Pageview directory to generate the done flag in</description>
        </property>
        <property>
            <name>unexpected_values_dataset_directory</name>
            <description>Pageview unexpected values directory to generate the done flag in</description>
        </property>
        <property>
            <name>send_error_email_workflow_file</name>
            <description>Workflow for sending an email</description>
        </property>

        <property>
            <name>hive_pageview_script_transform</name>
            <!-- This is relative to the containing directory of this file. -->
            <value>/wmf/refinery/current/oozie/pageview/hourly/transform_pageview_to_legacy_format.hql</value>
            <description>Hive script to run for archiving with the legacy format used on dumps through 2015.</description>
        </property>
        <!-- To mimic webstatscollector, the file name must be the end of the aggregated hour -->
        <property>
            <name>year_plus_1_hour</name>
            <description>The partition's year plus one hour</description>
        </property>
        <property>
            <name>month_plus_1_hour</name>
            <description>The partition's month plus one hour</description>
        </property>
        <property>
            <name>day_plus_1_hour</name>
            <description>The partition's day plus one hour</description>
        </property>
        <property>
            <name>hour_plus_1_hour</name>
            <description>The partition's hour plus one hour</description>
        </property>

        <property>
            <name>temporary_directory</name>
            <description>A directory in HDFS for temporary files</description>
        </property>
        <property>
            <name>pageview_archive_directory</name>
            <description>Directory to archive the workflow output to</description>
        </property>
        <property>
            <name>archive_job_output_workflow_file</name>
            <description>Workflow to move a data file to the archive</description>
        </property>

    </parameters>

    <start to="transform"/>
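
    <!-- Flow: "transform" rewrites one hour of ${pageview_table} into the
         legacy webstatscollector format under a temporary directory,
         "mark_transformed_pageview_dataset_done" adds a done flag there,
         and "move_data_to_archive" moves the result into the archive tree;
         any failure routes to "send_error_email" and then "kill". -->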

    <action name="transform">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${job_tracker}</job-tracker>
            <name-node>${name_node}</name-node>
            <job-xml>${hive_site_xml}</job-xml>
            <configuration>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queue_name}</value>
                </property>
                <!-- make sure the oozie launcher runs in a low priority queue -->
                <property>
                    <name>oozie.launcher.mapred.job.queue.name</name>
                    <value>${oozie_launcher_queue_name}</value>
                </property>
                <property>
                    <name>oozie.launcher.mapreduce.map.memory.mb</name>
                    <value>${oozie_launcher_memory}</value>
                </property>
                <property>
                    <name>hive.exec.scratchdir</name>
                    <value>/tmp/hive-${user}</value>
                </property>
            </configuration>

            <script>${hive_pageview_script_transform}</script>
            <!-- Here, the source for archive is the
                 destination of the previous job -->
            <param>source_table=${pageview_table}</param>
            <param>year=${year}</param>
            <param>month=${month}</param>
            <param>day=${day}</param>
            <param>hour=${hour}</param>
            <param>destination_directory=${temporary_directory}/${wf:id()}</param>
        </hive>

        <ok to="mark_transformed_pageview_dataset_done"/>
        <error to="send_error_email"/>
    </action>

    <action name="mark_transformed_pageview_dataset_done">
        <sub-workflow>
            <app-path>${mark_directory_done_workflow_file}</app-path>
            <configuration>
                <property>
                    <name>directory</name>
                    <value>${temporary_directory}/${wf:id()}</value>
                </property>
            </configuration>
        </sub-workflow>
        <ok to="move_data_to_archive"/>
        <error to="send_error_email"/>
    </action>

    <action name="move_data_to_archive">
        <sub-workflow>
            <app-path>${archive_job_output_workflow_file}</app-path>
            <propagate-configuration/>
            <configuration>
                <property>
                    <name>source_directory</name>
                    <value>${temporary_directory}/${wf:id()}</value>
                </property>
                <property>
                    <name>expected_filename_ending</name>
                    <value>EMPTY</value>
                </property>
                <property>
                    <name>archive_file</name>
                    <!--
                    webstatscollector used the end of the collection period as
                    timestamp in the filename. To not break scripts of people,
                    we also name files that way.
                    -->
                    <value>${pageview_archive_directory}/${year_plus_1_hour}/${year_plus_1_hour}-${month_plus_1_hour}/pageviews-${year_plus_1_hour}${month_plus_1_hour}${day_plus_1_hour}-${hour_plus_1_hour}0000.gz</value>
                </property>
            </configuration>
        </sub-workflow>
        <ok to="end"/>
        <error to="send_error_email"/>
    </action>

    <action name="send_error_email">
        <sub-workflow>
            <app-path>${send_error_email_workflow_file}</app-path>
            <propagate-configuration/>
            <configuration>
                <property>
                    <name>parent_name</name>
                    <value>${wf:name()}</value>
                </property>
                <property>
                    <name>to</name>
                    <value>ltoscano@wikimedia.org</value>
                </property>
            </configuration>
        </sub-workflow>
        <ok to="kill"/>
        <error to="kill"/>
    </action>

    <kill name="kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
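
After a successful run, each hour shows up as a gzipped dump named after the end of the aggregated hour. With the archive_directory override used in the submission example above, the output lands under the pageview/legacy/hourly subtree; the exact paths and timestamps below are illustrative:

$ hdfs dfs -ls /tmp/elukey/pageviewdumps/archive/pageview/legacy/hourly/2015/2015-05/
$ hdfs dfs -text /tmp/elukey/pageviewdumps/archive/pageview/legacy/hourly/2015/2015-05/pageviews-20150501-010000.gz | head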