Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into fix_remove_processor
Browse files Browse the repository at this point in the history
  • Loading branch information
gaobinlong committed Sep 23, 2023
2 parents e0fbcf6 + 9e90671 commit cebeb23
Show file tree
Hide file tree
Showing 31 changed files with 656 additions and 120 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduce new dynamic cluster setting to control slice computation for concurrent segment search ([#9107](https://github.com/opensearch-project/OpenSearch/pull/9107))
- Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679))
- Implement Visitor Design pattern in QueryBuilder to enable the capability to traverse through the complex QueryBuilder tree. ([#10110](https://github.com/opensearch-project/OpenSearch/pull/10110))
- Provide service accounts tokens to extensions ([#9618](https://github.com/opensearch-project/OpenSearch/pull/9618))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down Expand Up @@ -84,6 +85,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681))
- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694))
- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666))
- Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131))

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ public Optional<AuthenticationToken> translateAuthToken(org.opensearch.identity.
public AuthToken issueOnBehalfOfToken(Subject subject, OnBehalfOfClaims claims) {

String password = generatePassword();
final byte[] rawEncoded = Base64.getEncoder().encode((claims.getAudience() + ":" + password).getBytes(UTF_8)); // Make a new
// ShiroSubject w/
// audience as name
// Make a new ShiroSubject audience as name
final byte[] rawEncoded = Base64.getUrlEncoder().encode((claims.getAudience() + ":" + password).getBytes(UTF_8));

final String usernamePassword = new String(rawEncoded, UTF_8);
final String header = "Basic " + usernamePassword;
BasicAuthToken token = new BasicAuthToken(header);
Expand All @@ -75,6 +75,19 @@ public AuthToken issueOnBehalfOfToken(Subject subject, OnBehalfOfClaims claims)
return token;
}

@Override
public AuthToken issueServiceAccountToken(String audience) {

String password = generatePassword();
final byte[] rawEncoded = Base64.getUrlEncoder().withoutPadding().encode((audience + ":" + password).getBytes(UTF_8)); // Make a new
final String usernamePassword = new String(rawEncoded, UTF_8);
final String header = "Basic " + usernamePassword;

BasicAuthToken token = new BasicAuthToken(header);
shiroTokenPasswordMap.put(token, password);
return token;
}

@Override
public Subject authenticateToken(AuthToken authToken) {
return new NoopSubject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,18 @@ public void testTokenNoopIssuance() {
Subject subject = new NoopSubject();
AuthToken token = tokenManager.issueOnBehalfOfToken(subject, claims);
assertTrue(token instanceof AuthToken);
AuthToken serviceAccountToken = tokenManager.issueServiceAccountToken("test");
assertTrue(serviceAccountToken instanceof AuthToken);
assertEquals(serviceAccountToken.asAuthHeaderValue(), "noopToken");
}

public void testShouldSucceedIssueServiceAccountToken() {
String audience = "testExtensionName";
BasicAuthToken authToken = (BasicAuthToken) shiroAuthTokenHandler.issueServiceAccountToken(audience);
assertTrue(authToken instanceof BasicAuthToken);
UsernamePasswordToken translatedToken = (UsernamePasswordToken) shiroAuthTokenHandler.translateAuthToken(authToken).get();
assertEquals(authToken.getPassword(), new String(translatedToken.getPassword()));
assertTrue(shiroAuthTokenHandler.getShiroTokenPasswordMap().containsKey(authToken));
assertEquals(shiroAuthTokenHandler.getShiroTokenPasswordMap().get(authToken), new String(translatedToken.getPassword()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -168,6 +166,29 @@ public static Settings remoteStoreClusterSettings(String name, Path path) {
return remoteStoreClusterSettings(name, path, name, path);
}

public static Settings remoteStoreClusterSettings(
String segmentRepoName,
Path segmentRepoPath,
String segmentRepoType,
String translogRepoName,
Path translogRepoPath,
String translogRepoType
) {
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put(
buildRemoteStoreNodeAttributes(
segmentRepoName,
segmentRepoPath,
segmentRepoType,
translogRepoName,
translogRepoPath,
translogRepoType,
false
)
);
return settingsBuilder.build();
}

public static Settings remoteStoreClusterSettings(
String segmentRepoName,
Path segmentRepoPath,
Expand All @@ -185,6 +206,26 @@ public static Settings buildRemoteStoreNodeAttributes(
String translogRepoName,
Path translogRepoPath,
boolean withRateLimiterAttributes
) {
return buildRemoteStoreNodeAttributes(
segmentRepoName,
segmentRepoPath,
FsRepository.TYPE,
translogRepoName,
translogRepoPath,
FsRepository.TYPE,
withRateLimiterAttributes
);
}

public static Settings buildRemoteStoreNodeAttributes(
String segmentRepoName,
Path segmentRepoPath,
String segmentRepoType,
String translogRepoName,
Path translogRepoPath,
String translogRepoType,
boolean withRateLimiterAttributes
) {
String segmentRepoTypeAttributeKey = String.format(
Locale.getDefault(),
Expand Down Expand Up @@ -219,13 +260,13 @@ public static Settings buildRemoteStoreNodeAttributes(

Settings.Builder settings = Settings.builder()
.put("node.attr." + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
.put(segmentRepoTypeAttributeKey, FsRepository.TYPE)
.put(segmentRepoTypeAttributeKey, segmentRepoType)
.put(segmentRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put("node.attr." + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName)
.put(translogRepoTypeAttributeKey, FsRepository.TYPE)
.put(translogRepoTypeAttributeKey, translogRepoType)
.put(translogRepoSettingsAttributeKeyPrefix + "location", translogRepoPath)
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
.put(stateRepoTypeAttributeKey, FsRepository.TYPE)
.put(stateRepoTypeAttributeKey, segmentRepoType)
.put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath);

if (withRateLimiterAttributes) {
Expand Down Expand Up @@ -310,46 +351,6 @@ public void assertRemoteStoreRepositoryOnAllNodes(String repositoryName) {
}
}

public Settings buildClusterSettingsWith() {
String segmentRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
REPOSITORY_NAME
);
String segmentRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
REPOSITORY_NAME
);
String translogRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
REPOSITORY_2_NAME
);
String translogRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
REPOSITORY_2_NAME
);
return Settings.builder()
.put(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY,
REPOSITORY_NAME
)
.put(segmentRepoTypeAttributeKey, FsRepository.TYPE)
.put(segmentRepoSettingsAttributeKeyPrefix + "location", randomRepoPath())
.put(segmentRepoSettingsAttributeKeyPrefix + "compress", randomBoolean())
.put(segmentRepoSettingsAttributeKeyPrefix + "max_remote_download_bytes_per_sec", "2kb")
.put(segmentRepoSettingsAttributeKeyPrefix + "chunk_size", 200, ByteSizeUnit.BYTES)
.put(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY,
REPOSITORY_2_NAME
)
.put(translogRepoTypeAttributeKey, FsRepository.TYPE)
.put(translogRepoSettingsAttributeKeyPrefix + "location", randomRepoPath())
.build();
}

public static int getFileCount(Path path) throws Exception {
final AtomicInteger filesExisting = new AtomicInteger(0);
Files.walkFileTree(path, new SimpleFileVisitor<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,26 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList());
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(
remoteStoreClusterSettings(
REPOSITORY_NAME,
segmentRepoPath,
MockFsRepositoryPlugin.TYPE,
REPOSITORY_2_NAME,
translogRepoPath,
MockFsRepositoryPlugin.TYPE
)
)
.build();
}

@Before
public void setup() {
clusterSettingsSuppliedByTest = true;
overrideBuildRepositoryMetadata = false;
repositoryLocation = randomRepoPath();
compress = randomBoolean();
Expand Down Expand Up @@ -86,6 +104,7 @@ public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String nam
} else {
return super.buildRepositoryMetadata(node, name);
}

}

public void testRateLimitedRemoteUploads() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.crypto.CryptoHandler;
import org.opensearch.common.crypto.DecryptedRangedStreamProvider;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Path;
import java.io.InputStream;
import java.util.List;
import java.util.stream.Collectors;

/**
* EncryptedBlobContainer is an encrypted BlobContainer that is backed by a
Expand All @@ -44,12 +46,17 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp

@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
throw new UnsupportedOperationException();
}

@Override
public void asyncBlobDownload(String blobName, Path fileLocation, ThreadPool threadPool, ActionListener<String> completionListener) {
throw new UnsupportedOperationException();
try {
final U cryptoContext = cryptoHandler.loadEncryptionMetadata(getEncryptedHeaderContentSupplier(blobName));
ActionListener<ReadContext> decryptingCompletionListener = ActionListener.map(
listener,
readContext -> new DecryptedReadContext<>(readContext, cryptoHandler, cryptoContext)
);

blobContainer.readBlobAsync(blobName, decryptingCompletionListener);
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
Expand Down Expand Up @@ -108,4 +115,58 @@ public InputStreamContainer provideStream(int partNumber) throws IOException {
}

}

/**
* DecryptedReadContext decrypts the encrypted {@link ReadContext} by acting as a transformation wrapper around
* the encrypted object
* @param <T> Encryption Metadata / CryptoContext for the {@link CryptoHandler} instance
* @param <U> Parsed Encryption Metadata / CryptoContext for the {@link CryptoHandler} instance
*/
static class DecryptedReadContext<T, U> extends ReadContext {

private final CryptoHandler<T, U> cryptoHandler;
private final U cryptoContext;
private Long blobSize;

public DecryptedReadContext(ReadContext readContext, CryptoHandler<T, U> cryptoHandler, U cryptoContext) {
super(readContext);
this.cryptoHandler = cryptoHandler;
this.cryptoContext = cryptoContext;
}

@Override
public long getBlobSize() {
// initializes the value lazily
if (blobSize == null) {
this.blobSize = this.cryptoHandler.estimateDecryptedLength(cryptoContext, super.getBlobSize());
}
return this.blobSize;
}

@Override
public List<InputStreamContainer> getPartStreams() {
return super.getPartStreams().stream().map(this::decryptInputStreamContainer).collect(Collectors.toList());
}

/**
* Transforms an encrypted {@link InputStreamContainer} to a decrypted instance
* @param inputStreamContainer encrypted input stream container instance
* @return decrypted input stream container instance
*/
private InputStreamContainer decryptInputStreamContainer(InputStreamContainer inputStreamContainer) {
long startOfStream = inputStreamContainer.getOffset();
long endOfStream = startOfStream + inputStreamContainer.getContentLength() - 1;
DecryptedRangedStreamProvider decryptedStreamProvider = cryptoHandler.createDecryptingStreamOfRange(
cryptoContext,
startOfStream,
endOfStream
);

long adjustedPos = decryptedStreamProvider.getAdjustedRange()[0];
long adjustedLength = decryptedStreamProvider.getAdjustedRange()[1] - adjustedPos + 1;
final InputStream decryptedStream = decryptedStreamProvider.getDecryptedStreamProvider()
.apply(inputStreamContainer.getInputStream());
return new InputStreamContainer(decryptedStream, adjustedLength, adjustedPos);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public InputStream readBlob(String blobName) throws IOException {
return cryptoHandler.createDecryptingStream(inputStream);
}

private EncryptedHeaderContentSupplier getEncryptedHeaderContentSupplier(String blobName) {
EncryptedHeaderContentSupplier getEncryptedHeaderContentSupplier(String blobName) {
return (start, end) -> {
byte[] buffer;
int length = (int) (end - start + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ public ReadContext(long blobSize, List<InputStreamContainer> partStreams, String
this.blobChecksum = blobChecksum;
}

public ReadContext(ReadContext readContext) {
this.blobSize = readContext.blobSize;
this.partStreams = readContext.partStreams;
this.blobChecksum = readContext.blobChecksum;
}

public String getBlobChecksum() {
return blobChecksum;
}
Expand Down
Loading

0 comments on commit cebeb23

Please sign in to comment.