-Support for Sink Codecs
Signed-off-by: umairofficial <umairhusain1010@gmail.com>
umairofficial committed Jul 6, 2023
1 parent 9b9e433 commit 939562b
Showing 4 changed files with 10 additions and 6 deletions.
4 changes: 4 additions & 0 deletions data-prepper-plugins/avro-codecs/README.md
@@ -49,6 +49,9 @@ pipeline:
2) `schema_file_location`: Path to the schema JSON file through which the user can provide the schema.
3) `exclude_keys`: The event keys that the user wants to exclude while converting events to Avro records.
4) `schema_registry_url`: Another way of providing the schema, through a schema registry.
5) `region`: AWS region of the S3 bucket in which the `schema.json` file is kept.
6) `bucket_name`: Name of the S3 bucket in which the `schema.json` file is kept.
7) `file_key`: Object key of the `schema.json` file in the S3 bucket.
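Under these options, a codec configuration that loads the schema from S3 might look like the following. This is an illustrative sketch only: the bucket, key, and region values are placeholders, and the surrounding sink settings follow the pipeline example earlier in this README.

```yaml
sink:
  codec:
    avro:
      region: us-east-1              # AWS region of the schema bucket (placeholder)
      bucket_name: my-schema-bucket  # S3 bucket holding schema.json (placeholder)
      file_key: schemas/schema.json  # object key of the schema file (placeholder)
```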

### Note:

@@ -60,6 +63,7 @@ pipeline:
"type": { "type": "array",
"items": "string"
}`
3) If the user wants to provide the schema through a `schema.json` file kept in S3, the user must also supply the corresponding S3 details, i.e. the region, bucket name, and file key of that file.



@@ -158,7 +158,7 @@ private Object schemaMapper(final Schema.Field field, final Object rawValue) {
}

private boolean checkS3SchemaValidity() throws IOException {
-        if (config.getBucketName() != null && config.getFile_key() != null && config.getRegion() != null) {
+        if (config.getBucketName() != null && config.getFileKey() != null && config.getRegion() != null) {
return true;
} else {
LOG.error("Invalid S3 credentials, can't reach the schema file.");
@@ -28,8 +28,8 @@ public class AvroOutputCodecConfig {
private String region;
@JsonProperty("bucket_name")
private String bucketName;
-    @JsonProperty("fileKey")
-    private String file_key;
+    @JsonProperty("file_key")
+    private String fileKey;

public List<String> getExcludeKeys() {
return excludeKeys;
@@ -59,8 +59,8 @@ public String getBucketName() {
return bucketName;
}

-    public String getFile_key() {
-        return file_key;
+    public String getFileKey() {
+        return fileKey;
}
public void setExcludeKeys(List<String> excludeKeys) {
this.excludeKeys = excludeKeys;
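The annotation change above is the substance of the fix: Jackson binds a pipeline option to a field by the exact string given in `@JsonProperty`, so `@JsonProperty("fileKey")` could never match the snake_case `file_key` key users write in their pipeline YAML. A minimal stdlib-only sketch of the mismatch, treating the parsed YAML as a plain map (the option values are illustrative, not from this commit):

```java
import java.util.HashMap;
import java.util.Map;

public class PropertyNameDemo {
    public static void main(String[] args) {
        // Options as they appear in the pipeline YAML (snake_case keys).
        Map<String, String> options = new HashMap<>();
        options.put("bucket_name", "my-schema-bucket");
        options.put("file_key", "schemas/schema.json");

        // Before this commit the annotation said @JsonProperty("fileKey"),
        // so the binder effectively looked up a key that is never present:
        System.out.println(options.get("fileKey"));   // null

        // After the fix, @JsonProperty("file_key") matches the YAML key:
        System.out.println(options.get("file_key"));  // schemas/schema.json
    }
}
```

With the old annotation, `fileKey` stayed `null` even when the user configured `file_key`, which is exactly the case `checkS3SchemaValidity()` rejects.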
@@ -41,7 +41,7 @@ private static String getS3SchemaObject(AvroOutputCodecConfig config) throws IOE
S3Client s3Client = buildS3Client(config);
GetObjectRequest getObjectRequest = GetObjectRequest.builder()
.bucket(config.getBucketName())
-                .key(config.getFile_key())
+                .key(config.getFileKey())
.build();
ResponseInputStream<GetObjectResponse> s3Object = s3Client.getObject(getObjectRequest);
final Map<String, Object> stringObjectMap = objectMapper.readValue(s3Object, new TypeReference<>() {});
