Skip to content

Commit

Permalink
feat: request body compression (#1358)
Browse files Browse the repository at this point in the history
* adds compression middleware

* format code

* fix build

* feat: Handle compression differently in node and browser

* feat: adds compression middleware

* Fix build

* Fix response

* Fix build

* chore: code linting

Signed-off-by: Vincent Biret <vibiret@microsoft.com>

* chore: linting

Signed-off-by: Vincent Biret <vibiret@microsoft.com>

* ci: adds unit test debug configuration

Signed-off-by: Vincent Biret <vibiret@microsoft.com>

* chore: linting

Signed-off-by: Vincent Biret <vibiret@microsoft.com>

* chore: doc linting

Signed-off-by: Vincent Biret <vibiret@microsoft.com>

* fix: aligns header mapping to a record

Signed-off-by: Vincent Biret <vibiret@microsoft.com>

* chore: linting

Signed-off-by: Vincent Biret <vibiret@microsoft.com>

* fix: release-please

* fix: await promise

---------

Signed-off-by: Vincent Biret <vibiret@microsoft.com>
Co-authored-by: Vincent Biret <vibiret@microsoft.com>
  • Loading branch information
rkodev and baywet authored Oct 22, 2024
1 parent bc3779e commit 40440fb
Show file tree
Hide file tree
Showing 19 changed files with 572 additions and 36 deletions.
38 changes: 38 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Debug Current Test File",
"autoAttachChildProcesses": true,
"skipFiles": ["<node_internals>/**", "**/node_modules/**"],
"program": "${workspaceRoot}/node_modules/vitest/vitest.mjs",
"args": ["run", "${relativeFile}"],
"smartStep": true,
"console": "integratedTerminal"
},
{
"type": "node",
"request": "launch",
"name": "Run Vitest Browser",
"program": "${workspaceRoot}/node_modules/vitest/vitest.mjs",
"console": "integratedTerminal",
"args": ["--inspect-brk", "--browser", "--no-file-parallelism"]
},
{
"type": "chrome",
"request": "attach",
"name": "Attach to Vitest Browser",
"port": 9229
}
],
"compounds": [
{
"name": "Debug Vitest Browser",
"configurations": ["Attach to Vitest Browser", "Run Vitest Browser"],
"stopAll": true
}
]
}
11 changes: 6 additions & 5 deletions packages/abstractions/src/utils/enumUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@
* See License in the project root for license information.
* -------------------------------------------------------------------------------------------
*/
function reverseRecord(input: Record<PropertyKey, PropertyKey>): Record<PropertyKey, PropertyKey> {
const reverseRecord = (input: Record<PropertyKey, PropertyKey>): Record<PropertyKey, PropertyKey> => {
const entries = Object.entries(input).map(([key, value]) => [value, key]);
return Object.fromEntries(entries) as Record<PropertyKey, PropertyKey>;
}
};

/**
* Factory to create an UntypedString from a string.
* @param stringValue The string value to lookup the enum value from.
* @param originalType The type definition of the enum.
* @return The enu value.
* @typeparam T The type of the enum.
* @return The enum value.
*/
export function getEnumValueFromStringValue<T>(stringValue: string, originalType: Record<PropertyKey, PropertyKey>): T | undefined {
export const getEnumValueFromStringValue = <T>(stringValue: string, originalType: Record<PropertyKey, PropertyKey>): T | undefined => {
const reversed: Record<PropertyKey, PropertyKey> = reverseRecord(originalType);
return originalType[reversed[stringValue]] as T;
}
};
9 changes: 6 additions & 3 deletions packages/http/fetch/src/fetchRequestAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ export class FetchRequestAdapter implements RequestAdapter {
if (response) {
const responseContentLength = response.headers.get("Content-Length");
if (responseContentLength) {
spanForAttributes.setAttribute("http.response.body.size", parseInt(responseContentLength));
spanForAttributes.setAttribute("http.response.body.size", parseInt(responseContentLength, 10));
}
const responseContentType = response.headers.get("Content-Type");
if (responseContentType) {
Expand Down Expand Up @@ -527,13 +527,16 @@ export class FetchRequestAdapter implements RequestAdapter {
}
const requestContentLength = requestInfo.headers.tryGetValue("Content-Length");
if (requestContentLength) {
spanForAttributes.setAttribute("http.response.body.size", parseInt(requestContentLength[0]));
spanForAttributes.setAttribute("http.response.body.size", parseInt(requestContentLength[0], 10));
}
const requestContentType = requestInfo.headers.tryGetValue("Content-Type");
if (requestContentType) {
spanForAttributes.setAttribute("http.request.header.content-type", requestContentType);
}
const headers: [string, string][] | undefined = requestInfo.headers ? Array.from(requestInfo.headers.keys()).map((key) => [key.toString().toLocaleLowerCase(), this.foldHeaderValue(requestInfo.headers.tryGetValue(key))]) : undefined;
const headers: Record<string, string> | undefined = {};
requestInfo.headers?.forEach((_, key) => {
headers[key.toString().toLocaleLowerCase()] = this.foldHeaderValue(requestInfo.headers.tryGetValue(key));
});
const request = {
method,
headers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { Middleware } from "../middleware";
import { ParametersNameDecodingHandler } from "../parametersNameDecodingHandler";
import { RetryHandler } from "../retryHandler";
import { UserAgentHandler } from "../userAgentHandler";
import { CompressionHandler } from "../compressionHandler";

/**
* @class
Expand All @@ -25,6 +26,6 @@ export class MiddlewareFactory {
*/
public static getDefaultMiddlewares(customFetch: (request: string, init: RequestInit) => Promise<Response> = (...args) => fetch(...args) as any): Middleware[] {
// Browsers handles redirection automatically and do not require the redirectionHandler
return [new RetryHandler(), new ParametersNameDecodingHandler(), new UserAgentHandler(), new HeadersInspectionHandler(), new CustomFetchHandler(customFetch)];
return [new RetryHandler(), new ParametersNameDecodingHandler(), new UserAgentHandler(), new CompressionHandler(), new HeadersInspectionHandler(), new CustomFetchHandler(customFetch)];
}
}
239 changes: 239 additions & 0 deletions packages/http/fetch/src/middlewares/compressionHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/**
* -------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All Rights Reserved. Licensed under the MIT License.
* See License in the project root for license information.
* -------------------------------------------------------------------------------------------
*/

import { type RequestOption, inNodeEnv } from "@microsoft/kiota-abstractions";
import { Span, trace } from "@opentelemetry/api";

import { getObservabilityOptionsFromRequest } from "../observabilityOptions";
import type { Middleware } from "./middleware";
import { CompressionHandlerOptions, CompressionHandlerOptionsKey } from "./options/compressionHandlerOptions";
import type { FetchHeadersInit, FetchRequestInit } from "../utils/fetchDefinitions";
import { deleteRequestHeader, getRequestHeader, setRequestHeader } from "../utils/headersUtil";

/**
* Compress the url content.
*/
export class CompressionHandler implements Middleware {
next: Middleware | undefined;

/**
* @private
* @static
* A member holding the name of content range header
*/
private static readonly CONTENT_RANGE_HEADER = "Content-Range";

/**
* @private
* @static
* A member holding the name of content encoding header
*/
private static readonly CONTENT_ENCODING_HEADER = "Content-Encoding";

/**
* @public
* @constructor
* Creates a new instance of the CompressionHandler class
* @param {CompressionHandlerOptions} handlerOptions The options for the compression handler.
* @returns An instance of the CompressionHandler class
*/
public constructor(private readonly handlerOptions: CompressionHandlerOptions = new CompressionHandlerOptions()) {
if (!handlerOptions) {
throw new Error("handlerOptions cannot be undefined");
}
}

/**
* @inheritdoc
*/
public execute(url: string, requestInit: RequestInit, requestOptions?: Record<string, RequestOption> | undefined): Promise<Response> {
let currentOptions = this.handlerOptions;
if (requestOptions?.[CompressionHandlerOptionsKey]) {
currentOptions = requestOptions[CompressionHandlerOptionsKey] as CompressionHandlerOptions;
}
const obsOptions = getObservabilityOptionsFromRequest(requestOptions);
if (obsOptions) {
return trace.getTracer(obsOptions.getTracerInstrumentationName()).startActiveSpan("compressionHandler - execute", (span) => {
try {
span.setAttribute("com.microsoft.kiota.handler.compression.enable", currentOptions.ShouldCompress);
return this.executeInternal(currentOptions, url, requestInit as FetchRequestInit, requestOptions, span);
} finally {
span.end();
}
});
}
return this.executeInternal(currentOptions, url, requestInit as FetchRequestInit, requestOptions);
}

private async executeInternal(options: CompressionHandlerOptions, url: string, requestInit: FetchRequestInit, requestOptions?: Record<string, RequestOption>, span?: Span): Promise<Response> {
if (!options.ShouldCompress || this.contentRangeBytesIsPresent(requestInit.headers) || this.contentEncodingIsPresent(requestInit.headers) || requestInit.body === null || requestInit.body === undefined) {
return this.next?.execute(url, requestInit as RequestInit, requestOptions) ?? Promise.reject(new Error("Response is undefined"));
}

span?.setAttribute("http.request.body.compressed", true);

const unCompressedBody = requestInit.body;
const unCompressedBodySize = this.getRequestBodySize(unCompressedBody);

// compress the request body
const compressedBody = await this.compressRequestBody(unCompressedBody);

// add Content-Encoding to request header
setRequestHeader(requestInit, CompressionHandler.CONTENT_ENCODING_HEADER, "gzip");
requestInit.body = compressedBody.compressedBody;

span?.setAttribute("http.request.body.size", compressedBody.size);

// execute the next middleware and check if the response code is 415
let response = await this.next?.execute(url, requestInit as RequestInit, requestOptions);
if (!response) {
throw new Error("Response is undefined");
}
if (response.status === 415) {
// remove the Content-Encoding header
deleteRequestHeader(requestInit, CompressionHandler.CONTENT_ENCODING_HEADER);
requestInit.body = unCompressedBody;
span?.setAttribute("http.request.body.compressed", false);
span?.setAttribute("http.request.body.size", unCompressedBodySize);

response = await this.next?.execute(url, requestInit as RequestInit, requestOptions);
}
return response !== undefined && response !== null ? Promise.resolve(response) : Promise.reject(new Error("Response is undefined"));
}

private contentRangeBytesIsPresent(header: FetchHeadersInit | undefined): boolean {
if (!header) {
return false;
}
const contentRange = getRequestHeader(header, CompressionHandler.CONTENT_RANGE_HEADER);
return contentRange?.toLowerCase().includes("bytes") ?? false;
}

private contentEncodingIsPresent(header: FetchHeadersInit | undefined): boolean {
if (!header) {
return false;
}
return getRequestHeader(header, CompressionHandler.CONTENT_ENCODING_HEADER) !== undefined;
}

private getRequestBodySize(body: unknown): number {
if (!body) {
return 0;
}
if (typeof body === "string") {
return body.length;
}
if (body instanceof Blob) {
return body.size;
}
if (body instanceof ArrayBuffer) {
return body.byteLength;
}
if (ArrayBuffer.isView(body)) {
return body.byteLength;
}
if (inNodeEnv() && Buffer.isBuffer(body)) {
return body.byteLength;
}
throw new Error("Unsupported body type");
}

private readBodyAsBytes(body: unknown): { stream: ReadableStream<Uint8Array>; size: number } {
if (!body) {
return { stream: new ReadableStream<Uint8Array>(), size: 0 };
}

const uint8ArrayToStream = (uint8Array: Uint8Array): ReadableStream<Uint8Array> => {
return new ReadableStream({
start(controller) {
controller.enqueue(uint8Array);
controller.close();
},
});
};

if (typeof body === "string") {
return { stream: uint8ArrayToStream(new TextEncoder().encode(body)), size: body.length };
}
if (body instanceof Blob) {
return { stream: body.stream(), size: body.size };
}
if (body instanceof ArrayBuffer) {
return { stream: uint8ArrayToStream(new Uint8Array(body)), size: body.byteLength };
}
if (ArrayBuffer.isView(body)) {
return { stream: uint8ArrayToStream(new Uint8Array(body.buffer, body.byteOffset, body.byteLength)), size: body.byteLength };
}
throw new Error("Unsupported body type");
}

private async compressRequestBody(body: unknown): Promise<{
compressedBody: ArrayBuffer | Buffer;
size: number;
}> {
if (!inNodeEnv()) {
// in browser
const compressionData = this.readBodyAsBytes(body);
const compressedBody = await this.compressUsingCompressionStream(compressionData.stream);
return {
compressedBody: compressedBody.body,
size: compressedBody.size,
};
} else {
// In Node.js
const compressedBody = await this.compressUsingZlib(body);
return {
compressedBody,
size: compressedBody.length,
};
}
}

private async compressUsingZlib(body: unknown): Promise<Buffer> {
// @ts-ignore
const zlib = await import("zlib");
return await new Promise((resolve, reject) => {
zlib.gzip(body as string | ArrayBuffer | NodeJS.ArrayBufferView, (err, compressed) => {
if (err) {
reject(err);
} else {
resolve(compressed);
}
});
});
}

private async compressUsingCompressionStream(uint8ArrayStream: ReadableStream<Uint8Array>): Promise<{ body: ArrayBuffer; size: number }> {
const compressionStream = new CompressionStream("gzip");

const compressedStream = uint8ArrayStream.pipeThrough<Uint8Array>(compressionStream);

const reader = compressedStream.getReader();
const compressedChunks: Uint8Array[] = [];
let totalLength = 0;

let result = await reader.read();
while (!result.done) {
const chunk = result.value;
compressedChunks.push(chunk);
totalLength += chunk.length;
result = await reader.read();
}

const compressedArray = new Uint8Array(totalLength);
let offset = 0;
for (const chunk of compressedChunks) {
compressedArray.set(chunk, offset);
offset += chunk.length;
}

return {
body: compressedArray.buffer,
size: compressedArray.length,
};
}
}
8 changes: 2 additions & 6 deletions packages/http/fetch/src/middlewares/customFetchHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,10 @@ export class CustomFetchHandler implements Middleware {
*/
next: Middleware | undefined;

constructor(private customFetch: (input: string, init: RequestInit) => Promise<Response>) {}
constructor(private readonly customFetch: (input: string, init: RequestInit) => Promise<Response>) {}

/**
* @public
* @async
* To execute the current middleware
* @param {Context} context - The request context object
* @returns A promise that resolves to nothing
* @inheritdoc
*/
public async execute(url: string, requestInit: RequestInit): Promise<Response> {
return await this.customFetch(url, requestInit);
Expand Down
2 changes: 2 additions & 0 deletions packages/http/fetch/src/middlewares/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export interface Middleware {
next: Middleware | undefined;

/**
* @public
* @async
* Main method of the middleware.
* @param requestInit The Fetch RequestInit object.
* @param url The URL of the request.
Expand Down
3 changes: 2 additions & 1 deletion packages/http/fetch/src/middlewares/middlewareFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { ParametersNameDecodingHandler } from "./parametersNameDecodingHandler";
import { RedirectHandler } from "./redirectHandler";
import { RetryHandler } from "./retryHandler";
import { UserAgentHandler } from "./userAgentHandler";
import { CompressionHandler } from "./compressionHandler";

/**
* @class
Expand All @@ -25,6 +26,6 @@ export class MiddlewareFactory {
* @returns an array of the middleware handlers of the default middleware chain
*/
public static getDefaultMiddlewares(customFetch: (request: string, init: RequestInit) => Promise<Response> = (...args) => fetch(...args) as any): Middleware[] {
return [new RetryHandler(), new RedirectHandler(), new ParametersNameDecodingHandler(), new UserAgentHandler(), new HeadersInspectionHandler(), new CustomFetchHandler(customFetch)];
return [new RetryHandler(), new RedirectHandler(), new ParametersNameDecodingHandler(), new UserAgentHandler(), new CompressionHandler(), new HeadersInspectionHandler(), new CustomFetchHandler(customFetch)];
}
}
Loading

0 comments on commit 40440fb

Please sign in to comment.