Skip to content

Commit

Permalink
Update filebeat registry configs
Browse files Browse the repository at this point in the history
Change registry config and directory layout, preparing for future
changes.

- remove settings `filebeat.registry_file` and replace with
  `filebeat.registry.path`. Path will be a directory, and the
  actual contents will be stored under
  `${filebeat.registry.path}/filebeat/data.json`.
- introduce `<registry path>/filebeat/meta.json` with version number of
  the current directories layout.
- move `filebeat.registry_flush` to `filebeat.registry.flush`
- move `filebeat.registry_file_permission` to
  `filebeat.registry.file_permission`
- update tests
  • Loading branch information
urso committed Feb 6, 2019
1 parent 3b20a57 commit c2aae3d
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 107 deletions.
5 changes: 4 additions & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
rawConfig,
"prospectors",
"config.prospectors",
"registry_file",
"registry_file_permissions",
"registry_flush",
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -293,7 +296,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
finishedLogger := newFinishedLogger(wgEvents)

// Setup registrar to persist state
registrar, err := registrar.New(config.RegistryFile, config.RegistryFilePermissions, config.RegistryFlush, finishedLogger)
registrar, err := registrar.New(config.Registry, finishedLogger)
if err != nil {
logp.Err("Could not init registrar: %v", err)
return err
Expand Down
38 changes: 23 additions & 15 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,33 @@ const (
)

type Config struct {
Inputs []*common.Config `config:"inputs"`
RegistryFile string `config:"registry_file"`
RegistryFilePermissions os.FileMode `config:"registry_file_permissions"`
RegistryFlush time.Duration `config:"registry_flush"`
ConfigDir string `config:"config_dir"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
Modules []*common.Config `config:"modules"`
ConfigInput *common.Config `config:"config.inputs"`
ConfigModules *common.Config `config:"config.modules"`
Autodiscover *autodiscover.Config `config:"autodiscover"`
OverwritePipelines bool `config:"overwrite_pipelines"`
Inputs []*common.Config `config:"inputs"`
Registry Registry `config:"registry"`
ConfigDir string `config:"config_dir"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
Modules []*common.Config `config:"modules"`
ConfigInput *common.Config `config:"config.inputs"`
ConfigModules *common.Config `config:"config.modules"`
Autodiscover *autodiscover.Config `config:"autodiscover"`
OverwritePipelines bool `config:"overwrite_pipelines"`
}

type Registry struct {
Path string `config:"path"`
Permissions os.FileMode `config:"file_permissions"`
FlushTimeout time.Duration `config:"flush"`
MigrateFile string `config:"migrate_file"`
}

var (
DefaultConfig = Config{
RegistryFile: "registry",
RegistryFilePermissions: 0600,
ShutdownTimeout: 0,
OverwritePipelines: false,
Registry: Registry{
Path: "registry",
Permissions: 0600,
MigrateFile: "",
},
ShutdownTimeout: 0,
OverwritePipelines: false,
}
)

Expand Down
187 changes: 187 additions & 0 deletions filebeat/registrar/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package registrar

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"

"github.com/pkg/errors"

helper "github.com/elastic/beats/libbeat/common/file"
"github.com/elastic/beats/libbeat/logp"
)

const (
legacyVersion = "<legacy>"
currentVersion = "0"
)

func ensureCurrent(home, migrateFile string, perm os.FileMode) error {
if migrateFile == "" {
if isFile(home) {
migrateFile = home
}
}

fbRegHome := filepath.Join(home, "filebeat")
version, err := readVersion(fbRegHome, migrateFile)
if err != nil {
return err
}

logp.Debug("registrar", "Registry type '%v' found", version)

switch version {
case legacyVersion:
return migrateLegacy(home, fbRegHome, migrateFile, perm)
case currentVersion:
return nil
case "":
backupFile := migrateFile + ".bak"
if isFile(backupFile) {
return migrateLegacy(home, fbRegHome, backupFile, perm)
}
return initRegistry(fbRegHome, perm)
default:
return fmt.Errorf("registry file version %v not supported", version)
}
}

func migrateLegacy(home, regHome, migrateFile string, perm os.FileMode) error {
logp.Info("Migrate registry file to registry directory")

if home == migrateFile {
backupFile := migrateFile + ".bak"
if isFile(migrateFile) {
logp.Info("Move registry file to backup file: %v", backupFile)
if err := helper.SafeFileRotate(backupFile, migrateFile); err != nil {
return err
}
migrateFile = backupFile
} else if isFile(backupFile) {
logp.Info("Old registry backup file found, continue migration")
migrateFile = backupFile
}
}

if err := initRegistry(regHome, perm); err != nil {
return err
}

dataFile := filepath.Join(regHome, "data.json")
if !isFile(dataFile) && isFile(migrateFile) {
logp.Info("Migrate old registry file to new data file")
err := helper.SafeFileRotate(dataFile, migrateFile)
if err != nil {
return err
}
}

return nil
}

func initRegistry(regHome string, perm os.FileMode) error {
if !isDir(regHome) {
logp.Info("No registry home found. Create: %v", regHome)
if err := os.MkdirAll(regHome, 0750); err != nil {
return errors.Wrapf(err, "failed to create registry dir '%v'", regHome)
}
}

metaFile := filepath.Join(regHome, "meta.json")
if !isFile(metaFile) {
logp.Info("Initialize registry meta file")
err := safeWriteFile(metaFile, []byte(`{"version": "0"}`), perm)
if err != nil {
return errors.Wrap(err, "failed writing registry meta.json")
}
}

return nil
}

func readVersion(regHome, migrateFile string) (string, error) {
if isFile(migrateFile) {
return legacyVersion, nil
}

if !isDir(regHome) {
return "", nil
}

metaFile := filepath.Join(regHome, "meta.json")
if !isFile(metaFile) {
return "", nil
}

tmp, err := ioutil.ReadFile(metaFile)
if err != nil {
return "", errors.Wrap(err, "failed to open meta file")
}

meta := struct{ Version string }{}
if err := json.Unmarshal(tmp, &meta); err != nil {
return "", errors.Wrap(err, "failed reading meta file")
}

return meta.Version, nil
}

func isDir(path string) bool {
fi, err := os.Stat(path)
exists := err == nil && fi.IsDir()
logp.Debug("test", "isDir(%v) -> %v", path, exists)
return exists
}

func isFile(path string) bool {
fi, err := os.Stat(path)
exists := err == nil && fi.Mode().IsRegular()
logp.Debug("test", "isFile(%v) -> %v", path, exists)
return exists
}

func safeWriteFile(path string, data []byte, perm os.FileMode) error {
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}

for len(data) > 0 {
var n int
n, err = f.Write(data)
if err != nil {
break
}

data = data[n:]
}

if err == nil {
err = f.Sync()
}

if err1 := f.Close(); err == nil {
err = err1
}
return err
}
25 changes: 18 additions & 7 deletions filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sync"
"time"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/input/file"
helper "github.com/elastic/beats/libbeat/common/file"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -63,20 +64,30 @@ var (

// New creates a new Registrar instance, updating the registry file on
// `file.State` updates. New fails if the file can not be opened or created.
func New(registryFile string, fileMode os.FileMode, flushTimeout time.Duration, out successLogger) (*Registrar, error) {
func New(cfg config.Registry, out successLogger) (*Registrar, error) {
home := paths.Resolve(paths.Data, cfg.Path)
migrateFile := cfg.MigrateFile
if migrateFile != "" {
migrateFile = paths.Resolve(paths.Data, migrateFile)
}

err := ensureCurrent(home, migrateFile, cfg.Permissions)
if err != nil {
return nil, err
}

dataFile := filepath.Join(home, "filebeat", "data.json")
r := &Registrar{
registryFile: registryFile,
fileMode: fileMode,
registryFile: dataFile,
fileMode: cfg.Permissions,
done: make(chan struct{}),
states: file.NewStates(),
Channel: make(chan []file.State, 1),
flushTimeout: flushTimeout,
flushTimeout: cfg.FlushTimeout,
out: out,
wg: sync.WaitGroup{},
}
err := r.Init()

return r, err
return r, r.Init()
}

// Init sets up the Registrar and make sure the registry file is setup correctly
Expand Down
8 changes: 6 additions & 2 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,12 @@ filebeat.{{input_config | default("inputs")}}:

filebeat.shutdown_timeout: {{ shutdown_timeout|default(0) }}
{% if not skip_registry_config %}
filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}}
filebeat.registry_file_permissions: {{ registryFilePermissions|default("0600") }}
filebeat.registry:
path: {{ beat.working_dir + '/' }}{{ registry_home|default("registry")}}
file_permissions: {{ registry_file_permissions|default("0600") }}
{% if registry_migrate_file %}
migrate_file: {{ beat.working_dir + '/' + registry_migrate_file }}
{% endif %}
{%endif%}

{% if reload or reload_path -%}
Expand Down
25 changes: 23 additions & 2 deletions filebeat/tests/system/filebeat.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import json
import os
import stat
import sys

sys.path.append(os.path.join(os.path.dirname(__file__), '../../../libbeat/tests/system'))

from beat.beat import TestCase

default_registry_file = 'registry/filebeat/data.json'


class BaseTest(TestCase):

Expand All @@ -18,9 +21,23 @@ def setUpClass(self):

super(BaseTest, self).setUpClass()

def get_registry(self):
def has_registry(self, name=None, data_path=None):
if not name:
name = default_registry_file
if not data_path:
data_path = self.working_dir

dotFilebeat = os.path.join(data_path, name)
return os.path.isfile(dotFilebeat)

def get_registry(self, name=None, data_path=None):
if not name:
name = default_registry_file
if not data_path:
data_path = self.working_dir

# Returns content of the registry file
dotFilebeat = self.working_dir + '/registry'
dotFilebeat = os.path.join(data_path, name)
self.wait_until(cond=lambda: os.path.isfile(dotFilebeat))

with open(dotFilebeat) as file:
Expand All @@ -47,3 +64,7 @@ def get_registry_entry_by_path(self, path):
tmp_entry = entry

return tmp_entry

def file_permissions(self, path):
full_path = os.path.join(self.working_dir, path)
return oct(stat.S_IMODE(os.lstat(full_path).st_mode))
Loading

0 comments on commit c2aae3d

Please sign in to comment.