net,src: refactor writeQueueSize tracking
Currently, writeQueueSize is never used in C++ and barely used within JS. Instead of constantly updating the value on the JS object, create a getter that will retrieve the most up-to-date value from C++. For the vast majority of cases though, create a new prop on Socket.prototype[kLastWriteQueueSize] using a Symbol. Use this to track the current write size, entirely in JS land. PR-URL: https://github.com/nodejs/node/pull/17650 Reviewed-By: Anna Henningsen <anna@addaleax.net>
This commit is contained in:
parent
68c63a9fa3
commit
d36e1b4fed
@ -460,11 +460,6 @@ TLSSocket.prototype._init = function(socket, wrap) {
|
|||||||
var options = this._tlsOptions;
|
var options = this._tlsOptions;
|
||||||
var ssl = this._handle;
|
var ssl = this._handle;
|
||||||
|
|
||||||
// lib/net.js expect this value to be non-zero if write hasn't been flushed
|
|
||||||
// immediately. After the handshake is done this will represent the actual
|
|
||||||
// write queue size
|
|
||||||
ssl.writeQueueSize = 1;
|
|
||||||
|
|
||||||
this.server = options.server;
|
this.server = options.server;
|
||||||
|
|
||||||
// For clients, we will always have either a given ca list or be using
|
// For clients, we will always have either a given ca list or be using
|
||||||
|
33
lib/net.js
33
lib/net.js
@ -48,6 +48,8 @@ const { nextTick } = require('internal/process/next_tick');
|
|||||||
const errors = require('internal/errors');
|
const errors = require('internal/errors');
|
||||||
const dns = require('dns');
|
const dns = require('dns');
|
||||||
|
|
||||||
|
const kLastWriteQueueSize = Symbol('lastWriteQueueSize');
|
||||||
|
|
||||||
// `cluster` is only used by `listenInCluster` so for startup performance
|
// `cluster` is only used by `listenInCluster` so for startup performance
|
||||||
// reasons it's lazy loaded.
|
// reasons it's lazy loaded.
|
||||||
var cluster = null;
|
var cluster = null;
|
||||||
@ -198,6 +200,7 @@ function Socket(options) {
|
|||||||
this._handle = null;
|
this._handle = null;
|
||||||
this._parent = null;
|
this._parent = null;
|
||||||
this._host = null;
|
this._host = null;
|
||||||
|
this[kLastWriteQueueSize] = 0;
|
||||||
|
|
||||||
if (typeof options === 'number')
|
if (typeof options === 'number')
|
||||||
options = { fd: options }; // Legacy interface.
|
options = { fd: options }; // Legacy interface.
|
||||||
@ -398,12 +401,14 @@ Socket.prototype.setTimeout = function(msecs, callback) {
|
|||||||
|
|
||||||
|
|
||||||
Socket.prototype._onTimeout = function() {
|
Socket.prototype._onTimeout = function() {
|
||||||
if (this._handle) {
|
const handle = this._handle;
|
||||||
// `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is
|
const lastWriteQueueSize = this[kLastWriteQueueSize];
|
||||||
|
if (lastWriteQueueSize > 0 && handle) {
|
||||||
|
// `lastWriteQueueSize !== writeQueueSize` means there is
|
||||||
// an active write in progress, so we suppress the timeout.
|
// an active write in progress, so we suppress the timeout.
|
||||||
const prevWriteQueueSize = this._handle.writeQueueSize;
|
const writeQueueSize = handle.writeQueueSize;
|
||||||
if (prevWriteQueueSize > 0 &&
|
if (lastWriteQueueSize !== writeQueueSize) {
|
||||||
prevWriteQueueSize !== this._handle.updateWriteQueueSize()) {
|
this[kLastWriteQueueSize] = writeQueueSize;
|
||||||
this._unrefTimer();
|
this._unrefTimer();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -473,7 +478,7 @@ Object.defineProperty(Socket.prototype, 'readyState', {
|
|||||||
Object.defineProperty(Socket.prototype, 'bufferSize', {
|
Object.defineProperty(Socket.prototype, 'bufferSize', {
|
||||||
get: function() {
|
get: function() {
|
||||||
if (this._handle) {
|
if (this._handle) {
|
||||||
return this._handle.writeQueueSize + this.writableLength;
|
return this[kLastWriteQueueSize] + this.writableLength;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -764,12 +769,13 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
|
|||||||
|
|
||||||
this._bytesDispatched += req.bytes;
|
this._bytesDispatched += req.bytes;
|
||||||
|
|
||||||
// If it was entirely flushed, we can write some more right now.
|
if (!req.async) {
|
||||||
// However, if more is left in the queue, then wait until that clears.
|
|
||||||
if (req.async && this._handle.writeQueueSize !== 0)
|
|
||||||
req.cb = cb;
|
|
||||||
else
|
|
||||||
cb();
|
cb();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
req.cb = cb;
|
||||||
|
this[kLastWriteQueueSize] = req.bytes;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@ -853,6 +859,9 @@ function afterWrite(status, handle, req, err) {
|
|||||||
if (self !== process.stderr && self !== process.stdout)
|
if (self !== process.stderr && self !== process.stdout)
|
||||||
debug('afterWrite', status);
|
debug('afterWrite', status);
|
||||||
|
|
||||||
|
if (req.async)
|
||||||
|
self[kLastWriteQueueSize] = 0;
|
||||||
|
|
||||||
// callback may come after call to destroy.
|
// callback may come after call to destroy.
|
||||||
if (self.destroyed) {
|
if (self.destroyed) {
|
||||||
debug('afterWrite destroyed');
|
debug('afterWrite destroyed');
|
||||||
@ -872,7 +881,7 @@ function afterWrite(status, handle, req, err) {
|
|||||||
debug('afterWrite call cb');
|
debug('afterWrite call cb');
|
||||||
|
|
||||||
if (req.cb)
|
if (req.cb)
|
||||||
req.cb.call(self);
|
req.cb.call(undefined);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -165,7 +165,6 @@ PipeWrap::PipeWrap(Environment* env,
|
|||||||
int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
|
int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
|
||||||
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
|
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
|
||||||
// Suggestion: uv_pipe_init() returns void.
|
// Suggestion: uv_pipe_init() returns void.
|
||||||
UpdateWriteQueueSize();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -193,6 +193,7 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = DoWrite(req_wrap, buf_list, count, nullptr);
|
err = DoWrite(req_wrap, buf_list, count, nullptr);
|
||||||
|
if (HasWriteQueue())
|
||||||
req_wrap_obj->Set(env->async(), True(env->isolate()));
|
req_wrap_obj->Set(env->async(), True(env->isolate()));
|
||||||
|
|
||||||
if (err)
|
if (err)
|
||||||
@ -249,6 +250,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
|
|||||||
req_wrap = WriteWrap::New(env, req_wrap_obj, this);
|
req_wrap = WriteWrap::New(env, req_wrap_obj, this);
|
||||||
|
|
||||||
err = DoWrite(req_wrap, bufs, count, nullptr);
|
err = DoWrite(req_wrap, bufs, count, nullptr);
|
||||||
|
if (HasWriteQueue())
|
||||||
req_wrap_obj->Set(env->async(), True(env->isolate()));
|
req_wrap_obj->Set(env->async(), True(env->isolate()));
|
||||||
req_wrap_obj->Set(env->buffer_string(), args[1]);
|
req_wrap_obj->Set(env->buffer_string(), args[1]);
|
||||||
|
|
||||||
@ -373,6 +375,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
|
|||||||
reinterpret_cast<uv_stream_t*>(send_handle));
|
reinterpret_cast<uv_stream_t*>(send_handle));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (HasWriteQueue())
|
||||||
req_wrap_obj->Set(env->async(), True(env->isolate()));
|
req_wrap_obj->Set(env->async(), True(env->isolate()));
|
||||||
|
|
||||||
if (err)
|
if (err)
|
||||||
@ -467,6 +470,10 @@ int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool StreamResource::HasWriteQueue() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
const char* StreamResource::Error() const {
|
const char* StreamResource::Error() const {
|
||||||
return nullptr;
|
return nullptr;
|
||||||
|
@ -162,6 +162,7 @@ class StreamResource {
|
|||||||
uv_buf_t* bufs,
|
uv_buf_t* bufs,
|
||||||
size_t count,
|
size_t count,
|
||||||
uv_stream_t* send_handle) = 0;
|
uv_stream_t* send_handle) = 0;
|
||||||
|
virtual bool HasWriteQueue();
|
||||||
virtual const char* Error() const;
|
virtual const char* Error() const;
|
||||||
virtual void ClearError();
|
virtual void ClearError();
|
||||||
|
|
||||||
|
@ -40,13 +40,15 @@
|
|||||||
namespace node {
|
namespace node {
|
||||||
|
|
||||||
using v8::Context;
|
using v8::Context;
|
||||||
|
using v8::DontDelete;
|
||||||
using v8::EscapableHandleScope;
|
using v8::EscapableHandleScope;
|
||||||
using v8::FunctionCallbackInfo;
|
using v8::FunctionCallbackInfo;
|
||||||
using v8::FunctionTemplate;
|
using v8::FunctionTemplate;
|
||||||
using v8::HandleScope;
|
using v8::HandleScope;
|
||||||
using v8::Integer;
|
|
||||||
using v8::Local;
|
using v8::Local;
|
||||||
using v8::Object;
|
using v8::Object;
|
||||||
|
using v8::ReadOnly;
|
||||||
|
using v8::Signature;
|
||||||
using v8::Value;
|
using v8::Value;
|
||||||
|
|
||||||
|
|
||||||
@ -99,7 +101,16 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
|
|||||||
void LibuvStreamWrap::AddMethods(Environment* env,
|
void LibuvStreamWrap::AddMethods(Environment* env,
|
||||||
v8::Local<v8::FunctionTemplate> target,
|
v8::Local<v8::FunctionTemplate> target,
|
||||||
int flags) {
|
int flags) {
|
||||||
env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize);
|
Local<FunctionTemplate> get_write_queue_size =
|
||||||
|
FunctionTemplate::New(env->isolate(),
|
||||||
|
GetWriteQueueSize,
|
||||||
|
env->as_external(),
|
||||||
|
Signature::New(env->isolate(), target));
|
||||||
|
target->PrototypeTemplate()->SetAccessorProperty(
|
||||||
|
env->write_queue_size_string(),
|
||||||
|
get_write_queue_size,
|
||||||
|
Local<FunctionTemplate>(),
|
||||||
|
static_cast<PropertyAttribute>(ReadOnly | DontDelete));
|
||||||
env->SetProtoMethod(target, "setBlocking", SetBlocking);
|
env->SetProtoMethod(target, "setBlocking", SetBlocking);
|
||||||
StreamBase::AddMethods<LibuvStreamWrap>(env, target, flags);
|
StreamBase::AddMethods<LibuvStreamWrap>(env, target, flags);
|
||||||
}
|
}
|
||||||
@ -135,17 +146,6 @@ bool LibuvStreamWrap::IsIPCPipe() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
uint32_t LibuvStreamWrap::UpdateWriteQueueSize() {
|
|
||||||
HandleScope scope(env()->isolate());
|
|
||||||
uint32_t write_queue_size = stream()->write_queue_size;
|
|
||||||
object()->Set(env()->context(),
|
|
||||||
env()->write_queue_size_string(),
|
|
||||||
Integer::NewFromUnsigned(env()->isolate(),
|
|
||||||
write_queue_size)).FromJust();
|
|
||||||
return write_queue_size;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int LibuvStreamWrap::ReadStart() {
|
int LibuvStreamWrap::ReadStart() {
|
||||||
return uv_read_start(stream(), OnAlloc, OnRead);
|
return uv_read_start(stream(), OnAlloc, OnRead);
|
||||||
}
|
}
|
||||||
@ -267,13 +267,18 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void LibuvStreamWrap::UpdateWriteQueueSize(
|
void LibuvStreamWrap::GetWriteQueueSize(
|
||||||
const FunctionCallbackInfo<Value>& args) {
|
const FunctionCallbackInfo<Value>& info) {
|
||||||
LibuvStreamWrap* wrap;
|
LibuvStreamWrap* wrap;
|
||||||
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
|
ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
|
||||||
|
|
||||||
uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
|
if (wrap->stream() == nullptr) {
|
||||||
args.GetReturnValue().Set(write_queue_size);
|
info.GetReturnValue().Set(0);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t write_queue_size = wrap->stream()->write_queue_size;
|
||||||
|
info.GetReturnValue().Set(write_queue_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -370,12 +375,16 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
|
|||||||
}
|
}
|
||||||
|
|
||||||
w->Dispatched();
|
w->Dispatched();
|
||||||
UpdateWriteQueueSize();
|
|
||||||
|
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
bool LibuvStreamWrap::HasWriteQueue() {
|
||||||
|
return stream()->write_queue_size > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
|
void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
|
||||||
WriteWrap* req_wrap = WriteWrap::from_req(req);
|
WriteWrap* req_wrap = WriteWrap::from_req(req);
|
||||||
CHECK_NE(req_wrap, nullptr);
|
CHECK_NE(req_wrap, nullptr);
|
||||||
@ -387,7 +396,6 @@ void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
|
|||||||
|
|
||||||
void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
|
void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
|
||||||
StreamBase::AfterWrite(w, status);
|
StreamBase::AfterWrite(w, status);
|
||||||
UpdateWriteQueueSize();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace node
|
} // namespace node
|
||||||
|
@ -55,6 +55,7 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
|
|||||||
uv_buf_t* bufs,
|
uv_buf_t* bufs,
|
||||||
size_t count,
|
size_t count,
|
||||||
uv_stream_t* send_handle) override;
|
uv_stream_t* send_handle) override;
|
||||||
|
bool HasWriteQueue() override;
|
||||||
|
|
||||||
inline uv_stream_t* stream() const {
|
inline uv_stream_t* stream() const {
|
||||||
return stream_;
|
return stream_;
|
||||||
@ -83,15 +84,14 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
AsyncWrap* GetAsyncWrap() override;
|
AsyncWrap* GetAsyncWrap() override;
|
||||||
uint32_t UpdateWriteQueueSize();
|
|
||||||
|
|
||||||
static void AddMethods(Environment* env,
|
static void AddMethods(Environment* env,
|
||||||
v8::Local<v8::FunctionTemplate> target,
|
v8::Local<v8::FunctionTemplate> target,
|
||||||
int flags = StreamBase::kFlagNone);
|
int flags = StreamBase::kFlagNone);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static void UpdateWriteQueueSize(
|
static void GetWriteQueueSize(
|
||||||
const v8::FunctionCallbackInfo<v8::Value>& args);
|
const v8::FunctionCallbackInfo<v8::Value>& info);
|
||||||
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
|
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||||
|
|
||||||
// Callbacks for libuv
|
// Callbacks for libuv
|
||||||
|
@ -169,7 +169,6 @@ TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
|
|||||||
int r = uv_tcp_init(env->event_loop(), &handle_);
|
int r = uv_tcp_init(env->event_loop(), &handle_);
|
||||||
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
|
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
|
||||||
// Suggestion: uv_tcp_init() returns void.
|
// Suggestion: uv_tcp_init() returns void.
|
||||||
UpdateWriteQueueSize();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -35,14 +35,16 @@ namespace node {
|
|||||||
using crypto::SecureContext;
|
using crypto::SecureContext;
|
||||||
using crypto::SSLWrap;
|
using crypto::SSLWrap;
|
||||||
using v8::Context;
|
using v8::Context;
|
||||||
|
using v8::DontDelete;
|
||||||
using v8::EscapableHandleScope;
|
using v8::EscapableHandleScope;
|
||||||
using v8::Exception;
|
using v8::Exception;
|
||||||
using v8::Function;
|
using v8::Function;
|
||||||
using v8::FunctionCallbackInfo;
|
using v8::FunctionCallbackInfo;
|
||||||
using v8::FunctionTemplate;
|
using v8::FunctionTemplate;
|
||||||
using v8::Integer;
|
|
||||||
using v8::Local;
|
using v8::Local;
|
||||||
using v8::Object;
|
using v8::Object;
|
||||||
|
using v8::ReadOnly;
|
||||||
|
using v8::Signature;
|
||||||
using v8::String;
|
using v8::String;
|
||||||
using v8::Value;
|
using v8::Value;
|
||||||
|
|
||||||
@ -309,7 +311,6 @@ void TLSWrap::EncOut() {
|
|||||||
|
|
||||||
// No data to write
|
// No data to write
|
||||||
if (BIO_pending(enc_out_) == 0) {
|
if (BIO_pending(enc_out_) == 0) {
|
||||||
UpdateWriteQueueSize();
|
|
||||||
if (clear_in_->Length() == 0)
|
if (clear_in_->Length() == 0)
|
||||||
InvokeQueued(0);
|
InvokeQueued(0);
|
||||||
return;
|
return;
|
||||||
@ -555,17 +556,6 @@ bool TLSWrap::IsClosing() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
uint32_t TLSWrap::UpdateWriteQueueSize(uint32_t write_queue_size) {
|
|
||||||
HandleScope scope(env()->isolate());
|
|
||||||
if (write_queue_size == 0)
|
|
||||||
write_queue_size = BIO_pending(enc_out_);
|
|
||||||
object()->Set(env()->context(),
|
|
||||||
env()->write_queue_size_string(),
|
|
||||||
Integer::NewFromUnsigned(env()->isolate(),
|
|
||||||
write_queue_size)).FromJust();
|
|
||||||
return write_queue_size;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int TLSWrap::ReadStart() {
|
int TLSWrap::ReadStart() {
|
||||||
if (stream_ != nullptr)
|
if (stream_ != nullptr)
|
||||||
@ -612,9 +602,6 @@ int TLSWrap::DoWrite(WriteWrap* w,
|
|||||||
// However, if there is any data that should be written to the socket,
|
// However, if there is any data that should be written to the socket,
|
||||||
// the callback should not be invoked immediately
|
// the callback should not be invoked immediately
|
||||||
if (BIO_pending(enc_out_) == 0) {
|
if (BIO_pending(enc_out_) == 0) {
|
||||||
// net.js expects writeQueueSize to be > 0 if the write isn't
|
|
||||||
// immediately flushed
|
|
||||||
UpdateWriteQueueSize(1);
|
|
||||||
return stream_->DoWrite(w, bufs, count, send_handle);
|
return stream_->DoWrite(w, bufs, count, send_handle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -666,7 +653,6 @@ int TLSWrap::DoWrite(WriteWrap* w,
|
|||||||
|
|
||||||
// Try writing data immediately
|
// Try writing data immediately
|
||||||
EncOut();
|
EncOut();
|
||||||
UpdateWriteQueueSize();
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -938,12 +924,17 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
|
|||||||
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
|
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
|
||||||
|
|
||||||
|
|
||||||
void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo<Value>& args) {
|
void TLSWrap::GetWriteQueueSize(const FunctionCallbackInfo<Value>& info) {
|
||||||
TLSWrap* wrap;
|
TLSWrap* wrap;
|
||||||
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
|
ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
|
||||||
|
|
||||||
uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
|
if (wrap->clear_in_ == nullptr) {
|
||||||
args.GetReturnValue().Set(write_queue_size);
|
info.GetReturnValue().Set(0);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t write_queue_size = BIO_pending(wrap->enc_out_);
|
||||||
|
info.GetReturnValue().Set(write_queue_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -966,6 +957,17 @@ void TLSWrap::Initialize(Local<Object> target,
|
|||||||
t->InstanceTemplate()->SetInternalFieldCount(1);
|
t->InstanceTemplate()->SetInternalFieldCount(1);
|
||||||
t->SetClassName(tlsWrapString);
|
t->SetClassName(tlsWrapString);
|
||||||
|
|
||||||
|
Local<FunctionTemplate> get_write_queue_size =
|
||||||
|
FunctionTemplate::New(env->isolate(),
|
||||||
|
GetWriteQueueSize,
|
||||||
|
env->as_external(),
|
||||||
|
Signature::New(env->isolate(), t));
|
||||||
|
t->PrototypeTemplate()->SetAccessorProperty(
|
||||||
|
env->write_queue_size_string(),
|
||||||
|
get_write_queue_size,
|
||||||
|
Local<FunctionTemplate>(),
|
||||||
|
static_cast<PropertyAttribute>(ReadOnly | DontDelete));
|
||||||
|
|
||||||
AsyncWrap::AddWrapMethods(env, t, AsyncWrap::kFlagHasReset);
|
AsyncWrap::AddWrapMethods(env, t, AsyncWrap::kFlagHasReset);
|
||||||
env->SetProtoMethod(t, "receive", Receive);
|
env->SetProtoMethod(t, "receive", Receive);
|
||||||
env->SetProtoMethod(t, "start", Start);
|
env->SetProtoMethod(t, "start", Start);
|
||||||
@ -973,7 +975,6 @@ void TLSWrap::Initialize(Local<Object> target,
|
|||||||
env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
|
env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
|
||||||
env->SetProtoMethod(t, "destroySSL", DestroySSL);
|
env->SetProtoMethod(t, "destroySSL", DestroySSL);
|
||||||
env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
|
env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
|
||||||
env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize);
|
|
||||||
|
|
||||||
StreamBase::AddMethods<TLSWrap>(env, t, StreamBase::kFlagHasWritev);
|
StreamBase::AddMethods<TLSWrap>(env, t, StreamBase::kFlagHasWritev);
|
||||||
SSLWrap<TLSWrap>::AddMethods(env, t);
|
SSLWrap<TLSWrap>::AddMethods(env, t);
|
||||||
|
@ -131,7 +131,6 @@ class TLSWrap : public AsyncWrap,
|
|||||||
|
|
||||||
AsyncWrap* GetAsyncWrap() override;
|
AsyncWrap* GetAsyncWrap() override;
|
||||||
bool IsIPCPipe() override;
|
bool IsIPCPipe() override;
|
||||||
uint32_t UpdateWriteQueueSize(uint32_t write_queue_size = 0);
|
|
||||||
|
|
||||||
// Resource implementation
|
// Resource implementation
|
||||||
static void OnAfterWriteImpl(WriteWrap* w, int status, void* ctx);
|
static void OnAfterWriteImpl(WriteWrap* w, int status, void* ctx);
|
||||||
@ -189,8 +188,8 @@ class TLSWrap : public AsyncWrap,
|
|||||||
bool eof_;
|
bool eof_;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static void UpdateWriteQueueSize(
|
static void GetWriteQueueSize(
|
||||||
const v8::FunctionCallbackInfo<v8::Value>& args);
|
const v8::FunctionCallbackInfo<v8::Value>& info);
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace node
|
} // namespace node
|
||||||
|
@ -153,12 +153,11 @@ void TTYWrap::New(const FunctionCallbackInfo<Value>& args) {
|
|||||||
CHECK_GE(fd, 0);
|
CHECK_GE(fd, 0);
|
||||||
|
|
||||||
int err = 0;
|
int err = 0;
|
||||||
TTYWrap* wrap = new TTYWrap(env, args.This(), fd, args[1]->IsTrue(), &err);
|
new TTYWrap(env, args.This(), fd, args[1]->IsTrue(), &err);
|
||||||
if (err != 0) {
|
if (err != 0) {
|
||||||
env->CollectUVExceptionInfo(args[2], err, "uv_tty_init");
|
env->CollectUVExceptionInfo(args[2], err, "uv_tty_init");
|
||||||
args.GetReturnValue().SetUndefined();
|
args.GetReturnValue().SetUndefined();
|
||||||
}
|
}
|
||||||
wrap->UpdateWriteQueueSize();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -7,17 +7,17 @@ const fixtures = require('../common/fixtures');
|
|||||||
const tls = require('tls');
|
const tls = require('tls');
|
||||||
|
|
||||||
const iter = 10;
|
const iter = 10;
|
||||||
const overhead = 30;
|
|
||||||
|
|
||||||
const server = tls.createServer({
|
const server = tls.createServer({
|
||||||
key: fixtures.readKey('agent2-key.pem'),
|
key: fixtures.readKey('agent2-key.pem'),
|
||||||
cert: fixtures.readKey('agent2-cert.pem')
|
cert: fixtures.readKey('agent2-cert.pem')
|
||||||
}, common.mustCall((socket) => {
|
}, common.mustCall((socket) => {
|
||||||
socket.on('readable', common.mustCallAtLeast(() => {
|
let str = '';
|
||||||
socket.read();
|
socket.setEncoding('utf-8');
|
||||||
}, 1));
|
socket.on('data', (chunk) => { str += chunk; });
|
||||||
|
|
||||||
socket.on('end', common.mustCall(() => {
|
socket.on('end', common.mustCall(() => {
|
||||||
|
assert.strictEqual(str, 'a'.repeat(iter - 1));
|
||||||
server.close();
|
server.close();
|
||||||
}));
|
}));
|
||||||
}));
|
}));
|
||||||
@ -31,7 +31,7 @@ server.listen(0, common.mustCall(() => {
|
|||||||
|
|
||||||
for (let i = 1; i < iter; i++) {
|
for (let i = 1; i < iter; i++) {
|
||||||
client.write('a');
|
client.write('a');
|
||||||
assert.strictEqual(client.bufferSize, i + overhead);
|
assert.strictEqual(client.bufferSize, i + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
client.on('finish', common.mustCall(() => {
|
client.on('finish', common.mustCall(() => {
|
||||||
|
@ -6,26 +6,12 @@ const http = require('http');
|
|||||||
// This test assesses whether long-running writes can complete
|
// This test assesses whether long-running writes can complete
|
||||||
// or timeout because the socket is not aware that the backing
|
// or timeout because the socket is not aware that the backing
|
||||||
// stream is still writing.
|
// stream is still writing.
|
||||||
// To simulate a slow client, we write a really large chunk and
|
|
||||||
// then proceed through the following cycle:
|
|
||||||
// 1) Receive first 'data' event and record currently written size
|
|
||||||
// 2) Once we've read up to currently written size recorded above,
|
|
||||||
// we pause the stream and wait longer than the server timeout
|
|
||||||
// 3) Socket.prototype._onTimeout triggers and should confirm
|
|
||||||
// that the backing stream is still active and writing
|
|
||||||
// 4) Our timer fires, we resume the socket and start at 1)
|
|
||||||
|
|
||||||
const minReadSize = 250000;
|
const writeSize = 3000000;
|
||||||
const serverTimeout = common.platformTimeout(500);
|
let socket;
|
||||||
let offsetTimeout = common.platformTimeout(100);
|
|
||||||
let serverConnectionHandle;
|
|
||||||
let writeSize = 3000000;
|
|
||||||
let didReceiveData = false;
|
|
||||||
// this represents each cycles write size, where the cycle consists
|
|
||||||
// of `write > read > _onTimeout`
|
|
||||||
let currentWriteSize = 0;
|
|
||||||
|
|
||||||
const server = http.createServer(common.mustCall((req, res) => {
|
const server = http.createServer(common.mustCall((req, res) => {
|
||||||
|
server.close();
|
||||||
const content = Buffer.alloc(writeSize, 0x44);
|
const content = Buffer.alloc(writeSize, 0x44);
|
||||||
|
|
||||||
res.writeHead(200, {
|
res.writeHead(200, {
|
||||||
@ -34,47 +20,28 @@ const server = http.createServer(common.mustCall((req, res) => {
|
|||||||
'Vary': 'Accept-Encoding'
|
'Vary': 'Accept-Encoding'
|
||||||
});
|
});
|
||||||
|
|
||||||
serverConnectionHandle = res.socket._handle;
|
socket = res.socket;
|
||||||
|
const onTimeout = socket._onTimeout;
|
||||||
|
socket._onTimeout = common.mustCallAtLeast(() => onTimeout.call(socket), 1);
|
||||||
res.write(content);
|
res.write(content);
|
||||||
res.end();
|
res.end();
|
||||||
}));
|
}));
|
||||||
server.setTimeout(serverTimeout);
|
|
||||||
server.on('timeout', () => {
|
server.on('timeout', () => {
|
||||||
assert.strictEqual(didReceiveData, false, 'Should not timeout');
|
// TODO(apapirovski): This test is faulty on certain Windows systems
|
||||||
|
// as no queue is ever created
|
||||||
|
assert(!socket._handle || socket._handle.writeQueueSize === 0,
|
||||||
|
'Should not timeout');
|
||||||
});
|
});
|
||||||
|
|
||||||
server.listen(0, common.mustCall(() => {
|
server.listen(0, common.mustCall(() => {
|
||||||
http.get({
|
http.get({
|
||||||
path: '/',
|
path: '/',
|
||||||
port: server.address().port
|
port: server.address().port
|
||||||
}, common.mustCall((res) => {
|
}, (res) => {
|
||||||
const resume = () => res.resume();
|
res.once('data', () => {
|
||||||
let receivedBufferLength = 0;
|
socket._onTimeout();
|
||||||
let firstReceivedAt;
|
res.on('data', () => {});
|
||||||
res.on('data', common.mustCallAtLeast((buf) => {
|
});
|
||||||
if (receivedBufferLength === 0) {
|
res.on('end', () => server.close());
|
||||||
currentWriteSize = Math.max(
|
});
|
||||||
minReadSize,
|
|
||||||
writeSize - serverConnectionHandle.writeQueueSize
|
|
||||||
);
|
|
||||||
didReceiveData = false;
|
|
||||||
firstReceivedAt = Date.now();
|
|
||||||
}
|
|
||||||
receivedBufferLength += buf.length;
|
|
||||||
if (receivedBufferLength >= currentWriteSize) {
|
|
||||||
didReceiveData = true;
|
|
||||||
writeSize = serverConnectionHandle.writeQueueSize;
|
|
||||||
receivedBufferLength = 0;
|
|
||||||
res.pause();
|
|
||||||
setTimeout(
|
|
||||||
resume,
|
|
||||||
serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
|
|
||||||
);
|
|
||||||
offsetTimeout = 0;
|
|
||||||
}
|
|
||||||
}, 1));
|
|
||||||
res.on('end', common.mustCall(() => {
|
|
||||||
server.close();
|
|
||||||
}));
|
|
||||||
}));
|
|
||||||
}));
|
}));
|
||||||
|
@ -2,31 +2,15 @@
|
|||||||
const common = require('../common');
|
const common = require('../common');
|
||||||
if (!common.hasCrypto)
|
if (!common.hasCrypto)
|
||||||
common.skip('missing crypto');
|
common.skip('missing crypto');
|
||||||
const assert = require('assert');
|
|
||||||
const fixtures = require('../common/fixtures');
|
const fixtures = require('../common/fixtures');
|
||||||
const https = require('https');
|
const https = require('https');
|
||||||
|
|
||||||
// This test assesses whether long-running writes can complete
|
// This test assesses whether long-running writes can complete
|
||||||
// or timeout because the socket is not aware that the backing
|
// or timeout because the socket is not aware that the backing
|
||||||
// stream is still writing.
|
// stream is still writing.
|
||||||
// To simulate a slow client, we write a really large chunk and
|
|
||||||
// then proceed through the following cycle:
|
|
||||||
// 1) Receive first 'data' event and record currently written size
|
|
||||||
// 2) Once we've read up to currently written size recorded above,
|
|
||||||
// we pause the stream and wait longer than the server timeout
|
|
||||||
// 3) Socket.prototype._onTimeout triggers and should confirm
|
|
||||||
// that the backing stream is still active and writing
|
|
||||||
// 4) Our timer fires, we resume the socket and start at 1)
|
|
||||||
|
|
||||||
const minReadSize = 250000;
|
const writeSize = 30000000;
|
||||||
const serverTimeout = common.platformTimeout(500);
|
let socket;
|
||||||
let offsetTimeout = common.platformTimeout(100);
|
|
||||||
let serverConnectionHandle;
|
|
||||||
let writeSize = 2000000;
|
|
||||||
let didReceiveData = false;
|
|
||||||
// this represents each cycles write size, where the cycle consists
|
|
||||||
// of `write > read > _onTimeout`
|
|
||||||
let currentWriteSize = 0;
|
|
||||||
|
|
||||||
const server = https.createServer({
|
const server = https.createServer({
|
||||||
key: fixtures.readKey('agent1-key.pem'),
|
key: fixtures.readKey('agent1-key.pem'),
|
||||||
@ -40,50 +24,24 @@ const server = https.createServer({
|
|||||||
'Vary': 'Accept-Encoding'
|
'Vary': 'Accept-Encoding'
|
||||||
});
|
});
|
||||||
|
|
||||||
serverConnectionHandle = res.socket._handle;
|
socket = res.socket;
|
||||||
res.write(content, () => {
|
const onTimeout = socket._onTimeout;
|
||||||
assert.strictEqual(serverConnectionHandle.writeQueueSize, 0);
|
socket._onTimeout = common.mustCallAtLeast(() => onTimeout.call(socket), 1);
|
||||||
});
|
res.write(content);
|
||||||
res.end();
|
res.end();
|
||||||
}));
|
}));
|
||||||
server.setTimeout(serverTimeout);
|
server.on('timeout', common.mustNotCall());
|
||||||
server.on('timeout', () => {
|
|
||||||
assert.strictEqual(didReceiveData, false, 'Should not timeout');
|
|
||||||
});
|
|
||||||
|
|
||||||
server.listen(0, common.mustCall(() => {
|
server.listen(0, common.mustCall(() => {
|
||||||
https.get({
|
https.get({
|
||||||
path: '/',
|
path: '/',
|
||||||
port: server.address().port,
|
port: server.address().port,
|
||||||
rejectUnauthorized: false
|
rejectUnauthorized: false
|
||||||
}, common.mustCall((res) => {
|
}, (res) => {
|
||||||
const resume = () => res.resume();
|
res.once('data', () => {
|
||||||
let receivedBufferLength = 0;
|
socket._onTimeout();
|
||||||
let firstReceivedAt;
|
res.on('data', () => {});
|
||||||
res.on('data', common.mustCallAtLeast((buf) => {
|
});
|
||||||
if (receivedBufferLength === 0) {
|
res.on('end', () => server.close());
|
||||||
currentWriteSize = Math.max(
|
});
|
||||||
minReadSize,
|
|
||||||
writeSize - serverConnectionHandle.writeQueueSize
|
|
||||||
);
|
|
||||||
didReceiveData = false;
|
|
||||||
firstReceivedAt = Date.now();
|
|
||||||
}
|
|
||||||
receivedBufferLength += buf.length;
|
|
||||||
if (receivedBufferLength >= currentWriteSize) {
|
|
||||||
didReceiveData = true;
|
|
||||||
writeSize = serverConnectionHandle.writeQueueSize;
|
|
||||||
receivedBufferLength = 0;
|
|
||||||
res.pause();
|
|
||||||
setTimeout(
|
|
||||||
resume,
|
|
||||||
serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
|
|
||||||
);
|
|
||||||
offsetTimeout = 0;
|
|
||||||
}
|
|
||||||
}, 1));
|
|
||||||
res.on('end', common.mustCall(() => {
|
|
||||||
server.close();
|
|
||||||
}));
|
|
||||||
}));
|
|
||||||
}));
|
}));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user