stream_base: dispatch reqs in the stream impl

Dispatch requests in the implementation of the stream, not in the code
creating these requests. The requests might be piled up and invoked
internally in the implementation, so it should know better when it is
the time to dispatch them.

In fact, TLS was doing exactly this thing which led us to...

Fix: https://github.com/iojs/io.js/issues/1512
PR-URL: https://github.com/iojs/io.js/pull/1563
Reviewed-By: Shigeki Ohtsu <ohtsu@iij.ad.jp>
This commit is contained in:
Fedor Indutny 2015-04-29 13:50:36 +02:00
parent 78f4b038f8
commit 30b7349176
4 changed files with 8 additions and 6 deletions

View File

@ -71,6 +71,7 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
req_wrap->object()
};
req_wrap->Dispatched();
Local<Value> res =
MakeCallback(env()->onshutdown_string(), ARRAY_SIZE(argv), argv);
@ -95,6 +96,7 @@ int JSStream::DoWrite(WriteWrap* w,
bufs_arr
};
w->Dispatched();
Local<Value> res =
MakeCallback(env()->onwrite_string(), ARRAY_SIZE(argv), argv);

View File

@ -60,7 +60,6 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
AfterShutdown);
int err = DoShutdown(req_wrap);
req_wrap->Dispatched();
if (err)
delete req_wrap;
return err;
@ -181,7 +180,6 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
if (bufs != bufs_)
delete[] bufs;
req_wrap->Dispatched();
req_wrap->object()->Set(env->async(), True(env->isolate()));
req_wrap->object()->Set(env->bytes_string(),
Number::New(env->isolate(), bytes));
@ -228,7 +226,6 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite);
err = DoWrite(req_wrap, bufs, count, nullptr);
req_wrap->Dispatched();
req_wrap_obj->Set(env->async(), True(env->isolate()));
if (err)
@ -347,7 +344,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
reinterpret_cast<uv_stream_t*>(send_handle));
}
req_wrap->Dispatched();
req_wrap->object()->Set(env->async(), True(env->isolate()));
if (err)

View File

@ -279,7 +279,10 @@ void StreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
int StreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
return uv_shutdown(&req_wrap->req_, stream(), AfterShutdown);
int err;
err = uv_shutdown(&req_wrap->req_, stream(), AfterShutdown);
req_wrap->Dispatched();
return err;
}
@ -353,6 +356,7 @@ int StreamWrap::DoWrite(WriteWrap* w,
}
}
w->Dispatched();
UpdateWriteQueueSize();
return r;

View File

@ -309,7 +309,6 @@ void TLSWrap::EncOut() {
for (size_t i = 0; i < count; i++)
buf[i] = uv_buf_init(data[i], size[i]);
int err = stream_->DoWrite(write_req, buf, count, nullptr);
write_req->Dispatched();
// Ignore errors, this should be already handled in js
if (err) {
@ -565,6 +564,7 @@ int TLSWrap::DoWrite(WriteWrap* w,
// Queue callback to execute it on next tick
write_item_queue_.PushBack(new WriteItem(w));
w->Dispatched();
// Write queued data
if (empty) {