Analytics/Cluster/Workflow management tools study
This document describes the decision process that we, the Data Engineering team, followed when choosing a replacement for our workflow management system.
We, Data Engineering, identified the need to update our main scheduler/task manager system: Oozie. Oozie is quite simple and robust, and has worked OK so far. However, it is old software that is no longer improving and has a diminishing community. It has significant limitations, like the lack of dynamic execution flows (e.g. looping over all files in a directory), which we need for refining dynamic databases. Also, developers need to write lots of boilerplate XML code for even a simple job, which is a barrier to entry for external teams. Finally, we in Analytics are not satisfied with its UI (Hue) either.
In parallel, we observed that we are maintaining other scheduling (or data presence sensor) systems, like Refine, reportupdater and systemd timers. Refine implements the dynamic data presence sensors that Oozie does not support. Reportupdater tries to reduce Oozie's boilerplate code and learning curve, so that less technical users can still create jobs easily. And in some cases, for simplicity, we're using systemd timers for jobs (deletion scripts) that would be better handled by a higher-level scheduling system that can, for example, be monitored in a UI.
We want to find a new scheduler / workflow management system that is able to replace all our current Oozie jobs and, if possible, unify all Refine jobs, all reportupdater jobs, and all systemd timers that would make sense to be handled by a higher-level system. We'd like more modern software with a growing community and good docs, that can be used by different types of users (with varying technical levels). The new system should ideally match Oozie in robustness and ease of maintenance. With such a scheduler, we would reduce the maintenance of our jobs, improve our control over them and make it easier for collaborators to create their own jobs. Also, a more modern system would give our job pipeline a longer life and more state-of-the-art capabilities (e.g. Kubernetes support).
In our search for a better workflow manager, most candidates were discarded because they failed hard requirements, without a deeper evaluation. Here are the reasons:
- We're looking for fully open source software. Open source is one of the pillars of the WMF, and we'll only choose a system that is open source and that we can therefore contribute back to.
- We want a full-fledged workflow orchestrator, not a tool focused solely on, e.g., automation (RunDeck, StackStorm), data science/ML (Pachyderm, MLFlow) or data integration/ETL (Beam, Camel). The chosen solution should have the power and flexibility to handle all our use cases.
- It's crucial that the chosen software is mature enough and has an extensive community (is popular enough), and thus has good support and documentation. We'll discard projects that are not actively maintained (e.g. Pinball).
- Also, it must be able to support our current set of data pipelines (time schedules, data dependency management, alerts, etc.) and infrastructure (Hadoop + Yarn).
These are the candidates that we evaluated.
Prefect is a "dataflow automation" system built by the company of the same name. The software seems to meet all our needs (even though we didn't study it in depth). In its early days, Prefect was a closed source project; in 2020 it was open-sourced. However, parts of the system (server and UI) are licensed under the Prefect Community License, which is open source except for a couple of limitations, including using the software for PaaS and SaaS. After discussing the issue with our CTO, the position of the WMF is that, for now, we should not use software with such a license. Thus, we discarded this option without further study.
Argo Workflows is a workflow orchestrator with an interesting approach, which we think would also meet all our needs. However, it is strictly Kubernetes-based: if we chose Argo to replace Oozie, we'd have to move our Hadoop cluster to Kubernetes first. We, Data Engineering, don't rule out moving to Kubernetes in the future, but we don't want to make that move a dependency of the Oozie replacement. We think the new workflow manager should be able to handle both Hadoop + Yarn and Kubernetes, so that both projects are decoupled. Thus, we also discarded Argo without further study. Note: the same reasoning applies to Chronos, a workflow manager based on Apache Mesos, which we also discarded.
Azkaban is a batch workflow job scheduler built by LinkedIn. It fulfills most of our requirements, except that it only supports time-based scheduling of jobs (bug). In our data pipelines many jobs depend on the presence of data, so we need jobs that are triggered by, e.g., the existence of files in a given HDFS path, the fact that an HDFS path saw updates in the last hour, or a Hive partition being added to a table. Such data probes are really important for us, and are one of the reasons we're moving away from Oozie. So we also discarded this option without further study.
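To make the first two kinds of data probe concrete, here is a minimal sketch of the checks such a trigger has to perform. It is an assumption-laden stand-in: it uses the local filesystem (`pathlib`) instead of HDFS, and `has_files` and `updated_within` are hypothetical helper names, not functions from any scheduler.

```python
import tempfile
import time
from pathlib import Path
from typing import Optional


def has_files(path: str) -> bool:
    """True if `path` is a directory containing at least one regular file."""
    root = Path(path)
    return root.is_dir() and any(p.is_file() for p in root.rglob("*"))


def updated_within(path: str, max_age_s: float, now: Optional[float] = None) -> bool:
    """True if any regular file under `path` was modified in the last `max_age_s` seconds."""
    now = time.time() if now is None else now
    root = Path(path)
    files = [p for p in root.rglob("*") if p.is_file()] if root.is_dir() else []
    return any(now - f.stat().st_mtime <= max_age_s for f in files)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        print(has_files(d))              # False: empty directory
        (Path(d) / "part-00000").write_text("data")
        print(has_files(d))              # True
        print(updated_within(d, 3600))   # True: the file was just written
```

A time-only scheduler like Azkaban has no place to hook such a check, which is why it was ruled out.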
Luigi is a workflow engine created by Spotify. It's a widely used, mature project and covers most of the use cases that we need. However, it lacks job scheduling; instead, it depends on Cron (or any other scheduler) to trigger jobs (docs). One of our main objectives in moving to another workflow manager is to unify the different schedulers that we're maintaining, including systemd timers (our Cron). Given this significant disadvantage, we did a quick check of other properties of Luigi, to see whether they would compensate for it. But we found that Luigi also does not handle distributed execution of tasks (docs). We, Data Engineering, probably wouldn't use this feature, because we'll run jobs using Yarn (at least in the near future), but other teams might want to have this option. Plus, we found that Luigi's UI is really minimal, to the point that users cannot interact with running jobs. Given all this, we decided to discard this option without further study.
Apache Airflow is a workflow management platform created by Airbnb that became a top-level Apache Software Foundation project in January 2019. It is written in Python, and workflows are created via Python scripts (configuration as code). Airflow uses directed acyclic graphs (DAGs) to manage workflow orchestration. DAGs can be run either on a defined schedule (e.g. hourly or daily) or based on external event triggers (e.g. a file appearing in Hive). [summarized from https://en.wikipedia.org/wiki/Apache_Airflow] Airflow seems to meet all our needs with no critical drawbacks.
Airflow 1 P.O.C.
We did a POC to test Airflow 1's ability to implement dynamic workflows, and to get a sense of what developing and managing jobs with Airflow is like. The descriptions below and the comparison chart reflect what we learned during the POC. You can find the code and more technical details at https://gerrit.wikimedia.org/r/#/c/analytics/refinery/+/597623/.
In Airflow, all jobs are written in Python, a programming language, as opposed to a markup language like Oozie's XML. This gives Airflow lots of power and flexibility. Dynamic workflows are one example: one can iterate over the files in a directory at DAG definition time. Also, Python is an easy language to learn, and most data analysts and researchers are already familiar with it. Finally, Python code is easier to abstract and compartmentalize, which makes DAG code much shorter than, e.g., Oozie's XML.
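The dynamic-workflow idea can be sketched in plain Python: list a directory when the workflow file is read, and emit one task per file. This is not Airflow code; in a real DAG the loop body would instantiate an operator instead of returning a command, and the `refine` command and `refine_` task prefix are hypothetical.

```python
import re
import tempfile
from pathlib import Path
from typing import Dict, List


def task_id_for(path: Path) -> str:
    """Turn a file name into a safe task id (letters, digits, underscores)."""
    return "refine_" + re.sub(r"[^0-9A-Za-z_]", "_", path.stem)


def build_tasks(input_dir: str) -> Dict[str, List[str]]:
    """Map one task id per input file to the (hypothetical) command it would run.

    This runs at definition time: every re-parse re-lists the directory, so new
    files produce new tasks without editing the workflow code.
    """
    tasks = {}
    for f in sorted(Path(input_dir).iterdir()):
        if f.is_file():
            tasks[task_id_for(f)] = ["refine", "--input", str(f)]
    return tasks


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        for name in ("events.json", "page-views.json"):
            (Path(d) / name).write_text("{}")
        print(sorted(build_tasks(d)))   # ['refine_events', 'refine_page_views']
```

Because the listing happens on every parse, it should stay cheap; this is the kind of code that hit the PyArrow problems described further down.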
Support and extensibility
Airflow probably has the most extensive out-of-the-box support for related tools and technologies among workflow management solutions, e.g.: Hadoop, Hive, Presto, Spark, Cassandra, Kubernetes, Druid, Sqoop, Docker, Slack, Jenkins, etc. There are many operators and sensors within the standard Airflow install (e.g. HiveToDruid), plus many other connectors developed by the community. You can also build your own custom plugins. For the POC, we built a Refine plugin, and it was pretty straightforward.
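As an illustration of what a custom operator typically boils down to, here is a plain-Python sketch of the pattern: a class that holds a few parameters and an `execute(context)` method that turns them into a `spark-submit` call on Yarn. It deliberately does not import Airflow; `RefineOperatorSketch`, its fields, the jar path and the job arguments are all hypothetical, not the POC's actual Refine plugin.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class RefineOperatorSketch:
    """Hypothetical custom operator: turns a few parameters into a spark-submit call."""
    task_id: str
    input_path: str
    output_table: str
    jar: str                      # hypothetical path to the job's jar
    main_class: str               # hypothetical entry point of the Refine job
    spark_conf: Dict[str, str] = field(default_factory=dict)

    def build_command(self) -> List[str]:
        cmd = ["spark-submit", "--master", "yarn", "--deploy-mode", "cluster",
               "--class", self.main_class]
        for key, value in sorted(self.spark_conf.items()):
            cmd += ["--conf", f"{key}={value}"]
        # Application jar first, then the job's own (hypothetical) arguments.
        cmd += [self.jar, "--input_path", self.input_path,
                "--output_table", self.output_table]
        return cmd

    def execute(self, context: dict) -> List[str]:
        # A real operator would launch the command here; the sketch just returns it.
        return self.build_command()
```

The point of the design is that DAG authors only fill in `input_path` and `output_table`; the Spark and Yarn details live in one place.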
Airflow has 3 main components: the metastore (a relational database accessed via SQLAlchemy), where it stores information and state about DAGs and tasks; the scheduler, a process that periodically reads DAGs and schedules tasks; and the web server, which serves the Airflow UI and also handles the DAG and task events that come from it. Additionally, you can configure Airflow to use Celery as a task execution engine (a 4th component). Some Airflow community members say that a system with this many moving pieces can be more costly to set up and monitor, especially if you're using Celery.
DAGs: interpretation in 2 steps
Something to take into account is that DAG coding has its gotchas, like understanding that the code is interpreted in 2 steps: DAG definition time and task execution time. Airflow parses each DAG file every N seconds, so that dynamic DAGs are updated accordingly and changes to DAG code are picked up. But no tasks are run at that moment. When a task within a DAG fulfills its execution conditions and is triggered, some parts of the DAG file's code are used to execute that task. Note that both the DAG and the task specs live in the same code file. So there's still a learning curve for people who have never written an Airflow DAG.
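The two interpretation steps can be mimicked without Airflow. In this sketch, `parse_dag_file` stands in for a DAG file's top-level code (run on every re-parse) and the inner function stands in for a task's callable (run only when the task executes). The counters, the table name and the partition path format are hypothetical and only there to make the difference visible.

```python
import datetime as dt
from typing import Callable, Dict

calls = {"parse": 0, "execute": 0}


def parse_dag_file() -> Dict[str, Callable[[dt.date], str]]:
    """Stands in for the top-level code of a DAG file (definition time)."""
    calls["parse"] += 1
    # Keep this part cheap and deterministic: it runs on every re-parse.
    table = "example_table"

    def compute_partition(ds: dt.date) -> str:
        # Execution time: runs once per task instance, with that run's date.
        calls["execute"] += 1
        return f"{table}/year={ds.year}/month={ds.month}/day={ds.day}"

    return {"compute_partition": compute_partition}


if __name__ == "__main__":
    for _ in range(3):              # the scheduler re-reads the file periodically
        tasks = parse_dag_file()
    print(tasks["compute_partition"](dt.date(2020, 6, 23)))
    print(calls)                    # {'parse': 3, 'execute': 1}
```

Anything expensive placed at the top level (like an HDFS listing) is therefore paid on every parse, not once per run.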
Documentation and community
Apache Airflow has decent documentation. It is not very detailed though (as of 2020-06-23), so sometimes you find yourself checking several sources to confirm what you want to do, or even browsing Airflow's code to be sure what a parameter does. Airflow seems to have the biggest community among open-source workflow management options. There are lots of answered questions online, case studies in blogs, etc. During the POC we found that some of our questions were only partially answered. Maybe this is because it's still a young Apache project, and it will improve in the near future.
Airflow's UI has lots of diagrams, icons and buttons, and it can be intimidating at first. But it gives you decent control over your config, DAGs and tasks. You can do things like: switching DAGs on/off; killing, re-running, clearing or marking as success individual tasks or task sequences; looking at (driver) logs; getting the state of each task in a live execution diagram; checking the final rendered parameters of a task; etc. The main improvement vs Hue is probably usability, visual aids and the level of detail displayed about DAGs and tasks (the actual things you can do are mostly equivalent). Finally, a big con of the Airflow UI: it does not auto-refresh, and it's a bit clunky when refreshing manually (e.g. it takes a while until a running task displays as running in the UI).
Airflow comes with Kerberos support out of the box. But so far it seems to support only one user. So if we wanted to use Airflow with both the analytics user and the hdfs user (or the product-analytics user), that could be a problem. Likewise, if you want to test a new job with your own user to make sure you don't overwrite production files, that would also be an issue (unless we set up a separate Airflow testing instance). This limitation is significant and would need to be solved if we choose Airflow.
PyArrow concurrency issues
During the POC we had problems with dynamic DAGs using PyArrow to access HDFS. Some tasks would get stuck in the running state right after calling an HDFS ls operation with the PyArrow library. It seems there's a mutex deadlock when running parallel tasks that use a PyArrow HDFS client. After lots of troubleshooting, we decided to move on and record this issue here as a note to be considered. There are still ways to work around it, like trying the Celery executor, or using the command line to query HDFS (a poor solution). But it's definitely something else to solve if we decide to use Airflow.
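The command-line workaround could look roughly like the sketch below: run `hdfs dfs -ls` in a subprocess (so no PyArrow client is shared between parallel tasks) and parse its text output. It assumes the default `ls` column layout (permissions, replication, owner, group, size, date, time, path); `HdfsEntry`, `parse_hdfs_ls` and `hdfs_ls` are hypothetical names.

```python
import subprocess
from datetime import datetime
from typing import List, NamedTuple


class HdfsEntry(NamedTuple):
    is_dir: bool
    size: int
    modified: datetime
    path: str


def parse_hdfs_ls(output: str) -> List[HdfsEntry]:
    """Parse the text printed by `hdfs dfs -ls <path>`."""
    entries = []
    for line in output.splitlines():
        if not line.strip() or line.startswith("Found "):
            continue  # skip blank lines and the "Found N items" header
        # perms, replication, owner, group, size, date, time, path (path may contain spaces)
        perms, _repl, _owner, _group, size, day, hhmm, path = line.split(None, 7)
        modified = datetime.strptime(f"{day} {hhmm}", "%Y-%m-%d %H:%M")
        entries.append(HdfsEntry(perms.startswith("d"), int(size), modified, path))
    return entries


def hdfs_ls(path: str) -> List[HdfsEntry]:
    """Run the HDFS CLI in a subprocess instead of calling PyArrow in-process."""
    result = subprocess.run(["hdfs", "dfs", "-ls", path],
                            capture_output=True, text=True, check=True)
    return parse_hdfs_ls(result.stdout)
```

Spawning a JVM-based CLI per call is slow, which is why the text calls this option poor.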
Oozie vs Airflow
Conclusion after 1st P.O.C.
It seems to me that Airflow will be able to do all we need it to do (Refine, regular schedules, data presence jobs, better maintainability, better UI, unifying all our workflows, etc.). And Airflow will probably be maintained for a long time and have a nice community. But it also seems that, until we get there, we'll have to solve a set of non-trivial issues (concurrency, Kerberos, proper setup, and potentially more), in addition to the actual migration of all jobs and Airflow's learning curve. This is a bit discouraging, and I think we should consider other options before we decide. If other options aren't better, then let's use Airflow. Mforns (talk) 13:40, 23 June 2020 (UTC)
Airflow 2 P.O.C.
Since our 1st Airflow P.O.C., Airflow has released version 2.0, which includes significant improvements on various fronts, including scheduler performance and a revamped UI; plus, the PyArrow concurrency issues described above are resolved in the new version. In addition, the 2.0 release seems to have been well received by the community, which is more active than ever. Thus, we decided to give Airflow another try and implement another P.O.C. with version 2.0. This time, we focused on migrating and productionizing 1 non-trivial job from our current Oozie grid to a new Airflow instance set up and managed by Puppet. The descriptions below reflect what we learned during the 2nd POC. You can find the code and more technical details at https://gerrit.wikimedia.org/r/c/analytics/refinery/+/702668, https://gerrit.wikimedia.org/r/c/analytics/refinery/source/+/707517 and https://wikitech.wikimedia.org/wiki/Analytics/Systems/Airflow.
We aimed to confirm that several key features of Airflow would work for our use case without issues. We successfully checked the following:
- Operator that runs a SparkSQL query in Yarn.
- Operator that runs a SparkSubmit job in Yarn.
- Data probe (sensor) that checks for the existence of a Hive partition.
- Access to HDFS via the PyArrow library (without the Airflow 1 concurrency bug).
- Sending a custom email with an embedded HDFS file.
- SLA email alerts.
The customization capabilities of Airflow are impressive. You can create any missing piece that you need, like operators, hooks, macros, and even custom DAGs. This can be used, for instance, to allow very easy creation of DAGs for datasets that share the same use case, like reportupdater does. In the POC code you can see an example of how easy it is to create an anomaly detection DAG. On the other hand, the fact that Airflow is so customizable makes it a bit more complex to work with out of the box. It might present a steep learning curve for non-technical users. Nevertheless, we can always provide custom operators and DAGs for less technical users when needed.
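The "easy DAG creation" idea is essentially a factory: users supply only what differs (a name and a query), and shared code fills in everything else. The sketch below returns plain spec objects rather than Airflow DAGs; in Airflow the factory would build one DAG per spec and register it at module level. `AnomalyDagSpec`, its fields, the `anomaly_detection_` prefix and the default alert address are hypothetical and not taken from the POC code.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class AnomalyDagSpec:
    """Hypothetical description of one generated workflow."""
    dag_id: str
    schedule: str
    query: str
    alert_email: str


def anomaly_dag_specs(metrics: Dict[str, str], schedule: str = "@daily",
                      alert_email: str = "alerts@example.org") -> List[AnomalyDagSpec]:
    """One spec per metric: users only supply a metric name and its query."""
    return [AnomalyDagSpec(dag_id=f"anomaly_detection_{name}", schedule=schedule,
                           query=query, alert_email=alert_email)
            for name, query in sorted(metrics.items())]
```

This is the kind of layer that could give less technical users a reportupdater-like experience on top of Airflow.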
Where does the code live?
It seems that we should take advantage of Airflow's customization capabilities, and we might benefit a lot from sharing code for custom operators/DAGs/macros/etc. But how can we share this code among different teams' repositories and Airflow instances? Also, if we end up putting the shared code in the refinery repository, how should we structure it? Airflow suggests a code structure for DAGs and plugins (macros), but not for other imports. We should add this subject to the design document and discuss it.
Data probes vs. DAG dependencies
In the POC, the data dependencies have been implemented as data probes (HivePartitionSensor). This means that the job waits for a Hive partition to be present, and then runs. This can be difficult when, e.g., the dataset has non-time partitions like 'datacenter'. How does the HivePartitionSensor check for data presence? Should it wait for data to be present in all datacenters? In just one? How does it know how many datacenters there are? Another option is triggering jobs via DAG dependencies, e.g. starting DAG B when DAG A has successfully finished. Both approaches have pros and cons. We should discuss this in the design document. In addition, in the future we might want to integrate Airflow with Apache Atlas, or another data governance solution, which might help with data dependencies.
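One answer to the datacenter question is to make the expected set explicit configuration and have the probe succeed only when all of it is present. The sketch below combines that condition with a generic poke loop, similar in spirit to the poke interval and timeout of Airflow sensors but not their implementation. The helper names are hypothetical, and the expected datacenter list is an assumption the caller must provide; the code cannot discover it.

```python
import time
from typing import Callable, Iterable, Set


def all_datacenters_present(present: Set[str], expected: Iterable[str]) -> bool:
    """True only when every expected datacenter partition exists."""
    return set(expected) <= present


def poke_until(condition: Callable[[], bool], poke_interval_s: float,
               timeout_s: float, sleep=time.sleep, clock=time.monotonic) -> bool:
    """Re-check `condition` every `poke_interval_s` until it holds or we time out."""
    deadline = clock() + timeout_s
    while True:
        if condition():
            return True
        if clock() >= deadline:
            return False
        sleep(poke_interval_s)
```

With DAG dependencies instead, the "is it complete?" decision moves into the upstream DAG, which already knows which datacenters it wrote.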
In this POC we used Airflow's local executor, which executes tasks on the Airflow machine. Note that most of the computation actually happens in the Hadoop cluster, because the heaviest operators just call spark-sql and spark-submit on Yarn. Even so, some of the code, like preparing command lines, sending emails or checking HDFS for existing files, runs on the Airflow instance. This code is light, but once we have hundreds of DAGs running on Airflow it can add up, and we might see performance issues with the local executor. Two potential solutions for the future are: 1) Scaling up the schedulers; according to the Airflow project, in Airflow 2 the scheduler is no longer a bottleneck and can easily be scaled. 2) Moving to a distributed executor; we'd need to spin up a small Celery cluster and execute tasks there.
The flexibility of Airflow also shows when defining default parameters for DAGs, like canonical HDFS paths or Spark configs (the usual suspects in Oozie boilerplate). It is very easy to factor out these properties to keep DAG and operator code DRY. It remains to be seen how to organize this code so that we minimize boilerplate and create easy-to-use DAG models, while still allowing flexibility and customization by teams. Another related question: do we want to configure connections and other settings in the Airflow UI, or via a Puppet-managed file? Probably the latter, but let's discuss it in the design doc.
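Factoring out defaults usually means layering: shared values at the bottom, per-job overrides on top. Airflow's `default_args` dictionary plays a similar role for operator arguments; the sketch below shows the layering itself with `collections.ChainMap`. `TEAM_DEFAULTS`, its keys and values, and `job_config` are hypothetical.

```python
from collections import ChainMap
from typing import Dict, Mapping

# Hypothetical team-wide defaults that would otherwise be repeated in every job.
TEAM_DEFAULTS: Dict[str, str] = {
    "hdfs_base_path": "/wmf/data",
    "spark.executor.memory": "4g",
    "spark.dynamicAllocation.maxExecutors": "64",
}


def job_config(overrides: Mapping[str, str],
               defaults: Mapping[str, str] = TEAM_DEFAULTS) -> Dict[str, str]:
    """Overrides win; anything not overridden falls back to the shared defaults."""
    return dict(ChainMap(dict(overrides), dict(defaults)))
```

Whether such defaults live in shared Python code or in a Puppet-managed file is exactly the open question above.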
One thing we did not tackle in the POC is delayed updates. Nevertheless, the POC (and other sources) brought up the need to discuss this. Increasingly, we encounter datasets that are ingested into our system in a non-linear way: once an hourly partition is created in HDFS, it can be updated with missing data 1 or more hours later. This raises a question: should we wait until data is complete to start our refining, transforming and loading jobs? Or should we process the data as soon as it first arrives, and then update it by re-running parts of the data processing pipeline? And if so, how do we re-run such pipelines in a coherent, organized and robust way? Let's discuss in the design doc!
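If we choose "process early, then re-run", the core bookkeeping is deciding which partitions changed after they were last processed. A minimal sketch, assuming we can obtain a last-update time per partition and a last-processed time per partition (where those timestamps come from is left open, and `partitions_to_rerun` is a hypothetical name):

```python
from datetime import datetime
from typing import Dict, List


def partitions_to_rerun(last_updated: Dict[str, datetime],
                        last_processed: Dict[str, datetime]) -> List[str]:
    """Partitions that were never processed, or that received data after processing."""
    return sorted(p for p, updated in last_updated.items()
                  if p not in last_processed or updated > last_processed[p])
```

The harder part, re-running downstream jobs coherently, would build on a list like this.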
- The scheduler sometimes breaks when deleting or restarting DAGs. All scheduling is frozen until the scheduler is restarted. We're not sure what happens to running tasks. We need to troubleshoot this and find a solution.
- With the one-instance-per-team approach, we need to add new system users and admin groups for each Airflow instance.
- Airflow's SparkSqlOperator does not support deploy-mode=cluster, meaning the Spark driver code is executed on the Airflow machine. This can create performance issues with very big jobs. One potential solution is to wrap big spark-sql jobs into spark-submit jobs with deploy-mode=cluster.
- Some Airflow operators require a Thrift server, e.g. spark-sql. If we want to use them, we should set those services up, or implement our own custom functionality (as done in the POC).
- SLAs seem to have the same problem as in Oozie: cascading emails (although all alerts for the same job are grouped into the same email). We need to look into that and find a better way.
- Waiting for Airflow to re-parse the DAG is a bit annoying during development.
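One possible way to tame cascading SLA emails, sketched here as a hypothetical post-processing step rather than anything Airflow provides, is to report only the "root" misses: tasks that missed their SLA while all their direct upstream tasks met theirs. Downstream misses caused by a late upstream are then treated as a cascade and dropped. `root_sla_misses` and the dependency mapping format are assumptions.

```python
from typing import Dict, Iterable, List, Set


def root_sla_misses(missed: Iterable[str], upstream: Dict[str, List[str]]) -> List[str]:
    """Keep only missed tasks whose direct upstream tasks all met their SLA."""
    missed_set: Set[str] = set(missed)
    return sorted(t for t in missed_set
                  if not any(u in missed_set for u in upstream.get(t, [])))
```

For a chain a → b → c where all three are late, only `a` would be reported.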
Conclusions after 2nd P.O.C.
Regarding the functionality, flexibility and power of the tool, my impression is great. I believe we can fulfill all our use cases with Airflow (maybe with the exception of streaming?). There are still some issues we need to overcome, or work we need to do: preventing/handling restarts, improving SLAs, and quite a lot of custom code (especially for non-technical users). In spite of that, I didn't see any deal breakers, and the existing issues do not seem to add up to an excessive amount. I believe we can and should move forward with Airflow 2 as the replacement for our current scheduling tools. Mforns (talk) 21:25, 29 July 2021 (UTC)