mirror of
https://github.com/nodejs/node.git
synced 2025-05-13 16:39:33 +00:00
stream: Remove bufferSize option
Now that highWaterMark increases when there are large reads, this greatly reduces the number of calls necessary to _read(size), assuming that _read actually respects the size argument.
This commit is contained in:
parent
d5a0940fff
commit
b0f6789a78
@ -90,8 +90,6 @@ method. (See below.)
|
|||||||
### new stream.Readable([options])
|
### new stream.Readable([options])
|
||||||
|
|
||||||
* `options` {Object}
|
* `options` {Object}
|
||||||
* `bufferSize` {Number} The size of the chunks to consume from the
|
|
||||||
underlying resource. Default=16kb
|
|
||||||
* `highWaterMark` {Number} The maximum number of bytes to store in
|
* `highWaterMark` {Number} The maximum number of bytes to store in
|
||||||
the internal buffer before ceasing to read from the underlying
|
the internal buffer before ceasing to read from the underlying
|
||||||
resource. Default=16kb
|
resource. Default=16kb
|
||||||
|
@ -32,16 +32,12 @@ util.inherits(Readable, Stream);
|
|||||||
function ReadableState(options, stream) {
|
function ReadableState(options, stream) {
|
||||||
options = options || {};
|
options = options || {};
|
||||||
|
|
||||||
// the argument passed to this._read(n)
|
|
||||||
this.bufferSize = options.bufferSize || 16 * 1024;
|
|
||||||
|
|
||||||
// the point at which it stops calling _read() to fill the buffer
|
// the point at which it stops calling _read() to fill the buffer
|
||||||
// Note: 0 is a valid value, means "don't call _read preemptively ever"
|
// Note: 0 is a valid value, means "don't call _read preemptively ever"
|
||||||
var hwm = options.highWaterMark;
|
var hwm = options.highWaterMark;
|
||||||
this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;
|
this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;
|
||||||
|
|
||||||
// cast to ints.
|
// cast to ints.
|
||||||
this.bufferSize = ~~this.bufferSize;
|
|
||||||
this.highWaterMark = ~~this.highWaterMark;
|
this.highWaterMark = ~~this.highWaterMark;
|
||||||
|
|
||||||
this.buffer = [];
|
this.buffer = [];
|
||||||
@ -265,7 +261,7 @@ Readable.prototype.read = function(n) {
|
|||||||
if (state.length === 0)
|
if (state.length === 0)
|
||||||
state.needReadable = true;
|
state.needReadable = true;
|
||||||
// call internal read method
|
// call internal read method
|
||||||
this._read(state.bufferSize);
|
this._read(state.highWaterMark);
|
||||||
state.sync = false;
|
state.sync = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,7 +102,7 @@ function afterTransform(stream, er, data) {
|
|||||||
|
|
||||||
var rs = stream._readableState;
|
var rs = stream._readableState;
|
||||||
if (rs.needReadable || rs.length < rs.highWaterMark) {
|
if (rs.needReadable || rs.length < rs.highWaterMark) {
|
||||||
stream._read(rs.bufferSize);
|
stream._read(rs.highWaterMark);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -165,7 +165,7 @@ Transform.prototype._write = function(chunk, encoding, cb) {
|
|||||||
if (ts.needTransform ||
|
if (ts.needTransform ||
|
||||||
rs.needReadable ||
|
rs.needReadable ||
|
||||||
rs.length < rs.highWaterMark)
|
rs.length < rs.highWaterMark)
|
||||||
this._read(rs.bufferSize);
|
this._read(rs.highWaterMark);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1431,7 +1431,6 @@ function ReadStream(path, options) {
|
|||||||
|
|
||||||
// a little bit bigger buffer and water marks by default
|
// a little bit bigger buffer and water marks by default
|
||||||
options = util._extend({
|
options = util._extend({
|
||||||
bufferSize: 64 * 1024,
|
|
||||||
highWaterMark: 64 * 1024
|
highWaterMark: 64 * 1024
|
||||||
}, options || {});
|
}, options || {});
|
||||||
|
|
||||||
@ -1505,10 +1504,9 @@ ReadStream.prototype._read = function(n) {
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
if (!pool || pool.length - pool.used < kMinPoolSpace) {
|
if (!pool || pool.length - pool.used < kMinPoolSpace) {
|
||||||
// discard the old pool. Can't add to the free list because
|
// discard the old pool.
|
||||||
// users might have refernces to slices on it.
|
|
||||||
pool = null;
|
pool = null;
|
||||||
allocNewPool(this._readableState.bufferSize);
|
allocNewPool(this._readableState.highWaterMark);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Grab another reference to the pool in the case that while we're
|
// Grab another reference to the pool in the case that while we're
|
||||||
|
@ -32,7 +32,7 @@ var Readable = require('stream').Readable;
|
|||||||
// throw an error if we trigger a nextTick warning.
|
// throw an error if we trigger a nextTick warning.
|
||||||
process.throwDeprecation = true;
|
process.throwDeprecation = true;
|
||||||
|
|
||||||
var stream = new Readable({ highWaterMark: 2, bufferSize: 2 });
|
var stream = new Readable({ highWaterMark: 2 });
|
||||||
var reads = 0;
|
var reads = 0;
|
||||||
stream._read = function(size) {
|
stream._read = function(size) {
|
||||||
reads++;
|
reads++;
|
||||||
@ -59,7 +59,7 @@ flow(stream, 5000, function() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
process.on('exit', function(code) {
|
process.on('exit', function(code) {
|
||||||
assert.equal(reads, 5000);
|
assert.equal(reads, 2);
|
||||||
// we pushed up the high water mark
|
// we pushed up the high water mark
|
||||||
assert.equal(stream._readableState.highWaterMark, 5000);
|
assert.equal(stream._readableState.highWaterMark, 5000);
|
||||||
assert.equal(stream._readableState.length, 5000);
|
assert.equal(stream._readableState.length, 5000);
|
||||||
|
@ -32,11 +32,7 @@ var file = path.resolve(common.fixturesDir, 'x1024.txt');
|
|||||||
|
|
||||||
var size = fs.statSync(file).size;
|
var size = fs.statSync(file).size;
|
||||||
|
|
||||||
// expect to see chunks no more than 10 bytes each.
|
var expectLengths = [1024];
|
||||||
var expectLengths = [];
|
|
||||||
for (var i = size; i > 0; i -= 10) {
|
|
||||||
expectLengths.push(Math.min(i, 10));
|
|
||||||
}
|
|
||||||
|
|
||||||
var util = require('util');
|
var util = require('util');
|
||||||
var Stream = require('stream');
|
var Stream = require('stream');
|
||||||
@ -60,7 +56,7 @@ TestWriter.prototype.end = function(c) {
|
|||||||
this.emit('results', this.buffer);
|
this.emit('results', this.buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
var r = new FSReadable(file, { bufferSize: 10 });
|
var r = new FSReadable(file);
|
||||||
var w = new TestWriter();
|
var w = new TestWriter();
|
||||||
|
|
||||||
w.on('results', function(res) {
|
w.on('results', function(res) {
|
||||||
|
@ -64,9 +64,7 @@ process.nextTick(run);
|
|||||||
util.inherits(TestReader, R);
|
util.inherits(TestReader, R);
|
||||||
|
|
||||||
function TestReader(n, opts) {
|
function TestReader(n, opts) {
|
||||||
R.call(this, util._extend({
|
R.call(this, opts);
|
||||||
bufferSize: 5
|
|
||||||
}, opts));
|
|
||||||
|
|
||||||
this.pos = 0;
|
this.pos = 0;
|
||||||
this.len = n || 100;
|
this.len = n || 100;
|
||||||
|
Loading…
Reference in New Issue
Block a user