forked from paraswap/paraswap-dex-lib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
composed-event-subscriber.ts
131 lines (121 loc) · 3.67 KB
/
composed-event-subscriber.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import { StatefulEventSubscriber } from './stateful-event-subscriber';
import { AsyncOrSync, DeepReadonly } from 'ts-essentials';
import {
Address,
BlockHeader,
Log,
Logger,
MultiCallInput,
MultiCallOutput,
} from './types';
import { Lens } from './lens';
import { IDexHelper } from './dex-helper/idex-helper';
export abstract class PartialEventSubscriber<State, SubState> {
constructor(
public addressesSubscribed: Address[],
public lens: Lens<DeepReadonly<State>, DeepReadonly<SubState>>,
protected logger: Logger,
) {}
public abstract processLog(
state: DeepReadonly<SubState>,
log: Readonly<Log>,
blockHeader: Readonly<BlockHeader>,
): AsyncOrSync<DeepReadonly<SubState> | null>;
public abstract getGenerateStateMultiCallInputs(): MultiCallInput[];
public abstract generateState(
multicallOutputs: MultiCallOutput[],
blockNumber?: number | 'latest',
): AsyncOrSync<DeepReadonly<SubState>>;
}
export abstract class ComposedEventSubscriber<
State,
> extends StatefulEventSubscriber<State> {
public addressesSubscribed: Address[];
private addressSubscribers: {
[address: string]: PartialEventSubscriber<State, any>[];
} = {};
private multiCallInputs: MultiCallInput[];
private multiCallSlices: [number, number][] = [];
constructor(
parentName: string,
name: string,
logger: Logger,
protected dexHelper: IDexHelper,
private parts: PartialEventSubscriber<State, any>[],
private blankState: DeepReadonly<State>,
) {
super(parentName, name, dexHelper, logger);
this.addressesSubscribed = [];
for (const p of this.parts) {
for (const a of p.addressesSubscribed) {
const k = a.toLowerCase();
if (!this.addressSubscribers[k]) {
this.addressSubscribers[k] = [];
this.addressesSubscribed.push(a);
}
this.addressSubscribers[k].push(p);
}
}
const multiCallInputArrays = this.parts.map(p =>
p.getGenerateStateMultiCallInputs(),
);
let i = 0;
for (const arr of multiCallInputArrays) {
this.multiCallSlices.push([i, i + arr.length]);
i += arr.length;
}
this.multiCallInputs = multiCallInputArrays.flat();
}
protected async processLog(
state: DeepReadonly<State>,
log: Readonly<Log>,
blockHeader: Readonly<BlockHeader>,
): Promise<DeepReadonly<State> | null> {
const ps = this.addressSubscribers[log.address.toLowerCase()];
if (!ps) {
this.logger.error(
`ComposedEventSubscriber ${this.name} got log with unexpected address ${log.address}`,
);
return null;
}
let newState: DeepReadonly<State> | null = null;
for (const p of ps) {
const result: any = await p.processLog(
p.lens.get()(newState || state),
log,
blockHeader,
);
if (result) newState = p.lens.set(result)(newState || state);
}
return newState;
}
public async generateState(
blockNumber?: number | 'latest',
): Promise<DeepReadonly<State>> {
let returnData: MultiCallOutput[] = [];
if (this.multiCallInputs.length) {
returnData = (
await this.dexHelper.multiContract.methods
.aggregate(this.multiCallInputs)
.call({}, blockNumber)
).returnData;
}
try {
const stateParts = await Promise.all(
this.parts.map((p, i) =>
p.generateState(
returnData.slice(...this.multiCallSlices[i]),
blockNumber,
),
),
);
return this.parts.reduce(
(acc, p, i) => p.lens.set(stateParts[i])(acc),
this.blankState,
);
} catch (e) {
this.logger.error(`Error generating state: ${e}`);
throw e;
}
}
}