stream_base: expose bytesRead
getter
This will provide `bytesRead` data on consumed sockets. Fix: #3021 PR-URL: https://github.com/nodejs/node/pull/6284 Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
This commit is contained in:
parent
e1cf634a0b
commit
6198472d83
18
lib/net.js
18
lib/net.js
@ -97,7 +97,6 @@ exports._normalizeConnectArgs = normalizeConnectArgs;
|
|||||||
// called when creating new Socket, or when re-using a closed Socket
|
// called when creating new Socket, or when re-using a closed Socket
|
||||||
function initSocketHandle(self) {
|
function initSocketHandle(self) {
|
||||||
self.destroyed = false;
|
self.destroyed = false;
|
||||||
self.bytesRead = 0;
|
|
||||||
self._bytesDispatched = 0;
|
self._bytesDispatched = 0;
|
||||||
self._sockname = null;
|
self._sockname = null;
|
||||||
|
|
||||||
@ -112,6 +111,10 @@ function initSocketHandle(self) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
const BYTES_READ = Symbol('bytesRead');
|
||||||
|
|
||||||
|
|
||||||
function Socket(options) {
|
function Socket(options) {
|
||||||
if (!(this instanceof Socket)) return new Socket(options);
|
if (!(this instanceof Socket)) return new Socket(options);
|
||||||
|
|
||||||
@ -179,6 +182,9 @@ function Socket(options) {
|
|||||||
// Reserve properties
|
// Reserve properties
|
||||||
this.server = null;
|
this.server = null;
|
||||||
this._server = null;
|
this._server = null;
|
||||||
|
|
||||||
|
// Used after `.destroy()`
|
||||||
|
this[BYTES_READ] = 0;
|
||||||
}
|
}
|
||||||
util.inherits(Socket, stream.Duplex);
|
util.inherits(Socket, stream.Duplex);
|
||||||
|
|
||||||
@ -470,6 +476,9 @@ Socket.prototype._destroy = function(exception, cb) {
|
|||||||
if (this !== process.stderr)
|
if (this !== process.stderr)
|
||||||
debug('close handle');
|
debug('close handle');
|
||||||
var isException = exception ? true : false;
|
var isException = exception ? true : false;
|
||||||
|
// `bytesRead` should be accessible after `.destroy()`
|
||||||
|
this[BYTES_READ] = this._handle.bytesRead;
|
||||||
|
|
||||||
this._handle.close(() => {
|
this._handle.close(() => {
|
||||||
debug('emit close');
|
debug('emit close');
|
||||||
this.emit('close', isException);
|
this.emit('close', isException);
|
||||||
@ -521,10 +530,6 @@ function onread(nread, buffer) {
|
|||||||
// will prevent this from being called again until _read() gets
|
// will prevent this from being called again until _read() gets
|
||||||
// called again.
|
// called again.
|
||||||
|
|
||||||
// if it's not enough data, we'll just call handle.readStart()
|
|
||||||
// again right away.
|
|
||||||
self.bytesRead += nread;
|
|
||||||
|
|
||||||
// Optimization: emit the original buffer with end points
|
// Optimization: emit the original buffer with end points
|
||||||
var ret = self.push(buffer);
|
var ret = self.push(buffer);
|
||||||
|
|
||||||
@ -580,6 +585,9 @@ Socket.prototype._getpeername = function() {
|
|||||||
return this._peername;
|
return this._peername;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Socket.prototype.__defineGetter__('bytesRead', function() {
|
||||||
|
return this._handle ? this._handle.bytesRead : this[BYTES_READ];
|
||||||
|
});
|
||||||
|
|
||||||
Socket.prototype.__defineGetter__('remoteAddress', function() {
|
Socket.prototype.__defineGetter__('remoteAddress', function() {
|
||||||
return this._getpeername().address;
|
return this._getpeername().address;
|
||||||
|
@ -72,6 +72,7 @@ namespace node {
|
|||||||
V(buffer_string, "buffer") \
|
V(buffer_string, "buffer") \
|
||||||
V(bytes_string, "bytes") \
|
V(bytes_string, "bytes") \
|
||||||
V(bytes_parsed_string, "bytesParsed") \
|
V(bytes_parsed_string, "bytesParsed") \
|
||||||
|
V(bytes_read_string, "bytesRead") \
|
||||||
V(cached_data_string, "cachedData") \
|
V(cached_data_string, "cachedData") \
|
||||||
V(cached_data_produced_string, "cachedDataProduced") \
|
V(cached_data_produced_string, "cachedDataProduced") \
|
||||||
V(cached_data_rejected_string, "cachedDataRejected") \
|
V(cached_data_rejected_string, "cachedDataRejected") \
|
||||||
|
@ -43,6 +43,13 @@ void StreamBase::AddMethods(Environment* env,
|
|||||||
v8::DEFAULT,
|
v8::DEFAULT,
|
||||||
attributes);
|
attributes);
|
||||||
|
|
||||||
|
t->InstanceTemplate()->SetAccessor(env->bytes_read_string(),
|
||||||
|
GetBytesRead<Base>,
|
||||||
|
nullptr,
|
||||||
|
env->as_external(),
|
||||||
|
v8::DEFAULT,
|
||||||
|
attributes);
|
||||||
|
|
||||||
env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStart>);
|
env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStart>);
|
||||||
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStop>);
|
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStop>);
|
||||||
if ((flags & kFlagNoShutdown) == 0)
|
if ((flags & kFlagNoShutdown) == 0)
|
||||||
@ -79,6 +86,16 @@ void StreamBase::GetFD(Local<String> key,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template <class Base>
|
||||||
|
void StreamBase::GetBytesRead(Local<String> key,
|
||||||
|
const PropertyCallbackInfo<Value>& args) {
|
||||||
|
StreamBase* wrap = Unwrap<Base>(args.Holder());
|
||||||
|
|
||||||
|
// uint64_t -> double. 53bits is enough for all real cases.
|
||||||
|
args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
template <class Base>
|
template <class Base>
|
||||||
void StreamBase::GetExternal(Local<String> key,
|
void StreamBase::GetExternal(Local<String> key,
|
||||||
const PropertyCallbackInfo<Value>& args) {
|
const PropertyCallbackInfo<Value>& args) {
|
||||||
|
@ -136,7 +136,7 @@ class StreamResource {
|
|||||||
uv_handle_type pending,
|
uv_handle_type pending,
|
||||||
void* ctx);
|
void* ctx);
|
||||||
|
|
||||||
StreamResource() {
|
StreamResource() : bytes_read_(0) {
|
||||||
}
|
}
|
||||||
virtual ~StreamResource() = default;
|
virtual ~StreamResource() = default;
|
||||||
|
|
||||||
@ -160,9 +160,11 @@ class StreamResource {
|
|||||||
alloc_cb_.fn(size, buf, alloc_cb_.ctx);
|
alloc_cb_.fn(size, buf, alloc_cb_.ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void OnRead(size_t nread,
|
inline void OnRead(ssize_t nread,
|
||||||
const uv_buf_t* buf,
|
const uv_buf_t* buf,
|
||||||
uv_handle_type pending = UV_UNKNOWN_HANDLE) {
|
uv_handle_type pending = UV_UNKNOWN_HANDLE) {
|
||||||
|
if (nread > 0)
|
||||||
|
bytes_read_ += static_cast<uint64_t>(nread);
|
||||||
if (!read_cb_.is_empty())
|
if (!read_cb_.is_empty())
|
||||||
read_cb_.fn(nread, buf, pending, read_cb_.ctx);
|
read_cb_.fn(nread, buf, pending, read_cb_.ctx);
|
||||||
}
|
}
|
||||||
@ -182,6 +184,9 @@ class StreamResource {
|
|||||||
Callback<AfterWriteCb> after_write_cb_;
|
Callback<AfterWriteCb> after_write_cb_;
|
||||||
Callback<AllocCb> alloc_cb_;
|
Callback<AllocCb> alloc_cb_;
|
||||||
Callback<ReadCb> read_cb_;
|
Callback<ReadCb> read_cb_;
|
||||||
|
uint64_t bytes_read_;
|
||||||
|
|
||||||
|
friend class StreamBase;
|
||||||
};
|
};
|
||||||
|
|
||||||
class StreamBase : public StreamResource {
|
class StreamBase : public StreamResource {
|
||||||
@ -249,6 +254,10 @@ class StreamBase : public StreamResource {
|
|||||||
static void GetExternal(v8::Local<v8::String> key,
|
static void GetExternal(v8::Local<v8::String> key,
|
||||||
const v8::PropertyCallbackInfo<v8::Value>& args);
|
const v8::PropertyCallbackInfo<v8::Value>& args);
|
||||||
|
|
||||||
|
template <class Base>
|
||||||
|
static void GetBytesRead(v8::Local<v8::String> key,
|
||||||
|
const v8::PropertyCallbackInfo<v8::Value>& args);
|
||||||
|
|
||||||
template <class Base,
|
template <class Base,
|
||||||
int (StreamBase::*Method)( // NOLINT(whitespace/parens)
|
int (StreamBase::*Method)( // NOLINT(whitespace/parens)
|
||||||
const v8::FunctionCallbackInfo<v8::Value>& args)>
|
const v8::FunctionCallbackInfo<v8::Value>& args)>
|
||||||
|
37
test/parallel/test-net-bytes-read.js
Normal file
37
test/parallel/test-net-bytes-read.js
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const common = require('../common');
|
||||||
|
const assert = require('assert');
|
||||||
|
const net = require('net');
|
||||||
|
|
||||||
|
const big = Buffer.alloc(1024 * 1024);
|
||||||
|
|
||||||
|
const server = net.createServer((socket) => {
|
||||||
|
socket.end(big);
|
||||||
|
server.close();
|
||||||
|
}).listen(common.PORT, () => {
|
||||||
|
let prev = 0;
|
||||||
|
|
||||||
|
function checkRaise(value) {
|
||||||
|
assert(value > prev);
|
||||||
|
prev = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
const socket = net.connect(common.PORT, () => {
|
||||||
|
socket.on('data', (chunk) => {
|
||||||
|
checkRaise(socket.bytesRead);
|
||||||
|
});
|
||||||
|
|
||||||
|
socket.on('end', common.mustCall(() => {
|
||||||
|
assert.equal(socket.bytesRead, prev);
|
||||||
|
assert.equal(big.length, prev);
|
||||||
|
}));
|
||||||
|
|
||||||
|
socket.on('close', common.mustCall(() => {
|
||||||
|
assert(!socket._handle);
|
||||||
|
assert.equal(socket.bytesRead, prev);
|
||||||
|
assert.equal(big.length, prev);
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
socket.end();
|
||||||
|
});
|
Loading…
x
Reference in New Issue
Block a user