1 package io.apicurio.registry.maven;
2
3 import io.apicurio.registry.content.ContentHandle;
4 import io.apicurio.registry.content.TypedContent;
5 import io.apicurio.registry.rest.client.RegistryClient;
6 import io.apicurio.registry.rest.client.models.ArtifactReference;
7 import io.apicurio.registry.types.ContentTypes;
8 import org.apache.avro.AvroTypeException;
9 import org.apache.avro.Schema;
10 import org.apache.avro.SchemaParseException;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13
14 import java.io.File;
15 import java.io.FileNotFoundException;
16 import java.util.ArrayList;
17 import java.util.Arrays;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Objects;
23 import java.util.Set;
24 import java.util.concurrent.ExecutionException;
25 import java.util.stream.Collectors;
26
27 public class AvroDirectoryParser extends AbstractDirectoryParser<Schema> {
28
29 private static final String AVRO_SCHEMA_EXTENSION = ".avsc";
30 private static final Logger log = LoggerFactory.getLogger(AvroDirectoryParser.class);
31
32 public AvroDirectoryParser(RegistryClient client) {
33 super(client);
34 }
35
36 @Override
37 public ParsedDirectoryWrapper<Schema> parse(File rootSchemaFile) {
38 return parseDirectory(rootSchemaFile.getParentFile(), rootSchemaFile);
39 }
40
41 @Override
42 public List<ArtifactReference> handleSchemaReferences(RegisterArtifact rootArtifact, Schema rootSchema,
43 Map<String, TypedContent> fileContents)
44 throws FileNotFoundException, ExecutionException, InterruptedException {
45
46 Set<ArtifactReference> references = new HashSet<>();
47
48
49 for (Schema.Field field : rootSchema.getFields()) {
50 List<ArtifactReference> nestedArtifactReferences = new ArrayList<>();
51 if (field.schema().getType() == Schema.Type.RECORD) {
52
53
54
55 RegisterArtifact nestedSchema = buildFromRoot(rootArtifact, field.schema().getFullName());
56
57 if (field.schema().hasFields()) {
58 nestedArtifactReferences = handleSchemaReferences(nestedSchema, field.schema(),
59 fileContents);
60 }
61
62 references.add(registerNestedSchema(field.schema().getFullName(), nestedArtifactReferences,
63 nestedSchema, fileContents.get(field.schema().getFullName()).getContent().content()));
64 } else if (field.schema().getType() == Schema.Type.ENUM) {
65
66
67 RegisterArtifact nestedSchema = buildFromRoot(rootArtifact, field.schema().getFullName());
68 references.add(registerNestedSchema(field.schema().getFullName(), nestedArtifactReferences,
69 nestedSchema, fileContents.get(field.schema().getFullName()).getContent().content()));
70 } else if (isArrayWithSubschemaElement(field)) {
71
72
73 Schema elementSchema = field.schema().getElementType();
74
75 RegisterArtifact nestedSchema = buildFromRoot(rootArtifact, elementSchema.getFullName());
76
77 if (elementSchema.hasFields()) {
78 nestedArtifactReferences = handleSchemaReferences(nestedSchema, elementSchema,
79 fileContents);
80 }
81
82 references.add(registerNestedSchema(elementSchema.getFullName(), nestedArtifactReferences,
83 nestedSchema, fileContents.get(elementSchema.getFullName()).getContent().content()));
84 }
85 }
86 return new ArrayList<>(references);
87 }
88
89 private ParsedDirectoryWrapper<Schema> parseDirectory(File directory, File rootSchema) {
90 Set<File> typesToAdd = Arrays
91 .stream(Objects.requireNonNull(
92 directory.listFiles((dir, name) -> name.endsWith(AVRO_SCHEMA_EXTENSION))))
93 .filter(file -> !file.getName().equals(rootSchema.getName())).collect(Collectors.toSet());
94
95 Map<String, Schema> processed = new HashMap<>();
96 Map<String, TypedContent> schemaContents = new HashMap<>();
97
98 Schema.Parser rootSchemaParser = new Schema.Parser();
99 Schema.Parser partialParser = new Schema.Parser();
100
101 while (processed.size() != typesToAdd.size()) {
102 boolean fileParsed = false;
103 for (File typeToAdd : typesToAdd) {
104 if (typeToAdd.getName().equals(rootSchema.getName())) {
105 continue;
106 }
107 try {
108 final ContentHandle schemaContent = readSchemaContent(typeToAdd);
109 final String contentType = ContentTypes.APPLICATION_JSON;
110 final TypedContent typedSchemaContent = TypedContent.create(schemaContent, contentType);
111 final Schema schema = partialParser.parse(schemaContent.content());
112 processed.put(schema.getFullName(), schema);
113 schemaContents.put(schema.getFullName(), typedSchemaContent);
114 fileParsed = true;
115 } catch (SchemaParseException | AvroTypeException ex) {
116 log.warn(
117 "Error processing Avro schema with name {}. This usually means that the references are not ready yet to parse it",
118 typeToAdd.getName());
119 }
120 }
121 partialParser = new Schema.Parser();
122 partialParser.addTypes(processed);
123
124
125
126 if (!fileParsed) {
127 throw new IllegalStateException(
128 "Error found in the directory structure. Check that all required files are present.");
129 }
130
131 }
132
133 rootSchemaParser.addTypes(processed);
134
135 return new AvroSchemaWrapper(rootSchemaParser.parse(readSchemaContent(rootSchema).content()),
136 schemaContents);
137 }
138
139 private boolean isArrayWithSubschemaElement(Schema.Field field) {
140 return field.schema().getType() == Schema.Type.ARRAY
141 && field.schema().getElementType().getType() == Schema.Type.RECORD;
142 }
143
144 public static class AvroSchemaWrapper implements ParsedDirectoryWrapper<Schema> {
145 final Schema schema;
146 final Map<String, TypedContent> fileContents;
147
148 public AvroSchemaWrapper(Schema schema, Map<String, TypedContent> fileContents) {
149 this.schema = schema;
150 this.fileContents = fileContents;
151 }
152
153 @Override
154 public Schema getSchema() {
155 return schema;
156 }
157
158 @Override
159 public Map<String, TypedContent> getSchemaContents() {
160 return fileContents;
161 }
162 }
163 }