Skip to content

Commit

Permalink
GH-44444: [Java][CI] Add Java implementation of Flight do_exchange in…
Browse files Browse the repository at this point in the history
…tegration test (#44445)

### Rationale for this change

See #44444, this helps ensure compatibility of the Java `do_exchange` implementation with C++ and C#.

### What changes are included in this PR?

Adds a Java implementation of the `do_exchange:echo` Flight integration test, and enables it in Archery.

### Are these changes tested?

Yes

### Are there any user-facing changes?

No
* GitHub Issue: #44444

Authored-by: Adam Reeve <adreeve@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
  • Loading branch information
adamreeve authored Oct 17, 2024
1 parent b175463 commit 1a323fc
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 1 deletion.
2 changes: 1 addition & 1 deletion dev/archery/archery/integration/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ def append_tester(implementation, tester):
"do_exchange:echo",
description=("Test the do_exchange method by "
"echoing data back to the client."),
skip_testers={"Go", "Java", "JS", "Rust"},
skip_testers={"Go", "JS", "Rust"},
),
Scenario(
"location:reuse_connection",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.flight.integration.tests;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.Validator;

/** Test DoExchange by echoing data back to the client. */
final class DoExchangeEchoScenario implements Scenario {
public static final byte[] COMMAND = "echo".getBytes(StandardCharsets.UTF_8);

@Override
public FlightProducer producer(BufferAllocator allocator, Location location) throws Exception {
return new DoExchangeProducer(allocator);
}

@Override
public void buildServer(FlightServer.Builder builder) {}

@Override
public void client(BufferAllocator allocator, Location location, FlightClient client)
throws Exception {
final Schema schema =
new Schema(Collections.singletonList(Field.notNullable("x", new ArrowType.Int(32, true))));
try (final FlightClient.ExchangeReaderWriter stream =
client.doExchange(FlightDescriptor.command(COMMAND));
final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
final FlightStream reader = stream.getReader();

// Write data and check that it gets echoed back.
IntVector iv = (IntVector) root.getVector("x");
iv.allocateNew();
stream.getWriter().start(root);
int rowCount = 10;
for (int batchIdx = 0; batchIdx < 4; batchIdx++) {
for (int rowIdx = 0; rowIdx < rowCount; rowIdx++) {
iv.setSafe(rowIdx, batchIdx + rowIdx);
}
root.setRowCount(rowCount);
boolean writeMetadata = batchIdx % 2 == 0;
final byte[] rawMetadata = Integer.toString(batchIdx).getBytes(StandardCharsets.UTF_8);
if (writeMetadata) {
final ArrowBuf metadata = allocator.buffer(rawMetadata.length);
metadata.writeBytes(rawMetadata);
stream.getWriter().putNext(metadata);
} else {
stream.getWriter().putNext();
}

IntegrationAssertions.assertTrue("Unexpected end of reader", reader.next());
if (writeMetadata) {
IntegrationAssertions.assertNotNull(reader.getLatestMetadata());
final byte[] readMetadata = new byte[rawMetadata.length];
reader.getLatestMetadata().readBytes(readMetadata);
IntegrationAssertions.assertEquals(rawMetadata, readMetadata);
} else {
IntegrationAssertions.assertNull(reader.getLatestMetadata());
}
IntegrationAssertions.assertEquals(root.getSchema(), reader.getSchema());
Validator.compareVectorSchemaRoot(reader.getRoot(), root);
}

stream.getWriter().completed();
IntegrationAssertions.assertFalse("Expected to reach end of reader", reader.next());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.flight.integration.tests;

import java.util.Arrays;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;

/** The server used for testing the Flight do_exchange method. */
final class DoExchangeProducer extends NoOpFlightProducer {
private final BufferAllocator allocator;

DoExchangeProducer(BufferAllocator allocator) {
this.allocator = allocator;
}

@Override
public void doExchange(CallContext context, FlightStream reader, ServerStreamListener writer) {
FlightDescriptor descriptor = reader.getDescriptor();
if (descriptor.isCommand()) {
if (Arrays.equals(DoExchangeEchoScenario.COMMAND, descriptor.getCommand())) {
doEcho(reader, writer);
}
}
throw CallStatus.UNIMPLEMENTED
.withDescription("Unsupported descriptor: " + descriptor.toString())
.toRuntimeException();
}

private void doEcho(FlightStream reader, ServerStreamListener writer) {
VectorSchemaRoot root = null;
VectorLoader loader = null;
while (reader.next()) {
if (reader.hasRoot()) {
if (root == null) {
root = VectorSchemaRoot.create(reader.getSchema(), allocator);
loader = new VectorLoader(root);
writer.start(root);
}
VectorUnloader unloader = new VectorUnloader(reader.getRoot());
try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
loader.load(arb);
}
if (reader.getLatestMetadata() != null) {
reader.getLatestMetadata().getReferenceManager().retain();
writer.putNext(reader.getLatestMetadata());
} else {
writer.putNext();
}
} else {
// Pure metadata
reader.getLatestMetadata().getReferenceManager().retain();
writer.putMetadata(reader.getLatestMetadata());
}
}
if (root != null) {
root.close();
}
writer.completed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ static void assertTrue(String message, boolean value) {
}
}

static void assertNull(Object actual) {
if (actual != null) {
throw new AssertionError("Expected: null\n\nbut got: " + actual);
}
}

static void assertNotNull(Object actual) {
if (actual == null) {
throw new AssertionError("Expected: (not null)\n\nbut got: null\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ private Scenarios() {
scenarios.put("flight_sql:ingestion", FlightSqlIngestionScenario::new);
scenarios.put("app_metadata_flight_info_endpoint", AppMetadataFlightInfoEndpointScenario::new);
scenarios.put("session_options", SessionOptionsScenario::new);
scenarios.put("do_exchange:echo", DoExchangeEchoScenario::new);
}

private static Scenarios getInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ void sessionOptions() throws Exception {
testScenario("session_options");
}

@Test
void doExchangeEcho() throws Exception {
testScenario("do_exchange:echo");
}

void testScenario(String scenarioName) throws Exception {
TestBufferAllocationListener listener = new TestBufferAllocationListener();
try (final BufferAllocator allocator = new RootAllocator(listener, Long.MAX_VALUE)) {
Expand Down

0 comments on commit 1a323fc

Please sign in to comment.