Configuring Kafka serializers/deserializers in Java clients
This chapter provides detailed information on how to configure Kafka serializers/deserializers (SerDes) in your producer and consumer Java client applications.
Apicurio Registry serializer/deserializer configuration in client applications
You can configure specific client serializer/deserializer (SerDes) services and schema lookup strategies directly in a client application using the example constants shown in this section. Alternatively, you can configure the corresponding Apicurio Registry application properties in a file or an instance.
The following sections show examples of commonly used SerDes constants and configuration options.
Configuration for SerDes services
public class SerdeConfig {
    public static final String REGISTRY_URL = "apicurio.registry.url"; // (1)
    public static final String ID_HANDLER = "apicurio.registry.id-handler"; // (2)
    public static final String ENABLE_CONFLUENT_ID_HANDLER = "apicurio.registry.as-confluent"; // (3)
-
The required URL of Apicurio Registry.
-
Extends ID handling to support other ID formats and make them compatible with Apicurio Registry SerDes services. For example, changing the default ID format from Long to Integer supports the Confluent ID format.
-
Simplifies the handling of Confluent IDs. If set to true, an Integer is used for the global ID lookup. Do not use this setting together with the ID_HANDLER option.
-
For more details on configuration options, see Apicurio Registry serializer/deserializer configuration properties
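As a minimal sketch of how these constants map to client properties, the following example sets the property names shown above as plain strings, so it compiles without the Apicurio Registry libraries. The class name, method name, and registry URL are hypothetical placeholders, not part of the product API.

```java
import java.util.Properties;

// Hypothetical helper: builds client properties from the property names listed above.
class SerdeServiceProps {

    static Properties build(String registryUrl, boolean confluentCompatible) {
        Properties props = new Properties();
        // Same key as the value of SerdeConfig.REGISTRY_URL
        props.setProperty("apicurio.registry.url", registryUrl);
        if (confluentCompatible) {
            // Same key as the value of SerdeConfig.ENABLE_CONFLUENT_ID_HANDLER.
            // Per the description above, do not combine this with ID_HANDLER.
            props.setProperty("apicurio.registry.as-confluent", "true");
        }
        return props;
    }

    public static void main(String[] args) {
        // Placeholder URL (assumption); replace it with the address of your registry
        Properties props = build("http://localhost:8080/apis/registry/v2", true);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a real client you would normally reference the SerdeConfig constants instead of repeating the strings, and pass the resulting properties to the Kafka producer or consumer.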
Configuration for SerDes lookup strategies
public class SerdeConfig {
    public static final String ARTIFACT_RESOLVER_STRATEGY = "apicurio.registry.artifact-resolver-strategy"; // (1)
    public static final String SCHEMA_RESOLVER = "apicurio.registry.schema-resolver"; // (2)
    ...
1 | Java class that implements the artifact resolver strategy and maps between the Kafka SerDes and artifact ID. Defaults to the topic ID strategy. This is only used by the serializer class. |
2 | Java class that implements the schema resolver. Defaults to DefaultSchemaResolver. This is used by the serializer and deserializer classes. |
-
For more details on lookup strategies, see Validating schemas using Kafka client serializers/deserializers in Java clients
-
For more details on configuration options, see Apicurio Registry serializer/deserializer configuration properties
Configuration for Kafka converters
public class SerdeBasedConverter<S, T> extends SchemaResolverConfigurer<S, T> implements Converter, Closeable {
    public static final String REGISTRY_CONVERTER_SERIALIZER_PARAM = "apicurio.registry.converter.serializer"; // (1)
    public static final String REGISTRY_CONVERTER_DESERIALIZER_PARAM = "apicurio.registry.converter.deserializer"; // (2)
-
The required serializer to use with the Apicurio Registry Kafka converter.
-
The required deserializer to use with the Apicurio Registry Kafka converter.
-
For more details, see the SerdeBasedConverter Java class
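The following sketch illustrates one way the two converter properties might be assembled into a configuration map of the kind a Kafka Connect converter receives. The property names and the Avro SerDes class names come from this chapter. The helper class, the method name, and the decision to include the registry URL in the same map are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: collects converter settings into a plain map.
class ConverterProps {

    static Map<String, String> avroConverterConfig(String registryUrl) {
        Map<String, String> config = new HashMap<>();
        // Assumption: the converter also needs to know where the registry is
        config.put("apicurio.registry.url", registryUrl);
        // Value of REGISTRY_CONVERTER_SERIALIZER_PARAM
        config.put("apicurio.registry.converter.serializer",
                "io.apicurio.registry.serde.avro.AvroKafkaSerializer");
        // Value of REGISTRY_CONVERTER_DESERIALIZER_PARAM
        config.put("apicurio.registry.converter.deserializer",
                "io.apicurio.registry.serde.avro.AvroKafkaDeserializer");
        return config;
    }

    public static void main(String[] args) {
        // Placeholder URL (assumption)
        avroConverterConfig("http://localhost:8080/apis/registry/v2")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```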
Configuration for different schema types
For details on how to configure SerDes for different schema technologies, see the following:
Apicurio Registry serializer/deserializer configuration properties
This section provides reference information on Java configuration properties for Apicurio Registry Kafka serializers/deserializers (SerDes).
SchemaResolver interface
Apicurio Registry SerDes are based on the SchemaResolver interface, which abstracts access to the registry and applies the same lookup logic for the SerDes classes of all supported formats.
Constant | Property | Description | Type | Default |
---|---|---|---|---|
SCHEMA_RESOLVER | apicurio.registry.schema-resolver | Used by serializers and deserializers. Fully-qualified Java classname that implements SchemaResolver. | String | DefaultSchemaResolver |
The DefaultSchemaResolver is recommended and provides useful features for most use cases. For some advanced use cases, you might use a custom implementation of SchemaResolver.
DefaultSchemaResolver class
You can use the DefaultSchemaResolver to configure features such as:
-
Access to the registry API
-
How to look up artifacts in the registry
-
How to write and read artifact information to and from Kafka
-
Fall-back options for deserializers
Configuration for registry API access options
The DefaultSchemaResolver provides the following properties to configure access to the core registry API:
Constant | Property | Description | Type | Default |
---|---|---|---|---|
|
|
Used by serializers and deserializers. URL to access the registry API. |
|
None |
|
|
Used by serializers and deserializers. URL of the authentication service. Required when accessing a secure registry using the OAuth client credentials flow. |
|
None |
|
|
Used by serializers and deserializers. URL of the token endpoint. Required when accessing a secure registry and |
|
None |
|
|
Used by serializers and deserializers. Realm to access the authentication service. Required when accessing a secure registry using the OAuth client credentials flow. |
|
None |
|
|
Used by serializers and deserializers. Client ID to access the authentication service. Required when accessing a secure registry using the OAuth client credentials flow. |
|
None |
|
|
Used by serializers and deserializers. Client secret to access the authentication service. Required when accessing a secure registry using the OAuth client credentials flow. |
|
None |
|
|
Used by serializers and deserializers. Username to access the registry. Required when accessing a secure registry using HTTP basic authentication. |
|
None |
|
|
Used by serializers and deserializers. Password to access the registry. Required when accessing a secure registry using HTTP basic authentication. |
|
None |
Configuration for registry lookup options
The DefaultSchemaResolver uses the following properties to configure how to look up artifacts in Apicurio Registry.
Constant | Property | Description | Type | Default |
---|---|---|---|---|
|
|
Used by serializers only. Fully-qualified Java classname that implements |
|
|
|
|
Used by serializers only. Sets the |
|
None |
|
|
Used by serializers only. Sets the |
|
None |
|
|
Used by serializers only. Sets the artifact version used for querying or creating an artifact. Overrides the version returned by the |
|
None |
|
|
Used by serializers only. Specifies whether the serializer tries to find the latest artifact in the registry for the corresponding group ID and artifact ID. |
|
|
|
|
Used by serializers only. Specifies whether the serializer tries to create an artifact in the registry. The JSON Schema serializer does not support this feature. |
|
|
|
|
Used by serializers only. Configures the behavior of the client when there is a conflict creating an artifact because the artifact already exists. Available values are |
|
|
|
|
Used by serializers and deserializers. Specifies how long to cache artifacts before auto-eviction (milliseconds). If set to zero, artifacts are fetched every time. |
|
|
|
|
Used by serializers and deserializers. If a schema cannot be retrieved from the registry, the client may retry a number of times. This configuration option controls the delay between retry attempts, in milliseconds. |
|
|
|
|
Used by serializers and deserializers. If a schema cannot be retrieved from the registry, the client may retry a number of times. This configuration option controls the number of retry attempts. |
|
|
|
|
Used by serializers and deserializers. Configures to use the specified |
|
|
Configuration to read/write registry artifacts in Kafka
The DefaultSchemaResolver uses the following properties to configure how artifact information is written to and read from Kafka.
Constant | Property | Description | Type | Default |
---|---|---|---|---|
|
|
Used by serializers and deserializers. Specifies whether the artifact identifier is read from and written to Kafka message headers instead of the message payload. |
|
|
|
|
Used by serializers and deserializers. Fully-qualified Java classname that implements |
|
|
|
|
Used by serializers and deserializers. Fully-qualified Java classname of a class that implements |
|
|
|
|
Used by serializers and deserializers. Shortcut for enabling the legacy Confluent-compatible implementation of |
|
|
Configuration for deserializer fall-back options
The DefaultSchemaResolver uses the following property to configure a fall-back provider for all deserializers.
Constant | Property | Description | Type | Default |
---|---|---|---|---|
|
|
Only used by deserializers. Sets a custom implementation of |
|
|
The DefaultFallbackArtifactProvider uses the following properties to configure deserializer fall-back options:
Constant | Property | Description | Type | Default |
---|---|---|---|---|
|
|
Used by deserializers only. Sets the |
|
None |
|
|
Used by deserializers only. Sets the |
|
None |
|
|
Used by deserializers only. Sets the version used as fallback for resolving the artifact used for deserialization. |
|
None |
-
For more details, see the SerdeConfig Java class.
-
You can configure application properties as Java system properties or include them in the Quarkus application.properties file. For more details, see the Quarkus documentation.
How to configure different client serializer/deserializer types
When using schemas in your Kafka client applications, you must choose which specific schema type to use, depending on your use case. Apicurio Registry provides SerDe Java classes for Apache Avro, JSON Schema, and Google Protobuf. The following sections explain how to configure Kafka applications to use each type.
You can also use Kafka to implement custom serializer and deserializer classes, and leverage Apicurio Registry functionality using the Apicurio Registry REST Java client.
Kafka application configuration for serializers/deserializers
Using the SerDe classes provided by Apicurio Registry in your Kafka application involves setting the correct configuration properties. The following simple Avro examples show how to configure a serializer in a Kafka producer application and how to configure a deserializer in a Kafka consumer application.
// Create the Kafka producer
private static Producer<Object, Object> createKafkaProducer() {
    Properties props = new Properties();
    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
    props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
    // Use Apicurio Registry-provided Kafka serializer for Avro
    props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
    // Configure the Apicurio Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
    // Register the schema artifact if not found in the registry.
    props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
    // Create the Kafka producer
    Producer<Object, Object> producer = new KafkaProducer<>(props);
    return producer;
}
// Create the Kafka consumer
// Keys are read with StringDeserializer, so the consumer key type is String
private static KafkaConsumer<String, GenericRecord> createKafkaConsumer() {
    Properties props = new Properties();
    // Configure standard Kafka settings
    props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Use Apicurio Registry-provided Kafka deserializer for Avro
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
    // Configure the Apicurio Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
    // No other configuration needed because the schema globalId the deserializer uses is sent
    // in the payload. The deserializer extracts the globalId and uses it to look up the schema
    // from the registry.
    // Create the Kafka consumer
    KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
    return consumer;
}
-
For an example application, see the Simple Avro example
Configure Avro SerDes with Apicurio Registry
This topic explains how to use the Kafka client serializer and deserializer (SerDes) classes for Apache Avro.
Apicurio Registry provides the following Kafka client SerDes classes for Avro:
-
io.apicurio.registry.serde.avro.AvroKafkaSerializer
-
io.apicurio.registry.serde.avro.AvroKafkaDeserializer
You can configure the Avro serializer class with the following:
-
Apicurio Registry URL
-
Artifact resolver strategy
-
ID location
-
ID encoding
-
Avro datum provider
-
Avro encoding
The serializer passes the unique ID of the schema as part of the Kafka message so that consumers can use the correct schema for deserialization. The ID can be in the message payload or in the message headers. The default location is the message payload. To send the ID in the message headers, set the following configuration property:
props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true");
The property name is apicurio.registry.headers.enabled.
You can customize how the schema ID is encoded when passing it in the Kafka message body. Set the apicurio.registry.id-handler configuration property to a class that implements the io.apicurio.registry.serde.IdHandler interface. Apicurio Registry provides the following implementations:
-
io.apicurio.registry.serde.DefaultIdHandler: Stores the ID as an 8-byte long
-
io.apicurio.registry.serde.Legacy4ByteIdHandler: Stores the ID as a 4-byte integer
Apicurio Registry represents the schema ID as a long, but for legacy reasons, or for compatibility with other registries or SerDe classes, you might want to use 4 bytes when sending the ID.
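To make the difference between the two ID widths concrete, the following sketch encodes the same ID as an 8-byte long and as a 4-byte integer, each preceded by a single magic byte. It is an illustration only and does not reproduce the DefaultIdHandler or Legacy4ByteIdHandler classes. The class and method names are hypothetical, and the magic byte value of 0x0 is an assumption.

```java
import java.nio.ByteBuffer;

// Illustration of the two ID widths described above; not the library's implementation.
class IdEncodingSketch {

    // Assumption: a single magic byte with value 0x0 precedes the ID in the payload
    static final byte MAGIC_BYTE = 0x0;

    // 1 magic byte followed by the ID as an 8-byte long
    static byte[] encodeLongId(long id) {
        return ByteBuffer.allocate(1 + Long.BYTES).put(MAGIC_BYTE).putLong(id).array();
    }

    // 1 magic byte followed by the ID as a 4-byte integer.
    // Math.toIntExact throws ArithmeticException if the ID does not fit in 4 bytes.
    static byte[] encodeIntId(long id) {
        return ByteBuffer.allocate(1 + Integer.BYTES).put(MAGIC_BYTE).putInt(Math.toIntExact(id)).array();
    }

    static long decodeLongId(byte[] prefix) {
        ByteBuffer buffer = ByteBuffer.wrap(prefix);
        buffer.get(); // skip the magic byte
        return buffer.getLong();
    }

    static long decodeIntId(byte[] prefix) {
        ByteBuffer buffer = ByteBuffer.wrap(prefix);
        buffer.get(); // skip the magic byte
        return buffer.getInt();
    }

    public static void main(String[] args) {
        System.out.println(encodeLongId(42L).length); // prints 9
        System.out.println(encodeIntId(42L).length);  // prints 5
    }
}
```

The 4-byte form saves space and matches consumers that expect an integer ID, but it can only represent IDs that fit in an int, which is why the sketch fails loudly on overflow instead of truncating.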
Avro provides different datum writers and readers to write and read data. Apicurio Registry supports three different types:
-
Generic
-
Specific
-
Reflect
The Apicurio Registry AvroDatumProvider abstracts which of these types is used. DefaultAvroDatumProvider is used by default.
You can set the following configuration options:
-
apicurio.registry.avro-datum-provider: Specifies a fully-qualified Java class name of the AvroDatumProvider implementation, for example io.apicurio.registry.serde.avro.ReflectAvroDatumProvider
-
apicurio.registry.use-specific-avro-reader: Set to true to use a specific type when using DefaultAvroDatumProvider
When using Avro to serialize data, you can use the Avro binary encoding format to ensure the data is encoded in as efficient a format as possible. Avro also supports encoding the data as JSON, which makes it easier to inspect the payload of each message, for example, for logging or debugging.
You can set the Avro encoding by configuring the apicurio.registry.avro.encoding property with a value of JSON or BINARY. The default is BINARY.
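As a rough sketch, the Avro-specific options described above could be collected as follows. The property names and the JSON and BINARY values are taken from this section. The helper class, the method name, and the up-front validation of the encoding value are assumptions added for illustration.

```java
import java.util.Properties;

// Hypothetical helper: collects the Avro-specific SerDes options described above.
class AvroSerdeProps {

    static Properties build(String encoding, boolean useSpecificReader) {
        // Only the two documented encodings are accepted in this sketch
        if (!"JSON".equals(encoding) && !"BINARY".equals(encoding)) {
            throw new IllegalArgumentException("Encoding must be JSON or BINARY: " + encoding);
        }
        Properties props = new Properties();
        props.setProperty("apicurio.registry.avro.encoding", encoding);
        props.setProperty("apicurio.registry.use-specific-avro-reader", Boolean.toString(useSpecificReader));
        return props;
    }

    public static void main(String[] args) {
        // JSON encoding makes payloads easier to inspect while debugging
        build("JSON", true).forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Apply the same values on the producer and the consumer side, because the deserializer must match the serializer's datum provider and encoding.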
You must configure the Avro deserializer class to match the following configuration settings of the serializer:
-
Apicurio Registry URL
-
ID encoding
-
Avro datum provider
-
Avro encoding
See the serializer section for these configuration options. The property names and values are the same.
The following options are not required when configuring the deserializer:
-
Artifact resolver strategy
-
ID location
The deserializer class can determine the values for these options from the message. The strategy is not required because the serializer is responsible for sending the ID as part of the message.
The ID location is determined by checking for the magic byte at the start of the message payload. If that byte is found, the ID is read from the message payload using the configured handler. If the magic byte is not found, the ID is read from the message headers.
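The lookup order described above can be sketched as follows. This is not the deserializer's actual code: the class and method names are hypothetical, the magic byte value of 0x0 is an assumption, the payload ID is read as an 8-byte long (the default width), and the header value is passed in as an already-parsed Long.

```java
import java.nio.ByteBuffer;

// Illustration of the payload-first, headers-second ID lookup described above.
class IdLocationSketch {

    // Assumption: the magic byte has the value 0x0
    static final byte MAGIC_BYTE = 0x0;

    // headerId is the ID found in the message headers, or null if no such header exists
    static long resolveId(byte[] payload, Long headerId) {
        boolean idInPayload = payload.length >= 1 + Long.BYTES && payload[0] == MAGIC_BYTE;
        if (idInPayload) {
            // Read the 8 bytes that follow the magic byte
            return ByteBuffer.wrap(payload, 1, Long.BYTES).getLong();
        }
        if (headerId == null) {
            throw new IllegalStateException("No schema ID found in payload or headers");
        }
        return headerId;
    }

    public static void main(String[] args) {
        byte[] payload = ByteBuffer.allocate(1 + Long.BYTES).put(MAGIC_BYTE).putLong(123L).array();
        System.out.println(resolveId(payload, null));            // prints 123
        System.out.println(resolveId(new byte[] {1, 2, 3}, 55L)); // prints 55
    }
}
```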
When working with Avro messages and a schema with nested records, a new artifact is registered per nested record. For example, the following TradeKey schema includes a nested Exchange schema:
{
  "namespace": "com.kubetrade.schema.trade",
  "type": "record",
  "name": "TradeKey",
  "fields": [
    {
      "name": "exchange",
      "type": "com.kubetrade.schema.common.Exchange"
    },
    {
      "name": "key",
      "type": "string"
    }
  ]
}
{
  "namespace": "com.kubetrade.schema.common",
  "type": "enum",
  "name": "Exchange",
  "symbols": ["GEMINI"]
}
When using these schemas with Avro SerDes, two artifacts are created in Apicurio Registry, one for the TradeKey schema and one for the Exchange schema. Whenever a message using the TradeKey schema is serialized or deserialized, both schemas are retrieved, allowing you to split your definitions into different files.
-
For more details on Avro configuration, see the AvroKafkaSerdeConfig Java class
-
For Java example applications, see:
Configure JSON Schema SerDes with Apicurio Registry
This topic explains how to use the Kafka client serializer and deserializer (SerDes) classes for JSON Schema.
Apicurio Registry provides the following Kafka client SerDes classes for JSON Schema:
-
io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer
-
io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer
Unlike Apache Avro, JSON Schema is not a serialization technology, but is instead a validation technology. As a result, configuration options for JSON Schema are quite different. For example, there is no encoding option, because data is always encoded as JSON.
You can configure the JSON Schema serializer class as follows:
-
Apicurio Registry URL
-
Artifact resolver strategy
-
Schema validation
The only non-standard configuration property is JSON Schema validation, which is enabled by default. You can disable this by setting apicurio.registry.serde.validation-enabled to "false". For example:
props.putIfAbsent(SerdeConfig.VALIDATION_ENABLED, Boolean.FALSE);
You can configure the JSON Schema deserializer class as follows:
-
Apicurio Registry URL
-
Schema validation
-
Class for deserializing data
You must provide the location of Apicurio Registry so that the schema can be loaded. The other configuration is optional.
Deserializer validation only works if the serializer passes the global ID in the Kafka message, which only happens when validation is enabled in the serializer.
The JSON Schema SerDes cannot discover the schema from the message payload, so the schema artifact must be registered beforehand. This also applies to artifact references.
Depending on the content of the schema, if the $ref value is a URL, the SerDes try to resolve the referenced schema using that URL, and then validation works as usual, validating the data against the main schema, and validating the nested value against the nested schema. Support for referencing artifacts in Apicurio Registry has also been implemented.
For example, the following citizen.json schema references the city.json schema:
{
  "$id": "https://example.com/citizen.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Citizen",
  "type": "object",
  "properties": {
    "firstName": {
      "type": "string",
      "description": "The citizen's first name."
    },
    "lastName": {
      "type": "string",
      "description": "The citizen's last name."
    },
    "age": {
      "description": "Age in years which must be equal to or greater than zero.",
      "type": "integer",
      "minimum": 0
    },
    "city": {
      "$ref": "city.json"
    }
  }
}
{
  "$id": "https://example.com/city.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "City",
  "type": "object",
  "properties": {
    "name": {
      "type": "string",
      "description": "The city's name."
    },
    "zipCode": {
      "type": "integer",
      "description": "The zip code.",
      "minimum": 0
    }
  }
}
In this example, a given citizen has a city. In Apicurio Registry, a citizen artifact with a reference to the city artifact is created using the name city.json. In the SerDes, when the citizen schema is fetched, the city schema is also fetched because it is referenced from the citizen schema. When serializing/deserializing data, the reference name is used to resolve the nested schema, allowing validation against the citizen schema and the nested city schema.
-
For more details, see the JsonSchemaKafkaDeserializerConfig Java class
-
For Java example applications, see:
Configure Protobuf SerDes with Apicurio Registry
This topic explains how to use the Kafka client serializer and deserializer (SerDes) classes for Google Protobuf.
Apicurio Registry provides the following Kafka client SerDes classes for Protobuf:
-
io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer
-
io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer
You can configure the Protobuf serializer class as follows:
-
Apicurio Registry URL
-
Artifact resolver strategy
-
ID location
-
ID encoding
-
Schema validation
For details on these configuration options, see the following sections:
You must configure the Protobuf deserializer class to match the following configuration settings in the serializer:
-
Apicurio Registry URL
-
ID encoding
The configuration property names and values are the same as for the serializer.
The following options are not required when configuring the deserializer:
-
Artifact resolver strategy
-
ID location
The deserializer class can determine the values for these options from the message. The strategy is not required because the serializer is responsible for sending the ID as part of the message.
The ID location is determined by checking for the magic byte at the start of the message payload. If that byte is found, the ID is read from the message payload using the configured handler. If the magic byte is not found, the ID is read from the message headers.
The Protobuf deserializer does not deserialize to your exact Protobuf Message implementation, but rather to a DynamicMessage instance. There is no appropriate API to do otherwise.
When a complex Protobuf message with an import statement is used, the imported Protobuf messages are stored in Apicurio Registry as separate artifacts. When Apicurio Registry then gets the main schema to check a Protobuf message, the referenced schemas are also retrieved so that the full message schema can be checked and serialized.
For example, the following table_info.proto schema file includes the imported mode.proto schema file:
syntax = "proto3";
package sample;
option java_package = "io.api.sample";
option java_multiple_files = true;
import "sample/mode.proto";
message TableInfo {
  int32 winIndex = 1;
  Mode mode = 2;
  int32 min = 3;
  int32 max = 4;
  string id = 5;
  string dataAdapter = 6;
  string schema = 7;
  string selector = 8;
  string subscription_id = 9;
}
syntax = "proto3";
package sample;
option java_package = "io.api.sample";
option java_multiple_files = true;
enum Mode {
  MODE_UNKNOWN = 0;
  RAW = 1;
  MERGE = 2;
  DISTINCT = 3;
  COMMAND = 4;
}
In this example, two Protobuf artifacts are stored in Apicurio Registry, one for TableInfo and one for Mode. However, because Mode is part of TableInfo, whenever TableInfo is fetched to check a message in the SerDes, Mode is also returned as an artifact referenced by TableInfo.
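The reference behavior described above can be pictured with a small in-memory model. This sketch does not call Apicurio Registry. The map stands in for the registry, and the class, field, and method names are hypothetical. It only shows that fetching an artifact also pulls in every artifact it references.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of artifact references; a stand-in for the registry, not its client API.
class ReferenceFetchSketch {

    // Artifact name mapped to the names of the artifacts it references
    static final Map<String, List<String>> REFERENCES = Map.of(
            "TableInfo", List.of("Mode"),
            "Mode", List.of());

    // Returns the requested artifact followed by everything it references, directly or indirectly
    static Set<String> fetchWithReferences(String artifact) {
        Set<String> fetched = new LinkedHashSet<>();
        collect(artifact, fetched);
        return fetched;
    }

    private static void collect(String artifact, Set<String> fetched) {
        if (!fetched.add(artifact)) {
            return; // already fetched, which also guards against reference cycles
        }
        for (String reference : REFERENCES.getOrDefault(artifact, List.of())) {
            collect(reference, fetched);
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchWithReferences("TableInfo")); // prints [TableInfo, Mode]
    }
}
```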
-
For Java example applications, see: