Skip to content

Commit ab733b4

Browse files
dhalperidavorbonaci
authored andcommitted
AvroCoder: more efficient use of Avro APIs
- Make the Encoder/Decoder factories static -- they are thread-safe and immutable. - Reuse the DirectBinaryEncoder/DirectBinaryDecoder objects across invocations to encode/decode. Though reuse is only "when possible", it's always applicable in our invocations. This change reduces the allocations of these objects from 1/element to 1/coder instance. - Remove the call to flush from encoder(), because the DirectBinaryEncoder does not need to be flushed. - Cache the objects in ThreadLocal variables for thread safety. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=113018546
1 parent 50f98a7 commit ab733b4

File tree

1 file changed

+48
-20
lines changed

1 file changed

+48
-20
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/AvroCoder.java

Lines changed: 48 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -171,19 +171,42 @@ public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor) {
171171

172172
private final List<String> nonDeterministicReasons;
173173

174-
private final DatumWriter<T> writer;
175-
private final DatumReader<T> reader;
176-
private final EncoderFactory encoderFactory = new EncoderFactory();
177-
private final DecoderFactory decoderFactory = new DecoderFactory();
174+
// Factories allocated by .get() are thread-safe and immutable.
175+
private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
176+
private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get();
177+
// Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe,
178+
// these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use
179+
// an inner coder.
180+
private final ThreadLocal<BinaryDecoder> decoder;
181+
private final ThreadLocal<BinaryEncoder> encoder;
182+
private final ThreadLocal<DatumWriter<T>> writer;
183+
private final ThreadLocal<DatumReader<T>> reader;
178184

179185
protected AvroCoder(Class<T> type, Schema schema) {
180186
this.type = type;
181187
this.schema = schema;
182188

183-
nonDeterministicReasons = new AvroDeterminismChecker()
184-
.check(TypeDescriptor.of(type), schema);
185-
this.reader = createDatumReader();
186-
this.writer = createDatumWriter();
189+
nonDeterministicReasons = new AvroDeterminismChecker().check(TypeDescriptor.of(type), schema);
190+
191+
// Decoder and Encoder start off null for each thread. They are allocated and potentially
192+
// reused inside encode/decode.
193+
this.decoder = new ThreadLocal<>();
194+
this.encoder = new ThreadLocal<>();
195+
196+
// Reader and writer are allocated once per thread and are "final" for thread-local Coder
197+
// instance.
198+
this.reader = new ThreadLocal<DatumReader<T>>() {
199+
@Override
200+
public DatumReader<T> initialValue() {
201+
return createDatumReader();
202+
}
203+
};
204+
this.writer = new ThreadLocal<DatumWriter<T>>() {
205+
@Override
206+
public DatumWriter<T> initialValue() {
207+
return createDatumWriter();
208+
}
209+
};
187210
}
188211

189212
/**
@@ -233,17 +256,22 @@ private Object writeReplace() {
233256
}
234257

235258
@Override
236-
public void encode(T value, OutputStream outStream, Context context)
237-
throws IOException {
238-
BinaryEncoder encoder = encoderFactory.directBinaryEncoder(outStream, null);
239-
writer.write(value, encoder);
240-
encoder.flush();
259+
public void encode(T value, OutputStream outStream, Context context) throws IOException {
260+
// Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it.
261+
BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get());
262+
// Save the potentially-new instance for reuse later.
263+
encoder.set(encoderInstance);
264+
writer.get().write(value, encoderInstance);
265+
// Direct binary encoder does not buffer any data and need not be flushed.
241266
}
242267

243268
@Override
244269
public T decode(InputStream inStream, Context context) throws IOException {
245-
BinaryDecoder decoder = decoderFactory.directBinaryDecoder(inStream, null);
246-
return reader.read(null, decoder);
270+
// Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it.
271+
BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get());
272+
// Save the potentially-new instance for later.
273+
decoder.set(decoderInstance);
274+
return reader.get().read(null, decoderInstance);
247275
}
248276

249277
@Override
@@ -272,12 +300,12 @@ public void verifyDeterministic() throws NonDeterministicException {
272300
}
273301

274302
/**
275-
* Returns a new DatumReader that can be used to read from
276-
* an Avro file directly. Assumes the schema used to read is
277-
* the same as the schema that was used when writing.
303+
* Returns a new {@link DatumReader} that can be used to read from an Avro file directly. Assumes
304+
* the schema used to read is the same as the schema that was used when writing.
278305
*
279306
* @deprecated For {@code AvroCoder} internal use only.
280307
*/
308+
// TODO: once we can remove this deprecated function, inline in constructor.
281309
@Deprecated
282310
public DatumReader<T> createDatumReader() {
283311
if (type.equals(GenericRecord.class)) {
@@ -288,11 +316,11 @@ public DatumReader<T> createDatumReader() {
288316
}
289317

290318
/**
291-
* Returns a new DatumWriter that can be used to write to
292-
* an Avro file directly.
319+
* Returns a new {@link DatumWriter} that can be used to write to an Avro file directly.
293320
*
294321
* @deprecated For {@code AvroCoder} internal use only.
295322
*/
323+
// TODO: once we can remove this deprecated function, inline in constructor.
296324
@Deprecated
297325
public DatumWriter<T> createDatumWriter() {
298326
if (type.equals(GenericRecord.class)) {

0 commit comments

Comments
 (0)