Skip to content

Commit

Permalink
Fixed an issue where engine would crash randomly, or when changing sc…
Browse files Browse the repository at this point in the history
…enes

When a RPC client is destroyed, there could be an issue where its Conduits would be destroyed, and then FRunnableThread would try to access them, crashing the engine in process.
To mitigate this, BeginDestroy, which is called before destructor, would Stop the RPC client; also, FRunnableThread is stored via an Atomic; this forbids synchronization issues with multiple calls to Stop()
  • Loading branch information
koriandrei committed Sep 10, 2019
1 parent c6cc028 commit a70bc9f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 18 deletions.
41 changes: 24 additions & 17 deletions Source/InfraworldRuntime/Private/RpcClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,19 @@ bool URpcClient::Init(const FString& URI, UChannelCredentials* ChannelCredential
// Launch 'chaining' hierarchical init, which will init a superclass (a concrete implementation).
HierarchicalInit();

InnerWorker->URI = URI;
InnerWorker->ChannelCredentials = ChannelCredentials;

// Retrieve and set an Error Message Queue
if (InnerWorker)
{
InnerWorker->URI = URI;
InnerWorker->ChannelCredentials = ChannelCredentials;

InnerWorker->ErrorMessageQueue = &ErrorMessageQueue;

const FString ThreadName(FString::Printf(TEXT("RPC Client Thread %s %d"), *(GetClass()->GetName()), FMath::RandRange(0, TNumericLimits<int32>::Max())));
Thread = FRunnableThread::Create(InnerWorker.Get(), *ThreadName);

bCanSendRequests = true;
UE_LOG(LogTemp, Verbose, TEXT("Just made a thread: %s"), *ThreadName);
UE_LOG(LogTemp, Verbose, TEXT("Just made a thread: %s, address %lld"), *ThreadName, reinterpret_cast<int64>(InnerWorker.Get()));
}
else
{
Expand Down Expand Up @@ -100,12 +100,6 @@ URpcClient::~URpcClient()
{
UE_LOG(LogTemp, Verbose, TEXT("An instance of RPC Client has been destroyed. Still can send requests: %s"),
*UKismetStringLibrary::Conv_BoolToString(CanSendRequests()));

// Being called when GC'ed, should be called synchronously.
if (CanSendRequests())
{
Stop(true);
}
}

void URpcClient::Update()
Expand Down Expand Up @@ -149,23 +143,36 @@ URpcClient* URpcClient::CreateRpcClientUri(TSubclassOf<URpcClient> Class, const
}
}

void URpcClient::BeginDestroy()
{
// Being called when GC'ed, should be called synchronously.
if (CanSendRequests())
{
Stop(true);
}

Super::BeginDestroy();
}

void URpcClient::Stop(bool bSynchronous)
{
if (Thread)
FRunnableThread* ThreadToStop = Thread.Exchange(nullptr);

if (ThreadToStop)
{
if (!InnerWorker->IsPendingStopped())
InnerWorker->MarkPendingStopped();

bCanSendRequests = false;
UE_LOG(LogTemp, Verbose, TEXT("Scheduled to stop %s via setting 'bCanSendRequests = false'"), *(GetClass()->GetName()));
UE_LOG(LogTemp, Verbose, TEXT("Scheduled to stop %s via setting 'bCanSendRequests = false', address %lld"), *(GetClass()->GetName()), reinterpret_cast<int64>(InnerWorker.Get()));

// Should be synchronous in (almost) any case
Thread->Kill(bSynchronous);
ThreadToStop->Kill(bSynchronous);

delete Thread;
Thread = nullptr;
FTicker::GetCoreTicker().RemoveTicker(TickDelegateHandle);
delete ThreadToStop;
ThreadToStop = nullptr;
FTicker::GetCoreTicker().RemoveTicker(TickDelegateHandle);
}
else
{
Expand Down
5 changes: 4 additions & 1 deletion Source/InfraworldRuntime/Public/RpcClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "Containers/Queue.h"
#include "Templates/SubclassOf.h"
#include "Templates/Atomic.h"

#include "RpcClientWorker.h"
#include "ChannelCredentials.h"
Expand Down Expand Up @@ -129,11 +130,13 @@ class INFRAWORLDRUNTIME_API URpcClient : public UObject
TUniquePtr<RpcClientWorker> InnerWorker;

private:
virtual void BeginDestroy() override;

/** Whether the RPC Client could send requests or not */
bool bCanSendRequests = false;

/** A thread, where RPC client worker will reside */
FRunnableThread* Thread = nullptr;
TAtomic<FRunnableThread*> Thread = nullptr;

/** An accumulator for error messages */
TQueue<FRpcError> ErrorMessageQueue;
Expand Down

0 comments on commit a70bc9f

Please sign in to comment.