Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][format] Optimize manifest reading performance,add pushdown for manifest and orc. #4497

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9873457
[core] fix hll class not found
Aug 21, 2024
dedcaa4
[format][orc] open orc switch useSelected,allowSARGToFilter to make s…
Sep 22, 2024
4794d5f
Merge branch 'master' of github.com:ranxianglei/paimon
Sep 22, 2024
462edc6
[format][orc] miss tolerateMissingSchema
Sep 22, 2024
2226fb9
[format][orc] fix orc selected close for no filter condition
Sep 22, 2024
ee915c8
[orc] keep useSelected and allowSARGToFilter close default, or deleti…
Sep 27, 2024
2920fc9
[format][orc] VectorizedRowBatch to OrcColumnVector for selected rows…
Oct 15, 2024
8a89649
[format][orc] remove all isRepeating
Oct 15, 2024
c745e55
[core][format] merge with aa16c2bf1
Nov 11, 2024
7841f25
[core][format] merge conflicts
Nov 11, 2024
bbdd316
[core] fix AuditLogTable merge error
Nov 11, 2024
10ef09c
[format] recover HadoopFileIO
Nov 12, 2024
a2acbab
[format] checkstyle
Nov 12, 2024
e1c90c7
[format][orc] add pushdown option only for reader .
Nov 12, 2024
4364ac1
Merge branch 'master' into op_manifest
ranxianglei Nov 12, 2024
8c9a75c
[core] recover bucket
Nov 12, 2024
dfaeac3
Merge branch 'op_manifest' of github.com:ranxianglei/paimon into op_m…
Nov 12, 2024
f71d658
[core][format] add test for withBuckets and orcFormat
Nov 12, 2024
efac5b6
[format] fix checkstyle
Nov 12, 2024
15b1910
[format] fix version caused error
Nov 12, 2024
e1b3406
[core] fix checkstyle
Nov 12, 2024
d48fff6
[format] add FormatPerformanceTest
Nov 13, 2024
a0efae2
[format][tests] FormatPerformanceTest change to 10 times
Nov 13, 2024
016620c
[format][tests] FormatPerformanceTest change to lessthan to pass gith…
Nov 13, 2024
133a491
[format] merge conflicts
Nov 14, 2024
282a2c9
[format] id to lowercase
Nov 14, 2024
884d12f
[tests] core org.apache.paimon.factories.Factory
Nov 14, 2024
a1cc9f4
[tests] fileFormat factories add to paimon-flink-common
Nov 14, 2024
e401844
[core] resolve withBuckets commit
Nov 15, 2024
710af06
[format] no need call rowMapper under getArray
Nov 15, 2024
669dc30
[core] cancel manifest format factory cache for while .
Nov 18, 2024
00db1f6
Merge branch 'apache:master' into op_manifest
ranxianglei Dec 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.format;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
Expand All @@ -32,7 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;

/**
* Factory class which creates reader and writer factories for specific file format.
Expand Down Expand Up @@ -88,26 +88,15 @@ public static FileFormat fromIdentifier(String identifier, Options options) {

/** Create a {@link FileFormat} from format identifier and format options. */
public static FileFormat fromIdentifier(String identifier, FormatContext context) {
return fromIdentifier(identifier, context, FileFormat.class.getClassLoader())
.orElseThrow(
() ->
new RuntimeException(
String.format(
"Could not find a FileFormatFactory implementation class for %s format",
identifier)));
}

private static Optional<FileFormat> fromIdentifier(
String formatIdentifier, FormatContext context, ClassLoader classLoader) {
ServiceLoader<FileFormatFactory> serviceLoader =
ServiceLoader.load(FileFormatFactory.class, classLoader);
for (FileFormatFactory factory : serviceLoader) {
if (factory.identifier().equals(formatIdentifier.toLowerCase())) {
return Optional.of(factory.create(context));
}
if (identifier != null) {
identifier = identifier.toLowerCase();
}

return Optional.empty();
FileFormatFactory fileFormatFactory =
FactoryUtil.discoverFactory(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you just create a PR for FileFormatFactory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JingsongLi Of course you can, but I’ll change it in a few days. I’ve been a little busy lately.

当然可以,不过过几天再改,最近有点忙

FileFormatFactory.class.getClassLoader(),
FileFormatFactory.class,
identifier);
return fileFormatFactory.create(context);
}

protected Options getIdentifierPrefixOptions(Options options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
package org.apache.paimon.format;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.factories.Factory;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;

import javax.annotation.Nullable;

/** Factory to create {@link FileFormat}. */
public interface FileFormatFactory {
public interface FileFormatFactory extends Factory {

String identifier();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fs;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;

import java.time.Duration;
import java.util.function.Function;

/**
* Sample Object Cache Manager .
*
* @param <K>
* @param <V>
*/
public class ObjectCacheManager<K, V> {
private final Cache<K, V> cache;

private ObjectCacheManager(Duration timeout, int maxSize) {
this.cache = Caffeine.newBuilder().maximumSize(maxSize).expireAfterWrite(timeout).build();
ranxianglei marked this conversation as resolved.
Show resolved Hide resolved
}

public static <K, V> ObjectCacheManager<K, V> newObjectCacheManager(
Duration timeout, int maxSize) {
return new ObjectCacheManager<>(timeout, maxSize);
}

public ObjectCacheManager<K, V> put(K k, V v) {
this.cache.put(k, v);
return this;
}

public V get(K k, Function<? super K, ? extends V> creator) {
return this.cache.get(k, creator);
}

public V getIfPresent(K k) {
return this.cache.getIfPresent(k);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,18 @@ public class ManifestEntrySerializer extends VersionedObjectSerializer<ManifestE

private final DataFileMetaSerializer dataFileMetaSerializer;

private static final ManifestEntrySerializer MANIFEST_ENTRY_SERIALIZER =
new ManifestEntrySerializer();

public ManifestEntrySerializer() {
super(ManifestEntry.SCHEMA);
this.dataFileMetaSerializer = new DataFileMetaSerializer();
}

public static ManifestEntrySerializer getInstance() {
return MANIFEST_ENTRY_SERIALIZER;
}

@Override
public int getVersion() {
return 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.SingleFileWriter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -197,14 +198,18 @@ public boolean isCacheEnabled() {
}

public ManifestFile create() {
return create(null);
}

public ManifestFile create(List<Predicate> filters) {
RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA);
return new ManifestFile(
fileIO,
schemaManager,
partitionType,
new ManifestEntrySerializer(),
ManifestEntrySerializer.getInstance(),
entryType,
fileFormat.createReaderFactory(entryType),
fileFormat.createReaderFactory(entryType, filters),
Copy link
Contributor

@Aitozi Aitozi Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we enable the reader filter and the manifest cache, will we miss data from other buckets when reading data from bucket-x? Previously, data was stored in ObjectCache after passing through the loadFilter, but now it must pass through this filter first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If ObjectCache is enabled and push-down withBuckets is used, the problem you mentioned may indeed occur. So I originally planned to add a Filter condition to ObjectCache, but it was too complicated to change and I didn't have so much time to do these things, so I could only push down withBuckets for the time being. Because, in most scenarios, there will be no problem. If it is in flink or spark, I have seen that withBuckets will not be called at all. If it is an olap query and the corresponding bucket is read in segments, the bucket and segment will remain mapped. There will be no problems with the relationship.
If it were not based on this consideration, I suggest that the partition should also be pushed down.
If you feel the risk is too great, you can even turn off the manifest's metadata cache, and the performance will still improve significantly. @Aitozi

如果开启了ObjectCache缓存,有使用了withBuckets的下推,确实可能出现你说的问题。所以我本来打算给ObjectCache增加一个Filter条件,但是改起来太复杂而我没有那么多时间做这些东西,只能暂时先把withBuckets下推做了。因为,大部分场景下都不会出现问题,如果是flink里面或者spark里面,我看了根本就不会调用withBuckets,如果是olap查询,分segment读取对应的bucket,则bucket和segment会保持映射关系,也不会出现问题。
如果不是基于这个考虑,我建议分区也应该下推。
如果你觉得风险太大,甚至可以关闭manifest的元数据缓存,性能依然提升很明显。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your explanation, If we can not handle the push down when the cache enabled, I think we can disable the filter push down when the cache is enabled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is recommended to choose the latter between metadata caching and manifest pushdown. The performance of paimon's ObjectCache implementation is very low. After testing, sometimes it is not even as fast as manifest pushdown. I will submit a PR later to fix the performance problem of ObjectCache.

在元数据缓存和manifest下推之间建议选择后者。paimon的ObjectCache实现的性能非常低,经测试有时候甚至比不上manifest下推快。后面我会提交一个pr修复ObjectCache的性能问题。

@Aitozi

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aitozi This is a scenario that is quite different from mainstream applications in the community. The author's internal analysis engine does not have the ability of a central node, and can only plan by each computing node themselves. Each computing node only cares about its own bucket.

Actually, this is more like a manifest cache in the writer node than the current design.

Copy link
Contributor

@Aitozi Aitozi Dec 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JingsongLi In the writer node, it could still may need to read more than one bucket entry from the manifest if the parallelism is lower than the bucket number

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aitozi It is true, there are problems in this PR's implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image
great! Read more than 2G of metadata at one time

fileFormat.createWriterFactory(entryType),
compression,
pathFactory.manifestFileFactory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@
import org.apache.paimon.operation.metrics.ScanStats;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BiFilter;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
Expand Down Expand Up @@ -81,6 +85,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {

private Snapshot specifiedSnapshot = null;
private Filter<Integer> bucketFilter = null;
private Collection<Integer> buckets;
private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
private List<ManifestFileMeta> specifiedManifests = null;
protected ScanMode scanMode = ScanMode.ALL;
Expand Down Expand Up @@ -128,6 +133,14 @@ public FileStoreScan withPartitionFilter(PartitionPredicate predicate) {
@Override
public FileStoreScan withBucket(int bucket) {
this.bucketFilter = i -> i == bucket;
this.buckets = Collections.singletonList(bucket);
return this;
}

@Override
public FileStoreScan withBuckets(Collection<Integer> buckets) {
this.bucketFilter = buckets::contains;
this.buckets = buckets;
return this;
}

Expand Down Expand Up @@ -379,7 +392,7 @@ protected TableSchema scanTableSchema(long id) {
public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
List<ManifestEntry> entries =
manifestFileFactory
.create()
.create(createPushDownFilter(buckets))
.read(
manifest.fileName(),
manifest.fileSize(),
Expand Down Expand Up @@ -426,6 +439,22 @@ private Filter<InternalRow> createCacheRowFilter() {
return row -> manifestCacheFilter.test(partitionGetter.apply(row), bucketGetter.apply(row));
}

/**
* Read the corresponding entries based on the current required bucket, but push down into file
* format .
*/
private static List<Predicate> createPushDownFilter(Collection<Integer> buckets) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the query performance mainly gain from the bucket field push down for the ORC manifest file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More than half of the performance improvement comes from the orc pushdown of the manifest, the other part comes from the optimization of OrcFileFormat creation, and the other part comes from the caching of some time-consuming object operations on Scan.

性能提升一多半来自于manifest的orc下推,另外一部分来自于OrcFileFormat创建的优化,还有一部分来自于Scan上部分耗时的对象操作缓存 @Aitozi

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with #4231 together, bucket data with orc pushdown . Tests see this issue #4586 , current orc impl is faster more than Parquet 10 times! . @Aitozi

if (buckets == null || buckets.isEmpty()) {
return null;
}
List<Predicate> predicates = new ArrayList<>();
PredicateBuilder predicateBuilder =
new PredicateBuilder(
RowType.of(new DataType[] {new IntType()}, new String[] {"_BUCKET"}));
predicates.add(predicateBuilder.in(0, new ArrayList<>(buckets)));
return predicates;
}

/**
* Read the corresponding entries based on the current required partition and bucket.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -57,6 +58,8 @@ public interface FileStoreScan {

FileStoreScan withBucket(int bucket);

FileStoreScan withBuckets(Collection<Integer> buckets);

FileStoreScan withBucketFilter(Filter<Integer> bucketFilter);

FileStoreScan withTotalAwareBucketFilter(BiFilter<Integer, Integer> bucketFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
Expand All @@ -44,6 +45,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -266,6 +268,13 @@ public Scan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

@Override
public InnerTableScan withBuckets(Collection<Integer> buckets) {
mainScan.withBuckets(buckets);
fallbackScan.withBuckets(buckets);
return this;
}

@Override
public Scan withLevelFilter(Filter<Integer> levelFilter) {
mainScan.withLevelFilter(levelFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -79,6 +80,12 @@ public AbstractDataTableScan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

@Override
public AbstractDataTableScan withBuckets(Collection<Integer> buckets) {
snapshotReader.withBuckets(buckets);
return this;
}

@Override
public AbstractDataTableScan withPartitionFilter(Map<String, String> partitionSpec) {
snapshotReader.withPartitionFilter(partitionSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.utils.Filter;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -47,6 +49,15 @@ default InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

default InnerTableScan withBucket(Integer bucket) {
return withBuckets(Collections.singletonList(bucket));
}

default InnerTableScan withBuckets(Collection<Integer> buckets) {
// return this is not safe for too many class not impl this method and withBucketFilter
return this;
}

default InnerTableScan withLevelFilter(Filter<Integer> levelFilter) {
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -81,6 +82,8 @@ public interface SnapshotReader {

SnapshotReader withBucket(int bucket);

SnapshotReader withBuckets(Collection<Integer> buckets);

SnapshotReader withBucketFilter(Filter<Integer> bucketFilter);

SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -246,6 +247,11 @@ public SnapshotReader withBucket(int bucket) {
return this;
}

public SnapshotReader withBuckets(Collection<Integer> buckets) {
scan.withBuckets(buckets);
return this;
}

@Override
public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
scan.withBucketFilter(bucketFilter);
Expand All @@ -272,7 +278,13 @@ public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubt
Math.abs(file.hashCode() % numberOfParallelSubtasks)
== indexOfThisSubtask);
} else {
withBucketFilter(bucket -> bucket % numberOfParallelSubtasks == indexOfThisSubtask);
Set<Integer> buckets = new HashSet<>();
for (int bucket = 0; bucket < this.tableSchema.numBuckets(); bucket++) {
if (bucket % numberOfParallelSubtasks == indexOfThisSubtask) {
buckets.add(bucket);
}
}
withBuckets(buckets);
}
return this;
}
Expand Down
Loading
Loading