123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586 |
- /* Copyright 2015-present Facebook, Inc.
- * Licensed under the Apache License, Version 2.0 */
- var EE = require('events').EventEmitter;
- var util = require('util');
- var os = require('os');
- var assert = require('assert');
- var Int64 = require('node-int64');
- // BSER uses the local endianness to reduce byte swapping overheads
- // (the protocol is expressly local IPC only). We need to tell node
- // to use the native endianness when reading various native values.
- var isBigEndian = os.endianness() == 'BE';
- // Find the next power-of-2 >= size
- function nextPow2(size) {
- return Math.pow(2, Math.ceil(Math.log(size) / Math.LN2));
- }
- // Expandable buffer that we can provide a size hint for
- function Accumulator(initsize) {
- this.buf = new Buffer(nextPow2(initsize || 8192));
- this.readOffset = 0;
- this.writeOffset = 0;
- }
- // For testing
- exports.Accumulator = Accumulator
- // How much we can write into this buffer without allocating
- Accumulator.prototype.writeAvail = function() {
- return this.buf.length - this.writeOffset;
- }
- // How much we can read
- Accumulator.prototype.readAvail = function() {
- return this.writeOffset - this.readOffset;
- }
- // Ensure that we have enough space for size bytes
- Accumulator.prototype.reserve = function(size) {
- if (size < this.writeAvail()) {
- return;
- }
- // If we can make room by shunting down, do so
- if (this.readOffset > 0) {
- this.buf.copy(this.buf, 0, this.readOffset, this.writeOffset);
- this.writeOffset -= this.readOffset;
- this.readOffset = 0;
- }
- // If we made enough room, no need to allocate more
- if (size < this.writeAvail()) {
- return;
- }
- // Allocate a replacement and copy it in
- var buf = new Buffer(nextPow2(this.buf.length + size - this.writeAvail()));
- this.buf.copy(buf);
- this.buf = buf;
- }
- // Append buffer or string. Will resize as needed
- Accumulator.prototype.append = function(buf) {
- if (Buffer.isBuffer(buf)) {
- this.reserve(buf.length);
- buf.copy(this.buf, this.writeOffset, 0, buf.length);
- this.writeOffset += buf.length;
- } else {
- var size = Buffer.byteLength(buf);
- this.reserve(size);
- this.buf.write(buf, this.writeOffset);
- this.writeOffset += size;
- }
- }
- Accumulator.prototype.assertReadableSize = function(size) {
- if (this.readAvail() < size) {
- throw new Error("wanted to read " + size +
- " bytes but only have " + this.readAvail());
- }
- }
- Accumulator.prototype.peekString = function(size) {
- this.assertReadableSize(size);
- return this.buf.toString('utf-8', this.readOffset, this.readOffset + size);
- }
- Accumulator.prototype.readString = function(size) {
- var str = this.peekString(size);
- this.readOffset += size;
- return str;
- }
- Accumulator.prototype.peekInt = function(size) {
- this.assertReadableSize(size);
- switch (size) {
- case 1:
- return this.buf.readInt8(this.readOffset, size);
- case 2:
- return isBigEndian ?
- this.buf.readInt16BE(this.readOffset, size) :
- this.buf.readInt16LE(this.readOffset, size);
- case 4:
- return isBigEndian ?
- this.buf.readInt32BE(this.readOffset, size) :
- this.buf.readInt32LE(this.readOffset, size);
- case 8:
- var big = this.buf.slice(this.readOffset, this.readOffset + 8);
- if (isBigEndian) {
- // On a big endian system we can simply pass the buffer directly
- return new Int64(big);
- }
- // Otherwise we need to byteswap
- return new Int64(byteswap64(big));
- default:
- throw new Error("invalid integer size " + size);
- }
- }
- Accumulator.prototype.readInt = function(bytes) {
- var ival = this.peekInt(bytes);
- if (ival instanceof Int64 && isFinite(ival.valueOf())) {
- ival = ival.valueOf();
- }
- this.readOffset += bytes;
- return ival;
- }
- Accumulator.prototype.peekDouble = function() {
- this.assertReadableSize(8);
- return isBigEndian ?
- this.buf.readDoubleBE(this.readOffset) :
- this.buf.readDoubleLE(this.readOffset);
- }
- Accumulator.prototype.readDouble = function() {
- var dval = this.peekDouble();
- this.readOffset += 8;
- return dval;
- }
- Accumulator.prototype.readAdvance = function(size) {
- if (size > 0) {
- this.assertReadableSize(size);
- } else if (size < 0 && this.readOffset + size < 0) {
- throw new Error("advance with negative offset " + size +
- " would seek off the start of the buffer");
- }
- this.readOffset += size;
- }
- Accumulator.prototype.writeByte = function(value) {
- this.reserve(1);
- this.buf.writeInt8(value, this.writeOffset);
- ++this.writeOffset;
- }
- Accumulator.prototype.writeInt = function(value, size) {
- this.reserve(size);
- switch (size) {
- case 1:
- this.buf.writeInt8(value, this.writeOffset);
- break;
- case 2:
- if (isBigEndian) {
- this.buf.writeInt16BE(value, this.writeOffset);
- } else {
- this.buf.writeInt16LE(value, this.writeOffset);
- }
- break;
- case 4:
- if (isBigEndian) {
- this.buf.writeInt32BE(value, this.writeOffset);
- } else {
- this.buf.writeInt32LE(value, this.writeOffset);
- }
- break;
- default:
- throw new Error("unsupported integer size " + size);
- }
- this.writeOffset += size;
- }
- Accumulator.prototype.writeDouble = function(value) {
- this.reserve(8);
- if (isBigEndian) {
- this.buf.writeDoubleBE(value, this.writeOffset);
- } else {
- this.buf.writeDoubleLE(value, this.writeOffset);
- }
- this.writeOffset += 8;
- }
- var BSER_ARRAY = 0x00;
- var BSER_OBJECT = 0x01;
- var BSER_STRING = 0x02;
- var BSER_INT8 = 0x03;
- var BSER_INT16 = 0x04;
- var BSER_INT32 = 0x05;
- var BSER_INT64 = 0x06;
- var BSER_REAL = 0x07;
- var BSER_TRUE = 0x08;
- var BSER_FALSE = 0x09;
- var BSER_NULL = 0x0a;
- var BSER_TEMPLATE = 0x0b;
- var BSER_SKIP = 0x0c;
- var ST_NEED_PDU = 0; // Need to read and decode PDU length
- var ST_FILL_PDU = 1; // Know the length, need to read whole content
- var MAX_INT8 = 127;
- var MAX_INT16 = 32767;
- var MAX_INT32 = 2147483647;
- function BunserBuf() {
- EE.call(this);
- this.buf = new Accumulator();
- this.state = ST_NEED_PDU;
- }
- util.inherits(BunserBuf, EE);
- exports.BunserBuf = BunserBuf;
- BunserBuf.prototype.append = function(buf, synchronous) {
- if (synchronous) {
- this.buf.append(buf);
- return this.process(synchronous);
- }
- try {
- this.buf.append(buf);
- } catch (err) {
- this.emit('error', err);
- return;
- }
- // Arrange to decode later. This allows the consuming
- // application to make progress with other work in the
- // case that we have a lot of subscription updates coming
- // in from a large tree.
- this.processLater();
- }
- BunserBuf.prototype.processLater = function() {
- var self = this;
- process.nextTick(function() {
- try {
- self.process(false);
- } catch (err) {
- self.emit('error', err);
- }
- });
- }
- // Do something with the buffer to advance our state.
- // If we're running synchronously we'll return either
- // the value we've decoded or undefined if we don't
- // yet have enought data.
- // If we're running asynchronously, we'll emit the value
- // when it becomes ready and schedule another invocation
- // of process on the next tick if we still have data we
- // can process.
- BunserBuf.prototype.process = function(synchronous) {
- if (this.state == ST_NEED_PDU) {
- if (this.buf.readAvail() < 2) {
- return;
- }
- // Validate BSER header
- this.expectCode(0);
- this.expectCode(1);
- this.pduLen = this.decodeInt(true /* relaxed */);
- if (this.pduLen === false) {
- // Need more data, walk backwards
- this.buf.readAdvance(-2);
- return;
- }
- // Ensure that we have a big enough buffer to read the rest of the PDU
- this.buf.reserve(this.pduLen);
- this.state = ST_FILL_PDU;
- }
- if (this.state == ST_FILL_PDU) {
- if (this.buf.readAvail() < this.pduLen) {
- // Need more data
- return;
- }
- // We have enough to decode it
- var val = this.decodeAny();
- if (synchronous) {
- return val;
- }
- this.emit('value', val);
- this.state = ST_NEED_PDU;
- }
- if (!synchronous && this.buf.readAvail() > 0) {
- this.processLater();
- }
- }
- BunserBuf.prototype.raise = function(reason) {
- throw new Error(reason + ", in Buffer of length " +
- this.buf.buf.length + " (" + this.buf.readAvail() +
- " readable) at offset " + this.buf.readOffset + " buffer: " +
- JSON.stringify(this.buf.buf.slice(
- this.buf.readOffset, this.buf.readOffset + 32).toJSON()));
- }
- BunserBuf.prototype.expectCode = function(expected) {
- var code = this.buf.readInt(1);
- if (code != expected) {
- this.raise("expected bser opcode " + expected + " but got " + code);
- }
- }
- BunserBuf.prototype.decodeAny = function() {
- var code = this.buf.peekInt(1);
- switch (code) {
- case BSER_INT8:
- case BSER_INT16:
- case BSER_INT32:
- case BSER_INT64:
- return this.decodeInt();
- case BSER_REAL:
- this.buf.readAdvance(1);
- return this.buf.readDouble();
- case BSER_TRUE:
- this.buf.readAdvance(1);
- return true;
- case BSER_FALSE:
- this.buf.readAdvance(1);
- return false;
- case BSER_NULL:
- this.buf.readAdvance(1);
- return null;
- case BSER_STRING:
- return this.decodeString();
- case BSER_ARRAY:
- return this.decodeArray();
- case BSER_OBJECT:
- return this.decodeObject();
- case BSER_TEMPLATE:
- return this.decodeTemplate();
- default:
- this.raise("unhandled bser opcode " + code);
- }
- }
- BunserBuf.prototype.decodeArray = function() {
- this.expectCode(BSER_ARRAY);
- var nitems = this.decodeInt();
- var arr = [];
- for (var i = 0; i < nitems; ++i) {
- arr.push(this.decodeAny());
- }
- return arr;
- }
- BunserBuf.prototype.decodeObject = function() {
- this.expectCode(BSER_OBJECT);
- var nitems = this.decodeInt();
- var res = {};
- for (var i = 0; i < nitems; ++i) {
- var key = this.decodeString();
- var val = this.decodeAny();
- res[key] = val;
- }
- return res;
- }
- BunserBuf.prototype.decodeTemplate = function() {
- this.expectCode(BSER_TEMPLATE);
- var keys = this.decodeArray();
- var nitems = this.decodeInt();
- var arr = [];
- for (var i = 0; i < nitems; ++i) {
- var obj = {};
- for (var keyidx = 0; keyidx < keys.length; ++keyidx) {
- if (this.buf.peekInt(1) == BSER_SKIP) {
- this.buf.readAdvance(1);
- continue;
- }
- var val = this.decodeAny();
- obj[keys[keyidx]] = val;
- }
- arr.push(obj);
- }
- return arr;
- }
- BunserBuf.prototype.decodeString = function() {
- this.expectCode(BSER_STRING);
- var len = this.decodeInt();
- return this.buf.readString(len);
- }
- // This is unusual compared to the other decode functions in that
- // we may not have enough data available to satisfy the read, and
- // we don't want to throw. This is only true when we're reading
- // the PDU length from the PDU header; we'll set relaxSizeAsserts
- // in that case.
- BunserBuf.prototype.decodeInt = function(relaxSizeAsserts) {
- if (relaxSizeAsserts && !this.buf.readAvail(1)) {
- return false;
- } else {
- this.buf.assertReadableSize(1);
- }
- var code = this.buf.peekInt(1);
- var size = 0;
- switch (code) {
- case BSER_INT8:
- size = 1;
- break;
- case BSER_INT16:
- size = 2;
- break;
- case BSER_INT32:
- size = 4;
- break;
- case BSER_INT64:
- size = 8;
- break;
- default:
- this.raise("invalid bser int encoding " + code);
- }
- if (relaxSizeAsserts && !this.buf.readAvail(1 + size)) {
- return false;
- }
- this.buf.readAdvance(1);
- return this.buf.readInt(size);
- }
- // synchronously BSER decode a string and return the value
- function loadFromBuffer(input) {
- var buf = new BunserBuf();
- var result = buf.append(input, true);
- if (buf.buf.readAvail()) {
- throw Error(
- 'excess data found after input buffer, use BunserBuf instead');
- }
- if (typeof result === 'undefined') {
- throw Error(
- 'no bser found in string and no error raised!?');
- }
- return result;
- }
- exports.loadFromBuffer = loadFromBuffer
- // Byteswap an arbitrary buffer, flipping from one endian
- // to the other, returning a new buffer with the resultant data
- function byteswap64(buf) {
- var swap = new Buffer(buf.length);
- for (var i = 0; i < buf.length; i++) {
- swap[i] = buf[buf.length -1 - i];
- }
- return swap;
- }
- function dump_int64(buf, val) {
- // Get the raw bytes. The Int64 buffer is big endian
- var be = val.toBuffer();
- if (isBigEndian) {
- // We're a big endian system, so the buffer is exactly how we
- // want it to be
- buf.writeByte(BSER_INT64);
- buf.append(be);
- return;
- }
- // We need to byte swap to get the correct representation
- var le = byteswap64(be);
- buf.writeByte(BSER_INT64);
- buf.append(le);
- }
- function dump_int(buf, val) {
- var abs = Math.abs(val);
- if (abs <= MAX_INT8) {
- buf.writeByte(BSER_INT8);
- buf.writeInt(val, 1);
- } else if (abs <= MAX_INT16) {
- buf.writeByte(BSER_INT16);
- buf.writeInt(val, 2);
- } else if (abs <= MAX_INT32) {
- buf.writeByte(BSER_INT32);
- buf.writeInt(val, 4);
- } else {
- dump_int64(buf, new Int64(val));
- }
- }
- function dump_any(buf, val) {
- switch (typeof(val)) {
- case 'number':
- // check if it is an integer or a float
- if (isFinite(val) && Math.floor(val) === val) {
- dump_int(buf, val);
- } else {
- buf.writeByte(BSER_REAL);
- buf.writeDouble(val);
- }
- return;
- case 'string':
- buf.writeByte(BSER_STRING);
- dump_int(buf, Buffer.byteLength(val));
- buf.append(val);
- return;
- case 'boolean':
- buf.writeByte(val ? BSER_TRUE : BSER_FALSE);
- return;
- case 'object':
- if (val === null) {
- buf.writeByte(BSER_NULL);
- return;
- }
- if (val instanceof Int64) {
- dump_int64(buf, val);
- return;
- }
- if (Array.isArray(val)) {
- buf.writeByte(BSER_ARRAY);
- dump_int(buf, val.length);
- for (var i = 0; i < val.length; ++i) {
- dump_any(buf, val[i]);
- }
- return;
- }
- buf.writeByte(BSER_OBJECT);
- var keys = Object.keys(val);
- // First pass to compute number of defined keys
- var num_keys = keys.length;
- for (var i = 0; i < keys.length; ++i) {
- var key = keys[i];
- var v = val[key];
- if (typeof(v) == 'undefined') {
- num_keys--;
- }
- }
- dump_int(buf, num_keys);
- for (var i = 0; i < keys.length; ++i) {
- var key = keys[i];
- var v = val[key];
- if (typeof(v) == 'undefined') {
- // Don't include it
- continue;
- }
- dump_any(buf, key);
- try {
- dump_any(buf, v);
- } catch (e) {
- throw new Error(
- e.message + ' (while serializing object property with name `' +
- key + "')");
- }
- }
- return;
- default:
- throw new Error('cannot serialize type ' + typeof(val) + ' to BSER');
- }
- }
- // BSER encode value and return a buffer of the contents
- function dumpToBuffer(val) {
- var buf = new Accumulator();
- // Build out the header
- buf.writeByte(0);
- buf.writeByte(1);
- // Reserve room for an int32 to hold our PDU length
- buf.writeByte(BSER_INT32);
- buf.writeInt(0, 4); // We'll come back and fill this in at the end
- dump_any(buf, val);
- // Compute PDU length
- var off = buf.writeOffset;
- var len = off - 7 /* the header length */;
- buf.writeOffset = 3; // The length value to fill in
- buf.writeInt(len, 4); // write the length in the space we reserved
- buf.writeOffset = off;
- return buf.buf.slice(0, off);
- }
- exports.dumpToBuffer = dumpToBuffer
|