Skip to content

Commit

Permalink
Shadow Context + Thread checking removed
Browse files Browse the repository at this point in the history
  • Loading branch information
bvarga committed Jan 9, 2013
1 parent dd67a13 commit e7db05d
Showing 1 changed file with 36 additions and 39 deletions.
75 changes: 36 additions & 39 deletions zmqapi.pas
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,6 @@ TZMQSocket = class
protected
fSocket: Pointer;
fContext: TZMQContext;
{$ifdef FPC}
fThreadId: TThreadID;
{$else}
fThreadId: Cardinal;
{$endif}
private
fRaiseEAgain: Boolean;
{$ifdef zmq3}
Expand Down Expand Up @@ -447,12 +442,17 @@ TZMQContext = class
{$endif}
protected
fTerminated: Boolean;
fMainThread: Boolean;
constructor createShadow( context: TZMQContext );

procedure CheckResult( rc: Integer );
procedure RemoveSocket( lSocket: TZMQSocket );

public
constructor create{$ifndef zmq3}( io_threads: Integer = 1 ){$endif};
destructor Destroy; override;

function Shadow: TZMQContext;
function Socket( stype: TZMQSocketType ): TZMQSocket;
procedure Terminate;
property ContextPtr: Pointer read fContext;
Expand Down Expand Up @@ -881,11 +881,6 @@ function TZMQMsg.getItem( indx: Integer ): TZMQFrame;

constructor TZMQSocket.Create;
begin
{$ifdef FPC}
fThreadId := GetCurrentThreadId;
{$else}
fThreadId := Windows.GetCurrentThreadId;
{$endif}
fRaiseEAgain := False;
{$ifdef zmq3}
fAcceptFilter := TStringList.Create;
Expand Down Expand Up @@ -1808,6 +1803,7 @@ function TZMQSocket.recv( msg: TStrings; flags: TZMQRecvFlags = [] ): Integer;
constructor TZMQContext.create{$ifndef zmq3}( io_threads: Integer ){$endif};
begin
fTerminated := false;
fMainThread := true;
contexts.Add( Self );
{$ifdef zmq3}
fContext := zmq_ctx_new;
Expand All @@ -1821,45 +1817,36 @@ constructor TZMQContext.create{$ifndef zmq3}( io_threads: Integer ){$endif};
fSockets := TList.Create;
end;

constructor TZMQContext.createShadow( context: TZMQContext );
begin
fTerminated := false;
fMainThread := false;
contexts.Add( Self );
fContext := context.ContextPtr;
fLinger := context.Linger;
fSockets := TList.Create;
end;

destructor TZMQContext.destroy;
var
i: Integer;
{$ifdef FPC}
fThreadId: TThreadID;
{$else}
fThreadId: Cardinal;
{$endif}

begin
if fLinger >= -1 then
for i:= 0 to fSockets.Count - 1 do
TZMQSocket(fSockets[i]).Linger := Linger;

// if Socket created in the same Thread, it's safe to
// terminate here.
{$ifdef FPC}
fThreadId := GetCurrentThreadId;
{$else}
fThreadId := Windows.GetCurrentThreadId;
{$endif}

i := 0;
while i < fSockets.Count do
begin
if TZMQSocket(fSockets[i]).fThreadId = fThreadId then
TZMQSocket(fSockets[i]).Free;
Inc( i );
end;
for i := 0 to fSockets.Count - 1 do
TZMQSocket(fSockets[i]).Free;

if fContext <> nil then
if ( fContext <> nil ) and fMainThread then
begin
{$ifdef zmq3}
CheckResult( zmq_ctx_destroy( ContextPtr ) );
{$else}
CheckResult( zmq_term( ContextPtr ) );
{$endif}
fContext := nil;
end;
fContext := nil;

fSockets.Free;
contexts.Delete( contexts.IndexOf(Self) );
Expand All @@ -1876,11 +1863,16 @@ procedure TZMQContext.Terminate;
{$ifndef unix}
p := ContextPtr;
fContext := nil;
{$ifdef zmq3}
CheckResult( zmq_ctx_destroy( p ) );
{$else}
CheckResult( zmq_term( p ) );
{$endif}

if fMainThread then
begin
{$ifdef zmq3}
CheckResult( zmq_ctx_destroy( p ) );
{$else}
CheckResult( zmq_term( p ) );
{$endif}
end;

{$endif}
end;
end;
Expand Down Expand Up @@ -1935,6 +1927,11 @@ procedure TZMQContext.setMaxSockets( const Value: Integer );

{$endif}

function TZMQContext.Shadow: TZMQContext;
begin
result := TZMQContext.createShadow( self );
end;

function TZMQContext.Socket( stype: TZMQSocketType ): TZMQSocket;
begin
EnterCriticalSection( cs );
Expand Down Expand Up @@ -2434,7 +2431,7 @@ function console_handler( dwCtrlType: DWORD ): BOOL;
begin
if CTRL_C_EVENT = dwCtrlType then
begin
for i := 0 to contexts.Count - 1 do
for i := contexts.Count - 1 downto 0 do
TZMQContext(contexts[i]).Terminate;
result := True;
// if I set to True than the app won't exit,
Expand Down

0 comments on commit e7db05d

Please sign in to comment.