-
Notifications
You must be signed in to change notification settings - Fork 10
feat(apisix): separate inline upstream #354
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
9c11178
167b545
6897819
c1adfd8
f62da20
959de96
574e1a8
c9ec2fe
b7d3d36
7d31ada
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,11 +6,13 @@ import { | |
Subject, | ||
catchError, | ||
concatMap, | ||
delay, | ||
from, | ||
map, | ||
mergeMap, | ||
of, | ||
reduce, | ||
retry, | ||
tap, | ||
throwError, | ||
} from 'rxjs'; | ||
|
@@ -36,22 +38,121 @@ export class Operator extends ADCSDK.backend.BackendEventSource { | |
|
||
private operate(event: ADCSDK.Event) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
See the next comment actually only one function is enough. private operate(event: ADCSDK.Event) {
// keep original codes
}
// handle only event for service
private operateService(event: ADCSDK.Event) {
const operateWithRetry = (op: () => Promise<AxiosResponse>) =>
defer(op).pipe(retry({ count: 3, delay: 100 /* or fn */ }));
const paths = ['/apisix/admin/upstreams/xxx', '/apisix/admin/services/xxx'];
const isUpdate = event.type !== ADCSDK.EventType.DELETE;
return from(isUpdate ? paths : paths.reverse()).pipe(
map(
(url) => () =>
this.client.request({
method: 'DELETE',
url,
...(isUpdate && {
method: 'PUT',
data: this.fromADC(event, this.opts.version),
}),
}),
),
concatMap(operateWithRetry),
);
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Subsequently, it can be further simplified to a single private operate(event: ADCSDK.Event) {
const { type, resourceType, resourceId, parentId } = event;
const isUpdate = type !== ADCSDK.EventType.DELETE;
const PATH_PREFIX = '/apisix/admin';
const paths = [
`${PATH_PREFIX}/${
resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL
? `consumers/${parentId}/credentials/${resourceId}`
: `${resourceTypeToAPIName(resourceType)}/${resourceId}`
}`,
];
if (event.resourceType === ADCSDK.ResourceType.SERVICE) {
const path = `${PATH_PREFIX}/upstreams/${event.resourceId}`;
if (event.type === ADCSDK.EventType.DELETE)
paths.push(path); // services will be deleted before upstreams
else paths.unshift(path); // services will be created/updated after upstreams
}
const operateWithRetry = (op: () => Promise<AxiosResponse>) =>
defer(op).pipe(retry({ count: 3, delay: 100 /* or fn => timeout * 2 ** count */ }));
return from(paths).pipe(
map(
(path) => () =>
this.client.request({
method: 'DELETE',
url: path,
...(isUpdate && {
method: 'PUT',
data: this.fromADC(event, this.opts.version),
}),
}),
),
concatMap(operateWithRetry),
);
} |
||
const { type, resourceType, resourceId, parentId } = event; | ||
const isUpdate = type !== ADCSDK.EventType.DELETE; | ||
const path = `/apisix/admin/${ | ||
resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL | ||
? `consumers/${parentId}/credentials/${resourceId}` | ||
: `${resourceTypeToAPIName(resourceType)}/${resourceId}` | ||
}`; | ||
const path = `/apisix/admin/${resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL | ||
? `consumers/${parentId}/credentials/${resourceId}` | ||
: `${resourceTypeToAPIName(resourceType)}/${resourceId}` | ||
}`; | ||
|
||
// Handle deletion | ||
if (type === ADCSDK.EventType.DELETE) { | ||
// Delete service with upstream: delete service first, then upstream | ||
if (resourceType === ADCSDK.ResourceType.SERVICE && event.oldValue && (event.oldValue as ADCSDK.Service).upstream) { | ||
return this.deleteServiceWithUpstream(event, path); | ||
} | ||
return from(this.client.request({ url: path, method: 'DELETE' })); | ||
} | ||
|
||
const data = this.fromADC(event, this.opts.version); | ||
|
||
// Handle service with upstream changes | ||
if (resourceType === ADCSDK.ResourceType.SERVICE) { | ||
const oldUpstream = event.oldValue ? (event.oldValue as ADCSDK.Service).upstream : undefined; | ||
const newUpstream = (event.newValue as ADCSDK.Service).upstream; | ||
|
||
// oldValue has upstream, newValue doesn't -> delete upstream | ||
if (oldUpstream && !newUpstream) { | ||
return this.deleteUpstreamThenUpdateService(event, data as typing.Service, path); | ||
} | ||
|
||
// newValue has upstream -> create/update upstream | ||
if (newUpstream) { | ||
return this.upsertServiceWithUpstream(event, data as typing.Service, path); | ||
} | ||
} | ||
|
||
return from(this.client.request({ url: path, method: 'PUT', data })); | ||
} | ||
|
||
private upsertServiceWithUpstream(event: ADCSDK.Event, data: typing.Service, servicePath: string) { | ||
const upstreamData: typing.Upstream = { | ||
...data.upstream, | ||
id: event.resourceId, | ||
name: event.resourceName, | ||
}; | ||
|
||
const serviceData = { | ||
...data, | ||
upstream: undefined, | ||
upstream_id: event.resourceId, | ||
}; | ||
|
||
// Create/Update upstream first, then service | ||
return from( | ||
this.client.request({ | ||
method: 'DELETE', | ||
url: path, | ||
...(isUpdate && { | ||
url: `/apisix/admin/upstreams/${event.resourceId}`, | ||
method: 'PUT', | ||
data: upstreamData, | ||
}), | ||
).pipe( | ||
concatMap(() => | ||
this.client.request({ | ||
url: servicePath, | ||
method: 'PUT', | ||
data: this.fromADC(event, this.opts.version), | ||
data: serviceData, | ||
}), | ||
), | ||
); | ||
} | ||
|
||
private deleteUpstreamWithRetry(upstreamId: string) { | ||
// Delete upstream with retry on race condition | ||
return from( | ||
this.client.request({ | ||
url: `/apisix/admin/upstreams/${upstreamId}`, | ||
method: 'DELETE', | ||
}), | ||
).pipe( | ||
retry({ | ||
count: 3, | ||
delay: (error: Error | AxiosError, retryCount: number) => { | ||
// Only retry if upstream deletion fails due to "still using" race condition | ||
if ( | ||
axios.isAxiosError(error) && | ||
error.response?.data?.error_msg?.includes('is still using it') | ||
) { | ||
// Exponential backoff: 100ms, 200ms, 400ms | ||
const delayMs = 100 * Math.pow(2, retryCount - 1); | ||
return of(null).pipe(delay(delayMs)); | ||
} | ||
// Don't retry other errors | ||
return throwError(() => error); | ||
}, | ||
}), | ||
); | ||
} | ||
|
||
private deleteServiceWithUpstream(event: ADCSDK.Event, servicePath: string) { | ||
// Delete service first, then upstream with retry | ||
return from( | ||
this.client.request({ | ||
url: servicePath, | ||
method: 'DELETE', | ||
}), | ||
).pipe( | ||
concatMap(() => this.deleteUpstreamWithRetry(event.resourceId)), | ||
); | ||
} | ||
|
||
private deleteUpstreamThenUpdateService(event: ADCSDK.Event, data: typing.Service, servicePath: string) { | ||
// Update service first (remove upstream reference), then delete upstream with retry | ||
return from( | ||
this.client.request({ | ||
url: servicePath, | ||
method: 'PUT', | ||
data, | ||
}), | ||
).pipe( | ||
concatMap(() => this.deleteUpstreamWithRetry(event.resourceId)), | ||
); | ||
} | ||
|
||
|
@@ -112,7 +213,7 @@ export class Operator extends ADCSDK.backend.BackendEventSource { | |
() => | ||
new Error( | ||
error.response?.data?.error_msg ?? | ||
JSON.stringify(error.response?.data), | ||
JSON.stringify(error.response?.data), | ||
), | ||
); | ||
return throwError(() => error); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please ensure that code is always formatted by Prettier and avoid making changes that serve no practical purpose.