-
Notifications
You must be signed in to change notification settings - Fork 111
/
netstream.lua
386 lines (332 loc) · 10.7 KB
/
netstream.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
--A net extension which allows sending large streams of data without overflowing the reliable channel
--Keep it in lua/autorun so it will be shared between addons
AddCSLuaFile()
net.Stream = {}
net.Stream.SendSize = 20000 --This is the size of each packet to send
net.Stream.Timeout = 30 --How long to wait for client response before cleaning up
net.Stream.MaxWriteStreams = 1024 --The maximum number of write data items to store
net.Stream.MaxReadStreams = 128 --The maximum number of queued read data items to store
net.Stream.MaxChunks = 3200 --Maximum number of pieces the stream can send to the server. 64 MB
net.Stream.MaxSize = net.Stream.SendSize*net.Stream.MaxChunks
net.Stream.MaxTries = 3 --Maximum times the client may retry downloading the whole data
local WriteStreamQueue = {
__index = {
Add = function(self, stream)
local identifier = self.curidentifier
local startid = identifier
while self.queue[identifier] do
identifier = identifier % net.Stream.MaxWriteStreams + 1
if identifier == startid then
ErrorNoHalt("Netstream is full of WriteStreams!")
net.WriteUInt(0, 32)
return
end
end
self.curidentifier = identifier % net.Stream.MaxWriteStreams + 1
if next(self.queue)==nil then
self.activitytimeout = CurTime()+net.Stream.Timeout
timer.Create("netstream_queueclean", 5, 0, function() self:Clean() end)
end
self.queue[identifier] = stream
stream.identifier = identifier
return stream
end,
Write = function(self, ply)
local identifier = net.ReadUInt(32)
local chunkidx = net.ReadUInt(32)
local stream = self.queue[identifier]
--print("Got request", identifier, chunkidx, stream)
if stream then
if stream:Write(ply, chunkidx) then
self.activitytimeout = CurTime()+net.Stream.Timeout
stream.timeout = CurTime()+net.Stream.Timeout
end
else
-- Tell them the stream doesn't exist
net.Start("NetStreamRead")
net.WriteUInt(identifier, 32)
net.WriteUInt(0, 32)
if SERVER then net.Send(ply) else net.SendToServer() end
end
end,
Clean = function(self)
local t = CurTime()
for k, stream in pairs(self.queue) do
if (next(stream.clients)~=nil and t >= stream.timeout) or t >= self.activitytimeout then
stream:Remove()
self.queue[k] = nil
end
end
if next(self.queue)==nil then
timer.Remove("netstream_queueclean")
end
end,
},
__call = function(t)
return setmetatable({
activitytimeout = CurTime()+net.Stream.Timeout,
curidentifier = 1,
queue = {}
}, t)
end
}
setmetatable(WriteStreamQueue, WriteStreamQueue)
net.Stream.WriteStreams = WriteStreamQueue()
local ReadStreamQueue = {
__index = {
Add = function(self, stream)
local queue = self.queues[stream.player]
if #queue == net.Stream.MaxReadStreams then
ErrorNoHalt("Receiving too many ReadStream requests!")
return
end
for _, v in ipairs(queue) do
if v.identifier == stream.identifier then
ErrorNoHalt("Tried to start a new ReadStream for an already existing stream!")
return
end
end
queue[#queue+1] = stream
if #queue == 1 then
stream:Request()
end
return stream
end,
Remove = function(self, stream)
local queue = rawget(self.queues, stream.player)
if queue then
if stream == queue[1] then
table.remove(queue, 1)
local nextInQueue = queue[1]
if nextInQueue then
nextInQueue:Request()
else
self.queues[stream.player] = nil
end
else
for k, v in ipairs(queue) do
if v == stream then
table.remove(queue, k)
break
end
end
end
end
end,
Read = function(self, ply)
local identifier = net.ReadUInt(32)
local queue = rawget(self.queues, ply)
if queue and queue[1] then
queue[1]:Read(identifier)
end
end
},
__call = function(t)
return setmetatable({
queues = setmetatable({}, {__index = function(t,k) local r={} t[k]=r return r end})
}, t)
end
}
setmetatable(ReadStreamQueue, ReadStreamQueue)
net.Stream.ReadStreams = ReadStreamQueue()
local WritingDataItem = {
__index = {
Write = function(self, ply, chunkidx)
local client = self.clients[ply]
if client.finished then return false end
if chunkidx == #self.chunks+1 then self:Finished(ply) return true end
if client.downloads+#self.chunks-client.progress >= net.Stream.MaxTries * #self.chunks then self:Finished(ply) return false end
client.downloads = client.downloads + 1
local chunk = self.chunks[chunkidx]
if not chunk then return false end
client.progress = chunkidx
--print("Sending", "NetStreamRead", self.identifier, #chunk.data, chunkidx, chunk.crc)
net.Start("NetStreamRead")
net.WriteUInt(self.identifier, 32)
net.WriteUInt(#chunk.data, 32)
net.WriteUInt(chunkidx, 32)
net.WriteString(chunk.crc)
net.WriteData(chunk.data, #chunk.data)
if CLIENT then net.SendToServer() else net.Send(ply) end
return true
end,
Finished = function(self, ply)
self.clients[ply].finished = true
if self.callback then
local ok, err = xpcall(self.callback, debug.traceback, ply)
if not ok then ErrorNoHalt(err) end
end
end,
GetProgress = function(self, ply)
return self.clients[ply].progress / #self.chunks
end,
Remove = function(self)
local sendTo = {}
for ply, client in pairs(self.clients) do
if not client.finished then
client.finished = true
if CLIENT or ply:IsValid() then sendTo[#sendTo+1] = ply end
end
end
if next(sendTo)~=nil then
--print("Sending", "NetStreamRead", self.identifier, 0)
net.Start("NetStreamRead")
net.WriteUInt(self.identifier, 32)
net.WriteUInt(0, 32)
if SERVER then net.Send(sendTo) else net.SendToServer() end
end
end
},
__call = function(t, data, callback)
local chunks = {}
for i=1, math.ceil(#data / net.Stream.SendSize) do
local datachunk = string.sub(data, (i - 1) * net.Stream.SendSize + 1, i * net.Stream.SendSize)
chunks[i] = { data = datachunk, crc = util.CRC(datachunk) }
end
return setmetatable({
timeout = CurTime()+net.Stream.Timeout,
chunks = chunks,
callback = callback,
lasttouched = 0,
clients = setmetatable({},{__index = function(t,k)
local r = {
finished = false,
downloads = 0,
progress = 0,
} t[k]=r return r
end})
}, t)
end
}
setmetatable(WritingDataItem, WritingDataItem)
local ReadingDataItem = {
__index = {
Request = function(self)
if self.downloads+self.numchunks-#self.chunks >= net.Stream.MaxTries*self.numchunks then self:Remove() return end
self.downloads = self.downloads + 1
timer.Create("NetStreamReadTimeout" .. self.identifier, net.Stream.Timeout*0.5, 1, function() self:Request() end)
self:WriteRequest()
end,
WriteRequest = function(self)
--print("Requesting", self.identifier, #self.chunks)
net.Start("NetStreamWrite")
net.WriteUInt(self.identifier, 32)
net.WriteUInt(#self.chunks+1, 32)
if CLIENT then net.SendToServer() else net.Send(self.player) end
end,
Read = function(self, identifier)
if self.identifier ~= identifier then self:Request() return end
local size = net.ReadUInt(32)
if size == 0 then self:Remove() return end
local chunkidx = net.ReadUInt(32)
if chunkidx ~= #self.chunks+1 then self:Request() return end
local crc = net.ReadString()
local data = net.ReadData(size)
if crc ~= util.CRC(data) then self:Request() return end
self.chunks[chunkidx] = data
if #self.chunks == self.numchunks then self:Remove(true) return end
self:Request()
end,
GetProgress = function(self)
return #self.chunks/self.numchunks
end,
Remove = function(self, finished)
timer.Remove("NetStreamReadTimeout" .. self.identifier)
local data
if finished then
data = table.concat(self.chunks)
if self.compressed then
data = util.Decompress(data, net.Stream.MaxSize)
end
self:WriteRequest() -- Notify we finished
end
local ok, err = xpcall(self.callback, debug.traceback, data)
if not ok then ErrorNoHalt(err) end
net.Stream.ReadStreams:Remove(self)
end
},
__call = function(t, ply, callback, numchunks, identifier, compressed)
return setmetatable({
identifier = identifier,
chunks = {},
compressed = compressed,
numchunks = numchunks,
callback = callback,
player = ply,
downloads = 0
}, t)
end
}
setmetatable(ReadingDataItem, ReadingDataItem)
function net.WriteStream(data, callback, dontcompress)
if not isstring(data) then
error("bad argument #1 to 'WriteStream' (string expected, got " .. type(data) .. ")", 2)
end
if callback ~= nil and not isfunction(callback) then
error("bad argument #2 to 'WriteStream' (function expected, got " .. type(callback) .. ")", 2)
end
local compressed = not dontcompress
if compressed then
data = util.Compress(data) or ""
end
if #data == 0 then
net.WriteUInt(0, 32)
return
end
if #data > net.Stream.MaxSize then
ErrorNoHalt("net.WriteStream request is too large! ", #data/1048576, "MiB")
net.WriteUInt(0, 32)
return
end
local stream = net.Stream.WriteStreams:Add(WritingDataItem(data, callback, compressed))
if not stream then return end
--print("WriteStream", #stream.chunks, stream.identifier, compressed)
net.WriteUInt(#stream.chunks, 32)
net.WriteUInt(stream.identifier, 32)
net.WriteBool(compressed)
return stream
end
--If the receiver is a player then add it to a queue.
--If the receiver is the server then add it to a queue for each individual player
function net.ReadStream(ply, callback)
if CLIENT then
ply = NULL
else
if type(ply) ~= "Player" then
error("bad argument #1 to 'ReadStream' (Player expected, got " .. type(ply) .. ")", 2)
elseif not ply:IsValid() then
error("bad argument #1 to 'ReadStream' (Tried to use a NULL entity!)", 2)
end
end
if not isfunction(callback) then
error("bad argument #2 to 'ReadStream' (function expected, got " .. type(callback) .. ")", 2)
end
local numchunks = net.ReadUInt(32)
if numchunks == nil then
return
elseif numchunks == 0 then
local ok, err = xpcall(callback, debug.traceback, "")
if not ok then ErrorNoHalt(err) end
return
end
local identifier = net.ReadUInt(32)
local compressed = net.ReadBool()
if numchunks > net.Stream.MaxChunks then
ErrorNoHalt("ReadStream requests from ", ply, " is too large! ", numchunks * net.Stream.SendSize / 1048576, "MiB")
return
end
--print("ReadStream", numchunks, identifier, compressed)
return net.Stream.ReadStreams:Add(ReadingDataItem(ply, callback, numchunks, identifier, compressed))
end
if SERVER then
util.AddNetworkString("NetStreamWrite")
util.AddNetworkString("NetStreamRead")
end
--Send requested stream data
net.Receive("NetStreamWrite", function(len, ply)
net.Stream.WriteStreams:Write(ply or NULL)
end)
--Download the sent stream data
net.Receive("NetStreamRead", function(len, ply)
net.Stream.ReadStreams:Read(ply or NULL)
end)