src: turn JS stream into a full duplex
Remove unused methods for reading data from `JSStream` and add those required for emitting data or an EOF event to the JS side, in essentially the same way that `LibuvStreamWrap` does it. PR-URL: https://github.com/nodejs/node/pull/16269 Reviewed-By: Anatoli Papirovski <apapirovski@mac.com> Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
parent
127f83ab8d
commit
170bc31669
@ -27,6 +27,9 @@ JSStream::JSStream(Environment* env, Local<Object> obj)
|
||||
StreamBase(env) {
|
||||
node::Wrap(obj, this);
|
||||
MakeWeak<JSStream>(this);
|
||||
|
||||
set_alloc_cb({ OnAllocImpl, this });
|
||||
set_read_cb({ OnReadImpl, this });
|
||||
}
|
||||
|
||||
|
||||
@ -34,6 +37,45 @@ JSStream::~JSStream() {
|
||||
}
|
||||
|
||||
|
||||
void JSStream::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) {
|
||||
buf->base = Malloc(size);
|
||||
buf->len = size;
|
||||
}
|
||||
|
||||
|
||||
void JSStream::OnReadImpl(ssize_t nread,
|
||||
const uv_buf_t* buf,
|
||||
uv_handle_type pending,
|
||||
void* ctx) {
|
||||
JSStream* wrap = static_cast<JSStream*>(ctx);
|
||||
CHECK_NE(wrap, nullptr);
|
||||
Environment* env = wrap->env();
|
||||
HandleScope handle_scope(env->isolate());
|
||||
Context::Scope context_scope(env->context());
|
||||
|
||||
if (nread < 0) {
|
||||
if (buf != nullptr && buf->base != nullptr)
|
||||
free(buf->base);
|
||||
wrap->EmitData(nread, Local<Object>(), Local<Object>());
|
||||
return;
|
||||
}
|
||||
|
||||
if (nread == 0) {
|
||||
if (buf->base != nullptr)
|
||||
free(buf->base);
|
||||
return;
|
||||
}
|
||||
|
||||
CHECK_LE(static_cast<size_t>(nread), buf->len);
|
||||
char* base = node::Realloc(buf->base, nread);
|
||||
|
||||
CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
|
||||
|
||||
Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
|
||||
wrap->EmitData(nread, obj, Local<Object>());
|
||||
}
|
||||
|
||||
|
||||
void* JSStream::Cast() {
|
||||
return static_cast<void*>(this);
|
||||
}
|
||||
@ -134,37 +176,6 @@ void JSStream::New(const FunctionCallbackInfo<Value>& args) {
|
||||
}
|
||||
|
||||
|
||||
static void FreeCallback(char* data, void* hint) {
|
||||
// Intentional no-op
|
||||
}
|
||||
|
||||
|
||||
void JSStream::DoAlloc(const FunctionCallbackInfo<Value>& args) {
|
||||
JSStream* wrap;
|
||||
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
|
||||
|
||||
uv_buf_t buf;
|
||||
wrap->OnAlloc(args[0]->Int32Value(), &buf);
|
||||
Local<Object> vbuf = Buffer::New(
|
||||
wrap->env(),
|
||||
buf.base,
|
||||
buf.len,
|
||||
FreeCallback,
|
||||
nullptr).ToLocalChecked();
|
||||
return args.GetReturnValue().Set(vbuf);
|
||||
}
|
||||
|
||||
|
||||
void JSStream::DoRead(const FunctionCallbackInfo<Value>& args) {
|
||||
JSStream* wrap;
|
||||
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
|
||||
|
||||
CHECK(Buffer::HasInstance(args[1]));
|
||||
uv_buf_t buf = uv_buf_init(Buffer::Data(args[1]), Buffer::Length(args[1]));
|
||||
wrap->OnRead(args[0]->Int32Value(), &buf);
|
||||
}
|
||||
|
||||
|
||||
void JSStream::DoAfterWrite(const FunctionCallbackInfo<Value>& args) {
|
||||
JSStream* wrap;
|
||||
CHECK(args[0]->IsObject());
|
||||
@ -230,8 +241,6 @@ void JSStream::Initialize(Local<Object> target,
|
||||
|
||||
AsyncWrap::AddWrapMethods(env, t);
|
||||
|
||||
env->SetProtoMethod(t, "doAlloc", DoAlloc);
|
||||
env->SetProtoMethod(t, "doRead", DoRead);
|
||||
env->SetProtoMethod(t, "doAfterWrite", DoAfterWrite);
|
||||
env->SetProtoMethod(t, "finishWrite", Finish<WriteWrap>);
|
||||
env->SetProtoMethod(t, "finishShutdown", Finish<ShutdownWrap>);
|
||||
|
@ -38,12 +38,16 @@ class JSStream : public AsyncWrap, public StreamBase {
|
||||
AsyncWrap* GetAsyncWrap() override;
|
||||
|
||||
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
static void DoAlloc(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
static void DoRead(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
static void DoAfterWrite(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
static void ReadBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
static void EmitEOF(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
|
||||
static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx);
|
||||
static void OnReadImpl(ssize_t nread,
|
||||
const uv_buf_t* buf,
|
||||
uv_handle_type pending,
|
||||
void* ctx);
|
||||
|
||||
template <class Wrap>
|
||||
static void Finish(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
};
|
||||
|
22
test/parallel/test-wrap-js-stream-duplex.js
Normal file
22
test/parallel/test-wrap-js-stream-duplex.js
Normal file
@ -0,0 +1,22 @@
|
||||
'use strict';
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const StreamWrap = require('_stream_wrap');
|
||||
const { PassThrough } = require('stream');
|
||||
const { Socket } = require('net');
|
||||
|
||||
{
|
||||
const wrap = new StreamWrap(new PassThrough());
|
||||
assert(wrap instanceof Socket);
|
||||
wrap.on('data', common.mustCall((d) => assert.strictEqual(`${d}`, 'foo')));
|
||||
wrap.on('end', common.mustNotCall());
|
||||
wrap.write('foo');
|
||||
}
|
||||
|
||||
{
|
||||
const wrap = new StreamWrap(new PassThrough());
|
||||
assert(wrap instanceof Socket);
|
||||
wrap.on('data', common.mustCall((d) => assert.strictEqual(`${d}`, 'foo')));
|
||||
wrap.on('end', common.mustCall());
|
||||
wrap.end('foo');
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user