Skip to content

Commit

Permalink
refactor(ui): support receiving limited Flux responses
Browse files Browse the repository at this point in the history
Adds support for fetching a Flux response with a limit, and returning a
partial response instead of an error if that limit is exceeded.

The logic is implemented here instead of in influxdb-client-js as part
of #14482.
  • Loading branch information
chnn committed Aug 2, 2019
1 parent d0dedff commit ff51a78
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 167 deletions.
43 changes: 0 additions & 43 deletions ui/src/dashboards/resources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
SourceAuthenticationMethod,
SourceLinks,
TimeRange,
QueryConfig,
TableOptions,
} from 'src/types'

Expand Down Expand Up @@ -83,48 +82,6 @@ export const service: Service = {
},
}

export const queryConfig: QueryConfig = {
database: 'telegraf',
measurement: 'cpu',
retentionPolicy: 'autogen',
fields: [
{
value: 'mean',
type: 'func',
alias: 'mean_usage_idle',
args: [
{
value: 'usage_idle',
type: 'field',
alias: '',
},
],
},
{
value: 'mean',
type: 'func',
alias: 'mean_usage_user',
args: [
{
value: 'usage_user',
type: 'field',
alias: '',
},
],
},
],
tags: {},
groupBy: {
time: 'auto',
tags: [],
},
areTagsAccepted: false,
fill: 'null',
rawText: null,
range: null,
shifts: null,
}

export const axes: Axes = {
x: {
bounds: ['', ''],
Expand Down
12 changes: 0 additions & 12 deletions ui/src/dashboards/utils/sources.ts

This file was deleted.

5 changes: 4 additions & 1 deletion ui/src/dataLoaders/components/verifyStep/DataListening.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ class DataListening extends PureComponent<OwnProps & WithRouterProps, State> {
let timePassed: number

try {
const response = await runQuery(orgID, script).promise
const response = await runQuery(orgID, script).promise.then(
({csv}) => csv
)

responseLength = response.length
timePassed = Number(new Date()) - this.startTime
} catch (err) {
Expand Down
112 changes: 105 additions & 7 deletions ui/src/shared/apis/query.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,117 @@
// APIs
import {client} from 'src/utils/api'
// Constants
import {FLUX_RESPONSE_BYTES_LIMIT} from 'src/shared/constants'

// Types
import {CancelBox} from 'src/types/promises'
import {File} from '@influxdata/influx'
import {File, Query, CancellationError} from 'src/types'

const MAX_RESPONSE_CHARS = 50000 * 160
export interface RunQueryResult {
csv: string
didTruncate: boolean
bytesRead: number
}

export const runQuery = (
orgID: string,
query: string,
extern?: File
): CancelBox<string> => {
return client.queries.execute(orgID, query, {
): CancelBox<RunQueryResult> => {
const url = `/api/v2/query?${new URLSearchParams({orgID})}`

const headers = {
'Content-Type': 'application/json',
'Accept-Encoding': 'gzip',
}

const body: Query = {
query,
extern,
limitChars: MAX_RESPONSE_CHARS,
dialect: {annotations: ['group', 'datatype', 'default']},
}

const controller = new AbortController()

const request = fetch(url, {
method: 'POST',
headers,
body: JSON.stringify(body),
signal: controller.signal,
})

const promise = request
.then(processResponse)
.catch(e =>
e.name === 'AbortError'
? Promise.reject(new CancellationError())
: Promise.reject(e)
)

return {
promise,
cancel: () => controller.abort(),
}
}

const processResponse = async (response: Response): Promise<RunQueryResult> => {
const reader = response.body.getReader()
const decoder = new TextDecoder()

let csv = ''
let bytesRead = 0
let didTruncate = false

let read = await reader.read()

while (!read.done) {
const text = decoder.decode(read.value)

bytesRead += read.value.byteLength

if (bytesRead > FLUX_RESPONSE_BYTES_LIMIT) {
csv += trimPartialLines(text)
didTruncate = true
break
} else {
csv += text
read = await reader.read()
}
}

reader.cancel()

return {
csv,
bytesRead,
didTruncate,
}
}

/*
Given an arbitrary text chunk of a Flux CSV, trim partial lines off of the end
of the text.
For example, given the following partial Flux response,
r,baz,3
foo,bar,baz,2
foo,bar,b
we want to trim the last incomplete line, so that the result is
r,baz,3
foo,bar,baz,2
*/
const trimPartialLines = (partialResp: string): string => {
let i = partialResp.length - 1

while (partialResp[i] !== '\n') {
if (i <= 0) {
return partialResp
}

i -= 1
}

return partialResp.slice(0, i + 1)
}
8 changes: 5 additions & 3 deletions ui/src/shared/components/TimeSeries.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {withRouter, WithRouterProps} from 'react-router'
import {fromFlux, FromFluxResult} from '@influxdata/giraffe'

// API
import {runQuery} from 'src/shared/apis/query'
import {runQuery, RunQueryResult} from 'src/shared/apis/query'

// Utils
import {checkQueryResult} from 'src/shared/utils/checkQueryResult'
Expand Down Expand Up @@ -80,7 +80,7 @@ class TimeSeries extends Component<Props & WithRouterProps, State> {

public state: State = defaultState()

private pendingResults: Array<CancelBox<string>> = []
private pendingResults: Array<CancelBox<RunQueryResult>> = []

public async componentDidMount() {
this.reload()
Expand Down Expand Up @@ -143,7 +143,9 @@ class TimeSeries extends Component<Props & WithRouterProps, State> {
})

// Wait for new queries to complete
const files = await Promise.all(this.pendingResults.map(r => r.promise))
const files = await Promise.all(
this.pendingResults.map(r => r.promise.then(({csv}) => csv))
)
const duration = Date.now() - startTime
const giraffeResult = fromFlux(files.join('\n\n'))

Expand Down
4 changes: 4 additions & 0 deletions ui/src/shared/constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ export const CLOUD_URL = process.env.CLOUD_URL
export const CLOUD_CHECKOUT_PATH = process.env.CLOUD_CHECKOUT_PATH
export const CLOUD_BILLING_PATH = process.env.CLOUD_BILLING_PATH

export const FLUX_RESPONSE_BYTES_LIMIT = CLOUD
? 10 * 1024 * 1024
: 100 * 1024 * 1024

export const VIS_SIG_DIGITS = 4

export const VIS_THEME: Partial<Config> = {
Expand Down
8 changes: 5 additions & 3 deletions ui/src/timeMachine/actions/queries.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {get} from 'lodash'

// API
import {runQuery} from 'src/shared/apis/query'
import {runQuery, RunQueryResult} from 'src/shared/apis/query'

// Actions
import {refreshVariableValues, selectValue} from 'src/variables/actions'
Expand Down Expand Up @@ -84,7 +84,7 @@ export const refreshTimeMachineVariableValues = () => async (
await dispatch(refreshVariableValues(contextID, variablesToRefresh))
}

let pendingResults: Array<CancelBox<string>> = []
let pendingResults: Array<CancelBox<RunQueryResult>> = []

export const executeQueries = () => async (dispatch, getState: GetState) => {
const {view, timeRange} = getActiveTimeMachine(getState())
Expand Down Expand Up @@ -116,7 +116,9 @@ export const executeQueries = () => async (dispatch, getState: GetState) => {
return runQuery(orgID, text, extern)
})

const files = await Promise.all(pendingResults.map(r => r.promise))
const files = await Promise.all(
pendingResults.map(r => r.promise.then(({csv}) => csv))
)

const duration = Date.now() - startTime

Expand Down
6 changes: 3 additions & 3 deletions ui/src/timeMachine/apis/queryBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export function findBuckets({orgID}: FindBucketsOptions): CancelableQuery {
const {promise, cancel} = runQuery(orgID, query)

return {
promise: promise.then(resp => extractCol(resp, 'name')),
promise: promise.then(({csv}) => extractCol(csv, 'name')),
cancel,
}
}
Expand Down Expand Up @@ -74,7 +74,7 @@ export function findKeys({
const {promise, cancel} = runQuery(orgID, query)

return {
promise: promise.then(resp => extractCol(resp, '_value')),
promise: promise.then(({csv}) => extractCol(csv, '_value')),
cancel,
}
}
Expand Down Expand Up @@ -117,7 +117,7 @@ export function findValues({
const {promise, cancel} = runQuery(orgID, query)

return {
promise: promise.then(resp => extractCol(resp, '_value')),
promise: promise.then(({csv}) => extractCol(csv, '_value')),
cancel,
}
}
Expand Down
Loading

0 comments on commit ff51a78

Please sign in to comment.