-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Switch MetaDataStateFormat to Lucene directory abstraction #33989
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
76956de
afa9a3b
c400868
71ca04d
a1ed7b3
5b5ef4d
0f33a39
0d8b842
554537e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,7 @@ | |
import org.apache.lucene.store.Directory; | ||
import org.apache.lucene.store.IOContext; | ||
import org.apache.lucene.store.IndexInput; | ||
import org.apache.lucene.store.OutputStreamIndexOutput; | ||
import org.apache.lucene.store.IndexOutput; | ||
import org.apache.lucene.store.SimpleFSDirectory; | ||
import org.elasticsearch.ExceptionsHelper; | ||
import org.elasticsearch.common.logging.Loggers; | ||
|
@@ -48,9 +48,9 @@ | |
import java.nio.file.Files; | ||
import java.nio.file.NoSuchFileException; | ||
import java.nio.file.Path; | ||
import java.nio.file.StandardCopyOption; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.regex.Matcher; | ||
import java.util.regex.Pattern; | ||
|
@@ -69,7 +69,6 @@ public abstract class MetaDataStateFormat<T> { | |
private static final String STATE_FILE_CODEC = "state"; | ||
private static final int MIN_COMPATIBLE_STATE_FILE_VERSION = 1; | ||
private static final int STATE_FILE_VERSION = 1; | ||
private static final int BUFFER_SIZE = 4096; | ||
private final String prefix; | ||
private final Pattern stateFilePattern; | ||
|
||
|
@@ -81,16 +80,75 @@ public abstract class MetaDataStateFormat<T> { | |
protected MetaDataStateFormat(String prefix) { | ||
this.prefix = prefix; | ||
this.stateFilePattern = Pattern.compile(Pattern.quote(prefix) + "(\\d+)(" + MetaDataStateFormat.STATE_FILE_EXTENSION + ")?"); | ||
} | ||
|
||
private static void deleteFileIfExists(Path stateLocation, Directory directory, String fileName) throws IOException { | ||
try { | ||
directory.deleteFile(fileName); | ||
} catch (FileNotFoundException | NoSuchFileException ignored) { | ||
|
||
} | ||
logger.trace("cleaned up {}", stateLocation.resolve(fileName)); | ||
} | ||
|
||
private void writeStateToFirstLocation(final T state, Path stateLocation, Directory stateDir, String fileName, String tmpFileName) | ||
throws IOException { | ||
try { | ||
deleteFileIfExists(stateLocation, stateDir, tmpFileName); | ||
try (IndexOutput out = stateDir.createOutput(tmpFileName, IOContext.DEFAULT)) { | ||
CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION); | ||
out.writeInt(FORMAT.index()); | ||
try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) { | ||
@Override | ||
public void close() throws IOException { | ||
// this is important since some of the XContentBuilders write bytes on close. | ||
// in order to write the footer we need to prevent closing the actual index input. | ||
} | ||
})) { | ||
|
||
builder.startObject(); | ||
{ | ||
toXContent(builder, state); | ||
} | ||
builder.endObject(); | ||
} | ||
CodecUtil.writeFooter(out); | ||
} | ||
|
||
stateDir.sync(Collections.singleton(tmpFileName)); | ||
stateDir.rename(tmpFileName, fileName); | ||
stateDir.syncMetaData(); | ||
logger.trace("written state to {}", stateLocation.resolve(fileName)); | ||
} finally { | ||
deleteFileIfExists(stateLocation, stateDir, tmpFileName); | ||
} | ||
} | ||
|
||
private void copyStateToExtraLocation(Directory srcStateDir, Path extraStateLocation, String fileName, String tmpFileName) | ||
throws IOException { | ||
try (Directory extraStateDir = newDirectory(extraStateLocation)) { | ||
try { | ||
deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName); | ||
extraStateDir.copyFrom(srcStateDir, fileName, tmpFileName, IOContext.DEFAULT); | ||
extraStateDir.sync(Collections.singleton(tmpFileName)); | ||
extraStateDir.rename(tmpFileName, fileName); | ||
extraStateDir.syncMetaData(); | ||
logger.trace("copied state to {}", extraStateLocation.resolve(fileName)); | ||
} finally { | ||
deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Writes the given state to the given directories. The state is written to a | ||
* state directory ({@value #STATE_DIR_NAME}) underneath each of the given file locations and is created if it | ||
* doesn't exist. The state is serialized to a temporary file in that directory and is then atomically moved to | ||
* it's target filename of the pattern {@code {prefix}{version}.st}. | ||
* If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return it. | ||
* But if this method throws an exception, loadLatestState could return this state or some previous state. | ||
* | ||
* @param state the state object to write | ||
* @param state the state object to write | ||
* @param locations the locations where the state should be written to. | ||
* @throws IOException if an IOException occurs | ||
*/ | ||
|
@@ -101,60 +159,22 @@ public final void write(final T state, final Path... locations) throws IOExcepti | |
if (locations.length <= 0) { | ||
throw new IllegalArgumentException("One or more locations required"); | ||
} | ||
final long maxStateId = findMaxStateId(prefix, locations)+1; | ||
final long maxStateId = findMaxStateId(prefix, locations) + 1; | ||
assert maxStateId >= 0 : "maxStateId must be positive but was: [" + maxStateId + "]"; | ||
|
||
final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION; | ||
Path stateLocation = locations[0].resolve(STATE_DIR_NAME); | ||
Files.createDirectories(stateLocation); | ||
final Path tmpStatePath = stateLocation.resolve(fileName + ".tmp"); | ||
final Path finalStatePath = stateLocation.resolve(fileName); | ||
try { | ||
final String resourceDesc = "MetaDataStateFormat.write(path=\"" + tmpStatePath + "\")"; | ||
try (OutputStreamIndexOutput out = | ||
new OutputStreamIndexOutput(resourceDesc, fileName, Files.newOutputStream(tmpStatePath), BUFFER_SIZE)) { | ||
CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION); | ||
out.writeInt(FORMAT.index()); | ||
try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) { | ||
@Override | ||
public void close() throws IOException { | ||
// this is important since some of the XContentBuilders write bytes on close. | ||
// in order to write the footer we need to prevent closing the actual index input. | ||
} })) { | ||
final String tmpFileName = fileName + ".tmp"; | ||
final Path firstStateLocation = locations[0].resolve(STATE_DIR_NAME); | ||
try (Directory stateDir = newDirectory(firstStateLocation)) { | ||
writeStateToFirstLocation(state, firstStateLocation, stateDir, fileName, tmpFileName); | ||
|
||
builder.startObject(); | ||
{ | ||
toXContent(builder, state); | ||
} | ||
builder.endObject(); | ||
} | ||
CodecUtil.writeFooter(out); | ||
} | ||
IOUtils.fsync(tmpStatePath, false); // fsync the state file | ||
Files.move(tmpStatePath, finalStatePath, StandardCopyOption.ATOMIC_MOVE); | ||
IOUtils.fsync(stateLocation, true); | ||
logger.trace("written state to {}", finalStatePath); | ||
for (int i = 1; i < locations.length; i++) { | ||
stateLocation = locations[i].resolve(STATE_DIR_NAME); | ||
Files.createDirectories(stateLocation); | ||
Path tmpPath = stateLocation.resolve(fileName + ".tmp"); | ||
Path finalPath = stateLocation.resolve(fileName); | ||
try { | ||
Files.copy(finalStatePath, tmpPath); | ||
IOUtils.fsync(tmpPath, false); // fsync the state file | ||
// we are on the same FileSystem / Partition here we can do an atomic move | ||
Files.move(tmpPath, finalPath, StandardCopyOption.ATOMIC_MOVE); | ||
IOUtils.fsync(stateLocation, true); | ||
logger.trace("copied state to {}", finalPath); | ||
} finally { | ||
Files.deleteIfExists(tmpPath); | ||
logger.trace("cleaned up {}", tmpPath); | ||
} | ||
final Path extraStateLocation = locations[i].resolve(STATE_DIR_NAME); | ||
copyStateToExtraLocation(stateDir, extraStateLocation, fileName, tmpFileName); | ||
} | ||
} finally { | ||
Files.deleteIfExists(tmpStatePath); | ||
logger.trace("cleaned up {}", tmpStatePath); | ||
andrershov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
cleanupOldFiles(prefix, fileName, locations); | ||
|
||
cleanupOldFiles(fileName, locations); | ||
} | ||
|
||
protected XContentBuilder newXContentBuilder(XContentType type, OutputStream stream ) throws IOException { | ||
|
@@ -207,26 +227,29 @@ protected Directory newDirectory(Path dir) throws IOException { | |
return new SimpleFSDirectory(dir); | ||
} | ||
|
||
private void cleanupOldFiles(final String prefix, final String currentStateFile, Path[] locations) throws IOException { | ||
final DirectoryStream.Filter<Path> filter = entry -> { | ||
final String entryFileName = entry.getFileName().toString(); | ||
return Files.isRegularFile(entry) | ||
&& entryFileName.startsWith(prefix) // only state files | ||
&& currentStateFile.equals(entryFileName) == false; // keep the current state file around | ||
}; | ||
// now clean up the old files | ||
for (Path dataLocation : locations) { | ||
logger.trace("cleanupOldFiles: cleaning up {}", dataLocation); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why remove this log line? I think that was added to check how long the deletes take There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same reason as writeStateToFirstLocation, fixed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks to be still missing. |
||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dataLocation.resolve(STATE_DIR_NAME), filter)) { | ||
for (Path stateFile : stream) { | ||
Files.deleteIfExists(stateFile); | ||
logger.trace("cleanupOldFiles: cleaned up {}", stateFile); | ||
private void cleanupOldFiles(final String currentStateFile, Path[] locations) throws IOException { | ||
for (Path location : locations) { | ||
logger.trace("cleanupOldFiles: cleaning up {}", location); | ||
Path stateLocation = location.resolve(STATE_DIR_NAME); | ||
try (Directory stateDir = newDirectory(stateLocation)) { | ||
for (String file : stateDir.listAll()) { | ||
if (file.startsWith(prefix) && file.equals(currentStateFile) == false) { | ||
deleteFileIfExists(stateLocation, stateDir, file); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
long findMaxStateId(final String prefix, Path... locations) throws IOException { | ||
/** | ||
* Finds state file with maximum id. | ||
* | ||
* @param prefix - filename prefix | ||
* @param locations - paths to directories with state folder | ||
* @return maximum id of state file or -1 if no such files are found | ||
* @throws IOException if IOException occurs | ||
*/ | ||
private long findMaxStateId(final String prefix, Path... locations) throws IOException { | ||
long maxId = -1; | ||
for (Path dataLocation : locations) { | ||
final Path resolve = dataLocation.resolve(STATE_DIR_NAME); | ||
|
@@ -245,6 +268,24 @@ long findMaxStateId(final String prefix, Path... locations) throws IOException { | |
return maxId; | ||
} | ||
|
||
private List<Path> findStateFilesByGeneration(final long generation, Path... locations) { | ||
List<Path> files = new ArrayList<>(); | ||
if (generation == -1) { | ||
return files; | ||
} | ||
|
||
final String fileName = prefix + generation + STATE_FILE_EXTENSION; | ||
for (Path dataLocation : locations) { | ||
final Path stateFilePath = dataLocation.resolve(STATE_DIR_NAME).resolve(fileName); | ||
if (Files.exists(stateFilePath)) { | ||
logger.trace("found state file: {}", stateFilePath); | ||
files.add(stateFilePath); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add trace logging to this (like it was before):
|
||
} | ||
} | ||
|
||
return files; | ||
} | ||
|
||
/** | ||
* Tries to load the latest state from the given data-locations. It tries to load the latest state determined by | ||
* the states version from one or more data directories and if none of the latest states can be loaded an exception | ||
|
@@ -255,78 +296,39 @@ long findMaxStateId(final String prefix, Path... locations) throws IOException { | |
* @return the latest state or <code>null</code> if no state was found. | ||
*/ | ||
public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations) throws IOException { | ||
List<PathAndStateId> files = new ArrayList<>(); | ||
long maxStateId = -1; | ||
if (dataLocations != null) { // select all eligible files first | ||
for (Path dataLocation : dataLocations) { | ||
final Path stateDir = dataLocation.resolve(STATE_DIR_NAME); | ||
// now, iterate over the current versions, and find latest one | ||
// we don't check if the stateDir is present since it could be deleted | ||
// after the check. Also if there is a _state file and it's not a dir something is really wrong | ||
// we don't pass a glob since we need the group part for parsing | ||
try (DirectoryStream<Path> paths = Files.newDirectoryStream(stateDir)) { | ||
for (Path stateFile : paths) { | ||
final Matcher matcher = stateFilePattern.matcher(stateFile.getFileName().toString()); | ||
if (matcher.matches()) { | ||
final long stateId = Long.parseLong(matcher.group(1)); | ||
maxStateId = Math.max(maxStateId, stateId); | ||
PathAndStateId pav = new PathAndStateId(stateFile, stateId); | ||
logger.trace("found state file: {}", pav); | ||
files.add(pav); | ||
} | ||
} | ||
} catch (NoSuchFileException | FileNotFoundException ex) { | ||
// no _state directory -- move on | ||
} | ||
} | ||
long maxStateId = findMaxStateId(prefix, dataLocations); | ||
List<Path> stateFiles = findStateFilesByGeneration(maxStateId, dataLocations); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If maxStateId > 0 and stateFiles.isEmpty() I think we should throw an exception here as it clearly indicates something went horribly wrong (e.g. concurrent writes or deletes). That same condition was previously covered by distinguishing between |
||
|
||
if (maxStateId > -1 && stateFiles.isEmpty()) { | ||
throw new IllegalStateException("unable to find state files with state id " + maxStateId + | ||
" returned by findMaxStateId function, in data folders [" + | ||
Arrays.stream(dataLocations).map(Path::toAbsolutePath). | ||
map(Object::toString).collect(Collectors.joining(", ")) + | ||
"], concurrent writes?"); | ||
} | ||
// NOTE: we might have multiple version of the latest state if there are multiple data dirs.. for this case | ||
// we iterate only over the ones with the max version. | ||
long finalMaxStateId = maxStateId; | ||
Collection<PathAndStateId> pathAndStateIds = files | ||
.stream() | ||
.filter(pathAndStateId -> pathAndStateId.id == finalMaxStateId) | ||
.collect(Collectors.toCollection(ArrayList::new)); | ||
|
||
final List<Throwable> exceptions = new ArrayList<>(); | ||
for (PathAndStateId pathAndStateId : pathAndStateIds) { | ||
for (Path stateFile : stateFiles) { | ||
try { | ||
T state = read(namedXContentRegistry, pathAndStateId.file); | ||
logger.trace("state id [{}] read from [{}]", pathAndStateId.id, pathAndStateId.file.getFileName()); | ||
T state = read(namedXContentRegistry, stateFile); | ||
logger.trace("state id [{}] read from [{}]", maxStateId, stateFile.getFileName()); | ||
return state; | ||
} catch (Exception e) { | ||
exceptions.add(new IOException("failed to read " + pathAndStateId.toString(), e)); | ||
exceptions.add(new IOException("failed to read " + stateFile.toAbsolutePath(), e)); | ||
logger.debug(() -> new ParameterizedMessage( | ||
"{}: failed to read [{}], ignoring...", pathAndStateId.file.toAbsolutePath(), prefix), e); | ||
"{}: failed to read [{}], ignoring...", stateFile.toAbsolutePath(), prefix), e); | ||
} | ||
} | ||
// if we reach this something went wrong | ||
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions); | ||
if (files.size() > 0) { | ||
if (stateFiles.size() > 0) { | ||
// We have some state files but none of them gave us a usable state | ||
throw new IllegalStateException("Could not find a state file to recover from among " + files); | ||
throw new IllegalStateException("Could not find a state file to recover from among " + | ||
stateFiles.stream().map(Path::toAbsolutePath).map(Object::toString).collect(Collectors.joining(", "))); | ||
} | ||
return null; | ||
} | ||
|
||
/** | ||
* Internal struct-like class that holds the parsed state id and the file | ||
*/ | ||
private static class PathAndStateId { | ||
final Path file; | ||
final long id; | ||
|
||
private PathAndStateId(Path file, long id) { | ||
this.file = file; | ||
this.id = id; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "[id:" + id + ", file:" + file.toAbsolutePath() + "]"; | ||
} | ||
} | ||
|
||
/** | ||
* Deletes all meta state directories recursively for the given data locations | ||
* @param dataLocations the data location to delete | ||
|
Uh oh!
There was an error while loading. Please reload this page.