Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat (spike): Package opentelemetry forwarding to xray #115

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Draft
Prev Previous commit
Successfully export metrics
  • Loading branch information
Chris Fraser committed Jan 9, 2023
commit e8706d373a10292d3a096c961260efc724e9a429
1 change: 1 addition & 0 deletions packages/@eventual/aws-cdk/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ export class Service extends Construct implements IGrantable {
});

this.telemetry = new Telemetry(this, "Telemetry", {
serviceName: this.serviceName,
collectorConfigPath: props.telemetryCollectorConfigPath,
});
this.telemetry.configureFunction(this.activities.worker, "worker");
Expand Down
18 changes: 12 additions & 6 deletions packages/@eventual/aws-cdk/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import fs from "fs";
import { Effect, PolicyStatement } from "aws-cdk-lib/aws-iam";

export interface TelemetryProps {
serviceName: string;
collectorConfigPath?: string;
}

Expand All @@ -30,7 +31,7 @@ export class Telemetry extends Construct {
collectorFnUrl: FunctionUrl;
collectorConfigPath?: string;

constructor(scope: Construct, id: string, props?: TelemetryProps) {
constructor(scope: Construct, id: string, props: TelemetryProps) {
super(scope, id);

let collectorConfigPath =
Expand All @@ -56,16 +57,18 @@ export class Telemetry extends Construct {
)
),
}),
new LayerVersion(this, "otel-config", {
new LayerVersion(this, "otel-config-file", {
code: Code.fromAsset(path.dirname(collectorConfigPath), {
bundling: {
image: new DockerImage(""),
local: {
tryBundle(outputDir, _options) {
fs.copyFileSync(
collectorConfigPath!,
const config = fs
.readFileSync(collectorConfigPath, "utf-8")
.replace("{SERVICE_NAME}", props.serviceName);
fs.writeFileSync(
path.join(outputDir, path.basename(collectorConfigPath)),
fs.constants.COPYFILE_FICLONE
config
);
return true;
},
Expand All @@ -87,12 +90,15 @@ export class Telemetry extends Construct {
"xray:GetSamplingRules",
"xray:GetSamplingTargets",
"xray:GetSamplingStatisticSummaries",
"cloudwatch:PutMetricData",
"cloudwatch:PutMetricStream",
"cloudwatch:StartMetricStreams",
],
resources: ["*"],
effect: Effect.ALLOW,
}),
],
tracing: Tracing.ACTIVE
tracing: Tracing.ACTIVE,
});
this.collectorFn;
this.collectorFnUrl = this.collectorFn.addFunctionUrl({
Expand Down
Binary file modified packages/@eventual/aws-runtime/otlp-proxy-lambda/bootstrap.zip
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ receivers:
otlp:
protocols:
grpc:
http:

processors:
batch:
timeout: 25ms

exporters:
logging:
loglevel: debug
awsxray:
index_all_attributes: true
awsemf:

processors:
batch:
timeout: 1s
log_group_name: /metrics/eventual
log_stream_name: "{SERVICE_NAME}"

#enables output for traces to xray
service:
Expand All @@ -25,4 +26,4 @@ service:
metrics:
receivers: [otlp]
processors: [batch]
exporters: [awsemf]
exporters: [logging, awsemf]
6 changes: 4 additions & 2 deletions packages/@eventual/aws-runtime/src/handlers/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { registerTelemetryApi } from "../telemetry.js";
* Creates an entrypoint function for orchestrating a workflow
* from within an AWS Lambda Function attached to a SQS FIFO queue.
*/
registerTelemetryApi();
const telemetry = registerTelemetryApi();

const orchestrate = createOrchestrator({
executionHistoryClient: createExecutionHistoryClient(),
Expand Down Expand Up @@ -68,7 +68,9 @@ export default middy(async (event: SQSEvent) => {
itemIdentifier: r,
})),
};
}).use(loggerMiddlewares);
})
.use(loggerMiddlewares)
.after(telemetry.flush);

function sqsRecordsToEvents(sqsRecords: SQSRecord[]) {
return sqsRecords.flatMap(sqsRecordToEvents);
Expand Down
52 changes: 43 additions & 9 deletions packages/@eventual/aws-runtime/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@ import {
} from "@opentelemetry/core";
import {
BasicTracerProvider,
SimpleSpanProcessor,
BatchSpanProcessor,
} from "@opentelemetry/sdk-trace-base";
import { B3InjectEncoding, B3Propagator } from "@opentelemetry/propagator-b3";
import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions";
import { AsyncHooksContextManager } from "@opentelemetry/context-async-hooks";
import { Resource } from "@opentelemetry/resources";
import { trace, metrics } from "@opentelemetry/api";
import {
trace,
metrics,
diag,
DiagConsoleLogger,
TracerProvider,
} from "@opentelemetry/api";
import { serviceName, telemetryComponentName } from "./env.js";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-proto";
import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-proto";
Expand All @@ -27,9 +33,26 @@ import { AWSXRayPropagator } from "@opentelemetry/propagator-aws-xray";
* This function will fail if run more than once, and we won't try to save it
* Ensure that its run during the init phase of the lambda (ie global scope)
*/
export function registerTelemetryApi() {
registerTracerProvider();
registerMetricsProvider();

export interface Telemetry {
tracerProvider: TracerProvider;
meterProvider: MeterProvider;
flush: () => Promise<void>;
}

//Set up tracing and metrics provider, and supply a hook to flush, ensuring pending requests are sent
export function registerTelemetryApi(): Telemetry {
const tracerProvider = registerTracerProvider();
const meterProvider = registerMeterProvider();
diag.setLogger(new DiagConsoleLogger());
return {
tracerProvider,
meterProvider,
flush: async () => {
await tracerProvider.forceFlush();
await meterProvider.forceFlush();
},
};
}

function registerTracerProvider() {
Expand All @@ -41,7 +64,11 @@ function registerTracerProvider() {
//Important for traces to show up on xray
idGenerator: new AWSXRayIdGenerator(),
});
provider.addSpanProcessor(new SimpleSpanProcessor(new OTLPTraceExporter()));
provider.addSpanProcessor(
new BatchSpanProcessor(new OTLPTraceExporter(), {
scheduledDelayMillis: 25,
})
);
const contextManager = new AsyncHooksContextManager();
provider.register({
contextManager,
Expand All @@ -56,16 +83,23 @@ function registerTracerProvider() {
});
contextManager.enable();
trace.setGlobalTracerProvider(provider);
return provider;
}

function registerMetricsProvider() {
function registerMeterProvider() {
const metricExporter = new OTLPMetricExporter();
const meterProvider = new MeterProvider({});
const meterProvider = new MeterProvider({
resource: new Resource({
[SemanticResourceAttributes.SERVICE_NAMESPACE]: serviceName(),
[SemanticResourceAttributes.SERVICE_NAME]: telemetryComponentName(),
}),
});
meterProvider.addMetricReader(
new PeriodicExportingMetricReader({
exporter: metricExporter,
exportIntervalMillis: 1000,
exportIntervalMillis: 250,
})
);
metrics.setGlobalMeterProvider(meterProvider);
return meterProvider;
}