src: minor refactoring to StreamBase writes

Instead of having per-request callbacks, always call a callback
on the `StreamBase` instance itself for `WriteWrap` and `ShutdownWrap`.

This makes `WriteWrap` cleanup consistent for all stream classes,
since the after-write callback is always the same now.

If special handling is needed for writes that happen to a sub-class,
`AfterWrite` can be overridden by that class, rather than that
class providing its own callback (e.g. updating the write
queue size for libuv streams).

If special handling is needed for writes that happen on another
stream instance, the existing `after_write_cb()` callback
is used for that (e.g. custom code after writing to the
transport from a TLS stream).

As a nice bonus, this also makes `WriteWrap` and `ShutdownWrap`
instances slightly smaller.

PR-URL: https://github.com/nodejs/node/pull/17564
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Anna Henningsen 2017-12-09 05:29:11 +01:00
parent 24b0f67c2b
commit 453008d73e
No known key found for this signature in database
GPG Key ID: 9C63F3A6CD2AD8F9
9 changed files with 73 additions and 87 deletions

View File

@ -176,7 +176,7 @@ void JSStream::DoAfterWrite(const FunctionCallbackInfo<Value>& args) {
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As<Object>()); ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As<Object>());
wrap->OnAfterWrite(w); w->Done(0);
} }

View File

@ -987,9 +987,6 @@ inline void Http2Session::SetChunksSinceLastWrite(size_t n) {
WriteWrap* Http2Session::AllocateSend() { WriteWrap* Http2Session::AllocateSend() {
HandleScope scope(env()->isolate()); HandleScope scope(env()->isolate());
auto AfterWrite = [](WriteWrap* req, int status) {
req->Dispose();
};
Local<Object> obj = Local<Object> obj =
env()->write_wrap_constructor_function() env()->write_wrap_constructor_function()
->NewInstance(env()->context()).ToLocalChecked(); ->NewInstance(env()->context()).ToLocalChecked();
@ -999,7 +996,7 @@ WriteWrap* Http2Session::AllocateSend() {
session(), session(),
NGHTTP2_SETTINGS_MAX_FRAME_SIZE); NGHTTP2_SETTINGS_MAX_FRAME_SIZE);
// Max frame size + 9 bytes for the header // Max frame size + 9 bytes for the header
return WriteWrap::New(env(), obj, stream_, AfterWrite, size + 9); return WriteWrap::New(env(), obj, stream_, size + 9);
} }
void Http2Session::Send(WriteWrap* req, char* buf, size_t length) { void Http2Session::Send(WriteWrap* req, char* buf, size_t length) {

View File

@ -143,15 +143,19 @@ void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
} }
inline void ShutdownWrap::OnDone(int status) {
stream()->AfterShutdown(this, status);
}
WriteWrap* WriteWrap::New(Environment* env, WriteWrap* WriteWrap::New(Environment* env,
Local<Object> obj, Local<Object> obj,
StreamBase* wrap, StreamBase* wrap,
DoneCb cb,
size_t extra) { size_t extra) {
size_t storage_size = ROUND_UP(sizeof(WriteWrap), kAlignSize) + extra; size_t storage_size = ROUND_UP(sizeof(WriteWrap), kAlignSize) + extra;
char* storage = new char[storage_size]; char* storage = new char[storage_size];
return new(storage) WriteWrap(env, obj, wrap, cb, storage_size); return new(storage) WriteWrap(env, obj, wrap, storage_size);
} }
@ -171,6 +175,10 @@ size_t WriteWrap::ExtraSize() const {
return storage_size_ - ROUND_UP(sizeof(*this), kAlignSize); return storage_size_ - ROUND_UP(sizeof(*this), kAlignSize);
} }
inline void WriteWrap::OnDone(int status) {
stream()->AfterWrite(this, status);
}
} // namespace node } // namespace node
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

View File

@ -55,8 +55,7 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
env->set_init_trigger_async_id(wrap->get_async_id()); env->set_init_trigger_async_id(wrap->get_async_id());
ShutdownWrap* req_wrap = new ShutdownWrap(env, ShutdownWrap* req_wrap = new ShutdownWrap(env,
req_wrap_obj, req_wrap_obj,
this, this);
AfterShutdown);
int err = DoShutdown(req_wrap); int err = DoShutdown(req_wrap);
if (err) if (err)
@ -66,7 +65,6 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) { void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
StreamBase* wrap = req_wrap->wrap();
Environment* env = req_wrap->env(); Environment* env = req_wrap->env();
// The wrap and request objects should still be there. // The wrap and request objects should still be there.
@ -78,7 +76,7 @@ void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
Local<Object> req_wrap_obj = req_wrap->object(); Local<Object> req_wrap_obj = req_wrap->object();
Local<Value> argv[3] = { Local<Value> argv[3] = {
Integer::New(env->isolate(), status), Integer::New(env->isolate(), status),
wrap->GetObject(), GetObject(),
req_wrap_obj req_wrap_obj
}; };
@ -158,7 +156,7 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
wrap = GetAsyncWrap(); wrap = GetAsyncWrap();
CHECK_NE(wrap, nullptr); CHECK_NE(wrap, nullptr);
env->set_init_trigger_async_id(wrap->get_async_id()); env->set_init_trigger_async_id(wrap->get_async_id());
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size); req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);
offset = 0; offset = 0;
if (!all_buffers) { if (!all_buffers) {
@ -248,7 +246,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
if (wrap != nullptr) if (wrap != nullptr)
env->set_init_trigger_async_id(wrap->get_async_id()); env->set_init_trigger_async_id(wrap->get_async_id());
// Allocate, or write rest // Allocate, or write rest
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite); req_wrap = WriteWrap::New(env, req_wrap_obj, this);
err = DoWrite(req_wrap, bufs, count, nullptr); err = DoWrite(req_wrap, bufs, count, nullptr);
req_wrap_obj->Set(env->async(), True(env->isolate())); req_wrap_obj->Set(env->async(), True(env->isolate()));
@ -332,7 +330,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
wrap = GetAsyncWrap(); wrap = GetAsyncWrap();
if (wrap != nullptr) if (wrap != nullptr)
env->set_init_trigger_async_id(wrap->get_async_id()); env->set_init_trigger_async_id(wrap->get_async_id());
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size); req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);
data = req_wrap->Extra(); data = req_wrap->Extra();
@ -393,7 +391,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
StreamBase* wrap = req_wrap->wrap();
Environment* env = req_wrap->env(); Environment* env = req_wrap->env();
HandleScope handle_scope(env->isolate()); HandleScope handle_scope(env->isolate());
@ -405,19 +402,19 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
// Unref handle property // Unref handle property
Local<Object> req_wrap_obj = req_wrap->object(); Local<Object> req_wrap_obj = req_wrap->object();
req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust(); req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust();
wrap->OnAfterWrite(req_wrap); OnAfterWrite(req_wrap, status);
Local<Value> argv[] = { Local<Value> argv[] = {
Integer::New(env->isolate(), status), Integer::New(env->isolate(), status),
wrap->GetObject(), GetObject(),
req_wrap_obj, req_wrap_obj,
Undefined(env->isolate()) Undefined(env->isolate())
}; };
const char* msg = wrap->Error(); const char* msg = Error();
if (msg != nullptr) { if (msg != nullptr) {
argv[3] = OneByteString(env->isolate(), msg); argv[3] = OneByteString(env->isolate(), msg);
wrap->ClearError(); ClearError();
} }
if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())

View File

@ -16,27 +16,27 @@ namespace node {
// Forward declarations // Forward declarations
class StreamBase; class StreamBase;
template <class Req> template<typename Base>
class StreamReq { class StreamReq {
public: public:
typedef void (*DoneCb)(Req* req, int status); explicit StreamReq(StreamBase* stream) : stream_(stream) {
explicit StreamReq(DoneCb cb) : cb_(cb) {
} }
inline void Done(int status, const char* error_str = nullptr) { inline void Done(int status, const char* error_str = nullptr) {
Req* req = static_cast<Req*>(this); Base* req = static_cast<Base*>(this);
Environment* env = req->env(); Environment* env = req->env();
if (error_str != nullptr) { if (error_str != nullptr) {
req->object()->Set(env->error_string(), req->object()->Set(env->error_string(),
OneByteString(env->isolate(), error_str)); OneByteString(env->isolate(), error_str));
} }
cb_(req, status); req->OnDone(status);
} }
inline StreamBase* stream() const { return stream_; }
private: private:
DoneCb cb_; StreamBase* const stream_;
}; };
class ShutdownWrap : public ReqWrap<uv_shutdown_t>, class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
@ -44,11 +44,9 @@ class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
public: public:
ShutdownWrap(Environment* env, ShutdownWrap(Environment* env,
v8::Local<v8::Object> req_wrap_obj, v8::Local<v8::Object> req_wrap_obj,
StreamBase* wrap, StreamBase* stream)
DoneCb cb)
: ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP), : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
StreamReq<ShutdownWrap>(cb), StreamReq<ShutdownWrap>(stream) {
wrap_(wrap) {
Wrap(req_wrap_obj, this); Wrap(req_wrap_obj, this);
} }
@ -60,11 +58,9 @@ class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
return ContainerOf(&ShutdownWrap::req_, req); return ContainerOf(&ShutdownWrap::req_, req);
} }
inline StreamBase* wrap() const { return wrap_; }
size_t self_size() const override { return sizeof(*this); } size_t self_size() const override { return sizeof(*this); }
private: inline void OnDone(int status); // Just calls stream()->AfterShutdown()
StreamBase* const wrap_;
}; };
class WriteWrap : public ReqWrap<uv_write_t>, class WriteWrap : public ReqWrap<uv_write_t>,
@ -72,15 +68,12 @@ class WriteWrap: public ReqWrap<uv_write_t>,
public: public:
static inline WriteWrap* New(Environment* env, static inline WriteWrap* New(Environment* env,
v8::Local<v8::Object> obj, v8::Local<v8::Object> obj,
StreamBase* wrap, StreamBase* stream,
DoneCb cb,
size_t extra = 0); size_t extra = 0);
inline void Dispose(); inline void Dispose();
inline char* Extra(size_t offset = 0); inline char* Extra(size_t offset = 0);
inline size_t ExtraSize() const; inline size_t ExtraSize() const;
inline StreamBase* wrap() const { return wrap_; }
size_t self_size() const override { return storage_size_; } size_t self_size() const override { return storage_size_; }
static WriteWrap* from_req(uv_write_t* req) { static WriteWrap* from_req(uv_write_t* req) {
@ -91,24 +84,22 @@ class WriteWrap: public ReqWrap<uv_write_t>,
WriteWrap(Environment* env, WriteWrap(Environment* env,
v8::Local<v8::Object> obj, v8::Local<v8::Object> obj,
StreamBase* wrap, StreamBase* stream)
DoneCb cb)
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP), : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
StreamReq<WriteWrap>(cb), StreamReq<WriteWrap>(stream),
wrap_(wrap),
storage_size_(0) { storage_size_(0) {
Wrap(obj, this); Wrap(obj, this);
} }
inline void OnDone(int status); // Just calls stream()->AfterWrite()
protected: protected:
WriteWrap(Environment* env, WriteWrap(Environment* env,
v8::Local<v8::Object> obj, v8::Local<v8::Object> obj,
StreamBase* wrap, StreamBase* stream,
DoneCb cb,
size_t storage_size) size_t storage_size)
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP), : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
StreamReq<WriteWrap>(cb), StreamReq<WriteWrap>(stream),
wrap_(wrap),
storage_size_(storage_size) { storage_size_(storage_size) {
Wrap(obj, this); Wrap(obj, this);
} }
@ -129,7 +120,6 @@ class WriteWrap: public ReqWrap<uv_write_t>,
// WriteWrap. Ensure this never happens. // WriteWrap. Ensure this never happens.
void operator delete(void* ptr) { UNREACHABLE(); } void operator delete(void* ptr) { UNREACHABLE(); }
StreamBase* const wrap_;
const size_t storage_size_; const size_t storage_size_;
}; };
@ -151,7 +141,7 @@ class StreamResource {
void* ctx; void* ctx;
}; };
typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx); typedef void (*AfterWriteCb)(WriteWrap* w, int status, void* ctx);
typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx); typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
typedef void (*ReadCb)(ssize_t nread, typedef void (*ReadCb)(ssize_t nread,
const uv_buf_t* buf, const uv_buf_t* buf,
@ -176,9 +166,9 @@ class StreamResource {
virtual void ClearError(); virtual void ClearError();
// Events // Events
inline void OnAfterWrite(WriteWrap* w) { inline void OnAfterWrite(WriteWrap* w, int status) {
if (!after_write_cb_.is_empty()) if (!after_write_cb_.is_empty())
after_write_cb_.fn(w, after_write_cb_.ctx); after_write_cb_.fn(w, status, after_write_cb_.ctx);
} }
inline void OnAlloc(size_t size, uv_buf_t* buf) { inline void OnAlloc(size_t size, uv_buf_t* buf) {
@ -208,14 +198,12 @@ class StreamResource {
inline Callback<ReadCb> read_cb() { return read_cb_; } inline Callback<ReadCb> read_cb() { return read_cb_; }
inline Callback<DestructCb> destruct_cb() { return destruct_cb_; } inline Callback<DestructCb> destruct_cb() { return destruct_cb_; }
private: protected:
Callback<AfterWriteCb> after_write_cb_; Callback<AfterWriteCb> after_write_cb_;
Callback<AllocCb> alloc_cb_; Callback<AllocCb> alloc_cb_;
Callback<ReadCb> read_cb_; Callback<ReadCb> read_cb_;
Callback<DestructCb> destruct_cb_; Callback<DestructCb> destruct_cb_;
uint64_t bytes_read_; uint64_t bytes_read_;
friend class StreamBase;
}; };
class StreamBase : public StreamResource { class StreamBase : public StreamResource {
@ -253,6 +241,10 @@ class StreamBase : public StreamResource {
v8::Local<v8::Object> buf, v8::Local<v8::Object> buf,
v8::Local<v8::Object> handle); v8::Local<v8::Object> handle);
// These are called by the respective {Write,Shutdown}Wrap class.
virtual void AfterShutdown(ShutdownWrap* req, int status);
virtual void AfterWrite(WriteWrap* req, int status);
protected: protected:
explicit StreamBase(Environment* env) : env_(env), consumed_(false) { explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
} }
@ -263,10 +255,6 @@ class StreamBase : public StreamResource {
virtual AsyncWrap* GetAsyncWrap() = 0; virtual AsyncWrap* GetAsyncWrap() = 0;
virtual v8::Local<v8::Object> GetObject(); virtual v8::Local<v8::Object> GetObject();
// Libuv callbacks
static void AfterShutdown(ShutdownWrap* req, int status);
static void AfterWrite(WriteWrap* req, int status);
// JS Methods // JS Methods
int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args); int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args); int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);

View File

@ -91,7 +91,6 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
provider), provider),
StreamBase(env), StreamBase(env),
stream_(stream) { stream_(stream) {
set_after_write_cb({ OnAfterWriteImpl, this });
set_alloc_cb({ OnAllocImpl, this }); set_alloc_cb({ OnAllocImpl, this });
set_read_cb({ OnReadImpl, this }); set_read_cb({ OnReadImpl, this });
} }
@ -293,13 +292,13 @@ void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap) { int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
int err; int err;
err = uv_shutdown(req_wrap->req(), stream(), AfterShutdown); err = uv_shutdown(req_wrap->req(), stream(), AfterUvShutdown);
req_wrap->Dispatched(); req_wrap->Dispatched();
return err; return err;
} }
void LibuvStreamWrap::AfterShutdown(uv_shutdown_t* req, int status) { void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
ShutdownWrap* req_wrap = ShutdownWrap::from_req(req); ShutdownWrap* req_wrap = ShutdownWrap::from_req(req);
CHECK_NE(req_wrap, nullptr); CHECK_NE(req_wrap, nullptr);
HandleScope scope(req_wrap->env()->isolate()); HandleScope scope(req_wrap->env()->isolate());
@ -354,9 +353,9 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
uv_stream_t* send_handle) { uv_stream_t* send_handle) {
int r; int r;
if (send_handle == nullptr) { if (send_handle == nullptr) {
r = uv_write(w->req(), stream(), bufs, count, AfterWrite); r = uv_write(w->req(), stream(), bufs, count, AfterUvWrite);
} else { } else {
r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterWrite); r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterUvWrite);
} }
if (!r) { if (!r) {
@ -377,7 +376,7 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
} }
void LibuvStreamWrap::AfterWrite(uv_write_t* req, int status) { void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
WriteWrap* req_wrap = WriteWrap::from_req(req); WriteWrap* req_wrap = WriteWrap::from_req(req);
CHECK_NE(req_wrap, nullptr); CHECK_NE(req_wrap, nullptr);
HandleScope scope(req_wrap->env()->isolate()); HandleScope scope(req_wrap->env()->isolate());
@ -386,9 +385,9 @@ void LibuvStreamWrap::AfterWrite(uv_write_t* req, int status) {
} }
void LibuvStreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) { void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(ctx); StreamBase::AfterWrite(w, status);
wrap->UpdateWriteQueueSize(); UpdateWriteQueueSize();
} }
} // namespace node } // namespace node

View File

@ -102,17 +102,18 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
static void OnRead(uv_stream_t* handle, static void OnRead(uv_stream_t* handle,
ssize_t nread, ssize_t nread,
const uv_buf_t* buf); const uv_buf_t* buf);
static void AfterWrite(uv_write_t* req, int status); static void AfterUvWrite(uv_write_t* req, int status);
static void AfterShutdown(uv_shutdown_t* req, int status); static void AfterUvShutdown(uv_shutdown_t* req, int status);
// Resource interface implementation // Resource interface implementation
static void OnAfterWriteImpl(WriteWrap* w, void* ctx);
static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx);
static void OnReadImpl(ssize_t nread, static void OnReadImpl(ssize_t nread,
const uv_buf_t* buf, const uv_buf_t* buf,
uv_handle_type pending, uv_handle_type pending,
void* ctx); void* ctx);
void AfterWrite(WriteWrap* req_wrap, int status) override;
uv_stream_t* const stream_; uv_stream_t* const stream_;
}; };

View File

@ -328,8 +328,7 @@ void TLSWrap::EncOut() {
->NewInstance(env()->context()).ToLocalChecked(); ->NewInstance(env()->context()).ToLocalChecked();
WriteWrap* write_req = WriteWrap::New(env(), WriteWrap* write_req = WriteWrap::New(env(),
req_wrap_obj, req_wrap_obj,
this, stream_);
EncOutCb);
uv_buf_t buf[arraysize(data)]; uv_buf_t buf[arraysize(data)];
for (size_t i = 0; i < count; i++) for (size_t i = 0; i < count; i++)
@ -346,34 +345,31 @@ void TLSWrap::EncOut() {
} }
void TLSWrap::EncOutCb(WriteWrap* req_wrap, int status) { void TLSWrap::EncOutAfterWrite(WriteWrap* req_wrap, int status) {
TLSWrap* wrap = static_cast<TLSWrap*>(req_wrap->wrap());
req_wrap->Dispose();
// We should not be getting here after `DestroySSL`, because all queued writes // We should not be getting here after `DestroySSL`, because all queued writes
// must be invoked with UV_ECANCELED // must be invoked with UV_ECANCELED
CHECK_NE(wrap->ssl_, nullptr); CHECK_NE(ssl_, nullptr);
// Handle error // Handle error
if (status) { if (status) {
// Ignore errors after shutdown // Ignore errors after shutdown
if (wrap->shutdown_) if (shutdown_)
return; return;
// Notify about error // Notify about error
wrap->InvokeQueued(status); InvokeQueued(status);
return; return;
} }
// Commit // Commit
crypto::NodeBIO::FromBIO(wrap->enc_out_)->Read(nullptr, wrap->write_size_); crypto::NodeBIO::FromBIO(enc_out_)->Read(nullptr, write_size_);
// Ensure that the progress will be made and `InvokeQueued` will be called. // Ensure that the progress will be made and `InvokeQueued` will be called.
wrap->ClearIn(); ClearIn();
// Try writing more data // Try writing more data
wrap->write_size_ = 0; write_size_ = 0;
wrap->EncOut(); EncOut();
} }
@ -676,9 +672,9 @@ int TLSWrap::DoWrite(WriteWrap* w,
} }
void TLSWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) { void TLSWrap::OnAfterWriteImpl(WriteWrap* w, int status, void* ctx) {
TLSWrap* wrap = static_cast<TLSWrap*>(ctx); TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
wrap->UpdateWriteQueueSize(); wrap->EncOutAfterWrite(w, status);
} }

View File

@ -111,7 +111,7 @@ class TLSWrap : public AsyncWrap,
static void SSLInfoCallback(const SSL* ssl_, int where, int ret); static void SSLInfoCallback(const SSL* ssl_, int where, int ret);
void InitSSL(); void InitSSL();
void EncOut(); void EncOut();
static void EncOutCb(WriteWrap* req_wrap, int status); void EncOutAfterWrite(WriteWrap* req_wrap, int status);
bool ClearIn(); bool ClearIn();
void ClearOut(); void ClearOut();
void MakePending(); void MakePending();
@ -134,7 +134,7 @@ class TLSWrap : public AsyncWrap,
uint32_t UpdateWriteQueueSize(uint32_t write_queue_size = 0); uint32_t UpdateWriteQueueSize(uint32_t write_queue_size = 0);
// Resource implementation // Resource implementation
static void OnAfterWriteImpl(WriteWrap* w, void* ctx); static void OnAfterWriteImpl(WriteWrap* w, int status, void* ctx);
static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx);
static void OnReadImpl(ssize_t nread, static void OnReadImpl(ssize_t nread,
const uv_buf_t* buf, const uv_buf_t* buf,