Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Collector Metric Exporter for the Web #1308

Merged
merged 51 commits into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
7482b43
Merged from upstream
davidwitten Jul 10, 2020
b02949a
Add test
davidwitten Jul 11, 2020
5bd4fef
Basic node
davidwitten Jul 12, 2020
a002c1d
Tests and functions
davidwitten Jul 13, 2020
1d28df4
minor
davidwitten Jul 13, 2020
cf100b3
new line
davidwitten Jul 13, 2020
6a3e948
DefaultURL
davidwitten Jul 13, 2020
4eb3831
Add JSON tests
davidwitten Jul 13, 2020
547e63b
Test rename
davidwitten Jul 13, 2020
b08de2c
Refactor a lot
davidwitten Jul 14, 2020
dffc3dd
Restored those files
davidwitten Jul 14, 2020
9c9abae
Variable anme
davidwitten Jul 14, 2020
589a0e1
Merge remote-tracking branch 'upstream/master' into browser
davidwitten Jul 15, 2020
6c98095
Duplicate test
davidwitten Jul 15, 2020
0ecb031
Fixed strings
davidwitten Jul 15, 2020
629fa50
Changed metrics to functions and added summary
davidwitten Jul 15, 2020
dc1a492
Fixed tests
davidwitten Jul 16, 2020
5e040b1
Merge remote-tracking branch 'upstream/master' into summary
davidwitten Jul 17, 2020
f08faf3
Merge remote-tracking branch 'upstream/master' into node
davidwitten Jul 17, 2020
12d978c
Merge remote-tracking branch 'upstream/master' into browser
davidwitten Jul 17, 2020
a0bb009
Conflicts
davidwitten Jul 17, 2020
40ef4d4
Conflicts
davidwitten Jul 17, 2020
37f154b
Added browser
davidwitten Jul 17, 2020
707427f
Merge remote-tracking branch 'upstream/master' into everything
davidwitten Jul 20, 2020
70e3061
Merge remote-tracking branch 'upstream/master' into browser
davidwitten Jul 20, 2020
87e3900
Hierarchy
davidwitten Jul 20, 2020
f89378b
Lint
davidwitten Jul 20, 2020
7b90b2c
Rename var
davidwitten Jul 20, 2020
07d0b94
Remove reset
davidwitten Jul 21, 2020
fb9b74c
Reset
davidwitten Jul 21, 2020
b6c7b77
Merge remote-tracking branch 'upstream/master' into browser
davidwitten Jul 24, 2020
495f1c6
Merged
davidwitten Jul 27, 2020
b2e40d0
update submodule
davidwitten Jul 27, 2020
13eda4a
Newline
davidwitten Jul 27, 2020
99abf64
Lint
davidwitten Jul 27, 2020
3ae0247
Documentatiom
davidwitten Jul 28, 2020
7ba0b66
Example
davidwitten Jul 29, 2020
9c292d7
fix: verified example
davidwitten Jul 29, 2020
e61e809
merged
davidwitten Jul 29, 2020
765ddfa
Newline
davidwitten Jul 29, 2020
10236ee
feat: simplified example
davidwitten Jul 29, 2020
44eab55
chore: merge
davidwitten Jul 29, 2020
21c9e8b
chore: rename start
davidwitten Jul 31, 2020
a8857d3
refactor: split up files
davidwitten Jul 31, 2020
15de18b
fix: lint
davidwitten Jul 31, 2020
175e050
fix: add comments
davidwitten Jul 31, 2020
e526e11
fix: private
davidwitten Aug 3, 2020
6d3a080
Merge branch 'master' into browser
dyladan Aug 4, 2020
d29be93
Merge remote-tracking branch 'upstream/master' into browser
davidwitten Aug 4, 2020
ccedacc
fix: readme
davidwitten Aug 4, 2020
775d2a4
Merged
davidwitten Aug 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Basic node
  • Loading branch information
davidwitten committed Jul 12, 2020
commit 5bd4fef0466ad7faacc432302a0a70e239971e68
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { MetricRecord } from '@opentelemetry/metrics';
import * as collectorTypes from '../../types';
import { CollectorExporterConfigNode } from './types';
import { GRPCMetricQueueItem, ServiceClient } from './types';
import { removeProtocol } from './util';
import * as path from 'path';
import * as protoLoader from '@grpc/proto-loader';
import * as grpc from 'grpc';
import { toCollectorExportMetricServiceRequest } from '../../transformMetrics';
import { CollectorMetricExporterBase } from '../../CollectorMetricExporterBase';
import { parseHeaders } from '../../util';

const DEFAULT_COLLECTOR_URL = 'localhost:55678';

/**
* Collector Metric Exporter for Node
*/
export class CollectorMetricExporter extends CollectorMetricExporterBase<
CollectorExporterConfigNode
> {
DEFAULT_HEADERS: Record<string, string> = {
[collectorTypes.OT_REQUEST_HEADER]: '1',
};
isShutDown: boolean = false;
grpcMetricsQueue: GRPCMetricQueueItem[] = [];
metricServiceClient?: ServiceClient = undefined;
credentials: grpc.ChannelCredentials;
metadata?: grpc.Metadata;
headers: Record<string, string>;

constructor(config: CollectorExporterConfigNode = {}) {
super(config);
this.grpcMetricsQueue = [];
this.credentials = config.credentials || grpc.credentials.createInsecure();
this.metadata = config.metadata;
this.headers =
parseHeaders(config.headers, this.logger) || this.DEFAULT_HEADERS;
}

onShutdown(): void {
this.isShutDown = true;
if (this.metricServiceClient) {
this.metricServiceClient.close();
}
}

onInit(): void {
this.isShutDown = false;
const serverAddress = removeProtocol(this.url);
const metricServiceProtoPath =
'opentelemetry/proto/collector/metrics/v1/metrics_service.proto';
const includeDirs = [path.resolve(__dirname, 'protos')];
protoLoader
.load(metricServiceProtoPath, {
keepCase: false,
longs: String,
enums: String,
defaults: true,
oneofs: true,
includeDirs,
})
.then(packageDefinition => {
const packageObject: any = grpc.loadPackageDefinition(
packageDefinition
);
this.metricServiceClient = new packageObject.opentelemetry.proto.collector.metrics.v1.MetricsService(
serverAddress,
this.credentials
);
if (this.grpcMetricsQueue.length > 0) {
const queue = this.grpcMetricsQueue.splice(0);
queue.forEach((item: GRPCMetricQueueItem) => {
this.sendMetrics(item.metrics, item.onSuccess, item.onError);
});
}
});
}

sendMetrics(
metrics: MetricRecord[],
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
if (this.isShutDown) {
this.logger.debug('Shutdown already started. Cannot send metrics');
return;
}
if (this.metricServiceClient) {
const exportMetricServiceRequest = toCollectorExportMetricServiceRequest(
metrics,
this._startTime,
this
);
this.metricServiceClient.export(
exportMetricServiceRequest,
this.metadata,
(err: collectorTypes.ExportServiceError) => {
if (err) {
this.logger.error(
'exportMetricServiceRequest',
exportMetricServiceRequest
);
onError(err);
} else {
onSuccess();
}
}
);
} else {
this.grpcMetricsQueue.push({
metrics,
onSuccess,
onError,
});
}
}

getDefaultUrl(url: string | undefined): string {
return url || DEFAULT_COLLECTOR_URL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
*/

export * from './CollectorTraceExporter';
export * from './CollectorMetricExporter';
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import * as grpc from 'grpc';
import { ReadableSpan } from '@opentelemetry/tracing';
import { CollectorProtocolNode } from '../../enums';
import { MetricRecord } from '@opentelemetry/metrics';
import {
CollectorExporterError,
CollectorExporterConfigBase,
Expand All @@ -32,6 +33,15 @@ export interface GRPCSpanQueueItem {
onError: (error: CollectorExporterError) => void;
}

/**
* Queue item to be used to save temporary metrics
*/
export interface GRPCMetricQueueItem {
metrics: MetricRecord[];
onSuccess: () => void;
onError: (error: CollectorExporterError) => void;
}

/**
* Service Client for sending spans or metrics
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import * as collectorTypes from '../../types';
import { toCollectorExportTraceServiceRequest } from '../../transform';
import { CollectorTraceExporter } from './CollectorTraceExporter';
import { CollectorExporterConfigNode } from './types';
import { CollectorMetricExporter } from './CollectorMetricExporter';
import { MetricRecord } from '@opentelemetry/metrics';
import { toCollectorExportMetricServiceRequest } from '../../transformMetrics';
import { Logger } from '@opentelemetry/api';

export const DEFAULT_COLLECTOR_URL_JSON = 'http://localhost:55680/v1/trace';

Expand All @@ -33,6 +37,30 @@ export function onInitWithJson(
// nothing to be done for json yet
}

export function sendMetricsUsingJson(
collector: CollectorMetricExporter,
metrics: MetricRecord[],
startTime: number,
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
const exportMetricServiceRequest = toCollectorExportMetricServiceRequest(
metrics,
startTime,
collector
);

const body = JSON.stringify(exportMetricServiceRequest);
_sendWithJson(
body,
collector.url,
collector.headers,
collector.logger,
onSuccess,
onError
);
}

export function sendSpansUsingJson(
collector: CollectorTraceExporter,
spans: ReadableSpan[],
Expand All @@ -45,7 +73,25 @@ export function sendSpansUsingJson(
);

const body = JSON.stringify(exportTraceServiceRequest);
const parsedUrl = new url.URL(collector.url);
_sendWithJson(
body,
collector.url,
collector.headers,
collector.logger,
onSuccess,
onError
);
}

function _sendWithJson(
body: string,
collectorUrl: string,
collectorHeaders: Partial<Record<string, unknown>>,
logger: Logger,
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
const parsedUrl = new url.URL(collectorUrl);

const options = {
hostname: parsedUrl.hostname,
Expand All @@ -55,17 +101,17 @@ export function sendSpansUsingJson(
headers: {
'Content-Length': Buffer.byteLength(body),
'Content-Type': 'application/json',
...collector.headers,
...collectorHeaders,
},
};

const request = parsedUrl.protocol === 'http:' ? http.request : https.request;
const req = request(options, (res: http.IncomingMessage) => {
if (res.statusCode && res.statusCode < 299) {
collector.logger.debug(`statusCode: ${res.statusCode}`);
logger.debug(`statusCode: ${res.statusCode}`);
onSuccess();
} else {
collector.logger.error(`statusCode: ${res.statusCode}`);
logger.error(`statusCode: ${res.statusCode}`);
onError({
code: res.statusCode,
message: res.statusMessage,
Expand All @@ -74,7 +120,7 @@ export function sendSpansUsingJson(
});

req.on('error', (error: Error) => {
collector.logger.error('error', error.message);
logger.error('error', error.message);
onError({
message: error.message,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,40 @@ import { HistogramAggregator } from '@opentelemetry/metrics';
import { hrTimeToNanoseconds } from '@opentelemetry/core';
describe('transformMetrics', () => {
describe('toCollectorMetric', () => {
it('should convert metric', () => {
beforeEach(() => {
// Counter
mockCounter.aggregator.update(1);

// Observer
mockObserver.aggregator.update(10);

// Histogram
mockHistogram.aggregator.update(7);
mockHistogram.aggregator.update(14);
(mockHistogram.aggregator as HistogramAggregator).reset();

// ValueRecorder
mockValueRecorder.aggregator.update(5);
});

afterEach(() => {
mockCounter.aggregator.update(-1); // Reset counter
(mockHistogram.aggregator as HistogramAggregator).reset();
});
it('should convert metric', () => {
ensureCounterIsCorrect(
transform.toCollectorMetric(mockCounter, 1592602232694000000),
hrTimeToNanoseconds(mockCounter.aggregator.toPoint().timestamp)
);
mockObserver.aggregator.update(10);
ensureObserverIsCorrect(
transform.toCollectorMetric(mockObserver, 1592602232694000000),
hrTimeToNanoseconds(mockObserver.aggregator.toPoint().timestamp)
);
mockHistogram.aggregator.update(7);
mockHistogram.aggregator.update(14);
(mockHistogram.aggregator as HistogramAggregator).reset();
ensureHistogramIsCorrect(
transform.toCollectorMetric(mockHistogram, 1592602232694000000),
hrTimeToNanoseconds(mockHistogram.aggregator.toPoint().timestamp)
);

mockValueRecorder.aggregator.update(5);
ensureValueRecorderIsCorrect(
transform.toCollectorMetric(mockValueRecorder, 1592602232694000000),
hrTimeToNanoseconds(mockValueRecorder.aggregator.toPoint().timestamp)
Expand Down
45 changes: 45 additions & 0 deletions packages/opentelemetry-exporter-collector/test/helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,51 @@ export function ensureHistogramIsCorrect(
});
}

export function ensureExportedCounterIsCorrect(
metric: collectorTypes.opentelemetryProto.metrics.v1.Metric
) {
assert.deepStrictEqual(metric.metricDescriptor, {
name: 'test-counter',
description: 'sample counter description',
unit: '1',
type: 'MONOTONIC_INT64',
temporality: 'CUMULATIVE',
});
assert.deepStrictEqual(metric.doubleDataPoints, []);
assert.deepStrictEqual(metric.summaryDataPoints, []);
assert.deepStrictEqual(metric.histogramDataPoints, []);
assert.ok(metric.int64DataPoints);
assert.deepStrictEqual(metric.int64DataPoints[0].labels, []);
assert.deepStrictEqual(metric.int64DataPoints[0].value, '1');
assert.deepStrictEqual(
metric.int64DataPoints[0].startTimeUnixNano,
'1592602232694000128'
);
}

export function ensureExportedObserverIsCorrect(
metric: collectorTypes.opentelemetryProto.metrics.v1.Metric
) {
assert.deepStrictEqual(metric.metricDescriptor, {
name: 'test-observer',
description: 'sample observer description',
unit: '2',
type: 'DOUBLE',
temporality: 'DELTA',
});

assert.deepStrictEqual(metric.int64DataPoints, []);
assert.deepStrictEqual(metric.summaryDataPoints, []);
assert.deepStrictEqual(metric.histogramDataPoints, []);
assert.ok(metric.doubleDataPoints);
assert.deepStrictEqual(metric.doubleDataPoints[0].labels, []);
assert.deepStrictEqual(metric.doubleDataPoints[0].value, 10);
assert.deepStrictEqual(
metric.doubleDataPoints[0].startTimeUnixNano,
'1592602232694000128'
);
}

export function ensureResourceIsCorrect(
resource: collectorTypes.opentelemetryProto.resource.v1.Resource
) {
Expand Down
Loading