-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
- Loading branch information
1 parent
441f520
commit ee70dca
Showing
3 changed files
with
141 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
77 changes: 77 additions & 0 deletions
77
...n/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamReader.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.gateway.remote.routingtable; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.cluster.routing.IndexRoutingTable; | ||
import org.opensearch.cluster.routing.IndexShardRoutingTable; | ||
import org.opensearch.common.io.stream.BufferedChecksumStreamInput; | ||
import org.opensearch.core.common.io.stream.BytesStreamInput; | ||
import org.opensearch.core.common.io.stream.InputStreamStreamInput; | ||
import org.opensearch.core.common.io.stream.StreamInput; | ||
|
||
import java.io.BufferedReader; | ||
import java.io.EOFException; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.InputStreamReader; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
public class IndexRoutingTableInputStreamReader { | ||
|
||
private final StreamInput streamInput; | ||
|
||
private static final Logger logger = LogManager.getLogger(IndexRoutingTableInputStreamReader.class); | ||
|
||
public IndexRoutingTableInputStreamReader(InputStream inputStream) throws IOException { | ||
this.streamInput = new InputStreamStreamInput(inputStream); | ||
} | ||
|
||
public Map<String, IndexShardRoutingTable> read() throws IOException { | ||
try { | ||
try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(streamInput, "assertion")) { | ||
// Read the Table Header first | ||
IndexRoutingTableHeader.read(in); | ||
int shards = in.readVInt(); | ||
logger.info("Number of Index Routing Table {}", shards); | ||
Map<String, IndexShardRoutingTable> indicesRouting = new HashMap<String, IndexShardRoutingTable>(Collections.EMPTY_MAP); | ||
for(int i=0; i<shards; i++) | ||
{ | ||
IndexShardRoutingTable indexShardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in); | ||
logger.info("Index Shard Routing Table reading {}", indexShardRoutingTable); | ||
indicesRouting.put(indexShardRoutingTable.getShardId().getIndexName(), indexShardRoutingTable); | ||
|
||
} | ||
verifyCheckSum(in); | ||
// Return indices Routing table | ||
return indicesRouting; | ||
} | ||
} catch (EOFException e) { | ||
throw new IOException("Indices Routing table is corrupted", e); | ||
} | ||
|
||
} | ||
|
||
private void verifyCheckSum(BufferedChecksumStreamInput in) throws IOException { | ||
long expectedChecksum = in.getChecksum(); | ||
long readChecksum = in.readLong(); | ||
if (readChecksum != expectedChecksum) { | ||
throw new IOException( | ||
"checksum verification failed - expected: 0x" | ||
+ Long.toHexString(expectedChecksum) | ||
+ ", got: 0x" | ||
+ Long.toHexString(readChecksum) | ||
); | ||
} | ||
} | ||
} |
61 changes: 61 additions & 0 deletions
61
...st/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.gateway.remote.routingtable; | ||
|
||
import org.apache.lucene.util.BytesRef; | ||
import org.opensearch.Version; | ||
import org.opensearch.cluster.metadata.IndexMetadata; | ||
import org.opensearch.cluster.metadata.Metadata; | ||
import org.opensearch.cluster.routing.IndexShardRoutingTable; | ||
import org.opensearch.cluster.routing.RoutingTable; | ||
import org.opensearch.common.io.stream.BytesStreamOutput; | ||
import org.opensearch.core.common.bytes.BytesArray; | ||
import org.opensearch.core.common.bytes.BytesReference; | ||
import org.opensearch.core.common.io.stream.StreamInput; | ||
import org.opensearch.index.seqno.ReplicationTrackerTestCase; | ||
import org.opensearch.test.OpenSearchTestCase; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.IntStream; | ||
|
||
import static org.hamcrest.Matchers.containsString; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.hasToString; | ||
|
||
public class IndexRoutingTableInputStreamTests extends ReplicationTrackerTestCase { | ||
|
||
public void testRoutingTableInputStream() throws IOException { | ||
Metadata metadata = Metadata.builder() | ||
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) | ||
.build(); | ||
|
||
RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); | ||
|
||
initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> { | ||
try { | ||
logger.info("IndexShardRoutingTables: {}", indexShardRoutingTables); | ||
InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexShardRoutingTables, | ||
initialRoutingTable.version(), Version.CURRENT); | ||
|
||
IndexRoutingTableInputStreamReader reader = new IndexRoutingTableInputStreamReader(indexRoutingStream); | ||
Map<String, IndexShardRoutingTable> indexShardRoutingTableMap = reader.read(); | ||
|
||
logger.info("indexShardRoutingTableMap: {}", indexShardRoutingTableMap); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
}); | ||
} | ||
|
||
} |