forked from Jigsaw-Code/outline-ss-server
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream.go
428 lines (387 loc) · 12.5 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
// Copyright 2018 Jigsaw Operations LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package shadowsocks
import (
"bytes"
"crypto/cipher"
"encoding/binary"
"fmt"
"io"
"sync"
"github.com/Jigsaw-Code/outline-ss-server/slicepool"
)
// payloadSizeMask is the maximum size of a chunk payload in bytes. The same
// value is applied as a bit mask to the decoded 16-bit length field when
// reading, so only the low 14 bits of that field are honored.
const payloadSizeMask = 0x3FFF // 16*1024 - 1

// readBufPool is the buffer pool used for decrypting Shadowsocks streams.
// The largest buffer we could need is for decrypting a max-length payload
// plus its tag.
// NOTE(review): maxTagSize is defined elsewhere; presumably it is the largest
// AEAD tag size among the supported ciphers - confirm.
var readBufPool = slicepool.MakePool(payloadSizeMask + maxTagSize())
// Writer is an io.Writer that also implements io.ReaderFrom to
// allow for piping the data without extra allocations and copies.
// The LazyWrite and Flush methods allow a header to be
// added but delayed until the first write, for concatenation.
// All methods except Flush must be called from a single thread.
type Writer struct {
	// This type is single-threaded except when needFlush is true.
	// mu protects needFlush, and also protects everything
	// else while needFlush could be true.
	mu sync.Mutex
	// Indicates that a concurrent flush is currently allowed.
	// Set by LazyWrite once data has been queued, and cleared by ReadFrom
	// after it has flushed that data.
	needFlush bool

	// Destination for the encrypted stream.
	writer io.Writer
	// Cipher used to derive the AEAD from the salt.
	ssCipher *Cipher
	// Source of the salt; set to nil by init() once the salt has been drawn.
	saltGenerator SaltGenerator

	// Wrapper for input that arrives as a slice.
	byteWrapper bytes.Reader

	// Number of plaintext bytes that are currently buffered.
	pending int

	// These are populated by init():

	// Scratch space laid out as: salt, 2-byte size, size tag, payload,
	// payload tag. Plaintext is encrypted in place inside it.
	buf []byte
	aead cipher.AEAD
	// Nonce for the next AEAD message to encrypt. It is incremented after
	// every encrypted block, so it advances twice per chunk (size, payload).
	counter []byte
}
// NewShadowsocksWriter returns a Writer that encrypts its input with ssCipher
// according to the shadowsocks protocol and forwards the ciphertext to writer.
// Salts are drawn from RandomSaltGenerator unless SetSaltGenerator is called.
func NewShadowsocksWriter(writer io.Writer, ssCipher *Cipher) *Writer {
	sw := new(Writer)
	sw.writer = writer
	sw.ssCipher = ssCipher
	sw.saltGenerator = RandomSaltGenerator
	return sw
}
// SetSaltGenerator sets the salt generator to be used. Must be called before the first write:
// the generator is consulted only once, when the Writer initializes itself on
// first use, and the Writer drops its reference to it afterwards.
func (sw *Writer) SetSaltGenerator(saltGenerator SaltGenerator) {
	sw.saltGenerator = saltGenerator
}
// init lazily prepares the Writer for encryption. On the first successful
// call it draws a salt from the salt generator, derives the AEAD from it,
// and allocates the nonce counter and the working buffer, storing the salt
// at the start of that buffer so it can be sent with the first message.
// Later calls are no-ops.
//
// Returns a wrapped error if the salt cannot be generated or the AEAD cannot
// be created; in that case the Writer is left uninitialized.
func (sw *Writer) init() (err error) {
	if sw.aead != nil {
		return nil // Already initialized.
	}
	salt := make([]byte, sw.ssCipher.SaltSize())
	if err = sw.saltGenerator.GetSalt(salt); err != nil {
		return fmt.Errorf("failed to generate salt: %w", err)
	}
	// Only publish the AEAD once it is known to be valid, so that a failed
	// attempt never looks like a completed initialization.
	aead, err := sw.ssCipher.NewAEAD(salt)
	if err != nil {
		return fmt.Errorf("failed to create AEAD: %w", err)
	}
	sw.aead = aead
	sw.saltGenerator = nil // No longer needed, so release reference.
	sw.counter = make([]byte, sw.aead.NonceSize())
	// The maximum length message is the salt (first message only), length, length tag,
	// payload, and payload tag.
	sizeBufSize := 2 + sw.aead.Overhead()
	maxPayloadBufSize := payloadSizeMask + sw.aead.Overhead()
	sw.buf = make([]byte, len(salt)+sizeBufSize+maxPayloadBufSize)
	// Store the salt at the start of sw.buf.
	copy(sw.buf, salt)
	return nil
}
// encryptBlock seals `plaintext` in place using the current nonce, then
// advances the nonce. The slice needs spare capacity after its end to hold
// the AEAD tag. The return value is the length of ciphertext plus tag.
func (sw *Writer) encryptBlock(plaintext []byte) int {
	sealed := sw.aead.Seal(plaintext[:0], sw.counter, plaintext, nil)
	sealedLen := len(sealed)
	increment(sw.counter)
	return sealedLen
}
// Write implements io.Writer. It encrypts p and sends it to the underlying
// writer by routing it through ReadFrom, so any data queued by LazyWrite is
// sent along with it. Returns the number of bytes of p that were consumed.
func (sw *Writer) Write(p []byte) (int, error) {
	sw.byteWrapper.Reset(p)
	n, err := sw.ReadFrom(&sw.byteWrapper)
	// io.Writer implementations must not retain p, so drop the wrapper's
	// reference to the caller's slice before returning.
	sw.byteWrapper.Reset(nil)
	return int(n), err
}
// LazyWrite buffers p without sending it. The buffered data goes out when
// Flush() is called, when a non-lazy write happens, or when the buffer
// fills up while queuing. Returns the number of bytes queued so far and any
// error from sending a full buffer.
func (sw *Writer) LazyWrite(p []byte) (int, error) {
	if err := sw.init(); err != nil {
		return 0, err
	}
	// A Flush() triggered by an earlier LazyWrite() may run concurrently,
	// so the buffer must be manipulated under the lock.
	sw.mu.Lock()
	defer sw.mu.Unlock()

	total := 0
	remaining := p
	for {
		accepted := sw.enqueue(remaining)
		total += accepted
		remaining = remaining[accepted:]
		if len(remaining) == 0 {
			break
		}
		// The buffer is full: send it and keep queuing the rest.
		if err := sw.flush(); err != nil {
			return total, err
		}
	}
	sw.needFlush = true
	return total, nil
}
// Flush sends whatever data LazyWrite has queued, if a flush is currently
// needed; otherwise it does nothing. This method is thread-safe.
func (sw *Writer) Flush() error {
	sw.mu.Lock()
	defer sw.mu.Unlock()
	if sw.needFlush {
		return sw.flush()
	}
	return nil
}
// isZero reports whether every byte of b is zero. A nil or empty slice is
// considered zero.
func isZero(b []byte) bool {
	for i := 0; i < len(b); i++ {
		if b[i] != 0 {
			return false
		}
	}
	return true
}
// buffers returns the regions of sw.buf that receive plaintext before
// encryption. sw.buf begins with the salt; each Shadowsocks-TCP message is
// a fixed-length size block followed by a variable-length payload block.
// sizeBuf is the 2-byte length field right after the salt, and payloadBuf
// is the payloadSizeMask-byte region that follows the size block's tag.
func (sw *Writer) buffers() (sizeBuf, payloadBuf []byte) {
	sizeStart := sw.ssCipher.SaltSize()
	sizeEnd := sizeStart + 2
	payloadStart := sizeEnd + sw.aead.Overhead()
	payloadEnd := payloadStart + payloadSizeMask
	return sw.buf[sizeStart:sizeEnd], sw.buf[payloadStart:payloadEnd]
}
// ReadFrom implements the io.ReaderFrom interface. It reads plaintext from r
// until r reports an error, encrypting and sending one chunk per read.
// io.EOF from r ends the transfer and is not reported as an error.
//
// Returns the number of plaintext bytes read from r. A failure of the
// underlying writer takes precedence over a read error and is reported as a
// write failure; a read failure other than io.EOF is reported as a read
// failure. Both are wrapped with %w so the cause remains inspectable.
func (sw *Writer) ReadFrom(r io.Reader) (int64, error) {
	if err := sw.init(); err != nil {
		return 0, err
	}
	var written int64
	// Read and write failures are tracked separately so that the returned
	// error names the side that actually failed.
	var readErr, writeErr error
	_, payloadBuf := sw.buffers()

	// Special case: one thread-safe read, if necessary
	sw.mu.Lock()
	if sw.needFlush {
		pending := sw.pending

		// Release the lock while blocked in Read so that a concurrent
		// Flush() can send the pending data in the meantime.
		sw.mu.Unlock()
		saltsize := sw.ssCipher.SaltSize()
		overhead := sw.aead.Overhead()
		// The first pending+overhead bytes of payloadBuf are potentially
		// in use, and may be modified on the flush thread. Data after
		// that is safe to use on this thread.
		readBuf := sw.buf[saltsize+2+overhead+pending+overhead:]
		var plaintextSize int
		plaintextSize, readErr = r.Read(readBuf)
		written = int64(plaintextSize)
		sw.mu.Lock()

		// Append the freshly read bytes to whatever is still pending and
		// send everything as one chunk.
		sw.enqueue(readBuf[:plaintextSize])
		writeErr = sw.flush()
		sw.needFlush = false
	}
	sw.mu.Unlock()

	// Main transfer loop
	for readErr == nil && writeErr == nil {
		sw.pending, readErr = r.Read(payloadBuf)
		written += int64(sw.pending)
		writeErr = sw.flush()
	}

	if writeErr != nil {
		return written, fmt.Errorf("failed to write payload: %w", writeErr)
	}
	if readErr == io.EOF { // ignore EOF as per io.ReaderFrom contract
		return written, nil
	}
	return written, fmt.Errorf("failed to read payload: %w", readErr)
}
// enqueue copies as much of `plaintext` as fits into the unused part of the
// payload buffer and grows sw.pending by that amount. It returns how many
// bytes were taken from `plaintext`.
func (sw *Writer) enqueue(plaintext []byte) int {
	_, payloadBuf := sw.buffers()
	free := payloadBuf[sw.pending:]
	copied := copy(free, plaintext)
	sw.pending += copied
	return copied
}
// flush encrypts the pending plaintext as one chunk (size block followed by
// payload block) and writes it to the output. It does nothing when no data
// is pending. The pending count is reset even if the write fails.
func (sw *Writer) flush() error {
	if sw.pending == 0 {
		return nil
	}
	// sw.buf begins with the salt.
	saltSize := sw.ssCipher.SaltSize()
	// This must be decided before encrypting, because encryptBlock advances
	// the counter.
	firstMessage := isZero(sw.counter)

	sizeBuf, payloadBuf := sw.buffers()
	binary.BigEndian.PutUint16(sizeBuf, uint16(sw.pending))
	end := saltSize + sw.encryptBlock(sizeBuf)
	end += sw.encryptBlock(payloadBuf[:sw.pending])

	// Ordinarily the salt at the front of sw.buf is skipped.
	out := sw.buf[saltSize:end]
	if firstMessage {
		// The first message carries the salt. Compared to writing the salt
		// separately, this saves one packet during TCP slow-start and
		// potentially avoids having a distinctive size for the first packet.
		out = sw.buf[:end]
	}
	_, err := sw.writer.Write(out)
	sw.pending = 0
	return err
}
// ChunkReader is similar to io.Reader, except that it controls its own
// buffer granularity: each call yields one whole chunk in a buffer owned by
// the implementation, rather than filling a caller-supplied slice.
type ChunkReader interface {
	// ReadChunk reads the next chunk and returns its payload. The caller must
	// complete its use of the returned buffer before the next call.
	// The buffer is nil iff there is an error. io.EOF indicates a close.
	ReadChunk() ([]byte, error)
}
// chunkReader implements ChunkReader by decrypting a Shadowsocks stream
// read from reader.
type chunkReader struct {
	// Source of the encrypted stream.
	reader io.Reader
	// Cipher used to derive the AEAD once the salt has been read.
	ssCipher *Cipher
	// These are lazily initialized:
	aead cipher.AEAD
	// Nonce for the next AEAD message to decrypt. It is incremented after
	// every decryption attempt, so it advances twice per chunk (size, payload).
	counter []byte
	// Buffer for the uint16 size and its AEAD tag. Made in init().
	payloadSizeBuf []byte
	// Holds a buffer for the payload and its AEAD tag, when needed.
	payload slicepool.LazySlice
}
// Reader is an io.Reader that also implements io.WriterTo to
// allow for piping the data without extra allocations and copies.
// NewShadowsocksReader returns a value of this type.
type Reader interface {
	io.Reader
	io.WriterTo
}
// NewShadowsocksReader returns a Reader that yields the plaintext of the
// shadowsocks stream read from reader, decrypted with ssCipher. Payload
// buffers are borrowed from the shared read buffer pool.
func NewShadowsocksReader(reader io.Reader, ssCipher *Cipher) Reader {
	chunks := &chunkReader{
		reader:   reader,
		ssCipher: ssCipher,
		payload:  readBufPool.LazySlice(),
	}
	return &readConverter{cr: chunks}
}
// init lazily prepares the chunkReader for decryption. On the first
// successful call it reads the salt from the inner Reader, derives the AEAD
// from it, and allocates the nonce counter and the size buffer. Later calls
// are no-ops.
//
// io.EOF and io.ErrUnexpectedEOF from reading the salt are returned as-is;
// any other failure is returned wrapped. On failure the chunkReader is left
// uninitialized.
func (cr *chunkReader) init() (err error) {
	if cr.aead != nil {
		return nil // Already initialized.
	}
	// For chacha20-poly1305, SaltSize is 32, NonceSize is 12 and Overhead is 16.
	salt := make([]byte, cr.ssCipher.SaltSize())
	if _, err = io.ReadFull(cr.reader, salt); err != nil {
		if err != io.EOF && err != io.ErrUnexpectedEOF {
			err = fmt.Errorf("failed to read salt: %w", err)
		}
		return err
	}
	// Only publish the AEAD once it is known to be valid, so that a failed
	// attempt never looks like a completed initialization.
	aead, err := cr.ssCipher.NewAEAD(salt)
	if err != nil {
		return fmt.Errorf("failed to create AEAD: %w", err)
	}
	cr.aead = aead
	cr.counter = make([]byte, cr.aead.NonceSize())
	cr.payloadSizeBuf = make([]byte, 2+cr.aead.Overhead())
	return nil
}
// readMessage reads, decrypts, and verifies a single AEAD ciphertext.
// The ciphertext and tag (i.e. "overhead") must exactly fill `buf`,
// and the decrypted message will be placed in buf[:len(buf)-overhead].
//
// Returns the read error unchanged if the block could not be read in full,
// or a wrapped error if the block fails to decrypt. The nonce counter
// advances whenever a full block was read, even if decryption fails.
func (cr *chunkReader) readMessage(buf []byte) error {
	if _, err := io.ReadFull(cr.reader, buf); err != nil {
		return err
	}
	_, err := cr.aead.Open(buf[:0], cr.counter, buf, nil)
	increment(cr.counter)
	if err != nil {
		return fmt.Errorf("failed to decrypt: %w", err)
	}
	return nil
}
// ReadChunk returns the next chunk from the stream. Callers must fully
// consume and discard the previous chunk before calling ReadChunk again,
// because the previous payload buffer is released at the start of each call.
//
// Returns the decrypted payload, or nil and an error. io.EOF and
// io.ErrUnexpectedEOF while reading the size block are returned as-is; an
// io.EOF while reading the payload block is reported as io.ErrUnexpectedEOF.
// The payload buffer is released on every error path.
func (cr *chunkReader) ReadChunk() ([]byte, error) {
	if err := cr.init(); err != nil {
		return nil, err
	}
	// Release the previous payload buffer.
	cr.payload.Release()
	// In Shadowsocks-AEAD, each chunk consists of two
	// encrypted messages. The first message contains the payload length,
	// and the second message is the payload. Idle read threads will
	// block here until the next chunk.
	if err := cr.readMessage(cr.payloadSizeBuf); err != nil {
		if err != io.EOF && err != io.ErrUnexpectedEOF {
			err = fmt.Errorf("failed to read payload size: %w", err)
		}
		return nil, err
	}
	size := int(binary.BigEndian.Uint16(cr.payloadSizeBuf) & payloadSizeMask)
	sizeWithTag := size + cr.aead.Overhead()
	payloadBuf := cr.payload.Acquire()
	if cap(payloadBuf) < sizeWithTag {
		// This code is unreachable if the constants are set correctly.
		// Release the buffer anyway, as the other error path below does.
		cr.payload.Release()
		return nil, io.ErrShortBuffer
	}
	if err := cr.readMessage(payloadBuf[:sizeWithTag]); err != nil {
		if err == io.EOF { // EOF is not expected mid-chunk.
			err = io.ErrUnexpectedEOF
		}
		cr.payload.Release()
		return nil, err
	}
	return payloadBuf[:size], nil
}
// readConverter adapts from ChunkReader, with source-controlled
// chunk sizes, to Go-style IO.
type readConverter struct {
	// Source of decrypted chunks.
	cr ChunkReader
	// Portion of the most recently read chunk that has not yet been handed
	// to the caller. It aliases the buffer returned by cr.ReadChunk.
	leftover []byte
}
// Read implements io.Reader. It copies decrypted data into b, fetching a new
// chunk first when the previous one has been fully consumed. At most the
// remainder of one chunk is returned per call.
func (c *readConverter) Read(b []byte) (int, error) {
	err := c.ensureLeftover()
	if err != nil {
		return 0, err
	}
	copied := copy(b, c.leftover)
	c.leftover = c.leftover[copied:]
	return copied, nil
}
// WriteTo implements io.WriterTo. It writes decrypted chunks to w until the
// source reports an error. io.EOF from the source ends the transfer and is
// not reported as an error. Returns the number of bytes written to w.
func (c *readConverter) WriteTo(w io.Writer) (written int64, err error) {
	for {
		err = c.ensureLeftover()
		if err == io.EOF {
			return written, nil
		}
		if err != nil {
			return written, err
		}
		n, writeErr := w.Write(c.leftover)
		written += int64(n)
		// Keep whatever was not written for the next attempt.
		c.leftover = c.leftover[n:]
		if writeErr != nil {
			return written, writeErr
		}
	}
}
// ensureLeftover makes sure c.leftover holds data to hand out. When it is
// empty, the method waits for the next chunk from the source and stores its
// payload there. An error is returned only when a new chunk was needed and
// could not be read; c.leftover is nil in that case.
func (c *readConverter) ensureLeftover() error {
	if len(c.leftover) > 0 {
		return nil
	}
	// Drop the reference to the consumed chunk before asking for the next.
	c.leftover = nil
	chunk, err := c.cr.ReadChunk()
	if err == nil {
		c.leftover = chunk
	}
	return err
}
// increment adds one to b, interpreted as a little-endian unsigned integer,
// in place. The carry propagates toward higher indices, and a value of all
// 0xff bytes wraps around to all zeros.
func increment(b []byte) {
	for i := 0; i < len(b); i++ {
		b[i]++
		if b[i] != 0 {
			// No carry out of this byte, so we are done.
			break
		}
	}
}