User:Ottomata/Event Platform with Flink


Below are some responses in a December 2022 email thread between Andrew Otto, Gabriele Modena, and the TNG consultants we hired to help us with Flink. I'm putting these on wiki for easier reference when folks ask what the Event Platform Value Stream is trying to achieve with Flink. We should probably move this into our official docs.

From Andrew Otto:

Luke, Thomas, Gabriele and I are working on our "Event Platform" value stream. Years ago, we built platform components for managing JSON event streams in Kafka with JSONSchema.  We've got a pretty good handle on producing in general, and consuming into Hive is automated. But any stream processing and ingestion into other datastores are currently done as one-offs by whatever team needs it.  In practice, this means WMF rarely builds features using event-driven concepts.  Most of our feature engineers work in LAMP-land, and are reluctant or don't have time to switch contexts into JVM / distributed computing / data engineering land.  The Event Platform Value Stream effort aims to make building event-driven systems easier.

Our Event Platform is missing two main components:

  • Stream processing support (framework, tooling, deployment, etc.)
  • Automated stream ingestion (AKA connectors).

We are also missing some important data streams from our main application, MediaWiki. Our team is doing work to implement better state carrying streams from MediaWiki, and we are using that project to drive the larger platform work.

Okay, let's see! I'll respond to your Qs inline here.  Get ready for a link barrage. :)

> - It would be interesting to map out potential use cases and user groups a bit more.

Here's a list of use cases. A bit of a hodge-podge, but there are some common needs around simple enrichment and backfilling.

> - What are the requirements for what you call "complex stream processors" (WDQS updater/diff calculation)?

https://wikitech.wikimedia.org/wiki/Wikidata_Query_Service/Streaming_Updater

> - What kind of state do these stream processing jobs need to have? Stateful stream processing is one of the things where Flink shines compared to other systems and barebones Kafka producer/consumer apps.

Indeed!  We don't have a good handle on the kinds of use cases that need more state than Kafka offsets :).  There are many, but we think they will be a little more difficult to 'platform'-ize.  For these use cases, folks will have to dive into Flink.  Right now that is mostly the Search team (who run WDQS). Here's an example of a Flink based search index update pipeline they are working on now.
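To make "more state than Kafka offsets" concrete, here is a minimal sketch of Flink managed keyed state via the PyFlink DataStream API: a per-page counter. The input tuples are made up; real stateful jobs like the WDQS updater keep much richer state, but the mechanism is the same.

 # Minimal sketch: managed keyed state in PyFlink (per-key counter).
 from pyflink.common import Types
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
 from pyflink.datastream.state import ValueStateDescriptor


 class CountPerPage(KeyedProcessFunction):

     def open(self, runtime_context: RuntimeContext):
         # Managed keyed state: checkpointed by Flink and restored on recovery.
         self.count = runtime_context.get_state(
             ValueStateDescriptor("edit_count", Types.LONG())
         )

     def process_element(self, value, ctx):
         current = (self.count.value() or 0) + 1
         self.count.update(current)
         yield (value[0], current)


 env = StreamExecutionEnvironment.get_execution_environment()
 edits = env.from_collection([("Main_Page", 1), ("Main_Page", 2), ("Flink", 1)])
 edits.key_by(lambda e: e[0]).process(CountPerPage()).print()
 env.execute("keyed-state-sketch")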

Both.  Harnesses (I like this word, I'm going to start using it) for the simple and common use cases, full Flink otherwise.

> Sample case (Enrichment/T307959)

> no built-in configurable solution for HTTP enrichment in the Flink Table API, but there are libraries like the getindata one. The DataStream API is much more flexible in this regard.

Indeed.  We'd have to use either UDFs or LookupTables for HTTP enrichment in the Table API.  Since enrichment is really a one-to-one event mapping (input event -> process -> output event), the DataStream API may be easier to abstract than the Table API or SQL (more on this later).
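For illustration, here is a minimal sketch of that one-to-one mapping with the PyFlink DataStream API: a blocking HTTP lookup per event. The endpoint URL and event fields are hypothetical, and a real job would read from and write to Kafka rather than a collection and print().

 import json
 import requests  # assumes the requests library is available on the workers

 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.datastream.functions import MapFunction


 class HttpEnrich(MapFunction):
     """Synchronously enrich each input event with a remote API lookup."""

     def map(self, value):
         event = json.loads(value)
         # Hypothetical enrichment API; one request per input event.
         resp = requests.get(
             "https://api.example.org/page/%s" % event["page_id"], timeout=5
         )
         event["enrichment"] = resp.json()
         return json.dumps(event)


 env = StreamExecutionEnvironment.get_execution_environment()
 events = env.from_collection(['{"page_id": 42}'])  # stand-in for a Kafka source
 events.map(HttpEnrich()).print()                   # stand-in for a Kafka sink
 env.execute("http-enrichment-sketch")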

> Are the enrichment cases you have in mind all/mostly covered by the AsyncTableFunction/treating the API as a lookup source table?

Probably.  We've had some cool ideas about automating the creation of LookupTables from OpenAPI (Swagger) specs... if only more of our APIs actually published good specs.
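A very rough sketch of that idea, purely for illustration: walk an OpenAPI response schema and emit the column list of a Flink CREATE TABLE statement. The spec snippet and type mapping are made up, and the connector options for an actual HTTP lookup source (e.g. the getindata connector) are left as a stub rather than guessed at.

 # Illustrative only: generate Flink DDL columns from an OpenAPI response schema.
 FLINK_TYPES = {"string": "STRING", "integer": "BIGINT", "number": "DOUBLE", "boolean": "BOOLEAN"}


 def ddl_from_openapi_schema(table_name, response_schema):
     """Map an OpenAPI object schema to a (partial) Flink CREATE TABLE statement."""
     columns = ",\n  ".join(
         "`%s` %s" % (name, FLINK_TYPES.get(prop.get("type"), "STRING"))
         for name, prop in response_schema["properties"].items()
     )
     return (
         "CREATE TABLE %s (\n  %s\n) WITH (\n"
         "  -- options for an HTTP lookup connector would go here\n)" % (table_name, columns)
     )


 # Hypothetical response schema for an enrichment API:
 spec = {"properties": {"page_id": {"type": "integer"}, "title": {"type": "string"}}}
 print(ddl_from_openapi_schema("page_lookup", spec))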

> User-defined table functions do not support async functions (FLINK-18379).

I sometimes wonder if we even need or want async for remote API lookups.  If we care about ordering (we don't always), IIUC, async lookups will mean that the stream needs to be re-ordered by Flink before writing to Kafka anyway.  If it has to re-order, perhaps the benefits from an async function are lessened by the overhead needed to reorder?

Our experience so far with the Table API / SQL is that it is in practice pretty awkward to use for enrichment.  Here was my experiment with doing enrichment with SQL.  

1. Unless we implement a Catalog (we are spiking to see if we can), users have to translate from JSONSchemas to SQL DDLs every time they want to write a SQL query (see the sketch after this list for what that translation looks like).

2. Row construction in SQL is very awkward.  Working with map types is even worse!

3. People are going to have to write UDFs anyway.  If they do, they might as well just write Python or Java/Scala.
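To illustrate points 1 and 2: below is roughly what a user has to hand-write today for a JSON stream (topic, broker, and fields are hypothetical, and the Kafka SQL connector jar is assumed to be available at execution time), plus the CAST(ROW(...)) dance typically needed to keep field names on a constructed row.

 from pyflink.table import EnvironmentSettings, TableEnvironment

 t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

 # Point 1: the JSONSchema -> DDL translation, done by hand.
 t_env.execute_sql("""
     CREATE TABLE page_change (
         page_id BIGINT,
         page_title STRING,
         meta ROW<domain STRING, dt STRING>
     ) WITH (
         'connector' = 'kafka',
         'topic' = 'mediawiki.page_change',
         'properties.bootstrap.servers' = 'kafka:9092',
         'format' = 'json',
         'scan.startup.mode' = 'latest-offset'
     )
 """)

 # Point 2: constructing nested rows. Without the CAST, the row's inner
 # field names are lost (they come out as EXPR$0, EXPR$1, ...).
 result = t_env.sql_query("""
     SELECT
         page_id,
         CAST(ROW(meta.domain, meta.dt) AS ROW<domain STRING, dt STRING>) AS meta
     FROM page_change
 """)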

But, we are not sure about PyFlink yet either. We think it is easier to make harnesses in Python, mostly because the typing isn't so strict.  Yes, this will have different failure modes, and possibly worse performance at big scale, but we hope that by offering harnesses in Python we'll scare away fewer people.
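The hoped-for user experience is roughly the following; run_enrichment_harness() is a hypothetical wrapper made up for illustration, not an existing API.

 # The user writes a plain dict -> dict function and never touches Flink types.
 def enrich(event: dict) -> dict:
     event["normalized_title"] = event["page_title"].replace(" ", "_")
     return event


 # A harness would wrap this into a PyFlink job wired to the right Kafka topics
 # and JSONSchemas, e.g. (hypothetical):
 # run_enrichment_harness(enrich,
 #                        input_stream="mediawiki.page_change",
 #                        output_stream="mediawiki.page_change.enriched")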

> Multi-DC - [...] for Flink this can be considered on a case-by-case basis. What are the SLOs for the use cases you are planning?

Indeed.  Generally, if the output of the streaming pipeline is used for a production feature that needs up-to-date data, then it will need to be multi-DC.  If the data is only for analytics purposes (often this doesn't need streaming at all), or the production feature just cares about uptime but not data freshness, then single DC is fine.  We have a 'multi DC' k8s cluster we'll target in the fresh-production-feature case, and a single-datacenter 'data science' k8s cluster, as well as YARN, that we can use for the others.

> - What are the failover scenarios that would benefit from Flink clusters spanning both DCs?

> - Traffic between Flink operators living in different DCs depends a lot on the job's structure and may be very hard to estimate in advance.

Indeed, we are not really considering spanning Flink between different DCs.  It's an idea I'm willing to explore, but I doubt it will actually be feasible.

What we really want is ease of failover for cluster operators.  The SRE team that runs the multi-DC production k8s cluster would like to be able to automatically fail services over at will.  For stateless request-based services, this is easy, but for Flinky things it's a bit harder.  There are a lot of ways we could do multi DC here, and it probably depends on the needs of the pipeline.  Even so, we'd like to standardize this a bit, so if we can choose our preferred multi DC setup and just do that, it would make maintenance life easier.

If a Kafka stretch cluster turns out to be a good thing, then my current preferred multi-DC layout is active/passive, sharing state.  Flink in either DC will be using the same Kafka cluster and offsets, so to failover we just have to stop in one DC and restart in the other.  TBD though!
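A sketch of what "sharing state via Kafka" could look like on the consuming side, assuming the PyFlink KafkaSource wrapper (Flink 1.16+) and the Kafka connector jar on the classpath; brokers, topic, and group id are hypothetical. If both DCs use the same consumer group against the stretch cluster and start from committed offsets, stopping the job in one DC and starting it in the other resumes roughly where the first left off.

 from pyflink.common import WatermarkStrategy
 from pyflink.common.serialization import SimpleStringSchema
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.datastream.connectors.kafka import (
     KafkaOffsetResetStrategy,
     KafkaOffsetsInitializer,
     KafkaSource,
 )

 env = StreamExecutionEnvironment.get_execution_environment()
 env.enable_checkpointing(30 * 1000)  # offsets are committed back to Kafka on checkpoints

 source = (
     KafkaSource.builder()
     .set_bootstrap_servers("kafka-stretch.example.org:9092")  # hypothetical stretch cluster
     .set_topics("mediawiki.page_change")
     .set_group_id("enrichment-pipeline")  # same group id in both DCs
     .set_starting_offsets(
         KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)
     )
     .set_value_only_deserializer(SimpleStringSchema())
     .build()
 )

 stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "page_change")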

Agree.  I tried to add this info in the list of use cases under "Needs backfill?" :)

From Gabriele Modena:

> User-defined table functions do not support async functions (FLINK-18379). I sometimes wonder if we even need or want async for remote API lookups.  If we care about ordering (we don't always), IIUC, async lookups will mean that the stream needs to be re-ordered by Flink before writing to Kafka anyway.  If it has to re-order, perhaps the benefits from an async function are lessened by the overhead needed to reorder?

There are teams currently working with Benthos (https://www.benthos.dev/). They are building synchronous enrichment applications with no apparent performance concerns.

Maybe also worth considering that most of our Kafka topics have relatively low throughput. IIRC our main topic, page change, should reach 200 messages/second in the worst-case scenario. Webrequests will be higher, but that's outside of our current use cases IIRC.
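Back-of-envelope check on synchronous enrichment at those volumes (the 50 ms per-lookup latency is an assumption, not a measurement):

 peak_rate = 200          # events/second, worst case for page change (per above)
 lookup_latency_s = 0.05  # assumed time for one blocking HTTP lookup

 per_task_rate = 1 / lookup_latency_s      # ~20 events/s per blocking task
 tasks_needed = peak_rate / per_task_rate  # ~10 parallel tasks at peak

 print(f"one task handles ~{per_task_rate:.0f} events/s; "
       f"~{tasks_needed:.0f} parallel tasks cover the peak")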

> - PyFlink: I would evaluate whether offering support for PyFlink as a platform team is worth the effort, or whether the simpler use cases can be covered by Flink SQL only, and the more complex ones by the Java/Scala API. Would remove one additional case to support, and the Java API gets more love and also gets new features quicker.

When talking about use cases we should keep end users (developers) in mind. As Andrew suggested, most of our feature devs come from a LAMP background. There's a non-trivial amount of Python expertise in teams doing "data work". However, these teams tend to have no familiarity with distributed systems. That's what makes Python (and the eventutilities framework) attractive, assuming we can abstract Flink implementation details away. I do find SQL a path worth pursuing, but it's not mutually exclusive with Python/JVM. One area we have not looked into yet, but that we wanted to spike, is using UDFs for code interop.
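A minimal example of that SQL/Python interop: a Python scalar function registered as a UDF and called from Flink SQL (the function itself is a toy):

 from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
 from pyflink.table.udf import udf


 @udf(result_type=DataTypes.STRING())
 def normalize_title(title: str) -> str:
     return title.strip().replace(" ", "_")


 t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
 t_env.create_temporary_function("normalize_title", normalize_title)

 t_env.execute_sql(
     "SELECT normalize_title(page_title) FROM (VALUES ('Main Page')) AS t(page_title)"
 ).print()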

> Our experience so far with the Table API / SQL is that it is in practice pretty awkward to use for enrichment.  Here was my experiment with doing enrichment with SQL.
>
> 1. Unless we implement a Catalog (we are spiking to see if we can), users have to translate from JSONSchemas to SQL DDLs every time they want to write a SQL query.
>
> 2. Row construction in SQL is very awkward.  Working with map types is even worse!
>
> 3. People are going to have to write UDFs anyway.  If they do, they might as well just write Python or Java/Scala.
>
> But, we are not sure about PyFlink yet either. We think it is easier to make harnesses in Python, mostly because the typing isn't so strict.  Yes, this will have different failure modes, and possibly worse performance at big scale, but we hope that by offering harnesses in Python we'll scare away fewer people.

To add some context: in the past quarter we did quite a bit of spike work to evaluate tradeoffs between SQL vs. Python vs. JVM. There are write-ups of these spikes at:

Earlier this year we built a PoC enrichment pipeline (to drive collection requirements) in Scala, and have been experimenting with a version of the same in Python: