Skip to content

Commit a3c7498

Browse files
committed
adds in integration tests for parquet files
1 parent 2999d57 commit a3c7498

File tree

10 files changed

+253
-112
lines changed

10 files changed

+253
-112
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ public final class StreamStatisticNames {
9797
*/
9898
public static final String STREAM_READ_OPENED = "stream_read_opened";
9999

100+
/**
101+
* Total count of times an analytics input stream was opened.
102+
*
103+
* Value: {@value}.
104+
*/
105+
public static final String STREAM_READ_ANALYTICS_OPENED = "stream_read_analytics_opened";
106+
100107
/**
101108
* Count of exceptions raised during input stream reads.
102109
* Value: {@value}.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.classification.InterfaceStability;
2929
import org.apache.hadoop.fs.FileSystem;
3030
import org.apache.hadoop.fs.impl.WeakRefMetricsSource;
31+
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
3132
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
3233
import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics;
3334
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
@@ -840,6 +841,7 @@ private final class InputStreamStatistics
840841
private final AtomicLong closed;
841842
private final AtomicLong forwardSeekOperations;
842843
private final AtomicLong openOperations;
844+
private final AtomicLong analyticsStreamOpenOperations;
843845
private final AtomicLong readExceptions;
844846
private final AtomicLong readsIncomplete;
845847
private final AtomicLong readOperations;
@@ -888,7 +890,8 @@ private InputStreamStatistics(
888890
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
889891
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
890892
StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES,
891-
StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE)
893+
StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
894+
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED)
892895
.withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
893896
STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
894897
STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
@@ -927,6 +930,9 @@ private InputStreamStatistics(
927930
StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS);
928931
openOperations = st.getCounterReference(
929932
StreamStatisticNames.STREAM_READ_OPENED);
933+
analyticsStreamOpenOperations = st.getCounterReference(
934+
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED
935+
);
930936
readExceptions = st.getCounterReference(
931937
StreamStatisticNames.STREAM_READ_EXCEPTIONS);
932938
readsIncomplete = st.getCounterReference(
@@ -1030,6 +1036,16 @@ public long streamOpened() {
10301036
return openOperations.getAndIncrement();
10311037
}
10321038

1039+
@Override
1040+
public long streamOpened(InputStreamType type) {
1041+
switch (type) {
1042+
case Analytics:
1043+
return analyticsStreamOpenOperations.getAndIncrement();
1044+
default:
1045+
return openOperations.getAndIncrement();
1046+
}
1047+
}
1048+
10331049
/**
10341050
* {@inheritDoc}.
10351051
* If the connection was aborted, increment {@link #aborted}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableIn
5050
super(InputStreamType.Analytics, parameters);
5151
S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
5252
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()));
53+
getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
5354
}
5455

5556
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.hadoop.fs.s3a.statistics;
2020

2121
import org.apache.hadoop.fs.impl.prefetch.PrefetchingStatistics;
22+
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
2223
import org.apache.hadoop.fs.statistics.DurationTracker;
2324

2425
/**
@@ -53,6 +54,13 @@ public interface S3AInputStreamStatistics extends AutoCloseable,
5354
*/
5455
long streamOpened();
5556

57+
/**
58+
* A stream of the given type was opened.
59+
* @param type type of input stream
60+
* @return the previous count or zero if this is the first opening.
61+
*/
62+
long streamOpened(InputStreamType type);
63+
5664
/**
5765
* The inner stream was closed.
5866
* @param abortedConnection flag to indicate the stream was aborted,

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.time.Duration;
2323

2424
import org.apache.hadoop.fs.s3a.Statistic;
25+
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
2526
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
2627
import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics;
2728
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
@@ -164,6 +165,11 @@ public long streamOpened() {
164165
return 0;
165166
}
166167

168+
@Override
169+
public long streamOpened(InputStreamType type) {
170+
return 0;
171+
}
172+
167173
@Override
168174
public void streamClose(final boolean abortedConnection,
169175
final long remainingInCurrentRequest) {
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hadoop.fs.s3a;
21+
22+
import java.io.File;
23+
import java.io.IOException;
24+
import java.nio.charset.Charset;
25+
import java.util.UUID;
26+
27+
import org.junit.Before;
28+
import org.junit.Test;
29+
30+
import org.apache.commons.io.FileUtils;
31+
import org.apache.hadoop.conf.Configuration;
32+
import org.apache.hadoop.fs.FSDataInputStream;
33+
import org.apache.hadoop.fs.FileStatus;
34+
import org.apache.hadoop.fs.FileSystem;
35+
import org.apache.hadoop.fs.Path;
36+
import org.apache.hadoop.fs.PathHandle;
37+
import org.apache.hadoop.fs.s3a.impl.AwsSdkWorkarounds;
38+
import org.apache.hadoop.fs.statistics.IOStatistics;
39+
import org.apache.hadoop.test.GenericTestUtils;
40+
41+
import static org.apache.commons.io.FileUtils.ONE_KB;
42+
import static org.apache.hadoop.fs.s3a.Constants.*;
43+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
44+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetching;
45+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
46+
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
47+
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
48+
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
49+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
50+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
51+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
52+
import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
53+
54+
import org.assertj.core.api.Assertions;
55+
56+
import org.slf4j.LoggerFactory;
57+
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
58+
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
59+
import software.amazon.s3.analyticsaccelerator.io.logical.parquet.ParquetMetadataParsingTask;
60+
import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
61+
62+
public class ITestS3AAnalyticsAcceleratorStream extends AbstractS3ATestBase {
63+
64+
private static final String PHYSICAL_IO_PREFIX = "physicalio";
65+
private static final String LOGICAL_IO_PREFIX = "logicalio";
66+
67+
68+
private Configuration conf;
69+
private Path testFile;
70+
71+
@Before
72+
public void setUp() throws Exception {
73+
super.setup();
74+
conf = createConfiguration();
75+
testFile = getExternalData(conf);
76+
}
77+
78+
@Override
79+
public Configuration createConfiguration() {
80+
Configuration configuration = super.createConfiguration();
81+
if (isUsingDefaultExternalDataFile(configuration)) {
82+
S3ATestUtils.removeBaseAndBucketOverrides(configuration,
83+
ENDPOINT);
84+
}
85+
enableAnalyticsAccelerator(configuration);
86+
return configuration;
87+
}
88+
89+
@Test
90+
public void testConnectorFrameWorkIntegration() throws IOException {
91+
describe("Verify S3 connector framework integration");
92+
93+
S3AFileSystem fs =
94+
(S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
95+
byte[] buffer = new byte[500];
96+
IOStatistics ioStats;
97+
98+
try (FSDataInputStream inputStream = fs.open(testFile)) {
99+
ioStats = inputStream.getIOStatistics();
100+
inputStream.seek(5);
101+
inputStream.read(buffer, 0, 500);
102+
}
103+
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
104+
}
105+
106+
@Test
107+
public void testMalformedParquetFooter() throws IOException {
108+
describe("Reading a malformed parquet file should not throw an exception");
109+
110+
// File with malformed footer taken from https://github.com/apache/parquet-testing/blob/master/bad_data/PARQUET-1481.parquet.
111+
// This test ensures AAL does not throw exceptions if footer parsing fails. It will only emit a WARN log,
112+
// "Unable to parse parquet footer for test/malformedFooter.parquet, parquet prefetch optimisations will be disabled for this key."
113+
Path dest = path("malformed_footer.parquet");
114+
115+
File file = new File("src/test/resources/malformed_footer.parquet");
116+
Path sourcePath = new Path(file.toURI().getPath());
117+
getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
118+
119+
byte[] buffer = new byte[500];
120+
IOStatistics ioStats;
121+
122+
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
123+
ioStats = inputStream.getIOStatistics();
124+
inputStream.seek(5);
125+
inputStream.read(buffer, 0, 500);
126+
}
127+
128+
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
129+
}
130+
131+
@Test
132+
public void testMultiRowGroupParquet() throws IOException {
133+
describe("A parquet file is read successfully");
134+
135+
Path dest = path("multi_row_group.parquet");
136+
137+
File file = new File("src/test/resources/multi_row_group.parquet");
138+
Path sourcePath = new Path(file.toURI().getPath());
139+
getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
140+
141+
FileStatus fileStatus = getFileSystem().getFileStatus(dest);
142+
143+
byte[] buffer = new byte[3000];
144+
IOStatistics ioStats;
145+
146+
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
147+
ioStats = inputStream.getIOStatistics();
148+
inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
149+
}
150+
151+
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
152+
}
153+
154+
@Test
155+
public void testConnectorFrameworkConfigurable() {
156+
describe("Verify S3 connector framework reads configuration");
157+
158+
Configuration conf = getConfiguration();
159+
removeBaseAndBucketOverrides(conf);
160+
161+
// Set the logical IO prefetching mode to "all"
162+
conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
163+
"." + LOGICAL_IO_PREFIX + ".prefetching.mode", "all");
164+
165+
//Set Blobstore Capacity
166+
conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
167+
"." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);
168+
169+
ConnectorConfiguration connectorConfiguration =
170+
new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
171+
172+
S3SeekableInputStreamConfiguration configuration =
173+
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
174+
175+
Assertions.assertThat(configuration.getLogicalIOConfiguration().getPrefetchingMode())
176+
.as("AnalyticsStream configuration is not set to expected value")
177+
.isSameAs(PrefetchMode.ALL);
178+
179+
Assertions.assertThat(configuration.getPhysicalIOConfiguration().getBlobStoreCapacity())
180+
.as("AnalyticsStream configuration is not set to expected value")
181+
.isEqualTo(1);
182+
}
183+
184+
@Test
185+
public void testInvalidConfigurationThrows() throws Exception {
186+
describe("Verify S3 connector framework throws with invalid configuration");
187+
188+
Configuration conf = getConfiguration();
189+
removeBaseAndBucketOverrides(conf);
190+
// Set an invalid (negative) blobstore capacity so that configuration parsing fails
191+
conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
192+
"." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", -1);
193+
194+
ConnectorConfiguration connectorConfiguration =
195+
new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
196+
Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
197+
.isThrownBy(() ->
198+
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
199+
}
200+
201+
}

0 commit comments

Comments
 (0)