Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send history in MUCs #227

Open
wants to merge 15 commits into
base: develop
Choose a base branch
from
1 change: 1 addition & 0 deletions changelog.d/227.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Send room history to XMPP clients when they join.
120 changes: 120 additions & 0 deletions src/xmppjs/HistoryManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import { Element } from "@xmpp/xml";
import { JID } from "@xmpp/jid";

export interface IHistoryLimits {
maxchars?: number,
maxstanzas?: number,
seconds?: number,
since?: Date,
}

/**
* Abstraction of a history storage backend.
*/
interface IHistoryStorage {
// add a message to the history of a given room
addMessage: (chatName: string, message: Element, jid: JID) => unknown;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why allow unknown to be returned and not just void?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it could be async and return Promise<void>

// get the history of a room. The storage should apply the limits that it
// wishes too, and remove the limits that it applied from the `limits`
// parameter. The returned list of Elements must include the Delayed
// Delivery information.
getHistory: (chatName: string, limits: IHistoryLimits) => Promise<Element[]>;
}

/**
* Store room history in memory.
*/
export class MemoryStorage implements IHistoryStorage {
private history: Map<string, Element[]>;
constructor(public maxHistory: number) {
this.history = new Map();
}

addMessage(chatName: string, message: Element, jid: JID): void {
if (!this.history.has(chatName)) {
this.history.set(chatName, []);
}
const currRoomHistory = this.history.get(chatName);

// shallow-copy the message, and add the timestamp
const copiedMessage = new Element(message.name, message.attrs);
copiedMessage.append(message.children as Element[]);
copiedMessage.attr("from", jid.toString());
copiedMessage.append(new Element("delay", {
xmlns: "urn:xmpp:delay",
from: chatName,
stamp: (new Date()).toISOString(),
}));

currRoomHistory.push(copiedMessage);

while (currRoomHistory.length > this.maxHistory) {
currRoomHistory.shift();
}
}

async getHistory(chatName: string, limits: IHistoryLimits): Promise<Element[]> {
return this.history.get(chatName) || [];
}
}

// TODO: make a class that stores in PostgreSQL so that we don't lose history
// when we restart
Comment on lines +61 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to file that as an issue or address this before merging this PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably just file as an issue.


/**
* Manage room history for a MUC
*/
export class HistoryManager {
constructor(
private storage: IHistoryStorage,
) {}

addMessage(chatName: string, message: Element, jid: JID): unknown {
return this.storage.addMessage(chatName, message, jid);
}

async getHistory(chatName: string, limits: IHistoryLimits): Promise<Element[]> {
if (limits.seconds) {
const since = new Date(Date.now() - limits.seconds * 1000);
if (limits.since === undefined || limits.since < since) {
uhoreg marked this conversation as resolved.
Show resolved Hide resolved
limits.since = since;
}
delete limits.seconds;
}
let history: Element[] = await this.storage.getHistory(chatName, limits);

// index of the first history element that we will keep after applying
// the limits
let idx = 0;

if ("maxstanzas" in limits && history.length > limits.maxstanzas) {
idx = history.length - limits.maxstanzas;
uhoreg marked this conversation as resolved.
Show resolved Hide resolved
}

if ("since" in limits) {
// FIXME: binary search would be better than linear search
for (; idx < history.length; idx++) {
try {
const ts = history[idx].getChild("delay", "urn:xmpp:delay")?.attr("stamp");
if (new Date(Date.parse(ts)) >= limits.since) {
uhoreg marked this conversation as resolved.
Show resolved Hide resolved
break;
}
} catch (e) {}
uhoreg marked this conversation as resolved.
Show resolved Hide resolved
}
}

if ("maxchars" in limits) {
let numChars = 0;
let i = history.length;
for (; i > idx && numChars < limits.maxchars; i--) {
jaller94 marked this conversation as resolved.
Show resolved Hide resolved
numChars += history[i - 1].toString().length;
}
}

if (idx > 0) {
history = history.slice(idx);
Half-Shot marked this conversation as resolved.
Show resolved Hide resolved
}

return history;
}
}
69 changes: 63 additions & 6 deletions src/xmppjs/XJSGateway.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import { XmppJsInstance, XMPP_PROTOCOL } from "./XJSInstance";
import { Element, x } from "@xmpp/xml";
import parse from "@xmpp/xml/lib/parse";
import { jid, JID } from "@xmpp/jid";
import { Logging } from "matrix-appservice-bridge";
import { IConfigBridge } from "../Config";
import { IBasicProtocolMessage } from "..//MessageFormatter";
import { IGatewayJoin, IUserStateChanged, IStoreRemoteUser, IUserInfo } from "../bifrost/Events";
import {
IGatewayJoin,
IUserStateChanged,
IStoreRemoteUser,
IUserInfo,
IReceivedImMsg
} from "../bifrost/Events";
import { IGatewayRoom } from "../bifrost/Gateway";
import { PresenceCache } from "./PresenceCache";
import { XHTMLIM } from "./XHTMLIM";
Expand All @@ -17,6 +24,7 @@ import { XMPPStatusCode } from "./XMPPConstants";
import { AutoRegistration } from "../AutoRegistration";
import { GatewayStateResolve } from "./GatewayStateResolve";
import { MatrixMembershipEvent } from "../MatrixTypes";
import { IHistoryLimits, HistoryManager, MemoryStorage } from "./HistoryManager";

const log = Logging.get("XmppJsGateway");

Expand All @@ -31,18 +39,24 @@ export interface RemoteGhostExtraData {
* and XMPP.
*/
export class XmppJsGateway implements IGateway {
// For storing room history, should be clipped at MAX_HISTORY per room.
private roomHistory: Map<string, [Element]>;
// For storing room history
private roomHistory: HistoryManager;
// For storing requests to be responded to, like joins
private stanzaCache: Map<string, Element>; // id -> stanza
private presenceCache: PresenceCache;
// Storing every XMPP user and their anonymous.
private members: GatewayMUCMembership;
constructor(private xmpp: XmppJsInstance, private registration: AutoRegistration, private config: IConfigBridge) {
this.roomHistory = new Map();
this.roomHistory = new HistoryManager(new MemoryStorage(50));
jaller94 marked this conversation as resolved.
Show resolved Hide resolved
this.stanzaCache = new Map();
this.members = new GatewayMUCMembership();
this.presenceCache = new PresenceCache(true);
xmpp.on("received-chat-msg-xmpp", (convName: string, stanza: Element) => {
this.roomHistory.addMessage(
convName, stanza,
this.members.getXmppMemberByDevice(convName, stanza.attrs.from).anonymousJid,
);
});
}

public handleStanza(stanza: Element, gatewayAlias: string) {
Expand Down Expand Up @@ -163,6 +177,16 @@ export class XmppJsGateway implements IGateway {
"groupchat",
)
);

// add the message to the room history
const historyStanza = new StzaMessage(
Half-Shot marked this conversation as resolved.
Show resolved Hide resolved
from.anonymousJid.toString(),
"",
msg,
"groupchat",
);
this.roomHistory.addMessage(chatName, parse(historyStanza.xml), from.anonymousJid);

return this.xmpp.xmppSendBulk(msgs);
}

Expand Down Expand Up @@ -419,10 +443,43 @@ export class XmppJsGateway implements IGateway {

// 4. Room history
log.debug("Emitting history");
const history: Element[] = this.roomHistory.get(room.roomId) || [];
const historyLimits: IHistoryLimits = {};
const historyRequest = stanza.getChild("x", "http://jabber.org/protocol/muc")?.getChild("history");
if (historyRequest !== undefined) {
const getIntValue = (str) => {
if (!str.match(/^\d+$/)) {
uhoreg marked this conversation as resolved.
Show resolved Hide resolved
throw new Error("Not a number");
}
return parseInt(str);
};
const getDateValue = (str) => {
const val = Date.parse(str);
if (isNaN(val)) {
throw new Error("Not a date");
}
return new Date(val); // Date.parse returns a number, which we need to turn into a Date object
uhoreg marked this conversation as resolved.
Show resolved Hide resolved
};
const getHistoryParam = (name: string, parser: (str: string) => any): void => {
const param = historyRequest.getAttr(name);
if (param !== undefined) {
try {
historyLimits[name] = parser(param);
} catch (e) {
log.debug(`Invalid ${name} in history management: "${param}" (${e})`);
}
}
};
getHistoryParam("maxchars", getIntValue);
getHistoryParam("maxstanzas", getIntValue);
getHistoryParam("seconds", getIntValue);
getHistoryParam("since", getDateValue);
} else {
// default to 20 stanzas if the client doesn't specify
historyLimits.maxstanzas = 20;
}
const history: Element[] = await this.roomHistory.getHistory(chatName, historyLimits);
history.forEach((e) => {
e.attrs.to = stanza.attrs.from;
// TODO: Add delay info to this.
this.xmpp.xmppWriteToStream(e);
});

Expand Down
1 change: 1 addition & 0 deletions src/xmppjs/XJSInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,7 @@ export class XmppJsInstance extends EventEmitter implements IBifrostInstance {

if (type === "groupchat") {
log.debug("Emitting group message", message);
this.emit("received-chat-msg-xmpp", convName, stanza);
this.emit("received-chat-msg", {
eventName: "received-chat-msg",
sender: from.toString(),
Expand Down