Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/builder/packages.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ func BuildPackage(ctx context.Context, options BuildOptions) (string, error) {
logger.Debugf("Linked file included (path: %s)", l.TargetFilePath(destinationDir))
}

err = resolveTransformDefinitions(destinationDir)
if err != nil {
return "", fmt.Errorf("resolving transform manifests failed: %w", err)
}

if options.CreateZip {
return buildZippedPackage(ctx, options, destinationDir)
}
Expand Down
40 changes: 40 additions & 0 deletions internal/builder/transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package builder

import (
"fmt"
"os"
"path/filepath"

"github.com/elastic/elastic-package/internal/packages"
)

// resolveTransformDefinitions renders every transform definition matching
// <destinationDir>/elasticsearch/transform/*/transform.yml and overwrites each
// file in place with its rendered contents.
//
// Rendering is delegated to packages.ReadTransformDefinitionFile, which is
// called with destinationDir as the package root. Processing stops at the
// first failure; the returned error is wrapped with the step that failed and,
// for per-file failures, the path of the offending file.
func resolveTransformDefinitions(destinationDir string) error {
	files, err := filepath.Glob(filepath.Join(destinationDir, "elasticsearch", "transform", "*", "transform.yml"))
	if err != nil {
		return fmt.Errorf("failed matching files with transform definitions: %w", err)
	}

	for _, file := range files {
		// Stat first so the file mode can be handed back to os.WriteFile below.
		stat, err := os.Stat(file)
		if err != nil {
			return fmt.Errorf("stat failed for transform definition file %q: %w", file, err)
		}

		// Only the rendered bytes are needed here; the parsed definition is discarded.
		contents, _, err := packages.ReadTransformDefinitionFile(file, destinationDir)
		if err != nil {
			return fmt.Errorf("failed reading transform definition file %q: %w", file, err)
		}

		err = os.WriteFile(file, contents, stat.Mode())
		if err != nil {
			// Wrap like the other error paths so the caller can tell which step failed.
			return fmt.Errorf("failed writing transform definition file %q: %w", file, err)
		}
	}
	return nil
}
110 changes: 102 additions & 8 deletions internal/packages/packages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package packages

import (
"archive/zip"
"bytes"
"encoding/json"
"errors"
"fmt"
Expand All @@ -14,6 +15,7 @@ import (
"path/filepath"
"slices"
"strings"
"text/template"

yamlv3 "gopkg.in/yaml.v3"

Expand Down Expand Up @@ -243,6 +245,9 @@ type TransformDefinition struct {
Source struct {
Index []string `config:"index" yaml:"index"`
} `config:"source" yaml:"source"`
Dest struct {
Pipeline string `config:"pipeline" yaml:"pipeline"`
} `config:"dest" yaml:"dest"`
Meta struct {
FleetTransformVersion string `config:"fleet_transform_version" yaml:"fleet_transform_version"`
} `config:"_meta" yaml:"_meta"`
Expand Down Expand Up @@ -414,6 +419,101 @@ func ReadPackageManifest(path string) (*PackageManifest, error) {
return &m, nil
}

// ReadTransformDefinitionFile reads and parses the transform definition
// (elasticsearch/transform/<name>/transform.yml) located at transformPath.
//
// The file is first rendered as a text/template. The template function
// ingestPipelineName is available to it: given a pipeline name, it returns
// "<version>-<name>", where <version> is the version read from the package
// manifest found in packageRootPath. Calling it with an empty name is an error.
//
// It returns the rendered file contents and the definition parsed from them.
// It fails if the package manifest cannot be read or declares no version, if
// the template cannot be parsed or executed, if the rendered YAML cannot be
// unpacked, or if dest.pipeline is set and no matching ingest pipeline file
// is found in the package.
func ReadTransformDefinitionFile(transformPath, packageRootPath string) ([]byte, TransformDefinition, error) {
	manifest, err := ReadPackageManifestFromPackageRoot(packageRootPath)
	if err != nil {
		return nil, TransformDefinition{}, fmt.Errorf("could not read package manifest: %w", err)
	}

	if manifest.Version == "" {
		return nil, TransformDefinition{}, errors.New("package version is not defined in the package manifest")
	}

	// ParseFiles names the parsed template after the base name of the file, so
	// the root template is created with that same name for Execute to run it.
	t, err := template.New(filepath.Base(transformPath)).Funcs(template.FuncMap{
		"ingestPipelineName": func(pipelineName string) (string, error) {
			if pipelineName == "" {
				return "", fmt.Errorf("ingest pipeline name is empty")
			}
			return fmt.Sprintf("%s-%s", manifest.Version, pipelineName), nil
		},
	}).ParseFiles(transformPath)
	if err != nil {
		return nil, TransformDefinition{}, fmt.Errorf("parsing transform template failed (path: %s): %w", transformPath, err)
	}

	var rendered bytes.Buffer
	err = t.Execute(&rendered, nil)
	if err != nil {
		return nil, TransformDefinition{}, fmt.Errorf("executing template failed: %w", err)
	}

	cfg, err := yaml.NewConfig(rendered.Bytes(), ucfg.PathSep("."))
	if err != nil {
		return nil, TransformDefinition{}, fmt.Errorf("reading file failed (path: %s): %w", transformPath, err)
	}

	var definition TransformDefinition
	err = cfg.Unpack(&definition)
	if err != nil {
		return nil, TransformDefinition{}, fmt.Errorf("failed to parse transform file \"%s\": %w", transformPath, err)
	}

	// Without a destination pipeline there is nothing to validate.
	if definition.Dest.Pipeline == "" {
		return rendered.Bytes(), definition, nil
	}

	// The package version prefix, if present, is dropped to obtain the file
	// name looked up in the package.
	pipelineFileName := fmt.Sprintf("%s.yml", strings.TrimPrefix(definition.Dest.Pipeline, manifest.Version+"-"))
	found, err := destinationPipelineFileExists(packageRootPath, manifest.Version, pipelineFileName)
	if err != nil {
		return nil, TransformDefinition{}, err
	}
	if !found {
		return nil, TransformDefinition{}, fmt.Errorf("destination ingest pipeline file %s not found: incorrect version used in pipeline or unknown pipeline", pipelineFileName)
	}
	return rendered.Bytes(), definition, nil
}

// destinationPipelineFileExists reports whether pipelineFileName corresponds to
// an ingest pipeline file of the package rooted at packageRootPath, either at
// package level or in any of its data streams.
func destinationPipelineFileExists(packageRootPath, version, pipelineFileName string) (bool, error) {
	// Is it using the Ingest pipeline defined in the package (elasticsearch/ingest_pipeline/<pipeline>.yml)?
	// Referenced as <version>-<pipeline>, example: 0.1.0-pipeline_extract_metadata
	_, err := os.Stat(filepath.Join(packageRootPath, "elasticsearch", "ingest_pipeline", pipelineFileName))
	if err == nil {
		return true, nil
	}
	if !errors.Is(err, os.ErrNotExist) {
		return false, fmt.Errorf("checking for destination ingest pipeline file %s: %w", pipelineFileName, err)
	}

	// Is it using the Ingest pipeline from any data stream (data_stream/*/elasticsearch/ingest_pipeline/*.yml)?
	// Referenced as <data_stream>-<version>-<data_stream_pipeline>,
	// example: metrics-aws_billing.cur-0.1.0-pipeline_extract_metadata
	// Only the "-<version>-<data_stream_pipeline>.yml" suffix is compared.
	pipelinePaths, err := filepath.Glob(filepath.Join(packageRootPath, "data_stream", "*", "elasticsearch", "ingest_pipeline", "*.yml"))
	if err != nil {
		return false, fmt.Errorf("error finding ingest pipelines in data streams: %w", err)
	}
	for _, pipelinePath := range pipelinePaths {
		dataStreamPipelineName := strings.TrimSuffix(filepath.Base(pipelinePath), filepath.Ext(pipelinePath))
		if strings.HasSuffix(pipelineFileName, fmt.Sprintf("-%s-%s.yml", version, dataStreamPipelineName)) {
			return true, nil
		}
	}
	return false, nil
}

// ReadTransformsFromPackageRoot looks for transforms in the given package root.
func ReadTransformsFromPackageRoot(packageRoot string) ([]Transform, error) {
files, err := filepath.Glob(filepath.Join(packageRoot, "elasticsearch", "transform", "*", "transform.yml"))
Expand All @@ -423,15 +523,9 @@ func ReadTransformsFromPackageRoot(packageRoot string) ([]Transform, error) {

var transforms []Transform
for _, file := range files {
cfg, err := yaml.NewConfigWithFile(file, ucfg.PathSep("."))
if err != nil {
return nil, fmt.Errorf("reading file failed (path: %s): %w", file, err)
}

var definition TransformDefinition
err = cfg.Unpack(&definition)
_, definition, err := ReadTransformDefinitionFile(file, packageRoot)
if err != nil {
return nil, fmt.Errorf("failed to parse transform file \"%s\": %w", file, err)
return nil, fmt.Errorf("failed reading transform definition file %q: %w", file, err)
}

transforms = append(transforms, Transform{
Expand Down
189 changes: 189 additions & 0 deletions internal/packages/packages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package packages

import (
"encoding/json"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -94,3 +96,190 @@ func TestDataStreamManifest_IndexTemplateName(t *testing.T) {
})
}
}

// TestReadTransformDefinitionFile builds a throwaway package layout per case
// (package manifest, optional ingest pipeline files, one transform definition)
// and checks either the rendered transform contents or the returned error.
func TestReadTransformDefinitionFile(t *testing.T) {
t.Parallel()

cases := map[string]struct {
// packageManifest is written to <package>/manifest.yml.
packageManifest string
// transformManifest is written to <package>/elasticsearch/transform/latest/transform.yml.
transformManifest string
// createIngestPipelineFile creates <package>/elasticsearch/ingest_pipeline/<ingestPipelineName>.yml.
createIngestPipelineFile bool
// createIngestPipelineFileDatastream creates
// <package>/data_stream/test/elasticsearch/ingest_pipeline/<ingestPipelineName>.yml.
createIngestPipelineFileDatastream bool
// ingestPipelineName is the base name (without extension) of the pipeline files created above.
ingestPipelineName string
// expectedError selects the error branch of the assertions.
expectedError bool
// expectedErrorMessage must be contained in the returned error.
expectedErrorMessage string
// expectedTransform is compared for equality with the returned contents.
expectedTransform string
}{
// Package-level pipeline file exists; the template adds the package version.
"valid transform manifest with package version": {
packageManifest: `
name: test-package
version: 1.2.3
`,
createIngestPipelineFile: true,
createIngestPipelineFileDatastream: false,
ingestPipelineName: "my-pipeline",
transformManifest: `
source:
index: "logs-package.dataset"
dest:
index: "logs-package_latest-index-1"
pipeline: "{{ ingestPipelineName "my-pipeline" }}"
latest:
unique_key:
- event.dataset
`,
expectedTransform: `
source:
index: "logs-package.dataset"
dest:
index: "logs-package_latest-index-1"
pipeline: "1.2.3-my-pipeline"
latest:
unique_key:
- event.dataset
`,
expectedError: false,
},
// The package manifest has no version field.
"invalid transform manifest without package version": {
packageManifest: `
name: test-package
`,
createIngestPipelineFile: false,
createIngestPipelineFileDatastream: false,
ingestPipelineName: "my-pipeline",
transformManifest: `
source:
index: "logs-package.dataset"
dest:
index: "logs-package_latest-index-1"
pipeline: "{{ ingestPipelineName "my-pipeline" }}"
latest:
unique_key:
- event.dataset
`,
expectedError: true,
expectedErrorMessage: "package version is not defined in the package manifest",
},
// No pipeline file is created anywhere in the package.
"ingest_pipeline not exists": {
packageManifest: `
name: test-package
version: 1.2.3
`,
createIngestPipelineFile: false,
createIngestPipelineFileDatastream: false,
ingestPipelineName: "my-pipeline",
transformManifest: `
source:
index: "logs-package.dataset"
dest:
index: "logs-package_latest-index-1"
pipeline: "{{ ingestPipelineName "my-pipeline" }}"
latest:
unique_key:
- event.dataset
`,
expectedError: true,
expectedErrorMessage: "destination ingest pipeline file my-pipeline.yml not found: incorrect version used in pipeline or unknown pipeline",
},
// The template function is called with an empty pipeline name.
"ingest_pipeline name empty": {
packageManifest: `
name: test-package
version: 1.2.3
`,
createIngestPipelineFile: false,
createIngestPipelineFileDatastream: false,
ingestPipelineName: "my-pipeline",
transformManifest: `
source:
index: "logs-package.dataset"
dest:
index: "logs-package_latest-index-1"
pipeline: "{{ ingestPipelineName "" }}"
latest:
unique_key:
- event.dataset
`,
expectedError: true,
expectedErrorMessage: "error calling ingestPipelineName: ingest pipeline name is empty",
},
// Only the data stream pipeline file exists; the destination pipeline
// carries a prefix in front of the versioned pipeline name.
"ingest_pipeline exists on data stream": {
packageManifest: `
name: test-package
version: 1.2.3
`,
createIngestPipelineFile: false,
createIngestPipelineFileDatastream: true,
ingestPipelineName: "my-pipeline",
transformManifest: `
source:
index: "logs-package.dataset"
dest:
index: "logs-package_latest-index-1"
pipeline: "logs-test_package.test-{{ ingestPipelineName "my-pipeline" }}"
latest:
unique_key:
- event.dataset
`,
expectedError: false,
expectedTransform: `
source:
index: "logs-package.dataset"
dest:
index: "logs-package_latest-index-1"
pipeline: "logs-test_package.test-1.2.3-my-pipeline"
latest:
unique_key:
- event.dataset
`,
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
// Setup temporary directory for the package
packageDir := t.TempDir()
packageManifestPath := filepath.Join(packageDir, "manifest.yml")
err := os.WriteFile(packageManifestPath, []byte(tc.packageManifest), 0644)
require.NoError(t, err)

// Optionally create an ingest pipeline file
if tc.createIngestPipelineFile {
ingestPipelineDir := filepath.Join(packageDir, "elasticsearch", "ingest_pipeline")
err = os.MkdirAll(ingestPipelineDir, 0755)
require.NoError(t, err)
ingestPipelinePath := filepath.Join(ingestPipelineDir, tc.ingestPipelineName+".yml")
// NOTE(review): this is a raw string literal, so the file receives the
// characters `\n` verbatim rather than newlines; the test does not
// inspect the pipeline file contents.
err = os.WriteFile(ingestPipelinePath, []byte(`---\nprocessors: {}\n`), 0644)
require.NoError(t, err)
}

// Optionally create the same pipeline file under the "test" data stream
if tc.createIngestPipelineFileDatastream {
ingestPipelineDir := filepath.Join(packageDir, "data_stream", "test", "elasticsearch", "ingest_pipeline")
err = os.MkdirAll(ingestPipelineDir, 0755)
require.NoError(t, err)
ingestPipelinePath := filepath.Join(ingestPipelineDir, tc.ingestPipelineName+".yml")
err = os.WriteFile(ingestPipelinePath, []byte(`---\nprocessors: {}\n`), 0644)
require.NoError(t, err)
}

// Setup temporary directory for the transform
transformDir := filepath.Join(packageDir, "elasticsearch", "transform", "latest")
err = os.MkdirAll(transformDir, 0755)
require.NoError(t, err)
transformManifestPath := filepath.Join(transformDir, "transform.yml")
err = os.WriteFile(transformManifestPath, []byte(tc.transformManifest), 0644)
require.NoError(t, err)

// Call the function under test
contents, _, err := ReadTransformDefinitionFile(transformManifestPath, packageDir)
if tc.expectedError {
require.Error(t, err)
assert.ErrorContains(t, err, tc.expectedErrorMessage)
} else {
require.NoError(t, err)
require.NotEmpty(t, contents)

// The rendered contents must match the expectation exactly.
assert.Equal(t, tc.expectedTransform, string(contents))
}
})
}
}
Loading