diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc
index 71a640cb0e5..a9477573f02 100644
--- a/src/node_io_watcher.cc
+++ b/src/node_io_watcher.cc
@@ -285,7 +285,7 @@ void IOWatcher::Dump() {
     io->dumps_++;
     io->last_dump_ = ev_now(EV_DEFAULT_UC);

-    DEBUG_PRINT("dumping fd %d", io->watcher_.fd);
+    DEBUG_PRINT("<%d> dumping", io->watcher_.fd);

     // Number of items we've stored in iov
     int iovcnt = 0;
@@ -299,7 +299,7 @@

     // Unix sockets don't like huge messages. TCP sockets do.
     // TODO: handle EMSGSIZE after sendmsg().
-    size_t max_to_write = unix_socket ? 8*KB : 64*KB;
+    size_t max_to_write = unix_socket ? 8*KB : 256*KB;

     int fd_to_send = -1;

@@ -312,6 +312,7 @@
     }

     size_t first_offset = offset;
+    DEBUG_PRINT("<%d> offset=%ld", io->watcher_.fd, offset);

     // Loop over all the buckets for this particular watcher/socket in order
     // to fill iov.
@@ -367,7 +368,7 @@
       Local<Value> fd_v = bucket->Get(fd_sym);
       if (fd_v->IsInt32()) {
         fd_to_send = fd_v->Int32Value();
-        DEBUG_PRINT("got fd to send: %d", fd_to_send);
+        DEBUG_PRINT("<%d> got fd to send: %d", io->watcher_.fd, fd_to_send);
         assert(fd_to_send >= 0);
       }
     }
@@ -406,7 +407,8 @@
       written = writev(io->watcher_.fd, iov, iovcnt);
     }

-    DEBUG_PRINT("iovcnt: %d, to_write: %ld, written: %ld",
+    DEBUG_PRINT("<%d> iovcnt: %d, to_write: %ld, written: %ld",
+                io->watcher_.fd,
                 iovcnt,
                 to_write,
                 written);
@@ -415,6 +417,7 @@
       // Allow EAGAIN.
       // TODO: handle EMSGSIZE after sendmsg().
       if (errno == EAGAIN) {
+        DEBUG_PRINT("<%d> EAGAIN", io->watcher_.fd);
         io->Start();
       } else {
         // Emit error event
@@ -441,6 +444,7 @@
     // what about written == 0 ?

     size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value();
+    DEBUG_PRINT("<%d> queue_size=%ld", io->watcher_.fd, queue_size);
     assert(queue_size >= offset);

     // Now drop the buckets that have been written.
@@ -475,26 +479,34 @@
         bucket->Set(fd_sym, Null());
       }

+      DEBUG_PRINT("<%d,%ld> bucket_len: %ld, offset: %ld",
+                  io->watcher_.fd,
+                  bucket_index,
+                  bucket_len,
+                  offset);
       assert(bucket_len > offset);
-      DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset);
-
-      queue_size -= written;

       // Only on the first bucket does is the offset > 0.
       if (offset + written < bucket_len) {
         // we have not written the entire bucket
-        DEBUG_PRINT("[%ld] Only wrote part of the buffer. "
+        DEBUG_PRINT("<%d,%ld> Only wrote part of the buffer. "
                     "setting watcher.offset = %ld",
+                    io->watcher_.fd,
                     bucket_index,
                     offset + written);

         watcher->Set(offset_sym,
-            Integer::NewFromUnsigned(offset + written));
+                     Integer::NewFromUnsigned(offset + written));
         break;
       } else {
-        DEBUG_PRINT("[%ld] wrote the whole bucket. discarding.",
+        DEBUG_PRINT("<%d,%ld> wrote the whole bucket. discarding.",
+                    io->watcher_.fd,
                     bucket_index);

+        assert(bucket_len <= queue_size);
+        queue_size -= bucket_len;
+
+        assert(bucket_len - offset <= written);
         written -= bucket_len - offset;

         Local<Value> bucket_callback_v = bucket->Get(callback_sym);
@@ -519,7 +531,6 @@
         watcher->Set(first_bucket_sym, bucket->Get(next_sym));
       }

-      // Set the queue size.
       watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size));
     }

@@ -536,7 +547,7 @@

       // Still have buckets to be written. Wait for fd to become writable.
       io->Start();
-      DEBUG_PRINT("Started watcher %d", io->watcher_.fd);
+      DEBUG_PRINT("<%d> Started watcher", io->watcher_.fd);
     } else {
       // No more buckets in the queue. Make sure the last_bucket_sym is
       // updated and then go to the next watcher.
@@ -546,7 +557,7 @@
       // become writable.
       io->Stop();

-      DEBUG_PRINT("Stop watcher %d", io->watcher_.fd);
+      DEBUG_PRINT("<%d> Stop watcher", io->watcher_.fd);

       // Emit drain event
       if (watcher->Has(ondrain_sym)) {
diff --git a/test/simple/test-dumper-unix.js b/test/simple/test-dumper-unix.js
index d6e7457c72d..2e510360d56 100644
--- a/test/simple/test-dumper-unix.js
+++ b/test/simple/test-dumper-unix.js
@@ -87,11 +87,13 @@ function test (N, b, cb) {
   w.firstBucket = { data: b };
   w.lastBucket = w.firstBucket;
+  w.queueSize = b.length;

   for (var i = 0; i < N-1; i++) {
     var bucket = { data: b };
     w.lastBucket.next = bucket;
     w.lastBucket = bucket;
+    w.queueSize += b.length;

     // Kind of randomly fill these buckets with fds.
     if (fdsSent < 5 && i % 2 == 0) {
       bucket.fd = 1; // send stdout
diff --git a/test/simple/test-dumper.js b/test/simple/test-dumper.js
index af9ad870184..c1597749f37 100644
--- a/test/simple/test-dumper.js
+++ b/test/simple/test-dumper.js
@@ -49,7 +49,6 @@ function test (N, b, cb) {
   stream.readable = true;
   stream.resume();

-  // Count the data as it arrives on the read end of the pipe.
   stream.on('data', function (d) {
     nread += d.length;

@@ -84,12 +83,14 @@ function test (N, b, cb) {

   w.firstBucket = { data: b };
   w.lastBucket = w.firstBucket;
+  w.queueSize = b.length;

   for (var i = 0; i < N-1; i++) {
     var bucket = { data: b };
     assert.ok(!w.lastBucket.next);
     w.lastBucket.next = bucket;
     w.lastBucket = bucket;
+    w.queueSize += b.length;
   }
 }

diff --git a/test/simple/test-pipe.js b/test/simple/test-pipe.js
index 3341e875ca3..d12c2b163e8 100644
--- a/test/simple/test-pipe.js
+++ b/test/simple/test-pipe.js
@@ -93,7 +93,6 @@ function startClient () {
   req.write(buffer);
   req.end();

-  console.log("request fd=%d", req.connection.fd);

   // note the queue includes http headers.
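The substantive change in the bucket-drop loop is the accounting: instead of subtracting the remaining `written` count on every pass, each fully flushed bucket is charged its full `bucket_len` against `queue_size`, and a partial write only records the new `offset` into the first remaining bucket. Below is a minimal standalone sketch of that loop; the `Bucket`/`Watcher` structs and `DropWrittenBuckets` name are hypothetical stand-ins for the V8-managed objects (`first_bucket_sym`, `queue_size_sym`, `offset_sym`) the real code manipulates, and callbacks and fd passing are omitted.

    #include <cassert>
    #include <cstddef>

    // Hypothetical stand-ins for the JS-side objects; the real code reads
    // these fields through V8 handles, not plain structs.
    struct Bucket {
      size_t length;   // bytes held by this bucket
      Bucket* next;    // next bucket in the write queue, or NULL
    };

    struct Watcher {
      Bucket* first_bucket;  // head of the write queue
      size_t queue_size;     // bytes of all buckets still queued
      size_t offset;         // bytes of first_bucket written on an earlier dump
    };

    // Drop the buckets that `written` bytes of output fully covered, charging
    // each discarded bucket's full length against queue_size; a partial write
    // only records the new offset into the first remaining bucket.
    void DropWrittenBuckets(Watcher* w, size_t written) {
      size_t offset = w->offset;

      while (w->first_bucket != NULL) {
        Bucket* bucket = w->first_bucket;
        size_t bucket_len = bucket->length;
        assert(bucket_len > offset);

        if (offset + written < bucket_len) {
          // Not the whole bucket: remember how far we got and stop.
          // queue_size still counts this bucket in full.
          w->offset = offset + written;
          return;
        }

        // Whole bucket written: discard it and subtract its full length.
        assert(bucket_len <= w->queue_size);
        w->queue_size -= bucket_len;

        assert(bucket_len - offset <= written);
        written -= bucket_len - offset;
        offset = 0;

        w->first_bucket = bucket->next;
        w->offset = 0;
        delete bucket;
      }
    }

Because the C++ side now reads and asserts on the queue size (`assert(queue_size >= offset)`, `assert(bucket_len <= queue_size)`), the dumper tests initialize `w.queueSize` and grow it as they append buckets, which appears to be why those test changes accompany the accounting fix.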