Skip to content

Commit 72f9f9f

Browse files
committed
unit test for data only node write and refactor similar to #10016
1 parent 9111aa5 commit 72f9f9f

File tree

4 files changed

+339
-53
lines changed

4 files changed

+339
-53
lines changed

src/main/java/org/elasticsearch/gateway/GatewayMetaState.java

Lines changed: 33 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.gateway;
2121

22-
import com.carrotsearch.hppc.cursors.IntObjectCursor;
2322
import com.google.common.collect.Lists;
2423
import com.google.common.collect.Maps;
2524
import org.elasticsearch.ElasticsearchException;
@@ -33,7 +32,6 @@
3332
import org.elasticsearch.cluster.node.DiscoveryNode;
3433
import org.elasticsearch.cluster.routing.*;
3534
import org.elasticsearch.common.Nullable;
36-
import org.elasticsearch.common.Preconditions;
3735
import org.elasticsearch.common.component.AbstractComponent;
3836
import org.elasticsearch.common.inject.Inject;
3937
import org.elasticsearch.common.settings.ImmutableSettings;
@@ -46,6 +44,7 @@
4644
import java.nio.file.DirectoryStream;
4745
import java.nio.file.Files;
4846
import java.nio.file.Path;
47+
import java.util.*;
4948

5049
/**
5150
*
@@ -58,17 +57,19 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
5857
private final NodeEnvironment nodeEnv;
5958
private final MetaStateService metaStateService;
6059
private final DanglingIndicesState danglingIndicesState;
60+
private final IndexMetaState indexMetaState;
6161

6262
@Nullable
6363
private volatile MetaData currentMetaData;
6464

6565
@Inject
6666
public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService,
67-
DanglingIndicesState danglingIndicesState, TransportNodesListGatewayMetaState nodesListGatewayMetaState) throws Exception {
67+
DanglingIndicesState danglingIndicesState, IndexMetaState indexMetaState, TransportNodesListGatewayMetaState nodesListGatewayMetaState) throws Exception {
6868
super(settings);
6969
this.nodeEnv = nodeEnv;
7070
this.metaStateService = metaStateService;
7171
this.danglingIndicesState = danglingIndicesState;
72+
this.indexMetaState = indexMetaState;
7273
nodesListGatewayMetaState.init(this);
7374

7475

@@ -93,6 +94,26 @@ public MetaData loadMetaState() throws Exception {
9394
return metaStateService.loadFullState();
9495
}
9596

97+
public static class IndexMetaWriteInfo {
98+
IndexMetaData newMetaData;
99+
String reason;
100+
public IndexMetaData previousMetaData;
101+
102+
public IndexMetaWriteInfo(IndexMetaData newMetaData, IndexMetaData previousMetaData, String reason) {
103+
this.newMetaData = newMetaData;
104+
this.reason = reason;
105+
this.previousMetaData = previousMetaData;
106+
}
107+
108+
public IndexMetaData getNewMetaData() {
109+
return newMetaData;
110+
}
111+
112+
public String getReason() {
113+
return reason;
114+
}
115+
}
116+
96117
@Override
97118
public void clusterChanged(ClusterChangedEvent event) {
98119
final ClusterState state = event.state();
@@ -117,39 +138,17 @@ public void clusterChanged(ClusterChangedEvent event) {
117138
}
118139
}
119140

120-
// check and write changes in indices
121-
for (IndexMetaData indexMetaData : newMetaData) {
122-
String writeReason = null;
123-
IndexMetaData currentIndexMetaData;
124-
if (currentMetaData == null) {
125-
// a new event..., check from the state stored
126-
try {
127-
currentIndexMetaData = metaStateService.loadIndexState(indexMetaData.index());
128-
} catch (IOException ex) {
129-
throw new ElasticsearchException("failed to load index state", ex);
130-
}
131-
} else {
132-
currentIndexMetaData = currentMetaData.index(indexMetaData.index());
133-
}
134-
boolean shardsAllocatedOnThisNodeInLastClusterState = shardsAllocatedOnLocalNode(event.previousState(), indexMetaData);
135-
boolean shardsAllocatedOnThisNodeInNewClusterState = shardsAllocatedOnLocalNode(event.state(), indexMetaData);
136-
if (isDataOnlyNode(state) && (shardsAllocatedOnThisNodeInLastClusterState == false) && (shardsAllocatedOnThisNodeInNewClusterState == true)) {
137-
// shard was newly allocated because it was not allocated in last cluster state but is now
138-
// we removed the index state before so now we have to write it again
139-
writeReason = "shard allocated on data only node";
140-
} else if (shouldWriteOnNode(state, shardsAllocatedOnThisNodeInNewClusterState) && currentIndexMetaData == null) {
141-
writeReason = "freshly created";
142-
} else if (shouldWriteOnNode(state, shardsAllocatedOnThisNodeInNewClusterState) && currentIndexMetaData.version() != indexMetaData.version()) {
143-
writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]";
144-
}
145-
146-
// we update the writeReason only if we really need to write it
147-
if (writeReason == null) {
148-
continue;
149-
}
141+
Iterable<IndexMetaWriteInfo> writeInfo = new ArrayList<>();
142+
if (isDataOnlyNode(event.state())) {
143+
writeInfo = indexMetaState.getIndicesToWriteDataOnlyNode(event, currentMetaData);
144+
} else if (isMasterEligibleNode(event.state())) {
145+
writeInfo = indexMetaState.getIndicesToWriteMasterNode(event, currentMetaData);
146+
}
150147

148+
// check and write changes in indices
149+
for (IndexMetaWriteInfo indexMetaWrite : writeInfo) {
151150
try {
152-
metaStateService.writeIndex(writeReason, indexMetaData, currentIndexMetaData);
151+
metaStateService.writeIndex(indexMetaWrite.reason, indexMetaWrite.newMetaData, indexMetaWrite.previousMetaData);
153152
} catch (Throwable e) {
154153
success = false;
155154
}
@@ -163,24 +162,6 @@ public void clusterChanged(ClusterChangedEvent event) {
163162
}
164163
}
165164

166-
protected boolean shardsAllocatedOnLocalNode(ClusterState state, IndexMetaData indexMetaData) {
167-
boolean shardsAllocatedOnThisNode = false;
168-
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(indexMetaData.index());
169-
if (indexRoutingTable == null) {
170-
// nothing allocated ?
171-
return false;
172-
}
173-
// iterate over shards and see if one is on our node
174-
for (IntObjectCursor it : indexRoutingTable.shards()) {
175-
IndexShardRoutingTable shardRoutingTable = (IndexShardRoutingTable) it.value;
176-
for (ShardRouting shardRouting : shardRoutingTable.shards()) {
177-
if (shardRouting.currentNodeId() != null && shardRouting.currentNodeId().equals(state.nodes().localNode().getId())) {
178-
shardsAllocatedOnThisNode = true;
179-
}
180-
}
181-
}
182-
return shardsAllocatedOnThisNode;
183-
}
184165

185166
protected boolean isDataOnlyNode(ClusterState state) {
186167
return ((isMasterEligibleNode(state) == false) && (state.nodes().localNode().dataNode() == true));
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.gateway;
21+
22+
import com.google.common.collect.ImmutableList;
23+
import com.google.common.collect.ImmutableMap;
24+
import com.google.common.collect.Maps;
25+
import org.elasticsearch.cluster.ClusterChangedEvent;
26+
import org.elasticsearch.cluster.metadata.IndexMetaData;
27+
import org.elasticsearch.cluster.metadata.MetaData;
28+
import org.elasticsearch.cluster.routing.MutableShardRouting;
29+
import org.elasticsearch.cluster.routing.RoutingNode;
30+
import org.elasticsearch.common.component.AbstractComponent;
31+
import org.elasticsearch.common.inject.Inject;
32+
import org.elasticsearch.common.settings.Settings;
33+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
34+
import org.elasticsearch.env.NodeEnvironment;
35+
36+
import java.io.IOException;
37+
import java.util.HashMap;
38+
import java.util.Map;
39+
import java.util.Set;
40+
41+
/**
42+
* The dangling indices state is responsible for finding new dangling indices (indices that have
43+
* their state written on disk, but don't exists in the metadata of the cluster), and importing
44+
* them into the cluster.
45+
*/
46+
public class IndexMetaState extends AbstractComponent {
47+
48+
private final NodeEnvironment nodeEnv;
49+
private final MetaStateService metaStateService;
50+
51+
52+
@Inject
53+
public IndexMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService) {
54+
super(settings);
55+
this.nodeEnv = nodeEnv;
56+
this.metaStateService = metaStateService;
57+
}
58+
59+
public Iterable<GatewayMetaState.IndexMetaWriteInfo> getIndicesToWriteDataOnlyNode(ClusterChangedEvent event, MetaData currentMetaData) {
60+
Map<String, GatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new HashMap<>();
61+
RoutingNode thisNode = event.state().getRoutingNodes().node(event.state().nodes().localNodeId());
62+
if ( thisNode == null) {
63+
// this needs some other handling
64+
return indicesToWrite.values();
65+
}
66+
// iterate over all shards allocated on this node in the new cluster state but only write if ...
67+
for (MutableShardRouting shardRouting : thisNode) {
68+
IndexMetaData indexMetaData = event.state().metaData().index(shardRouting.index());
69+
IndexMetaData currentIndexMetaData = maybeLoadIndexState(currentMetaData, indexMetaData);
70+
String writeReason = null;
71+
// ... state persistence was disabled or index was newly created
72+
if (currentIndexMetaData == null) {
73+
writeReason = "freshly created";
74+
// ... new shard is allocated on node (we could optimize here and make sure only written once and not for each shard per index -> do later)
75+
} else if (shardRouting.initializing()) {
76+
writeReason = "newly allocated on node";
77+
// ... version changed
78+
} else if (indexMetaData.version() != currentIndexMetaData.version()) {
79+
writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]";
80+
}
81+
if (writeReason != null) {
82+
indicesToWrite.put(shardRouting.index(),
83+
new GatewayMetaState.IndexMetaWriteInfo(indexMetaData, currentIndexMetaData,
84+
writeReason));
85+
}
86+
}
87+
return indicesToWrite.values();
88+
}
89+
public Iterable<GatewayMetaState.IndexMetaWriteInfo> getIndicesToWriteMasterNode(ClusterChangedEvent event, MetaData currentMetaData) {
90+
Map<String, GatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new HashMap<>();
91+
MetaData newMetaData = event.state().metaData();
92+
// iterate over all indices but only write if ...
93+
for (IndexMetaData indexMetaData : newMetaData) {
94+
String writeReason = null;
95+
IndexMetaData currentIndexMetaData = maybeLoadIndexState(currentMetaData, indexMetaData);
96+
// ... new index or state persistence was disabled?
97+
if ( currentIndexMetaData == null) {
98+
writeReason = "freshly created";
99+
// ... version changed
100+
} else if (currentIndexMetaData.version() != indexMetaData.version()) {
101+
writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]";
102+
}
103+
if (writeReason!=null) {
104+
indicesToWrite.put(indexMetaData.index(),
105+
new GatewayMetaState.IndexMetaWriteInfo(indexMetaData, currentIndexMetaData,
106+
writeReason));
107+
}
108+
109+
}
110+
return indicesToWrite.values();
111+
}
112+
113+
protected IndexMetaData maybeLoadIndexState(MetaData currentMetaData, IndexMetaData indexMetaData) {
114+
IndexMetaData currentIndexMetaData = null;
115+
if (currentMetaData != null) {
116+
currentIndexMetaData = currentMetaData.index(indexMetaData.index());
117+
} else {
118+
try {
119+
currentIndexMetaData = metaStateService.loadIndexState(indexMetaData.index());
120+
} catch (IOException e) {
121+
logger.debug("failed to load index state ", e);
122+
}
123+
} return currentIndexMetaData;
124+
}
125+
126+
}

0 commit comments

Comments
 (0)