Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add features to task queue functions #1423

Merged
merged 4 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Add features to task queue functions. (#1423)
49 changes: 47 additions & 2 deletions spec/common/providers/tasks.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ describe("onEnqueueHandler", () => {
function mockEnqueueRequest(
data: unknown,
contentType = "application/json",
context: { authorization?: string } = { authorization: "Bearer abc" }
context: { authorization?: string } = { authorization: "Bearer abc" },
headers: Record<string, string> = {}
): ReturnType<typeof mockRequest> {
return mockRequest(data, contentType, context);
return mockRequest(data, contentType, context, headers);
}

before(() => {
Expand Down Expand Up @@ -194,6 +195,50 @@ describe("onEnqueueHandler", () => {
});
});

it("should populate context with values from header", () => {
const headers = {
"x-cloudtasks-queuename": "x",
"x-cloudtasks-taskname": "x",
"x-cloudtasks-taskretrycount": "1",
"x-cloudtasks-taskexecutioncount": "1",
"x-cloudtasks-tasketa": "timestamp",
"x-cloudtasks-taskpreviousresponse": "400",
"x-cloudtasks-taskretryreason": "something broke",
};
const expectedContext = {
queueName: "x",
id: "x",
retryCount: 1,
executionCount: 1,
scheduledTime: "timestamp",
previousResponse: 400,
retryReason: "something broke",
};

const projectId = getApp().options.projectId;
const idToken = generateIdToken(projectId);
return runTaskTest({
httpRequest: mockEnqueueRequest(
{},
"application/json",
{ authorization: "Bearer " + idToken },
headers
),
expectedData: {},
taskFunction: (data, context) => {
checkAuthContext(context, projectId, mocks.user_id);
expect(context).to.include(expectedContext);
return null;
},
taskFunction2: (request) => {
checkAuthContext(request, projectId, mocks.user_id);
expect(request).to.include(expectedContext);
return null;
},
expectedStatus: 204,
});
});

it("should handle auth", async () => {
const projectId = getApp().options.projectId;
const idToken = generateIdToken(projectId);
Expand Down
5 changes: 5 additions & 0 deletions spec/v1/providers/tasks.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ describe("#onDispatch", () => {
uid: "abc",
token: "token" as any,
},
queueName: "fn",
id: "task0",
retryCount: 0,
executionCount: 0,
scheduledTime: "timestamp",
};
let done = false;
const cf = taskQueue().onDispatch((d, c) => {
Expand Down
91 changes: 82 additions & 9 deletions src/common/providers/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,72 @@ export interface TaskContext {
* The result of decoding and verifying an ODIC token.
*/
auth?: AuthData;

/**
* The name of the queue.
* Populated via the `X-CloudTasks-QueueName` header.
*/
queueName: string;

/**
* The "short" name of the task, or, if no name was specified at creation, a unique
* system-generated id.
* This is the my-task-id value in the complete task name, ie, task_name =
* projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id.
* Populated via the `X-CloudTasks-TaskName` header.
*/
id: string;

/**
* The number of times this task has been retried.
* For the first attempt, this value is 0. This number includes attempts where the task failed
* due to 5XX error codes and never reached the execution phase.
* Populated via the `X-CloudTasks-TaskRetryCount` header.
*/
retryCount: number;

/**
* The total number of times that the task has received a response from the handler.
* Since Cloud Tasks deletes the task once a successful response has been received, all
* previous handler responses were failures. This number does not include failures due to 5XX
* error codes.
* Populated via the `X-CloudTasks-TaskExecutionCount` header.
*/
executionCount: number;

/**
* The schedule time of the task, as an RFC 3339 string in UTC time zone.
* Populated via the `X-CloudTasks-TaskETA` header, which uses seconds since January 1 1970.
*/
scheduledTime: string;

/**
* The HTTP response code from the previous retry.
* Populated via the `X-CloudTasks-TaskPreviousResponse` header
*/
previousResponse?: number;

/**
* The reason for retrying the task.
* Populated via the `X-CloudTasks-TaskRetryReason` header.
*/
retryReason?: string;

/**
* Raw request headers.
*/
headers?: Record<string, string>;
}

/**
* The request used to call a Task Queue function.
* The request used to call a task queue function.
*/
export interface Request<T = any> {
export type Request<T = any> = TaskContext & {
/**
* The parameters used by a client when calling this function.
*/
data: T;

/**
* The result of decoding and verifying an ODIC token.
*/
auth?: AuthData;
}
};

type v1TaskHandler = (data: any, context: TaskContext) => void | Promise<void>;
type v2TaskHandler<Req> = (request: Request<Req>) => void | Promise<void>;
Expand All @@ -119,7 +169,30 @@ export function onDispatchHandler<Req = any>(
throw new https.HttpsError("invalid-argument", "Bad Request");
}

const context: TaskContext = {};
const headers: Record<string, string> = {};
for (const [key, value] of Object.entries(req.headers)) {
if (!Array.isArray(value)) {
headers[key] = value;
}
}

const context: TaskContext = {
queueName: req.header("X-CloudTasks-QueueName"),
id: req.header("X-CloudTasks-TaskName"),
retryCount: req.header("X-CloudTasks-TaskRetryCount")
? Number(req.header("X-CloudTasks-TaskRetryCount"))
: undefined,
executionCount: req.header("X-CloudTasks-TaskExecutionCount")
? Number(req.header("X-CloudTasks-TaskExecutionCount"))
: undefined,
scheduledTime: req.header("X-CloudTasks-TaskETA"),
previousResponse: req.header("X-CloudTasks-TaskPreviousResponse")
? Number(req.header("X-CloudTasks-TaskPreviousResponse"))
: undefined,
retryReason: req.header("X-CloudTasks-TaskRetryReason"),
headers,
};

if (!process.env.FUNCTIONS_EMULATOR) {
const authHeader = req.header("Authorization") || "";
const token = authHeader.match(/^Bearer (.*)$/)?.[1];
Expand Down