Skip to content

Commit

Permalink
Merge pull request #68 from tulios/add-support-to-recordbatch-protocol
Browse files Browse the repository at this point in the history
Add support to RecordBatch protocol
  • Loading branch information
tulios authored Jun 4, 2018
2 parents 64923d4 + 6e74c37 commit e65d781
Show file tree
Hide file tree
Showing 16 changed files with 1,178 additions and 29 deletions.
49 changes: 37 additions & 12 deletions src/protocol/decoder.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ module.exports = class Decoder {
return INT32_SIZE
}

static decodeZigZag(value) {
return (value >>> 1) ^ -(value & 1)
}

static decodeZigZag64(longValue) {
return longValue.shiftRightUnsigned(1).xor(longValue.and(Long.fromInt(1)).negate())
}

constructor(buffer) {
this.buffer = buffer
this.offset = 0
Expand Down Expand Up @@ -65,6 +73,19 @@ module.exports = class Decoder {
return value
}

readVarIntString() {
const byteLength = this.readVarInt()

if (byteLength === -1) {
return null
}

const stringBuffer = this.buffer.slice(this.offset, this.offset + byteLength)
const value = stringBuffer.toString('utf8')
this.offset += byteLength
return value
}

canReadBytes(length) {
return Buffer.byteLength(this.buffer) - this.offset >= length
}
Expand All @@ -81,6 +102,18 @@ module.exports = class Decoder {
return stringBuffer
}

readVarIntBytes() {
const byteLength = this.readVarInt()

if (byteLength === -1) {
return null
}

const stringBuffer = this.buffer.slice(this.offset, this.offset + byteLength)
this.offset += byteLength
return stringBuffer
}

readBoolean() {
return this.readInt8() === 1
}
Expand Down Expand Up @@ -123,7 +156,7 @@ module.exports = class Decoder {
return array
}

readSignedVarInt32() {
readVarInt() {
let currentByte
let result = 0
let i = 0
Expand All @@ -134,14 +167,10 @@ module.exports = class Decoder {
i += 7
} while (currentByte >= MOST_SIGNIFICANT_BIT)

return this.decodeZigZag(result)
}

decodeZigZag(value) {
return (value >>> 1) ^ -(value & 1)
return Decoder.decodeZigZag(result)
}

readSignedVarInt64() {
readVarLong() {
let currentByte
let result = Long.fromInt(0)
let i = 0
Expand All @@ -152,11 +181,7 @@ module.exports = class Decoder {
i += 7
} while (currentByte >= MOST_SIGNIFICANT_BIT)

return this.decodeZigZag64(result)
}

decodeZigZag64(longValue) {
return longValue.shiftRightUnsigned(1).xor(longValue.and(Long.fromInt(1)).negate())
return Decoder.decodeZigZag64(result)
}

slice(size) {
Expand Down
115 changes: 102 additions & 13 deletions src/protocol/encoder.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,49 @@ const UNSIGNED_INT32_MAX_NUMBER = 0xffffff80
const UNSIGNED_INT64_MAX_NUMBER = Long.fromBytes([-1, -1, -1, -1, -1, -1, -1, -128])

module.exports = class Encoder {
static encodeZigZag(value) {
return (value << 1) ^ (value >> 31)
}

static encodeZigZag64(value) {
const longValue = Long.fromValue(value)
return longValue.shiftLeft(1).xor(longValue.shiftRight(63))
}

static sizeOfVarInt(value) {
let encodedValue = this.encodeZigZag(value)
let bytes = 1

while ((encodedValue & UNSIGNED_INT32_MAX_NUMBER) !== 0) {
bytes += 1
encodedValue >>>= 7
}

return bytes
}

static sizeOfVarLong(value) {
let longValue = Encoder.encodeZigZag64(value)
let bytes = 1

while (longValue.and(UNSIGNED_INT64_MAX_NUMBER).notEquals(Long.fromInt(0))) {
bytes += 1
longValue = longValue.shiftRightUnsigned(7)
}

return bytes
}

static sizeOfVarIntBytes(value) {
const size = value === null ? -1 : Buffer.byteLength(value)

if (size < 0) {
return Encoder.sizeOfVarInt(-1)
}

return Encoder.sizeOfVarInt(size) + size
}

constructor() {
this.buffer = Buffer.alloc(0)
}
Expand All @@ -36,6 +79,13 @@ module.exports = class Encoder {
return this
}

writeUInt32(value) {
const tempBuffer = Buffer.alloc(INT32_SIZE)
tempBuffer.writeUInt32BE(value)
this.buffer = Buffer.concat([this.buffer, tempBuffer])
return this
}

writeInt64(value) {
const tempBuffer = Buffer.alloc(INT64_SIZE)
const longValue = Long.fromValue(value)
Expand Down Expand Up @@ -64,6 +114,20 @@ module.exports = class Encoder {
return this
}

writeVarIntString(value) {
if (value == null) {
this.writeVarInt(-1)
return this
}

const byteLength = Buffer.byteLength(value, 'utf8')
this.writeVarInt(byteLength)
const tempBuffer = Buffer.alloc(byteLength)
tempBuffer.write(value, 0, byteLength, 'utf8')
this.buffer = Buffer.concat([this.buffer, tempBuffer])
return this
}

writeBytes(value) {
if (value == null) {
this.writeInt32(-1)
Expand All @@ -86,6 +150,28 @@ module.exports = class Encoder {
return this
}

writeVarIntBytes(value) {
if (value == null) {
this.writeVarInt(-1)
return this
}

if (Buffer.isBuffer(value)) {
// raw bytes
this.writeVarInt(value.length)
this.buffer = Buffer.concat([this.buffer, value])
} else {
const valueToWrite = String(value)
const byteLength = Buffer.byteLength(valueToWrite, 'utf8')
this.writeVarInt(byteLength)
const tempBuffer = Buffer.alloc(byteLength)
tempBuffer.write(valueToWrite, 0, byteLength, 'utf8')
this.buffer = Buffer.concat([this.buffer, tempBuffer])
}

return this
}

writeEncoder(value) {
if (value instanceof Encoder !== true) {
throw new Error('value should be an instance of Encoder')
Expand Down Expand Up @@ -114,11 +200,23 @@ module.exports = class Encoder {
return this
}

writeVarIntArray(array, type) {
this.writeVarInt(array.length)
array.forEach(value => {
switch (type || typeof value) {
case 'object':
this.writeEncoder(value)
break
}
})
return this
}

// Based on:
// https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/util/Varint.java#L106
writeSignedVarInt32(value) {
writeVarInt(value) {
const byteArray = []
let encodedValue = this.encodeZigZag(value)
let encodedValue = Encoder.encodeZigZag(value)

while ((encodedValue & UNSIGNED_INT32_MAX_NUMBER) !== 0) {
byteArray.push((encodedValue & OTHER_BITS) | MOST_SIGNIFICANT_BIT)
Expand All @@ -130,13 +228,9 @@ module.exports = class Encoder {
return this
}

encodeZigZag(value) {
return (value << 1) ^ (value >> 31)
}

writeSignedVarInt64(value) {
writeVarLong(value) {
const byteArray = []
let longValue = this.encodeZigZag64(value)
let longValue = Encoder.encodeZigZag64(value)

while (longValue.and(UNSIGNED_INT64_MAX_NUMBER).notEquals(Long.fromInt(0))) {
byteArray.push(
Expand All @@ -154,11 +248,6 @@ module.exports = class Encoder {
return this
}

encodeZigZag64(value) {
const longValue = Long.fromValue(value)
return longValue.shiftLeft(1).xor(longValue.shiftRight(63))
}

size() {
return Buffer.byteLength(this.buffer)
}
Expand Down
84 changes: 80 additions & 4 deletions src/protocol/encoder.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ const MAX_SAFE_POSITIVE_SIGNED_INT = 2147483647
const MIN_SAFE_NEGATIVE_SIGNED_INT = -2147483648

describe('Protocol > Encoder', () => {
const signed32 = number => new Encoder().writeSignedVarInt32(number).buffer
const decode32 = buffer => new Decoder(buffer).readSignedVarInt32()
const signed32 = number => new Encoder().writeVarInt(number).buffer
const decode32 = buffer => new Decoder(buffer).readVarInt()

const signed64 = number => new Encoder().writeSignedVarInt64(number).buffer
const decode64 = buffer => new Decoder(buffer).readSignedVarInt64()
const signed64 = number => new Encoder().writeVarLong(number).buffer
const decode64 = buffer => new Decoder(buffer).readVarLong()

const B = (...args) => Buffer.from(args)
const L = value => Long.fromString(`${value}`)
Expand Down Expand Up @@ -173,4 +173,80 @@ describe('Protocol > Encoder', () => {
expect(decode64(signed64(Long.MAX_VALUE))).toEqual(Long.MAX_VALUE)
})
})

describe('sizeOfVarInt', () => {
it('returns the size in bytes', () => {
expect(Encoder.sizeOfVarInt(0)).toEqual(signed32(1).length)
expect(Encoder.sizeOfVarInt(1)).toEqual(signed32(1).length)
expect(Encoder.sizeOfVarInt(63)).toEqual(signed32(63).length)
expect(Encoder.sizeOfVarInt(64)).toEqual(signed32(64).length)
expect(Encoder.sizeOfVarInt(8191)).toEqual(signed32(8191).length)
expect(Encoder.sizeOfVarInt(8192)).toEqual(signed32(8192).length)
expect(Encoder.sizeOfVarInt(1048575)).toEqual(signed32(1048575).length)
expect(Encoder.sizeOfVarInt(1048576)).toEqual(signed32(1048576).length)
expect(Encoder.sizeOfVarInt(134217727)).toEqual(signed32(134217727).length)
expect(Encoder.sizeOfVarInt(134217728)).toEqual(signed32(134217728).length)

expect(Encoder.sizeOfVarInt(-1)).toEqual(signed32(-1).length)
expect(Encoder.sizeOfVarInt(-64)).toEqual(signed32(-64).length)
expect(Encoder.sizeOfVarInt(-65)).toEqual(signed32(-65).length)
expect(Encoder.sizeOfVarInt(-8192)).toEqual(signed32(-8192).length)
expect(Encoder.sizeOfVarInt(-8193)).toEqual(signed32(-8193).length)
expect(Encoder.sizeOfVarInt(-1048576)).toEqual(signed32(-1048576).length)
expect(Encoder.sizeOfVarInt(-1048577)).toEqual(signed32(-1048577).length)
expect(Encoder.sizeOfVarInt(-134217728)).toEqual(signed32(-134217728).length)
expect(Encoder.sizeOfVarInt(-134217729)).toEqual(signed32(-134217729).length)

expect(Encoder.sizeOfVarInt(MAX_SAFE_POSITIVE_SIGNED_INT)).toEqual(
signed32(MAX_SAFE_POSITIVE_SIGNED_INT).length
)
expect(Encoder.sizeOfVarInt(MIN_SAFE_NEGATIVE_SIGNED_INT)).toEqual(
signed32(MIN_SAFE_NEGATIVE_SIGNED_INT).length
)
})
})

describe('sizeOfVarLong', () => {
it('returns the size in bytes', () => {
expect(Encoder.sizeOfVarLong(0)).toEqual(signed64(0).length)
expect(Encoder.sizeOfVarLong(1)).toEqual(signed64(1).length)
expect(Encoder.sizeOfVarLong(63)).toEqual(signed64(63).length)
expect(Encoder.sizeOfVarLong(64)).toEqual(signed64(64).length)
expect(Encoder.sizeOfVarLong(8191)).toEqual(signed64(8191).length)
expect(Encoder.sizeOfVarLong(8192)).toEqual(signed64(8192).length)
expect(Encoder.sizeOfVarLong(1048575)).toEqual(signed64(1048575).length)
expect(Encoder.sizeOfVarLong(1048576)).toEqual(signed64(1048576).length)
expect(Encoder.sizeOfVarLong(134217727)).toEqual(signed64(134217727).length)
expect(Encoder.sizeOfVarLong(134217728)).toEqual(signed64(134217728).length)

expect(Encoder.sizeOfVarLong(MAX_SAFE_POSITIVE_SIGNED_INT)).toEqual(
signed64(MAX_SAFE_POSITIVE_SIGNED_INT).length
)

expect(Encoder.sizeOfVarLong(L('17179869183'))).toEqual(signed64(L('17179869183')).length)
expect(Encoder.sizeOfVarLong(L('17179869184'))).toEqual(signed64(L('17179869184')).length)
expect(Encoder.sizeOfVarLong(L('2199023255551'))).toEqual(signed64(L('2199023255551')).length)
expect(Encoder.sizeOfVarLong(L('2199023255552'))).toEqual(signed64(L('2199023255552')).length)
expect(Encoder.sizeOfVarLong(L('281474976710655'))).toEqual(
signed64(L('281474976710655')).length
)
expect(Encoder.sizeOfVarLong(L('281474976710656'))).toEqual(
signed64(L('281474976710656')).length
)
expect(Encoder.sizeOfVarLong(L('-36028797018963968'))).toEqual(
signed64(L('-36028797018963968')).length
)
expect(Encoder.sizeOfVarLong(L('-36028797018963969'))).toEqual(
signed64(L('-36028797018963969')).length
)
expect(Encoder.sizeOfVarLong(L('-4611686018427387904'))).toEqual(
signed64(L('-4611686018427387904')).length
)
expect(Encoder.sizeOfVarLong(L('-4611686018427387905'))).toEqual(
signed64(L('-4611686018427387905')).length
)
expect(Encoder.sizeOfVarLong(Long.MIN_VALUE)).toEqual(signed64(Long.MIN_VALUE).length)
expect(Encoder.sizeOfVarLong(Long.MAX_VALUE)).toEqual(signed64(Long.MAX_VALUE).length)
})
})
})
Loading

0 comments on commit e65d781

Please sign in to comment.