src: remove StreamBase::kFlagHasWritev
Since libuv 1.21.0, pipes on Windows support `writev` on the libuv side. This allows for some simplification, and makes the `StreamBase` API more uniform (multi-buffer `Write()` is always supported now, including when used by other non-JS consumers like HTTP/2). PR-URL: https://github.com/nodejs/node/pull/21527 Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
parent
a078521a6a
commit
64a3fadf71
@ -210,10 +210,6 @@ function initSocketHandle(self) {
|
|||||||
self._handle.owner = self;
|
self._handle.owner = self;
|
||||||
self._handle.onread = onread;
|
self._handle.onread = onread;
|
||||||
self[async_id_symbol] = getNewAsyncId(self._handle);
|
self[async_id_symbol] = getNewAsyncId(self._handle);
|
||||||
|
|
||||||
// If handle doesn't support writev - neither do we
|
|
||||||
if (!self._handle.writev)
|
|
||||||
self._writev = null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -208,7 +208,7 @@ void JSStream::Initialize(Local<Object> target,
|
|||||||
env->SetProtoMethod(t, "readBuffer", ReadBuffer);
|
env->SetProtoMethod(t, "readBuffer", ReadBuffer);
|
||||||
env->SetProtoMethod(t, "emitEOF", EmitEOF);
|
env->SetProtoMethod(t, "emitEOF", EmitEOF);
|
||||||
|
|
||||||
StreamBase::AddMethods<JSStream>(env, t, StreamBase::kFlagHasWritev);
|
StreamBase::AddMethods<JSStream>(env, t);
|
||||||
target->Set(jsStreamString, t->GetFunction());
|
target->Set(jsStreamString, t->GetFunction());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1997,7 +1997,7 @@ void Initialize(Local<Object> target,
|
|||||||
Local<String> handleString =
|
Local<String> handleString =
|
||||||
FIXED_ONE_BYTE_STRING(env->isolate(), "FileHandle");
|
FIXED_ONE_BYTE_STRING(env->isolate(), "FileHandle");
|
||||||
fd->SetClassName(handleString);
|
fd->SetClassName(handleString);
|
||||||
StreamBase::AddMethods<FileHandle>(env, fd, StreamBase::kFlagNone);
|
StreamBase::AddMethods<FileHandle>(env, fd);
|
||||||
target->Set(context, handleString, fd->GetFunction()).FromJust();
|
target->Set(context, handleString, fd->GetFunction()).FromJust();
|
||||||
env->set_fd_constructor_template(fdt);
|
env->set_fd_constructor_template(fdt);
|
||||||
|
|
||||||
|
@ -2840,7 +2840,7 @@ void Initialize(Local<Object> target,
|
|||||||
env->SetProtoMethod(stream, "rstStream", Http2Stream::RstStream);
|
env->SetProtoMethod(stream, "rstStream", Http2Stream::RstStream);
|
||||||
env->SetProtoMethod(stream, "refreshState", Http2Stream::RefreshState);
|
env->SetProtoMethod(stream, "refreshState", Http2Stream::RefreshState);
|
||||||
AsyncWrap::AddWrapMethods(env, stream);
|
AsyncWrap::AddWrapMethods(env, stream);
|
||||||
StreamBase::AddMethods<Http2Stream>(env, stream, StreamBase::kFlagHasWritev);
|
StreamBase::AddMethods<Http2Stream>(env, stream);
|
||||||
Local<ObjectTemplate> streamt = stream->InstanceTemplate();
|
Local<ObjectTemplate> streamt = stream->InstanceTemplate();
|
||||||
streamt->SetInternalFieldCount(1);
|
streamt->SetInternalFieldCount(1);
|
||||||
env->set_http2stream_constructor_template(streamt);
|
env->set_http2stream_constructor_template(streamt);
|
||||||
|
@ -83,11 +83,7 @@ void PipeWrap::Initialize(Local<Object> target,
|
|||||||
env->SetProtoMethod(t, "ref", HandleWrap::Ref);
|
env->SetProtoMethod(t, "ref", HandleWrap::Ref);
|
||||||
env->SetProtoMethod(t, "hasRef", HandleWrap::HasRef);
|
env->SetProtoMethod(t, "hasRef", HandleWrap::HasRef);
|
||||||
|
|
||||||
#ifdef _WIN32
|
|
||||||
LibuvStreamWrap::AddMethods(env, t);
|
LibuvStreamWrap::AddMethods(env, t);
|
||||||
#else
|
|
||||||
LibuvStreamWrap::AddMethods(env, t, StreamBase::kFlagHasWritev);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
env->SetProtoMethod(t, "bind", Bind);
|
env->SetProtoMethod(t, "bind", Bind);
|
||||||
env->SetProtoMethod(t, "listen", Listen);
|
env->SetProtoMethod(t, "listen", Listen);
|
||||||
|
@ -266,9 +266,7 @@ inline WriteWrap* StreamBase::CreateWriteWrap(
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <class Base>
|
template <class Base>
|
||||||
void StreamBase::AddMethods(Environment* env,
|
void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
|
||||||
Local<FunctionTemplate> t,
|
|
||||||
int flags) {
|
|
||||||
HandleScope scope(env->isolate());
|
HandleScope scope(env->isolate());
|
||||||
|
|
||||||
enum PropertyAttribute attributes =
|
enum PropertyAttribute attributes =
|
||||||
@ -325,7 +323,6 @@ void StreamBase::AddMethods(Environment* env,
|
|||||||
env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStartJS>);
|
env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStartJS>);
|
||||||
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStopJS>);
|
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStopJS>);
|
||||||
env->SetProtoMethod(t, "shutdown", JSMethod<Base, &StreamBase::Shutdown>);
|
env->SetProtoMethod(t, "shutdown", JSMethod<Base, &StreamBase::Shutdown>);
|
||||||
if ((flags & kFlagHasWritev) != 0)
|
|
||||||
env->SetProtoMethod(t, "writev", JSMethod<Base, &StreamBase::Writev>);
|
env->SetProtoMethod(t, "writev", JSMethod<Base, &StreamBase::Writev>);
|
||||||
env->SetProtoMethod(t,
|
env->SetProtoMethod(t,
|
||||||
"writeBuffer",
|
"writeBuffer",
|
||||||
|
@ -255,15 +255,9 @@ class StreamResource {
|
|||||||
|
|
||||||
class StreamBase : public StreamResource {
|
class StreamBase : public StreamResource {
|
||||||
public:
|
public:
|
||||||
enum Flags {
|
|
||||||
kFlagNone = 0x0,
|
|
||||||
kFlagHasWritev = 0x1
|
|
||||||
};
|
|
||||||
|
|
||||||
template <class Base>
|
template <class Base>
|
||||||
static inline void AddMethods(Environment* env,
|
static inline void AddMethods(Environment* env,
|
||||||
v8::Local<v8::FunctionTemplate> target,
|
v8::Local<v8::FunctionTemplate> target);
|
||||||
int flags = kFlagNone);
|
|
||||||
|
|
||||||
virtual bool IsAlive() = 0;
|
virtual bool IsAlive() = 0;
|
||||||
virtual bool IsClosing() = 0;
|
virtual bool IsClosing() = 0;
|
||||||
|
@ -97,8 +97,7 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
|
|||||||
|
|
||||||
|
|
||||||
void LibuvStreamWrap::AddMethods(Environment* env,
|
void LibuvStreamWrap::AddMethods(Environment* env,
|
||||||
v8::Local<v8::FunctionTemplate> target,
|
v8::Local<v8::FunctionTemplate> target) {
|
||||||
int flags) {
|
|
||||||
Local<FunctionTemplate> get_write_queue_size =
|
Local<FunctionTemplate> get_write_queue_size =
|
||||||
FunctionTemplate::New(env->isolate(),
|
FunctionTemplate::New(env->isolate(),
|
||||||
GetWriteQueueSize,
|
GetWriteQueueSize,
|
||||||
@ -110,7 +109,7 @@ void LibuvStreamWrap::AddMethods(Environment* env,
|
|||||||
Local<FunctionTemplate>(),
|
Local<FunctionTemplate>(),
|
||||||
static_cast<PropertyAttribute>(ReadOnly | DontDelete));
|
static_cast<PropertyAttribute>(ReadOnly | DontDelete));
|
||||||
env->SetProtoMethod(target, "setBlocking", SetBlocking);
|
env->SetProtoMethod(target, "setBlocking", SetBlocking);
|
||||||
StreamBase::AddMethods<LibuvStreamWrap>(env, target, flags);
|
StreamBase::AddMethods<LibuvStreamWrap>(env, target);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -85,8 +85,7 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
|
|||||||
AsyncWrap* GetAsyncWrap() override;
|
AsyncWrap* GetAsyncWrap() override;
|
||||||
|
|
||||||
static void AddMethods(Environment* env,
|
static void AddMethods(Environment* env,
|
||||||
v8::Local<v8::FunctionTemplate> target,
|
v8::Local<v8::FunctionTemplate> target);
|
||||||
int flags = StreamBase::kFlagNone);
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
inline void set_fd(int fd) {
|
inline void set_fd(int fd) {
|
||||||
|
@ -93,7 +93,7 @@ void TCPWrap::Initialize(Local<Object> target,
|
|||||||
env->SetProtoMethod(t, "unref", HandleWrap::Unref);
|
env->SetProtoMethod(t, "unref", HandleWrap::Unref);
|
||||||
env->SetProtoMethod(t, "hasRef", HandleWrap::HasRef);
|
env->SetProtoMethod(t, "hasRef", HandleWrap::HasRef);
|
||||||
|
|
||||||
LibuvStreamWrap::AddMethods(env, t, StreamBase::kFlagHasWritev);
|
LibuvStreamWrap::AddMethods(env, t);
|
||||||
|
|
||||||
env->SetProtoMethod(t, "open", Open);
|
env->SetProtoMethod(t, "open", Open);
|
||||||
env->SetProtoMethod(t, "bind", Bind);
|
env->SetProtoMethod(t, "bind", Bind);
|
||||||
|
@ -895,7 +895,7 @@ void TLSWrap::Initialize(Local<Object> target,
|
|||||||
env->SetProtoMethod(t, "destroySSL", DestroySSL);
|
env->SetProtoMethod(t, "destroySSL", DestroySSL);
|
||||||
env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
|
env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
|
||||||
|
|
||||||
StreamBase::AddMethods<TLSWrap>(env, t, StreamBase::kFlagHasWritev);
|
StreamBase::AddMethods<TLSWrap>(env, t);
|
||||||
SSLWrap<TLSWrap>::AddMethods(env, t);
|
SSLWrap<TLSWrap>::AddMethods(env, t);
|
||||||
|
|
||||||
env->SetProtoMethod(t, "getServername", GetServername);
|
env->SetProtoMethod(t, "getServername", GetServername);
|
||||||
|
52
test/parallel/test-http2-pipe-named-pipe.js
Normal file
52
test/parallel/test-http2-pipe-named-pipe.js
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const common = require('../common');
|
||||||
|
if (!common.hasCrypto)
|
||||||
|
common.skip('missing crypto');
|
||||||
|
const fixtures = require('../common/fixtures');
|
||||||
|
const assert = require('assert');
|
||||||
|
const http2 = require('http2');
|
||||||
|
const fs = require('fs');
|
||||||
|
const net = require('net');
|
||||||
|
const path = require('path');
|
||||||
|
|
||||||
|
// HTTP/2 servers can listen on a named pipe.
|
||||||
|
|
||||||
|
const tmpdir = require('../common/tmpdir');
|
||||||
|
tmpdir.refresh();
|
||||||
|
const loc = fixtures.path('url-tests.js');
|
||||||
|
const fn = path.join(tmpdir.path, 'http2-url-tests.js');
|
||||||
|
|
||||||
|
const server = http2.createServer();
|
||||||
|
|
||||||
|
server.on('stream', common.mustCall((stream) => {
|
||||||
|
const dest = stream.pipe(fs.createWriteStream(fn));
|
||||||
|
|
||||||
|
dest.on('finish', () => {
|
||||||
|
assert.strictEqual(fs.readFileSync(loc).length,
|
||||||
|
fs.readFileSync(fn).length);
|
||||||
|
});
|
||||||
|
stream.respond();
|
||||||
|
stream.end();
|
||||||
|
}));
|
||||||
|
|
||||||
|
server.listen(common.PIPE, common.mustCall(() => {
|
||||||
|
const client = http2.connect('http://localhost', {
|
||||||
|
createConnection(url) {
|
||||||
|
return net.connect(server.address());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const req = client.request({ ':method': 'POST' });
|
||||||
|
req.on('response', common.mustCall());
|
||||||
|
req.resume();
|
||||||
|
|
||||||
|
req.on('close', common.mustCall(() => {
|
||||||
|
server.close();
|
||||||
|
client.close();
|
||||||
|
}));
|
||||||
|
|
||||||
|
const str = fs.createReadStream(loc);
|
||||||
|
str.on('end', common.mustCall());
|
||||||
|
str.pipe(req);
|
||||||
|
}));
|
Loading…
x
Reference in New Issue
Block a user