Skip to content

Commit

Permalink
Merge pull request #3 from warwick-mitchell1/feature/remote_reindex
Browse files Browse the repository at this point in the history
Allow remote reindex from a remote cluster
  • Loading branch information
warwick-mitchell1 authored Jun 13, 2022
2 parents f484d8c + b33e1b3 commit ac267cf
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 9 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.2.0
0.3.0
5 changes: 3 additions & 2 deletions cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func Reindex() cli.Command {
cli.StringFlag{Name: "reindex-host-allocation", Usage: "Optional target host for the reindex to happen on. eg. 'es-reindex-*'"},
cli.StringFlag{Name: "dest-host-allocation", Usage: "Optional target host once the reindex is complete. eg. 'es-data-*'"},
cli.StringFlag{Name: "extra-suffix", Usage: "Optional extra suffix name to add to index name (after date). Ignored if dest-index is set"},
cli.StringFlag{Name: "remote-src", Usage: "Remote ElasticSearch cluster to reindex from. eg. 'http://es-client:9200'"},
},
Action: func(c *cli.Context) error {
if c.NArg() == 0 || c.NArg() > 1 {
Expand Down Expand Up @@ -174,7 +175,7 @@ func Reindex() cli.Command {

_, fileName := filepath.Split(filePath)
alias := strings.TrimSuffix(fileName, ".json")
if err = elasticsearch.ReindexOne(client, alias, newIndex, c.Bool("version-external"), c.Bool("no-update-alias"), c.Bool("bulk-indexing")); err != nil {
if err = elasticsearch.ReindexOne(client, alias, newIndex, c.String("remote-src"), c.Bool("version-external"), c.Bool("no-update-alias"), c.Bool("bulk-indexing")); err != nil {
return err
}

Expand All @@ -200,7 +201,7 @@ func Reindex() cli.Command {
return err
}

if err = elasticsearch.ReindexAll(client, aliasToNewIndex); err != nil {
if err = elasticsearch.ReindexAll(client, aliasToNewIndex, c.String("remote-src")); err != nil {
return err
}

Expand Down
22 changes: 16 additions & 6 deletions elasticsearch/elasdx.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func UpdateTemplatesAndCreateNewIndices(client *elastic.Client, templatesDir str
return aliasToNewIndex, err
}

func ReindexOne(client *elastic.Client, alias, newIndex string, versionExternal, noUpdateAlias, bulkIndexing bool) error {
func ReindexOne(client *elastic.Client, alias, newIndex, remoteSrc string, versionExternal, noUpdateAlias, bulkIndexing bool) error {
// We assume that we are reindexing from an existing index on the alias
reindexingRequired := true

Expand All @@ -142,16 +142,26 @@ func ReindexOne(client *elastic.Client, alias, newIndex string, versionExternal,
}

// If we have data in an existing index that needs to be reindex with the new template to the new index
if reindexingRequired {
// We should only ever get one result if our naming is sensible
indicesFromAlias := aliasResult.IndicesByAlias(alias)
// Also assume the remote index exists if specificed a remote source cluster
if reindexingRequired || remoteSrc != "" {
// Default to alias name (used for remote clusters only)
indicesFromAlias := []string{alias}
if remoteSrc == "" {
// Search for indexname based off alias. We should only ever get one result if our naming is sensible
indicesFromAlias = aliasResult.IndicesByAlias(alias)
}
for _, index := range indicesFromAlias {
// Reindex from the existing index as the source to the new index as the destination
targetVersionType := "internal"
if versionExternal {
targetVersionType = "external"
}
src := elastic.NewReindexSource().Index(index)
if remoteSrc != "" {
// A remote cluster
remoteReindexSrc := elastic.NewReindexRemoteInfo().Host(remoteSrc)
src = elastic.NewReindexSource().Index(index).RemoteInfo(remoteReindexSrc)
}
dst := elastic.NewReindexDestination().Index(newIndex).VersionType(targetVersionType)
refresh, err := client.Reindex().Source(src).Destination(dst).Refresh("true").Conflicts("proceed").Do(context.Background())
if err != nil {
Expand Down Expand Up @@ -267,9 +277,9 @@ func UpdateHostAllocation(client *elastic.Client, newIndex, allocation string) e
return nil
}

func ReindexAll(client *elastic.Client, aliasToNewIndex map[string]string) error {
func ReindexAll(client *elastic.Client, aliasToNewIndex map[string]string, remoteSrc string) error {
for alias, newIndex := range aliasToNewIndex {
if err := ReindexOne(client, alias, newIndex, false, false, false); err != nil {
if err := ReindexOne(client, alias, newIndex, remoteSrc, false, false, false); err != nil {
return errors.Wrapf(err, "failed reindexing to %s and adding to alias %s", newIndex, alias)
}
}
Expand Down

0 comments on commit ac267cf

Please sign in to comment.