Event Platform/Stream Processing/Flink Catalog

This page documents the initial prototype of the Event Catalog for Apache Flink.

The Event Catalog in Wikimedia Event Utilities provides SQL access to the event streams in Wikimedia's Kafka clusters, for both stream and batch processing. It validates schemas and automatically normalizes the $schema and meta fields.

Getting Started

These steps assume that Apache Flink is already installed. Package versions in the examples may change.

1. Download (or build) Wikimedia eventutilities-flink:

wget http://archiva.wikimedia.org/repository/releases/org/wikimedia/eventutilities-flink/1.2.10/eventutilities-flink-1.2.10-jar-with-dependencies.jar

2. Download flink-sql-connector-kafka

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar

3. Download kafka-clients

wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.0/kafka-clients-3.4.0.jar

4. Start Flink's SQL client with these libraries. In this example the above jars are assumed to be in a flink-sql-libs folder.

./bin/sql-client.sh -l flink-sql-libs

4a. If you plan to insert data, start the Flink cluster before launching the SQL client.

./bin/start-cluster.sh

5. Create the catalog

CREATE CATALOG wmfeventcatalog WITH (
	'type' = 'eventstream',
	'event-stream-config-uri' = 'https://meta.wikimedia.org/w/api.php',
	'event-schema-base-uris' = 'https://schema.wikimedia.org/repositories/primary/jsonschema;https://schema.wikimedia.org/repositories/secondary/jsonschema',
	'properties.bootstrap.servers' = 'kafka-jumbo1001.eqiad.wmnet:9092'
);

6. Use the catalog

USE CATALOG wmfeventcatalog;

7. Check to see if you can query the streams in Kafka

SHOW TABLES;
SELECT * FROM `eventgate-main.test.event`;

Options

Catalog Options

To create the catalog, you need to provide it with some default options.

Option | Required? | Notes
type | Yes | Always eventstream
event-stream-config-uri | Yes |
event-schema-base-uris | Yes | Semicolon-separated string
https-routes | No | Untested. Comma-separated list of entries, each a colon-separated key-value pair
properties.bootstrap.servers | Yes |
properties.group.id | No | Defaults to a randomized UUID. Used for the default consumers of the event streams created by the catalog.
kafka-watermark-column | No | Defaults to kafka_timestamp.
kafka-watermark-delay | No | Defaults to 5. Delay is in seconds.
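
The options table above boils down to four required keys and a handful of defaults. The Python sketch below restates those rules for illustration only. It is not the catalog's actual implementation, and the helper names (resolve_catalog_options, schema_base_uris) are hypothetical.

```python
import uuid

# Required keys and defaults, copied from the Catalog Options table.
REQUIRED = (
    "type",
    "event-stream-config-uri",
    "event-schema-base-uris",
    "properties.bootstrap.servers",
)
DEFAULTS = {
    "kafka-watermark-column": "kafka_timestamp",
    "kafka-watermark-delay": "5",  # seconds
}


def resolve_catalog_options(options):
    """Hypothetical helper: check required keys, then fill in defaults."""
    missing = [key for key in REQUIRED if key not in options]
    if missing:
        raise ValueError("missing required catalog options: " + ", ".join(missing))
    if options["type"] != "eventstream":
        raise ValueError("'type' must be 'eventstream'")

    resolved = dict(DEFAULTS)
    resolved.update(options)
    # properties.group.id falls back to a random UUID when it is not provided.
    resolved.setdefault("properties.group.id", str(uuid.uuid4()))
    return resolved


def schema_base_uris(resolved):
    """Split the semicolon-separated event-schema-base-uris string into a list."""
    return resolved["event-schema-base-uris"].split(";")
```

Passing the options from the Getting Started example yields a dictionary in which kafka-watermark-column is kafka_timestamp, kafka-watermark-delay is 5, and properties.group.id holds a freshly generated UUID.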

Table Options

Tables within the catalog require some custom options in addition to the ones needed for the connector and format.

Option | Required? | Notes
kafka-topic-prefix | No | Only used with the kafka connector (which is the default connector), and only when you want to insert.
event-stream-name | Yes |
event-schema-version | Yes |

Examples

Creating a table for inserting events into a topic

CREATE TABLE `test.event.insert` WITH (
	'kafka-topic-prefix'='eqiad'
) LIKE `eventgate-main.test.event`;

INSERT INTO `test.event.insert` (`test`, `test_map`) VALUES('test_from_catalog', MAP['test_key', 'test_val']);

Creating a table with additional columns

CREATE TABLE `test.event.insert` (
	`event_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL
) LIKE `eventgate-main.test.event`;

Creating a custom table based off an event schema but with a different connector

CREATE TABLE `test.insert.example` ( _placeholder_ STRING ) 
WITH (
    'connector' = 'filesystem',
    'path' = '',
    'format' = 'event-json', 
    'event-schema-version' = '',
    'event-stream-name' = ''
);

Limitations

  • When you create a table from scratch, you must use a _placeholder_ STRING column (see examples)
  • When inserting, all columns (besides $schema and meta) must be present for it to succeed. (See T328211)
  • You cannot directly insert into a catalog-provided table.
  • You cannot alter the schema or its version after a table is created.
  • To use a table with a schema version other than latest, you must create the entire table from scratch.

Internals

Creating Tables

This section details different ways a table can be created within the catalog.

Please note the distinction between functions in the catalog and SQL DDL/DML. For example, CREATE TABLE triggers a stack of function calls, one of which is createTable().

createTable() is a catalog function called by CREATE TABLE. Unfortunately, it is also called by CREATE TABLE LIKE and CREATE VIEW. This means that createTable() has to handle different scenarios where the table passed to it is vastly different. To help distinguish between them, users have to provide a flag in the form of a _placeholder_ column.

CREATE TABLE is restricted to tables that have a _placeholder_ column. The catalog replaces the placeholder with the table generated from the event schema.

CREATE TABLE LIKE is restricted to tables that don't have a _placeholder_ column. The catalog does no processing on the table schema, but does process the options. However, since createTable() does not have access to the original table, it cannot detect which options have changed and therefore doesn't act on them. For example, changing kafka-watermark-column does not generate a new watermark column to replace the old one. This may change in the future.

  • Technically, a change could be detected by instantiating the original schema and diffing the two schemas, but that only works if the schema option itself wasn't changed.

ALTER TABLE processes options, but leaves any edge cases to be handled by the user. This behavior might change in the future because in Flink 1.17, ALTER TABLE was changed to contain a diff of the original and new table, which makes it more powerful than CREATE TABLE LIKE.

Event tables, although dynamically created by the catalog, do not use createTable(). Instead, they use EventTableUtils.getTableOfEventStream().
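
The placeholder rule described in this section amounts to a simple branch inside createTable(). The Python sketch below illustrates only that decision. The function name classify_create_request and its return labels are hypothetical, and the CREATE VIEW path is omitted.

```python
PLACEHOLDER_COLUMN = "_placeholder_"


def classify_create_request(column_names):
    """Hypothetical sketch of how the placeholder flag separates the two paths."""
    if PLACEHOLDER_COLUMN in column_names:
        # CREATE TABLE: the placeholder is replaced by the columns generated
        # from the event schema.
        return "create-table"
    # CREATE TABLE LIKE: the table schema is left untouched and only the
    # options are processed.
    return "create-table-like"
```

A table declared with only the _placeholder_ STRING column takes the first path, while a table copied with LIKE arrives with the real event columns and takes the second.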

Options

Validation

Validation happens in three layers. First, the CatalogFactory validates catalog options. Next, the DynamicTableFactory* validates table options. Finally, the FormatFactory** validates format options.

Because of this cascading validation, some invalid options are not caught when declaring tables and only caught at runtime when querying them. This behavior is taken advantage of within tests, so any DDL statements there should not be considered usable code.

* DynamicTableFactory refers to both DynamicTableSourceFactory and DynamicTableSinkFactory

** FormatFactory refers to both DeserializationFormatFactory and SerializationFormatFactory
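
To make the cascading behavior concrete, here is a minimal Python sketch of the second and third layers. It is simplified so that no table or format option is checked at declaration time, and it leaves out the catalog layer. All names, including the two sets of accepted options, are hypothetical stand-ins rather than the actual Flink factories.

```python
class ValidationError(Exception):
    pass


# Hypothetical stand-ins for the option sets each factory accepts.
VALID_TABLE_OPTIONS = {"connector", "format", "topic", "properties.bootstrap.servers"}
VALID_FORMAT_OPTIONS = {"event-json.event-stream-name", "event-json.event-schema-version"}

declared_tables = {}


def declare_table(name, options):
    """Declaring a table only stores its options; layers 2 and 3 have not run yet."""
    declared_tables[name] = dict(options)


def query_table(name):
    """Querying a table runs the table-factory and format-factory checks."""
    options = declared_tables[name]
    for key in options:
        if key.startswith("event-json."):
            if key not in VALID_FORMAT_OPTIONS:  # layer 3: FormatFactory
                raise ValidationError("unknown format option: " + key)
        elif key not in VALID_TABLE_OPTIONS:  # layer 2: DynamicTableFactory
            raise ValidationError("unknown table option: " + key)
    return options
```

In this sketch, declaring a table with a misspelled format option succeeds, and the error only surfaces when query_table() is called. That mirrors the behavior the tests rely on.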

Meta-Definitions

Because of the catalog's behavior in managing options for the table and the format, more meta-definitions are needed to describe certain options.

Pseudo-Table Options

Some options are provided when declaring a table, but used in the catalog. These options are not passed down to the table, but they influence how the table is created.

Some pseudo-table options are only used for CREATE TABLE and not CREATE TABLE LIKE.

Input:

CREATE TABLE `example` WITH (
	'connector' = 'kafka',
	'kafka-watermark-column' = 'example_col',
    ...
);

Processed Table Options:

{
 "connector" = "kafka",
 ...
}


Pseudo-table options include:

  • kafka-topic-prefix

CREATE TABLE only:

  • event-stream-name
  • event-schema-version
  • kafka-watermark-column
  • kafka-watermark-delay

Override Options

Options that are set by the catalog can be overridden by providing them as table options. This doesn't necessarily mean that the options are passed down to the table. However, all standard table options and prefixed format options are override options, except for topic and some other edge cases (see T331542).

Some override options are only used for CREATE TABLE and not CREATE TABLE LIKE.

The value of these options must be resolved with two fallbacks: first look in the table options, then in the catalog options, and finally fall back to the default value defined in the option's ConfigOption. This means that these options do not behave as expected within a Configuration. These defaults are currently handled on a case-by-case basis; however, it might be worth creating a dedicated EventConfiguration to handle them.


Input:

CREATE CATALOG wmfeventcatalog WITH (
	'type' = 'eventstream',
	'properties.group.id' = 'from_catalog',
    ...
);

USE CATALOG wmfeventcatalog;

CREATE TABLE `no_override` WITH (
    ...
);

CREATE TABLE `override` WITH (
	'properties.group.id'='from_table',
    ...
);


Processed Table Options:

{ // no_override
 "properties.group.id" = "from_catalog",
 ...
}

{ //override
 "properties.group.id" = "from_table",
 ...
}


Override options include:

  • scan.startup.mode
  • properties.bootstrap.servers
  • properties.group.id
  • event-json.timestamp-format.standard

CREATE TABLE only:

  • event-stream-name
  • event-schema-version
  • kafka-watermark-column
  • kafka-watermark-delay
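
The lookup order described in this section (table options, then catalog options, then the ConfigOption default) can be sketched in a few lines of Python. The names resolve_override and CONFIG_OPTION_DEFAULTS are hypothetical. The only default shown is the kafka-watermark-delay value of 5 from the Catalog Options table.

```python
# Stand-in for the defaults declared on each ConfigOption.
CONFIG_OPTION_DEFAULTS = {"kafka-watermark-delay": "5"}


def resolve_override(key, table_options, catalog_options):
    """Return the first value found: table, then catalog, then ConfigOption default."""
    if key in table_options:
        return table_options[key]
    if key in catalog_options:
        return catalog_options[key]
    return CONFIG_OPTION_DEFAULTS.get(key)


catalog = {"type": "eventstream", "properties.group.id": "from_catalog"}

# Table without the option: the catalog value wins.
no_override = resolve_override("properties.group.id", {}, catalog)
# Table with the option: the table value wins.
override = resolve_override(
    "properties.group.id", {"properties.group.id": "from_table"}, catalog
)
# Neither table nor catalog sets it: the ConfigOption default is used.
delay = resolve_override("kafka-watermark-delay", {}, catalog)
```

Here no_override resolves to from_catalog and override resolves to from_table, matching the processed table options shown above, while delay falls through to the default.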

Shared Options

The catalog and our custom format factory are strongly coupled, but we allow any connector and therefore any DynamicTableFactory. This means that the catalog has to bypass the validation done by an unknown table factory so that the options can reach the format factory.

Flink handles this by prefixing options with the identifier of the format factory, so the catalog does that automatically for options that apply to our event-json format. As a result, the saved table only contains the prefixed options.


Input:

CREATE TABLE `example` WITH (
	'event-stream-name'='example',
    ...
);


Processed Table Options:

{
 "event-json.event-stream-name" = "example",
 ...
}


Shared options include:

event-schema-base-uris
event-stream-config-uri
event-stream-name
event-schema-version
https-routes
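
The prefixing step described in this section is essentially a key rename. The following Python sketch shows that rename, assuming the format identifier is event-json as in the example above. The function name prefix_shared_options is hypothetical.

```python
FORMAT_IDENTIFIER = "event-json"

# Shared options, as listed in this section.
SHARED_OPTIONS = {
    "event-schema-base-uris",
    "event-stream-config-uri",
    "event-stream-name",
    "event-schema-version",
    "https-routes",
}


def prefix_shared_options(options):
    """Rename each shared option to '<format identifier>.<option>'."""
    return {
        (FORMAT_IDENTIFIER + "." + key if key in SHARED_OPTIONS else key): value
        for key, value in options.items()
    }
```

Options that are not shared, such as connector, pass through with their keys unchanged.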

Overlap

These meta-options are not mutually exclusive. See the chart and example below.


Input:

CREATE CATALOG wmfeventcatalog WITH (
	'type' = 'eventstream',
	'properties.group.id' = 'from_catalog',
    ...
);

USE CATALOG wmfeventcatalog;

CREATE TABLE `example` WITH (
	'connector' = 'kafka',
    'format' = 'event-json',
    'kafka-topic-prefix' = 'eqiad',
    'event-stream-name' = 'example',
    'event-schema-version' = '1.0.0',
	'properties.group.id'='from_table',
    ...
);


Processed Table Options:

{
 "connector" = "kafka",
 "format" = "event-json",
 "topic" = "eqiad.example",
 "properties.group.id" = "from_table",
 "event-json.event-stream-name" = "example",
 "event-json.event-schema-version" = "1.0.0",
 ...
}


This means that a table created by the catalog will not produce the same result if fed back into the catalog. The _placeholder_ flag that differentiates CREATE TABLE from CREATE TABLE LIKE exists because of this discrepancy.
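
As a rough illustration of how the three kinds of options combine, the Python sketch below reproduces the processed options from the example above. It uses the kafka-topic-prefix option documented under Table Options. It also assumes that the topic is simply the prefix and the stream name joined by a dot, which is inferred from the example rather than taken from the catalog code. The function name and the reduced SHARED set are hypothetical.

```python
SHARED = {"event-stream-name", "event-schema-version"}  # subset used in this example


def process_table_options(table_options, catalog_options):
    """Hypothetical sketch of the pseudo-table, override and shared option handling."""
    options = dict(table_options)
    processed = {}

    # Pseudo-table option: consumed by the catalog, not passed to the table.
    # Assumption: topic = "<prefix>.<stream name>", as in the example.
    prefix = options.pop("kafka-topic-prefix", None)
    stream = options["event-stream-name"]
    processed["topic"] = prefix + "." + stream if prefix else stream

    # Override option: the table value wins over the catalog value.
    processed["properties.group.id"] = options.pop(
        "properties.group.id", catalog_options.get("properties.group.id")
    )

    # Shared options are prefixed with the format identifier;
    # everything else passes through unchanged.
    for key, value in options.items():
        processed["event-json." + key if key in SHARED else key] = value
    return processed


catalog = {"type": "eventstream", "properties.group.id": "from_catalog"}
table = {
    "connector": "kafka",
    "format": "event-json",
    "kafka-topic-prefix": "eqiad",
    "event-stream-name": "example",
    "event-schema-version": "1.0.0",
    "properties.group.id": "from_table",
}
processed = process_table_options(table, catalog)
```

The result no longer contains the unprefixed event-stream-name or kafka-topic-prefix keys. That is one way to see why a processed table cannot simply be passed through the same path a second time.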