Using Apicurio Registry with Apache Flink
This chapter explains how to use Apicurio Registry as a catalog in Apache Flink to manage schemas for your streaming applications. You need the following:
- A running Apicurio Registry instance
- Apache Flink 1.17 or later
Overview of the Apicurio Registry Flink Catalog
The Apicurio Registry Flink Catalog integrates Apicurio Registry with Apache Flink so that you can manage and discover schemas directly from your Flink SQL and Table API applications.
The catalog maps Apicurio Registry concepts to Flink concepts as follows:
| Apicurio Registry Concept | Flink Concept | Description |
|---|---|---|
| Group | Database | A logical namespace for organizing artifacts. Each group in Apicurio Registry appears as a database in Flink. |
| Artifact | Table | A schema registered in Apicurio Registry. Each artifact appears as a table in Flink with columns derived from the schema. |
| Schema Content | Table Schema | The schema content (Avro, JSON Schema) is automatically converted to Flink’s type system. |
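To make the mapping concrete, the following minimal sketch relates a fully qualified Flink table name to the registry group and artifact it would refer to. It uses plain Java and none of the catalog's own classes. `ConceptMappingSketch`, `RegistryCoordinates`, and `resolve` are hypothetical names introduced for illustration, and the identifiers `apicurio`, `my_group`, and `my_artifact` are taken from the SQL examples in this chapter. As a simplification, it does not handle backtick-quoted identifiers that contain dots.

```java
import java.util.Objects;

/** Illustrative only: relates a Flink table path to registry coordinates. */
final class ConceptMappingSketch {

    /** Hypothetical holder for the registry-side coordinates of a Flink table. */
    static final class RegistryCoordinates {
        final String groupId;
        final String artifactId;

        RegistryCoordinates(String groupId, String artifactId) {
            this.groupId = groupId;
            this.artifactId = artifactId;
        }
    }

    /**
     * Splits an identifier of the form catalog.database.table.
     * The database part corresponds to the group, the table part to the artifact.
     */
    static RegistryCoordinates resolve(String fullyQualifiedTable) {
        Objects.requireNonNull(fullyQualifiedTable, "fullyQualifiedTable");
        String[] parts = fullyQualifiedTable.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "Expected catalog.database.table but got: " + fullyQualifiedTable);
        }
        // parts[0] is the Flink catalog name and has no registry counterpart here.
        return new RegistryCoordinates(parts[1], parts[2]);
    }

    public static void main(String[] args) {
        RegistryCoordinates c = resolve("apicurio.my_group.my_artifact");
        System.out.println("group=" + c.groupId + ", artifact=" + c.artifactId);
    }
}
```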
Configuring the Apicurio Registry Flink Catalog
This section describes the configuration options available for the Apicurio Registry Flink Catalog.
| Property | Type | Default | Description |
|---|---|---|---|
| | String | None (required) | Base URL of the Apicurio Registry API, for example |
| Property | Type | Default | Description |
|---|---|---|---|
| | String | | The default database (group) to use when none is specified. |
| | String | | Authentication type: |
| | String | None | Username for basic authentication. |
| | String | None | Password for basic authentication. |
| | String | None | OAuth2 token endpoint URL. |
| | String | None | OAuth2 client ID. |
| | String | None | OAuth2 client secret. |
| | Long | | Cache time-to-live in milliseconds for schema caching. |
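The cache time-to-live option controls how long a cached schema is reused before it is fetched again. The sketch below shows how a time-to-live cache behaves in general. It is not the catalog's implementation: `TtlCacheSketch` and its methods are hypothetical names, and the clock is injected only so that the example can simulate the passage of time.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Illustrative TTL cache; not the catalog's actual implementation. */
final class TtlCacheSketch<K, V> {

    /** A cached value together with the time at which it was loaded. */
    private static final class Entry<V> {
        final V value;
        final long loadedAtMillis;

        Entry(V value, long loadedAtMillis) {
            this.value = value;
            this.loadedAtMillis = loadedAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;

    TtlCacheSketch(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns the cached value, or reloads it once the entry is at least ttlMillis old. */
    V get(K key, Function<K, V> loader) {
        long now = clock.getAsLong();
        Entry<V> cached = entries.get(key);
        if (cached != null && now - cached.loadedAtMillis < ttlMillis) {
            return cached.value;
        }
        V fresh = loader.apply(key);
        entries.put(key, new Entry<>(fresh, now));
        return fresh;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L};
        int[] loads = {0};
        TtlCacheSketch<String, String> cache =
                new TtlCacheSketch<>(1_000L, () -> fakeNow[0]);
        // Stand-in for a registry lookup; counts how often it is called.
        Function<String, String> loader = id -> "schema-load-" + (++loads[0]);

        System.out.println(cache.get("my_artifact", loader)); // first access loads
        fakeNow[0] = 500L;
        System.out.println(cache.get("my_artifact", loader)); // within the TTL, cached
        fakeNow[0] = 1_500L;
        System.out.println(cache.get("my_artifact", loader)); // past the TTL, reloads
    }
}
```

A shorter time-to-live picks up schema changes sooner at the cost of more registry requests; a longer one does the opposite.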
Using the catalog with Flink SQL
You can register and use the Apicurio Registry Flink Catalog directly in Flink SQL.
-- Register the catalog
CREATE CATALOG apicurio WITH (
  'type' = 'apicurio',
  'registry.url' = 'http://localhost:8080/apis/registry/v3'
);
-- Switch to the catalog and list its databases (registry groups)
USE CATALOG apicurio;
SHOW DATABASES;
-- Select a database (group) and list its tables (artifacts)
USE my_group;
SHOW TABLES;
-- Query the table derived from an artifact
SELECT * FROM my_artifact;
-- Register a second catalog that authenticates with OAuth2
CREATE CATALOG apicurio_secure WITH (
  'type' = 'apicurio',
  'registry.url' = 'http://localhost:8080/apis/registry/v3',
  'registry.auth.type' = 'oauth2',
  'registry.auth.token-endpoint' = 'http://keycloak:8080/realms/registry/protocol/openid-connect/token',
  'registry.auth.client-id' = 'registry-client',
  'registry.auth.client-secret' = 'my-secret'
);
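If the catalog options come from application configuration rather than a static script, the `CREATE CATALOG` statement can be assembled in code. The following is a minimal sketch under that assumption: `CatalogDdlSketch` and `createCatalogDdl` are hypothetical helpers, the environment variable `REGISTRY_CLIENT_SECRET` is an invented name, and the option keys are the ones used in the SQL examples in this section.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative helper that renders a CREATE CATALOG statement from options. */
final class CatalogDdlSketch {

    /** Renders the statement with one option per line, in insertion order. */
    static String createCatalogDdl(String catalogName, Map<String, String> options) {
        String body = options.entrySet().stream()
                .map(e -> "  " + quote(e.getKey()) + " = " + quote(e.getValue()))
                .collect(Collectors.joining(",\n"));
        return "CREATE CATALOG " + catalogName + " WITH (\n" + body + "\n)";
    }

    /** Wraps a value in single quotes, doubling any embedded single quote. */
    private static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("type", "apicurio");
        options.put("registry.url", "http://localhost:8080/apis/registry/v3");
        options.put("registry.auth.type", "oauth2");
        options.put("registry.auth.token-endpoint",
                "http://keycloak:8080/realms/registry/protocol/openid-connect/token");
        options.put("registry.auth.client-id", "registry-client");
        // Hypothetical variable name: read the secret from the environment
        // instead of hard-coding it, falling back to the placeholder value.
        String secret = System.getenv("REGISTRY_CLIENT_SECRET");
        options.put("registry.auth.client-secret", secret != null ? secret : "my-secret");

        System.out.println(createCatalogDdl("apicurio_secure", options));
    }
}
```

The resulting string can be passed to `TableEnvironment.executeSql`, which keeps secrets out of checked-in SQL scripts.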
Using the catalog with the Flink Table API
You can also use the Apicurio Registry Flink Catalog programmatically with the Flink Table API in Java.
import io.apicurio.registry.flink.CatalogConfig;
import io.apicurio.registry.flink.ApicurioCatalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
// Create the table environment in streaming mode
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inStreamingMode()
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// Configure the catalog
CatalogConfig config = CatalogConfig.builder()
        .name("apicurio")
        .url("http://localhost:8080/apis/registry/v3")
        .defaultDatabase("default")
        .build();
// Register the catalog and make it the current catalog
ApicurioCatalog catalog = new ApicurioCatalog(config);
tableEnv.registerCatalog("apicurio", catalog);
tableEnv.useCatalog("apicurio");
// Alternative configuration: OAuth2 authentication
CatalogConfig oauthConfig = CatalogConfig.builder()
        .name("apicurio")
        .url("http://localhost:8080/apis/registry/v3")
        .authType("oauth2")
        .tokenEndpoint("http://keycloak:8080/realms/registry/protocol/openid-connect/token")
        .clientId("registry-client")
        .clientSecret("my-secret")
        .build();
// Alternative configuration: basic authentication
CatalogConfig basicConfig = CatalogConfig.builder()
        .name("apicurio")
        .url("http://localhost:8080/apis/registry/v3")
        .authType("basic")
        .username("my-user")
        .password("my-password")
        .build();
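For troubleshooting, it can help to know what the two authentication types usually look like on the wire. The sketch below builds an HTTP Basic `Authorization` header value (RFC 7617) and the form body of an OAuth2 client-credentials token request (RFC 6749, section 4.4). It assumes the catalog uses the client-credentials grant, which the options above suggest but this chapter does not state. `AuthSketch` and its methods are hypothetical and are not part of the catalog API.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Illustrative only: shows the credential formats, not the catalog's code. */
final class AuthSketch {

    /** Builds the value of an HTTP Basic Authorization header (RFC 7617). */
    static String basicAuthHeader(String username, String password) {
        String credentials = username + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Builds the form-encoded body that a client would POST to the token
     * endpoint for the client-credentials grant (assumed grant type).
     */
    static String clientCredentialsBody(String clientId, String clientSecret) {
        return "grant_type=client_credentials"
                + "&client_id=" + URLEncoder.encode(clientId, StandardCharsets.UTF_8)
                + "&client_secret=" + URLEncoder.encode(clientSecret, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Placeholder credentials taken from the examples in this section.
        System.out.println("Authorization: " + basicAuthHeader("my-user", "my-password"));
        System.out.println(clientCredentialsBody("registry-client", "my-secret"));
    }
}
```

Neither format encrypts the credentials, so in production both the registry URL and the token endpoint would normally use HTTPS rather than the plain HTTP shown in the examples.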
Supported schema types
The Apicurio Registry Flink Catalog supports automatic conversion of the following schema types to Flink’s type system.
| Schema Type | Artifact Type | Description |
|---|---|---|
| Apache Avro | | Full support for Avro schemas. Avro types are mapped to corresponding Flink types. |
| JSON Schema | | Support for JSON Schema. JSON Schema types are mapped to Flink types. |
| Avro Type | Flink Type |
|---|---|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
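As an illustration of what such a conversion involves, the sketch below maps Avro primitive type names to Flink SQL type names and renders a column list. The mapping shown is an assumption: it follows the data type mapping documented for Flink's own Avro format and may differ from what the catalog actually does. `AvroTypeMappingSketch`, `toFlinkType`, and `toColumnList` are hypothetical names, and complex types such as records, arrays, and maps are deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative mapping of Avro primitive types to Flink SQL types. */
final class AvroTypeMappingSketch {

    // Assumed mapping, modeled on the one documented for Flink's Avro format.
    private static final Map<String, String> PRIMITIVES = Map.of(
            "boolean", "BOOLEAN",
            "int", "INT",
            "long", "BIGINT",
            "float", "FLOAT",
            "double", "DOUBLE",
            "bytes", "BYTES",
            "string", "STRING");

    /** Returns the Flink SQL type name for an Avro primitive type name. */
    static String toFlinkType(String avroPrimitive) {
        String flinkType = PRIMITIVES.get(avroPrimitive);
        if (flinkType == null) {
            throw new IllegalArgumentException(
                    "Type not covered by this sketch: " + avroPrimitive);
        }
        return flinkType;
    }

    /** Renders field name to Avro type pairs as a Flink SQL column list. */
    static String toColumnList(Map<String, String> avroFields) {
        return avroFields.entrySet().stream()
                .map(e -> "`" + e.getKey() + "` " + toFlinkType(e.getValue()))
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        // Hypothetical fields of an Avro record, in declaration order.
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("id", "long");
        fields.put("name", "string");
        fields.put("active", "boolean");
        System.out.println(toColumnList(fields));
    }
}
```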
| JSON Schema Type | Flink Type |
|---|---|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
- For more details, see the Flink Catalog source code
- For information on Apicurio Registry groups and artifacts, see Introduction to Apicurio Registry
