Skip to content

Commit

Permalink
Fix memory leak and some new methods (bvarga#24)
Browse files Browse the repository at this point in the history
* Access frames as byte and TBytes

* Added function zmq_ctx_shutdown

* Fix memory leak
  • Loading branch information
WhiteWind authored and bvarga committed Jun 4, 2017
1 parent 90a1cae commit 1f327d6
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 4 deletions.
1 change: 1 addition & 0 deletions zmq.pas
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ function zmq_ctx_new: Pointer; cdecl; external {$IFNDEF ZMQ_STATIC_LINK}libzmq{$
function zmq_ctx_destroy( context: Pointer ): Integer; cdecl; external {$IFNDEF ZMQ_STATIC_LINK}libzmq{$ENDIF};
function zmq_ctx_set( context: Pointer; option: Integer; optval: Integer ): Integer; cdecl; external {$IFNDEF ZMQ_STATIC_LINK}libzmq{$ENDIF};
function zmq_ctx_get( context: Pointer; option: Integer ): Integer; cdecl; external {$IFNDEF ZMQ_STATIC_LINK}libzmq{$ENDIF};
function zmq_ctx_shutdown( context: Pointer ): Integer; cdecl; external {$IFNDEF ZMQ_STATIC_LINK}libzmq{$ENDIF};
{$endif}

{* Old (legacy) API *}
Expand Down
104 changes: 100 additions & 4 deletions zmqapi.pas
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
{
Copyright (c) 2012 Varga Balázs (bb.varga@gmail.com)
This file is part of 0MQ Delphi binding
Expand Down Expand Up @@ -114,7 +114,10 @@ TZMQFrame = class

function getAsUtf8String: Utf8String;
procedure setAsUtf8String(const Value: Utf8String);

function getAsByte: Byte;
procedure setAsByte(const Value: Byte);
function getAsBytes: TBytes;
procedure setAsBytes(const Value: TBytes);
public
constructor create; overload;
constructor create( size: size_t ); overload;
Expand Down Expand Up @@ -142,6 +145,8 @@ TZMQFrame = class
property asUtf8String: Utf8String read getAsUtf8String write setAsUtf8String;
property asHexString: AnsiString read getAsHexString write setAsHexString;
property asInteger: Integer read getAsInteger write setAsInteger;
property asByte: Byte read getAsByte write setAsByte;
property asBytes: TBytes read getAsBytes write setAsBytes;
end;

// for multipart message
Expand Down Expand Up @@ -182,6 +187,8 @@ TZMQMsg = class
function add( msg: TZMQFrame ): Integer;
function addstr( msg: Utf8String ): Integer;
function addint( msg: Integer ): Integer;
function addByte( msg: Byte): Integer;
function addBytes(const msg: TBytes ): Integer;

// Push frame plus empty frame to front of message, before first frame.
// Message takes ownership of frame, will destroy it when message is sent.
Expand Down Expand Up @@ -373,6 +380,7 @@ TZMQSocket = class
function send( var msg: TZMQFrame; flags: TZMQSendFlags = [] ): Integer; overload;
function send( strm: TStream; size: Integer; flags: TZMQSendFlags = [] ): Integer; overload;
function send( msg: Utf8String; flags: TZMQSendFlags = [] ): Integer; overload;
function send( msg: TBytes; flags: TZMQSendFlags = [] ): Integer; overload;

function send( var msgs: TZMQMsg; dontwait: Boolean = false ): Integer; overload;
function send( msg: Array of Utf8String; dontwait: Boolean = false ): Integer; overload;
Expand All @@ -384,6 +392,7 @@ TZMQSocket = class
function recv( msg: TZMQFrame; flags: TZMQRecvFlags = [] ): Integer; overload;
function recv( strm: TStream; flags: TZMQRecvFlags = [] ): Integer; overload;
function recv( var msg: Utf8String; flags: TZMQRecvFlags = [] ): Integer; overload;
function recv( var msg: TBytes; flags: TZMQRecvFlags = [] ): Integer; overload;

function recv( var msgs: TZMQMsg; flags: TZMQRecvFlags = [] ): Integer; overload;
function recv( msg: TStrings; flags: TZMQRecvFlags = [] ): Integer; overload;
Expand Down Expand Up @@ -470,6 +479,10 @@ TZMQContext = class
function Shadow: TZMQContext;
function Socket( stype: TZMQSocketType ): TZMQSocket;
procedure Terminate;

{$IFDEF zmq3}
procedure Shutdown;
{$ENDIF}
property ContextPtr: Pointer read fContext;

// < -1 means dont change linger when destroy
Expand Down Expand Up @@ -758,6 +771,17 @@ function TZMQFrame.dump: Utf8String;
result := asUtf8String;
end;

function TZMQFrame.getAsByte: Byte;
begin
Result := Byte(data^);
end;

function TZMQFrame.getAsBytes: TBytes;
begin
SetLength(Result, size);
System.Move(data^, Result[0], size);
end;

function TZMQFrame.getAsHexString: AnsiString;
begin
SetLength( result, size * 2 );
Expand All @@ -777,6 +801,21 @@ function TZMQFrame.getAsUtf8String: Utf8String;
result := t;
end;

procedure TZMQFrame.setAsByte(const Value: Byte);
var
iSize: Integer;
begin
iSize := SizeOf( Value );
rebuild( iSize );
Integer(data^) := Value;
end;

procedure TZMQFrame.setAsBytes(const Value: TBytes);
begin
rebuild(Length(Value));
System.Move(Value[0], data^, Length(Value));
end;

procedure TZMQFrame.setAsHexString( const Value: AnsiString );
var
iSize: Integer;
Expand Down Expand Up @@ -921,6 +960,24 @@ function TZMQMsg.addstr( msg: Utf8String ): Integer;
result := add( frame );
end;

function TZMQMsg.addByte(msg: Byte): Integer;
var
frame: TZMQFrame;
begin
frame := TZMQFrame.create( sizeOf( Byte ) );
frame.asByte := msg;
result := add( frame );
end;

function TZMQMsg.addBytes(const msg: TBytes): Integer;
var
frame: TZMQFrame;
begin
frame := TZMQFrame.create( Length(msg) );
frame.asBytes := msg;
result := add( frame );
end;

function TZMQMsg.addint( msg: Integer ): Integer;
var
frame: TZMQFrame;
Expand Down Expand Up @@ -1780,6 +1837,20 @@ function TZMQSocket.send( msg: TStrings; dontwait: Boolean = false ): Integer;
end;
end;

function TZMQSocket.send(msg: TBytes; flags: TZMQSendFlags): Integer;
var
frame: TZMQFrame;
begin
frame := TZMQFrame.create;
try
frame.asBytes := msg;
result := send( frame, flags );
finally
if frame <> nil then
frame.Free;
end;
end;

{$ifdef zmq3}
function TZMQSocket.recvBuffer( var Buffer; len: size_t; flags: TZMQRecvFlags = [] ): Integer;
var
Expand Down Expand Up @@ -1958,13 +2029,18 @@ function TZMQSocket.recv( var msgs: TZMQMsg; flags: TZMQRecvFlags = [] ): Intege
begin
if msgs = nil then
msgs := TZMQMsg.Create;

bRcvMore := True;
result := 0;
while bRcvMore do
begin
msg := TZMQFrame.create;
rc := recv( msg, flags );
try
rc := recv( msg, flags );
except
msg.Free;
raise;
end;
if rc <> -1 then
begin
msgs.Add( msg );
Expand Down Expand Up @@ -1995,6 +2071,19 @@ function TZMQSocket.recv( msg: TStrings; flags: TZMQRecvFlags = [] ): Integer;
end;
end;

function TZMQSocket.recv(var msg: TBytes; flags: TZMQRecvFlags): Integer;
var
frame: TZMQFrame;
begin
frame := TZMQFrame.Create;
try
Result := recv( frame, flags );
msg := frame.asBytes;
finally
frame.Free;
end;
end;

{ TZMQContext }

constructor TZMQContext.create{$ifndef zmq3}( io_threads: Integer ){$endif};
Expand Down Expand Up @@ -2129,6 +2218,13 @@ function TZMQContext.Shadow: TZMQContext;
result := TZMQContext.createShadow( self );
end;

{$IFDEF zmq3}
procedure TZMQContext.Shutdown;
begin
zmq_ctx_shutdown(ContextPtr);
end;
{$ENDIF}

function TZMQContext.Socket( stype: TZMQSocketType ): TZMQSocket;
begin
EnterCriticalSection( cs );
Expand Down

0 comments on commit 1f327d6

Please sign in to comment.