Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions core/trino-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -556,4 +556,29 @@
<scope>test</scope>
</dependency>
</dependencies>

<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<!-- Ensure incubator Vector API is on the module path for javac -->
<compilerArgs combine.self="merge">
<arg>${extraJavaVectorArgs}</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<!-- Ensure javadoc resolves incubator Vector API -->
<additionalOptions combine.self="merge">${extraJavaVectorArgs}</additionalOptions>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
20 changes: 20 additions & 0 deletions core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class FeaturesConfig
* default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}}
*/
private CompressionCodec exchangeCompressionCodec = NONE;
private BlockSerdeVectorizedNullSuppressionStrategy blockSerdeVectorizedNullSuppressionStrategy = BlockSerdeVectorizedNullSuppressionStrategy.AUTO;
private boolean pagesIndexEagerCompactionEnabled;
private boolean omitDateTimeTypePrecision;
private int maxRecursionDepth = 10;
Expand Down Expand Up @@ -133,6 +134,12 @@ public enum DataIntegrityVerification
/**/;
}

public enum BlockSerdeVectorizedNullSuppressionStrategy
{
AUTO,
NONE,
}

public boolean isOmitDateTimeTypePrecision()
{
return omitDateTimeTypePrecision;
Expand Down Expand Up @@ -366,6 +373,19 @@ public FeaturesConfig setExchangeCompressionCodec(CompressionCodec exchangeCompr
return this;
}

@Config("experimental.blockserde-vectorized-null-suppression-strategy")
@ConfigDescription("Strategy used for vectorized null suppression in block serde")
public FeaturesConfig setBlockSerdeVectorizedNullSuppressionStrategy(BlockSerdeVectorizedNullSuppressionStrategy blockSerdeVectorizedNullSuppressionStrategy)
{
this.blockSerdeVectorizedNullSuppressionStrategy = blockSerdeVectorizedNullSuppressionStrategy;
return this;
}

public BlockSerdeVectorizedNullSuppressionStrategy getBlockSerdeVectorizedNullSuppressionStrategy()
{
return blockSerdeVectorizedNullSuppressionStrategy;
}

public DataIntegrityVerification getExchangeDataIntegrityVerification()
{
return exchangeDataIntegrityVerification;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
*/
package io.trino.metadata;

import com.google.inject.Inject;
import io.trino.simd.BlockEncodingSimdSupport;
import io.trino.spi.SimdSupport;
import io.trino.spi.block.ArrayBlockEncoding;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockEncoding;
Expand All @@ -32,6 +35,7 @@
import java.util.concurrent.ConcurrentHashMap;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.simd.BlockEncodingSimdSupport.TESTING_BLOCK_ENCODING_SIMD_SUPPORT;
import static java.util.Objects.requireNonNull;

public final class BlockEncodingManager
Expand All @@ -41,14 +45,19 @@ public final class BlockEncodingManager
// for serialization
private final Map<Class<? extends Block>, BlockEncoding> blockEncodingNamesByClass = new ConcurrentHashMap<>();

public BlockEncodingManager()
public static final BlockEncodingManager TESTING_BLOCK_ENCODING_MANAGER = new BlockEncodingManager(TESTING_BLOCK_ENCODING_SIMD_SUPPORT);

@Inject
public BlockEncodingManager(
BlockEncodingSimdSupport blockEncodingSimdSupport)
{
// add the built-in BlockEncodings
SimdSupport simdSupport = blockEncodingSimdSupport.getSimdSupport();
addBlockEncoding(new VariableWidthBlockEncoding());
addBlockEncoding(new ByteArrayBlockEncoding());
addBlockEncoding(new ShortArrayBlockEncoding());
addBlockEncoding(new IntArrayBlockEncoding());
addBlockEncoding(new LongArrayBlockEncoding());
addBlockEncoding(new ByteArrayBlockEncoding(simdSupport.expandAndCompressByte()));
addBlockEncoding(new ShortArrayBlockEncoding(simdSupport.expandAndCompressShort()));
addBlockEncoding(new IntArrayBlockEncoding(simdSupport.expandAndCompressInt()));
addBlockEncoding(new LongArrayBlockEncoding(simdSupport.expandAndCompressLong()));
addBlockEncoding(new Fixed12BlockEncoding());
addBlockEncoding(new Int128ArrayBlockEncoding());
addBlockEncoding(new DictionaryBlockEncoding());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import io.trino.server.protocol.PreparedStatementEncoder;
import io.trino.server.protocol.spooling.SpoolingServerModule;
import io.trino.server.remotetask.HttpLocationFactory;
import io.trino.simd.BlockEncodingSimdSupport;
import io.trino.spi.PageIndexerFactory;
import io.trino.spi.PageSorter;
import io.trino.spi.VersionEmbedder;
Expand Down Expand Up @@ -427,6 +428,9 @@ protected void setup(Binder binder)
.to(ServerPluginsProvider.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(ServerPluginsProviderConfig.class);

// SIMD support
binder.bind(BlockEncodingSimdSupport.class).in(Scopes.SINGLETON);

// block encodings
binder.bind(BlockEncodingManager.class).in(Scopes.SINGLETON);
jsonBinder(binder).addSerializerBinding(Block.class).to(BlockJsonSerde.Serializer.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.simd;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.trino.FeaturesConfig;
import io.trino.spi.SimdSupport;
import io.trino.util.MachineInfo;
import jdk.incubator.vector.ByteVector;
import jdk.incubator.vector.IntVector;
import jdk.incubator.vector.LongVector;
import jdk.incubator.vector.ShortVector;
import oshi.hardware.CentralProcessor.ProcessorIdentifier;

import java.util.EnumSet;
import java.util.Set;

import static io.trino.FeaturesConfig.BlockSerdeVectorizedNullSuppressionStrategy.AUTO;
import static io.trino.FeaturesConfig.BlockSerdeVectorizedNullSuppressionStrategy.NONE;
import static io.trino.util.MachineInfo.readCpuFlags;
import static java.util.Locale.ENGLISH;

@Singleton
public final class BlockEncodingSimdSupport
{
public static final int MINIMUM_SIMD_LENGTH = 512;
private final SimdSupport simdSupport;

public static final BlockEncodingSimdSupport TESTING_BLOCK_ENCODING_SIMD_SUPPORT = new BlockEncodingSimdSupport(new FeaturesConfig().setBlockSerdeVectorizedNullSuppressionStrategy(AUTO));

@Inject
public BlockEncodingSimdSupport(
FeaturesConfig featuresConfig)
{
simdSupport = detectSimd(featuresConfig);
}

private static SimdSupport detectSimd(FeaturesConfig featuresConfig)
{
if (featuresConfig.getBlockSerdeVectorizedNullSuppressionStrategy().equals(NONE)) {
return SimdSupport.NONE;
}

ProcessorIdentifier id = MachineInfo.getProcessorInfo();

String vendor = id.getVendor().toLowerCase(ENGLISH);

if (vendor.contains("intel") || vendor.contains("amd")) {
return detectX86SimdSupport();
}

return SimdSupport.NONE;
}

private static SimdSupport detectX86SimdSupport()
{
enum X86Isa {
avx512f,
avx512vbmi2
}

Set<String> flags = readCpuFlags();
EnumSet<X86Isa> x86Flags = EnumSet.noneOf(X86Isa.class);

if (!flags.isEmpty()) {
for (X86Isa isa : X86Isa.values()) {
if (flags.contains(isa.name())) {
x86Flags.add(isa);
}
}
}

return new SimdSupport(
(ByteVector.SPECIES_PREFERRED.vectorBitSize() >= MINIMUM_SIMD_LENGTH) && x86Flags.contains(X86Isa.avx512vbmi2),
(ShortVector.SPECIES_PREFERRED.vectorBitSize() >= MINIMUM_SIMD_LENGTH) && x86Flags.contains(X86Isa.avx512vbmi2),
(IntVector.SPECIES_PREFERRED.vectorBitSize() >= MINIMUM_SIMD_LENGTH) && x86Flags.contains(X86Isa.avx512f),
(LongVector.SPECIES_PREFERRED.vectorBitSize() >= MINIMUM_SIMD_LENGTH) && x86Flags.contains(X86Isa.avx512f));
}

public SimdSupport getSimdSupport()
{
return simdSupport;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
import io.trino.memory.MemoryManagerConfig;
import io.trino.memory.NodeMemoryConfig;
import io.trino.metadata.AnalyzePropertyManager;
import io.trino.metadata.BlockEncodingManager;
import io.trino.metadata.CatalogManager;
import io.trino.metadata.ColumnPropertyManager;
import io.trino.metadata.DisabledSystemSecurityMetadata;
Expand Down Expand Up @@ -252,6 +251,7 @@
import static io.trino.execution.ParameterExtractor.bindParameters;
import static io.trino.execution.querystats.PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector;
import static io.trino.execution.warnings.WarningCollector.NOOP;
import static io.trino.metadata.BlockEncodingManager.TESTING_BLOCK_ENCODING_MANAGER;
import static io.trino.node.TestingInternalNodeManager.CURRENT_NODE;
import static io.trino.spi.connector.Constraint.alwaysTrue;
import static io.trino.spi.connector.DynamicFilter.EMPTY;
Expand Down Expand Up @@ -358,10 +358,9 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
catalogManager,
notificationExecutor);

BlockEncodingManager blockEncodingManager = new BlockEncodingManager();
TypeRegistry typeRegistry = new TypeRegistry(typeOperators, new FeaturesConfig());
TypeManager typeManager = new InternalTypeManager(typeRegistry);
InternalBlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(blockEncodingManager, typeManager);
InternalBlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, typeManager);
SecretsResolver secretsResolver = new SecretsResolver(ImmutableMap.of());

this.globalFunctionCatalog = new GlobalFunctionCatalog(
Expand Down Expand Up @@ -496,7 +495,7 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
new GroupProviderManager(secretsResolver),
new SessionPropertyDefaults(nodeInfo, accessControl, secretsResolver),
typeRegistry,
blockEncodingManager,
TESTING_BLOCK_ENCODING_MANAGER,
new HandleResolver(),
exchangeManagerRegistry,
spoolingManagerRegistry);
Expand Down
76 changes: 75 additions & 1 deletion core/trino-main/src/main/java/io/trino/util/MachineInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,25 @@
package io.trino.util;

import com.google.common.base.StandardSystemProperty;
import com.google.common.collect.ImmutableSet;
import oshi.SystemInfo;
import oshi.hardware.CentralProcessor;
import oshi.hardware.CentralProcessor.ProcessorIdentifier;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.lang.Math.min;
import static java.util.Locale.ENGLISH;

public final class MachineInfo
{
// cache physical processor count, so that it's not queried multiple times during tests
private static volatile int physicalProcessorCount = -1;
private static final SystemInfo SYSTEM_INFO = new SystemInfo();

private MachineInfo() {}

Expand All @@ -38,7 +49,7 @@ public static int getAvailablePhysicalProcessorCount()
if ("amd64".equals(osArch) || "x86_64".equals(osArch)) {
// Oshi can recognize physical processor count (without hyper threading) for x86 platforms.
// However, it doesn't correctly recognize physical processor count for ARM platforms.
totalPhysicalProcessorCount = new SystemInfo()
totalPhysicalProcessorCount = SYSTEM_INFO
.getHardware()
.getProcessor()
.getPhysicalProcessorCount();
Expand All @@ -52,4 +63,67 @@ public static int getAvailablePhysicalProcessorCount()
physicalProcessorCount = min(totalPhysicalProcessorCount, availableProcessorCount);
return physicalProcessorCount;
}

public static ProcessorIdentifier getProcessorInfo()
{
return SYSTEM_INFO.getHardware().getProcessor().getProcessorIdentifier();
}

public static Set<String> readCpuFlags()
{
CentralProcessor cpu = SYSTEM_INFO.getHardware().getProcessor();
List<String> flags = cpu.getFeatureFlags();
if (flags == null || flags.isEmpty()) {
return ImmutableSet.of();
}

Set<String> intersection = null;

for (String line : flags) {
if (line == null || line.isBlank()) {
continue;
}

// Strip the "flags:" / "Features:" prefix if present.
String body = line;
int colon = line.indexOf(':');
if (colon >= 0) {
body = line.substring(colon + 1);
}

// Tokenize + normalize.
Set<String> tokens = Arrays.stream(body.trim().split("\\s+"))
.map(token -> normalizeFlag(token))
.filter(token -> !token.isEmpty())
.collect(toImmutableSet());

if (tokens.isEmpty()) {
continue;
}

if (intersection == null) {
Copy link
Member

@pettyjamesm pettyjamesm Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's going on with the intersection logic here? Is line of flags the hardware support for an individual core and we're looking for only flags advertised by all cores? If so, let's add an inline comment to that effect to make this easier to read.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure will do

intersection = new HashSet<>(tokens);
}
else {
intersection.retainAll(tokens);
if (intersection.isEmpty()) {
break; // nothing in common
}
}
}

return intersection == null ? ImmutableSet.of() : intersection;
}

public static String normalizeFlag(String flag)
{
flag = flag.toLowerCase(ENGLISH).replace("_", "").trim();

// Skip stray keys that may sneak in if the colon wasn’t found.
if (flag.equals("flags") || flag.equals("features")) {
return "";
}

return flag;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class BenchmarkBlockSerde
{
private static final DecimalType LONG_DECIMAL_TYPE = createDecimalType(30, 5);

public static final int ROWS = 10_000_000;
public static final int ROWS = 8192;

@Benchmark
public Object serializeLongDecimal(LongDecimalBenchmarkData data)
Expand Down
Loading
Loading