Skip to content

Commit fe59fc0

Browse files
committed
HADOOP-19478. S3A: pull out new configuration load/probes under S3AStore
New service under S3AStoreImpl: StoreConfigurationService Change-Id: I99c8305574492c05170274dcc363bfba4857981b
1 parent 0c20bb1 commit fe59fc0

File tree

5 files changed

+257
-3
lines changed

5 files changed

+257
-3
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1531,6 +1531,14 @@ private Constants() {
15311531
*/
15321532
public static final String FS_S3A_CREATE_OVERWRITE_SUPPORTED = "fs.s3a.create.overwrite.supported";
15331533

1534+
/**
1535+
* If conditional create is available, should it be used in
1536+
* createFile() operations to check for file existence?
1537+
* If set, this disables probes for directories.
1538+
* Value {@value}.
1539+
*/
1540+
public static final String FS_S3A_CONDITIONAL_CREATE_FOR_FILES = "fs.s3a.conditional.create.for.files";
1541+
15341542
/**
15351543
* Create a multipart file, always: {@value}.
15361544
* <p>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
2424
import org.apache.hadoop.fs.s3a.S3AStore;
2525
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
26+
import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService;
2627
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
2728
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
2829
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
@@ -51,6 +52,8 @@ public class S3AStoreBuilder {
5152

5253
private AuditSpanSource<AuditSpanS3A> auditSpanSource;
5354

55+
private StoreConfigurationService storeConfigurationService;
56+
5457
/**
5558
* The original file system statistics: fairly minimal but broadly
5659
* collected so it is important to pick up.
@@ -117,6 +120,16 @@ public S3AStoreBuilder withFsStatistics(final FileSystem.Statistics value) {
117120
return this;
118121
}
119122

123+
/**
124+
* Set the store configuration service.
125+
* @param value new value
126+
* @return the builder
127+
*/
128+
public S3AStoreBuilder withStoreConfigurationService(final StoreConfigurationService value) {
129+
storeConfigurationService = value;
130+
return this;
131+
}
132+
120133
public S3AStore build() {
121134
return new S3AStoreImpl(storeContextFactory,
122135
clientManager,
@@ -127,6 +140,6 @@ public S3AStore build() {
127140
readRateLimiter,
128141
writeRateLimiter,
129142
auditSpanSource,
130-
fsStatistics);
143+
fsStatistics, storeConfigurationService);
131144
}
132145
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory;
8080
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
8181
import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements;
82+
import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService;
8283
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
8384
import org.apache.hadoop.fs.statistics.DurationTracker;
8485
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
@@ -193,10 +194,12 @@ public class S3AStoreImpl
193194
*/
194195
private ObjectInputStreamFactory objectInputStreamFactory;
195196

197+
private final StoreConfigurationService storeConfigurationService;
198+
196199
/**
197200
* Constructor to create S3A store.
198201
* Package private, as {@link S3AStoreBuilder} creates them.
199-
* */
202+
*/
200203
S3AStoreImpl(StoreContextFactory storeContextFactory,
201204
ClientManager clientManager,
202205
DurationTrackerFactory durationTrackerFactory,
@@ -206,7 +209,8 @@ public class S3AStoreImpl
206209
RateLimiting readRateLimiter,
207210
RateLimiting writeRateLimiter,
208211
AuditSpanSource<AuditSpanS3A> auditSpanSource,
209-
@Nullable FileSystem.Statistics fsStatistics) {
212+
@Nullable FileSystem.Statistics fsStatistics,
213+
StoreConfigurationService storeConfigurationService) {
210214
super("S3AStore");
211215
this.auditSpanSource = requireNonNull(auditSpanSource);
212216
this.clientManager = requireNonNull(clientManager);
@@ -224,6 +228,8 @@ public class S3AStoreImpl
224228
this.bucket = requireNonNull(storeContext.getBucket());
225229
this.requestFactory = requireNonNull(storeContext.getRequestFactory());
226230
addService(clientManager);
231+
this.storeConfigurationService = requireNonNull(storeConfigurationService);
232+
addService(storeConfigurationService);
227233
}
228234

229235
/**
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.impl.store;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
23+
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_CREATE_FOR_FILES;
24+
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_OVERWRITE_SUPPORTED;
25+
26+
/**
27+
* Store configuration flags.
28+
*/
29+
public enum StoreConfigurationFlags {
30+
31+
ConditionCreateAvailable(FS_S3A_CREATE_OVERWRITE_SUPPORTED,
32+
true),
33+
UseConditionalCreateForFiles(FS_S3A_CONDITIONAL_CREATE_FOR_FILES,
34+
false);
35+
36+
/**
37+
* Key name; read from the configuration, and
38+
* for the capability probe unless the arity 3
39+
* constructor is used.
40+
*/
41+
private final String key;
42+
43+
/**
44+
* Capability to probe for in {@link #hasCapability(String)}.
45+
*/
46+
private final String capability;
47+
48+
/**
49+
* Default value when reading from the configuration.
50+
*/
51+
private final boolean defaultValue;
52+
53+
StoreConfigurationFlags(String key, boolean defaultValue) {
54+
this(key, "", defaultValue);
55+
}
56+
57+
StoreConfigurationFlags(String key,
58+
String capability,
59+
boolean defaultValue) {
60+
this.key = key;
61+
this.capability = capability;
62+
this.defaultValue = defaultValue;
63+
}
64+
65+
public String getKey() {
66+
return key;
67+
}
68+
69+
public String getCapability() {
70+
return capability;
71+
}
72+
73+
/**
74+
* Read from the the configuration, falling
75+
* back to the default value.
76+
* @param conf configuration.
77+
* @return the evaluated value.
78+
*/
79+
public boolean evaluate(Configuration conf) {
80+
return conf.getBoolean(key, defaultValue);
81+
}
82+
83+
/**
84+
* Does this enum's key match the supplied key.
85+
* @param k key to probe for
86+
* @return true if there is a match.
87+
*/
88+
public boolean keyMatches(String k) {
89+
return key.equals(k);
90+
}
91+
92+
/**
93+
* Does this enum's capability match the supplied key?
94+
* @param k key to probe for
95+
* @return true if there is a match.
96+
*/
97+
public boolean hasCapability(String k) {
98+
return !capability.isEmpty() &&
99+
capability.equals(k);
100+
}
101+
102+
103+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.impl.store;
20+
21+
import java.util.Arrays;
22+
import java.util.EnumSet;
23+
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.fs.StreamCapabilities;
26+
import org.apache.hadoop.service.AbstractService;
27+
28+
/**
29+
* A service which handles store configurations.
30+
* New configuration options should be added here.
31+
* <p>
32+
* The goal is to pull configuration flags and variables
33+
* out of S3AFileSystem but not reimplement the
34+
* same structure in S3AStore.
35+
* Instead, configuration flags, numbers etc can
36+
* be managed here.
37+
* Maybe in future reflection could be used to
38+
* build up the config, as done in ABFS.
39+
* <p>
40+
* Usage.
41+
* <ol>
42+
* <li>Instantiate.</li>
43+
* <li>Call {@link #init(Configuration)} to trigger config reading</lib>
44+
* <li>Read loaded options.</li>
45+
* </ol>
46+
* The start and close operations are (currently) no-ops.
47+
*/
48+
public class StoreConfigurationService extends AbstractService
49+
implements StreamCapabilities {
50+
51+
private EnumSet<StoreConfigurationFlags> configurationFlags;
52+
53+
public StoreConfigurationService(final String name) {
54+
super(name);
55+
}
56+
57+
public StoreConfigurationService() {
58+
super("StoreConfigurationService");
59+
}
60+
61+
/**
62+
* Initialize the service by reading in configuration
63+
* settings.
64+
* @param conf configuration
65+
* @throws Exception parser failures.
66+
*/
67+
@Override
68+
protected void serviceInit(final Configuration conf) throws Exception {
69+
super.serviceInit(conf);
70+
configurationFlags = EnumSet.noneOf(StoreConfigurationFlags.class);
71+
Arrays.stream(StoreConfigurationFlags.values())
72+
.filter(v -> v.evaluate(conf))
73+
.forEach(configurationFlags::add);
74+
}
75+
76+
/**
77+
* Is a configuration flag set?
78+
* @param flag flag to probe for.
79+
* @return true iff the flag is set
80+
*/
81+
public boolean isFlagSet(StoreConfigurationFlags flag) {
82+
return configurationFlags.contains(flag);
83+
}
84+
85+
/**
86+
* Get a clone of the flags.
87+
* @return a copy of the flags.
88+
*/
89+
public EnumSet<StoreConfigurationFlags> getConfigurationFlags() {
90+
return configurationFlags.clone();
91+
}
92+
93+
/**
94+
* Does one of the flags have this capability?
95+
* @param capability what to probe for
96+
* @return true if the capability is implellmented.
97+
*/
98+
@Override
99+
public boolean hasCapability(String capability) {
100+
return configurationFlags.stream()
101+
.anyMatch(f -> f.keyMatches(capability));
102+
}
103+
104+
/**
105+
* Set a flag.
106+
* This is NOT thread safe.
107+
* @param flag flag to set
108+
* @return true if the flag enumset changed state.
109+
*/
110+
public boolean setFlag(StoreConfigurationFlags flag) {
111+
return configurationFlags.add(flag);
112+
}
113+
114+
/**
115+
* Clear a flag.
116+
* This is NOT thread safe.
117+
* @param flag flag to clear
118+
* @return true if the flag enumset changed state.
119+
*/
120+
public boolean clearFlag(StoreConfigurationFlags flag) {
121+
return configurationFlags.remove(flag);
122+
}
123+
124+
}

0 commit comments

Comments
 (0)