Skip to content

Commit

Permalink
bug fixes and removed terminated parameter from threadproc and thread…
Browse files Browse the repository at this point in the history
…meth
  • Loading branch information
bvarga committed Jan 17, 2013
1 parent c665e1c commit 029faae
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 64 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ Monitoring Sockets ( just available in `v3.2.2`)

- Creating a detached thread.

procedure TMyClass.DetachedProc( args: Pointer; context: TZMQContext; terminated: PBoolean );
procedure TMyClass.DetachedMeth( args: Pointer; context: TZMQContext );
var
socket: TZMQSocket;
begin
Expand All @@ -201,7 +201,7 @@ Monitoring Sockets ( just available in `v3.2.2`)
begin
New( sockettype );
sockettype^ := stDealer;
thr := TZMQThread.CreateDetached( DetachedProc, sockettype );
thr := TZMQThread.CreateDetached( DetachedMeth, sockettype );
thr.FreeOnTerminate := true;
thr.Resume;
end;
Expand Down Expand Up @@ -237,12 +237,12 @@ Monitoring Sockets ( just available in `v3.2.2`)

- Creating an attached thread.

procedure TMyClass.AttachedProc( args: Pointer; context: TZMQContext; pipe: TZMQSocket; terminated: PBoolean );
procedure TMyClass.AttachedMeth( args: Pointer; context: TZMQContext; pipe: TZMQSocket );
var
socket: TZMQSocket;
msg: Utf8String;
begin
while not Terminated and not context.Terminated do
while not context.Terminated do
begin
// do some cool stuff.
socket := Context.socket( TZMQSocketType(Args^) );
Expand All @@ -258,7 +258,7 @@ Monitoring Sockets ( just available in `v3.2.2`)
begin
New( sockettype );
sockettype^ := stDealer;
thr := TZMQThread.CreateAttached( AttachedProc, context, sockettype );
thr := TZMQThread.CreateAttached( AttachedMeth, context, sockettype );
thr.FreeOnTerminate := true;
thr.Resume;

Expand Down
51 changes: 10 additions & 41 deletions tests/ThreadTestCase.pas
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ TThreadTestCase = class( TTestCase )
procedure SetUp; override;
procedure TearDown; override;

procedure DetachedTestProc( args: Pointer; context: TZMQContext; terminated: PBoolean );
procedure AttachedTestProc( args: Pointer; context: TZMQContext; pipe: TZMQSocket; terminated: PBoolean );
procedure DetachedTestMeth( args: Pointer; context: TZMQContext );
procedure AttachedTestMeth( args: Pointer; context: TZMQContext; pipe: TZMQSocket );

procedure AttachedPipeTestProc( args: Pointer; context: TZMQContext; pipe: TZMQSocket; terminated: PBoolean );

procedure Detached2TestProc( args: Pointer; context: TZMQContext; terminated: PBoolean );
procedure AttachedPipeTestMeth( args: Pointer; context: TZMQContext; pipe: TZMQSocket );

procedure InheritedThreadTerminate( Sender: TObject );

Expand All @@ -57,7 +55,6 @@ TThreadTestCase = class( TTestCase )
procedure CreateInheritedAttachedTest;
procedure CreateInheritedDetachedTest;

procedure CreateDetached2Test;

procedure AttachedPipeTest;
end;
Expand Down Expand Up @@ -99,7 +96,7 @@ procedure TThreadTestCase.TearDown;
CloseHandle( ehandle );
end;

procedure TThreadTestCase.AttachedTestProc( args: Pointer; context: TZMQContext; pipe: TZMQSocket; terminated: PBoolean );
procedure TThreadTestCase.AttachedTestMeth( args: Pointer; context: TZMQContext; pipe: TZMQSocket );
begin
tvar := true;
SetEvent( ehandle );
Expand All @@ -109,7 +106,7 @@ procedure TThreadTestCase.CreateAttachedTest;
var
thr: TZMQThread;
begin
thr := TZMQThread.CreateAttached( AttachedTestProc, context, nil );
thr := TZMQThread.CreateAttached( AttachedTestMeth, context, nil );
thr.FreeOnTerminate := true;
thr.Resume;

Expand All @@ -118,7 +115,7 @@ procedure TThreadTestCase.CreateAttachedTest;

end;

procedure TThreadTestCase.DetachedTestProc( args: Pointer; context: TZMQContext; terminated: PBoolean );
procedure TThreadTestCase.DetachedTestMeth( args: Pointer; context: TZMQContext );
var
socket: TZMQSocket;
begin
Expand All @@ -135,7 +132,7 @@ procedure TThreadTestCase.CreateDetachedTest;
begin
New( sockettype );
sockettype^ := stDealer;
thr := TZMQThread.CreateDetached( DetachedTestProc, sockettype );
thr := TZMQThread.CreateDetached( DetachedTestMeth, sockettype );
thr.FreeOnTerminate := true;
thr.Resume;

Expand Down Expand Up @@ -174,36 +171,8 @@ procedure TThreadTestCase.CreateInheritedDetachedTest;

end;

procedure TThreadTestCase.Detached2TestProc( args: Pointer; context: TZMQContext;
terminated: PBoolean );
begin
while not Terminated^ do
begin
// do something.
inc( tmpI );
end;
SetEvent( ehandle );
end;

procedure TThreadTestCase.CreateDetached2Test;
var
thr: TZMQThread;
begin

thr := TZMQThread.CreateDetached( Detached2TestProc, nil );
thr.FreeOnTerminate := true;
thr.Resume;

sleep(10);
thr.Terminate;

WaitForSingleObject( ehandle, INFINITE );
CheckTrue( tmpI > 100, 'thread cycles less than 100' );

end;

procedure TThreadTestCase.AttachedPipeTestProc(args: Pointer;
context: TZMQContext; pipe: TZMQSocket; terminated: PBoolean);
procedure TThreadTestCase.AttachedPipeTestMeth(args: Pointer;
context: TZMQContext; pipe: TZMQSocket );
begin
pipe.recv( tmpS );
SetEvent( ehandle );
Expand All @@ -213,7 +182,7 @@ procedure TThreadTestCase.AttachedPipeTest;
var
thr: TZMQThread;
begin
thr := TZMQThread.CreateAttached( AttachedPipeTestProc, context, nil );
thr := TZMQThread.CreateAttached( AttachedPipeTestMeth, context, nil );
thr.FreeOnTerminate := true;
thr.Resume;

Expand Down
21 changes: 13 additions & 8 deletions zhelpers.pas
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ interface
;

procedure s_dump( socket: TZMQSocket );
function s_random( len: Integer ): Utf8String;
procedure s_set_id( socket: TZMQSocket );

// for threadSafe logging to the console.
Expand All @@ -19,7 +20,7 @@ implementation

uses
Windows;

var
cs: TRTLCriticalSection;

Expand Down Expand Up @@ -73,20 +74,24 @@ procedure s_dump( socket: TZMQSocket );
end;
end;

// Set simple random printable identity on socket
//
procedure s_set_id( socket: TZMQSocket );
function s_random( len: Integer ): Utf8String;
const
Chars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ';
var
s: String;
i: integer;
begin
Randomize;
S := '';
for i := 1 to 10 do
S := S + Chars[Random(Length(Chars)) + 1];
socket.Identity := s;
result := '';
for i := 1 to len do
result := result + Chars[Random(Length(Chars)) + 1];
end;

// Set simple random printable identity on socket
//
procedure s_set_id( socket: TZMQSocket );
begin
socket.Identity := s_random( 10 );
end;

initialization
Expand Down
50 changes: 40 additions & 10 deletions zmqapi.pas
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,11 @@ TZMQPoller = class( TThread )

type
// Thread related functions.
TDetachedThreadProc = procedure( args: Pointer; context: TZMQContext; Terminated: PBoolean ) of object;
TAttachedThreadProc = procedure( args: Pointer; Context: TZMQContext; Pipe: TZMQSocket; Terminated: PBoolean ) of object;
TDetachedThreadMeth = procedure( args: Pointer; context: TZMQContext ) of object;
TAttachedThreadMeth = procedure( args: Pointer; Context: TZMQContext; Pipe: TZMQSocket ) of object;

TDetachedThreadProc = procedure( args: Pointer; context: TZMQContext );
TAttachedThreadProc = procedure( args: Pointer; Context: TZMQContext; Pipe: TZMQSocket );

TZMQThread = class( TThread )
private
Expand All @@ -553,14 +556,18 @@ TZMQThread = class( TThread )
// attached thread pipe in the new thread.
thrPipe: TZMQSocket;

fDetachedMeth: TDetachedThreadMeth;
fAttachedMeth: TAttachedThreadMeth;
fDetachedProc: TDetachedThreadProc;
fAttachedProc: TAttachedThreadProc;
fContext: TZMQContext;
fArgs: Pointer;
public
constructor Create( lArgs: Pointer; ctx: TZMQContext );
constructor CreateAttached( lAttachedProc: TAttachedThreadProc; ctx: TZMQContext; lArgs: Pointer );
constructor CreateDetached( lDetachedProc: TDetachedThreadProc; lArgs: Pointer );
constructor CreateAttached( lAttachedMeth: TAttachedThreadMeth; ctx: TZMQContext; lArgs: Pointer );
constructor CreateDetached( lDetachedMeth: TDetachedThreadMeth; lArgs: Pointer );
constructor CreateAttachedProc( lAttachedProc: TAttachedThreadProc; ctx: TZMQContext; lArgs: Pointer );
constructor CreateDetachedProc( lDetachedProc: TDetachedThreadProc; lArgs: Pointer );
destructor Destroy; override;
protected
procedure Execute; override;
Expand Down Expand Up @@ -716,10 +723,11 @@ function TZMQFrame.dump: Utf8String;
iSize := size;
if iSize = 0 then
result := ''
else if Integer(data^) = 0 then
else if AnsiChar(data^) = #0 then
begin
SetLength( sutf8, iSize * 2 );
BinToHex( data, PAnsiChar(sutf8), iSize );
result := sutf8;
end else
result := asUtf8String;
end;
Expand Down Expand Up @@ -2348,7 +2356,7 @@ procedure TZMQPoller.setPollNumber( const Value: Integer; bWait: Boolean = false
/// sockets polled
function TZMQPoller.poll( timeout: Integer = -1; lPollNumber: Integer = -1 ): Integer;
var
pc: Integer;
pc, i: Integer;
begin
if not fSync then
raise EZMQException.Create('Poller hasn''t created in Synchronous mode');
Expand All @@ -2367,6 +2375,9 @@ function TZMQPoller.poll( timeout: Integer = -1; lPollNumber: Integer = -1 ): In
timeout := timeout * 1000;
{$endif}

for i := 0 to fPollItemCount - 1 do
fPollItem[i].revents := 0;

result := zmq_poll( fPollItem[0], pc, timeout );
if result < 0 then
raise EZMQException.Create
Expand Down Expand Up @@ -2498,14 +2509,26 @@ constructor TZMQThread.Create( lArgs: Pointer; ctx: TZMQContext );
end;
end;

constructor TZMQThread.CreateAttached( lAttachedProc: TAttachedThreadProc; ctx: TZMQContext;
constructor TZMQThread.CreateAttached( lAttachedMeth: TAttachedThreadMeth; ctx: TZMQContext;
lArgs: Pointer);
begin
Create( lArgs, ctx );
fAttachedMeth := lAttachedMeth;
end;

constructor TZMQThread.CreateDetached( lDetachedMeth: TDetachedThreadMeth; lArgs: Pointer);
begin
Create( lArgs, nil );
fDetachedMeth := lDetachedMeth;
end;

constructor TZMQThread.CreateAttachedProc( lAttachedProc: TAttachedThreadProc; ctx: TZMQContext; lArgs: Pointer );
begin
Create( lArgs, ctx );
fAttachedProc := lAttachedProc;
end;

constructor TZMQThread.CreateDetached( lDetachedProc: TDetachedThreadProc; lArgs: Pointer);
constructor TZMQThread.CreateDetachedProc( lDetachedProc: TDetachedThreadProc; lArgs: Pointer );
begin
Create( lArgs, nil );
fDetachedProc := lDetachedProc;
Expand All @@ -2520,11 +2543,18 @@ destructor TZMQThread.Destroy;

procedure TZMQThread.DoExecute;
begin
if Assigned( fAttachedMeth ) then
fAttachedMeth( fArgs, Context, thrPipe )
else
if Assigned( fDetachedMeth ) then
fDetachedMeth( fArgs, Context )
else
if Assigned( fAttachedProc ) then
fAttachedProc( fArgs, Context, thrPipe, @Terminated )
fAttachedProc( fArgs, Context, thrPipe )
else
if Assigned( fDetachedProc ) then
fDetachedProc( fArgs, Context, @Terminated );
fDetachedProc( fArgs, Context );

end;

procedure TZMQThread.Execute;
Expand Down

0 comments on commit 029faae

Please sign in to comment.