diff --git a/VERSION b/VERSION index 341cf11..0d91a54 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.2.0 \ No newline at end of file +0.3.0 diff --git a/cli/app.go b/cli/app.go index 36a3231..5b1e716 100644 --- a/cli/app.go +++ b/cli/app.go @@ -143,6 +143,7 @@ func Reindex() cli.Command { cli.StringFlag{Name: "reindex-host-allocation", Usage: "Optional target host for the reindex to happen on. eg. 'es-reindex-*'"}, cli.StringFlag{Name: "dest-host-allocation", Usage: "Optional target host once the reindex is complete. eg. 'es-data-*'"}, cli.StringFlag{Name: "extra-suffix", Usage: "Optional extra suffix name to add to index name (after date). Ignored if dest-index is set"}, + cli.StringFlag{Name: "remote-src", Usage: "Remote ElasticSearch cluster to reindex from. eg. 'http://es-client:9200'"}, }, Action: func(c *cli.Context) error { if c.NArg() == 0 || c.NArg() > 1 { @@ -174,7 +175,7 @@ func Reindex() cli.Command { _, fileName := filepath.Split(filePath) alias := strings.TrimSuffix(fileName, ".json") - if err = elasticsearch.ReindexOne(client, alias, newIndex, c.Bool("version-external"), c.Bool("no-update-alias"), c.Bool("bulk-indexing")); err != nil { + if err = elasticsearch.ReindexOne(client, alias, newIndex, c.String("remote-src"), c.Bool("version-external"), c.Bool("no-update-alias"), c.Bool("bulk-indexing")); err != nil { return err } @@ -200,7 +201,7 @@ func Reindex() cli.Command { return err } - if err = elasticsearch.ReindexAll(client, aliasToNewIndex); err != nil { + if err = elasticsearch.ReindexAll(client, aliasToNewIndex, c.String("remote-src")); err != nil { return err } diff --git a/elasticsearch/elasdx.go b/elasticsearch/elasdx.go index 8b9e86a..3940645 100644 --- a/elasticsearch/elasdx.go +++ b/elasticsearch/elasdx.go @@ -125,7 +125,7 @@ func UpdateTemplatesAndCreateNewIndices(client *elastic.Client, templatesDir str return aliasToNewIndex, err } -func ReindexOne(client *elastic.Client, alias, newIndex string, versionExternal, noUpdateAlias, bulkIndexing bool) error { +func ReindexOne(client *elastic.Client, alias, newIndex, remoteSrc string, versionExternal, noUpdateAlias, bulkIndexing bool) error { // We assume that we are reindexing from an existing index on the alias reindexingRequired := true @@ -142,9 +142,14 @@ func ReindexOne(client *elastic.Client, alias, newIndex string, versionExternal, } // If we have data in an existing index that needs to be reindex with the new template to the new index - if reindexingRequired { - // We should only ever get one result if our naming is sensible - indicesFromAlias := aliasResult.IndicesByAlias(alias) + // Also assume the remote index exists if specificed a remote source cluster + if reindexingRequired || remoteSrc != "" { + // Default to alias name (used for remote clusters only) + indicesFromAlias := []string{alias} + if remoteSrc == "" { + // Search for indexname based off alias. We should only ever get one result if our naming is sensible + indicesFromAlias = aliasResult.IndicesByAlias(alias) + } for _, index := range indicesFromAlias { // Reindex from the existing index as the source to the new index as the destination targetVersionType := "internal" @@ -152,6 +157,11 @@ func ReindexOne(client *elastic.Client, alias, newIndex string, versionExternal, targetVersionType = "external" } src := elastic.NewReindexSource().Index(index) + if remoteSrc != "" { + // A remote cluster + remoteReindexSrc := elastic.NewReindexRemoteInfo().Host(remoteSrc) + src = elastic.NewReindexSource().Index(index).RemoteInfo(remoteReindexSrc) + } dst := elastic.NewReindexDestination().Index(newIndex).VersionType(targetVersionType) refresh, err := client.Reindex().Source(src).Destination(dst).Refresh("true").Conflicts("proceed").Do(context.Background()) if err != nil { @@ -267,9 +277,9 @@ func UpdateHostAllocation(client *elastic.Client, newIndex, allocation string) e return nil } -func ReindexAll(client *elastic.Client, aliasToNewIndex map[string]string) error { +func ReindexAll(client *elastic.Client, aliasToNewIndex map[string]string, remoteSrc string) error { for alias, newIndex := range aliasToNewIndex { - if err := ReindexOne(client, alias, newIndex, false, false, false); err != nil { + if err := ReindexOne(client, alias, newIndex, remoteSrc, false, false, false); err != nil { return errors.Wrapf(err, "failed reindexing to %s and adding to alias %s", newIndex, alias) } }