Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

import-beats: support multiple streams #380

Merged
merged 18 commits into from
Apr 27, 2020
  •  
  •  
  •  
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* Change package path from /package/{packagename}-{version} to /package/{packagename}/{version} [#300](https://github.com/elastic/integrations-registry/pull/300)
* By default /search?package= now only returns the most recent package. [#301](https://github.com/elastic/integrations-registry/pull/301)
* Stream configuration filenames have `.hbs` suffix appended [#308](https://github.com/elastic/package-registry/pull/380)

### Bugfixes

Expand Down
2 changes: 1 addition & 1 deletion dev/generator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func buildPackage(packagesBasePath string, p util.Package) error {
return err
}

err = ioutil.WriteFile(filepath.Join(dirPath, "stream.yml"), []byte(streamFields), 0644)
err = ioutil.WriteFile(filepath.Join(dirPath, "stream.yml.hbs"), []byte(streamFields), 0644)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrewkroh You will like this.
@nchaulet @jen-huang Will this break anything at the moment in the config builder? I hope not.

if err != nil {
return err
}
Expand Down
152 changes: 4 additions & 148 deletions dev/import-beats/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,10 @@
package main

import (
"bufio"
"bytes"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/pkg/errors"

"github.com/elastic/package-registry/util"
)

Expand All @@ -26,151 +21,12 @@ type streamContent struct {
body []byte
}

func createAgentContent(modulePath, moduleName, datasetName, beatType string, streams []util.Stream) (agentContent, error) {
switch beatType {
case "logs":
return createAgentContentForLogs(modulePath, datasetName)
case "metrics":
return createAgentContentForMetrics(modulePath, moduleName, datasetName, streams)
}
return agentContent{}, fmt.Errorf("invalid beat type: %s", beatType)
}

func createAgentContentForLogs(modulePath, datasetName string) (agentContent, error) {
configFilePaths, err := filepath.Glob(filepath.Join(modulePath, datasetName, "config", "*.yml"))
if err != nil {
return agentContent{}, errors.Wrapf(err, "location config files failed (modulePath: %s, datasetName: %s)", modulePath, datasetName)
}

if len(configFilePaths) == 0 {
return agentContent{}, fmt.Errorf("expected at least one config file (modulePath: %s, datasetName: %s)", modulePath, datasetName)
}

var buffer bytes.Buffer

for _, configFilePath := range configFilePaths {
configFile, err := transformAgentConfigFile(configFilePath)
if err != nil {
return agentContent{}, errors.Wrapf(err, "loading config file failed (modulePath: %s, datasetName: %s)", modulePath, datasetName)
}

inputConfigName := extractInputConfigName(configFilePath)
if len(configFilePaths) > 1 {
buffer.WriteString(fmt.Sprintf("{{#if input == %s}}\n", inputConfigName))
}
buffer.Write(configFile)
if len(configFilePaths) > 1 {
buffer.WriteString("{{/if}}\n")
}
}
return agentContent{
streams: []streamContent{
{
targetFileName: "stream.yml",
body: buffer.Bytes(),
},
},
}, nil
}

func extractInputConfigName(configFilePath string) string {
func extractInputConfigFilename(configFilePath string) string {
i := strings.LastIndex(configFilePath, "/")
inputConfigName := configFilePath[i+1:]
j := strings.Index(inputConfigName, ".")
return inputConfigName[:j]
}

func transformAgentConfigFile(configFilePath string) ([]byte, error) {
var buffer bytes.Buffer

configFile, err := os.Open(configFilePath)
if err != nil {
return nil, errors.Wrapf(err, "opening agent config file failed (path: %s)", configFilePath)
}

scanner := bufio.NewScanner(configFile)
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}

if strings.HasPrefix(line, "type: ") {
line = strings.ReplaceAll(line, "type: ", "input: ")
}

// simple cases: if, range, -}}
line = strings.ReplaceAll(line, "{{ ", "{{")
line = strings.ReplaceAll(line, " }}", "}}")
line = strings.ReplaceAll(line, "{{if .", "{{if this.")
line = strings.ReplaceAll(line, "{{if", "{{#if")
line = strings.ReplaceAll(line, "{{end}}", "{{/if}}")
line = strings.ReplaceAll(line, "{{.", "{{this.")
line = strings.ReplaceAll(line, "{{range .", "{{#each this.")
line = strings.ReplaceAll(line, ".}}", "}}")
line = strings.ReplaceAll(line, " -}}", "}}") // no support for cleaning trailing white characters?
line = strings.ReplaceAll(line, "{{- ", "{{") // no support for cleaning trailing white characters?

// if/else if eq
if strings.Contains(line, " eq ") {
line = strings.ReplaceAll(line, " eq .", " ")
line = strings.ReplaceAll(line, " eq ", " ")

skipSpaces := 1
if strings.HasPrefix(line, "{{else") {
skipSpaces = 2
}

splitCondition := strings.SplitN(line, " ", skipSpaces+2)
line = strings.Join(splitCondition[:len(splitCondition)-1], " ") + " == " +
splitCondition[len(splitCondition)-1]
}

if strings.Contains(line, "{{range ") || strings.Contains(line, " range ") {
loopedVar, err := extractRangeVar(line)
if err != nil {
return nil, errors.Wrapf(err, "extracting range var failed")
}

line = fmt.Sprintf("{{#each %s}}\n", loopedVar)
line += " - {{this}}\n"
line += "{{/each}}"

for scanner.Scan() { // skip all lines inside range
rangeLine := scanner.Text()
if strings.Contains(rangeLine, "{{ end }}") {
break
}
}
}

buffer.WriteString(line)
buffer.WriteString("\n")
}
return buffer.Bytes(), nil
}

func extractRangeVar(line string) (string, error) {
line = line[strings.Index(line, "range")+1:]
line = strings.ReplaceAll(line, "}}", "")
i := strings.Index(line, ":=")
var sliced string
if i >= 0 {
line = strings.TrimSpace(line[i+3:])
split := strings.Split(line, " ")
sliced = split[0]
} else {
split := strings.Split(line, " ")
sliced = split[1]
}

if strings.HasPrefix(sliced, ".") {
sliced = sliced[1:]
}
return sliced, nil
return configFilePath[i+1:]
}

func createAgentContentForMetrics(modulePath, moduleName, datasetName string, streams []util.Stream) (agentContent, error) {
func createAgentContentForMetrics(moduleName, datasetName string, streams []util.Stream) (agentContent, error) {
inputName := moduleName + "/metrics"
vars := extractVarsFromStream(streams, inputName)

Expand Down Expand Up @@ -198,7 +54,7 @@ func createAgentContentForMetrics(modulePath, moduleName, datasetName string, st
return agentContent{
streams: []streamContent{
{
targetFileName: "stream.yml",
targetFileName: "stream.yml.hbs",
body: buffer.Bytes(),
},
},
Expand Down
13 changes: 3 additions & 10 deletions dev/import-beats/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func createDatasets(beatType, modulePath, moduleName, moduleTitle, moduleRelease
modulePath, datasetName)
}

foundEcsFieldNames := uniqueFieldNames(append(filteredEcsModuleFieldNames, filteredEcsDatasetFieldNames...))
foundEcsFieldNames := uniqueStringValues(append(filteredEcsModuleFieldNames, filteredEcsDatasetFieldNames...))
ecsFields := filterEcsFields(ecsFields, foundEcsFieldNames)

fieldsFiles := map[string]fieldDefinitionArray{}
Expand Down Expand Up @@ -116,19 +116,12 @@ func createDatasets(beatType, modulePath, moduleName, moduleTitle, moduleRelease
return nil, errors.Wrapf(err, "loading elasticsearch content failed (datasetPath: %s)", datasetPath)
}

// streams
streams, err := createStreams(modulePath, moduleName, moduleTitle, datasetName, beatType)
// streams and agents
streams, agent, err := createStreams(modulePath, moduleName, moduleTitle, datasetName, beatType)
if err != nil {
return nil, errors.Wrapf(err, "creating streams failed (datasetPath: %s)", datasetPath)
}

// agent
agent, err := createAgentContent(modulePath, moduleName, datasetName, beatType, streams)
if err != nil {
return nil, errors.Wrapf(err, "creating agent content failed (modulePath: %s, datasetName: %s)",
modulePath, datasetName)
}

// manifest
manifest := util.DataSet{
Title: fmt.Sprintf("%s %s %s", moduleTitle, datasetName, beatType),
Expand Down
12 changes: 0 additions & 12 deletions dev/import-beats/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,18 +209,6 @@ func isPackageFields(fileName string) bool {
return fileName == "package-fields.yml"
}

func uniqueFieldNames(fieldNames []string) []string {
t := make(map[string]bool)
var unique []string
for _, f := range fieldNames {
if _, ok := t[f]; !ok {
t[f] = true
unique = append(unique, f)
}
}
return unique
}

func filterEcsFields(ecsFields fieldDefinitionArray, filteredNames []string) fieldDefinitionArray {
var filteredFields fieldDefinitionArray
for _, f := range ecsFields {
Expand Down
Loading