CHANGELOG.md
@@ -98,6 +98,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834))
- Add experimental support for ZSTD compression. ([#3577](https://github.com/opensearch-project/OpenSearch/pull/3577))
- [Segment Replication] Add point in time and scroll query compatibility. ([#6644](https://github.com/opensearch-project/OpenSearch/pull/6644))
+- [Remote Store] Add Lock Manager in Remote Segment Store to persist data ([#6787](https://github.com/opensearch-project/OpenSearch/pull/6787))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
server/src/main/java/org/opensearch/index/store/RemoteDirectory.java
@@ -35,7 +35,7 @@
*/
public class RemoteDirectory extends Directory {

-    private final BlobContainer blobContainer;
+    protected final BlobContainer blobContainer;

public RemoteDirectory(BlobContainer blobContainer) {
this.blobContainer = blobContainer;
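The only change to RemoteDirectory widens blobContainer from private to protected, presumably so subclasses can reach the underlying blob store directly. A minimal sketch of the kind of subclass this enables (the class and method below are illustrative, not part of this PR):

```java
import java.io.IOException;
import java.util.Collection;

import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.index.store.RemoteDirectory;

// Hypothetical subclass; possible only now that blobContainer is protected.
public class RemoteLockDirectory extends RemoteDirectory {
    public RemoteLockDirectory(BlobContainer blobContainer) {
        super(blobContainer);
    }

    // Subclasses can now query the container directly, e.g. to list blobs by prefix.
    public Collection<String> listBlobNames(String prefix) throws IOException {
        return blobContainer.listBlobsByPrefix(prefix).keySet();
    }
}
```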
server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
@@ -21,7 +21,9 @@
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
+import org.opensearch.index.store.lockmanager.RemoteStoreMDShardLockManager;

+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
@@ -57,6 +59,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory {
public static final MetadataFilenameUtils.MetadataFilenameComparator METADATA_FILENAME_COMPARATOR =
new MetadataFilenameUtils.MetadataFilenameComparator();

+    public static final String METADATA_FILENAME_PREFIX = MetadataFilenameUtils.METADATA_PREFIX;

/**
* remoteDataDirectory is used to store segment files at path: cluster_UUID/index_UUID/shardId/segments/data
*/
@@ -66,6 +70,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory {
*/
private final RemoteDirectory remoteMetadataDirectory;

+    private final RemoteStoreMDShardLockManager mdLockManager;

/**
* To prevent explosion of refresh metadata files, we replace refresh files for the given primary term and generation
* This is achieved by uploading refresh metadata file with the same UUID suffix.
@@ -87,10 +93,12 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory {

private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectory.class);

-    public RemoteSegmentStoreDirectory(RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory) throws IOException {
+    public RemoteSegmentStoreDirectory(RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory,
+        RemoteStoreMDShardLockManager mdLockManager) throws IOException {
super(remoteDataDirectory);
this.remoteDataDirectory = remoteDataDirectory;
this.remoteMetadataDirectory = remoteMetadataDirectory;
+        this.mdLockManager = mdLockManager;
init();
}

@@ -317,18 +325,49 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
}
}

-    public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException {
+    public void acquireLock(long primaryTerm, long generation, String resourceID) throws IOException {
+        Optional<String> metadataFile = getMetadataFileForCommit(primaryTerm, generation);
+        if (metadataFile.isEmpty()) {
+            String errorString = "Metadata file is not present for given primary term "
+                + primaryTerm + " and generation " + generation;
+            throw new FileNotFoundException(errorString);
+        }
+        mdLockManager.acquire(mdLockManager.getLockInfoBuilder().withMetadataFile(metadataFile.get())
+            .withResourceId(resourceID).build());
+    }
+
+    // Visible for testing
+    Optional<String> getMetadataFileForCommit(long primaryTerm, long generation) throws IOException {
+        Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX);
+
+        return metadataFiles.stream()
+            .filter(file -> MetadataFilenameUtils.getPrimaryTerm(file.split(MetadataFilenameUtils.SEPARATOR)) == primaryTerm
+                && MetadataFilenameUtils.getGeneration(file.split(MetadataFilenameUtils.SEPARATOR)) == generation)
+            .findAny();
+    }

+    public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix,
+        String checksum) throws IOException {
        String remoteFilename;
        if (useCommonSuffix) {
            remoteFilename = dest + SEGMENT_NAME_UUID_SEPARATOR + this.commonFilenameSuffix;
        } else {
            remoteFilename = getNewRemoteSegmentFilename(dest);
        }
        remoteDataDirectory.copyFrom(from, src, remoteFilename, context);
-        String checksum = getChecksumOfLocalFile(from, src);
        UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src));
        segmentsUploadedToRemoteStore.put(src, segmentMetadata);
    }
+
+    public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException {
+        copyFrom(from, src, dest, context, useCommonSuffix, getChecksumOfLocalFile(from, src));
+    }
+
+    public static long getCommitGenerationFromMdFile(String mdFile) {
+        return MetadataFilenameUtils.getGeneration(mdFile.split(MetadataFilenameUtils.SEPARATOR));
+    }

/**
* Copies an existing src file from directory from to a non-existent file dest in this directory.
@@ -408,6 +447,20 @@ public Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore() {
return Collections.unmodifiableMap(this.segmentsUploadedToRemoteStore);
}

+    public Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore(long primaryTerm, long generation)
+        throws IOException {
+        Optional<String> metadataFile = getMetadataFileForCommit(primaryTerm, generation);
+
+        if (metadataFile.isEmpty()) {
+            String errorString = "Metadata file is not present for given primary term "
+                + primaryTerm + " and generation " + generation;
+            throw new FileNotFoundException(errorString);
+        }
+        Map<String, UploadedSegmentMetadata> segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(readMetadataFile(metadataFile.get()));
+        return Collections.unmodifiableMap(segmentsUploadedToRemoteStore);
+    }

/**
* Delete stale segment and metadata files
* One metadata file is kept per commit (refresh updates the same file). To read segments uploaded to remote store,
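Taken together, acquireLock and the term/generation overload of getSegmentsUploadedToRemoteStore let a caller pin a specific commit's segments before reading them. A minimal usage sketch, assuming an already-constructed RemoteSegmentStoreDirectory; the resource id and the values below are illustrative, not from this PR:

```java
import java.io.IOException;
import java.util.Map;

import org.opensearch.index.store.RemoteSegmentStoreDirectory;

public class CommitLockSketch {
    // Pin the commit identified by (primaryTerm, generation), then read its metadata.
    static void pinAndList(RemoteSegmentStoreDirectory directory) throws IOException {
        long primaryTerm = 2;
        long generation = 5;
        String resourceId = "snapshot-uuid-1234"; // illustrative resource id

        // Throws FileNotFoundException when no metadata file matches the term/generation.
        directory.acquireLock(primaryTerm, generation, resourceId);

        // While the lock is held, the commit's files stay readable in the remote store.
        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> segments =
            directory.getSegmentsUploadedToRemoteStore(primaryTerm, generation);
        segments.keySet().forEach(System.out::println);
    }
}
```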
server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java
@@ -13,6 +13,8 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
+import org.opensearch.index.store.lockmanager.RemoteStoreMDLockManagerFactory;
+import org.opensearch.index.store.lockmanager.RemoteStoreMDShardLockManager;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
@@ -38,17 +40,25 @@ public RemoteSegmentStoreDirectoryFactory(Supplier<RepositoriesService> reposito
@Override
public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throws IOException {
String repositoryName = indexSettings.getRemoteStoreRepository();
+        String indexUUID = indexSettings.getIndex().getUUID();
+        String shardId = String.valueOf(path.getShardId().getId());
+        return newDirectory(repositoryName, indexUUID, shardId);
+    }
+
+    public Directory newDirectory(String repositoryName, String indexUUID, String shardId) throws IOException {
try (Repository repository = repositoriesService.get().repository(repositoryName)) {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath commonBlobPath = ((BlobStoreRepository) repository).basePath();
-            commonBlobPath = commonBlobPath.add(indexSettings.getIndex().getUUID())
-                .add(String.valueOf(path.getShardId().getId()))
+            commonBlobPath = commonBlobPath.add(indexUUID)
+                .add(shardId)
                .add("segments");

RemoteDirectory dataDirectory = createRemoteDirectory(repository, commonBlobPath, "data");
            RemoteDirectory metadataDirectory = createRemoteDirectory(repository, commonBlobPath, "metadata");
+            RemoteStoreMDShardLockManager mdLockManager = RemoteStoreMDLockManagerFactory.newLockManager(
+                repositoriesService.get(), repositoryName, indexUUID, shardId);

-            return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory);
+            return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e);
}
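The new string-based overload means a shard's remote directory can be opened from bare identifiers, without an IndexSettings or ShardPath. A hedged sketch of a call site; the repository name, index UUID, and the in-scope repositoriesService are made-up assumptions:

```java
// Hypothetical call site; assumes a RepositoriesService instance is in scope.
RemoteSegmentStoreDirectoryFactory factory = new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService);
Directory shardDirectory = factory.newDirectory("my-remote-repo", "Jx8fJ2kQQbS2x0yUk9A3bQ", "0");
```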
server/src/main/java/org/opensearch/index/store/lockmanager/FileBasedMDIndexLockManager.java (new file)
@@ -0,0 +1,153 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.lockmanager;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.index.store.RemoteDirectory;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * A file-based lock manager for index-level locks in the remote store
 * @opensearch.internal
 */
public class FileBasedMDIndexLockManager implements RemoteStoreMDIndexLockManager {
    private static final Logger logger = LogManager.getLogger(FileBasedMDIndexLockManager.class);
    private final RemoteDirectory lockDirectory;

    public FileBasedMDIndexLockManager(RemoteDirectory lockDirectory) {
        this.lockDirectory = lockDirectory;
    }

@Override
public void acquire(RemoteStoreMDIndexLockManager.IndexLockInfo lockInfo) throws IOException {
try (IndexOutput indexOutput = lockDirectory.createOutput(lockInfo.getLockName(), IOContext.DEFAULT)) {
lockInfo.writeLockContent(indexOutput);
}
}

@Override
public void release(String fileName) throws IOException {
lockDirectory.deleteFile(fileName);
}

@Override
public IndexLockFileInfo readLockData(String lockName) throws IOException {
try (IndexInput indexInput = lockDirectory.openInput(lockName, IOContext.DEFAULT)) {
return IndexLockFileInfo.getLockFileInfoFromIndexInput(indexInput);
}
}

@Override
public IndexLockInfo.LockInfoBuilder getLockInfoBuilder() {
return new IndexLockFileInfo.LockInfoBuilder();
}

    public Boolean isLockExpiredForResource(String resourceId) throws IOException {
        IndexLockFileInfo lockFileData = readLockData(IndexLockFileInfo.generateLockFileName(resourceId));
        if (lockFileData.getExpiryTime() != null) {
            Instant currentInstantInUTC = Instant.now().atZone(ZoneOffset.UTC).toInstant();
            // The lock is expired only when its expiry time lies strictly before the current instant.
            return Instant.parse(lockFileData.getExpiryTime()).isBefore(currentInstantInUTC);
        }
        return false;
    }

static class IndexLockFileInfo implements RemoteStoreMDIndexLockManager.IndexLockInfo {
private String resourceId;
private String expiryTime = null;

private void setResourceId(String resourceId) {
this.resourceId = resourceId;
}

private void setExpiryTimeFromTTL(String ttl) {
if (!Objects.equals(ttl, RemoteStoreLockManagerUtils.NO_TTL)) {
setExpiryTime(Instant.now().atZone(ZoneOffset.UTC).plusDays(Long.parseLong(ttl)).toInstant().toString());
}
}

private void setExpiryTime(String expiryTime) {
this.expiryTime = expiryTime;
}

public String getExpiryTime() {
return this.expiryTime;
}

public String getResourceId() {
return resourceId;
}

static IndexLockFileInfo getLockFileInfoFromIndexInput(IndexInput indexInput) throws IOException {
Map<String, String> lockData = indexInput.readMapOfStrings();
IndexLockFileInfo indexLockFileInfo = new IndexLockFileInfo();
indexLockFileInfo.setResourceId(lockData.get(RemoteStoreLockManagerUtils.RESOURCE_ID));
indexLockFileInfo.setExpiryTime(lockData.get(RemoteStoreLockManagerUtils.LOCK_EXPIRY_TIME));
return indexLockFileInfo;
}

static String generateLockFileName(String resourceID) {
return resourceID + RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION;
}

        static String getResourceIDFromLockFile(String lockFile) {
            return lockFile.replace(RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION, "");
        }

        static Map<String, String> getLockData(String resourceID, String expiryTime) {
            Map<String, String> lockFileData = new HashMap<>();
            lockFileData.put(RemoteStoreLockManagerUtils.RESOURCE_ID, resourceID);
            if (expiryTime != null) {
                lockFileData.put(RemoteStoreLockManagerUtils.LOCK_EXPIRY_TIME, expiryTime);
            }
            return lockFileData;
        }

        @Override
public String getLockName() {
return generateLockFileName(this.resourceId);
}

@Override
public void writeLockContent(IndexOutput indexOutput) throws IOException {
indexOutput.writeMapOfStrings(getLockData(resourceId, expiryTime));
        }

        static class LockInfoBuilder implements RemoteStoreMDIndexLockManager.IndexLockInfo.LockInfoBuilder {
            private final IndexLockFileInfo lockFileInfo;

            LockInfoBuilder() {
                this.lockFileInfo = new IndexLockFileInfo();
            }

            @Override
public LockInfoBuilder withTTL(String ttl) {
lockFileInfo.setExpiryTimeFromTTL(ttl);
return this;
}

@Override
public LockInfoBuilder withResourceId(String resourceId) {
lockFileInfo.setResourceId(resourceId);
return this;
}

@Override
public IndexLockFileInfo build() {
return lockFileInfo;
}
}
}
}
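To see the lock manager in isolation: a hedged sketch of acquiring, checking, and releasing an index-level lock. The lockDirectory wiring (a RemoteDirectory pointing at the lock path) is assumed, and the resource id is illustrative; only the method calls come from the code above:

```java
// Assumes `lockDirectory` is a RemoteDirectory rooted at the index's lock path.
FileBasedMDIndexLockManager lockManager = new FileBasedMDIndexLockManager(lockDirectory);

// Acquire a lock for the resource with a 7-day TTL (the TTL string is a day count).
lockManager.acquire(
    lockManager.getLockInfoBuilder().withResourceId("snapshot-uuid-1234").withTTL("7").build()
);

// Later: release the lock file once it has expired.
if (lockManager.isLockExpiredForResource("snapshot-uuid-1234")) {
    lockManager.release("snapshot-uuid-1234" + RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION);
}
```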