Skip to content

Commit 6f8bdcb

Browse files
authored
YQ-2634: S3 runtime file listing (#925)
* Base feature implementation * Added interconnect retries for FileQueue * Added best effort FileQueue pass away * Global ids for PB events, review fixes * Added missing peer dirs * Move FileQueue creation to FillSourceSettings * Added FileQueueBatchSize pragrma, batch also limited by requested amount * Round-robin stage in FileQueue, new s3 settings * FileQueue processes one batch at a time, removing amount from batch request * Runtime listing fixes * Clean up * Added parametrization in partitioning integration tests * Poison fix * Fixed partitioning columns, fixed retries, added extra tests * Unit test imports fixes * FileQueue consumers count correction * Review fixes
1 parent 685aefb commit 6f8bdcb

19 files changed

+1014
-292
lines changed

ydb/core/base/events.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ struct TKikimrEvents : TEvents {
173173
ES_GRAPH,
174174
ES_REPLICATION_SERVICE,
175175
ES_CHANGE_EXCHANGE,
176+
ES_S3_FILE_QUEUE,
176177
};
177178
};
178179

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,6 +1475,7 @@ class TKqpHost : public IKqpHost {
14751475
state->CredentialsFactory = FederatedQuerySetup->CredentialsFactory;
14761476
state->Configuration->WriteThroughDqIntegration = true;
14771477
state->Configuration->AllowAtomicUploadCommit = queryType == EKikimrQueryType::Script;
1478+
state->MaxTasksPerStage = SessionCtx->ConfigPtr()->MaxTasksPerStage.Get();
14781479

14791480
state->Configuration->Init(FederatedQuerySetup->S3GatewayConfig, TypesCtx);
14801481

ydb/library/yql/dq/actors/compute/retry_queue.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ class TRetryEventsQueue {
120120
}
121121
return false;
122122
}
123+
124+
bool HasPendingEvents() {
125+
RemoveConfirmedEvents(MyConfirmedSeqNo);
126+
return !Events.empty();
127+
}
123128

124129
void OnNewRecipientId(const NActors::TActorId& recipientId, bool unsubscribe = true);
125130
void HandleNodeConnected(ui32 nodeId);

ydb/library/yql/providers/s3/actors/ya.make

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ PEERDIR(
2323
library/cpp/string_utils/base64
2424
library/cpp/string_utils/quote
2525
library/cpp/xml/document
26+
ydb/core/base
2627
ydb/core/fq/libs/events
2728
ydb/library/yql/dq/actors/compute
2829
ydb/library/yql/minikql/computation
@@ -36,6 +37,8 @@ PEERDIR(
3637
ydb/library/yql/providers/s3/credentials
3738
ydb/library/yql/providers/s3/object_listers
3839
ydb/library/yql/providers/s3/proto
40+
ydb/library/yql/providers/s3/range_helpers
41+
ydb/library/yql/public/issue
3942
ydb/library/yql/public/types
4043
ydb/library/yql/udfs/common/clickhouse/client
4144
)

0 commit comments

Comments
 (0)