Skip to content

Commit

Permalink
Added interrupt.cs and refactored rrbroker.cs
Browse files Browse the repository at this point in the history
  • Loading branch information
ptomasroos committed Feb 10, 2012
1 parent 351e91d commit eb0c174
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 48 deletions.
47 changes: 47 additions & 0 deletions examples/C#/interrupt.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//
// Shows how to handle Ctrl-C
//

// Author: Tomas Roos
// Email: ptomasroos@gmail.com

using System;
using System.Text;
using System.Threading;
using ZMQ;

namespace ZMQGuide
{
internal class Program
{

public static void Main(string[] args)
{
using (var context = new Context(1))
{
using (Socket replyer = context.Socket(SocketType.REP))
{
replyer.Bind("tcp://*:5555");

bool interrupted = false;

Console.CancelKeyPress += delegate { interrupted = true; };

const string replyMessage = "World";

while (!interrupted)
{
string message = replyer.Recv(Encoding.Unicode);
Console.WriteLine("Received request: {0}", message);

// Simulate work, by sleeping
Thread.Sleep(1000);

// Send reply back to client
replyer.Send(replyMessage, Encoding.Unicode);
}
}
}
}
}
}
89 changes: 41 additions & 48 deletions examples/C#/rrbroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,66 +2,59 @@
// Simple request-reply broker
//

// Author: Michael Compton
// Email: michael.compton@littleedge.co.uk
// Author: Michael Compton, Tomas Roos
// Email: michael.compton@littleedge.co.uk, ptomasroos@gmail.com

using System;
using System.Text;
using ZMQ;

namespace ZMQGuide {
public class RRBroker {
private Context context;
private Socket backend;
private Socket frontend;

public RRBroker() {
// Prepare our context and sockets
context = new Context(1);
frontend = context.Socket(SocketType.ROUTER);
backend = context.Socket(SocketType.DEALER);
frontend.Bind("tcp://*:5559");
backend.Bind("tcp://*:5560");
}

public void Broker() {
// Initialize poll set
PollItem[] pollItems = new PollItem[2];
pollItems[0] = frontend.CreatePollItem(IOMultiPlex.POLLIN);
pollItems[0].PollInHandler += new PollHandler(FrontendPollInHandler);
pollItems[1] = backend.CreatePollItem(IOMultiPlex.POLLIN);
pollItems[1].PollInHandler += new PollHandler(BackendPollInHandler);
// Switch messages between sockets
while (true) {
context.Poll(pollItems, -1);
namespace ZMQGuide
{
internal class Program
{
public static void Main(string[] args)
{
using (var context = new Context(1))
{
using (Socket frontend = context.Socket(SocketType.ROUTER), backend = context.Socket(SocketType.DEALER))
{
frontend.Bind("tcp://*:5559");
backend.Bind("tcp://*:5560");

var pollItems = new PollItem[2];
pollItems[0] = frontend.CreatePollItem(IOMultiPlex.POLLIN);
pollItems[0].PollInHandler += (socket, revents) => FrontendPollInHandler(socket, backend);
pollItems[1] = backend.CreatePollItem(IOMultiPlex.POLLIN);
pollItems[1].PollInHandler += (socket, revents) => BackendPollInHandler(socket, frontend);

while (true)
{
context.Poll(pollItems, -1);
}
}
}
}

private void FrontendPollInHandler(Socket socket, IOMultiPlex revents) {
// Process all parts of the message
bool isProcessing = true;
while (isProcessing) {
byte[] message = socket.Recv();
backend.Send(message, socket.RcvMore ? SendRecvOpt.SNDMORE : 0);
isProcessing = socket.RcvMore;
}
private static void FrontendPollInHandler(Socket frontend, Socket backend)
{
RelayMessage(frontend, backend);
}

private void BackendPollInHandler(Socket socket, IOMultiPlex revents) {
// Process all parts of the message
bool isProcessing = true;
while (isProcessing) {
byte[] message = socket.Recv();
frontend.Send(message, socket.RcvMore ? SendRecvOpt.SNDMORE : 0);
isProcessing = socket.RcvMore;
}
private static void BackendPollInHandler(Socket backend, Socket frontend)
{
RelayMessage(backend, frontend);
}
}

class Program {
static void Main(string[] args) {
RRBroker broker = new RRBroker();
broker.Broker();
private static void RelayMessage(Socket source, Socket destination)
{
bool hasMore = true;
while (hasMore)
{
byte[] message = source.Recv();
hasMore = source.RcvMore;
destination.Send(message, hasMore ? SendRecvOpt.SNDMORE : 0);
}
}
}
}

0 comments on commit eb0c174

Please sign in to comment.