Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ friend class IHTTPGateway;
}

size_t FillHandlers() {
const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());
for (auto it = Streams.cbegin(); Streams.cend() != it;) {
if (const auto& stream = it->lock()) {
const auto streamHandle = stream->GetHandle();
Expand Down Expand Up @@ -763,7 +763,7 @@ friend class IHTTPGateway;
TEasyCurl::TPtr easy;
long httpResponseCode = 0L;
{
const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());
if (const auto it = Allocated.find(handle); Allocated.cend() != it) {
easy = std::move(it->second);
TString codeLabel;
Expand Down Expand Up @@ -815,7 +815,7 @@ friend class IHTTPGateway;
void Fail(CURLMcode result) {
std::stack<TEasyCurl::TPtr> works;
{
const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());

for (auto& item : Allocated) {
works.emplace(std::move(item.second));
Expand All @@ -836,7 +836,7 @@ friend class IHTTPGateway;
void Upload(TString url, THeaders headers, TString body, TOnResult callback, bool put, TRetryPolicy::TPtr retryPolicy) final {
Rps->Inc();

const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());
auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), put ? TEasyCurl::EMethod::PUT : TEasyCurl::EMethod::POST, std::move(body), std::move(headers), 0U, 0U, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig, DnsGateway.GetDNSCurlList());
Await.emplace(std::move(easy));
Wakeup(0U);
Expand All @@ -845,7 +845,7 @@ friend class IHTTPGateway;
void Delete(TString url, THeaders headers, TOnResult callback, TRetryPolicy::TPtr retryPolicy) final {
Rps->Inc();

const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());
auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::DELETE, 0, std::move(headers), 0U, 0U, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig, DnsGateway.GetDNSCurlList());
Await.emplace(std::move(easy));
Wakeup(0U);
Expand All @@ -866,7 +866,7 @@ friend class IHTTPGateway;
callback(TResult(CURLE_OK, TIssues{error}));
return;
}
const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());
auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), offset, sizeLimit, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig, DnsGateway.GetDNSCurlList());
Await.emplace(std::move(easy));
Wakeup(sizeLimit);
Expand All @@ -883,13 +883,14 @@ friend class IHTTPGateway;
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) final
{
auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, InitConfig, DnsGateway.GetDNSCurlList());
const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());
const auto handle = stream->GetHandle();
TEasyCurlStream::TWeakPtr weak = stream;
Streams.emplace_back(stream);
Allocated.emplace(handle, std::move(stream));
Wakeup(0ULL);
return [weak](TIssue issue) {
return [weak, sync=Sync](TIssue issue) {
const std::unique_lock lock(*sync);
if (const auto& stream = weak.lock())
stream->Cancel(issue);
};
Expand All @@ -900,7 +901,7 @@ friend class IHTTPGateway;
}

void OnRetry(TEasyCurlBuffer::TPtr easy) {
const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());
const size_t sizeLimit = easy->GetSizeLimit();
Await.emplace(std::move(easy));
Wakeup(sizeLimit);
Expand All @@ -918,6 +919,10 @@ friend class IHTTPGateway;
}

private:
std::mutex& SyncRef() {
return *Sync;
}

CURLM* Handle = nullptr;

std::queue<TEasyCurlBuffer::TPtr> Await;
Expand All @@ -927,7 +932,7 @@ friend class IHTTPGateway;
std::unordered_map<CURL*, TEasyCurl::TPtr> Allocated;
std::priority_queue<std::pair<TInstant, TEasyCurlBuffer::TPtr>> Delayed;

std::mutex Sync;
std::shared_ptr<std::mutex> Sync = std::make_shared<std::mutex>();
std::thread Thread;
std::atomic<bool> IsStopped = false;

Expand Down
13 changes: 6 additions & 7 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,11 @@ struct TReadSpec {
NDB::ColumnsWithTypeAndName CHColumns;
std::shared_ptr<arrow::Schema> ArrowSchema;
NDB::FormatSettings Settings;
TString Format, Compression;
// It's very important to keep here std::string instead of TString
// because of the cast from TString to std::string is using the MutRef (it isn't thread-safe).
// This behaviour can be found in the getInputFormat call
std::string Format;
TString Compression;
ui64 SizeLimit = 0;
ui32 BlockLengthPosition = 0;
std::vector<ui32> ColumnReorder;
Expand Down Expand Up @@ -1367,12 +1371,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
DecodedChunkSizeHist,
HttpInflightSize,
HttpDataRps,
DeferredQueueSize,
ReadSpec->Format,
ReadSpec->Compression,
ReadSpec->ArrowSchema,
ReadSpec->RowSpec,
ReadSpec->Settings
DeferredQueueSize
);

if (!UseRuntimeListing) {
Expand Down
17 changes: 1 addition & 16 deletions ydb/library/yql/providers/s3/common/source_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ struct TSourceContext {
, NMonitoring::THistogramPtr decodedChunkSizeHist
, NMonitoring::TDynamicCounters::TCounterPtr httpInflightSize
, NMonitoring::TDynamicCounters::TCounterPtr httpDataRps
, NMonitoring::TDynamicCounters::TCounterPtr deferredQueueSize
, const TString format
, const TString compression
, std::shared_ptr<arrow::Schema> schema
, std::unordered_map<TStringBuf, NKikimr::NMiniKQL::TType*, THash<TStringBuf>> rowTypes
, NDB::FormatSettings settings)
, NMonitoring::TDynamicCounters::TCounterPtr deferredQueueSize)
: SourceId(sourceId)
, Limit(limit)
, ActorSystem(actorSystem)
Expand All @@ -54,11 +49,6 @@ struct TSourceContext {
, HttpInflightSize(httpInflightSize)
, HttpDataRps(httpDataRps)
, DeferredQueueSize(deferredQueueSize)
, Format(format)
, Compression(compression)
, Schema(schema)
, RowTypes(rowTypes)
, Settings(settings)
{
}

Expand Down Expand Up @@ -105,11 +95,6 @@ struct TSourceContext {
NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps;
NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize;
const TString Format;
const TString Compression;
std::shared_ptr<arrow::Schema> Schema;
std::unordered_map<TStringBuf, NKikimr::NMiniKQL::TType*, THash<TStringBuf>> RowTypes;
NDB::FormatSettings Settings;
private:
std::atomic_uint64_t Value;
std::mutex Mutex;
Expand Down