Skip to content

Commit 0e283f9

Browse files
authored
[Zen2] PersistedState interface implementation (#35819)
Today GatewayMetaState is capable of atomically storing MetaData to disk. We've also moved fields that are needed to be persisted in Zen2 from ClusterState to ClusterState.MetaData.CoordinationMetaData. This commit implements PersistedState interface. version and currentTerm are persisted as a part of Manifest. GatewayMetaState now implements both ClusterStateApplier and PersistedState interfaces. We started with two descendants Zen1GatewayMetaState and Zen2GatewayMetaState, but it turned out to be not easy to glue it. GatewayMetaState now constructs previousClusterState (including MetaData) and previousManifest inside the constructor so that all PersistedState methods are usable as soon as GatewayMetaState instance is constructed. Also, loadMetaData is renamed to getMetaData, because it just returns previousClusterState.metaData(). Sadly, we don't have access to localNode (obtained from TransportService in the constructor, so getLastAcceptedState should be called, after setLocalNode method is invoked. Currently, when deciding whether to write IndexMetaData to disk, we're comparing current IndexMetaData version and received IndexMetaData version. This is not safe in Zen2 if the term has changed. So updateClusterState now accepts incremental write method parameter. When it's set to false, we always write IndexMetaData to disk. Things that are not covered by GatewayMetaStateTests are covered by GatewayMetaStatePersistedStateTests. This commit also adds an option to use GatewayMetaState instead of InMemoryPersistedState in TestZenDiscovery. However, by default InMemoryPersistedState is used and only one test in PersistedStateIT used GatewayMetaState. In order to use it for other tests, proper state recovery should be implemented.
1 parent a68a464 commit 0e283f9

22 files changed

+580
-130
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -458,11 +458,13 @@ public interface PersistedState {
458458
*/
459459
default void markLastAcceptedConfigAsCommitted() {
460460
final ClusterState lastAcceptedState = getLastAcceptedState();
461-
final CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(lastAcceptedState.coordinationMetaData())
462-
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
463-
.build();
464-
final MetaData metaData = MetaData.builder(lastAcceptedState.metaData()).coordinationMetaData(coordinationMetaData).build();
465-
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metaData(metaData).build());
461+
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
462+
final CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(lastAcceptedState.coordinationMetaData())
463+
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
464+
.build();
465+
final MetaData metaData = MetaData.builder(lastAcceptedState.metaData()).coordinationMetaData(coordinationMetaData).build();
466+
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metaData(metaData).build());
467+
}
466468
}
467469
}
468470

server/src/main/java/org/elasticsearch/cluster/metadata/Manifest.java

Lines changed: 67 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,29 @@
4444
* Index metadata generation could be obtained by calling {@link #getIndexGenerations()}.
4545
*/
4646
public class Manifest implements ToXContentFragment {
47-
private static final long MISSING_GLOBAL_GENERATION = -1;
47+
//TODO revisit missing and unknown constants once Zen2 BWC is ready
48+
private static final long MISSING_GLOBAL_GENERATION = -1L;
49+
private static final long MISSING_CURRENT_TERM = 0L;
50+
private static final long UNKNOWN_CURRENT_TERM = MISSING_CURRENT_TERM;
51+
private static final long MISSING_CLUSTER_STATE_VERSION = 0L;
52+
private static final long UNKNOWN_CLUSTER_STATE_VERSION = MISSING_CLUSTER_STATE_VERSION;
4853

4954
private final long globalGeneration;
5055
private final Map<Index, Long> indexGenerations;
56+
private final long currentTerm;
57+
private final long clusterStateVersion;
5158

52-
public Manifest(long globalGeneration, Map<Index, Long> indexGenerations) {
59+
public Manifest(long currentTerm, long clusterStateVersion, long globalGeneration, Map<Index, Long> indexGenerations) {
60+
this.currentTerm = currentTerm;
61+
this.clusterStateVersion = clusterStateVersion;
5362
this.globalGeneration = globalGeneration;
5463
this.indexGenerations = indexGenerations;
5564
}
5665

66+
public static Manifest unknownCurrentTermAndVersion(long globalGeneration, Map<Index, Long> indexGenerations) {
67+
return new Manifest(UNKNOWN_CURRENT_TERM, UNKNOWN_CLUSTER_STATE_VERSION, globalGeneration, indexGenerations);
68+
}
69+
5770
/**
5871
* Returns global metadata generation.
5972
*/
@@ -68,18 +81,38 @@ public Map<Index, Long> getIndexGenerations() {
6881
return indexGenerations;
6982
}
7083

84+
public long getCurrentTerm() {
85+
return currentTerm;
86+
}
87+
88+
public long getClusterStateVersion() {
89+
return clusterStateVersion;
90+
}
91+
7192
@Override
7293
public boolean equals(Object o) {
7394
if (this == o) return true;
7495
if (o == null || getClass() != o.getClass()) return false;
7596
Manifest manifest = (Manifest) o;
76-
return globalGeneration == manifest.globalGeneration &&
77-
Objects.equals(indexGenerations, manifest.indexGenerations);
97+
return currentTerm == manifest.currentTerm &&
98+
clusterStateVersion == manifest.clusterStateVersion &&
99+
globalGeneration == manifest.globalGeneration &&
100+
Objects.equals(indexGenerations, manifest.indexGenerations);
78101
}
79102

80103
@Override
81104
public int hashCode() {
82-
return Objects.hash(globalGeneration, indexGenerations);
105+
return Objects.hash(currentTerm, clusterStateVersion, globalGeneration, indexGenerations);
106+
}
107+
108+
@Override
109+
public String toString() {
110+
return "Manifest{" +
111+
"currentTerm=" + currentTerm +
112+
", clusterStateVersion=" + clusterStateVersion +
113+
", globalGeneration=" + globalGeneration +
114+
", indexGenerations=" + indexGenerations +
115+
'}';
83116
}
84117

85118
private static final String MANIFEST_FILE_PREFIX = "manifest-";
@@ -103,37 +136,57 @@ public Manifest fromXContent(XContentParser parser) throws IOException {
103136
* Code below this comment is for XContent parsing/generation
104137
*/
105138

139+
private static final ParseField CURRENT_TERM_PARSE_FIELD = new ParseField("current_term");
140+
private static final ParseField CLUSTER_STATE_VERSION_PARSE_FIELD = new ParseField("cluster_state_version");
106141
private static final ParseField GENERATION_PARSE_FIELD = new ParseField("generation");
107142
private static final ParseField INDEX_GENERATIONS_PARSE_FIELD = new ParseField("index_generations");
108143

109144
@Override
110145
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
146+
builder.field(CURRENT_TERM_PARSE_FIELD.getPreferredName(), currentTerm);
147+
builder.field(CLUSTER_STATE_VERSION_PARSE_FIELD.getPreferredName(), clusterStateVersion);
111148
builder.field(GENERATION_PARSE_FIELD.getPreferredName(), globalGeneration);
112149
builder.array(INDEX_GENERATIONS_PARSE_FIELD.getPreferredName(), indexEntryList().toArray());
113150
return builder;
114151
}
115152

153+
private static long requireNonNullElseDefault(Long value, long defaultValue) {
154+
return value != null ? value : defaultValue;
155+
}
156+
116157
private List<IndexEntry> indexEntryList() {
117158
return indexGenerations.entrySet().stream().
118159
map(entry -> new IndexEntry(entry.getKey(), entry.getValue())).
119160
collect(Collectors.toList());
120161
}
121162

122-
private static long generation(Object[] generationAndListOfIndexEntries) {
123-
return (Long) generationAndListOfIndexEntries[0];
163+
private static long currentTerm(Object[] manifestFields) {
164+
return requireNonNullElseDefault((Long) manifestFields[0], MISSING_CURRENT_TERM);
165+
}
166+
167+
private static long clusterStateVersion(Object[] manifestFields) {
168+
return requireNonNullElseDefault((Long) manifestFields[1], MISSING_CLUSTER_STATE_VERSION);
169+
}
170+
171+
private static long generation(Object[] manifestFields) {
172+
return requireNonNullElseDefault((Long) manifestFields[2], MISSING_GLOBAL_GENERATION);
124173
}
125174

126-
private static Map<Index, Long> indices(Object[] generationAndListOfIndexEntries) {
127-
List<IndexEntry> listOfIndices = (List<IndexEntry>) generationAndListOfIndexEntries[1];
175+
@SuppressWarnings("unchecked")
176+
private static Map<Index, Long> indices(Object[] manifestFields) {
177+
List<IndexEntry> listOfIndices = (List<IndexEntry>) manifestFields[3];
128178
return listOfIndices.stream().collect(Collectors.toMap(IndexEntry::getIndex, IndexEntry::getGeneration));
129179
}
130180

131181
private static final ConstructingObjectParser<Manifest, Void> PARSER = new ConstructingObjectParser<>(
132182
"manifest",
133-
generationAndListOfIndexEntries ->
134-
new Manifest(generation(generationAndListOfIndexEntries), indices(generationAndListOfIndexEntries)));
183+
manifestFields ->
184+
new Manifest(currentTerm(manifestFields), clusterStateVersion(manifestFields), generation(manifestFields),
185+
indices(manifestFields)));
135186

136187
static {
188+
PARSER.declareLong(ConstructingObjectParser.constructorArg(), CURRENT_TERM_PARSE_FIELD);
189+
PARSER.declareLong(ConstructingObjectParser.constructorArg(), CLUSTER_STATE_VERSION_PARSE_FIELD);
137190
PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_PARSE_FIELD);
138191
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), IndexEntry.INDEX_ENTRY_PARSER, INDEX_GENERATIONS_PARSE_FIELD);
139192
}
@@ -143,11 +196,12 @@ public static Manifest fromXContent(XContentParser parser) throws IOException {
143196
}
144197

145198
public boolean isEmpty() {
146-
return globalGeneration == MISSING_GLOBAL_GENERATION && indexGenerations.isEmpty();
199+
return currentTerm == MISSING_CURRENT_TERM && clusterStateVersion == MISSING_CLUSTER_STATE_VERSION
200+
&& globalGeneration == MISSING_GLOBAL_GENERATION && indexGenerations.isEmpty();
147201
}
148202

149203
public static Manifest empty() {
150-
return new Manifest(MISSING_GLOBAL_GENERATION, Collections.emptyMap());
204+
return new Manifest(MISSING_CURRENT_TERM, MISSING_CLUSTER_STATE_VERSION, MISSING_GLOBAL_GENERATION, Collections.emptyMap());
151205
}
152206

153207
public boolean isGlobalGenerationMissing() {

server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider;
3939
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
4040
import org.elasticsearch.discovery.zen.ZenDiscovery;
41+
import org.elasticsearch.gateway.GatewayMetaState;
4142
import org.elasticsearch.plugins.DiscoveryPlugin;
4243
import org.elasticsearch.threadpool.ThreadPool;
4344
import org.elasticsearch.transport.TransportService;
@@ -73,7 +74,7 @@ public class DiscoveryModule {
7374
public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportService transportService,
7475
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
7576
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
76-
AllocationService allocationService, Path configFile) {
77+
AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState) {
7778
final Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators = new ArrayList<>();
7879
final Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
7980
hostProviders.put("settings", () -> new SettingsBasedHostsProvider(settings, transportService));
@@ -118,15 +119,16 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
118119
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
119120
discoveryTypes.put("zen",
120121
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
121-
clusterSettings, hostsProvider, allocationService, Collections.unmodifiableCollection(joinValidators)));
122+
clusterSettings, hostsProvider, allocationService, Collections.unmodifiableCollection(joinValidators), gatewayMetaState));
122123
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier));
123124
for (DiscoveryPlugin plugin : plugins) {
124125
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,
125-
masterService, clusterApplier, clusterSettings, hostsProvider, allocationService).entrySet().forEach(entry -> {
126-
if (discoveryTypes.put(entry.getKey(), entry.getValue()) != null) {
127-
throw new IllegalArgumentException("Cannot register discovery type [" + entry.getKey() + "] twice");
128-
}
129-
});
126+
masterService, clusterApplier, clusterSettings, hostsProvider, allocationService, gatewayMetaState).entrySet()
127+
.forEach(entry -> {
128+
if (discoveryTypes.put(entry.getKey(), entry.getValue()) != null) {
129+
throw new IllegalArgumentException("Cannot register discovery type [" + entry.getKey() + "] twice");
130+
}
131+
});
130132
}
131133
String discoveryType = DISCOVERY_TYPE_SETTING.get(settings);
132134
Supplier<Discovery> discoverySupplier = discoveryTypes.get(discoveryType);

server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3939
import org.elasticsearch.cluster.service.ClusterApplier;
4040
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
41+
import org.elasticsearch.cluster.service.ClusterApplierService;
4142
import org.elasticsearch.cluster.service.MasterService;
4243
import org.elasticsearch.common.Priority;
4344
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@@ -58,6 +59,7 @@
5859
import org.elasticsearch.discovery.DiscoveryStats;
5960
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
6061
import org.elasticsearch.discovery.zen.PublishClusterStateAction.IncomingClusterStateListener;
62+
import org.elasticsearch.gateway.GatewayMetaState;
6163
import org.elasticsearch.tasks.Task;
6264
import org.elasticsearch.threadpool.ThreadPool;
6365
import org.elasticsearch.transport.EmptyTransportResponseHandler;
@@ -159,7 +161,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
159161
public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
160162
NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier,
161163
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService,
162-
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators) {
164+
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, GatewayMetaState gatewayMetaState) {
163165
super(settings);
164166
this.onJoinValidators = addBuiltInJoinValidators(onJoinValidators);
165167
this.masterService = masterService;
@@ -227,6 +229,10 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
227229

228230
transportService.registerRequestHandler(
229231
DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());
232+
233+
if (clusterApplier instanceof ClusterApplierService) {
234+
((ClusterApplierService) clusterApplier).addLowPriorityApplier(gatewayMetaState);
235+
}
230236
}
231237

232238
static Collection<BiConsumer<DiscoveryNode,ClusterState>> addBuiltInJoinValidators(

0 commit comments

Comments
 (0)