Custom continuous query options per query rather than per node #5194

Merged 1 commit on Dec 29, 2015
Add continuous query option for customizing resampling
This makes the following syntax possible:

    CREATE CONTINUOUS QUERY mycq ON mydb
        RESAMPLE EVERY 1m FOR 1h
        BEGIN
          SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(5m)
        END

The RESAMPLE clause customizes how often the continuous query runs and how long each interval remains eligible for resampling. The run frequency is set with EVERY: each time a multiple of the resample interval passes, every interval still within the resampling duration is updated with new results from the query.

The duration is set with FOR. It determines how long an interval continues to participate in resampling after it first starts.

Both options are optional individually, but if the RESAMPLE clause is present, at least one of the two must be given. Each option defaults to the interval of the continuous query.

The service also improves tracking of the last run time and the logic for deciding when a query for an interval should run. When determining the oldest interval to run for a query, the continuous query service first computes the optimal time at which the next query would have run, based on the last run time. It then uses that time together with the resample duration to find the oldest interval that should be run, and resamples all intervals between that time and the current time. This avoids forgetting the last run in an interval when the continuous query service gets delayed for some reason.

This removes the previous config options for customizing continuous queries, since they are no longer relevant, and adds a new option for customizing the run interval. The run interval determines how often the continuous query service polls to see whether it should execute a query. This option defaults to 1s, but can be set to 1m if the least common factor of all continuous queries' intervals is a higher value (such as 1m).
jsternberg committed Dec 28, 2015
commit 5d4ecf853c0f3fc7dae680890968682c186c01eb
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@ With this release InfluxDB is moving to Go 1.5.
### Features
- [#5183](https://github.com/influxdb/influxdb/pull/5183): CLI confirms database exists when USE executed. Thanks @pires
- [#5201](https://github.com/influxdb/influxdb/pull/5201): Allow max UDP buffer size to be configurable. Thanks @sebito91
- [#5194](https://github.com/influxdb/influxdb/pull/5194): Custom continuous query options per query rather than per node.

### Bugfixes
- [#5042](https://github.com/influxdb/influxdb/issues/5042): Count with fill(none) will drop 0 valued intervals.
5 changes: 1 addition & 4 deletions etc/config.sample.toml
@@ -343,7 +343,4 @@ reporting-disabled = false
[continuous_queries]
log-enabled = true
enabled = true
recompute-previous-n = 2
recompute-no-older-than = "10m"
compute-runs-per-interval = 10
compute-no-more-than = "2m"
# run-interval = "1s" # interval for how often continuous queries will be checked if they need to run
37 changes: 27 additions & 10 deletions influxql/INFLUXQL.md
@@ -85,16 +85,16 @@ _cpu_stats
ALL ALTER ANY AS ASC BEGIN
BY CREATE CONTINUOUS DATABASE DATABASES DEFAULT
DELETE DESC DESTINATIONS DIAGNOSTICS DISTINCT DROP
DURATION END EXISTS EXPLAIN FIELD FOR
FORCE FROM GRANT GRANTS GROUP GROUPS
IF IN INF INNER INSERT INTO
KEY KEYS LIMIT SHOW MEASUREMENT MEASUREMENTS
NOT OFFSET ON ORDER PASSWORD POLICY
POLICIES PRIVILEGES QUERIES QUERY READ REPLICATION
RETENTION REVOKE SELECT SERIES SERVER SERVERS
SET SHARD SHARDS SLIMIT SOFFSET STATS
SUBSCRIPTION SUBSCRIPTIONS TAG TO USER USERS
VALUES WHERE WITH WRITE
DURATION END EVERY EXISTS EXPLAIN FIELD
FOR FORCE FROM GRANT GRANTS GROUP
GROUPS IF IN INF INNER INSERT
INTO KEY KEYS LIMIT SHOW MEASUREMENT
MEASUREMENTS NOT OFFSET ON ORDER PASSWORD
POLICY POLICIES PRIVILEGES QUERIES QUERY READ
REPLICATION RESAMPLE RETENTION REVOKE SELECT SERIES
SERVER SERVERS SET SHARD SHARDS SLIMIT
SOFFSET STATS SUBSCRIPTION SUBSCRIPTIONS TAG TO
USER USERS VALUES WHERE WITH WRITE
```

## Literals
@@ -229,9 +229,14 @@ ALTER RETENTION POLICY policy1 ON somedb DURATION 1h REPLICATION 4

```
create_continuous_query_stmt = "CREATE CONTINUOUS QUERY" query_name on_clause
[ "RESAMPLE" resample_opts ]
"BEGIN" select_stmt "END" .

query_name = identifier .

resample_opts = (every_stmt for_stmt | every_stmt | for_stmt) .
every_stmt = "EVERY" duration_lit .
for_stmt = "FOR" duration_lit .
```

#### Examples:
@@ -256,6 +261,18 @@ BEGIN
FROM "6_months".events
GROUP BY time(1h)
END;

-- this customizes the resample interval so the interval is queried every 10s and intervals are resampled until 2m after their start time
-- when resample is used, at least one of "EVERY" or "FOR" must be used
CREATE CONTINUOUS QUERY "cpu_mean"
ON db_name
RESAMPLE EVERY 10s FOR 2m
BEGIN
SELECT mean(value)
INTO "cpu_mean"
FROM "cpu"
GROUP BY time(1m)
END;
```

### CREATE DATABASE
21 changes: 20 additions & 1 deletion influxql/ast.go
@@ -1967,11 +1967,30 @@ type CreateContinuousQueryStatement struct {

// Source of data (SELECT statement).
Source *SelectStatement

// Interval to resample previous queries
ResampleEvery time.Duration

// Maximum duration to resample previous queries
ResampleFor time.Duration
}

// String returns a string representation of the statement.
func (s *CreateContinuousQueryStatement) String() string {
return fmt.Sprintf("CREATE CONTINUOUS QUERY %s ON %s BEGIN %s END", QuoteIdent(s.Name), QuoteIdent(s.Database), s.Source.String())
var buf bytes.Buffer
fmt.Fprintf(&buf, "CREATE CONTINUOUS QUERY %s ON %s ", QuoteIdent(s.Name), QuoteIdent(s.Database))

if s.ResampleEvery > 0 || s.ResampleFor > 0 {
buf.WriteString("RESAMPLE ")
if s.ResampleEvery > 0 {
fmt.Fprintf(&buf, "EVERY %s ", FormatDuration(s.ResampleEvery))
}
if s.ResampleFor > 0 {
fmt.Fprintf(&buf, "FOR %s ", FormatDuration(s.ResampleFor))
}
}
fmt.Fprintf(&buf, "BEGIN %s END", s.Source.String())
return buf.String()
}

// DefaultDatabase returns the default database from the statement.
59 changes: 59 additions & 0 deletions influxql/parser.go
@@ -1403,6 +1403,13 @@ func (p *Parser) parseCreateContinuousQueryStatement() (*CreateContinuousQuerySt
}
stmt.Database = ident

if p.parseTokenMaybe(RESAMPLE) {
Contributor:

Why parseTokenMaybe here? Instead of

tok, _, _ := p.scanIgnoreWhitespace()
if tok == RESAMPLE {
    stmt.ResampleEvery, stmt.ResampleFor, err = p.parseResample()
} else {
    p.unscan()
}

Contributor Author:

I noticed it was a very common pattern in the parser to do your suggestion. While that method works, I was hoping to introduce a new convenience function that does the same thing and reduce some of the bloat and variable shadowing the current method introduces.

Contributor:

Like the idea. I think parseOptionalToken is a better name.

stmt.ResampleEvery, stmt.ResampleFor, err = p.parseResample()
if err != nil {
return nil, err
}
}

// Expect a "BEGIN SELECT" tokens.
if err := p.parseTokens([]Token{BEGIN, SELECT}); err != nil {
return nil, err
@@ -2391,6 +2398,47 @@ func (p *Parser) parseCall(name string) (*Call, error) {
return &Call{Name: name, Args: args}, nil
}

// parseResample parses a RESAMPLE [EVERY <duration>] [FOR <duration>].
// This function assumes RESAMPLE has already been consumed.
// EVERY and FOR are optional, but at least one of the two has to be used.
func (p *Parser) parseResample() (time.Duration, time.Duration, error) {
var interval time.Duration
if p.parseTokenMaybe(EVERY) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != DURATION_VAL {
return 0, 0, newParseError(tokstr(tok, lit), []string{"duration"}, pos)
}

d, err := ParseDuration(lit)
if err != nil {
return 0, 0, &ParseError{Message: err.Error(), Pos: pos}
}
interval = d
}

var maxDuration time.Duration
if p.parseTokenMaybe(FOR) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != DURATION_VAL {
return 0, 0, newParseError(tokstr(tok, lit), []string{"duration"}, pos)
}

d, err := ParseDuration(lit)
if err != nil {
return 0, 0, &ParseError{Message: err.Error(), Pos: pos}
}
maxDuration = d
}

// Neither EVERY nor FOR was read, so scan the next token again
// so we can return a suitable error message.
if interval == 0 && maxDuration == 0 {
tok, pos, lit := p.scanIgnoreWhitespace()
return 0, 0, newParseError(tokstr(tok, lit), []string{"EVERY", "FOR"}, pos)
}
return interval, maxDuration, nil
}
Contributor:

Prefixing variables with resample in this function seems redundant since it's in the parseResample() function.

Contributor Author:

Removed the resample prefix.


// scan returns the next token from the underlying scanner.
func (p *Parser) scan() (tok Token, pos Pos, lit string) { return p.s.Scan() }

@@ -2493,6 +2541,17 @@ func (p *Parser) parseTokens(toks []Token) error {
return nil
}

// parseTokenMaybe consumes the next token if it matches the expected one
// and unscans it, leaving the parser state unchanged, otherwise.
func (p *Parser) parseTokenMaybe(expected Token) bool {
tok, _, _ := p.scanIgnoreWhitespace()
if tok != expected {
p.unscan()
return false
}
return true
}

// QuoteString returns a quoted string.
func QuoteString(s string) string {
return `'` + strings.NewReplacer("\n", `\n`, `\`, `\\`, `'`, `\'`).Replace(s) + `'`
52 changes: 51 additions & 1 deletion influxql/parser_test.go
@@ -1051,7 +1051,7 @@ func TestParser_ParseStatement(t *testing.T) {

// CREATE CONTINUOUS QUERY ... INTO <measurement>
{
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT count(field1) INTO measure1 FROM myseries GROUP BY time(5m) END`,
s: `CREATE CONTINUOUS QUERY myquery ON testdb RESAMPLE EVERY 1m FOR 1h BEGIN SELECT count(field1) INTO measure1 FROM myseries GROUP BY time(5m) END`,
Contributor:

Should also add tests for just having an EVERY clause and just a FOR clause.

Contributor Author:

Added two more tests that test each possibility of this.

stmt: &influxql.CreateContinuousQueryStatement{
Name: "myquery",
Database: "testdb",
@@ -1070,6 +1070,56 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
},
ResampleEvery: time.Minute,
ResampleFor: time.Hour,
},
},

{
s: `CREATE CONTINUOUS QUERY myquery ON testdb RESAMPLE FOR 1h BEGIN SELECT count(field1) INTO measure1 FROM myseries GROUP BY time(5m) END`,
stmt: &influxql.CreateContinuousQueryStatement{
Name: "myquery",
Database: "testdb",
Source: &influxql.SelectStatement{
Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "count", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}}}},
Target: &influxql.Target{Measurement: &influxql.Measurement{Name: "measure1", IsTarget: true}},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
Dimensions: []*influxql.Dimension{
{
Expr: &influxql.Call{
Name: "time",
Args: []influxql.Expr{
&influxql.DurationLiteral{Val: 5 * time.Minute},
},
},
},
},
},
ResampleFor: time.Hour,
},
},

{
s: `CREATE CONTINUOUS QUERY myquery ON testdb RESAMPLE EVERY 1m BEGIN SELECT count(field1) INTO measure1 FROM myseries GROUP BY time(5m) END`,
stmt: &influxql.CreateContinuousQueryStatement{
Name: "myquery",
Database: "testdb",
Source: &influxql.SelectStatement{
Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "count", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}}}},
Target: &influxql.Target{Measurement: &influxql.Measurement{Name: "measure1", IsTarget: true}},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
Dimensions: []*influxql.Dimension{
{
Expr: &influxql.Call{
Name: "time",
Args: []influxql.Expr{
&influxql.DurationLiteral{Val: 5 * time.Minute},
},
},
},
},
},
ResampleEvery: time.Minute,
},
},

3 changes: 3 additions & 0 deletions influxql/scanner_test.go
@@ -122,6 +122,7 @@ func TestScanner_Scan(t *testing.T) {
{s: `DROP`, tok: influxql.DROP},
{s: `DURATION`, tok: influxql.DURATION},
{s: `END`, tok: influxql.END},
{s: `EVERY`, tok: influxql.EVERY},
{s: `EXISTS`, tok: influxql.EXISTS},
{s: `EXPLAIN`, tok: influxql.EXPLAIN},
{s: `FIELD`, tok: influxql.FIELD},
@@ -152,6 +153,8 @@ func TestScanner_Scan(t *testing.T) {
{s: `QUERIES`, tok: influxql.QUERIES},
{s: `QUERY`, tok: influxql.QUERY},
{s: `READ`, tok: influxql.READ},
{s: `REPLICATION`, tok: influxql.REPLICATION},
{s: `RESAMPLE`, tok: influxql.RESAMPLE},
{s: `RETENTION`, tok: influxql.RETENTION},
{s: `REVOKE`, tok: influxql.REVOKE},
{s: `SELECT`, tok: influxql.SELECT},
4 changes: 4 additions & 0 deletions influxql/token.go
@@ -76,6 +76,7 @@ const (
DROP
DURATION
END
EVERY
EXISTS
EXPLAIN
FIELD
@@ -110,6 +111,7 @@ const (
QUERY
READ
REPLICATION
RESAMPLE
RETENTION
REVOKE
SELECT
@@ -195,6 +197,7 @@ var tokens = [...]string{
DROP: "DROP",
DURATION: "DURATION",
END: "END",
EVERY: "EVERY",
EXISTS: "EXISTS",
EXPLAIN: "EXPLAIN",
FIELD: "FIELD",
@@ -229,6 +232,7 @@ var tokens = [...]string{
QUERY: "QUERY",
READ: "READ",
REPLICATION: "REPLICATION",
RESAMPLE: "RESAMPLE",
RETENTION: "RETENTION",
REVOKE: "REVOKE",
SELECT: "SELECT",
45 changes: 9 additions & 36 deletions services/continuous_querier/config.go
@@ -8,10 +8,7 @@ import (

// Default values for aspects of interval computation.
const (
DefaultRecomputePreviousN = 2
DefaultRecomputeNoOlderThan = 10 * time.Minute
DefaultComputeRunsPerInterval = 10
DefaultComputeNoMoreThan = 2 * time.Minute
DefaultRunInterval = time.Second
)

// Config represents a configuration for the continuous query service.
@@ -22,42 +19,18 @@ type Config struct {
// If this flag is set to false, both the brokers and data nodes should ignore any CQ processing.
Enabled bool `toml:"enabled"`

// when continuous queries are run we'll automatically recompute previous intervals
// in case lagged data came in. Set to zero if you never have lagged data. We do
// it this way because invalidating previously computed intervals would be insanely hard
// and expensive.
RecomputePreviousN int `toml:"recompute-previous-n"`

// The RecomputePreviousN setting provides guidance for how far back to recompute; the RecomputeNoOlderThan
// setting sets a ceiling on how far back in time it will go. For example, if you have 2 PreviousN
// and have this set to 10m, then we'd only compute the previous two intervals for any
// CQs that have a group by time <= 5m. For all others, we'd only recompute the previous window.
RecomputeNoOlderThan toml.Duration `toml:"recompute-no-older-than"`

// ComputeRunsPerInterval will determine how many times the current and previous N intervals
// will be computed. The group by time will be divided by this and it will get computed this many times:
// group by time seconds / runs per interval
// This will give partial results for current group by intervals and will determine how long it will
// be until lagged data is recomputed. For example, if this number is 10 and the group by time is 10m, it
// will be a minute past the previous 10m bucket of time before lagged data is picked up.
ComputeRunsPerInterval int `toml:"compute-runs-per-interval"`

// ComputeNoMoreThan paired with the RunsPerInterval will determine the ceiling of how many times smaller
// group by times will be computed. For example, if you have RunsPerInterval set to 10 and this setting
// to 1m, then a group by time(1m) will actually only get computed once per interval (and once per PreviousN).
// If you have a group by time(5m) then you'll get five computes per interval. Any group by time window larger
// than 10m will get computed 10 times for each interval.
ComputeNoMoreThan toml.Duration `toml:"compute-no-more-than"`
// Run interval for checking continuous queries. This should be set to the least common factor
// of the intervals of the continuous queries. If you only aggregate continuous queries
// every minute, this should be set to 1 minute. The default is '1s' so the interval
// is compatible with most aggregations.
RunInterval toml.Duration `toml:"run-interval"`
}

// NewConfig returns a new instance of Config with defaults.
func NewConfig() Config {
return Config{
LogEnabled: true,
Enabled: true,
RecomputePreviousN: DefaultRecomputePreviousN,
RecomputeNoOlderThan: toml.Duration(DefaultRecomputeNoOlderThan),
ComputeRunsPerInterval: DefaultComputeRunsPerInterval,
ComputeNoMoreThan: toml.Duration(DefaultComputeNoMoreThan),
LogEnabled: true,
Enabled: true,
RunInterval: toml.Duration(DefaultRunInterval),
}
}
15 changes: 3 additions & 12 deletions services/continuous_querier/config_test.go
@@ -12,24 +12,15 @@ func TestConfig_Parse(t *testing.T) {
// Parse configuration.
var c continuous_querier.Config
if _, err := toml.Decode(`
recompute-previous-n = 1
recompute-no-older-than = "10s"
compute-runs-per-interval = 2
compute-no-more-than = "20s"
run-interval = "1m"
enabled = true
`, &c); err != nil {
t.Fatal(err)
}

// Validate configuration.
if c.RecomputePreviousN != 1 {
t.Fatalf("unexpected recompute previous n: %d", c.RecomputePreviousN)
} else if time.Duration(c.RecomputeNoOlderThan) != 10*time.Second {
t.Fatalf("unexpected recompute no older than: %v", c.RecomputeNoOlderThan)
} else if c.ComputeRunsPerInterval != 2 {
t.Fatalf("unexpected compute runs per interval: %d", c.ComputeRunsPerInterval)
} else if time.Duration(c.ComputeNoMoreThan) != 20*time.Second {
t.Fatalf("unexpected compute no more than: %v", c.ComputeNoMoreThan)
if time.Duration(c.RunInterval) != time.Minute {
t.Fatalf("unexpected run interval: %v", c.RunInterval)
} else if c.Enabled != true {
t.Fatalf("unexpected enabled: %v", c.Enabled)
}