Skip to content

Commit

Permalink
Improvements and fixes (bvarga#27)
Browse files Browse the repository at this point in the history
* Access frames as byte and TBytes

* Added function zmq_ctx_shutdown

* Fix memory leak

* Split ETERM to dedicated exception, inherited from EAbort

* Correct Unicode/Utf8 string handling;
Removed unused variables

* Check for Assigned(fContext) before removing socket
  • Loading branch information
WhiteWind authored and bvarga committed Oct 13, 2018
1 parent ffb011a commit 8c4126a
Showing 1 changed file with 55 additions and 68 deletions.
123 changes: 55 additions & 68 deletions zmqapi.pas
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

{$ifdef FPC}
{$mode delphi}{$H+}
{$else}
{$WARN IMPLICIT_STRING_CAST OFF}
{$WARN IMPLICIT_STRING_CAST_LOSS OFF}
{$endif}

{$I zmq.inc}
Expand Down Expand Up @@ -88,6 +91,8 @@ EZMQException = class( Exception )
property Num: Integer read errnum;
end;

EZMQTerminate = class(EAbort);

TZMQContext = class;
TZMQSocket = class;

Expand Down Expand Up @@ -794,11 +799,8 @@ function TZMQFrame.getAsInteger: Integer;
end;

function TZMQFrame.getAsUtf8String: Utf8String;
var
t: AnsiString;
begin
SetString( t, PAnsiChar(data), size );
result := t;
SetString( Result, PAnsiChar(data), size );
end;

procedure TZMQFrame.setAsByte(const Value: Byte);
Expand Down Expand Up @@ -1103,7 +1105,7 @@ function TZMQMsg.saveasHex: Utf8String;
begin
for i := 0 to size - 1 do
begin
result := result + item[i].asHexString;
result := result + RawByteString(item[i].asHexString);
if i < size - 1 then
result := result + #13 + #10;
end;
Expand Down Expand Up @@ -1149,7 +1151,8 @@ destructor TZMQSocket.destroy;
DeRegisterMonitor;
{$endif}
close;
fContext.RemoveSocket( Self );
if Assigned(fContext) then
fContext.RemoveSocket( Self );
{$ifdef zmq3}
fAcceptFilter.Free;
{$endif}
Expand All @@ -1172,6 +1175,8 @@ function TZMQSocket.CheckResult( rc: Integer ): Integer;
if rc = -1 then
begin
errn := zmq_errno;
if errn = ETERM then
raise EZMQTerminate.Create('ZMQ context shutdown');
if ( errn <> ZMQEAGAIN ) or fRaiseEAgain then
raise EZMQException.Create( errn );
end else
Expand Down Expand Up @@ -1670,47 +1675,32 @@ procedure TZMQSocket.unSubscribe( filter: AnsiString );

{$ifdef zmq3}
function TZMQSocket.sendBuffer( const Buffer; len: Size_t; flags: TZMQSendFlags = [] ): Integer;
var
errn: Integer;
begin
result := zmq_send( SocketPtr, Buffer, len, Byte( flags ) );
if result < -1 then
raise EZMQException.Create('zmq_send return value less than -1.')
else if result = -1 then
begin
errn := zmq_errno;
if ( errn <> ZMQEAGAIN ) or fRaiseEAgain then
raise EZMQException.Create( errn );
end;
if result < 0 then
result := CheckResult(result);
end;
{$endif}

// sends the msg, and FreeAndNils it if successful. the return value is the number of
// bytes in the msg if successful, if not returns -1, and the msgs is not discarded.
function TZMQSocket.send( var msg: TZMQFrame; flags: Integer = 0 ): Integer;
var
errn: Integer;
begin
{$ifdef zmq3}
result := zmq_sendmsg( SocketPtr, msg.fMessage, flags );
//result := zmq_msg_send( msg.fMessage, SocketPtr, flags );

if result < -1 then
raise EZMQException.Create('zmq_sendmsg return value less than -1.')
else if result = -1 then
begin
errn := zmq_errno;
if ( errn <> ZMQEAGAIN ) or fRaiseEAgain then
raise EZMQException.Create( errn );
end else
if result < 0 then
result := CheckResult(result)
else
FreeAndNil( msg );
{$else}
result := msg.size;
try
if CheckResult( zmq_send( SocketPtr, msg.fMessage, flags ) ) = 0 then
FreeAndNil( msg )
else
result := -1;
if CheckResult( zmq_send( SocketPtr, msg.fMessage, flags ) ) = 0 then
FreeAndNil( msg )
else
result := -1;
except
on e: Exception do
begin
Expand Down Expand Up @@ -1853,18 +1843,10 @@ function TZMQSocket.send(msg: TBytes; flags: TZMQSendFlags): Integer;

{$ifdef zmq3}
function TZMQSocket.recvBuffer( var Buffer; len: size_t; flags: TZMQRecvFlags = [] ): Integer;
var
errn: Integer;
begin
result := zmq_recv( SocketPtr, Buffer, len, Byte( flags ) );
if result < -1 then
raise EZMQException.Create('zmq_recv return value less than -1.')
else if result = -1 then
begin
errn := zmq_errno;
if ( errn <> ZMQEAGAIN ) or fRaiseEAgain then
raise EZMQException.Create( errn );
end;
if result < 0 then
result := CheckResult(result);
end;

procedure MonitorProc( ZMQMonitorRec: PZMQMonitorRec );
Expand Down Expand Up @@ -1964,25 +1946,17 @@ procedure TZMQSocket.DeRegisterMonitor;
{$endif}

function TZMQSocket.recv( var msg: TZMQFrame; flags: Integer = 0 ): Integer;
var
errn: Integer;
begin
if msg = nil then
msg := TZMQFrame.Create;
if msg.size > 0 then
msg.rebuild;

{$ifdef zmq3}
result := zmq_recvmsg( SocketPtr, msg.fMessage, flags );
result := zmq_recvmsg(SocketPtr, msg.fMessage, flags);
// result := zmq_msg_recv( msg.fMessage, SocketPtr, flags );
if result < -1 then
raise EZMQException.Create('zmq_recvmsg return value less than -1.')
else if result = -1 then
begin
errn := zmq_errno;
if ( errn <> ZMQEAGAIN ) or fRaiseEAgain then
raise EZMQException.Create( errn );
end;
if Result < 0 then
Result := CheckResult(Result);
{$else}
result := -1;
if CheckResult( zmq_recv( SocketPtr, msg.fMessage, flags ) ) = 0 then
Expand Down Expand Up @@ -2410,19 +2384,19 @@ procedure TZMQPoller.Execute;
reglistcap,
reglistcount: Integer;

procedure AddToRegList( so: TZMQSocket; ev: TZMQPollEvents; reg: Boolean; sync: Boolean );
begin
if reglistcap = reglistcount then
procedure AddToRegList( so: TZMQSocket; ev: TZMQPollEvents; reg: Boolean; sync: Boolean );
begin
reglistcap := reglistcap + 10;
SetLength( reglist, reglistcap );
if reglistcap = reglistcount then
begin
reglistcap := reglistcap + 10;
SetLength( reglist, reglistcap );
end;
reglist[reglistcount].socket := so;
reglist[reglistcount].events := ev;
reglist[reglistcount].reg := reg;
reglist[reglistcount].sync := sync;
inc( reglistcount );
end;
reglist[reglistcount].socket := so;
reglist[reglistcount].events := ev;
reglist[reglistcount].reg := reg;
reglist[reglistcount].sync := sync;
inc( reglistcount );
end;

begin
reglistcap := 10;
Expand Down Expand Up @@ -2533,11 +2507,10 @@ procedure AddToRegList( so: TZMQSocket; ev: TZMQPollEvents; reg: Boolean; sync:
except
on e: Exception do
begin
if ( e is EZMQException ) and
( EZMQException(e).Num = ETERM ) then
if ( e is EZMQTerminate ) then
Terminate;
if Assigned( fOnException ) then
fOnException( e );
if Assigned( fOnException ) then
fOnException( e );
end;
end;
msg.Free;
Expand Down Expand Up @@ -2610,6 +2583,7 @@ procedure TZMQPoller.setPollNumber( const Value: Integer; bWait: Boolean = false
function TZMQPoller.poll( timeout: Integer = -1; lPollNumber: Integer = -1 ): Integer;
var
pc, i: Integer;
errno: Integer;
begin
if not fSync then
raise EZMQException.Create('Poller hasn''t created in Synchronous mode');
Expand All @@ -2633,7 +2607,13 @@ function TZMQPoller.poll( timeout: Integer = -1; lPollNumber: Integer = -1 ): In

result := zmq_poll( fPollItem[0], pc, timeout );
if result < 0 then
raise EZMQException.Create
begin
errno := zmq_errno;
if errno = ETERM then
raise EZMQTerminate.Create('Context terminating')
else
raise EZMQException.Create(errno);
end;
end;

function TZMQPoller.getPollResult( indx: Integer ): TZMQPollItem;
Expand Down Expand Up @@ -2690,14 +2670,21 @@ function ZMQPoll( var pia: TZMQPollItemA; piaSize: Integer = -1; timeout: Intege
function ZMQPoll( var pia: TZMQPollItem; piaSize: Integer = 1; timeout: Integer = -1 ): Integer; overload;
var
PollItem: zmq.pollitem_t;
errno: Integer;
begin
PollItem.socket := pia.Socket.SocketPtr;
PollItem.fd := 0;
PollItem.events := Byte( pia.events );
PollItem.revents := 0;
result := zmq_poll( PollItem, piaSize, timeout );
if result < 0 then
raise EZMQException.Create;
begin
errno := zmq_errno;
if errno = ETERM then
raise EZMQTerminate.Create('Context terminating')
else
raise EZMQException.Create(errno);
end;
Byte(pia.revents) := PollItem.revents;
end;

Expand Down

0 comments on commit 8c4126a

Please sign in to comment.