Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test mass joining / syncing in E2E #1764

Draft
wants to merge 5 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/e2e-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ jobs:
- 6379:6379
ircd:
image: ghcr.io/ergochat/ergo:stable
env:
ERGO__SERVER__IP_LIMITS_COUNT: "false"
ERGO__SERVER__IP_LIMITS_THROTTLE: "false"
ERGO__SERVER__IP_LIMITS_MAX_CONNECTIONS_PER_WINDOW: "32000"
ports:
- 6667:6667
steps:
Expand Down
112 changes: 112 additions & 0 deletions spec/e2e/scaling.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import { ChanData, TestIrcServer } from "matrix-org-irc";
import { IrcBridgeE2ETest } from "../util/e2e-test";
import { describe, it, expect } from "@jest/globals";
import { delay } from "../../src/promiseutil";

function createUserSet(count: number) {
const localparts: string[] = [];
for (let index = 0; index < count; index++) {
localparts.push(TestIrcServer.generateUniqueNick(`alice-c${index}`));
}
return localparts;
}

describe('Bridge scaling test', () => {
let testEnv: IrcBridgeE2ETest;
beforeEach(async () => {
testEnv = await IrcBridgeE2ETest.createTestEnv({
matrixLocalparts: [TestIrcServer.generateUniqueNick("alice")],
matrixSynclessLocalparts: createUserSet(80),
ircNicks: ['bob'],
traceToFile: true,
});
await testEnv.setUp();
});
afterEach(() => {
return testEnv?.tearDown();
});
it('should be able to connect many users to a single channel', async () => {
const channel = `#${TestIrcServer.generateUniqueNick("test")}`;
const { homeserver } = testEnv;
const alice = homeserver.users[0].client;
const { bob } = testEnv.ircTest.clients;

// Create the channel
await bob.join(channel);

const adminRoomId = await testEnv.createAdminRoomHelper(alice);
const cRoomId = await testEnv.joinChannelHelper(alice, adminRoomId, channel);

// And finally wait for bob to appear.
const bobUserId = `@irc_${bob.nick}:${homeserver.domain}`;
await alice.waitForRoomEvent(
{eventType: 'm.room.member', sender: bobUserId, stateKey: bobUserId, roomId: cRoomId}
);

// Have all the Matrix users join
const usersToJoin = homeserver.users.filter(u => testEnv.opts.matrixSynclessLocalparts?.includes(u.localpart))
for (const mxUser of usersToJoin) {
await mxUser.client.joinRoom(cRoomId);
}

// We now need to wait for all the expected joins on the IRC side.
const chanData = bob.chanData(channel, false);
if (!chanData) {
throw Error('Expected to have channel data for channel');
}

do {
await delay(500);
} while (chanData?.users.size < homeserver.users.length)

// Now check that all the users joined.
for (const mxUser of usersToJoin) {
expect(chanData.users.keys()).toContain(`M-${mxUser.localpart}`)
}
}, 100_000);

it('should be able to sync many users on startup', async () => {
const channel = `#${TestIrcServer.generateUniqueNick("test")}`;
const { homeserver } = testEnv;
const alice = homeserver.users[0].client;
const { bob } = testEnv.ircTest.clients;

// Create the channel
await bob.join(channel);

const adminRoomId = await testEnv.createAdminRoomHelper(alice);
const cRoomId = await testEnv.joinChannelHelper(alice, adminRoomId, channel);

// And finally wait for bob to appear.
const bobUserId = `@irc_${bob.nick}:${homeserver.domain}`;
await alice.waitForRoomEvent(
{eventType: 'm.room.member', sender: bobUserId, stateKey: bobUserId, roomId: cRoomId}
);

// Have all the Matrix users join
const usersToJoin = homeserver.users.filter(u => testEnv.opts.matrixSynclessLocalparts?.includes(u.localpart))
for (const mxUser of usersToJoin) {
await mxUser.client.joinRoom(cRoomId);
}

// Now kill the bridge
await testEnv.recreateBridge();
await testEnv.setUp();


// We now need to wait for all the expected joins on the IRC side.
const chanData = bob.chanData(channel, false);
if (!chanData) {
throw Error('Expected to have channel data for channel');
}

do {
await delay(500);
} while (chanData?.users.size < homeserver.users.length)

// Now check that all the users joined.
for (const mxUser of usersToJoin) {
expect(chanData.users.keys()).toContain(`M-${mxUser.localpart}`)
}
}, 100_000);
});
12 changes: 9 additions & 3 deletions spec/util/e2e-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const IRCBRIDGE_TEST_REDIS_URL = process.env.IRCBRIDGE_TEST_REDIS_URL;

interface Opts {
matrixLocalparts?: string[];
matrixSynclessLocalparts?: string[];
ircNicks?: string[];
timeout?: number;
config?: Partial<BridgeConfig>,
Expand Down Expand Up @@ -181,11 +182,11 @@ export class IrcBridgeE2ETest {
}

const workerID = parseInt(process.env.JEST_WORKER_ID ?? '0');
const { matrixLocalparts, config } = opts;
const { matrixLocalparts, matrixSynclessLocalparts, config } = opts;
const ircTest = new TestIrcServer();
const [postgresDb, homeserver] = await Promise.all([
this.createDatabase(),
createHS(["ircbridge_bot", ...matrixLocalparts || []], workerID),
createHS(["ircbridge_bot", ...matrixLocalparts || []], workerID, matrixSynclessLocalparts),
ircTest.setUp(opts.ircNicks),
]);
const redisUri = IRCBRIDGE_TEST_REDIS_URL && `${IRCBRIDGE_TEST_REDIS_URL}/${workerID}`;
Expand Down Expand Up @@ -246,6 +247,11 @@ export class IrcBridgeE2ETest {
displayName: "$NICK",
joinAttempts: 3,
},
ircClients: {
...IrcServer.DEFAULT_CONFIG.ircClients,
// Set a sensibly high max.
maxClients: 100_000,
},
dynamicChannels: {
enabled: true,
createAlias: true,
Expand All @@ -256,7 +262,7 @@ export class IrcBridgeE2ETest {
},
membershipLists: {
enabled: true,
floodDelayMs: 100,
floodDelayMs: 0,
global: {
ircToMatrix: {
incremental: true,
Expand Down
14 changes: 10 additions & 4 deletions spec/util/homerunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export interface ComplementHomeServer {
hsToken: string;
senderLocalpart: string;
};
users: {userId: string, accessToken: string, deviceId: string, client: E2ETestMatrixClient}[]
users: {userId: string, localpart: string, accessToken: string, deviceId: string, client: E2ETestMatrixClient}[]
}

// Ensure we don't clash with other tests.
Expand Down Expand Up @@ -50,7 +50,9 @@ async function waitForHomerunner() {
}
}

export async function createHS(localparts: string[] = [], workerId: number): Promise<ComplementHomeServer> {
export async function createHS(
localparts: string[], workerId: number, localpartsNoSync: string[] = []
): Promise<ComplementHomeServer> {
const appPort = 49152 + workerId;
await waitForHomerunner();
// Ensure we never use the same port twice.
Expand Down Expand Up @@ -80,20 +82,24 @@ export async function createHS(localparts: string[] = [], workerId: number): Pro
...asRegistration,
URL: `http://host.docker.internal:${AppserviceConfig.port}`,
}],
Users: localparts.map(localpart => ({Localpart: localpart, DisplayName: localpart})),
Users: [
...localparts,
...localpartsNoSync
].map(localpart => ({Localpart: localpart, DisplayName: localpart})),
}],
}
});
const [homeserverName, homeserver] = Object.entries(blueprintResponse.homeservers)[0];
const users = Object.entries(homeserver.AccessTokens).map(([userId, accessToken]) => ({
userId: userId,
localpart: userId.slice(1).split(':', 2)[0],
accessToken,
deviceId: homeserver.DeviceIDs[userId],
client: new E2ETestMatrixClient(homeserver.BaseURL, accessToken),
}));

// Start syncing proactively.
await Promise.all(users.map(u => u.client.start()));
await Promise.all(users.filter(u => localparts.includes(u.localpart)).map(u => u.client.start()));
return {
users,
id: blueprint,
Expand Down
4 changes: 3 additions & 1 deletion src/util/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ export class Queue<T> {
item: thing,
defer: defer
});
if (!this.intervalMs) {

// If we are not currently processing anything, consume immediately.
if (!this.intervalMs || this.processing === null) {
// always process stuff asyncly, never syncly.
process.nextTick(() => {
this.consume();
Expand Down
Loading