Skip to content

Commit

Permalink
polling function
Browse files Browse the repository at this point in the history
  • Loading branch information
bvarga committed Mar 24, 2013
1 parent 44a58b2 commit df53f8d
Showing 1 changed file with 121 additions and 4 deletions.
125 changes: 121 additions & 4 deletions zmqapi.pas
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,13 @@ TZMQMsg = class
// now owns frame and must destroy it when finished with it.
// Set the cursor to 0
function pop: TZMQFrame;

function popstr: Utf8String;
// Add frame to the end of the message, i.e. after all other frames.
// Message takes ownership of frame, will destroy it when message is sent.
// Set the cursor to 0
// Returns 0 on success
function add( msg: TZMQFrame ): Integer;
function addstr( msg: Utf8String ): Integer;

// Push frame plus empty frame to front of message, before first frame.
// Message takes ownership of frame, will destroy it when message is sent.
Expand Down Expand Up @@ -206,6 +207,9 @@ TZMQMsg = class
// dumpt message
function dump: Utf8String;

function saveasHex: Utf8String;
procedure loadfromHex( data: Utf8String );

procedure Clear;
property item[indx: Integer]: TZMQFrame read getItem; default;
end;
Expand Down Expand Up @@ -482,6 +486,7 @@ TZMQPollItem = record
events: TZMQPollEvents;
revents: TZMQPollEvents;
end;
TZMQPollItemA = array of TZMQPollItem;

TZMQPollEventProc = procedure( socket: TZMQSocket; event: TZMQPollEvents ) of object;
TZMQExceptionProc = procedure( exception: Exception ) of object;
Expand Down Expand Up @@ -538,6 +543,15 @@ TZMQPoller = class( TThread )

TZMQDevice = ( dStreamer, dForwarder, dQueue );

TZMQPollRec = record
socket: TZMQSocket;
events: TZMQPollEvents;
end;
TZMQPollRecA = array of TZMQPollRec;

function ZMQPoll( var pia: TZMQPollItemA; piaSize: Integer = -1; timeout: Integer = -1 ): Integer; overload;
function ZMQPoll( var pia: TZMQPollItem; piaSize: Integer = 1; timeout: Integer = -1 ): Integer; overload;

{$ifdef zmq3}
procedure ZMQProxy( frontend, backend, capture: TZMQSocket );
{$endif}
Expand All @@ -546,7 +560,8 @@ TZMQPoller = class( TThread )
procedure ZMQVersion(var major, minor, patch: Integer);

procedure ZMQTerminate;

var
ZMQTerminated: Boolean = false;
type
// Thread related functions.
TDetachedThreadMeth = procedure( args: Pointer; context: TZMQContext ) of object;
Expand Down Expand Up @@ -839,6 +854,18 @@ function TZMQMsg.pop: TZMQFrame;
result := nil;
end;

function TZMQMsg.popstr: Utf8String;
var
frame: TZMQFrame;
begin
frame := pop;
try
result := frame.asUtf8String;
finally
frame.Free;
end;
end;

function TZMQMsg.add( msg: TZMQFrame ): Integer;
begin
try
Expand All @@ -851,6 +878,15 @@ function TZMQMsg.add( msg: TZMQFrame ): Integer;
end;
end;

function TZMQMsg.addstr( msg: Utf8String ): Integer;
var
frame: TZMQFrame;
begin
frame := TZMQFrame.create;
frame.asUtf8String := msg;
result := add( frame );
end;

procedure TZMQMsg.wrap( msg: TZMQFrame );
begin
push( TZMQFrame.create( 0 ) );
Expand Down Expand Up @@ -959,6 +995,40 @@ function TZMQMsg.dump: Utf8String;
end;
end;

function TZMQMsg.saveasHex: Utf8String;
var
i: Integer;
begin
for i := 0 to size - 1 do
begin
result := result + item[i].asHexString;
if i < size - 1 then
result := result + #13 + #10;
end;
end;

procedure TZMQMsg.loadfromHex( data: Utf8String );
var
tsl: TStringList;
i: Integer;
frame: TZMQFrame;
begin
Clear;
tsl := TStringList.Create;
try
tsl.Text := data;
for i := 0 to tsl.Count - 1 do
begin
frame := TZMQFrame.create;
frame.asHexString := tsl[i];
add( frame );
end;
finally
tsl.Free;
end;
end;


{ TZMQSocket }

constructor TZMQSocket.Create;
Expand Down Expand Up @@ -2419,7 +2489,7 @@ function TZMQPoller.poll( timeout: Integer = -1; lPollNumber: Integer = -1 ): In

for i := 0 to fPollItemCount - 1 do
fPollItem[i].revents := 0;

result := zmq_poll( fPollItem[0], pc, timeout );
if result < 0 then
raise EZMQException.Create
Expand All @@ -2444,8 +2514,53 @@ function TZMQPoller.getPollResult( indx: Integer ): TZMQPollItem;
Byte(result.events) := fPollItem[i].revents;
end;

// Thread related functions.
function ZMQPoll( var pia: TZMQPollItemA; piaSize: Integer = -1; timeout: Integer = -1 ): Integer;
var
PollItem: array of zmq.pollitem_t;
i,l,n: Integer;
begin
l := Length( pia );
if l = 0 then
raise EZMQException.Create( 'Nothing to poll!' );
SetLength( PollItem, l );
try
for i := 0 to l - 1 do
begin
PollItem[i].socket := pia[i].Socket.SocketPtr;
PollItem[i].fd := 0;
PollItem[i].events := Byte( pia[i].events );
PollItem[i].revents := 0;
end;
if piaSize = -1 then
n := l
else
n := piaSize;
result := zmq_poll( PollItem[0], n, timeout );
if result < 0 then
raise EZMQException.Create;
for i := 0 to l - 1 do
Byte(pia[i].revents) := PollItem[i].revents;

finally
PollItem := nil;
end;
end;

function ZMQPoll( var pia: TZMQPollItem; piaSize: Integer = 1; timeout: Integer = -1 ): Integer; overload;
var
PollItem: zmq.pollitem_t;
begin
PollItem.socket := pia.Socket.SocketPtr;
PollItem.fd := 0;
PollItem.events := Byte( pia.events );
PollItem.revents := 0;
result := zmq_poll( PollItem, piaSize, timeout );
if result < 0 then
raise EZMQException.Create;
Byte(pia.revents) := PollItem.revents;
end;

// Thread related functions.

procedure ZMQProxy( frontend, backend, capture: TZMQSocket );
var
Expand Down Expand Up @@ -2476,6 +2591,7 @@ procedure InterruptContexts;
var
i: Integer;
begin
ZMQTerminated := true;
for i := 0 to contexts.Count - 1 do
TZMQContext(contexts[i]).Terminate;
end;
Expand Down Expand Up @@ -2519,6 +2635,7 @@ function console_handler( dwCtrlType: DWORD ): BOOL;
begin
if CTRL_C_EVENT = dwCtrlType then
begin
ZMQTerminated := true;
for i := contexts.Count - 1 downto 0 do
TZMQContext(contexts[i]).Terminate;
result := True;
Expand Down

0 comments on commit df53f8d

Please sign in to comment.