Skip to content

Commit

Permalink
resolving left out merges
Browse files Browse the repository at this point in the history
  • Loading branch information
mvarshney committed Oct 6, 2014
1 parent 02174c6 commit 6bc7379
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public CombinedFileRecordReader(InputFormat<K, V> inputFormat,
long totalLength = 0;
for (long length : lengths)
totalLength += length;
fractionLength = new float[lengths.length];
for (int i = 0; i < lengths.length; i++)
fractionLength[i] = ((float) lengths[i]) / totalLength;
}
Expand Down
20 changes: 13 additions & 7 deletions src/main/java/com/linkedin/cubert/operator/CubeOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
import com.linkedin.cubert.utils.Pair;

/**
*
*
* @author Maneesh Varshney
*
*
*/
public class CubeOperator implements TupleOperator
{
Expand Down Expand Up @@ -159,7 +159,7 @@ public void setInput(Map<String, Block> input, JsonNode json, BlockProperties pr
* Process input tuples for cubing without inner dimensions. Note that
* DupleCubeAggregators cannot be used here (any attempt to use such aggregators would
* have been caught at compile time).
*
*
* @return boolean flag to indicate if there is more input to be processed
* @throws IOException
* @throws InterruptedException
Expand Down Expand Up @@ -200,7 +200,7 @@ private boolean processWithoutInnerDimensions() throws IOException,

/**
* Process input tuples for cubing WITH inner dimensions.
*
*
* @return boolean flag to indicate if there is more input to be processed
* @throws IOException
Expand Down Expand Up @@ -428,10 +428,15 @@ public DupleCubeAggInfo(DupleCubeAggregator x, JsonNode y)
private static BlockSchema createOutputSchema(BlockSchema inputSchema, JsonNode json) throws PreconditionException
{
List<CubeAggInfo> additiveAggs = new ArrayList<CubeAggInfo>();
List<DupleCubeAggInfo> partitionedAdditiveAggs = new ArrayList<DupleCubeAggInfo>();
List<DupleCubeAggInfo> partitionedAdditiveAggs =
new ArrayList<DupleCubeAggInfo>();
final String[] innerDimensions = JsonUtils.asArray(json, "innerDimensions");

createAggregators(json, inputSchema, innerDimensions != null, additiveAggs, partitionedAdditiveAggs);
createAggregators(json,
inputSchema,
innerDimensions != null,
additiveAggs,
partitionedAdditiveAggs);

Map<JsonNode, BlockSchema> aggMap = new HashMap<JsonNode, BlockSchema>();
for (CubeAggInfo info : additiveAggs)
Expand All @@ -446,7 +451,8 @@ private static BlockSchema createOutputSchema(BlockSchema inputSchema, JsonNode
aggMap.put(aggNode, info.getFirst().outputSchema(inputSchema, aggNode));
}

BlockSchema outputSchema = inputSchema.getSubset(JsonUtils.asArray(JsonUtils.get(json, "dimensions")));
BlockSchema outputSchema =
inputSchema.getSubset(JsonUtils.asArray(JsonUtils.get(json, "dimensions")));
for (JsonNode aggregateJson : json.get("aggregates"))
outputSchema = outputSchema.append(aggMap.get(aggregateJson));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
/* (c) 2014 LinkedIn Corp. All rights reserved.
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -52,7 +52,6 @@
* Parses and executes the physical plan of a single Map-Reduce job.
*
* @author Maneesh Varshney
*
*/
public class JobExecutor
{
Expand Down Expand Up @@ -206,6 +205,8 @@ protected void configureJob() throws IOException,

}

conf.set("mapreduce.map.output.compress", "true");
conf.set("mapreduce.output.fileoutputformat.compress", "true");
}

private void serializeExecutionConfig() throws IOException
Expand Down

0 comments on commit 6bc7379

Please sign in to comment.