forked from ZwickyTransientFacility/ztf-avro-alert
-
Notifications
You must be signed in to change notification settings - Fork 0
/
validateAvroNestedSchema.py
158 lines (122 loc) · 5.16 KB
/
validateAvroNestedSchema.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
""" Check that json data follows a given Avro schema.
Schema files have to be given in order of internal nests first.
"""
import avro.schema
import fastavro
import io
import json
import argparse
import sys
import os.path
import hashlib
def combine_schemas(schema_files):
    """Combine multiple nested schemas into a single schema.

    Every file is loaded with the same ``avro.schema.Names`` instance, in the
    order given, and the schema produced by the last file is the one returned.
    Presumably the shared instance is what lets an outer schema refer to
    nested types loaded earlier -- confirm against the avro documentation.

    Args:
        schema_files: Iterable of paths to ``.avsc`` files, nested schemas
            first and the outermost schema last.

    Returns:
        The result of ``to_json()`` on the last schema that was loaded.

    Raises:
        ValueError: If ``schema_files`` is empty.
    """
    # Materialise first so that one-shot iterables can be checked for
    # emptiness without being consumed.
    schema_files = list(schema_files)
    if not schema_files:
        # Without this guard the loop never binds ``schema`` and the return
        # statement fails with an opaque UnboundLocalError.
        raise ValueError("at least one schema file is required")

    known_schemas = avro.schema.Names()
    schema = None
    for path in schema_files:
        schema = load_single_avsc(path, known_schemas)
    return schema.to_json()
def load_single_avsc(file_path, names):
    """Parse one ``.avsc`` file into an avro schema object.

    Args:
        file_path: Path to the JSON schema file.
        names: ``avro.schema.Names`` instance handed to the avro parser
            together with the decoded JSON.

    Returns:
        Whatever ``avro.schema.SchemaFromJSONData`` builds from the file.
    """
    with open(file_path) as avsc_file:
        decoded = json.load(avsc_file)
    return avro.schema.SchemaFromJSONData(decoded, names)
def load_stamp(file_path):
    """Read a cutout postage stamp file so it can be attached to an alert.

    Args:
        file_path: Path to the stamp file; it is opened in binary mode.

    Returns:
        A dict with two keys: ``"fileName"`` (the final path component of
        ``file_path``) and ``"stampData"`` (the file's raw bytes).
    """
    stamp_name = os.path.basename(file_path)
    with open(file_path, mode='rb') as stamp_file:
        return {"fileName": stamp_name, "stampData": stamp_file.read()}
def write_stamp_file(stamp_dict, output_dir):
    """Write the data of a cutout stamp dict to a file in a given directory.

    Args:
        stamp_dict: Mapping with a ``'fileName'`` entry (name of the file to
            create) and a ``'stampData'`` entry (bytes to write).
        output_dir: Directory to write into. It is created, including any
            missing parents, when it does not exist yet. An empty string
            means the current working directory.

    Returns:
        The path of the file that was written.

    Raises:
        OSError: If the directory cannot be created or the file cannot be
            written.
    """
    filename = stamp_dict['fileName']
    # ``exist_ok`` tolerates only the "already there" case; the previous
    # blanket ``except OSError: pass`` also hid real failures until open().
    # ``os.makedirs('')`` raises, so an empty directory name is skipped.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    out_path = os.path.join(output_dir, filename)
    with open(out_path, 'wb') as f:
        f.write(stamp_dict['stampData'])
    return out_path
def write_avro_data(json_data, json_schema):
    """Serialise a record to Avro with fastavro's schemaless writer.

    Args:
        json_data: The record to encode.
        json_schema: The schema passed to ``fastavro.schemaless_writer``.

    Returns:
        A new ``io.BytesIO`` holding the encoded bytes. The stream is not
        rewound here; ``read_avro_data`` seeks back to the start itself.
    """
    buffer = io.BytesIO()
    fastavro.schemaless_writer(buffer, json_schema, json_data)
    return buffer
def read_avro_data(bytes_io, json_schema):
    """Decode one Avro record from a stream with fastavro's schemaless reader.

    Args:
        bytes_io: Seekable binary stream containing the encoded record.
        json_schema: The schema passed to ``fastavro.schemaless_reader``.

    Returns:
        The decoded message as returned by ``fastavro.schemaless_reader``.
    """
    # Rewind to byte offset 0 so the reader starts at the beginning of the
    # stream regardless of where the previous writer left the position.
    bytes_io.seek(0)
    return fastavro.schemaless_reader(bytes_io, json_schema)
def check_md5(infile, outfile):
    """Tell whether two files have the same MD5 digest.

    Args:
        infile: Path of the first file.
        outfile: Path of the second file.

    Returns:
        True when the hex MD5 digests of the two files' contents are equal,
        False otherwise.
    """
    digests = []
    # Same order as before: the first file is fully read and hashed before
    # the second one is opened.
    for path in (infile, outfile):
        with open(path, 'rb') as handle:
            digests.append(hashlib.md5(handle.read()).hexdigest())
    first_digest, second_digest = digests
    return first_digest == second_digest
def main():
    """Round-trip a JSON alert through Avro and report on the result.

    Steps:
      1. Parse the command line (schema files, JSON data file, optional
         science / template / difference cutout files).
      2. Combine the schema files and load the JSON data.
      3. Attach every cutout given on the command line to the data.
      4. Encode the data to Avro and decode it again.
      5. Print the decoded message without its cutouts, write every decoded
         cutout into the local directory ``output`` and compare its MD5 hash
         with the original input file.
      6. Print the size of the message as JSON text and as Avro bytes.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('schema', metavar='file.avsc', type=str, nargs='+',
                        help='schema file(s)')
    parser.add_argument('data', metavar='file.json', type=str,
                        help='json data file to fill the schema')
    parser.add_argument('--cutoutSci', metavar='science.jpg', type=str,
                        help='file for science image postage stamp')
    parser.add_argument('--cutoutTemp', metavar='template.jpg', type=str,
                        help='file for template image postage stamp')
    parser.add_argument('--cutoutDiff', metavar='difference.jpg', type=str,
                        help='file for difference image postage stamp')
    args = parser.parse_args()

    alert_schema = combine_schemas(args.schema)
    with open(args.data) as file_text:
        json_data = json.load(file_text)

    # One row per cutout: (alert field, label used in the report, input path
    # from the command line or None). Driving all three stamps from one table
    # replaces three copy-pasted branches whose variable names had drifted.
    cutouts = (
        ('cutoutScience', 'Science', args.cutoutSci),
        ('cutoutTemplate', 'Template', args.cutoutTemp),
        ('cutoutDifference', 'Difference', args.cutoutDiff),
    )

    # Load each stamp that was included on the command line
    for field, _, stamp_path in cutouts:
        if stamp_path is not None:
            json_data[field] = load_stamp(stamp_path)

    avro_bytes = write_avro_data(json_data, alert_schema)
    message = read_avro_data(avro_bytes, alert_schema)

    # Print message text to screen (binary cutouts are left out)
    cutout_fields = {field for field, _, _ in cutouts}
    message_text = {k: v for k, v in message.items() if k not in cutout_fields}
    print(message_text)

    # Collect stamps as files written to local directory 'output' and check
    # hashes match expected
    for field, label, stamp_path in cutouts:
        stamp = message.get(field)
        if stamp is None:
            continue
        stamp_out = write_stamp_file(stamp, 'output')
        if stamp_path is None:
            # The stamp came from the data file itself, so there is no input
            # file to hash against (check_md5(None, ...) would raise).
            print('%s stamp written to %s (no input file to compare)'
                  % (label, stamp_out))
        else:
            print('%s stamp ok:' % label, check_md5(stamp_path, stamp_out))

    # sys.getsizeof() is shallow: for the dict it ignores the contents and
    # for the bytes it adds object overhead. Measure the real payloads.
    # default=str only keeps the size report from failing on a value json
    # cannot encode; the text is measured, never stored.
    json_text = json.dumps(message_text, default=str)
    print("size in bytes of json text: %d" % len(json_text.encode('utf-8')))
    raw_bytes = avro_bytes.getvalue()
    print("size in bytes of avro message: %d" % len(raw_bytes))
# Run the validation only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()