@@ -17,15 +17,105 @@ type Data = {
   pathname: string;
 };
 
+type SizedData = Data & { size: number };
+
 type ChunkedData = {
-  chunks: Data[] | null;
+  chunks: SizedData[] | null;
   size: number;
 };
 
+/**
+ * Batches chunked JSON data up to the specified byte size limit.
+ */
+class BatchChunks extends stream.Transform {
+  private batch: SizedData[] = [];
+  private size = 0;
+
+  constructor(private maxSize: number, opts?: stream.TransformOptions) {
+    super({ ...opts, objectMode: true });
+  }
+
+  _transform(chunk: SizedData, _: BufferEncoding, callback: stream.TransformCallback): void {
+    const totalChunkSize = chunk.size + chunk.pathname.length; // Overestimate
+    if (this.size + totalChunkSize > this.maxSize) {
+      this.push(this.transformBatchToPatchData(this.batch));
+      this.batch = [];
+      this.size = 0;
+    }
+    this.batch.push(chunk);
+    this.size += totalChunkSize;
+    callback(null);
+  }
+
+  private transformBatchToPatchData(batch: SizedData[]): SizedData {
+    return this.sanitizePatchData(this.compactData(batch));
+  }
+
+  private compactData(batch: SizedData[]): SizedData {
+    if (batch.length === 1) {
+      return batch[0];
+    }
+    const pathname = this.findLongestCommonPrefixArray(batch.map((d) => d.pathname));
+    let json = {};
+    let size = 0;
+    for (const chunk of batch) {
+      const truncatedPath = chunk.pathname.substring(pathname.length + 1); // +1 to trim leading slash
+      json = Object.assign({}, json, { [truncatedPath]: chunk.json });
+      size += chunk.size;
+    }
+    return { json, pathname, size };
+  }
+
+  // Since we cannot PATCH primitives and arrays, we explicitly convert them to objects.
+  private sanitizePatchData({ json, pathname, size }: SizedData): SizedData {
+    if (typeof json === "string" || typeof json === "number" || typeof json === "boolean") {
+      const tokens = pathname.split("/");
+      const lastToken = tokens.pop();
+      return { json: { [lastToken!]: json }, pathname: tokens.join("/"), size };
+    }
+    if (Array.isArray(json)) {
+      return { json: { ...json }, pathname, size };
+    }
+    return { json, pathname, size };
+  }
+
+  private findLongestCommonPrefixArray(paths: string[]): string {
+    const findLongestCommonPrefixPair = (p: string, q: string): string => {
+      const pTokens = p.split("/");
+      const qTokens = q.split("/");
+      let prefix = pTokens.slice(0, qTokens.length);
+      for (let i = 0; i < prefix.length; i++) {
+        if (prefix[i] !== qTokens[i]) {
+          prefix = prefix.slice(0, i);
+          break;
+        }
+      }
+      return prefix.join("/");
+    };
+
+    if (paths.length === 0) {
+      return "";
+    }
+    let prefix = paths[0];
+    for (let i = 1; i < paths.length; i++) {
+      prefix = findLongestCommonPrefixPair(prefix, paths[i]);
+    }
+    return prefix;
+  }
+
+  _flush(callback: stream.TransformCallback): void {
+    if (this.size > 0) {
+      this.push(this.transformBatchToPatchData(this.batch));
+    }
+    callback(null);
+  }
+}
+
 /**
  * Imports JSON data to a given RTDB instance.
  *
- * The data is parsed and chunked into subtrees of ~10 MB, to be subsequently written in parallel.
+ * The data is parsed and chunked into subtrees of the specified payload size, to be subsequently
+ * written in parallel.
  */
 export default class DatabaseImporter {
   private client: Client;
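For reference, the sketch below (not part of the commit) shows how the new BatchChunks transform behaves on its own. It assumes SizedData and BatchChunks are in scope (in the diff they are private to this module); the example values and the 60-byte budget are hypothetical.

import * as stream from "stream";

// Chunks as they might come out of the chunking stage; each size is the
// length of JSON.stringify(json).
const sampleChunks: SizedData[] = [
  { pathname: "/users/alice", json: { name: "Alice" }, size: 16 },
  { pathname: "/users/bob", json: { name: "Bob" }, size: 14 },
  { pathname: "/logs/2023", json: { count: 1 }, size: 11 },
];

// With a 60-byte budget (size + pathname length per chunk), the two /users
// chunks fit in one batch; adding the /logs chunk would exceed the budget, so
// the first batch is flushed and compacted to
// { pathname: "/users", json: { alice: {...}, bob: {...} } }.
// The /logs chunk is emitted on its own when the stream ends.
stream.Readable.from(sampleChunks)
  .pipe(new BatchChunks(60))
  .on("data", (batch: SizedData) => console.log(batch.pathname, batch.json));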
@@ -36,7 +126,7 @@ export default class DatabaseImporter {
     private dbUrl: URL,
     private inStream: stream.Readable,
     private dataPath: string,
-    private chunkBytes: number,
+    private payloadSize: number,
     concurrency: number
   ) {
     this.client = new Client({ urlPrefix: dbUrl.origin, auth: true });
@@ -68,26 +158,35 @@
     }
   }
 
+  /**
+   * The top-level objects are parsed and chunked, with each chunk capped at payloadSize. Then,
+   * chunks are batched, with each batch also capped at payloadSize. Finally, the batched chunks
+   * are written in parallel.
+   *
+   * In the case where the data contains very large objects, chunking ensures that the request is
+   * not too large. On the other hand, in the case where the data contains many small objects,
+   * batching ensures that there are not too many requests.
+   */
   private readAndWriteChunks(): Promise<ClientResponse<JsonType>[]> {
-    const { dbUrl } = this;
+    const { dbUrl, payloadSize } = this;
     const chunkData = this.chunkData.bind(this);
-    const writeChunk = this.writeChunk.bind(this);
+    const doWriteBatch = this.doWriteBatch.bind(this);
     const getJoinedPath = this.joinPath.bind(this);
 
     const readChunks = new stream.Transform({ objectMode: true });
     readChunks._transform = function (chunk: { key: string; value: JsonType }, _, done) {
       const data = { json: chunk.value, pathname: getJoinedPath(dbUrl.pathname, chunk.key) };
       const chunkedData = chunkData(data);
-      const chunks = chunkedData.chunks || [data];
+      const chunks = chunkedData.chunks || [{ ...data, size: JSON.stringify(data.json).length }];
       for (const chunk of chunks) {
         this.push(chunk);
       }
       done();
     };
 
-    const writeChunks = new stream.Transform({ objectMode: true });
-    writeChunks._transform = async function (chunk: Data, _, done) {
-      const res = await writeChunk(chunk);
+    const writeBatch = new stream.Transform({ objectMode: true });
+    writeBatch._transform = async function (batch: SizedData, _, done) {
+      const res = await doWriteBatch(batch);
       this.push(res);
       done();
     };
@@ -115,19 +214,20 @@
           )
         )
         .pipe(readChunks)
-        .pipe(writeChunks)
+        .pipe(new BatchChunks(payloadSize))
+        .pipe(writeBatch)
         .on("data", (res: ClientResponse<JsonType>) => responses.push(res))
        .on("error", reject)
        .once("end", () => resolve(responses));
     });
   }
 
-  private writeChunk(chunk: Data): Promise<ClientResponse<JsonType>> {
+  private doWriteBatch(batch: SizedData): Promise<ClientResponse<JsonType>> {
     const doRequest = (): Promise<ClientResponse<JsonType>> => {
       return this.client.request({
-        method: "PUT",
-        path: chunk.pathname + ".json",
-        body: JSON.stringify(chunk.json),
+        method: "PATCH",
+        path: `${batch.pathname}.json`,
+        body: batch.json,
         queryParams: this.dbUrl.searchParams,
       });
     };
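To make the batching concrete, here is an illustration (not part of the commit, values hypothetical) of what doWriteBatch receives once compactData and sanitizePatchData have run on a batch of two sibling chunks:

// Two chunks at /users/alice and /users/bob are collapsed under their longest
// common path prefix:
const exampleBatch: SizedData = {
  pathname: "/users",
  json: { alice: { age: 30 }, bob: { age: 25 } },
  size: 20,
};
// ...which becomes a single request:
//   PATCH /users.json
//   { "alice": { "age": 30 }, "bob": { "age": 25 } }
//
// A primitive leaf such as { pathname: "/users/alice/age", json: 30 } cannot
// be PATCHed directly, so sanitizePatchData() rewrites it to
// { pathname: "/users/alice", json: { age: 30 } } before the request is made.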
@@ -157,7 +257,7 @@
       // Children node
       let size = 2; // {}
 
-      const chunks = [];
+      const chunks: SizedData[] = [];
       let hasChunkedChild = false;
 
       for (const [key, val] of Object.entries(json)) {
@@ -170,11 +270,11 @@
           hasChunkedChild = true;
           chunks.push(...childChunks.chunks);
         } else {
-          chunks.push(child);
+          chunks.push({ ...child, size: childChunks.size });
         }
       }
 
-      if (hasChunkedChild || size >= this.chunkBytes) {
+      if (hasChunkedChild || size >= this.payloadSize) {
       return { chunks, size };
     } else {
       return { chunks: null, size };
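Finally, a hypothetical sketch (not part of the commit; values made up, types assumed in scope) of the two shapes chunkData can now return, which is what feeds the batching stage:

// A small subtree stays unchunked (chunks === null) and only reports its
// estimated serialized size:
const small: ChunkedData = { chunks: null, size: 28 };

// A large subtree is split into SizedData chunks, each carrying its own size
// so that BatchChunks can budget them before the PATCH is issued:
const large: ChunkedData = {
  chunks: [
    { pathname: "/users/alice", json: { name: "Alice" }, size: 16 },
    { pathname: "/users/bob", json: { name: "Bob" }, size: 14 },
  ],
  size: 47,
};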