Apicurio Studio

Validating schemas using Kafka client serializers/deserializers in Java

Apicurio Registry provides client serializers/deserializers (SerDes) for Kafka producer and consumer applications written in Java. Kafka producer applications use serializers to encode messages that conform to a specific event schema. Kafka consumer applications use deserializers to validate that messages have been serialized using the correct schema, based on a specific schema ID. This ensures consistent schema use and helps to prevent data errors at runtime.

This chapter explains how to use Kafka client SerDes in your producer and consumer client applications.

Prerequisites

Kafka client applications and Apicurio Registry

Apicurio Registry decouples schema management from client application configuration. You can enable a Java client application to use a schema from Apicurio Registry by specifying its URL in your client code.

You can store schemas in Apicurio Registry and reference them from your client applications to serialize and deserialize messages. This ensures that the messages that the applications send and receive are compatible with those schemas. Kafka client applications can push or pull their schemas from Apicurio Registry at runtime.

Schemas can evolve, so you can define rules in Apicurio Registry, for example, to ensure that schema changes are valid and do not break previous versions used by applications. Apicurio Registry checks for compatibility by comparing a modified schema with previous schema versions.

Apicurio Registry schema technologies

Apicurio Registry provides schema registry support for schema technologies such as:

  • Avro

  • Protobuf

  • JSON Schema

These schema technologies can be used by client applications through the Kafka client serializer/deserializer (SerDe) services provided by Apicurio Registry. The maturity and usage of the SerDe classes provided by Apicurio Registry might vary. The sections that follow provide more details about each schema type.

Producer schema configuration

A producer client application uses a serializer to put the messages that it sends to a specific broker topic into the correct data format.

To enable a producer to use Apicurio Registry for serialization:

After registering your schema, when you start Kafka and Apicurio Registry, you can access the schema to format messages sent to the Kafka broker topic by the producer. Alternatively, depending on configuration, the producer can automatically register the schema on first use.

If a schema already exists, you can create a new version using the registry REST API based on compatibility rules defined in Apicurio Registry. Versions are used for compatibility checking as a schema evolves. A group ID, artifact ID, and version together form a unique tuple that identifies a schema.

Consumer schema configuration

A consumer client application uses a deserializer to get the messages that it consumes from a specific broker topic into the correct data format.

To enable a consumer to use Apicurio Registry for deserialization:

Retrieve schemas using a global ID

By default, the schema is retrieved from Apicurio Registry by the deserializer using a global ID, which is specified in the message being consumed. The schema global ID can be located in the message headers or in the message payload, depending on the configuration of the producer application.

When the global ID is located in the message payload, the data begins with a magic byte, which acts as a signal to consumers, followed by the global ID and then the message data. For example:

# ...
[MAGIC_BYTE]
[GLOBAL_ID]
[MESSAGE DATA]
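The layout above can be sketched in plain Java. This is a minimal illustration, not the Apicurio Registry implementation: the class name PayloadFraming is hypothetical, and the zero-valued magic byte, the 8-byte global ID, and the big-endian byte order are assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Sketch of the [MAGIC_BYTE][GLOBAL_ID][MESSAGE DATA] payload layout. */
final class PayloadFraming {

    // Assumption: a zero-valued magic byte and an 8-byte global ID.
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 1 + Long.BYTES;

    /** Prefixes the serialized message data with the magic byte and the global ID. */
    static byte[] frame(long globalId, byte[] data) {
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH + data.length);
        buffer.put(MAGIC_BYTE);
        buffer.putLong(globalId);
        buffer.put(data);
        return buffer.array();
    }

    /** Reads the global ID back out of a framed payload. */
    static long readGlobalId(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Payload does not start with the magic byte");
        }
        return buffer.getLong();
    }

    /** Returns the message data that follows the magic byte and the global ID. */
    static byte[] readData(byte[] payload) {
        return Arrays.copyOfRange(payload, HEADER_LENGTH, payload.length);
    }
}
```

A consumer-side deserializer would use the extracted global ID to fetch the schema from the registry before decoding the remaining bytes.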

Then when you start Kafka and Apicurio Registry, you can access the schema to format messages received from the Kafka broker topic.

Retrieve schemas using a content ID

Alternatively, you can configure schemas to be retrieved from Apicurio Registry based on the content ID, which is the unique ID of the artifact content. By contrast, the global ID is the unique ID of an artifact version.

The content ID does not uniquely identify a version. It uniquely identifies the version content only. If multiple versions share the exact same content, they have different global IDs but the same content ID. The Confluent schema registry uses content ID by default.
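The relationship between the two IDs can be illustrated with a toy in-memory model. This is only a sketch: the IdModel class and its ID numbering are hypothetical and do not reflect how Apicurio Registry stores artifacts. The group and artifact names are borrowed from the curl example later in this chapter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of global IDs and content IDs. Not the registry implementation. */
final class IdModel {

    /** One stored artifact version, identified by group ID, artifact ID, and version. */
    record Version(String groupId, String artifactId, int version, long globalId, long contentId) {}

    private final Map<String, Long> contentIds = new HashMap<>();
    private final List<Version> versions = new ArrayList<>();

    /** Adds a version: each call gets a new global ID, identical content reuses its content ID. */
    Version addVersion(String groupId, String artifactId, String content) {
        Long contentId = contentIds.get(content);
        if (contentId == null) {
            contentId = contentIds.size() + 1L;
            contentIds.put(content, contentId);
        }
        // The version number counts earlier versions of the same artifact.
        int versionNumber = 1;
        for (Version existing : versions) {
            if (existing.groupId().equals(groupId) && existing.artifactId().equals(artifactId)) {
                versionNumber++;
            }
        }
        Version created = new Version(groupId, artifactId, versionNumber, versions.size() + 1L, contentId);
        versions.add(created);
        return created;
    }
}
```

Adding the same schema text twice under my-group/share-price yields two versions with different global IDs that share one content ID, while a changed schema gets a new content ID.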

Strategies to look up a schema in Apicurio Registry

The Kafka client serializer uses lookup strategies to determine the artifact ID and global ID under which the message schema is registered in Apicurio Registry. For a given topic and message, you can use different implementations of the ArtifactResolverStrategy Java interface to return a reference to an artifact in the registry.

The classes for each strategy are in the io.apicurio.registry.serde.strategy package. Specific strategy classes for Avro SerDe are in the io.apicurio.registry.serde.avro.strategy package. The default strategy is the TopicIdStrategy, which looks for Apicurio Registry artifacts with the same name as the Kafka topic receiving messages.

Example
public ArtifactReference artifactReference(String topic, boolean isKey, T schema) {
    // Build the artifact ID from the topic name and a "key" or "value" suffix
    return ArtifactReference.builder()
            .groupId(null)
            .artifactId(String.format("%s-%s", topic, isKey ? "key" : "value"))
            .build();
}
  • The topic parameter is the name of the Kafka topic receiving the message.

  • The isKey parameter is true when the message key is serialized, and false when the message value is serialized.

  • The schema parameter is the schema of the message serialized or deserialized.

  • The ArtifactReference returned contains the artifact ID under which the schema is registered.

Which lookup strategy you use depends on how and where you store your schema. For example, you might use a strategy that uses a record ID if you have different Kafka topics with the same Avro message type.

ArtifactResolverStrategy interface

The artifact resolver strategy provides a way to map the Kafka topic and message information to an artifact in Apicurio Registry. The common convention for the mapping is to combine the Kafka topic name with the key or value, depending on whether the serializer is used for the Kafka message key or value.

However, you can use alternative conventions for the mapping by using a strategy provided by Apicurio Registry, or by creating a custom Java class that implements io.apicurio.registry.serde.strategy.ArtifactResolverStrategy.

Strategies to return an artifact reference

Apicurio Registry provides the following strategies to return an artifact reference based on an implementation of ArtifactResolverStrategy:

RecordIdStrategy

Avro-specific strategy that uses the full name of the schema.

TopicRecordIdStrategy

Avro-specific strategy that uses the topic name and the full name of the schema.

TopicIdStrategy

Default strategy that uses the topic name and key or value suffix.

SimpleTopicIdStrategy

Simple strategy that only uses the topic name.
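The naming conventions behind these four strategies can be approximated in plain Java. The ArtifactIdConventions class below is hypothetical and only mirrors the descriptions above. In particular, the "-" separator used for TopicRecordIdStrategy is an assumption, and the real strategy classes may also set a group ID.

```java
/** Approximates the artifact ID conventions described above. Not the Apicurio Registry classes. */
final class ArtifactIdConventions {

    /** TopicIdStrategy: the topic name plus a "key" or "value" suffix. */
    static String topicId(String topic, boolean isKey) {
        return String.format("%s-%s", topic, isKey ? "key" : "value");
    }

    /** SimpleTopicIdStrategy: the topic name only. */
    static String simpleTopicId(String topic) {
        return topic;
    }

    /** RecordIdStrategy: the full name of the schema, independent of the topic. */
    static String recordId(String schemaFullName) {
        return schemaFullName;
    }

    /** TopicRecordIdStrategy: the topic name and the schema full name (separator is an assumption). */
    static String topicRecordId(String topic, String schemaFullName) {
        return topic + "-" + schemaFullName;
    }
}
```

With a record-based convention, two different topics that carry the same Avro record type, for example com.example.price, resolve to the same artifact, which is why that kind of strategy suits message types shared across topics.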

DefaultSchemaResolver interface

The default schema resolver locates and identifies the specific version of the schema registered under the artifact reference provided by the artifact resolver strategy. Every version of every artifact has a single globally unique identifier that can be used to retrieve the content of that artifact. This global ID is included in every Kafka message so that a deserializer can properly fetch the schema from Apicurio Registry.

The default schema resolver can look up an existing artifact version, or it can register one if not found, depending on which strategy is used. You can also provide your own strategy by creating a custom Java class that implements io.apicurio.registry.serde.SchemaResolver. However, it is recommended to use the DefaultSchemaResolver and specify configuration properties instead.

Strategies to return a global ID

When using the DefaultSchemaResolver, you can configure its behavior using application properties. The following table shows some commonly used examples:

Table 1. Apicurio Registry global ID configuration options

| Property | Type | Description | Default |
|---|---|---|---|
| apicurio.registry.find-latest | boolean | Specify whether the serializer tries to find the latest artifact in the registry for the corresponding group ID and artifact ID. | false |
| apicurio.registry.use-id | String | Instructs the serializer to write the specified ID to Kafka and instructs the deserializer to use this ID to find the schema. | - |
| apicurio.registry.auto-register | boolean | Specify whether the serializer tries to create an artifact in the registry. The JSON Schema serializer does not support this. | false |
| apicurio.registry.check-period-ms | String | Specify how long to cache the global ID in milliseconds. If not configured, the global ID is fetched every time. | - |

You can configure application properties as Java system properties or include them in the Quarkus application.properties file. For more details, see the Quarkus documentation.
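As a rough illustration, the property names from Table 1 can also be collected in a java.util.Properties object and merged into the Kafka client configuration. The ResolverProperties class and the 30000 millisecond cache period are illustrative assumptions, not recommended values.

```java
import java.util.Properties;

/** Collects some of the Table 1 options as string properties. Values are illustrative only. */
final class ResolverProperties {

    static Properties build() {
        Properties props = new Properties();
        // Let the serializer register the schema if it is not found in the registry
        props.setProperty("apicurio.registry.auto-register", "true");
        // Cache the resolved global ID for 30 seconds (assumed value for the example)
        props.setProperty("apicurio.registry.check-period-ms", "30000");
        return props;
    }
}
```

These entries would typically be added to the same Properties instance that holds the bootstrap servers and the serializer class names.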

Apicurio Registry serializer/deserializer configuration

You can configure specific client serializer/deserializer (SerDe) services and schema lookup strategies directly in a client application using the example constants shown in this section. Alternatively, you can configure the corresponding Apicurio Registry application properties in a file or an instance.

The following sections show examples of SerDe constants and configuration options.

Configuration for SerDe services

public class SerdeConfig {

   public static final String REGISTRY_URL = "apicurio.registry.url"; (1)
   public static final String ID_HANDLER = "apicurio.registry.id-handler"; (2)
   public static final String ENABLE_CONFLUENT_ID_HANDLER = "apicurio.registry.as-confluent"; (3)
1 The required URL of Apicurio Registry.
2 Extends ID handling to support other ID formats and make them compatible with Apicurio Registry SerDe services. For example, changing the default ID format from Long to Integer supports the Confluent ID format.
3 Simplifies the handling of Confluent IDs. If set to true, an Integer is used for the global ID lookup. The setting should not be used with the ID_HANDLER option.

Configuration for SerDe lookup strategies

public class SerdeConfig {

   public static final String ARTIFACT_RESOLVER_STRATEGY = "apicurio.registry.artifact-resolver-strategy";
...

Configuration for Kafka converters

public class SerdeBasedConverter<S, T> extends SchemaResolverConfigurer<S, T> implements Converter, Closeable {

   public static final String REGISTRY_CONVERTER_SERIALIZER_PARAM = "apicurio.registry.converter.serializer"; (1)
   public static final String REGISTRY_CONVERTER_DESERIALIZER_PARAM = "apicurio.registry.converter.deserializer"; (2)
1 The required serializer to use with the Apicurio Registry Kafka converter.
2 The required deserializer to use with the Apicurio Registry Kafka converter.

Configuration for different schema types

For details on how to configure SerDe for different schema technologies, see the following:

Using different client serializer/deserializer types

When using schemas in your Kafka client applications, you must choose which specific schema type to use, depending on your use case. Apicurio Registry provides SerDe Java classes for Apache Avro, JSON Schema, and Google Protobuf. The following sections explain how to configure Kafka applications to use each type.

You can also use Kafka to implement custom serializer and deserializer classes, and leverage Apicurio Registry functionality using the Apicurio Registry REST Java client.

Kafka application configuration for serializers/deserializers

Using the SerDe classes provided by Apicurio Registry in your Kafka application involves setting the correct configuration properties. The following simple Avro examples show how to configure a serializer in a Kafka producer application and how to configure a deserializer in a Kafka consumer application.

Example serializer configuration in a Kafka producer
// Create the Kafka producer
private static Producer<Object, Object> createKafkaProducer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
    props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");

    // Use Apicurio Registry-provided Kafka serializer for Avro
    props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());

    // Configure the Apicurio Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // Register the schema artifact if not found in the registry.
    props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);

    // Create the Kafka producer
    Producer<Object, Object> producer = new KafkaProducer<>(props);
    return producer;
}
Example deserializer configuration in a Kafka consumer
// Create the Kafka consumer
private static KafkaConsumer<String, GenericRecord> createKafkaConsumer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Use Apicurio Registry-provided Kafka deserializer for Avro
    // The String key type of the consumer matches the StringDeserializer configured here
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());

    // Configure the Apicurio Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // No other configuration needed because the schema globalId the deserializer uses is sent
    // in the payload. The deserializer extracts the globalId and uses it to look up the schema
    // from the registry.

    // Create the Kafka consumer
    KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
    return consumer;
}

Configure Avro SerDe with Apicurio Registry

Apicurio Registry provides the following Kafka client serializer and deserializer classes for Apache Avro:

  • io.apicurio.registry.serde.avro.AvroKafkaSerializer

  • io.apicurio.registry.serde.avro.AvroKafkaDeserializer

Configure the Avro serializer

You can configure the Avro serializer class with the following:

  • Apicurio Registry URL

  • Artifact resolver strategy

  • ID location

  • ID encoding

  • Avro datum provider

  • Avro encoding

ID location

The serializer passes the unique ID of the schema as part of the Kafka message so that consumers can use the correct schema for deserialization. The ID can be in the message payload or in the message headers. The default location is the message payload. To send the ID in the message headers, set the following configuration property:

props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true");

The property name is apicurio.registry.headers.enabled.

ID encoding

You can customize how the schema ID is encoded when passing it in the Kafka message body. Set the apicurio.registry.id-handler configuration property to a class that implements the io.apicurio.registry.serde.IdHandler interface. Apicurio Registry provides the following implementations:

  • io.apicurio.registry.serde.DefaultIdHandler: Stores the ID as an 8-byte long

  • io.apicurio.registry.serde.Legacy4ByteIdHandler: Stores the ID as a 4-byte integer

Apicurio Registry represents the schema ID as a long, but for legacy reasons, or for compatibility with other registries or SerDe classes, you might want to use 4 bytes when sending the ID.
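The difference between the two encodings can be sketched with java.nio.ByteBuffer. The IdEncodingSketch class is hypothetical and does not implement the IdHandler interface. The big-endian byte order is an assumption of the example.

```java
import java.nio.ByteBuffer;

/** Compares an 8-byte and a 4-byte encoding of a schema ID. Not the Apicurio Registry handlers. */
final class IdEncodingSketch {

    /** Encodes the ID as an 8-byte long, as described for DefaultIdHandler. */
    static byte[] encodeAsLong(long id) {
        return ByteBuffer.allocate(Long.BYTES).putLong(id).array();
    }

    /** Encodes the ID as a 4-byte integer, as described for Legacy4ByteIdHandler. */
    static byte[] encodeAsInt(long id) {
        // Math.toIntExact throws ArithmeticException if the ID does not fit in 4 bytes
        return ByteBuffer.allocate(Integer.BYTES).putInt(Math.toIntExact(id)).array();
    }

    /** Decodes an 8-byte ID. */
    static long decodeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    /** Decodes a 4-byte ID and widens it to a long. */
    static long decodeInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }
}
```

The 4-byte form saves space and matches registries that use integer IDs, but it can only carry IDs that fit in a Java int, so the producer and consumer must agree on the same handler.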

Avro datum provider

Avro provides different datum writers and readers to write and read data. Apicurio Registry supports three different types:

  • Generic

  • Specific

  • Reflect

The Apicurio Registry AvroDatumProvider abstracts which of these types is used. DefaultAvroDatumProvider is used by default.

You can set the following configuration options:

  • apicurio.registry.avro-datum-provider: Specifies a fully-qualified Java class name of the AvroDatumProvider implementation, for example io.apicurio.registry.serde.avro.ReflectAvroDatumProvider

  • apicurio.registry.use-specific-avro-reader: Set to true to use a specific type when using DefaultAvroDatumProvider

Avro encoding

When using Avro to serialize data, you can use the Avro binary encoding format to ensure the data is encoded in as efficient a format as possible. Avro also supports encoding the data as JSON, which makes it easier to inspect the payload of each message, for example, for logging or debugging.

You can set the Avro encoding by configuring the apicurio.registry.avro.encoding property with a value of JSON or BINARY. The default is BINARY.

Configure the Avro deserializer

You must configure the Avro deserializer class to match the following configuration settings of the serializer:

  • Apicurio Registry URL

  • ID encoding

  • Avro datum provider

  • Avro encoding

See the serializer section for these configuration options. The property names and values are the same.

The following options are not required when configuring the deserializer:

  • Artifact resolver strategy

  • ID location

The deserializer class can determine the values for these options from the message. The strategy is not required because the serializer is responsible for sending the ID as part of the message.

The ID location is determined by checking for the magic byte at the start of the message payload. If that byte is found, the ID is read from the message payload using the configured handler. If the magic byte is not found, the ID is read from the message headers.
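The detection logic described above can be sketched as follows. This is not the Apicurio Registry deserializer: the IdLocationSketch class, the zero-valued magic byte, the 8-byte ID, and the header name example.globalId are all assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.util.Map;

/** Sketch of choosing between the payload and the headers when reading the schema ID. */
final class IdLocationSketch {

    static final byte MAGIC_BYTE = 0x0;                 // assumed magic byte value
    static final String ID_HEADER = "example.globalId"; // hypothetical header name

    /** Returns the ID from the payload if it starts with the magic byte, otherwise from the headers. */
    static long resolveGlobalId(byte[] payload, Map<String, byte[]> headers) {
        if (payload.length > Long.BYTES && payload[0] == MAGIC_BYTE) {
            // Skip the magic byte and read the next 8 bytes as the ID
            return ByteBuffer.wrap(payload, 1, Long.BYTES).getLong();
        }
        byte[] header = headers.get(ID_HEADER);
        if (header == null) {
            throw new IllegalStateException("No global ID found in the payload or the headers");
        }
        return ByteBuffer.wrap(header).getLong();
    }
}
```

Because the location is discovered per message, the consumer needs no ID location setting of its own.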


Configure JSON Schema SerDe with Apicurio Registry

Apicurio Registry provides the following Kafka client serializer and deserializer classes for JSON Schema:

  • io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer

  • io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer

Unlike Apache Avro, JSON Schema is not a serialization technology, but is instead a validation technology. As a result, configuration options for JSON Schema are quite different. For example, there is no encoding option, because data is always encoded as JSON.

Configure the JSON Schema serializer

You can configure the JSON Schema serializer class as follows:

  • Apicurio Registry URL

  • Artifact resolver strategy

  • Schema validation

The only non-standard configuration property is JSON Schema validation, which is enabled by default. You can disable this by setting apicurio.registry.serde.validation-enabled to "false". For example:

props.putIfAbsent(SerdeConfig.VALIDATION_ENABLED, Boolean.FALSE);
Configure the JSON Schema deserializer

You can configure the JSON Schema deserializer class as follows:

  • Apicurio Registry URL

  • Schema validation

  • Class for deserializing data

You must provide the location of Apicurio Registry so that the schema can be loaded. The other configuration is optional.

Deserializer validation works only if the serializer passes the global ID in the Kafka message, which happens only when validation is enabled in the serializer.

Configure Protobuf SerDe with Apicurio Registry

Apicurio Registry provides the following Kafka client serializer and deserializer classes for Google Protobuf:

  • io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer

  • io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer

Configure the Protobuf serializer

You can configure the Protobuf serializer class as follows:

  • Apicurio Registry URL

  • Artifact resolver strategy

  • ID location

  • ID encoding

  • Schema validation

For details on these configuration options, see the following sections:

Configure the Protobuf deserializer

You must configure the Protobuf deserializer class to match the following configuration settings in the serializer:

  • Apicurio Registry URL

  • ID encoding

The configuration property names and values are the same as for the serializer.

The following options are not required when configuring the deserializer:

  • Artifact resolver strategy

  • ID location

The deserializer class can determine the values for these options from the message. The strategy is not required because the serializer is responsible for sending the ID as part of the message.

The ID location is determined by checking for the magic byte at the start of the message payload. If that byte is found, the ID is read from the message payload using the configured handler. If the magic byte is not found, the ID is read from the message headers.

The Protobuf deserializer does not deserialize to your exact Protobuf Message implementation, but rather to a DynamicMessage instance. There is no appropriate API to do otherwise.

Registering a schema in Apicurio Registry

After you have defined a schema in the appropriate format, such as Apache Avro, you can add the schema to Apicurio Registry.

You can add the schema using the following approaches:

  • Apicurio Registry web console

  • curl command using the Apicurio Registry REST API

  • Maven plug-in supplied with Apicurio Registry

  • Schema configuration added to your client code

Client applications cannot use Apicurio Registry until you have registered your schemas.

Apicurio Registry web console

When Apicurio Registry is installed, you can connect to the web console from the ui endpoint:

http://MY-REGISTRY-URL/ui

From the console, you can add, view and configure schemas. You can also create the rules that prevent invalid content being added to the registry.

Curl command example

 curl -X POST -H "Content-type: application/json; artifactType=AVRO" \
   -H "X-Registry-ArtifactId: share-price" \ (1)
   --data '{
     "type":"record",
     "name":"price",
     "namespace":"com.example",
     "fields":[{"name":"symbol","type":"string"},
     {"name":"price","type":"string"}]}' \
   https://my-cluster-my-registry-my-project.example.com/apis/registry/v2/groups/my-group/artifacts -s (2)
1 Simple Avro schema artifact.
2 OpenShift route name that exposes Apicurio Registry.
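For Java applications, an equivalent request can be built with the JDK java.net.http API. The sketch below only constructs the request and does not send it. The RegisterSchemaRequest class is hypothetical, and the endpoint path, headers, and schema are copied from the curl example above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Builds the same POST request as the curl example, using only JDK classes. */
final class RegisterSchemaRequest {

    // The Avro schema from the curl example
    static final String SCHEMA =
            "{\"type\":\"record\",\"name\":\"price\",\"namespace\":\"com.example\","
            + "\"fields\":[{\"name\":\"symbol\",\"type\":\"string\"},"
            + "{\"name\":\"price\",\"type\":\"string\"}]}";

    /** Builds, but does not send, the request that registers the schema in my-group. */
    static HttpRequest build(String registryBaseUrl) {
        return HttpRequest.newBuilder()
                .uri(URI.create(registryBaseUrl + "/apis/registry/v2/groups/my-group/artifacts"))
                .header("Content-Type", "application/json; artifactType=AVRO")
                .header("X-Registry-ArtifactId", "share-price")
                .POST(HttpRequest.BodyPublishers.ofString(SCHEMA))
                .build();
    }
}
```

To send the request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and check the returned status code.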

Maven plug-in example

<plugin>
  <groupId>io.apicurio</groupId>
  <artifactId>apicurio-registry-maven-plugin</artifactId>
  <version>${apicurio.version}</version>
  <executions>
      <execution>
        <phase>generate-sources</phase>
        <goals>
            <goal>register</goal>  (1)
        </goals>
        <configuration>
            <registryUrl>http://REGISTRY-URL/apis/registry/v2</registryUrl> (2)
            <artifacts>
                <artifact>
                    <groupId>TestGroup</groupId> (3)
                    <artifactId>FullNameRecord</artifactId>
                    <file>${project.basedir}/src/main/resources/schemas/record.avsc</file>
                    <ifExists>FAIL</ifExists>
                </artifact>
                <artifact>
                    <groupId>TestGroup</groupId>
                    <artifactId>ExampleAPI</artifactId> (4)
                    <type>GRAPHQL</type>
                    <file>${project.basedir}/src/main/resources/apis/example.graphql</file>
                    <ifExists>RETURN_OR_UPDATE</ifExists>
                    <canonicalize>true</canonicalize>
                </artifact>
            </artifacts>
        </configuration>
      </execution>
  </executions>
</plugin>
1 Specify register as the execution goal to upload the schema artifact to the registry.
2 Specify the Apicurio Registry URL with the ../apis/registry/v2 endpoint.
3 Specify the Apicurio Registry artifact group ID.
4 You can upload multiple artifacts using the specified group ID, artifact ID, and location.

Configuration using a producer client example

String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
    "https://my-cluster-service-registry-myproject.example.com/apis/registry/v2"); (1)
try (RegistryService service = RegistryClient.create(registryUrl_node1)) {
    String artifactId = ApplicationImpl.INPUT_TOPIC + "-value";
    try {
        service.getArtifactMetaData(artifactId); (2)
    } catch (WebApplicationException e) {
        CompletionStage<ArtifactMetaData> csa = service.createArtifact(
            ArtifactType.AVRO,
            artifactId,
            new ByteArrayInputStream(LogInput.SCHEMA$.toString().getBytes())
        );
        csa.toCompletableFuture().get();
    }
}
1 You can register properties against more than one URL node.
2 Check to see if the schema already exists based on the artifact ID.

Using a schema from a Kafka consumer client

This procedure describes how to configure a Kafka consumer client written in Java to use a schema from Apicurio Registry.

Prerequisites
  • Apicurio Registry is installed

  • The schema is registered with Apicurio Registry

Procedure
  1. Configure the client with the URL of Apicurio Registry. For example:

    String registryUrl = "https://registry.example.com/apis/registry/v2";
    Properties props = new Properties();
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, registryUrl);
  2. Configure the client with the Apicurio Registry deserializer. For example:

    // Configure Kafka settings
    props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Configure deserializer settings
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        AvroKafkaDeserializer.class.getName()); (1)
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        AvroKafkaDeserializer.class.getName()); (2)
    1 The deserializer provided by Apicurio Registry.
    2 The deserialization is in Apache Avro JSON format.

Using a schema from a Kafka producer client

This procedure describes how to configure a Kafka producer client written in Java to use a schema from Apicurio Registry.

Prerequisites
  • Apicurio Registry is installed

  • The schema is registered with Apicurio Registry

Procedure
  1. Configure the client with the URL of Apicurio Registry. For example:

    String registryUrl = "https://registry.example.com/apis/registry/v2";
    Properties props = new Properties();
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, registryUrl);
  2. Configure the client with the serializer, and the strategy to look up the schema in Apicurio Registry. For example:

    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); (1)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); (2)
    props.put(SerdeConfig.FIND_LATEST_ARTIFACT, Boolean.TRUE); (3)
    1 The serializer for the message key provided by Apicurio Registry.
    2 The serializer for the message value provided by Apicurio Registry.
    3 The lookup option that finds the latest artifact version for the schema. This property is a boolean.

Using a schema from a Kafka Streams application

This procedure describes how to configure a Kafka Streams client written in Java to use an Apache Avro schema from Apicurio Registry.

Prerequisites
  • Apicurio Registry is installed

  • The schema is registered with Apicurio Registry

Procedure
  1. Create and configure a Java client with the Apicurio Registry URL:

    String registryUrl = "https://registry.example.com/apis/registry/v2";
    
    RegistryService client = RegistryClient.cached(registryUrl);
  2. Configure the serializer and deserializer:

    Serializer<LogInput> serializer = new AvroKafkaSerializer<LogInput>(); (1)
    
    Deserializer<LogInput> deserializer = new AvroKafkaDeserializer<LogInput>(); (2)
    
    Serde<LogInput> logSerde = Serdes.serdeFrom(
        serializer,
        deserializer
    );
    
    Map<String, Object> config = new HashMap<>();
    config.put(SerdeConfig.REGISTRY_URL, registryUrl);
    config.put(AvroKafkaSerdeConfig.USE_SPECIFIC_AVRO_READER, true);
    logSerde.configure(config, false); (3)
    1 The Avro serializer provided by Apicurio Registry.
    2 The Avro deserializer provided by Apicurio Registry.
    3 Configures the Apicurio Registry URL and the Avro reader for deserialization in Avro format.
  3. Create the Kafka Streams client:

    KStream<String, LogInput> input = builder.stream(
        INPUT_TOPIC,
        Consumed.with(Serdes.String(), logSerde)
    );