Event Platform/Stream Processing/Flink Catalog
This page documents the initial prototype of the Event Catalog for Apache Flink.
The Event Catalog in Wikimedia Event Utilities provides an easy way to access Wikimedia's event streams in Kafka in an SQL-like way for stream and batch processing. It performs schema validation and automatic normalization of the $schema and meta fields.
Getting Started
These steps assume you already have Apache Flink installed. Note that the package versions in the examples below may change.
1. Download (or build) Wikimedia eventutilities-flink:
wget http://archiva.wikimedia.org/repository/releases/org/wikimedia/eventutilities-flink/1.2.10/eventutilities-flink-1.2.10-jar-with-dependencies.jar
2. Download flink-sql-connector-kafka
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar
3. Download kafka-clients
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.0/kafka-clients-3.4.0.jar
4. Start Flink's SQL client with these libraries. In this example the above jars are assumed to be in a flink-sql-libs folder.
./bin/sql-client.sh -l flink-sql-libs
4a. If you're inserting, also start the Flink cluster beforehand.
./bin/start-cluster.sh
5. Create the catalog
CREATE CATALOG wmfeventcatalog WITH (
'type' = 'eventstream',
'event-stream-config-uri' = 'https://meta.wikimedia.org/w/api.php',
'event-schema-base-uris' = 'https://schema.wikimedia.org/repositories/primary/jsonschema;https://schema.wikimedia.org/repositories/secondary/jsonschema',
'properties.bootstrap.servers' = 'kafka-jumbo1001.eqiad.wmnet:9092'
);
6. Use the catalog
USE CATALOG wmfeventcatalog;
7. Check to see if you can query the streams in Kafka
SHOW TABLES;
SELECT * FROM `eventgate-main.test.event`;
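If that works, you can also try a simple windowed aggregation. This is a minimal sketch that assumes the default kafka_timestamp watermark column described under Catalog Options below:
-- Count events per minute using the catalog's default watermark column.
SELECT
  TUMBLE_START(kafka_timestamp, INTERVAL '1' MINUTE) AS window_start,
  COUNT(*) AS events
FROM `eventgate-main.test.event`
GROUP BY TUMBLE(kafka_timestamp, INTERVAL '1' MINUTE);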
Options
Catalog Options
To create the catalog, you need to provide it with some options. Several of these also act as defaults for the tables the catalog creates.
Option | Required? | Notes
---|---|---
type | Yes | Always eventstream
event-stream-config-uri | Yes |
event-schema-base-uris | Yes | Semicolon-separated string
https-routes | No | Untested. Colon-separated key-value pairs, comma-separated string of entries
properties.bootstrap.servers | Yes |
properties.group.id | No | Defaults to a random UUID. Used for the default consumers of event streams created by the catalog.
kafka-watermark-column | No | Defaults to kafka_timestamp.
kafka-watermark-delay | No | Defaults to 5. The delay is in seconds.
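For reference, a catalog definition that also sets the optional consumer and watermark options could look like the following sketch. The group id and delay values here are arbitrary; the other values match the Getting Started example.
CREATE CATALOG wmfeventcatalog WITH (
'type' = 'eventstream',
'event-stream-config-uri' = 'https://meta.wikimedia.org/w/api.php',
'event-schema-base-uris' = 'https://schema.wikimedia.org/repositories/primary/jsonschema;https://schema.wikimedia.org/repositories/secondary/jsonschema',
'properties.bootstrap.servers' = 'kafka-jumbo1001.eqiad.wmnet:9092',
'properties.group.id' = 'my-flink-consumer', -- arbitrary; otherwise a random UUID is used
'kafka-watermark-column' = 'kafka_timestamp', -- the default
'kafka-watermark-delay' = '10' -- seconds; the default is 5
);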
Table Options
Tables within the catalog require some custom options in addition to the ones needed for the connector and format.
Option | Required? | Notes
---|---|---
kafka-topic-prefix | No | Only used with the kafka connector (the default connector), and only needed when you want to insert.
event-stream-name | Yes |
event-schema-version | Yes |
Examples
Creating a table for inserting events into a topic
CREATE TABLE `test.event.insert` WITH (
'kafka-topic-prefix'='eqiad'
) LIKE `eventgate-main.test.event`;
INSERT INTO `test.event.insert` (`test`, `test_map`) VALUES('test_from_catalog', MAP['test_key', 'test_val']);
Creating a table with additional columns
CREATE TABLE `test.event.insert` (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL
) LIKE `eventgate-main.test.event`;
Creating a custom table based on an event schema but with a different connector
CREATE TABLE `test.insert.example` ( _placeholder_ STRING )
WITH (
'connector' = 'filesystem',
'path' = '',
'format' = 'event-json',
'event-schema-version' = '',
'event-stream-name' = ''
);
Limitations
- When you create a table from scratch, you must use a _placeholder_ STRING column (see examples).
- When inserting, all columns (besides $schema and meta) must be present for the insert to succeed. (See T328211)
- You cannot directly insert into a catalog-provided table.
- You cannot alter the schema or its version after a table is created.
- To use a table with a schema version other than the latest, you must create the entire table from scratch (see the sketch below).
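For instance, pinning a stream to a specific schema version means declaring the table from scratch with the _placeholder_ column. A sketch, assuming the kafka connector and event-json format; the table name and version are illustrative, and kafka-topic-prefix is only needed if you plan to insert:
CREATE TABLE `test.event.pinned` ( _placeholder_ STRING )
WITH (
'connector' = 'kafka',
'format' = 'event-json',
'kafka-topic-prefix' = 'eqiad',
'event-stream-name' = 'eventgate-main.test.event',
'event-schema-version' = '1.0.0' -- illustrative; use the version you need
);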
Internals
Creating Tables
This section details different ways a table can be created within the catalog.
Please note the distinction between functions in the catalog and SQL DDL/DML. For example, CREATE TABLE triggers a stack of function calls, one of which is createTable().
createTable() is a catalog function called by CREATE TABLE. Unfortunately, it is also called by CREATE TABLE LIKE and CREATE VIEW. This means that createTable() has to handle different scenarios where the table passed to it is vastly different. To help distinguish between them, users have to provide a flag in the form of a _placeholder_ column.
CREATE TABLE is restricted to tables that have a _placeholder_ column. The catalog replaces the placeholder with the table generated from the event schema.
CREATE TABLE LIKE is restricted to tables that don't have a _placeholder_ column. The catalog does no processing on the table schema, but it does process the options. However, since createTable() does not have access to the original table, it cannot detect which options have changed and therefore doesn't act on them. For example, changing kafka-watermark-column does not generate a new watermark column to replace the old one (see the sketch below). This may change in the future.
- Technically, the change could be validated by instantiating the original schema and diffing the schemas, but that only works if the schema option itself wasn't changed.
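Here is a sketch of that limitation: the new option value is stored on the table, but the watermark inherited from the original table is not regenerated (some_other_column is a hypothetical column name):
CREATE TABLE `test.event.custom_watermark` WITH (
'kafka-watermark-column' = 'some_other_column'
) LIKE `eventgate-main.test.event`;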
ALTER TABLE processes options, but leaves any edge cases to be handled by the user. This behavior might change in the future because in Flink 1.17, ALTER TABLE was changed to contain a diff of the original and new table, which makes it more powerful than CREATE TABLE LIKE.
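For example, options on an existing catalog table can be adjusted with standard Flink DDL. This sketch reuses the table from the earlier examples and an arbitrary group id:
ALTER TABLE `test.event.insert` SET (
'properties.group.id' = 'my_new_group'
);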
Event tables, although dynamically created by the catalog, do not use createTable(). Instead, they use EventTableUtils.getTableOfEventStream().
Options
Validation
There are three layers of validation. First, the CatalogFactory validates catalog options. Next, the DynamicTableFactory* validates table options, and then the FormatFactory** validates format options.
Because of this cascading validation, some invalid options are not caught when declaring tables and are only caught at runtime when querying them (see the sketch after the notes below). This behavior is taken advantage of within tests, so any DDL statements there should not be considered usable code.
* DynamicTableFactory refers to both DynamicTableSourceFactory and DynamicTableSinkFactory
** FormatFactory refers to both DeserializationFormatFactory and SerializationFormatFactory
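As a sketch of this behavior, a bogus format option is typically accepted at declaration time and only rejected once the format factory validates its options during a query (the option name here is deliberately invalid):
-- Succeeds when declared:
CREATE TABLE `test.event.bad_option` WITH (
'event-json.not-a-real-option' = 'true'
) LIKE `eventgate-main.test.event`;
-- Fails only when the table is actually used:
SELECT * FROM `test.event.bad_option`;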
Meta-Definitions
Because the catalog manages options on behalf of both the table and the format, some additional meta-definitions are needed to describe certain options.
Pseudo-Table Options
Some options are provided when declaring a table but are used by the catalog. These options are not passed down to the table, but they influence how the table is created.
Some pseudo-table options are only used for CREATE TABLE and not CREATE TABLE LIKE.
Input:
CREATE TABLE `example` WITH (
'connector' = 'kafka',
'kafka-watermark-column' = 'example_col',
...
);
Processed Table Options:
{
"connector" = "kafka",
...
}
Pseudo-table options include:
- kafka-topic-prefix
Only used for CREATE TABLE (not CREATE TABLE LIKE):
- event-stream-name
- event-schema-version
- kafka-watermark-column
- kafka-watermark-delay
Override Options
Options that are set by the catalog can be overridden by providing them as table options. This doesn't necessarily mean the options are passed down to the table; however, all standard table options and prefixed format options are override options (except for topic and some other edge cases; see T331542).
Some override options are only used for CREATE TABLE and not CREATE TABLE LIKE.
These options must be checked for a value in several places: first the table options, then the catalog options, and finally the default value defined in the option's ConfigOption. This means that these options do not perform their expected default behavior within a Configuration. These defaults are currently handled on a case-by-case basis, but it might be worth creating a dedicated EventConfiguration to handle them.
Input:
CREATE CATALOG wmfeventcatalog WITH (
'type' = 'eventstream',
'properties.group.id' = 'from_catalog',
...
);
USE CATALOG wmfeventcatalog;
CREATE TABLE `no_override` WITH (
...
);
CREATE TABLE `override` WITH (
'properties.group.id'='from_table',
...
);
Processed Table Options:
{ // no_override
"properties.group.id" = "from_catalog",
...
}
{ //override
"properties.group.id" = "from_table",
...
}
Override options include:
- scan.startup.mode
- properties.bootstrap.servers
- properties.group.id
- event-json.timestamp-format.standard
Only used for CREATE TABLE (not CREATE TABLE LIKE):
- event-stream-name
- event-schema-version
- kafka-watermark-column
- kafka-watermark-delay
Shared Options
The catalog and our custom format factory are strongly coupled, but we allow any connector and therefore any DynamicTableFactory. This means that the catalog has to bypass the validation done by an unknown table factory so that the options can reach the format factory.
Flink handles this by prefixing options with the identifier of the format factory, so the catalog does this automatically for options applicable to our event-json format. The resulting table will therefore only have the prefixed option when saved.
Input:
CREATE TABLE `example` WITH (
'event-stream-name'='example',
...
);
Processed Table Options:
{
"event-json.event-stream-name" = "example",
...
}
Shared options include:
- event-schema-base-uris
- event-stream-config-uri
- event-stream-name
- event-schema-version
- https-routes
Overlap
These meta-definitions are not mutually exclusive; a single option can fall into more than one category. See the example below.
Input:
CREATE CATALOG wmfeventcatalog WITH (
'type' = 'eventstream',
'properties.group.id' = 'from_catalog',
...
);
USE CATALOG wmfeventcatalog;
CREATE TABLE `example` WITH (
'connector' = 'kafka',
'format' = 'event-json',
'kafka-topic-prefix' = 'eqiad',
'event-stream-name' = 'example',
'event-schema-version' = '1.0.0',
'properties.group.id'='from_table',
...
);
Processed Table Options:
{
"connector" = "kafka",
"format" = "event-json",
"topic" = "eqiad.example",
"properties.group.id" = "from_table",
"event-json.event-stream-name" = "example",
"event-json.event-schema-version" = "1.0.0"
...
}
This means that a table created by the catalog will not produce the same result if its definition is fed back into the catalog. The _placeholder_ flag needed to differentiate CREATE TABLE from CREATE TABLE LIKE exists because of this discrepancy.