Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
Expand All @@ -31,7 +30,6 @@
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

Expand All @@ -41,13 +39,6 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
return new FakePartitionLevelConsumer();
}

@Override
public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Set<String> fieldsToRead,
String groupId) {
return null;
}

@Override
public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
return new FakesStreamMetadataProvider();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.utils.JsonUtils;
Expand Down Expand Up @@ -218,12 +217,6 @@ public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int
return null;
}

@Override
public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Set<String> fieldsToRead,
String groupId) {
return null;
}

@Override
public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
return null;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.utils.LLCSegmentName;
Expand Down Expand Up @@ -405,23 +404,17 @@ public void addSegment(String segmentName, IndexLoadingConfig indexLoadingConfig
Schema schema = indexLoadingConfig.getSchema();
assert schema != null;
boolean isHLCSegment = SegmentName.isHighLevelConsumerSegmentName(segmentName);
if (isHLCSegment) {
throw new UnsupportedOperationException("Adding high level consumer segment " + segmentName + " is not allowed");
}
if (segmentZKMetadata.getStatus().isCompleted()) {
if (isHLCSegment && !segmentDir.exists()) {
throw new RuntimeException("Failed to find local copy for committed HLC segment: " + segmentName);
}
if (tryLoadExistingSegment(segmentName, indexLoadingConfig, segmentZKMetadata)) {
// The existing completed segment has been loaded successfully
return;
} else {
if (!isHLCSegment) {
// For LLC and uploaded segments, delete the local copy and download a new copy
_logger.error("Failed to load LLC segment: {}, downloading a new copy", segmentName);
FileUtils.deleteQuietly(segmentDir);
} else {
// For HLC segments, throw out the exception because there is no way to recover (controller does not have a
// copy of the segment)
throw new RuntimeException("Failed to load local HLC segment: " + segmentName);
}
// For LLC and uploaded segments, delete the local copy and download a new copy
_logger.error("Failed to load LLC segment: {}, downloading a new copy", segmentName);
FileUtils.deleteQuietly(segmentDir);
}
// Local segment doesn't exist or cannot load, download a new copy
downloadAndReplaceSegment(segmentName, segmentZKMetadata, indexLoadingConfig, tableConfig);
Expand All @@ -436,34 +429,26 @@ public void addSegment(String segmentName, IndexLoadingConfig indexLoadingConfig
_logger.error("Not adding segment {}", segmentName);
throw new RuntimeException("Mismatching schema/table config for " + _tableNameWithType);
}

VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema, segmentName);
setDefaultTimeValueIfInvalid(tableConfig, schema, segmentZKMetadata);

if (!isHLCSegment) {
// Generates only one semaphore for every partitionGroupId
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
int partitionGroupId = llcSegmentName.getPartitionGroupId();
Semaphore semaphore = _partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId, k -> new Semaphore(1));
PartitionUpsertMetadataManager partitionUpsertMetadataManager =
_tableUpsertMetadataManager != null ? _tableUpsertMetadataManager.getOrCreatePartitionManager(
partitionGroupId) : null;
PartitionDedupMetadataManager partitionDedupMetadataManager =
_tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
: null;
LLRealtimeSegmentDataManager llRealtimeSegmentDataManager =
new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager,
partitionDedupMetadataManager, _isTableReadyToConsumeData);
llRealtimeSegmentDataManager.startConsumption();
segmentDataManager = llRealtimeSegmentDataManager;
} else {
InstanceZKMetadata instanceZKMetadata = ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, _instanceId);
HLRealtimeSegmentDataManager hlRealtimeSegmentDataManager = new HLRealtimeSegmentDataManager(segmentZKMetadata,
tableConfig, instanceZKMetadata, this, _indexDir.getAbsolutePath(),
indexLoadingConfig, schema, _serverMetrics);
hlRealtimeSegmentDataManager.startConsumption();
segmentDataManager = hlRealtimeSegmentDataManager;
}
// Generates only one semaphore for every partitionGroupId
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
int partitionGroupId = llcSegmentName.getPartitionGroupId();
Semaphore semaphore = _partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId, k -> new Semaphore(1));
PartitionUpsertMetadataManager partitionUpsertMetadataManager =
_tableUpsertMetadataManager != null ? _tableUpsertMetadataManager.getOrCreatePartitionManager(
partitionGroupId) : null;
PartitionDedupMetadataManager partitionDedupMetadataManager =
_tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
: null;
LLRealtimeSegmentDataManager llRealtimeSegmentDataManager =
new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager,
partitionDedupMetadataManager, _isTableReadyToConsumeData);
llRealtimeSegmentDataManager.startConsumption();
segmentDataManager = llRealtimeSegmentDataManager;

_logger.info("Initialized RealtimeSegmentDataManager - " + segmentName);
registerSegment(segmentName, segmentDataManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.realtime.impl.fakestream;

import java.util.Set;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
Expand All @@ -31,7 +30,6 @@
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamDecoderProvider;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
Expand All @@ -50,12 +48,6 @@ public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int
return new FakePartitionLevelConsumer(partition, _streamConfig, FakeStreamConfigUtils.MESSAGE_BATCH_SIZE);
}

@Override
public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Set<String> fieldsToRead,
String groupId) {
throw new UnsupportedOperationException("Pinot no longer support stream level consumers!");
}

@Override
public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
return new FakeStreamMetadataProvider(_streamConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
*/
package org.apache.pinot.plugin.stream.kafka20;

import java.util.Set;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMetadataProvider;


Expand All @@ -32,12 +30,6 @@ public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int
return new KafkaPartitionLevelConsumer(clientId, _streamConfig, partition);
}

@Override
public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Set<String> fieldsToRead,
String groupId) {
return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, fieldsToRead, groupId);
}

@Override
public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
return new KafkaStreamMetadataProvider(clientId, _streamConfig, partition);
Expand Down

This file was deleted.

Loading