@@ -746,7 +746,7 @@ friend class IHTTPGateway;
746746 }
747747
748748 size_t FillHandlers () {
749- const std::unique_lock lock (Sync );
749+ const std::unique_lock lock (SyncRef () );
750750 for (auto it = Streams.cbegin (); Streams.cend () != it;) {
751751 if (const auto & stream = it->lock ()) {
752752 const auto streamHandle = stream->GetHandle ();
@@ -795,7 +795,7 @@ friend class IHTTPGateway;
795795 TEasyCurl::TPtr easy;
796796 long httpResponseCode = 0L ;
797797 {
798- const std::unique_lock lock (Sync );
798+ const std::unique_lock lock (SyncRef () );
799799 if (const auto it = Allocated.find (handle); Allocated.cend () != it) {
800800 easy = std::move (it->second );
801801 TString codeLabel;
@@ -847,7 +847,7 @@ friend class IHTTPGateway;
847847 void Fail (CURLMcode result) {
848848 std::stack<TEasyCurl::TPtr> works;
849849 {
850- const std::unique_lock lock (Sync );
850+ const std::unique_lock lock (SyncRef () );
851851
852852 for (auto & item : Allocated) {
853853 works.emplace (std::move (item.second ));
@@ -868,7 +868,7 @@ friend class IHTTPGateway;
868868 void Upload (TString url, THeaders headers, TString body, TOnResult callback, bool put, TRetryPolicy::TPtr retryPolicy) final {
869869 Rps->Inc ();
870870
871- const std::unique_lock lock (Sync );
871+ const std::unique_lock lock (SyncRef () );
872872 auto easy = TEasyCurlBuffer::Make (InFlight, DownloadedBytes, UploadedBytes, std::move (url), put ? TEasyCurl::EMethod::PUT : TEasyCurl::EMethod::POST, std::move (body), std::move (headers), 0U , 0U , std::move (callback), retryPolicy ? retryPolicy->CreateRetryState () : nullptr , InitConfig, DnsGateway.GetDNSCurlList ());
873873 Await.emplace (std::move (easy));
874874 Wakeup (0U );
@@ -877,7 +877,7 @@ friend class IHTTPGateway;
877877 void Delete (TString url, THeaders headers, TOnResult callback, TRetryPolicy::TPtr retryPolicy) final {
878878 Rps->Inc ();
879879
880- const std::unique_lock lock (Sync );
880+ const std::unique_lock lock (SyncRef () );
881881 auto easy = TEasyCurlBuffer::Make (InFlight, DownloadedBytes, UploadedBytes, std::move (url), TEasyCurl::EMethod::DELETE, 0 , std::move (headers), 0U , 0U , std::move (callback), retryPolicy ? retryPolicy->CreateRetryState () : nullptr , InitConfig, DnsGateway.GetDNSCurlList ());
882882 Await.emplace (std::move (easy));
883883 Wakeup (0U );
@@ -898,7 +898,7 @@ friend class IHTTPGateway;
898898 callback (TResult (CURLE_OK, TIssues{error}));
899899 return ;
900900 }
901- const std::unique_lock lock (Sync );
901+ const std::unique_lock lock (SyncRef () );
902902 auto easy = TEasyCurlBuffer::Make (InFlight, DownloadedBytes, UploadedBytes, std::move (url), TEasyCurl::EMethod::GET, std::move (data), std::move (headers), offset, sizeLimit, std::move (callback), retryPolicy ? retryPolicy->CreateRetryState () : nullptr , InitConfig, DnsGateway.GetDNSCurlList ());
903903 Await.emplace (std::move (easy));
904904 Wakeup (sizeLimit);
@@ -915,13 +915,14 @@ friend class IHTTPGateway;
915915 const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) final
916916 {
917917 auto stream = TEasyCurlStream::Make (InFlightStreams, DownloadedBytes, UploadedBytes, std::move (url), std::move (headers), offset, sizeLimit, std::move (onStart), std::move (onNewData), std::move (onFinish), inflightCounter, InitConfig, DnsGateway.GetDNSCurlList ());
918- const std::unique_lock lock (Sync );
918+ const std::unique_lock lock (SyncRef () );
919919 const auto handle = stream->GetHandle ();
920920 TEasyCurlStream::TWeakPtr weak = stream;
921921 Streams.emplace_back (stream);
922922 Allocated.emplace (handle, std::move (stream));
923923 Wakeup (0ULL );
924- return [weak](TIssue issue) {
924+ return [weak, sync=Sync](TIssue issue) {
925+ const std::unique_lock lock (*sync);
925926 if (const auto & stream = weak.lock ())
926927 stream->Cancel (issue);
927928 };
@@ -932,7 +933,7 @@ friend class IHTTPGateway;
932933 }
933934
934935 void OnRetry (TEasyCurlBuffer::TPtr easy) {
935- const std::unique_lock lock (Sync );
936+ const std::unique_lock lock (SyncRef () );
936937 const size_t sizeLimit = easy->GetSizeLimit ();
937938 Await.emplace (std::move (easy));
938939 Wakeup (sizeLimit);
@@ -950,6 +951,10 @@ friend class IHTTPGateway;
950951 }
951952
952953private:
954+ std::mutex& SyncRef () {
955+ return *Sync;
956+ }
957+
953958 CURLM* Handle = nullptr ;
954959
955960 std::queue<TEasyCurlBuffer::TPtr> Await;
@@ -959,7 +964,7 @@ friend class IHTTPGateway;
959964 std::unordered_map<CURL*, TEasyCurl::TPtr> Allocated;
960965 std::priority_queue<std::pair<TInstant, TEasyCurlBuffer::TPtr>> Delayed;
961966
962- std::mutex Sync;
967+ std::shared_ptr<std:: mutex> Sync = std::make_shared<std::mutex>() ;
963968 std::thread Thread;
964969 std::atomic<bool > IsStopped = false ;
965970
0 commit comments