Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export class TypeRecordView extends ContainerView<zed.TypeRecord> {
}

count() {
return this.value.fields.length
return this.value.fields?.length || 0
}

openToken() {
Expand All @@ -21,6 +21,7 @@ export class TypeRecordView extends ContainerView<zed.TypeRecord> {

*iterate(n?: number) {
const fields = this.value.fields
if (!fields) return
const length = n ? Math.min(n, fields.length) : fields.length

for (let i = 0; i < fields.length; ++i) {
Expand Down
14 changes: 4 additions & 10 deletions apps/zui/src/core/query/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,13 @@ function run(id: string): Thunk<Promise<ResultStream | null>> {
const paginatedQuery = Results.getPaginatedQuery(id)(getState())

try {
const res = await api.query(paginatedQuery, {
id,
tabId,
})
res.collect(({rows, shapesMap}) => {
const values = isFirstPage ? [...rows] : [...prevVals, ...rows]
const shapes = isFirstPage
? {...shapesMap}
: {...prevShapes, ...shapesMap}
const res = await api.query(paginatedQuery, {id, tabId})
await res.collect(({rows, shapesMap}) => {
const values = isFirstPage ? rows : [...prevVals, ...rows]
const shapes = isFirstPage ? shapesMap : {...prevShapes, ...shapesMap}
dispatch(Results.setValues({id, tabId, values}))
dispatch(Results.setShapes({id, tabId, shapes}))
})
await res.promise
dispatch(Results.success({id, tabId, count: res.rows.length}))
return res
} catch (e) {
Expand Down
8 changes: 2 additions & 6 deletions apps/zui/src/js/api/zui-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@ export default class ZuiApi {
}

createAbortable(tab?: string, tag?: string) {
try {
this.abortables.abort({tab, tag})
} catch (e) {
console.log("Abort Handled", e)
}
this.abortables.abort({tab, tag})
const ctl = new AbortController()
const id = this.abortables.add({
abort: () => ctl.abort(),
Expand All @@ -74,7 +70,7 @@ export default class ZuiApi {
const [signal, cleanup] = this.createAbortable(opts.tabId, opts.id)
try {
const resp = await zealot.query(body, {signal})
resp.promise.finally(cleanup)
resp.on("success", cleanup)
return resp
} catch (e) {
cleanup()
Expand Down
40 changes: 29 additions & 11 deletions apps/zui/src/views/histogram-pane/run-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,18 @@ import ZuiApi from "src/js/api/zui-api"
import {isAbortError} from "src/util/is-abort-error"

export const HISTOGRAM_RESULTS = "histogram"
const POOL_RANGE = "pool-range"
const NULL_TIME_COUNT = "null-time-count"
const MISSING_TIME_COUNT = "missing-time-count"

export async function runHistogramQuery(api: ZuiApi) {
// all these queries should maybe be attached to the same abort signal
// this would change the abortables api a bit
api.abortables.abort({tag: POOL_RANGE})
api.abortables.abort({tag: NULL_TIME_COUNT})
api.abortables.abort({tag: MISSING_TIME_COUNT})
api.abortables.abort({tag: HISTOGRAM_RESULTS})

const id = HISTOGRAM_RESULTS
const tabId = api.current.tabId
const key = api.current.location.key
Expand All @@ -31,7 +41,7 @@ export async function runHistogramQuery(api: ZuiApi) {
}

function error(error: Error) {
if (isAbortError(error)) return success()
if (isAbortError(error)) return
api.dispatch(Results.error({id, tabId, error: error.message}))
}

Expand All @@ -52,7 +62,7 @@ export async function runHistogramQuery(api: ZuiApi) {

async function getPoolRange() {
const query = `from ${poolId} | min(${timeField}), max(${timeField})`
const resp = await api.query(query, {id, tabId})
const resp = await api.query(query, {id: POOL_RANGE, tabId})
const [{min, max}] = await resp.js()
if (!(min instanceof Date && max instanceof Date)) return null
return [min, max] as [Date, Date]
Expand All @@ -61,19 +71,27 @@ export async function runHistogramQuery(api: ZuiApi) {
async function getNullTimeCount() {
// Newline after baseQuery in case it ends with a comment.
const query = `${baseQuery}\n | ${timeField} == null | count()`
const id = "null-time-count"
const resp = await api.query(query, {id, tabId})
const [count] = await resp.js()
api.dispatch(Histogram.setNullXCount(count ?? 0))
try {
const resp = await api.query(query, {id: NULL_TIME_COUNT, tabId})
const [count] = await resp.js()
api.dispatch(Histogram.setNullXCount(count ?? 0))
} catch (e) {
if (isAbortError(e)) return
throw e
}
}

async function getMissingTimeCount() {
// Newline after baseQuery in case it ends with a comment.
const query = `${baseQuery}\n | !has(${timeField}) | count()`
const id = "missing-time-count"
const resp = await api.query(query, {id, tabId})
const [count] = await resp.js()
api.dispatch(Histogram.setMissingXCount(count ?? 0))
try {
const resp = await api.query(query, {id: MISSING_TIME_COUNT, tabId})
const [count] = await resp.js()
api.dispatch(Histogram.setMissingXCount(count ?? 0))
} catch (e) {
if (isAbortError(e)) return
throw e
}
}

async function run() {
Expand All @@ -85,7 +103,7 @@ export async function runHistogramQuery(api: ZuiApi) {
const interval = `${number}${timeUnits[unit]}`
// Newline after baseQuery in case it ends with a comment.
const query = `${baseQuery}\n | ${timeField} != null | count() by time := bucket(${timeField}, ${interval}), group := ${colorField} | sort time`
const resp = await api.query(query, {id, tabId})
const resp = await api.query(query, {id: HISTOGRAM_RESULTS, tabId})
api.dispatch(Histogram.setInterval({unit, number, fn}))
api.dispatch(Histogram.setRange(range))
resp.collect(collect)
Expand Down
4 changes: 3 additions & 1 deletion packages/zed-js/src/query/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ export class Channel extends EventEmitter {
let timeId: ReturnType<typeof setTimeout>;

const flush = () => {
collector({ rows: this.rows, shapesMap: this.shapesMap });
// Return shallow copies so that comsumers can do what they want
// with the rows and shapes
collector({ rows: [...this.rows], shapesMap: { ...this.shapesMap } });
first = false;
count = 0;
clearTimeout(timeId);
Expand Down
11 changes: 8 additions & 3 deletions packages/zed-js/src/query/result-stream.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import { EventEmitter } from 'events';
import { eachLine } from '../ndjson/lines';
import { JSOptions } from '../values/types';
import * as zjson from '../zjson';
import { Channel } from './channel';
import { Collector } from '../types';
import { IsoResponse } from '../client/types';

export class ResultStream {
export class ResultStream extends EventEmitter {
public status: 'idle' | 'pending' | 'error' | 'aborted' | 'success' = 'idle';

private currentChannelId: number | undefined;
private channelsMap = new Map<number, Channel>();
private _promise?: Promise<void>;

constructor(public resp: IsoResponse, private ctl: AbortController) {}
constructor(public resp: IsoResponse, private ctl: AbortController) {
super();
}

get body() {
return this.resp.body;
Expand Down Expand Up @@ -80,9 +83,11 @@ export class ResultStream {
this.consumeLine(json);
}
this.status = 'success';
this.emit('success');
resolve();
} catch (e) {
} catch (e: unknown) {
if (
(e instanceof Object && 'name' in e && e.name === 'AbortError') ||
(e instanceof DOMException && e.message.match(/user aborted/)) ||
(e instanceof Error && e.message.match(/context canceled/))
) {
Expand Down