Using Apicurio Registry with Apache Flink

This chapter explains how to use Apicurio Registry as a catalog in Apache Flink to manage schemas for your streaming applications.

Prerequisites
  • A running Apicurio Registry instance

  • Apache Flink 1.17 or later

The Apicurio Registry Flink Catalog integrates Apicurio Registry with Apache Flink, so that you can discover and use the schemas stored in the registry directly from Flink SQL and Table API applications.

The catalog maps Apicurio Registry concepts to Flink concepts as follows:

Table 1. Mapping between Apicurio Registry and Flink concepts

| Apicurio Registry concept | Flink concept | Description |
|---|---|---|
| Group | Database | A logical namespace for organizing artifacts. Each group in Apicurio Registry appears as a database in Flink. |
| Artifact | Table | A schema registered in Apicurio Registry. Each artifact appears as a table in Flink with columns derived from the schema. |
| Schema content | Table schema | The schema content (Avro or JSON Schema) is automatically converted to Flink’s type system. |
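Because group IDs often contain dots (for example, a reverse-domain name such as com.example), an unquoted group name would be read as several path segments in Flink SQL, so fully qualified table names need backtick quoting. The following Java sketch builds such a name from a catalog name, a group ID, and an artifact ID. It assumes that the catalog exposes group and artifact IDs verbatim as database and table names; the RegistryIdentifiers class and the IDs com.example and orders-value are hypothetical.

```java
// Hypothetical helper, not part of the Apicurio Registry Flink Catalog API.
// Builds a fully qualified Flink SQL identifier (catalog.database.table),
// assuming group and artifact IDs are exposed verbatim as database and table names.
final class RegistryIdentifiers {

    private RegistryIdentifiers() {}

    // Wraps a name in backticks so dots, hyphens, and reserved words stay part
    // of a single identifier; embedded backticks are escaped by doubling them.
    static String quote(String name) {
        return "`" + name.replace("`", "``") + "`";
    }

    // Group -> database, artifact -> table.
    static String qualifiedName(String catalog, String groupId, String artifactId) {
        return quote(catalog) + "." + quote(groupId) + "." + quote(artifactId);
    }

    public static void main(String[] args) {
        String table = qualifiedName("apicurio", "com.example", "orders-value");
        // Prints: SELECT * FROM `apicurio`.`com.example`.`orders-value`;
        System.out.println("SELECT * FROM " + table + ";");
    }
}
```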

This section describes the configuration options available for the Apicurio Registry Flink Catalog.

Table 2. Required configuration properties

| Property | Type | Default | Description |
|---|---|---|---|
| registry.url | String | No default (required) | Base URL of the Apicurio Registry API, for example http://localhost:8080/apis/registry/v3. |

Table 3. Optional configuration properties

| Property | Type | Default | Description |
|---|---|---|---|
| default-database | String | default | The default database (group) to use when none is specified. |
| registry.auth.type | String | none | Authentication type: none, basic, or oauth2. |
| registry.auth.username | String | No default | Username for basic authentication. |
| registry.auth.password | String | No default | Password for basic authentication. |
| registry.auth.token-endpoint | String | No default | OAuth2 token endpoint URL. |
| registry.auth.client-id | String | No default | OAuth2 client ID. |
| registry.auth.client-secret | String | No default | OAuth2 client secret. |
| cache.ttl.ms | Long | 300000 (5 minutes) | Time-to-live, in milliseconds, for cached schemas. |
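To make the cache.ttl.ms semantics concrete, the following Java sketch shows generic time-to-live caching: a schema loaded at time t is reused until t + ttl, after which the next lookup goes back to the registry. It illustrates the concept only and is not the catalog's implementation; the TtlCache class, its injectable clock, and the loader function are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Illustrative sketch of time-to-live caching, NOT the catalog's implementation.
// An entry loaded at time t is reused while (now - t) < ttlMs; otherwise it is reloaded.
final class TtlCache<K, V> {

    private record Entry<T>(T value, long loadedAtMs) {}

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMs;
    private final LongSupplier clock; // injectable so the example is deterministic

    TtlCache(long ttlMs, LongSupplier clock) {
        this.ttlMs = ttlMs;
        this.clock = clock;
    }

    V get(K key, Function<K, V> loader) {
        long now = clock.getAsLong();
        Entry<V> cached = entries.get(key);
        if (cached != null && now - cached.loadedAtMs() < ttlMs) {
            return cached.value(); // still fresh: no registry call
        }
        V value = loader.apply(key); // missing or expired: load again
        entries.put(key, new Entry<>(value, now));
        return value;
    }

    public static void main(String[] args) {
        long[] now = {0};
        int[] loads = {0};
        // 300000 ms mirrors the documented default of cache.ttl.ms.
        TtlCache<String, String> cache = new TtlCache<>(300_000, () -> now[0]);
        Function<String, String> fetch = id -> { loads[0]++; return "schema-of-" + id; };

        cache.get("orders", fetch); // first lookup loads from the "registry"
        now[0] = 299_999;
        cache.get("orders", fetch); // served from the cache
        now[0] = 300_000;
        cache.get("orders", fetch); // TTL elapsed: loaded again
        System.out.println("registry lookups: " + loads[0]); // prints 2
    }
}
```

A shorter TTL picks up schema changes in the registry sooner, at the cost of more requests to the registry.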

You can register and use the Apicurio Registry Flink Catalog directly in Flink SQL.

Registering the catalog in Flink SQL
CREATE CATALOG apicurio WITH (
  'type' = 'apicurio',
  'registry.url' = 'http://localhost:8080/apis/registry/v3'
);

USE CATALOG apicurio;

Listing databases (groups)
SHOW DATABASES;

Using a specific database
USE my_group;
SHOW TABLES;

Querying a table
SELECT * FROM my_artifact;

Example with OAuth2 authentication
CREATE CATALOG apicurio_secure WITH (
  'type' = 'apicurio',
  'registry.url' = 'http://localhost:8080/apis/registry/v3',
  'registry.auth.type' = 'oauth2',
  'registry.auth.token-endpoint' = 'http://keycloak:8080/realms/registry/protocol/openid-connect/token',
  'registry.auth.client-id' = 'registry-client',
  'registry.auth.client-secret' = 'my-secret'
);
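The SQL examples above use either no authentication or OAuth2. As a hedged sketch, the following Java helper renders a CREATE CATALOG statement from a map of the options in Tables 2 and 3, here with basic authentication. It checks that registry.url is present and that registry.auth.type is one of the documented values, and it escapes single quotes in values. The CatalogDdl class is hypothetical; the resulting string could be passed to Flink's TableEnvironment.executeSql.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;

// Hypothetical helper, not part of the Apicurio Registry Flink Catalog API.
// Renders a CREATE CATALOG statement from the documented configuration options.
final class CatalogDdl {

    private static final Set<String> AUTH_TYPES = Set.of("none", "basic", "oauth2");

    private CatalogDdl() {}

    // SQL string literals escape a single quote by doubling it.
    private static String literal(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // The catalog name is assumed to be a plain SQL identifier.
    static String createCatalog(String name, Map<String, String> options) {
        if (!options.containsKey("registry.url")) {
            throw new IllegalArgumentException("registry.url is required");
        }
        String authType = options.getOrDefault("registry.auth.type", "none");
        if (!AUTH_TYPES.contains(authType)) {
            throw new IllegalArgumentException("unsupported registry.auth.type: " + authType);
        }
        StringJoiner body = new StringJoiner(",\n", "(\n", "\n)");
        body.add("  'type' = 'apicurio'");
        options.forEach((k, v) -> body.add("  " + literal(k) + " = " + literal(v)));
        return "CREATE CATALOG " + name + " WITH " + body + ";";
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("registry.url", "http://localhost:8080/apis/registry/v3");
        options.put("registry.auth.type", "basic");
        options.put("registry.auth.username", "my-user");
        options.put("registry.auth.password", "my-password");
        // Prints a CREATE CATALOG statement configured for basic authentication.
        System.out.println(createCatalog("apicurio_basic", options));
    }
}
```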

You can also use the Apicurio Registry Flink Catalog programmatically with the Flink Table API in Java.

Registering the catalog using Java Table API
import io.apicurio.registry.flink.CatalogConfig;
import io.apicurio.registry.flink.ApicurioCatalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Create Table Environment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Configure the catalog
CatalogConfig config = CatalogConfig.builder()
    .name("apicurio")
    .url("http://localhost:8080/apis/registry/v3")
    .defaultDatabase("default")
    .build();

// Register the catalog
ApicurioCatalog catalog = new ApicurioCatalog(config);
tableEnv.registerCatalog("apicurio", catalog);
tableEnv.useCatalog("apicurio");

Example with OAuth2 authentication
CatalogConfig config = CatalogConfig.builder()
    .name("apicurio")
    .url("http://localhost:8080/apis/registry/v3")
    .authType("oauth2")
    .tokenEndpoint("http://keycloak:8080/realms/registry/protocol/openid-connect/token")
    .clientId("registry-client")
    .clientSecret("my-secret")
    .build();

Example with basic authentication
CatalogConfig config = CatalogConfig.builder()
    .name("apicurio")
    .url("http://localhost:8080/apis/registry/v3")
    .authType("basic")
    .username("my-user")
    .password("my-password")
    .build();

The Apicurio Registry Flink Catalog supports automatic conversion of the following schema types to Flink’s type system.

Table 4. Supported schema types

| Schema type | Artifact type | Description |
|---|---|---|
| Apache Avro | AVRO | Full support for Avro schemas. Avro types are mapped to corresponding Flink types. |
| JSON Schema | JSON | Support for JSON Schema. JSON Schema types are mapped to Flink types. |

Table 5. Avro to Flink type mapping

| Avro type | Flink type |
|---|---|
| null | NULL |
| boolean | BOOLEAN |
| int | INT |
| long | BIGINT |
| float | FLOAT |
| double | DOUBLE |
| bytes | BYTES |
| string | STRING |
| array | ARRAY<…> |
| map | MAP<STRING, …> |
| record | ROW<…> |
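The following Java sketch applies Table 5 recursively to show how a nested Avro record becomes a single Flink ROW type. It models Avro schemas with small hypothetical records instead of the Avro library, ignores nullability, and does not handle unions, enums, fixed, or logical types, which the table does not list. The AvroToFlink class and the Order fields are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Illustrative sketch of the Avro mapping, NOT the catalog's actual converter.
final class AvroToFlink {

    // Hypothetical model of an Avro schema (not the org.apache.avro API).
    sealed interface Schema permits Primitive, ArrayOf, MapOf, RecordSchema {}
    record Primitive(String name) implements Schema {}     // null, boolean, int, ...
    record ArrayOf(Schema items) implements Schema {}
    record MapOf(Schema values) implements Schema {}       // Avro map keys are always strings
    record Field(String name, Schema schema) {}
    record RecordSchema(List<Field> fields) implements Schema {}

    // Primitive rows of the mapping table.
    private static final Map<String, String> PRIMITIVES = Map.of(
        "null", "NULL", "boolean", "BOOLEAN", "int", "INT", "long", "BIGINT",
        "float", "FLOAT", "double", "DOUBLE", "bytes", "BYTES", "string", "STRING");

    private AvroToFlink() {}

    static String toFlink(Schema schema) {
        if (schema instanceof Primitive p) {
            String type = PRIMITIVES.get(p.name());
            if (type == null) {
                throw new IllegalArgumentException("unsupported Avro type: " + p.name());
            }
            return type;
        } else if (schema instanceof ArrayOf a) {
            return "ARRAY<" + toFlink(a.items()) + ">";      // element type mapped recursively
        } else if (schema instanceof MapOf m) {
            return "MAP<STRING, " + toFlink(m.values()) + ">";
        } else if (schema instanceof RecordSchema r) {
            StringJoiner fields = new StringJoiner(", ", "ROW<", ">");
            for (Field f : r.fields()) {
                fields.add(f.name() + " " + toFlink(f.schema()));
            }
            return fields.toString();
        }
        throw new IllegalArgumentException("unsupported schema: " + schema);
    }

    public static void main(String[] args) {
        // Roughly: record Order { long id; string customer; array<string> tags; map<double> prices; }
        Schema order = new RecordSchema(List.of(
            new Field("id", new Primitive("long")),
            new Field("customer", new Primitive("string")),
            new Field("tags", new ArrayOf(new Primitive("string"))),
            new Field("prices", new MapOf(new Primitive("double")))));
        // Prints: ROW<id BIGINT, customer STRING, tags ARRAY<STRING>, prices MAP<STRING, DOUBLE>>
        System.out.println(toFlink(order));
    }
}
```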

Table 6. JSON Schema to Flink type mapping

| JSON Schema type | Flink type |
|---|---|
| null | NULL |
| boolean | BOOLEAN |
| integer | BIGINT |
| number | DOUBLE |
| string | STRING |
| array | ARRAY<…> |
| object | ROW<…> |
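Similarly, the following Java sketch walks a JSON Schema represented as nested Java maps (the shape a JSON parser typically produces) and applies Table 6. It reads only the type, items, and properties keywords and assumes that type is a single string rather than an array of types; the JsonSchemaToFlink class and the example schema are hypothetical, and this is not the catalog's actual converter.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Illustrative sketch of the JSON Schema mapping, NOT the catalog's actual converter.
final class JsonSchemaToFlink {

    private static final Map<String, String> SCALARS = Map.of(
        "null", "NULL",
        "boolean", "BOOLEAN",
        "integer", "BIGINT",   // JSON Schema places no size limit on integers
        "number", "DOUBLE",
        "string", "STRING");

    private JsonSchemaToFlink() {}

    @SuppressWarnings("unchecked")
    static String toFlink(Map<String, Object> schema) {
        String type = (String) schema.get("type");
        if ("array".equals(type)) {
            // Element type comes from the "items" subschema.
            return "ARRAY<" + toFlink((Map<String, Object>) schema.get("items")) + ">";
        }
        if ("object".equals(type)) {
            // Each entry of "properties" becomes one ROW field, in iteration order.
            Map<String, Object> props =
                (Map<String, Object>) schema.getOrDefault("properties", Map.of());
            StringJoiner fields = new StringJoiner(", ", "ROW<", ">");
            props.forEach((name, sub) -> fields.add(name + " " + toFlink((Map<String, Object>) sub)));
            return fields.toString();
        }
        String flink = SCALARS.get(type);
        if (flink == null) {
            throw new IllegalArgumentException("unsupported JSON Schema type: " + type);
        }
        return flink;
    }

    public static void main(String[] args) {
        // {"type":"object","properties":{"id":{"type":"integer"},"price":{"type":"number"},
        //  "tags":{"type":"array","items":{"type":"string"}}}}
        Map<String, Object> props = new LinkedHashMap<>(); // keeps declared field order
        props.put("id", Map.of("type", "integer"));
        props.put("price", Map.of("type", "number"));
        props.put("tags", Map.of("type", "array", "items", Map.of("type", "string")));
        Map<String, Object> schema = Map.of("type", "object", "properties", props);
        // Prints: ROW<id BIGINT, price DOUBLE, tags ARRAY<STRING>>
        System.out.println(toFlink(schema));
    }
}
```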

Additional resources