Skip to content

Commit

Permalink
m-ld/m-ld-security-spec#3: Adding trace function to MeldUpdate
Browse files Browse the repository at this point in the history
  • Loading branch information
gsvarovsky committed Jul 26, 2022
1 parent fc3e3a2 commit 11735e6
Show file tree
Hide file tree
Showing 12 changed files with 387 additions and 148 deletions.
61 changes: 61 additions & 0 deletions src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { QueryableRdfSource } from './rdfjs-support';
import { Consumable, Flowable } from 'rx-flowable';
import { MeldMessageType } from './ns/m-ld';
import { MeldApp } from './config';
import { EncodedOperation } from './engine/index';

/**
* A convenience type for a struct with a `@insert` and `@delete` property, like
Expand Down Expand Up @@ -280,6 +281,11 @@ export interface MeldUpdate extends MeldPreUpdate {
* @see MeldStatus.ticks
*/
readonly '@ticks': number;
/**
* Traces through to the underlying **m-ld** protocol information that gave
* rise to this update.
*/
trace(): UpdateTrace;
}

/**
Expand Down Expand Up @@ -674,6 +680,61 @@ export interface Attribution {
sig: Buffer;
}

/**
* Underlying Protocol information that gave rise to an update
*/
export interface AuditOperation {
/**
* The (possibly signed) operation encoded according to the **m-ld** protocol
* as a binary buffer
*/
data: Buffer;
/**
* The operation attribution; may contain a binary signature of the
* {@link data} Buffer
*/
attribution: Attribution | null;
/**
* The operation content in the protocol JSON tuple format
*/
operation: EncodedOperation;
}

/**
* Auditable trace of an {@link MeldUpdate app update}. The properties of this
* object represent a number of possible relationships among updates and
* operations, each of which is well-defined in the **m-ld** protocol. This
* means that a sufficiently sophisticated auditing system would be able to
* re-create, and therefore verify, the trace provided.
*/
export interface UpdateTrace {
/**
* The operation that directly triggered an app update, either a local write
* or an arriving remote operation. This operation always exists but is not
* necessarily recorded in the clone journal.
*/
readonly trigger: AuditOperation;
/**
* The triggering operation, adjusted to remove any parts of it that have
* already been applied. This is relevant if the trigger is remote and a
* fusion. This operation is always recorded in the clone journal.
*/
readonly applicable?: AuditOperation;
/**
* If the applicable operation violated a constraint, then the update will
* combine it with a resolution. This property gives access to the raw
* resolution, which is also recorded in the journal.
*/
readonly resolution?: AuditOperation;
/**
* If the applicable operation is an agreement, it may have caused some
* operations to have been voided. This property gives access to precisely
* which operations were removed, in reverse order (as if each was undone).
* These operations will have already been removed from the journal.
*/
readonly voids: AuditOperation[]
}

/**
* A transport security interceptor extension. Modifies data buffers sent to
* other clones via remotes, typically by applying cryptography.
Expand Down
6 changes: 5 additions & 1 deletion src/engine/MeldOperationMessage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { TreeClock } from './clocks';
import { Future, MsgPack } from './util';
import { Attribution } from '../api';
import { Attribution, AuditOperation } from '../api';
import { MeldError } from './MeldError';
import { levels } from 'loglevel';
import { MeldEncoder } from './MeldEncoding';
Expand Down Expand Up @@ -68,6 +68,10 @@ export class MeldOperationMessage implements OperationMessage {
return 8 + this.enc.length + (pid?.length ?? 0) + (sig?.length ?? 0);
}

toAuditOperation(): AuditOperation {
return { attribution: this.attr, data: this.enc, operation: this.data }
}

static enc(msg: OperationMessage) {
if (msg instanceof MeldOperationMessage)
return msg.enc;
Expand Down
97 changes: 70 additions & 27 deletions src/engine/dataset/SuSetDataset.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {
Attribution, GraphSubject, MeldConstraint, MeldExtensions, MeldPreUpdate, MeldUpdate,
noTransportSecurity, StateManaged
Attribution, AuditOperation, GraphSubject, MeldConstraint, MeldExtensions, MeldPreUpdate,
MeldUpdate, noTransportSecurity, StateManaged, UpdateTrace
} from '../../api';
import { BufferEncoding, EncodedOperation, OperationMessage, Snapshot } from '..';
import { GlobalClock, TickTree, TreeClock } from '../clocks';
Expand Down Expand Up @@ -240,8 +240,11 @@ export class SuSetDataset extends MeldEncoder {
const journal = await this.journal.state();
const msg = await this.operationMessage(journal, time, op);
const journaling = journal.builder().next(op, deletedTriplesTids, time, msg.attr);

return this.txnResult({ ...txn, tidPatch, journaling, msg });
const trace: MeldUpdate['trace'] = () => ({
// No applicable, resolution or voids for local txn
trigger: msg.toAuditOperation(), voids: []
});
return this.txnResult({ ...txn, tidPatch, journaling, msg, trace });
}
});
}
Expand Down Expand Up @@ -275,7 +278,8 @@ export class SuSetDataset extends MeldEncoder {
journaling: EntryBuilder,
msg: OperationMessage | null,
internalUpdate: MeldPreUpdate,
userUpdate: MeldPreUpdate
userUpdate: MeldPreUpdate,
trace: MeldUpdate['trace']
}): Promise<PatchResult<OperationMessage | null>> {
const commitTids = await this.tidsStore.commit(txn.tidPatch);
this.log.debug(`patch ${txn.journaling.appendEntries.map(e => e.operation.time)}:
Expand All @@ -293,7 +297,8 @@ export class SuSetDataset extends MeldEncoder {
await this.extensions.onUpdate(txn.internalUpdate, this.readState);
this.emitUpdate({
...txn.userUpdate,
'@ticks': txn.journaling.state.time.ticks
'@ticks': txn.journaling.state.time.ticks,
trace: txn.trace
});
}
};
Expand Down Expand Up @@ -394,7 +399,7 @@ export class SuSetDataset extends MeldEncoder {
@SuSetDataset.checkNotClosed.async
@SuSetDataset.checkStateLocked.async
async apply(
msg: OperationMessage,
msg: MeldOperationMessage,
clockHolder: ClockHolder<TreeClock>
): Promise<OperationMessage | null> {
return this.dataset.transact<OperationMessage | null>({
Expand All @@ -413,7 +418,7 @@ export class SuSetDataset extends MeldEncoder {
const attribution = applicableOp === receivedOp ?
msg.attr : await this.sign(applicableOp);
return new SuSetDataset.OperationApplication(
this, applicableOp, attribution, journal.builder(), txc, clockHolder).apply();
this, applicableOp, attribution, journal.builder(), txc, clockHolder, msg).apply();
}
}
});
Expand All @@ -422,11 +427,12 @@ export class SuSetDataset extends MeldEncoder {
private static OperationApplication = class {
constructor(
private ssd: SuSetDataset,
private op: MeldOperation,
private operation: MeldOperation,
private attribution: Attribution | null,
private journaling: EntryBuilder,
private txc: TxnContext,
private clockHolder: ClockHolder<TreeClock>
private clockHolder: ClockHolder<TreeClock>,
private received: MeldOperationMessage
) {}

/**
Expand All @@ -443,25 +449,25 @@ export class SuSetDataset extends MeldEncoder {

this.txc.sw.next('apply-cx'); // "cx" = constraint
const { assertions: cxnAssertions, ...txn } = await this.ssd.assertConstraints(
patch.quads, 'apply', this.op.principalId, this.op.agreed?.proof);
patch.quads, 'apply', this.operation.principalId, this.operation.agreed?.proof);

if (processAgreement && this.op.agreed != null) {
if (processAgreement && this.operation.agreed != null) {
// Check agreement conditions. This is done against the non-rewound
// state, because we may have to recover if the rewind goes back too
// far. This is allowed because an agreement condition should only
// inspect previously agreed state.
const ext = await this.ssd.extensions.ready();
for (let agreementCondition of ext.agreementConditions ?? [])
await agreementCondition.test(this.ssd.readState, txn.internalUpdate);
if (this.op.time.anyLt(this.journaling.state.time)) {
if (this.operation.time.anyLt(this.journaling.state.time)) {
// A rewind is required. This trumps the work we have already done.
this.txc.sw.next('rewind');
return this.rewindAndReapply();
}
}

const opTime = this.clockHolder.event(), cxnTime = this.clockHolder.event();
const insertTids = new TripleMap(this.op.inserts);
const insertTids = new TripleMap(this.operation.inserts);
const cxn = await this.constraintTxn(cxnAssertions, patch.quads, insertTids, cxnTime);
// After applying the constraint, some new quads might have been removed
patch.tids.append({
Expand All @@ -472,11 +478,11 @@ export class SuSetDataset extends MeldEncoder {
// Done determining the applied operation patch. At this point we could
// have an empty patch, but we still need to complete the journal entry.
this.txc.sw.next('journal');
this.journaling.next(this.op,
this.journaling.next(this.operation,
expandItemTids(deleteTids, new TripleMap), opTime, this.attribution);

// If the constraint has done anything, we need to merge its work
let cxnMsg: OperationMessage | null = null;
let cxnMsg: MeldOperationMessage | null = null;
if (cxn != null) {
// update['@ticks'] = cxnTime.ticks;
patch.tids.append(cxn.tidPatch);
Expand All @@ -491,7 +497,8 @@ export class SuSetDataset extends MeldEncoder {
assertions: patch.quads,
tidPatch: patch.tids,
journaling: this.journaling,
msg: cxnMsg
msg: cxnMsg,
trace: this.tracer({ applied: true, cxnMsg })
});
}

Expand All @@ -502,8 +509,8 @@ export class SuSetDataset extends MeldEncoder {
// re-connect to recover what's missing. This could include a rewound
// local txn. But first, commit the rewind.
const localTime = this.journaling.state.time;
const rewoundJoinTime = localTime.ticked(this.op.time);
if (rewoundJoinTime.anyLt(this.op.time)) {
const rewoundJoinTime = localTime.ticked(this.operation.time);
if (rewoundJoinTime.anyLt(this.operation.time)) {
this.clockHolder.push(localTime); // Not joining
return this.missingCausesResult(patch);
} else {
Expand All @@ -518,7 +525,7 @@ export class SuSetDataset extends MeldEncoder {
const tidPatch = new PatchTids(this.ssd.tidsStore);
for (
let entry = await this.ssd.journal.entryBefore();
entry != null && this.op.time.anyLt(entry.operation.time);
entry != null && this.operation.time.anyLt(entry.operation.time);
entry = await this.ssd.journal.entryBefore(entry.key)
) {
const entryOp = entry.operation.asMeldOperation();
Expand All @@ -544,17 +551,17 @@ export class SuSetDataset extends MeldEncoder {
return { tids: tidPatch, quads: quadPatch };
}

private async missingCausesResult(patch: SuSetDataPatch): Promise<PatchResult<null>> {
private async missingCausesResult(rewindPatch: SuSetDataPatch): Promise<PatchResult<null>> {
const { userUpdate } = await new InterimUpdatePatch(
this.ssd.userGraph,
this.ssd.userCtx,
patch.quads,
rewindPatch.quads,
M_LD.localEngine,
null,
{ mutable: false }).finalise();
const commitTids = await this.ssd.tidsStore.commit(patch.tids);
const commitTids = await this.ssd.tidsStore.commit(rewindPatch.tids);
return {
patch: patch.quads,
patch: rewindPatch.quads,
kvps: batch => {
commitTids(batch);
this.journaling.commit(batch);
Expand All @@ -563,14 +570,47 @@ export class SuSetDataset extends MeldEncoder {
after: () => {
this.ssd.emitUpdate({
...userUpdate,
'@ticks': this.journaling.state.time.ticks
'@ticks': this.journaling.state.time.ticks,
trace: this.tracer({ applied: false })
});
throw new MeldError('Update out of order',
'Journal rewind missing agreement causes');
}
};
}

private tracer({ cxnMsg, applied }: {
applied: boolean,
cxnMsg?: MeldOperationMessage | null
}): MeldUpdate['trace'] {
return () => {
const { received, operation, attribution, journaling } = this;
function audit(
operation: EncodedOperation,
attribution: Attribution | null
): AuditOperation {
return { operation, attribution, data: EncodedOperation.toBuffer(operation) };
}
return new class implements UpdateTrace {
get trigger() {
return received.toAuditOperation();
}
get applicable() {
if (applied)
return audit(operation.encoded, attribution);
}
get resolution() {
if (cxnMsg != null)
return cxnMsg.toAuditOperation();
}
get voids() {
return journaling.deleteEntries.map(voided =>
audit(voided.operation.encoded, voided.attribution));
}
}();
};
}

/**
* The operation's delete contains reifications of deleted triples. This
* method resolves the deletions into TID graph changes and deleted triples.
Expand All @@ -581,7 +621,10 @@ export class SuSetDataset extends MeldEncoder {
*/
private async processSuSetOpToPatch(patch: SuSetDataPatch) {
// First establish triples to be deleted according to the SU-Set
const deletions = await this.op.deletes.reduce(async (resultSoFar, [triple, theirTids]) => {
const deletions = await this.operation.deletes.reduce(async (
resultSoFar,
[triple, theirTids]
) => {
// For each unique deleted triple, subtract the claimed tids from the tids we have
const [ourTripleTids, deletions] = await Promise.all(
// Ensure that any prior patch updates are accounted for
Expand All @@ -603,7 +646,7 @@ export class SuSetDataset extends MeldEncoder {
patch.tids.append({ deletes: deletions.tids });
patch.quads.append({
deletes: deletions.triples.map(this.ssd.toUserQuad),
inserts: this.op.inserts.map(([triple]) => this.ssd.toUserQuad(triple))
inserts: this.operation.inserts.map(([triple]) => this.ssd.toUserQuad(triple))
});
return deletions.tids;
}
Expand Down
9 changes: 5 additions & 4 deletions src/engine/journal/JournalState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { GlobalClock, GlobalClockJson, TreeClock, TreeClockJson } from '../clock
import { Kvps } from '../dataset';
import { JournalEntry } from './JournalEntry';
import { EncodedOperation } from '../index';
import { EntryIndex, Journal, tickKey } from '.';
import { Journal, tickKey } from '.';
import { MeldOperation } from '../MeldOperation';
import { TripleMap } from '../quads';
import { UUID } from '../MeldEncoding';
Expand Down Expand Up @@ -42,6 +42,7 @@ export interface EntryBuilder {
attribution: Attribution | null
): this;
void(entry: JournalEntry): this;
deleteEntries: JournalEntry[];
appendEntries: JournalEntry[];
state: JournalState;
commit: Kvps;
Expand Down Expand Up @@ -87,7 +88,7 @@ export class JournalState {

builder(): EntryBuilder {
return new (class implements EntryBuilder {
deleteEntries: EntryIndex[] = [];
deleteEntries: JournalEntry[] = [];
appendEntries: JournalEntry[] = [];

constructor(
Expand Down Expand Up @@ -122,7 +123,7 @@ export class JournalState {
// reset to the previous external tick and tid for the process.
const [prevTick, prevTid] = entry.prev;
const prevTime = entry.operation.time.ticked(prevTick);
this.deleteEntries.push(entry.index);
this.deleteEntries.push(entry);
this.state = this.state.withTime(
this.state.time.ticked(prevTime),
this.state.gwc.set(prevTime, prevTid));
Expand All @@ -132,7 +133,7 @@ export class JournalState {
/** Commits the changed journal */
commit: Kvps = async batch => {
this.state.journal.spliceEntries(
this.deleteEntries,
this.deleteEntries.map(entry => entry.index),
this.appendEntries,
{ appending: true }
)(batch);
Expand Down
Loading

0 comments on commit 11735e6

Please sign in to comment.