User:Ottomata/Current2022 Conference Notes

Below are the notes I took during the talks I went to at the Current 2022 (AKA Kafka Summit) Conference in Austin, Texas. I placed them in order of most to least relevant / interesting.

I think a big takeaway from the talks I went to is that streaming is hard, but SQL can make it a lot easier. Several talks advocated leaning into SQL as the base language for both batch and stream processing wherever possible, and recommended building tooling to make this easier.

Streaming is Still Not the Default: Challenges, Objections, and the Future of Streaming - Eric Sammer, Decodable CEO

“This use case doesn’t require streaming”

  • assumes that streaming is harder than batch

If streaming were as easy as batch, it would be the default.

But it’s not.

What makes it complicated?

  • It’s disaggregated
    • idea: connectors + streaming + processing
    • reality: each is a deep separate system, tons of concepts and details.  immense flexibility = immense complexity.  “heavy-weight”, daunting.
    • Disaggregation was the Achilles heel of the Hadoop ecosystem.  Hyperfocus on components: HDFS, YARN, Avro, Parquet, etc.
  • It’s low level
    • partitions, sizing, keys, ordering, serialization, type converters, etc.
    • behavior and guarantees: messaging, processing, state management, restoration, degradation under load and failures
    • APIs: different layers, human-optimized vs machine, optionality in deployment
      • In Flink DataStream API, you are taking on 30 years of query optimization, predicate push down, joins, etc.
    • complexity of user code: resource requirements, isolation and security, packaging, deployment, lifecycle, cross lang Interop overhead.
  • immature tooling
    • thin veneer over components.  
    • e.g. fault and reprocess.  No tools to help.  People talk about this as a feature of streams, but where are the streams?
      • some panel (DoorDash, Roblox, etc.) said >40% of team time is spent reprocessing and backfilling data.
    • systems are different enough to render existing tooling inadequate.  e.g. data quality systems run queries on tables; an equivalent does not exist in the streaming world.
  • Non-obvious patterns
    • evolution and schema and logic
    • reprocessing of data, especially with CDC or stateful data.  Something downstream is wrong.
    • designing for retractions
    • data quality measurement, have to build sliding windows to do this, but this is different enough from batch.
    • error handling
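
To make the point about data quality measurement concrete, here is a minimal sketch of a sliding-window check. It is not from the talk: the `SlidingNullRate` class, the field name and the window size are hypothetical, and it assumes events arrive roughly in time order.

```python
from collections import deque


class SlidingNullRate:
    """Track the fraction of records missing a field over a sliding time window."""

    def __init__(self, field, window_seconds):
        self.field = field
        self.window_seconds = window_seconds
        self._events = deque()  # (timestamp, is_missing) pairs, oldest first
        self._missing = 0

    def observe(self, timestamp, record):
        """Add one record and return the current null rate for the window."""
        is_missing = record.get(self.field) is None
        self._events.append((timestamp, is_missing))
        self._missing += is_missing
        # Evict everything that has fallen out of the window.
        cutoff = timestamp - self.window_seconds
        while self._events and self._events[0][0] <= cutoff:
            _, was_missing = self._events.popleft()
            self._missing -= was_missing
        return self._missing / len(self._events)
```

In batch the same metric is a single query over a table; here the window state has to be maintained by hand, which is the "different enough from batch" part.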

The future, what is going to/should happen:

Aggregate components:

  • consolidate disparate services: e.g. zookeeper -> kraft in kafka
  • unify, simplify, standardize concepts, management, ops
  • encapsulate the ugly parts
  • we should be prescriptive, unless there’s obvious value otherwise

Raise the level of abstraction:

  • table API, SQL. 98% of code could be in SQL that handles optimization.  
  • remove unnecessary config
  • safe, more efficient user code
  • make it a service, most businesses shouldn’t run stream processing frameworks themselves

Clear consistent behavior and guarantees:

  • further reduce complex behavior
  • aggregation reduces mismatches, remove components, merge things together.
  • graceful degradation - very few cases in e.g. postgres or other rdbms, where they just die with OOM, they usually just slow down.
  • safe, intuitive state handling.  e.g. want to rewind in time, both code and offset to deterministically re-process data.

Maturation of tooling

  • APIs and tools for common tasks, e.g. reprocessing, restoring with and without state, state migrations, etc.
  • be open to existing landscape of tools, but build where there is need.
  • consider both ops and dev

Evolve existing patterns:

  • codify in tooling, like dead letter queues, reprocessing queues, etc.
  • layer tools like APIs.
  • meet users where they are
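
As an illustration of what "codify in tooling" could mean for dead letter queues, here is a minimal sketch. The function names and the in-memory lists standing in for topics are hypothetical, not something shown in the talk.

```python
import json


def process_with_dead_letters(raw_messages, handler):
    """Apply handler to each message; route failures to a dead letter list.

    Returns (results, dead_letters). Each dead letter keeps the original
    payload and the error so it can be inspected and reprocessed later.
    """
    results, dead_letters = [], []
    for offset, raw in enumerate(raw_messages):
        try:
            results.append(handler(json.loads(raw)))
        except (ValueError, KeyError, TypeError) as err:
            dead_letters.append({"offset": offset, "payload": raw, "error": repr(err)})
    return results, dead_letters


def reprocess(dead_letters, handler):
    """Retry dead letters, e.g. after deploying a fixed handler."""
    return process_with_dead_letters([d["payload"] for d in dead_letters], handler)
```

Keeping the raw payload in the dead letter is what makes the reprocessing step possible without going back to the source.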

How we know we’ve arrived

  • engineers and analysts self-serve
  • build / debug jobs without logs, dumpster diving to find out we got a field name wrong is crazy!
  • streaming development time =~ batch

Q from audience: recommendation for an org going through these things.

  • Be opinionated and make choices.  Everything uses streams, etc.
  • Use SQL where you can!


Streaming SQL for Data Engineers: The Next Big Thing? - Yaroslav Tkachenko, Goldsky Principal Software Engineer

Built a data platform totally based on streaming SQL. Fully embracing it.

Using Apache Flink.

Why SQL?

  • wide adoption
  • declarative transformation model
  • planner!
  • common type system

Use case to join two data streams in a non temporal way.  Do join, keep joining results until end of time, emit latest value per key.

in DataStream API:

  • lots of code:
  • operator to connect two streams
  • define and accumulate state
  • implement a way to emit latest value per key

SQL API much simpler, just write a SQL join. Can save a lot of code if you don’t need a lot of code around the execution
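
To give a feel for the hand-written version, here is a toy sketch of the state handling an imperative join needs. It is plain Python rather than the Flink DataStream API, the class name is hypothetical, and it simplifies by keeping only the latest value per side and key.

```python
class LatestValueJoin:
    """Non-temporal join of two keyed streams, emitting the latest joined row per key."""

    def __init__(self):
        self._left = {}   # key -> latest left value (state, kept forever)
        self._right = {}  # key -> latest right value (state, kept forever)

    def on_left(self, key, value):
        self._left[key] = value
        return self._emit(key)

    def on_right(self, key, value):
        self._right[key] = value
        return self._emit(key)

    def _emit(self, key):
        # Inner join semantics: nothing is emitted until both sides have a value.
        if key in self._left and key in self._right:
            return (key, self._left[key], self._right[key])
        return None
```

In SQL all of this collapses into a single join statement, and the planner decides how the state is kept.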

Why not Table API?  

Top-N Query.

  • in SQL can do in straightforward fashion with window functions
  • In table API and DataStream this is much longer.
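
The window-function form of a Top-N query looks roughly like the sketch below. It runs against an in-memory SQLite database as a stand-in for a streaming SQL engine (an assumption made so the example is runnable; it needs SQLite 3.25 or newer for window functions), and the `page_views` table and its columns are made up.

```python
import sqlite3

TOP_N_SQL = """
SELECT wiki, page, views
FROM (
    SELECT wiki, page, views,
           ROW_NUMBER() OVER (PARTITION BY wiki ORDER BY views DESC) AS row_num
    FROM page_views
)
WHERE row_num <= :n
ORDER BY wiki, row_num
"""


def top_n_pages(rows, n):
    """Return the n most viewed pages per wiki from (wiki, page, views) rows."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE page_views (wiki TEXT, page TEXT, views INTEGER)")
        conn.executemany("INSERT INTO page_views VALUES (?, ?, ?)", rows)
        return conn.execute(TOP_N_SQL, {"n": n}).fetchall()
    finally:
        conn.close()
```

The whole query is the `ROW_NUMBER()` subquery plus a filter; the imperative equivalent has to maintain a ranked structure per key by hand.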

Row pattern recognition in SQL:

CEP in SQL.  Will be even more complex in imperative code.

Planner optimizations and query rewrite.

Get all the usual optimizations, predicate push down, join rewrite, constant inlining, etc.

Common Type System.

When you start using SQL you get access to decades of advancements in database design.

SQL can be the perfect bridge between the streaming and database worlds.

When NOT to use:

  • complex serialization/deserialization logic
    • push this logic to your connector, then you can bring that to the sql system.
  • low level optimizations, especially with state and timers.
    • Not possible to do these with SQL.
  • not always debugging friendly
    • need to build better tools.  maybe a tool that can map a segment of your SQL to your operator in your Flink dag.

Dealing with complexity:

  • UDFs for heavy lifting, 3rd party libs, external calls, enrichments.
  • Templating
    • control structures, dbt-style macros and references.

Requirements:

  • version control
  • code organization
  • testability
  • CI/CD
  • Observability

Approaches using SQL:

  • Structured Statements
    • Try to contain SQL statements in code (tEnv.sqlQuery) in a parameterized function.  TableEnv can be implicit in Scala and changed to something different in tests.
    • Treat these like code.
    • Only makes sense when Table API is not available.  
    • SQL also has style guides.
    • Otherwise this is really just a typical app with some SQL built in.
    • testability and CI/CD, affected a bit.  Need to think about Mocks for SQL tables, and how to manage state when you deploy.
  • dbt style project
    • data models and tests.
    • works well for heavy analytical use cases, just doing a lot of data modeling and transformation in sql
    • can still write tests
    • probably needs more tooling than you think (state management, observability)
    • Check dbt adapter from Materialize (?)
    • testability, CI/CD, and observability affected.  Tricky to write SQL tests for SQL.
  • Notebooks
    • great UX, ideal for exploratory analysis and BI
    • should not be using it for production
    • bad version control, code org, testability, CI, and observability.  All bad! :)
  • “Managed Runtime”
    • Managed =~ serverless
    • auto-scaling
    • automated deployments, rollback, etc.
    • testing for different layers is decoupled (runtime vs jobs)
    • These are SaaS style platforms, allowing people to just provide SQL to do e.g. transforms.
    • Any managed runtime requires excellent developer experience to succeed.
    • Ideal developer experience:
      • It kinda should look like Notebooks UX.
      • Version control integration.
      • versioning: a version of your SQL job with all the metadata and connector info. Rollback if needed.
      • Previews: show a result to a user.
    • if you do all these things right, everything is good except testability.
    • More complex to build this than the other ways.
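
The "Structured Statements" approach from the list above can be sketched as follows. This is an assumption-laden illustration: a SQLite connection stands in for the Flink table environment, and the function and table names are hypothetical.

```python
import sqlite3


def count_events_by_type(conn, table, min_count=1):
    """Return (event_type, count) rows for one table, most frequent first.

    `conn` is injected by the caller, so tests can pass an in-memory database
    while production code passes a real connection or table environment.
    """
    # Table names cannot be bound as parameters, so validate before formatting.
    if not table.isidentifier():
        raise ValueError(f"unsafe table name: {table!r}")
    query = f"""
        SELECT event_type, COUNT(*) AS n
        FROM {table}
        GROUP BY event_type
        HAVING COUNT(*) >= ?
        ORDER BY n DESC, event_type
    """
    return conn.execute(query, (min_count,)).fetchall()


def make_test_conn(rows):
    """Build an in-memory table that mocks the real source for tests."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (event_type TEXT)")
    conn.executemany("INSERT INTO events VALUES (?)", [(r,) for r in rows])
    return conn
```

The SQL lives inside a normal function, so it is version controlled, reviewed and tested like any other code, and the mocked table is the part the talk flagged as needing thought.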

General guidelines:

  • long running streaming apps require special attention to state management.
  • try to avoid mutability: every change is a new version.
  • integration testing > unit testing.
  • Embrace the SRE mentality.

When Streaming Needs Batch - Flink's Journey Towards a Unified Engine - Konstantin Knauf, Immerok Founder

Would be nice if you deploy streaming job once and let it go.

But, outages, bugs and bootstrapping all require a big backfill.

Talk about what flink has done to handle this by unifying batch and streaming.

Nature of data: bounded and unbounded

Nature of processing: stream, batch

stream processing can handle both bounded and unbounded.

batch processing can only work with bounded data, but in more efficient ways.  It knows the data is bounded.

Catching up: large backlog of data you want to catch up to real time again.  

want to process backlog quickly and robustly with existing resources

When under backpressure, what do you want?

  • scale up or catch up steadily

Scale up:

  • adaptive scheduler and reactive mode released in Flink 1.13
    • Flink will always scale to whatever parallelism the RM allows.  
    • add new task managers and flink will auto scale up
    • no additional savepoint needed for rescaling*
      • this might take some time during restore, Flink 1.16 brings some more improvements

Catching up steadily (without scaling up) / Robustness under backpressure

  • Watermark alignment (time) problem:
    • joining low throughput + high throughput topic, joining on time
    • 1000x more records to process in high throughput.  
    • Flink reads from these topics at the same rate.
      • inhales the whole small topic within seconds.
      • small topic chases through time, but high just crawls. (small advances watermark quickly , large slowly)
      • need to buffer all of the small topic in Flink.
    • Flink 1.15 this is fixed.  Reads through topics by time instead of as fast as possible.
  • Unaligned checkpoints & buffer debloating features:
    • Under backpressure, lots of inflight data.
    • Bottleneck in pipeline, causes big backpressure of inflight data
    • checkpoint barriers are queued behind that, meaning long time before checkpoint is completed.
    • Reach max # of timed out checkpoints, and the job will fail.
    • Fixed now:
      • buffer debloating reduces the amount of inflight data.
      • unaligned checkpoints allow barriers to overtake inflight data
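
The watermark alignment idea (read by time instead of as fast as possible) can be illustrated with a toy simulation. This is not how Flink implements it; the function name, the `max_drift` parameter and the one-record-per-round reading are all simplifying assumptions.

```python
def aligned_read_order(sources, max_drift):
    """Interleave sorted per-source timestamps, pausing sources that run ahead.

    sources: dict of name -> sorted list of event timestamps.
    Returns the list of (name, timestamp) in the order they are read.
    """
    positions = {name: 0 for name in sources}
    order = []
    while True:
        # Treat the timestamp of a source's next unread record as its watermark.
        pending = {
            name: events[positions[name]]
            for name, events in sources.items()
            if positions[name] < len(events)
        }
        if not pending:
            return order
        slowest = min(pending.values())
        for name, ts in pending.items():
            # Only sources within max_drift of the slowest one may read.
            if ts <= slowest + max_drift:
                order.append((name, ts))
                positions[name] += 1
```

With a sparse "small" source and a dense "large" one, the small source is paused until the large one catches up in event time, so it no longer has to be buffered in full.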


Backfilling:

  • Want to process a fixed amount of recent historical data to correct a bug or outage.
  • want to reuse same code for backfilling, want it to be resource efficient.
  • Realtime job needs to keep running, this is a scheduled batch job with same code.
  • In kafka source, set the offsets you need, run the job, all data is committed.
  • To make resource efficient, added Batch Execution

Why is batch processing faster?

  • data is complete
  • no latency SLAs
  • can block stages on full set of data
  • fault tolerance is easier
  • counting can happen in memory by first keying every record and then counting.
  • No processing time in batch.
  • no out of order data
  • no watermarks needed.

Uber has a case where doing backfilling in stream mode is better.  You can backfill as slowly as you want with the resources you have.

In batch you have a minimal resource requirement to just make the job run.

Bootstrapping:

  • Different data source for bootstrapping than for realtime data
  • Hybrid source automates switching of sources from historical data to real time data within a single streaming job
    • define a sequence of sources, one bounded source, then switch to realtime source
    • when to switch is defined by timestamp(?)
  • Other way to do it is bootstrapping with batch execution.
    • First batch job that reads from S3 and writes to nowhere, building the internal state.
    • Take a final savepoint from the batch job with the internal state.
    • Then start a stream job that takes the savepoint as initial state and reads from kafka.
    • THIS IS NOT POSSIBLE in current Flink versions, but it will be in a release soon
    • Still some limitations in prototype.  Need to publish a FLIP and discuss with the community.  Hopefully in Flink 1.17.
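
The hybrid source idea (a bounded historical source followed by a realtime one) can be sketched as a generator. This is a toy in plain Python, not the Flink API; the function name is hypothetical, and switching on a timestamp follows the note above, which was itself uncertain.

```python
def hybrid_source(historical, realtime, switch_timestamp):
    """Yield historical events before switch_timestamp, then realtime events from it on.

    Both inputs are iterables of (timestamp, payload) in timestamp order;
    `realtime` may be unbounded.
    """
    for ts, payload in historical:
        if ts >= switch_timestamp:
            break
        yield ts, payload
    for ts, payload in realtime:
        # Skip realtime events already covered by the historical source.
        if ts >= switch_timestamp:
            yield ts, payload
```

Because the two sources overlap around the switch point, filtering both sides on the same timestamp is what avoids duplicates and gaps.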

Q: what is buffer debloating?!

> reducing the upstream buffers to put backpressure on the source (e.g. kafka) to reduce the amount of inflight data, without reducing throughput.

> e.g. if one topic is really small and one is large, can reduce the buffers on the large one without affecting job throughput

Q: Flink 1.15 reading through topic by time, how?  Don’t consume if timestamp > X?

Q: in hybrid source, can you use kafka offsets for when to switch to kafka, rather than timestamp?

"Why Wait?" Real-time Ingestion - Chen Qin, Pinterest Engineering Manager, Heng Zhang, Pinterest Software Engineer

Historically, Pinterest data warehouse ingestion and indexing services were implemented on batch ETL and Kafka streaming respectively. As the product side leans more toward real-time and near-real-time data to innovate and compete, teams work together to revamp the ingestion and processing stack in Pinterest. In this talk, we plan to share our near-real-time ingestion system built on top of Apache Kafka, Apache Flink, and Apache Iceberg. We pick ANSI SQL as the common currency to minimize the "lambda architecture" learning curve of teams adopting fresh, near-real-time data.

Flink dev velocity is the pain point.

steep learning curve.

huge effort to build a streaming job from scratch that has similar logic as batch counterpart.

hard to validate the streaming job results match the batch job results due to completely different implementations in different frameworks. (Flink vs spark)

Toward Federated Big Data(base) system - 2022

Have a big catalog for both OLTP and OLAP systems, but also have Flink and Spark for querying data in Kafka or S3.

Federation approach

  • extensibility - virtual table and view interface, multiple compute engines.
  • connectivity, all storage abs
  • portability - user workload as UDF and SQL is easier to migrate cross systems.  

Table and SQL centric approach:

  • everything is a table.  Kafka topic, kvstore, etc. all registered as Flink Table.
  • Hive metastore used as centralized metadata service for Flink Tables.
  • Hive UDF - complex processing in Hive UDFs, can be used in both Flink and Spark.
  • Iceberg - preferred storage format for row level deletion, schema evolution, versioning and efficient querying.

Patterns:

Pattern 1: streams to stream filtering and transform.  Read from kafka, transform, and write to kafka.  

have a ‘topic splitter’ to organize one big topic into smaller ones.

-> Otto: we can probably do this for page change, start with one topic and split to smaller wiki based one for stream processing.
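
A topic splitter along these lines could look like the sketch below. The topic naming scheme and the `wiki_id` field are hypothetical, and in-memory lists stand in for Kafka topics.

```python
from collections import defaultdict


def split_topic(events, source_topic, key_field="wiki_id"):
    """Group events from one big topic into smaller per-key topics.

    Returns a dict of target topic name -> list of events. Events missing the
    key go to a catch-all topic instead of being dropped.
    """
    targets = defaultdict(list)
    for event in events:
        key = event.get(key_field)
        suffix = key if key is not None else "unknown"
        targets[f"{source_topic}.{suffix}"].append(event)
    return dict(targets)
```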

Consideration: schema evolution.  Need to make sure schema and table DDL are all consistent.

Pattern 2: raw log ingestion.

  • read kafka table, dedupe, append to iceberg

considerations

  • data format, late arrival, event-time ingestion.

Pattern 3: real time data warehouse

  • kafka table join lookup table
  • dedupe, aggregate, upsert to iceberg

Considerations:

  • caching of lookup table state and its TTL
  • largest state is 10TB flink state spread across 700 task managers.  Need to tune this state for quick reloads
  • also iceberg tuning.  Rely on iceberg team to handle small files and snapshot cleanups.

Pattern 4: online ingestion / indexing

  • read from kafka topics, transform/enrich, upsert to OLTP

Considerations:

  • OLTP with version history helps with reproducible backfill.  Do temporal joins when querying extra state, using iceberg historical snapshot to do the backfill with the right data from the right time.
  • Upsert bumps version timestamp(?)
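
Why version history makes backfills reproducible can be shown with a small sketch of a temporal (as-of) join. The `VersionedTable` class is hypothetical and stands in for an OLTP store or Iceberg snapshots; it is not Pinterest's implementation.

```python
import bisect


class VersionedTable:
    """Lookup table that keeps every version of a row, keyed by valid-from time."""

    def __init__(self):
        self._versions = {}  # key -> (sorted timestamps, values in same order)

    def upsert(self, key, timestamp, value):
        timestamps, values = self._versions.setdefault(key, ([], []))
        index = bisect.bisect_right(timestamps, timestamp)
        timestamps.insert(index, timestamp)
        values.insert(index, value)

    def as_of(self, key, timestamp):
        """Return the value that was current for key at timestamp, or None."""
        timestamps, values = self._versions.get(key, ([], []))
        index = bisect.bisect_right(timestamps, timestamp)
        return values[index - 1] if index else None


def temporal_join(events, table):
    """Enrich (key, timestamp, payload) events with the table state at event time."""
    return [(key, ts, payload, table.as_of(key, ts)) for key, ts, payload in events]
```

Rerunning the join over old events gives the same enrichment as the first run, because the lookup uses the event time rather than whatever the table holds today.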

Platform support for real time ingestion and processing:

  • support thrift format in all flink source and sinks
  • develop tools to allow platform users to build, test, and productionize FlinkSQL apps
  • Align with internal efforts on schema viz, lineage, tracking, governance.

Q: schema and DDL consistent, with Hive metastore.

Pinterest has a QueryBook open source to manage syncing to Hive metastore.  These are ‘online’ tables which is an open standard.

- https://github.com/pinterest/querybook

The Metamorphosis of Database Changes - How We Turned CDC Streams into Iceberg Datasets - Tim Steinbach, Shopify Senior Data Developer

Iceberg tip - if you are writing a lot of files quickly, each time you add the file into Iceberg causes a Hive lock. So if doing this a lot, it is better to batch the registration of the files to Iceberg. How often do you batch the Iceberg new file registration, and why not just write larger files less often in that case?

We have two timestamps in iceberg/hive table properties:

  • latest ingested timestamp so far
  • timestamp at which the data is ‘complete’, there will be no late data inserted after

Many Sources, Many Sinks, One Stream: Joining Domains in Your Data Mesh With Canonical Topics - Joel Eaton, Red Hat Data Engineering Manager

Joining domains in your data mesh with canonical streams.

schema management.

data mesh needs a domain-agnostic data access platform

What about operational data (not analytics)?

information about a thing that exists on its own.  accuracy matters.

each domain records about the things, how to join e.g. info about a person that is known in multiple domains?

Data crossing a domain boundary is done on that domain’s terms - for every crossing something has to translate.

Option #1: point to point integrations.  Requires individual contracts for every connection.

Whenever you change something in a  domain, every other domain has to adapt.

Even with Event Sourcing, each domain gets updates from other domains and those won’t be ordered.  What is the source of truth?

Option #2: Central Data Store

Bad.

Option #3: canonical streams.

Event sourced.  Canon is an exchange or clearing house.  Declarative information about a common interest.

Domains respond to canon events within their own domain.  Canon is a persistent stream of data.  Domains consume canon messages at their own pace.

If not in the canon, it’s just ‘fan fiction’.  It is independent of the data domain.  Something real outside of the world.

E.g. people.  Verifiable independent of what the data domains know about them.  The canon is a universal changelog published as events.

It is appropriate to use a Canon when:

  • the thing exists in the world.
  • The thing happened. an event out in the world.
  • There is shared interest - multiple domains share interest in the same data about the thing in the world

Do not use a canon when:

  • it is domain specific, the thing only exists in a single data domain.  Tech support cases maybe? No other domain can make a declarative statement about tech support cases.
  • It’s domain sourced.  Don’t use a canon when source of truth is a single domain.    Should be a data product produced by that domain only.
  • No shared interest.

Key ingredient #1: the correct unique identifier.

Must be universally recognized to identify the thing.  not domain specific.  

This will be the key for canon messages, so think about partitioning.

Any service that knows this thing exists should be able to emit a message with an update to the canon topic(???)

(? what about single producer principle?)  So every service has to maintain current state of the entity to do this?

> yes they do.   But what about ordering?  If domain A has an out of date user state, but they produce an update, the update they produce might overwrite more up to date data in kafka.

Key ingredient #2: Canonical schema

domain agnostic.  It is independent of any data domain in your org.  

Avoid including data points that are meaningful within a particular data domain.

Individual data domains still exist, and can share these data points on the mesh.  It’s just that the canonical topic has ONLY common canonical data points about an entity.

Key ingredient #3: Domain Boundary Discipline

No domain specific integrations!

Domains can only communicate with the Canon in the domain language of the Canon.

The canon is a domain unto itself.

Tricky details: Schema Management

Schemas as Code.  (Python classes in Red Hat case).  

Every Kafka application depends on these.  Data level dependency inversion.  Depend on the same schema library.

Schema library is built by canon domain team.  
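
A schema-as-code class might look something like this sketch. The talk only said Red Hat uses Python classes; the `PersonCanon` name, its fields and the JSON encoding are my assumptions.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass(frozen=True)
class PersonCanon:
    """Hypothetical canonical schema: only domain-agnostic facts about a person."""

    person_id: str  # universally recognized identifier, also the message key
    full_name: str
    email: Optional[str] = None

    def to_message(self):
        """Serialize to (key, value) bytes suitable for producing to a topic."""
        value = json.dumps(asdict(self), sort_keys=True).encode("utf-8")
        return self.person_id.encode("utf-8"), value

    @classmethod
    def from_message(cls, value):
        """Parse a message value; unknown fields raise TypeError so drift is caught early."""
        return cls(**json.loads(value))
```

Every producer and consumer importing the same class is the "data level dependency inversion": applications depend on the shared schema library, not on each other.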

Use any language (doesn’t this mean you have to reimplement the schema library in every language?)

Transformation layer apps belong to a domain, they just have to translate to the canon schema if they want to produce to it.

What’s the point of the transformation and integration layer?  Why not just combine them?  

Canon app has an aggregator app that keeps the table state.  update set email, but without department.  Published message has the old state too.  Gets tricky with how you delete a field.  Canon library is the single producer.
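
The aggregator behaviour described above (merge a partial update into the stored state, publish the full state) could be sketched like this. The version check and the `DELETE` sentinel are my own assumptions about how ordering and field deletion might be handled; the talk did not spell these out.

```python
DELETE = object()  # hypothetical sentinel meaning "remove this field"


class CanonAggregator:
    """Keep the latest full state per key and merge partial updates into it."""

    def __init__(self):
        self._state = {}  # key -> (version, full record)

    def apply(self, key, version, partial):
        """Merge partial into the stored record and return the full record to publish.

        Updates not newer than the stored version are ignored (returns None),
        so a domain with stale state cannot overwrite newer data.
        """
        current_version, current = self._state.get(key, (-1, {}))
        if version <= current_version:
            return None
        merged = dict(current)
        for field, value in partial.items():
            if value is DELETE:
                merged.pop(field, None)
            else:
                merged[field] = value
        self._state[key] = (version, merged)
        return dict(merged)
```

An explicit delete marker is needed because a field that is merely absent from a partial update has to mean "unchanged".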

Introducing KRaft: Kafka without ZooKeeper - Colin McCabe, Confluent

A version of Raft that works for Kafka

some changes (pull instead of push protocol)

Instead of ZooKeeper, have internal topic named __cluster_metadata

  • replicated with KRaft
  • single partition
  • the leader is the active controller
  • topic has binary records containing metadata.  KRPC format.
  • auto generated from protocol schemas.
  • Can also be translated into JSON for human readability.

Metadata as an ordered log


designated controller broker nodes.  Elect a leader using KRaft.  Leader will have all the previously committed records.

Brokers continuously fetch metadata they need. New brokers and brokers that are behind fetch snapshots.
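
The "metadata as an ordered log" idea, including catching up from a snapshot, can be illustrated with a toy replay loop. The record types and dict layout here are invented for illustration and are not Kafka's actual metadata record schemas.

```python
def apply_record(state, record):
    """Apply one metadata record to the in-memory state (a dict of topics)."""
    kind = record["type"]
    if kind == "create_topic":
        state[record["topic"]] = {"partitions": record["partitions"]}
    elif kind == "delete_topic":
        state.pop(record["topic"], None)
    else:
        raise ValueError(f"unknown record type: {kind}")


def catch_up(log, snapshot=None, from_offset=0):
    """Rebuild state from an optional snapshot plus the log records after it.

    Returns (state, next_offset).
    """
    state = {name: dict(meta) for name, meta in (snapshot or {}).items()}
    for record in log[from_offset:]:
        apply_record(state, record)
    return state, len(log)
```

A broker that replays the whole log and one that starts from a snapshot and replays only the tail end up with the same state, which is why new or lagging brokers can fetch a snapshot instead of the full history.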


inter.broker.protocol is now dynamic, can be changed with kafka-features.sh command.

zookeeper-shell.sh -> metadata-shell.sh which will show info about metadata in KRaft mode.

kafka-dump-log.sh will dump the metadata log changes


metadata-shell reads __cluster_metadata log into memory and makes a ‘virtual filesystem’ kind of like how zookeeper-shell works.

continuing to work and develop this tool.


New Metrics:

metadata commit rate, current RAFT state (follower leader, etc.) snapshot lag.  how far behind in applying metadata updates.


Feature gaps in 3.3


dynamic configurations on the controller.

SCRAM support

delegation tokens

JBOD support


Use isolated controller nodes in Confluent Cloud, this is more idiomatic in k8s world, k8s handles packing pods in same hardware.


The Next Generation of the Consumer Rebalance Protocol - David Jacot, Confluent Senior Staff Software Engineer

New rebalance protocol means there are no global locks while reassigning.

Assignment calculated on the broker, and is declarative.

What about client side assigners? (Kafka streams does this.)

Clients can be configured to declare assignments.  A client is asked to compute the assignment based on the new consumers that joined, then the assignment is communicated via the group coordinator to the other consumers.
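
A declarative assignment with incremental reconciliation might be pictured as below. This is a toy model, not the actual protocol: the round-robin assigner and both function names are assumptions.

```python
def compute_assignment(partitions, members):
    """Spread partitions over members round-robin; returns member -> set of partitions."""
    ordered = sorted(members)
    if not ordered:
        return {}
    target = {member: set() for member in ordered}
    for index, partition in enumerate(sorted(partitions)):
        target[ordered[index % len(ordered)]].add(partition)
    return target


def reconcile(current, target):
    """Per member, list what to revoke and what to add to reach the target.

    Members only touch partitions that change owner, so the rest of the
    group keeps consuming; there is no stop-the-world step.
    """
    plan = {}
    for member in set(current) | set(target):
        owned = current.get(member, set())
        wanted = target.get(member, set())
        plan[member] = {"revoke": owned - wanted, "assign": wanted - owned}
    return plan
```

The coordinator states the target, and each consumer converges on it independently, which is the sense in which no global lock is needed.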

From Monoliths to Microservices: A Journey with Confluent - Gayathri Veale, Indeed Senior Cloud Infrastructure Engineer

Developer first.  Self service.

IaaS, but also Environment as a Service

CDC pipelines, stateful stream processing, enrichment.  

Rolling deployments

persistent state stores, faster recovery

Service mesh.

separate application logic from business logic.  separate service application from service communication logic.

Service mesh? its just a proxy.  - Kafka and the Service Mesh - Gwen Shapira

  • service discovery
  • load balancing,
  • circuit breaking - stop sending requests to unhealthy instance
  • …

Modern data flows are seams in the mesh!

there’s data hidden in that mesh.

sharing data better.

Patterns of bounded context.  ETL taking data from legacy to cloud.  

Kafka was becoming our source of truth for these datastores, which were becoming views.

A monolith is a legitimate solution.  You can have a centralized database, and it’s OK to have many databases.

Unix philosophy.

Make each program do one thing well.

Expect the output of your program to be the input of another.

Smart endpoints, dumb pipes.  

Can separate business logic in local teams, but shared event stores that both teams understand to communicate.

Kafka Streams.  Still bleeding edge, lots of bugs.

Data as a Product, Data Mesh, engineering teams have been hands on in data mesh.

Running a marathon. Not looking for a big bang rewrite, moving incrementally.

when you introduce something that drastically changes things, you can’t just introduce it at scale.

both envs were available, developers can do both before they deploy to production.

Governance about how you share data is hard.  Spotify has tribes, we have ‘guilds’.  

Bottom up organization.  People interested in those theories/best practices, convene

and advocate for best practices around data sharing and streams.


Unbundling the Modern Streaming Stack - Dunith Dhanushka, Redpanda Senior Developer Advocate

Not a lot of stuff in this talk, mostly just an overview of streaming stuff.

Interesting thing I did learn though: https://www.asyncapi.com/ - like OpenAPI but for async/event driven?


Event Driven Infrastructure as Software - Lee Briggs, Pulumi Solutions Engineer

Agenda:

  • Define Infrastructure as Code
  • Share a theory for why we’ve all accepted misery
  • Show you a world where everything is a little different.

Declarative infrastructure.

Imperative:

  • control flow, you build the graph, you make the decisions

Declarative:

  • system makes the decisions

DAG, all these systems (terraform, etc.) create a graph of resources that the system figures out how to build.
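
The declarative model can be sketched with the standard library: you state which resources exist and what they depend on, and the system works out the build order. The resource names below are hypothetical; real tools like Terraform do far more (diffing, state, providers).

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical desired state: resource name -> names it depends on.
DESIRED = {
    "network": [],
    "kafka_cluster": ["network"],
    "topic_page_change": ["kafka_cluster"],
    "consumer_service": ["topic_page_change", "network"],
}


def plan(desired):
    """Return an order in which resources can be created, dependencies first.

    Raises CycleError if the declared dependencies contain a cycle.
    """
    return list(TopologicalSorter(desired).static_order())
```

The author of `DESIRED` never writes control flow; the ordering decision belongs to the system, which is the declarative half of the contrast above.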

Started to use config to build the DAG.  Configs don’t need conditionals, etc.

DSLs turned up to generate configuration.  

The user experience:

Authoring experience

how it feels to describe the end state of your infra

  • language support (configuration, programming languages, etc.)
  • abstractions
  • cloud provider support
  • language server/ IDE

Execution experience:

how it feels to get the infra you’ve provided

  • opinionated, CLI driven, server side execution
  • unopinionated

If you want to create infra from events, you end up wrapping your configs in programming languages.

Authoring experiences you can have using an IaC tool

DSL based

  • single use, domain specific
  • limited scope
  • simple, lower learning curve
  • custom abstractions built specifically for the language

Config based

  • Very limited scope
  • low learning curve
  • slower feedback loop
  • limited IDE support

feedback loop matters in authoring experience

Programming language based authoring:

  • Expressive
  • Language based abstractions
  • Full IDE support
  • More complex, steeper learning curve, but reusable
  • Type systems, faster feedback loop

Execution experiences:

CLI driven experience

  • control, you provide credentials
  • grouped with CI/CD workflow tool, you choose how it is executed
  • Opinionated, tool generally guides you
  • Fast, you talk to the API, you run in parallel if you like

Server side execution experience:

  • Simplicity, tool is executed in your cloud provider, it handles credentials for you
  • slow, you don’t get to add more resources to make it faster.  You’re competing with everyone else using the platform.

Some scenarios:

  • 1. Create infra without writing bunch of code
    • i want someone who knows about infra to write it for me, i just want to consume it.
    • e.g. i want to use Kafka, but I don’t want to learn how to spin it up.
    • this is not possible with the current crop of tools
  • 2. I want declarative infra management with API integrations
    • I want someone to programmatically kick off my infra provisioning.  (Like maybe a message from Kafka!)

Integrations:

Integrating infra

the scenarios we described do not integrate with event driven arch

What if I told you you can directly create infra from events?

Infra as Software vs Infra as Code?

Abstractions, tests (mock API calls from cloud provider), control flow, expression, flexibility.