Skip to content

Commit f7b030e

Browse files
tyao1facebook-github-bot
authored andcommitted
Split a project into multiple chunks and parallelize
Reviewed By: evanyeung Differential Revision: D56339821 fbshipit-source-id: 3ee092e5d3b513b35802efd27538a25e0f6778c0
1 parent 3a24702 commit f7b030e

File tree

4 files changed

+203
-16
lines changed

4 files changed

+203
-16
lines changed

compiler/crates/dependency-analyzer/src/ir.rs

+52
Original file line numberDiff line numberDiff line change
@@ -305,3 +305,55 @@ fn add_descendants(
305305
}
306306
}
307307
}
308+
309+
/// Get fragment references of each definition
310+
pub fn get_ir_definition_references<'a>(
311+
schema: &SDLSchema,
312+
definitions: impl IntoIterator<Item = &'a ExecutableDefinition>,
313+
) -> ExecutableDefinitionNameMap<ExecutableDefinitionNameSet> {
314+
let mut result: ExecutableDefinitionNameMap<ExecutableDefinitionNameSet> = Default::default();
315+
for definition in definitions {
316+
let name = definition.name_with_location().item;
317+
let name = match definition {
318+
ExecutableDefinition::Operation(_) => OperationDefinitionName(name).into(),
319+
ExecutableDefinition::Fragment(_) => FragmentDefinitionName(name).into(),
320+
};
321+
let mut selections: Vec<_> = match definition {
322+
ExecutableDefinition::Operation(definition) => &definition.selections,
323+
ExecutableDefinition::Fragment(definition) => &definition.selections,
324+
}
325+
.iter()
326+
.collect();
327+
let mut references: ExecutableDefinitionNameSet = Default::default();
328+
while let Some(selection) = selections.pop() {
329+
match selection {
330+
Selection::FragmentSpread(selection) => {
331+
references.insert(selection.fragment.item.into());
332+
}
333+
Selection::LinkedField(selection) => {
334+
if let Some(fragment_name) = get_resolver_fragment_dependency_name(
335+
schema.field(selection.definition.item),
336+
) {
337+
references.insert(fragment_name.into());
338+
}
339+
selections.extend(&selection.selections);
340+
}
341+
Selection::InlineFragment(selection) => {
342+
selections.extend(&selection.selections);
343+
}
344+
Selection::Condition(selection) => {
345+
selections.extend(&selection.selections);
346+
}
347+
Selection::ScalarField(selection) => {
348+
if let Some(fragment_name) = get_resolver_fragment_dependency_name(
349+
schema.field(selection.definition.item),
350+
) {
351+
references.insert(fragment_name.into());
352+
}
353+
}
354+
}
355+
}
356+
result.insert(name, references);
357+
}
358+
result
359+
}

compiler/crates/dependency-analyzer/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ mod schema_change_analyzer;
1818
pub use ast::get_definition_references;
1919
pub use ast::get_reachable_ast;
2020
pub use ast::ReachableAst;
21+
pub use ir::get_ir_definition_references;
2122
pub use ir::get_reachable_ir;
2223
pub use ir::ExecutableDefinitionNameMap;
2324
pub use ir::ExecutableDefinitionNameSet;

compiler/crates/relay-compiler/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ lazy_static = "1.4"
4848
log = { version = "0.4.17", features = ["kv_unstable", "kv_unstable_std"] }
4949
md-5 = "0.10"
5050
persist-query = { path = "../persist-query" }
51+
petgraph = { version = "0.6.3", features = ["serde-1"] }
5152
rayon = "1.2"
5253
regex = "1.9.2"
5354
relay-codegen = { path = "../relay-codegen" }

compiler/crates/relay-compiler/src/build_project.rs

+149-16
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,23 @@ use common::PerfLogger;
3737
use common::WithDiagnostics;
3838
use dashmap::mapref::entry::Entry;
3939
use dashmap::DashSet;
40+
use dependency_analyzer::get_ir_definition_references;
4041
use fnv::FnvBuildHasher;
4142
use fnv::FnvHashMap;
4243
use fnv::FnvHashSet;
4344
pub use generate_artifacts::generate_artifacts;
4445
pub use generate_artifacts::generate_preloadable_query_parameters_artifact;
4546
pub use generate_artifacts::Artifact;
4647
pub use generate_artifacts::ArtifactContent;
48+
use graphql_ir::ExecutableDefinition;
49+
use graphql_ir::ExecutableDefinitionName;
4750
use graphql_ir::FragmentDefinitionNameSet;
4851
use graphql_ir::Program;
52+
use indexmap::IndexSet;
4953
use log::debug;
5054
use log::info;
5155
use log::warn;
56+
use petgraph::unionfind::UnionFind;
5257
use rayon::iter::IntoParallelRefIterator;
5358
use rayon::slice::ParallelSlice;
5459
use relay_codegen::Printer;
@@ -83,7 +88,7 @@ use crate::file_source::SourceControlUpdateStatus;
8388
use crate::graphql_asts::GraphQLAsts;
8489

8590
type BuildProjectOutput = WithDiagnostics<(ProjectName, Arc<SDLSchema>, Programs, Vec<Artifact>)>;
86-
type BuildProgramsOutput = WithDiagnostics<(Programs, Arc<SourceHashes>)>;
91+
type BuildProgramsOutput = WithDiagnostics<(Vec<Programs>, Arc<SourceHashes>)>;
8792

8893
pub enum BuildProjectFailure {
8994
Error(BuildProjectError),
@@ -141,6 +146,87 @@ pub fn build_raw_program(
141146
Ok((program, source_hashes))
142147
}
143148

149+
const MIN_CHUNK_SIZE: usize = 8192;
150+
151+
/// Build raw programs and divide them into chunks for parallelization
152+
fn build_raw_program_chunks(
153+
project_config: &ProjectConfig,
154+
project_asts: ProjectAsts,
155+
schema: Arc<SDLSchema>,
156+
log_event: &impl PerfLogEvent,
157+
build_mode: BuildMode,
158+
) -> Result<(Vec<Program>, SourceHashes), BuildProjectError> {
159+
// Build a type aware IR.
160+
let BuildIRResult { ir, source_hashes } = log_event.time("build_ir_time", || {
161+
build_ir::build_ir(project_config, project_asts, &schema, build_mode, log_event).map_err(
162+
|errors| BuildProjectError::ValidationErrors {
163+
errors,
164+
project_name: project_config.name,
165+
},
166+
)
167+
})?;
168+
169+
let chunks = if ir.len() < MIN_CHUNK_SIZE {
170+
vec![ir]
171+
} else {
172+
let chunkify_time = log_event.start("chunkify_project_time");
173+
let dependency_map = get_ir_definition_references(&schema, &ir);
174+
let definition_indexes: IndexSet<ExecutableDefinitionName> = ir
175+
.iter()
176+
.map(|def| match def {
177+
ExecutableDefinition::Operation(operation) => {
178+
ExecutableDefinitionName::OperationDefinitionName(operation.name.item)
179+
}
180+
ExecutableDefinition::Fragment(fragment) => {
181+
ExecutableDefinitionName::FragmentDefinitionName(fragment.name.item)
182+
}
183+
})
184+
.collect();
185+
186+
let mut unionfind = UnionFind::<usize>::new(definition_indexes.len());
187+
for (source, destinations) in &dependency_map {
188+
let source_index = definition_indexes.get_index_of(source).unwrap();
189+
for destination in destinations {
190+
let destination_index = definition_indexes.get_index_of(destination).unwrap();
191+
unionfind.union(source_index, destination_index);
192+
}
193+
}
194+
195+
let mut groups = FxHashMap::default();
196+
for (idx, def) in ir.into_iter().enumerate() {
197+
let group = unionfind.find(idx);
198+
groups.entry(group).or_insert_with(Vec::new).push(def);
199+
}
200+
201+
let mut chunks = vec![];
202+
let mut buffer = Vec::new();
203+
for group in groups.into_values() {
204+
if group.len() > MIN_CHUNK_SIZE {
205+
chunks.push(group);
206+
} else {
207+
buffer.extend(group);
208+
if buffer.len() > MIN_CHUNK_SIZE {
209+
chunks.push(std::mem::take(&mut buffer));
210+
}
211+
}
212+
}
213+
if !buffer.is_empty() {
214+
chunks.push(buffer);
215+
}
216+
log_event.stop(chunkify_time);
217+
chunks
218+
};
219+
220+
// Turn the IR into base Programs.
221+
let programs = log_event.time("build_program_time", || {
222+
chunks
223+
.into_iter()
224+
.map(|definitions| Program::from_definitions(Arc::clone(&schema), definitions))
225+
.collect()
226+
});
227+
Ok((programs, source_hashes))
228+
}
229+
144230
pub fn validate_program(
145231
config: &Config,
146232
project_config: &ProjectConfig,
@@ -270,26 +356,43 @@ pub fn build_programs(
270356
}
271357
},
272358
);
273-
let (program, source_hashes) =
274-
build_raw_program(project_config, project_asts, schema, log_event, build_mode)?;
359+
let (programs, source_hashes) =
360+
build_raw_program_chunks(project_config, project_asts, schema, log_event, build_mode)?;
275361

276362
if compiler_state.should_cancel_current_build() {
277363
debug!("Build is cancelled: updates in source code/or new file changes are pending.");
278364
return Err(BuildProjectFailure::Cancelled);
279365
}
366+
let base_fragment_names = Arc::new(base_fragment_names);
367+
let results: Vec<(Programs, Vec<Diagnostic>)> = programs
368+
.into_par_iter()
369+
.map(|program| {
370+
// Call validation rules that go beyond type checking.
371+
// FIXME: Return non-fatal diagnostics from transforms (only validations for now)
372+
let diagnostics = validate_program(config, project_config, &program, log_event)?;
373+
374+
let programs = transform_program(
375+
project_config,
376+
Arc::new(program),
377+
Arc::clone(&base_fragment_names),
378+
Arc::clone(&perf_logger),
379+
log_event,
380+
config.custom_transforms.as_ref(),
381+
)?;
280382

281-
// Call validation rules that go beyond type checking.
282-
// FIXME: Return non-fatal diagnostics from transforms (only validations for now)
283-
let diagnostics = validate_program(config, project_config, &program, log_event)?;
284-
285-
let programs = transform_program(
286-
project_config,
287-
Arc::new(program),
288-
Arc::new(base_fragment_names),
289-
Arc::clone(&perf_logger),
290-
log_event,
291-
config.custom_transforms.as_ref(),
292-
)?;
383+
Ok((programs, diagnostics))
384+
})
385+
.collect::<Result<Vec<_>, BuildProjectFailure>>()?;
386+
387+
let len = results.len();
388+
let (programs, diagnostics) = results.into_iter().fold(
389+
(Vec::with_capacity(len), vec![]),
390+
|(mut programs, mut diagnostics), (temp_programs, temp_diagnostics)| {
391+
programs.push(temp_programs);
392+
diagnostics.extend(temp_diagnostics);
393+
(programs, diagnostics)
394+
},
395+
);
293396

294397
Ok(WithDiagnostics {
295398
item: (programs, Arc::new(source_hashes)),
@@ -360,9 +463,19 @@ pub fn build_project(
360463

361464
// Generate artifacts by collecting information from the `Programs`.
362465
let artifacts_timer = log_event.start("generate_artifacts_time");
363-
let artifacts = generate_artifacts(project_config, &programs, Arc::clone(&source_hashes));
466+
let artifacts = programs
467+
.par_iter()
468+
.map(|programs| generate_artifacts(project_config, programs, Arc::clone(&source_hashes)))
469+
.flatten()
470+
.collect();
364471
log_event.stop(artifacts_timer);
365472

473+
let mut iter: std::vec::IntoIter<Programs> = programs.into_iter();
474+
let mut programs = iter.next().expect("Expect at least one result");
475+
for temp_programs in iter {
476+
merge_programs(&mut programs, temp_programs);
477+
}
478+
366479
log_event.number(
367480
"generated_artifacts",
368481
programs.reader.document_count() + programs.normalization.document_count(),
@@ -376,6 +489,26 @@ pub fn build_project(
376489
})
377490
}
378491

492+
fn merge_programs(onto: &mut Programs, from: Programs) {
493+
merge_program(Arc::get_mut(&mut onto.source).unwrap(), from.source);
494+
merge_program(Arc::get_mut(&mut onto.reader).unwrap(), from.reader);
495+
merge_program(
496+
Arc::get_mut(&mut onto.normalization).unwrap(),
497+
from.normalization,
498+
);
499+
merge_program(
500+
Arc::get_mut(&mut onto.operation_text).unwrap(),
501+
from.operation_text,
502+
);
503+
merge_program(Arc::get_mut(&mut onto.typegen).unwrap(), from.typegen);
504+
}
505+
506+
fn merge_program(onto: &mut Program, from: Arc<Program>) {
507+
let from = Arc::unwrap_or_clone(from);
508+
onto.fragments.extend(from.fragments);
509+
onto.operations.extend(from.operations);
510+
}
511+
379512
#[allow(clippy::too_many_arguments)]
380513
pub async fn commit_project(
381514
config: &Config,

0 commit comments

Comments
 (0)