Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark: RewriteDatafilesAction V2 #2591

Merged
merged 27 commits into from
Jul 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
955fd38
Spark: RewriteDatafilesAction V2
RussellSpitzer May 6, 2021
a0261e1
Refactor Execute Method
RussellSpitzer May 14, 2021
f295dce
SetID -> GroupID
RussellSpitzer May 14, 2021
2c41966
Missed SpecID reference
RussellSpitzer May 14, 2021
7c1c611
Adds more tests
RussellSpitzer May 14, 2021
c40816a
Add tests for failure modes
RussellSpitzer May 14, 2021
6562dfd
More Cleanup and Reviewer Comments
RussellSpitzer May 14, 2021
0d27400
Handle changes in refactoring of RewriteDatafiles
RussellSpitzer May 17, 2021
844aac5
Refactor out common variables into container classes
RussellSpitzer May 18, 2021
c6b9cfd
Further encapsulation and refactoring
RussellSpitzer May 18, 2021
eaaa529
Checkstyle
RussellSpitzer May 18, 2021
e9d3a98
Checkstyle
RussellSpitzer May 18, 2021
3b96656
More Reviewer Comments
RussellSpitzer May 19, 2021
b03fd70
Expliclty try to shut down rewrite service
RussellSpitzer May 19, 2021
decad9e
Reviewer Comments
RussellSpitzer May 19, 2021
72887d1
Adjust job description
RussellSpitzer May 19, 2021
12b1695
Update based on some tests
RussellSpitzer Jun 22, 2021
fe8a2f4
Additional Reviewer Comments
RussellSpitzer Jun 23, 2021
557c6a2
Remove unused import
RussellSpitzer Jun 23, 2021
cc57cf2
Refactor state out of Action into Strategy
RussellSpitzer Jul 2, 2021
eddf2da
Refactor CommitService out of Action and into Core
RussellSpitzer Jul 3, 2021
da80b52
Cleanup of code
RussellSpitzer Jul 6, 2021
f27e551
Remove Extra Paren
RussellSpitzer Jul 6, 2021
cb1999d
Reviewer Comments
RussellSpitzer Jul 7, 2021
d87eea8
Reviewer Comments
RussellSpitzer Jul 7, 2021
cae6ad3
Fix up more comments
RussellSpitzer Jul 9, 2021
6dca1c0
Typo
RussellSpitzer Jul 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.iceberg.actions;

import java.util.Map;
import java.util.List;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;

Expand Down Expand Up @@ -68,7 +68,7 @@ public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, Rewri
* independently and asynchronously.
**/
String MAX_CONCURRENT_FILE_GROUP_REWRITES = "max-concurrent-file-group-rewrites";
int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
int MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT = 1;

/**
* The output file size that this rewrite strategy will attempt to generate when rewriting files. By default this
Expand Down Expand Up @@ -100,14 +100,14 @@ default RewriteDataFiles binPack() {
* will report a total failure for the job.
*/
interface Result {
Map<FileGroupInfo, FileGroupRewriteResult> resultMap();
List<FileGroupRewriteResult> rewriteResults();

default int addedDataFilesCount() {
return resultMap().values().stream().mapToInt(FileGroupRewriteResult::addedDataFilesCount).sum();
return rewriteResults().stream().mapToInt(FileGroupRewriteResult::addedDataFilesCount).sum();
}

default int rewrittenDataFilesCount() {
return resultMap().values().stream().mapToInt(FileGroupRewriteResult::rewrittenDataFilesCount).sum();
return rewriteResults().stream().mapToInt(FileGroupRewriteResult::rewrittenDataFilesCount).sum();
}
}

Expand All @@ -116,6 +116,8 @@ default int rewrittenDataFilesCount() {
* which were formerly part of the table but have been rewritten.
*/
interface FileGroupRewriteResult {
FileGroupInfo info();

int addedDataFilesCount();

int rewrittenDataFilesCount();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo;
import org.apache.iceberg.actions.RewriteDataFiles.FileGroupRewriteResult;

public class BaseFileGroupRewriteResult implements FileGroupRewriteResult {
private final int addedDataFilesCount;
private final int rewrittenDataFilesCount;
private final FileGroupInfo info;

public BaseFileGroupRewriteResult(FileGroupInfo info, int addedFilesCount, int rewrittenFilesCount) {
this.info = info;
this.addedDataFilesCount = addedFilesCount;
this.rewrittenDataFilesCount = rewrittenFilesCount;
}

@Override
public FileGroupInfo info() {
return info;
}

@Override
public int addedDataFilesCount() {
return addedDataFilesCount;
}

@Override
public int rewrittenDataFilesCount() {
return rewrittenDataFilesCount;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

public class BaseRewriteDataFilesFileGroupInfo implements RewriteDataFiles.FileGroupInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to override equals and hashCode as this is used in the result map?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we will need the override equals and hashCode because I see we will use this info as keys in a HashMap.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what do o here for equals and hashcode, since we can't get those for a StructLike without a Struct and at this point we don't have them. I'd be fine with changing the interface to just be a list of Pairs instead of key/value (or some new result type which has file group embedded)

Like

FileGroupRewriteResult {
  info
  filesAdded
  filesRewritten
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is no longer used in maps?

private final int globalIndex;
private final int partitionIndex;
private final StructLike partition;

public BaseRewriteDataFilesFileGroupInfo(int globalIndex, int partitionIndex, StructLike partition) {
this.globalIndex = globalIndex;
this.partitionIndex = partitionIndex;
this.partition = partition;
}

@Override
public int globalIndex() {
return globalIndex;
}

@Override
public int partitionIndex() {
return partitionIndex;
}

@Override
public StructLike partition() {
return partition;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("globalIndex", globalIndex)
.add("partitionIndex", partitionIndex)
.add("partition", partition)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

import java.util.List;
import org.apache.iceberg.actions.RewriteDataFiles.FileGroupRewriteResult;
import org.apache.iceberg.actions.RewriteDataFiles.Result;

public class BaseRewriteDataFilesResult implements Result {
private final List<FileGroupRewriteResult> rewriteResults;

public BaseRewriteDataFilesResult(List<FileGroupRewriteResult> rewriteResults) {
this.rewriteResults = rewriteResults;
}

@Override
public List<FileGroupRewriteResult> rewriteResults() {
return rewriteResults;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
* more files than {@link #MIN_INPUT_FILES} or would produce at least one file of
* {@link RewriteDataFiles#TARGET_FILE_SIZE_BYTES}.
*/
abstract class BinPackStrategy implements RewriteStrategy {
public abstract class BinPackStrategy implements RewriteStrategy {

/**
* The minimum number of files that need to be in a file group for it to be considered for
Expand Down
Loading