Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Differ } from '@api7/adc-differ';
import * as ADCSDK from '@api7/adc-sdk';

import { BackendAPISIX } from '../../src';
Expand All @@ -22,6 +23,140 @@ describe('Service-Upstreams E2E', () => {
});
});

describe('Service inline upstream', () => {
const serviceName = 'test-inline-upstream';
const service = {
name: serviceName,
upstream: {
type: 'roundrobin',
nodes: [
{
host: 'httpbin.org',
port: 443,
weight: 100,
},
],
},
} satisfies ADCSDK.Service;

it('Create service with inline upstream', async () =>
syncEvents(backend, Differ.diff({ services: [service] }, {})));

it('Dump (inline upstream should exist)', async () => {
const result = await dumpConfiguration(backend);
const testService = result.services?.find((s) => s.name === serviceName);
expect(testService).toBeDefined();
expect(testService?.upstream).toMatchObject({
type: 'roundrobin',
nodes: [
{
host: 'httpbin.org',
port: 443,
weight: 100,
},
],
});
// Verify that inline upstream has no id and name
expect(testService?.upstream?.id).toBeUndefined();
expect(testService?.upstream?.name).toBeUndefined();
});

const updatedService = {
name: serviceName,
upstream: {
type: 'roundrobin',
nodes: [
{
host: 'httpbin.org',
port: 443,
weight: 50,
},
{
host: 'example.com',
port: 80,
weight: 50,
},
],
},
} satisfies ADCSDK.Service;
it('Update service inline upstream', async () =>
syncEvents(
backend,
Differ.diff({ services: [updatedService] }, await dumpConfiguration(backend)),
));

it('Dump (inline upstream should be updated)', async () => {
const result = await dumpConfiguration(backend);
const testService = result.services?.find((s) => s.name === serviceName);
expect(testService).toBeDefined();
expect(testService?.upstream?.nodes).toHaveLength(2);
expect(testService?.upstream).toMatchObject(updatedService.upstream);
// Verify that inline upstream still has no id and name
expect(testService?.upstream?.id).toBeUndefined();
expect(testService?.upstream?.name).toBeUndefined();
});

const serviceWithoutUpstream = {
name: serviceName,
hosts: ['test.example.com'],
} satisfies ADCSDK.Service;
it('Update service to remove inline upstream', async () =>
syncEvents(
backend,
Differ.diff(
{ services: [serviceWithoutUpstream] },
await dumpConfiguration(backend),
),
));

it('Dump (inline upstream should be removed)', async () => {
const result = await dumpConfiguration(backend);
const testService = result.services?.find((s) => s.name === serviceName);
expect(testService).toBeDefined();
expect(testService?.upstream).toBeUndefined();
expect(testService?.hosts).toEqual(['test.example.com']);
});

const serviceForDeletion = {
name: serviceName,
hosts: ['test.example.com'],
upstream: {
type: 'roundrobin',
nodes: [
{
host: 'httpbin.org',
port: 443,
weight: 100,
},
],
},
} satisfies ADCSDK.Service;
it('Re-add inline upstream for deletion test', async () =>
syncEvents(
backend,
Differ.diff(
{ services: [serviceForDeletion] },
await dumpConfiguration(backend),
),
));

it('Dump (inline upstream should exist again)', async () => {
const result = await dumpConfiguration(backend);
const testService = result.services?.find((s) => s.name === serviceName);
expect(testService).toBeDefined();
expect(testService?.upstream).toBeDefined();
});

it('Delete service with inline upstream', async () =>
syncEvents(backend, Differ.diff({}, await dumpConfiguration(backend))));

it('Dump again (service should not exist)', async () => {
const result = await dumpConfiguration(backend);
const testService = result.services?.find((s) => s.name === serviceName);
expect(testService).toBeUndefined();
});
});

describe('Service multiple upstreams', () => {
const serviceName = 'test';
const service = {
Expand Down
16 changes: 8 additions & 8 deletions libs/backend-apisix/e2e/support/utils.ts
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please ensure that code is always formatted by Prettier and avoid making changes that serve no practical purpose.

Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ export const createEvent = (
resourceName,
resourceId:
resourceType === ADCSDK.ResourceType.CONSUMER ||
resourceType === ADCSDK.ResourceType.GLOBAL_RULE ||
resourceType === ADCSDK.ResourceType.PLUGIN_METADATA
resourceType === ADCSDK.ResourceType.GLOBAL_RULE ||
resourceType === ADCSDK.ResourceType.PLUGIN_METADATA
? resourceName
: resourceType === ADCSDK.ResourceType.SSL
? ADCSDK.utils.generateId((resource as ADCSDK.SSL).snis.join(','))
: ADCSDK.utils.generateId(
parentName ? `${parentName}.${resourceName}` : resourceName,
),
parentName ? `${parentName}.${resourceName}` : resourceName,
),
newValue: resource,
parentId: parentName
? resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL
Expand Down Expand Up @@ -71,12 +71,12 @@ export const deleteEvent = (
resourceName,
resourceId:
resourceType === ADCSDK.ResourceType.CONSUMER ||
resourceType === ADCSDK.ResourceType.GLOBAL_RULE ||
resourceType === ADCSDK.ResourceType.PLUGIN_METADATA
resourceType === ADCSDK.ResourceType.GLOBAL_RULE ||
resourceType === ADCSDK.ResourceType.PLUGIN_METADATA
? resourceName
: ADCSDK.utils.generateId(
parentName ? `${parentName}.${resourceName}` : resourceName,
),
parentName ? `${parentName}.${resourceName}` : resourceName,
),
parentId: parentName
? resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL
? parentName
Expand Down
5 changes: 3 additions & 2 deletions libs/backend-apisix/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
}
},
"devDependencies": {
"@api7/adc-sdk": "workspace:*"
"@api7/adc-sdk": "workspace:*",
"@api7/adc-differ": "workspace:*"
},
"nx": {
"name": "backend-apisix",
Expand All @@ -29,4 +30,4 @@
}
}
}
}
}
3 changes: 3 additions & 0 deletions libs/backend-apisix/src/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { SemVer, gte as semVerGTE } from 'semver';
import { ToADC } from './transformer';
import * as typing from './typing';
import { resourceTypeToAPIName } from './utils';
import { unset } from 'lodash';

export interface FetcherOptions {
client: AxiosInstance;
Expand Down Expand Up @@ -271,6 +272,8 @@ export class Fetcher extends ADCSDK.backend.BackendEventSource {
produce(service, (serviceDraft) => {
if (service.upstream_id)
serviceDraft.upstream = upstreamIdMap[service.upstream_id];
unset(serviceDraft, 'upstream.id');
unset(serviceDraft, 'upstream.name');
if (upstreamServiceIdMap[service.id])
serviceDraft.upstreams = upstreamServiceIdMap[service.id];
}),
Expand Down
123 changes: 112 additions & 11 deletions libs/backend-apisix/src/operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import {
Subject,
catchError,
concatMap,
delay,
from,
map,
mergeMap,
of,
reduce,
retry,
tap,
throwError,
} from 'rxjs';
Expand All @@ -36,22 +38,121 @@ export class Operator extends ADCSDK.backend.BackendEventSource {

private operate(event: ADCSDK.Event) {
Copy link
Collaborator

@bzp2010 bzp2010 Oct 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that service is a special case, creating an operateService function for it is a better choice. Let other resources continue using the simple operate function, while services with specific upstream expansion logic use a separate function.

See the next comment actually only one function is enough.

private operate(event: ADCSDK.Event) {
   // keep original codes
}

// handle only event for service
private operateService(event: ADCSDK.Event) {
  const operateWithRetry = (op: () => Promise<AxiosResponse>) =>
    defer(op).pipe(retry({ count: 3, delay: 100 /* or fn */ }));
  const paths = ['/apisix/admin/upstreams/xxx', '/apisix/admin/services/xxx'];
  const isUpdate = event.type !== ADCSDK.EventType.DELETE;

  return from(isUpdate ? paths : paths.reverse()).pipe(
    map(
      (url) => () =>
        this.client.request({
          method: 'DELETE',
          url,
          ...(isUpdate && {
            method: 'PUT',
            data: this.fromADC(event, this.opts.version),
          }),
        }),
    ),
    concatMap(operateWithRetry),
  );
}

Copy link
Collaborator

@bzp2010 bzp2010 Oct 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subsequently, it can be further simplified to a single operate function.

private operate(event: ADCSDK.Event) {
  const { type, resourceType, resourceId, parentId } = event;
  const isUpdate = type !== ADCSDK.EventType.DELETE;
  const PATH_PREFIX = '/apisix/admin';
  const paths = [
    `${PATH_PREFIX}/${
      resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL
        ? `consumers/${parentId}/credentials/${resourceId}`
        : `${resourceTypeToAPIName(resourceType)}/${resourceId}`
    }`,
  ];

  if (event.resourceType === ADCSDK.ResourceType.SERVICE) {
    const path = `${PATH_PREFIX}/upstreams/${event.resourceId}`;
    if (event.type === ADCSDK.EventType.DELETE)
      paths.push(path); // services will be deleted before upstreams
    else paths.unshift(path); // services will be created/updated after upstreams
  }

  const operateWithRetry = (op: () => Promise<AxiosResponse>) =>
    defer(op).pipe(retry({ count: 3, delay: 100 /* or fn => timeout * 2 ** count */ }));
  return from(paths).pipe(
    map(
      (path) => () =>
        this.client.request({
          method: 'DELETE',
          url: path,
          ...(isUpdate && {
            method: 'PUT',
            data: this.fromADC(event, this.opts.version),
          }),
        }),
    ),
    concatMap(operateWithRetry),
  );
}

const { type, resourceType, resourceId, parentId } = event;
const isUpdate = type !== ADCSDK.EventType.DELETE;
const path = `/apisix/admin/${
resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL
? `consumers/${parentId}/credentials/${resourceId}`
: `${resourceTypeToAPIName(resourceType)}/${resourceId}`
}`;
const path = `/apisix/admin/${resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL
? `consumers/${parentId}/credentials/${resourceId}`
: `${resourceTypeToAPIName(resourceType)}/${resourceId}`
}`;

// Handle deletion
if (type === ADCSDK.EventType.DELETE) {
// Delete service with upstream: delete service first, then upstream
if (resourceType === ADCSDK.ResourceType.SERVICE && event.oldValue && (event.oldValue as ADCSDK.Service).upstream) {
return this.deleteServiceWithUpstream(event, path);
}
return from(this.client.request({ url: path, method: 'DELETE' }));
}

const data = this.fromADC(event, this.opts.version);

// Handle service with upstream changes
if (resourceType === ADCSDK.ResourceType.SERVICE) {
const oldUpstream = event.oldValue ? (event.oldValue as ADCSDK.Service).upstream : undefined;
const newUpstream = (event.newValue as ADCSDK.Service).upstream;

// oldValue has upstream, newValue doesn't -> delete upstream
if (oldUpstream && !newUpstream) {
return this.deleteUpstreamThenUpdateService(event, data as typing.Service, path);
}

// newValue has upstream -> create/update upstream
if (newUpstream) {
return this.upsertServiceWithUpstream(event, data as typing.Service, path);
}
}

return from(this.client.request({ url: path, method: 'PUT', data }));
}

private upsertServiceWithUpstream(event: ADCSDK.Event, data: typing.Service, servicePath: string) {
const upstreamData: typing.Upstream = {
...data.upstream,
id: event.resourceId,
name: event.resourceName,
};

const serviceData = {
...data,
upstream: undefined,
upstream_id: event.resourceId,
};

// Create/Update upstream first, then service
return from(
this.client.request({
method: 'DELETE',
url: path,
...(isUpdate && {
url: `/apisix/admin/upstreams/${event.resourceId}`,
method: 'PUT',
data: upstreamData,
}),
).pipe(
concatMap(() =>
this.client.request({
url: servicePath,
method: 'PUT',
data: this.fromADC(event, this.opts.version),
data: serviceData,
}),
),
);
}

private deleteUpstreamWithRetry(upstreamId: string) {
// Delete upstream with retry on race condition
return from(
this.client.request({
url: `/apisix/admin/upstreams/${upstreamId}`,
method: 'DELETE',
}),
).pipe(
retry({
count: 3,
delay: (error: Error | AxiosError, retryCount: number) => {
// Only retry if upstream deletion fails due to "still using" race condition
if (
axios.isAxiosError(error) &&
error.response?.data?.error_msg?.includes('is still using it')
) {
// Exponential backoff: 100ms, 200ms, 400ms
const delayMs = 100 * Math.pow(2, retryCount - 1);
return of(null).pipe(delay(delayMs));
}
// Don't retry other errors
return throwError(() => error);
},
}),
);
}

private deleteServiceWithUpstream(event: ADCSDK.Event, servicePath: string) {
// Delete service first, then upstream with retry
return from(
this.client.request({
url: servicePath,
method: 'DELETE',
}),
).pipe(
concatMap(() => this.deleteUpstreamWithRetry(event.resourceId)),
);
}

private deleteUpstreamThenUpdateService(event: ADCSDK.Event, data: typing.Service, servicePath: string) {
// Update service first (remove upstream reference), then delete upstream with retry
return from(
this.client.request({
url: servicePath,
method: 'PUT',
data,
}),
).pipe(
concatMap(() => this.deleteUpstreamWithRetry(event.resourceId)),
);
}

Expand Down Expand Up @@ -112,7 +213,7 @@ export class Operator extends ADCSDK.backend.BackendEventSource {
() =>
new Error(
error.response?.data?.error_msg ??
JSON.stringify(error.response?.data),
JSON.stringify(error.response?.data),
),
);
return throwError(() => error);
Expand Down
4 changes: 2 additions & 2 deletions libs/backend-apisix/src/transformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class ToADC {

hosts: service.hosts,

upstream: this.transformUpstream(service.upstream),
upstream: service.upstream ? this.transformUpstream(service.upstream) : undefined,
upstreams: service.upstreams,
plugins: service.plugins,
} as ADCSDK.Service);
Expand Down Expand Up @@ -282,7 +282,7 @@ export class FromADC {
name: service.name,
desc: service.description,
labels: FromADC.transformLabels(service.labels),
upstream: this.transformUpstream(service.upstream),
upstream: service.upstream ? this.transformUpstream(service.upstream) : undefined,
plugins: service.plugins,
hosts: service.hosts,
});
Expand Down
Loading
Loading