Skip to content

Commit 6c201ee

Browse files
authored
feat: serialize and hydrate data before setting it into the ydoc (#3155)
1 parent 0e8765b commit 6c201ee

File tree

5 files changed

+74
-59
lines changed

5 files changed

+74
-59
lines changed

src/flows/components/FromTemplatePage.tsx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ const Template: FC = () => {
5252

5353
TEMPLATES[params[0]].init
5454
.apply(this, params.slice(1))
55-
.then(data => hydrate(data))
55+
.then(data => {
56+
data.orgID = org.id
57+
return hydrate(data)
58+
})
5659
.then(data => {
5760
if (isFlagEnabled('ephemeralNotebook')) {
5861
populate(data)

src/flows/components/header/index.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ const FlowHeader: FC = () => {
140140

141141
const printJSON = () => {
142142
/* eslint-disable no-console */
143-
console.log(JSON.stringify(serialize(flow, orgID), null, 2))
143+
console.log(JSON.stringify(serialize(flow), null, 2))
144144
/* eslint-enable no-console */
145145
}
146146

src/flows/context/flow.current.tsx

Lines changed: 50 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ import React, {
77
useEffect,
88
} from 'react'
99
import {Flow, PipeData, PipeMeta} from 'src/types/flows'
10-
import {useParams} from 'react-router'
1110
import {FlowListContext, FlowListProvider} from 'src/flows/context/flow.list'
1211
import {v4 as UUID} from 'uuid'
1312
import {DEFAULT_PROJECT_NAME, PIPE_DEFINITIONS} from 'src/flows'
1413
import {isFlagEnabled} from 'src/shared/utils/featureFlag'
1514
import * as Y from 'yjs'
1615
import {WebsocketProvider} from 'y-websocket'
16+
import {serialize, hydrate} from 'src/flows/context/flow.list'
1717

1818
export interface FlowContextType {
1919
name: string
@@ -45,13 +45,10 @@ export const FlowProvider: FC = ({children}) => {
4545
const {flows, update, currentID} = useContext(FlowListContext)
4646
const [currentFlow, setCurrentFlow] = useState<Flow>()
4747

48-
const yDoc = useRef(new Y.Doc())
4948
const provider = useRef<WebsocketProvider>()
50-
51-
const {id: flowId} = useParams<{id: string}>()
52-
49+
const yDoc = useRef(new Y.Doc())
5350
function disconnectProvider() {
54-
if (provider.current?.wsconnected) {
51+
if (provider.current) {
5552
provider.current.disconnect()
5653
}
5754
}
@@ -68,38 +65,49 @@ export const FlowProvider: FC = ({children}) => {
6865
}
6966
}, [flows, currentID])
7067

68+
const syncFunc = useCallback(
69+
(isSynced: boolean) => {
70+
if (!isSynced || !yDoc.current) {
71+
return
72+
}
73+
const {flowUpdateData} = yDoc.current.getMap('flowUpdateData')?.toJSON()
74+
if (!flowUpdateData && currentFlow) {
75+
yDoc.current
76+
.getMap('flowUpdateData')
77+
.set('flowUpdateData', serialize(currentFlow))
78+
}
79+
},
80+
[currentFlow]
81+
)
82+
7183
useEffect(() => {
72-
if (isFlagEnabled('sharedFlowEditing')) {
84+
const doc = yDoc.current
85+
if (isFlagEnabled('sharedFlowEditing') && currentID) {
7386
provider.current = new WebsocketProvider(
87+
// 'ws://localhost:1223', // todo(ariel): replace this with an actual API that we setup
7488
'wss://demos.yjs.dev', // todo(ariel): replace this with an actual API that we setup
75-
flowId, // todo(ariel): we might need to confine this to an org, depends on how multi-org plays out
76-
yDoc.current
89+
currentID,
90+
doc
7791
)
78-
provider.current.on('sync', isSynced => {
79-
if (isSynced) {
80-
const localState = yDoc.current.getMap('localState')?.toJSON()
81-
if (!localState) {
82-
yDoc.current.getMap('localState').set('localState', currentFlow)
83-
}
84-
}
85-
})
86-
yDoc.current.on('update', () => {
87-
const {localState} = yDoc.current.getMap('localState').toJSON()
88-
setCurrentFlow(prev => {
89-
if (btoa(JSON.stringify(prev)) === btoa(JSON.stringify(localState))) {
90-
return prev
91-
}
92-
return localState
93-
})
94-
})
92+
93+
provider.current.on('sync', syncFunc)
9594
}
9695

96+
const onUpdate = () => {
97+
const {flowUpdateData} = doc.getMap('flowUpdateData').toJSON()
98+
const hydrated = hydrate(flowUpdateData?.data)
99+
100+
setCurrentFlow(hydrated)
101+
}
102+
103+
doc.on('update', onUpdate)
97104
return () => {
98105
if (isFlagEnabled('sharedFlowEditing')) {
99106
disconnectProvider()
100107
}
108+
doc.off('update', onUpdate)
101109
}
102-
}, [flowId])
110+
}, [currentID])
103111

104112
const updateData = useCallback(
105113
(id: string, data: Partial<PipeData>) => {
@@ -110,11 +118,9 @@ export const FlowProvider: FC = ({children}) => {
110118
...(flowCopy.data.byID[id] || {}),
111119
...data,
112120
}
113-
const update = {
114-
...flowCopy,
115-
...flowCopy.data.byID[id],
116-
}
117-
yDoc.current.getMap('localState').set('localState', update)
121+
yDoc.current
122+
.getMap('flowUpdateData')
123+
.set('flowUpdateData', serialize(flowCopy))
118124
}
119125
return
120126
}
@@ -154,11 +160,9 @@ export const FlowProvider: FC = ({children}) => {
154160
...meta,
155161
}
156162

157-
const update = {
158-
...flowCopy,
159-
...flowCopy.meta.byID[id],
160-
}
161-
yDoc.current.getMap('localState').set('localState', update)
163+
yDoc.current
164+
.getMap('flowUpdateData')
165+
.set('flowUpdateData', serialize(flowCopy))
162166
}
163167
return
164168
}
@@ -198,9 +202,9 @@ export const FlowProvider: FC = ({children}) => {
198202
for (const ni in flow) {
199203
flowCopy[ni] = flow[ni]
200204
}
201-
yDoc.current.getMap('localState').set('localState', {
202-
...flowCopy,
203-
})
205+
yDoc.current
206+
.getMap('flowUpdateData')
207+
.set('flowUpdateData', serialize(flowCopy))
204208
}
205209
return
206210
}
@@ -248,9 +252,9 @@ export const FlowProvider: FC = ({children}) => {
248252
flowCopy.data.allIDs.push(id)
249253
flowCopy.meta.allIDs.push(id)
250254
}
251-
yDoc.current.getMap('localState').set('localState', {
252-
...flowCopy,
253-
})
255+
yDoc.current
256+
.getMap('flowUpdateData')
257+
.set('flowUpdateData', serialize(flowCopy))
254258
return
255259
}
256260
if (isFlagEnabled('ephemeralNotebook') && !currentFlow.id) {
@@ -299,9 +303,9 @@ export const FlowProvider: FC = ({children}) => {
299303

300304
delete flowCopy.data.byID[id]
301305
delete flowCopy.meta.byID[id]
302-
yDoc.current.getMap('localState').set('localState', {
303-
...flowCopy,
304-
})
306+
yDoc.current
307+
.getMap('flowUpdateData')
308+
.set('flowUpdateData', serialize(flowCopy))
305309
return
306310
}
307311
if (isFlagEnabled('ephemeralNotebook') && !currentFlow.id) {

src/flows/context/flow.list.tsx

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,18 @@ export const FlowListContext = React.createContext<FlowListContextType>(
7474
DEFAULT_CONTEXT
7575
)
7676

77-
export function serialize(flow: Flow, orgID: string) {
78-
const apiFlow = {
77+
export function serialize(flow: Flow) {
78+
const apiFlow: any = {
7979
data: {
80-
orgID,
80+
orgID: flow.orgID,
8181
name: flow.name,
8282
spec: {
8383
name: flow.name,
8484
readOnly: flow.readOnly,
8585
range: flow.range,
8686
refresh: flow.refresh,
8787
createdAt: flow.createdAt,
88+
createdBy: flow.createdBy,
8889
updatedAt: flow.updatedAt,
8990
pipes: flow.data.allIDs.map(id => {
9091
const meta = flow.meta.byID[id]
@@ -110,6 +111,10 @@ export function serialize(flow: Flow, orgID: string) {
110111
},
111112
}
112113

114+
if (flow.id) {
115+
apiFlow.data.id = flow.id
116+
}
117+
113118
return apiFlow
114119
}
115120

@@ -127,6 +132,9 @@ export function hydrate(data) {
127132
if (data.id) {
128133
flow.id = data.id
129134
}
135+
if (data.orgID) {
136+
flow.orgID = data.orgID
137+
}
130138

131139
if (!data?.spec?.pipes) {
132140
return flow
@@ -193,10 +201,12 @@ export const FlowListProvider: FC = ({children}) => {
193201
_flow = hydrate({
194202
...TEMPLATES['default'].init(),
195203
name: DEFAULT_PROJECT_NAME,
204+
orgID: org.id,
196205
})
197206
} else {
198207
_flow = {
199208
name: flow.name,
209+
orgID: flow.orgID,
200210
range: flow.range,
201211
refresh: flow.refresh,
202212
data: flow.data,
@@ -207,7 +217,7 @@ export const FlowListProvider: FC = ({children}) => {
207217
}
208218
}
209219

210-
const apiFlow = serialize(_flow, org.id)
220+
const apiFlow = serialize(_flow)
211221

212222
let id: string = `local_${UUID()}`
213223
try {
@@ -247,13 +257,10 @@ export const FlowListProvider: FC = ({children}) => {
247257
},
248258
}))
249259

250-
const apiFlow = serialize(
251-
{
252-
...flows[id],
253-
...flow,
254-
},
255-
org.id
256-
)
260+
const apiFlow = serialize({
261+
...flows[id],
262+
...flow,
263+
})
257264

258265
pooledUpdateAPI({id, ...apiFlow})
259266
},

src/types/flows.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ export interface FlowState {
103103

104104
export interface Flow {
105105
id?: string
106+
orgID?: string
106107
name: string
107108
range: TimeRange
108109
refresh: AutoRefresh

0 commit comments

Comments
 (0)