View Javadoc
1   package io.apicurio.registry.maven;
2   
3   import io.apicurio.registry.content.ContentHandle;
4   import io.apicurio.registry.content.TypedContent;
5   import io.apicurio.registry.rest.client.RegistryClient;
6   import io.apicurio.registry.rest.client.models.ArtifactReference;
7   import io.apicurio.registry.types.ContentTypes;
8   import org.apache.avro.AvroTypeException;
9   import org.apache.avro.Schema;
10  import org.apache.avro.SchemaParseException;
11  import org.slf4j.Logger;
12  import org.slf4j.LoggerFactory;
13  
14  import java.io.File;
15  import java.io.FileNotFoundException;
16  import java.util.ArrayList;
17  import java.util.Arrays;
18  import java.util.HashMap;
19  import java.util.HashSet;
20  import java.util.List;
21  import java.util.Map;
22  import java.util.Objects;
23  import java.util.Set;
24  import java.util.concurrent.ExecutionException;
25  import java.util.stream.Collectors;
26  
27  public class AvroDirectoryParser extends AbstractDirectoryParser<Schema> {
28  
29      private static final String AVRO_SCHEMA_EXTENSION = ".avsc";
30      private static final Logger log = LoggerFactory.getLogger(AvroDirectoryParser.class);
31  
32      public AvroDirectoryParser(RegistryClient client) {
33          super(client);
34      }
35  
36      @Override
37      public ParsedDirectoryWrapper<Schema> parse(File rootSchemaFile) {
38          return parseDirectory(rootSchemaFile.getParentFile(), rootSchemaFile);
39      }
40  
41      @Override
42      public List<ArtifactReference> handleSchemaReferences(RegisterArtifact rootArtifact, Schema rootSchema,
43              Map<String, TypedContent> fileContents)
44              throws FileNotFoundException, ExecutionException, InterruptedException {
45  
46          Set<ArtifactReference> references = new HashSet<>();
47  
48          // Iterate through all the fields of the schema
49          for (Schema.Field field : rootSchema.getFields()) {
50              List<ArtifactReference> nestedArtifactReferences = new ArrayList<>();
51              if (field.schema().getType() == Schema.Type.RECORD) { // If the field is a sub-schema, recursively
52                                                                    // check for nested sub-schemas and register
53                                                                    // all of them
54  
55                  RegisterArtifact nestedSchema = buildFromRoot(rootArtifact, field.schema().getFullName());
56  
57                  if (field.schema().hasFields()) {
58                      nestedArtifactReferences = handleSchemaReferences(nestedSchema, field.schema(),
59                              fileContents);
60                  }
61  
62                  references.add(registerNestedSchema(field.schema().getFullName(), nestedArtifactReferences,
63                          nestedSchema, fileContents.get(field.schema().getFullName()).getContent().content()));
64              } else if (field.schema().getType() == Schema.Type.ENUM) { // If the nested schema is an enum,
65                                                                         // just register
66  
67                  RegisterArtifact nestedSchema = buildFromRoot(rootArtifact, field.schema().getFullName());
68                  references.add(registerNestedSchema(field.schema().getFullName(), nestedArtifactReferences,
69                          nestedSchema, fileContents.get(field.schema().getFullName()).getContent().content()));
70              } else if (isArrayWithSubschemaElement(field)) { // If the nested schema is an array and the
71                                                               // element is a sub-schema, handle it
72  
73                  Schema elementSchema = field.schema().getElementType();
74  
75                  RegisterArtifact nestedSchema = buildFromRoot(rootArtifact, elementSchema.getFullName());
76  
77                  if (elementSchema.hasFields()) {
78                      nestedArtifactReferences = handleSchemaReferences(nestedSchema, elementSchema,
79                              fileContents);
80                  }
81  
82                  references.add(registerNestedSchema(elementSchema.getFullName(), nestedArtifactReferences,
83                          nestedSchema, fileContents.get(elementSchema.getFullName()).getContent().content()));
84              }
85          }
86          return new ArrayList<>(references);
87      }
88  
89      private ParsedDirectoryWrapper<Schema> parseDirectory(File directory, File rootSchema) {
90          Set<File> typesToAdd = Arrays
91                  .stream(Objects.requireNonNull(
92                          directory.listFiles((dir, name) -> name.endsWith(AVRO_SCHEMA_EXTENSION))))
93                  .filter(file -> !file.getName().equals(rootSchema.getName())).collect(Collectors.toSet());
94  
95          Map<String, Schema> processed = new HashMap<>();
96          Map<String, TypedContent> schemaContents = new HashMap<>();
97  
98          Schema.Parser rootSchemaParser = new Schema.Parser();
99          Schema.Parser partialParser = new Schema.Parser();
100 
101         while (processed.size() != typesToAdd.size()) {
102             boolean fileParsed = false;
103             for (File typeToAdd : typesToAdd) {
104                 if (typeToAdd.getName().equals(rootSchema.getName())) {
105                     continue;
106                 }
107                 try {
108                     final ContentHandle schemaContent = readSchemaContent(typeToAdd);
109                     final String contentType = ContentTypes.APPLICATION_JSON;
110                     final TypedContent typedSchemaContent = TypedContent.create(schemaContent, contentType);
111                     final Schema schema = partialParser.parse(schemaContent.content());
112                     processed.put(schema.getFullName(), schema);
113                     schemaContents.put(schema.getFullName(), typedSchemaContent);
114                     fileParsed = true;
115                 } catch (SchemaParseException | AvroTypeException ex) {
116                     log.warn(
117                             "Error processing Avro schema with name {}. This usually means that the references are not ready yet to parse it",
118                             typeToAdd.getName());
119                 }
120             }
121             partialParser = new Schema.Parser();
122             partialParser.addTypes(processed);
123 
124             // If no schema has been processed during this iteration, that means there is an error in the
125             // configuration, throw exception.
126             if (!fileParsed) {
127                 throw new IllegalStateException(
128                         "Error found in the directory structure. Check that all required files are present.");
129             }
130 
131         }
132 
133         rootSchemaParser.addTypes(processed);
134 
135         return new AvroSchemaWrapper(rootSchemaParser.parse(readSchemaContent(rootSchema).content()),
136                 schemaContents);
137     }
138 
139     private boolean isArrayWithSubschemaElement(Schema.Field field) {
140         return field.schema().getType() == Schema.Type.ARRAY
141                 && field.schema().getElementType().getType() == Schema.Type.RECORD;
142     }
143 
144     public static class AvroSchemaWrapper implements ParsedDirectoryWrapper<Schema> {
145         final Schema schema;
146         final Map<String, TypedContent> fileContents; // Original file contents from the file system.
147 
148         public AvroSchemaWrapper(Schema schema, Map<String, TypedContent> fileContents) {
149             this.schema = schema;
150             this.fileContents = fileContents;
151         }
152 
153         @Override
154         public Schema getSchema() {
155             return schema;
156         }
157 
158         @Override
159         public Map<String, TypedContent> getSchemaContents() {
160             return fileContents;
161         }
162     }
163 }