Skip to content

Commit

Permalink
feat(influx): add telegraf resource support to influx CLI
Browse files Browse the repository at this point in the history
closes: #17654
  • Loading branch information
jsteenb2 committed Jul 20, 2020
1 parent b7e2330 commit 1b8bcb6
Show file tree
Hide file tree
Showing 6 changed files with 392 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
1. [18888](https://github.com/influxdata/influxdb/pull/18888): Add event source to influx stack operations
1. [18910](https://github.com/influxdata/influxdb/pull/18910): Add uninstall functionality for stacks
1. [18912](https://github.com/influxdata/influxdb/pull/18912): Drop deprecated influx pkg command tree
1. [18997](https://github.com/influxdata/influxdb/pull/18997): Add telegraf management commands to influx CLI

### Bug Fixes

Expand Down
12 changes: 11 additions & 1 deletion cmd/influx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ type genericCLIOpts struct {
w io.Writer
errW io.Writer

json bool
hideHeaders bool

runEWrapFn cobraRunEMiddleware
}

Expand Down Expand Up @@ -123,7 +126,13 @@ func (o genericCLIOpts) writeJSON(v interface{}) error {
}

func (o genericCLIOpts) newTabWriter() *internal.TabWriter {
return internal.NewTabWriter(o.w)
w := internal.NewTabWriter(o.w)
w.HideHeaders(o.hideHeaders)
return w
}

func (o *genericCLIOpts) registerPrintOptions(cmd *cobra.Command) {
registerPrintOptions(cmd, &o.hideHeaders, &o.json)
}

func in(r io.Reader) genericCLIOptFn {
Expand Down Expand Up @@ -289,6 +298,7 @@ func influxCmd(opts ...genericCLIOptFn) *cobra.Command {
cmdSetup,
cmdStack,
cmdTask,
cmdTelegraf,
cmdTemplate,
cmdApply,
cmdTranspile,
Expand Down
340 changes: 340 additions & 0 deletions cmd/influx/telegraf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,340 @@
package main

import (
"context"
"errors"
"io/ioutil"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/cmd/influx/internal"
"github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/tenant"
"github.com/spf13/cobra"
)

func cmdTelegraf(f *globalFlags, opts genericCLIOpts) *cobra.Command {
return newCmdTelegrafBuilder(newTelegrafSVCs, f, opts).cmdTelegrafs()
}

type telegrafSVCsFn func() (influxdb.TelegrafConfigStore, influxdb.OrganizationService, error)

type cmdTelegrafBuilder struct {
genericCLIOpts
*globalFlags

svcFn telegrafSVCsFn

desc string
file string
id string
ids []string
name string
org organization
}

func newCmdTelegrafBuilder(svcFn telegrafSVCsFn, f *globalFlags, opts genericCLIOpts) *cmdTelegrafBuilder {
return &cmdTelegrafBuilder{
genericCLIOpts: opts,
globalFlags: f,
svcFn: svcFn,
}
}

func (b *cmdTelegrafBuilder) cmdTelegrafs() *cobra.Command {
cmd := b.newCmd("telegrafs", b.listRunE)
cmd.Short = "List Telegraf configuration(s). Subcommands manage Telegraf configurations."
cmd.Long = `
List Telegraf configuration(s). Subcommands manage Telegraf configurations.
Examples:
# list all known Telegraf configurations
influx telegrafs
# list Telegraf configuration corresponding to specific ID
influx telegrafs --id $ID
# list Telegraf configuration corresponding to specific ID shorts
influx telegrafs -i $ID
`

b.org.register(cmd, false)
cmd.Flags().StringVarP(&b.id, "id", "i", "", "Telegraf configuration ID to retrieve.")

cmd.AddCommand(
b.cmdCreate(),
b.cmdRemove(),
b.cmdUpdate(),
)
return cmd
}

func (b *cmdTelegrafBuilder) listRunE(cmd *cobra.Command, args []string) error {
svc, orgSVC, err := b.svcFn()
if err != nil {
return err
}

orgID, _ := b.org.getID(orgSVC)
if orgID == 0 && b.id == "" {
return &influxdb.Error{
Code: influxdb.EUnprocessableEntity,
Msg: "at least one of org, org-id, or telegraf-config-id must be provided",
}
}

var filter influxdb.UserResourceMappingFilter
if b.id != "" {
id, err := influxdb.IDFromString(b.id)
if err != nil {
return err
}
filter.ResourceID = *id
filter.ResourceType = influxdb.TelegrafsResourceType
}

cfgs, _, err := svc.FindTelegrafConfigs(context.Background(), influxdb.TelegrafConfigFilter{
OrgID: &orgID,
UserResourceMappingFilter: filter,
})
if err != nil {
return err
}
return b.writeTelegrafConfig(cfgs...)
}

func (b *cmdTelegrafBuilder) cmdCreate() *cobra.Command {
cmd := b.newCmd("create", b.createRunEFn)
cmd.Short = "Create a Telegraf configuration"
cmd.Long = `
The telegrafs create command creates a new Telegraf configuration.
Examples:
# create new Telegraf configuration
influx telegrafs create --name $CFG_NAME --description $CFG_DESC --file $PATH_TO_TELE_CFG
# create new Telegraf configuration using shorts
influx telegrafs create -n $CFG_NAME -d $CFG_DESC -f $PATH_TO_TELE_CFG
# create a new Telegraf config with a config provided via STDIN
cat $CONFIG_FILE | influx telegrafs create -n $CFG_NAME -d $CFG_DESC
`

b.org.register(cmd, false)
b.registerTelegrafCfgFlags(cmd)

return cmd
}

func (b *cmdTelegrafBuilder) createRunEFn(cmd *cobra.Command, args []string) error {
svc, orgSVC, err := b.svcFn()
if err != nil {
return err
}

orgID, err := b.org.getID(orgSVC)
if err != nil {
return err
}

cfg, err := b.readConfig(b.file)
if err != nil {
return err
}

newTelegraf := influxdb.TelegrafConfig{
OrgID: orgID,
Name: b.name,
Description: b.desc,
Config: cfg,
}
err = svc.CreateTelegrafConfig(context.Background(), &newTelegraf, 0)
if err != nil {
return err
}

return b.writeTelegrafConfig(&newTelegraf)
}

func (b *cmdTelegrafBuilder) cmdRemove() *cobra.Command {
cmd := b.newCmd("rm", b.removeRunEFn)
cmd.Aliases = []string{"remove"}
cmd.Short = "Remove Telegraf configuration(s)"
cmd.Long = `
The telegrafs rm command removes Telegraf configuration(s).
Examples:
# remove a single Telegraf configuration
influx telegrafs rm --id $ID
# remove multiple Telegraf configurations
influx telegrafs rm --id $ID1 --id $ID2
# remove using short flags
influx telegrafs rm -i $ID1
`

cmd.Flags().StringArrayVarP(&b.ids, "id", "i", nil, "Telegraf configuration ID(s) to remove.")
cmd.MarkFlagRequired("id")

return cmd
}

func (b *cmdTelegrafBuilder) removeRunEFn(cmd *cobra.Command, args []string) error {
svc, _, err := b.svcFn()
if err != nil {
return err
}

for _, rawID := range b.ids {
id, err := influxdb.IDFromString(rawID)
if err != nil {
return err
}

err = svc.DeleteTelegrafConfig(context.Background(), *id)
if err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
return err
}
}

return nil
}

func (b *cmdTelegrafBuilder) cmdUpdate() *cobra.Command {
cmd := b.newCmd("update", b.updateRunEFn)
cmd.Short = "Update a Telegraf configuration"
cmd.Long = `
The telegrafs update command updates a Telegraf configuration to match the
specified parameters. If a name or description is not provided, then are set
to an empty string.
Examples:
# update new Telegraf configuration
influx telegrafs update --id $ID --name $CFG_NAME --description $CFG_DESC --file $PATH_TO_TELE_CFG
# update new Telegraf configuration using shorts
influx telegrafs update -i $ID -n $CFG_NAME -d $CFG_DESC -f $PATH_TO_TELE_CFG
# update a Telegraf config with a config provided via STDIN
cat $CONFIG_FILE | influx telegrafs update -i $ID -n $CFG_NAME -d $CFG_DESC
`

b.org.register(cmd, false)
b.registerTelegrafCfgFlags(cmd)
cmd.Flags().StringVarP(&b.id, "id", "i", "", "Telegraf configuration id to update")
cmd.MarkFlagRequired("id")

return cmd
}

func (b *cmdTelegrafBuilder) updateRunEFn(cmd *cobra.Command, args []string) error {
svc, orgSVC, err := b.svcFn()
if err != nil {
return err
}

orgID, err := b.org.getID(orgSVC)
if err != nil {
return err
}

cfg, err := b.readConfig(b.file)
if err != nil {
return err
}

id, err := influxdb.IDFromString(b.id)
if err != nil {
return err
}

teleCfg := influxdb.TelegrafConfig{
ID: *id,
OrgID: orgID,
Name: b.name,
Description: b.desc,
Config: cfg,
}
updatedCfg, err := svc.UpdateTelegrafConfig(context.Background(), *id, &teleCfg, 0)
if err != nil {
return err
}

return b.writeTelegrafConfig(updatedCfg)
}

func (b *cmdTelegrafBuilder) writeTelegrafConfig(cfgs ...*influxdb.TelegrafConfig) error {
if b.json {
return b.writeJSON(cfgs)
}

tabW := b.newTabWriter()
defer tabW.Flush()

writeTelegrafRows(tabW, cfgs...)
return nil
}

func writeTelegrafRows(tabW *internal.TabWriter, cfgs ...*influxdb.TelegrafConfig) {
tabW.WriteHeaders("ID", "OrgID", "Name", "Description")
for _, cfg := range cfgs {
tabW.Write(map[string]interface{}{
"ID": cfg.ID,
"OrgID": cfg.OrgID,
"Name": cfg.Name,
"Description": cfg.Description,
})
}
}

func (b *cmdTelegrafBuilder) registerTelegrafCfgFlags(cmd *cobra.Command) {
cmd.Flags().StringVarP(&b.file, "file", "f", "", "Path to Telegraf configuration")
cmd.Flags().StringVarP(&b.name, "name", "n", "", "Name of Telegraf configuration")
cmd.Flags().StringVarP(&b.desc, "description", "d", "", "Description for Telegraf configuration")
}

func (b *cmdTelegrafBuilder) readConfig(file string) (string, error) {
if file != "" {
bb, err := ioutil.ReadFile(file)
if err != nil {
return "", err
}

return string(bb), nil
}

stdIn, err := inStdIn(b.in)
if err != nil {
return "", &influxdb.Error{
Code: influxdb.EUnprocessableEntity,
Err: errors.New("a telegraf config must be provided"),
}
}
defer stdIn.Close()

bb, err := ioutil.ReadAll(stdIn)
if err != nil {
return "", err
}
return string(bb), nil
}

func (b *cmdTelegrafBuilder) newCmd(use string, runE func(*cobra.Command, []string) error) *cobra.Command {
cmd := b.genericCLIOpts.newCmd(use, runE, true)
b.genericCLIOpts.registerPrintOptions(cmd)
enforceFlagValidation(cmd)
return cmd
}

func newTelegrafSVCs() (influxdb.TelegrafConfigStore, influxdb.OrganizationService, error) {
httpClient, err := newHTTPClient()
if err != nil {
return nil, nil, err
}

orgSVC := &tenant.OrgClientService{
Client: httpClient,
}

return http.NewTelegrafService(httpClient), orgSVC, nil
}
Loading

0 comments on commit 1b8bcb6

Please sign in to comment.