forked from zeromq/clrzmq4
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflclient2.cs
209 lines (177 loc) · 4.78 KB
/
flclient2.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using ZeroMQ;
namespace Examples
{
using FLClient2;
//
// Freelance client - Model 2
// Uses DEALER socket to blast one or more services
//
// Author: metadings
//
namespace FLClient2
{
public class FLClient : IDisposable
{
    // If not a single service replies within this time, give up.
    // The same span is also used as the socket's linger period.
    static readonly TimeSpan GLOBAL_TIMEOUT = TimeSpan.FromMilliseconds(2500);

    // Each instance owns a context, a DEALER socket it uses to talk to the
    // servers, a count of how many servers it has connected to, and a
    // request sequence number.

    // Our context wrapper
    ZContext context;

    // DEALER socket talking to servers
    ZSocket socket;

    // How many servers we have connected to
    int servers;

    // Number of requests ever sent
    int sequence;

    // Creates the context and the DEALER socket, and sets the socket's
    // linger period to GLOBAL_TIMEOUT.
    public FLClient()
    {
        context = new ZContext();
        socket = new ZSocket(context, ZSocketType.DEALER);
        socket.Linger = GLOBAL_TIMEOUT;
    }

    ~FLClient()
    {
        Dispose(false);
    }

    // Disposes the socket and the context. Safe to call more than once,
    // because both fields are nulled after their first disposal.
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    // Releases the managed resources when disposing is true; does nothing
    // when called from the finalizer (disposing is false).
    protected void Dispose(bool disposing)
    {
        if (!disposing)
        {
            return;
        }

        // Socket first, then the context that created it
        if (socket != null)
        {
            socket.Dispose();
            socket = null;
        }
        if (context != null)
        {
            context.Dispose();
            context = null;
        }
    }

    // Connects the DEALER socket to one more server endpoint and counts it,
    // so that Request knows how many copies of each request to send.
    public void Connect(string endpoint)
    {
        socket.Connect(endpoint);
        servers++;
    }

    // Sends the request to every connected server (one copy per Connect
    // call), then waits up to GLOBAL_TIMEOUT for a reply carrying the
    // current sequence number. Replies with any other sequence number are
    // disposed and ignored.
    //
    // The request is always disposed by this method.
    //
    // Returns the matching reply with its empty and sequence frames already
    // removed; the caller owns it and must dispose it. When the deadline
    // passes without a matching reply, the result is null.
    //
    // Throws ArgumentNullException when request is null,
    // InvalidOperationException when a reply has fewer than three frames,
    // and ZException when polling fails with anything other than
    // EAGAIN or ETERM.
    public ZMessage Request(ZMessage request)
    {
        if (request == null)
        {
            throw new ArgumentNullException("request");
        }

        ZMessage reply = null;
        using (request)
        {
            // Prefix request with sequence number and empty envelope
            this.sequence++;
            request.Prepend(new ZFrame(this.sequence));
            request.Prepend(new ZFrame());

            // Blast the request to all connected servers
            for (int server = 0; server < this.servers; ++server)
            {
                using (var outgoing = request.Duplicate())
                {
                    this.socket.Send(outgoing);
                }
            }

            // Wait for a matching reply to arrive from anywhere.
            // We may poll several times, so the remaining time is
            // recalculated before each poll.
            DateTime endtime = DateTime.UtcNow + GLOBAL_TIMEOUT;
            var poll = ZPollItem.CreateReceiver();
            while (true)
            {
                // Compute the remaining time exactly once per iteration so
                // that a negative span can never be handed to PollIn.
                TimeSpan remaining = endtime - DateTime.UtcNow;
                if (remaining <= TimeSpan.Zero)
                {
                    break; // Deadline reached
                }

                ZError error;
                if (this.socket.PollIn(poll, out reply, out error, remaining))
                {
                    // Reply is [empty][sequence][OK]
                    if (reply.Count < 3)
                    {
                        // Do not leak the malformed message
                        reply.Dispose();
                        throw new InvalidOperationException("Malformed reply: expected at least 3 frames.");
                    }

                    // Drop the empty envelope frame
                    reply.RemoveAt(0);

                    bool matches;
                    using (ZFrame sequenceFrame = reply.RemoveAt(0, false))
                    {
                        matches = sequenceFrame.ReadInt32() == this.sequence;
                    }
                    if (matches)
                    {
                        break; // Reply is ok
                    }

                    // Reply to some other request: discard it, and clear the
                    // reference so a disposed message is never returned if
                    // the deadline expires right after this.
                    reply.Dispose();
                    reply = null;
                }
                else
                {
                    if (error == ZError.ETERM)
                    {
                        break; // Interrupted
                    }
                    if (error != ZError.EAGAIN)
                    {
                        throw new ZException(error);
                    }
                }
            }
        }
        return reply;
    }
}
}
static partial class Program
{
    // Entry point of the Freelance client (Model 2) example.
    //
    // args lists the endpoints to connect to; when it is null or empty a
    // usage text is printed and tcp://127.0.0.1:7781 is used instead.
    //
    // Sends 10000 "random name" requests through one FLClient and prints the
    // average round trip time, or an error line as soon as one request
    // yields no reply.
    public static void FLClient2(string[] args)
    {
        if (args == null || args.Length < 1)
        {
            Console.WriteLine();
            Console.WriteLine("Usage: ./{0} FLClient2 [Endpoint] ...", AppDomain.CurrentDomain.FriendlyName);
            Console.WriteLine();
            Console.WriteLine(" Endpoint Where FLClient2 should connect to.");
            Console.WriteLine(" Default is tcp://127.0.0.1:7781");
            Console.WriteLine();
            args = new string[] { "tcp://127.0.0.1:7781" };
        }

        // Number of round trips to measure
        const int totalRequests = 10000;

        // Create new freelance client object
        using (var client = new FLClient())
        {
            // Connect to each endpoint
            foreach (string endpoint in args)
            {
                client.Connect(endpoint);
            }

            // Send a bunch of name resolution 'requests', measure time
            int completed = 0;
            bool aborted = false;
            DateTime starttime = DateTime.UtcNow;
            while (completed < totalRequests)
            {
                // Request takes ownership of the outgoing message and disposes it
                var outgoing = new ZMessage();
                outgoing.Add(new ZFrame("random name"));

                using (ZMessage incoming = client.Request(outgoing))
                {
                    if (incoming == null)
                    {
                        aborted = true;
                        break;
                    }
                }

                // Count only round trips that actually completed, so the
                // average below divides by the true number of requests.
                completed++;
            }

            if (aborted)
            {
                Console.WriteLine("E: name service not available, aborted.");
            }
            else
            {
                Console.WriteLine("Average round trip cost: {0} ms", (DateTime.UtcNow - starttime).TotalMilliseconds / completed);
            }
        }
    }
}
}