zlib: refactor zlib internals

Split out the parts that are specific to zlib as a particular
compression library from the interface that is common to
most C compression libraries.

This should pave the way for including support for e.g.
brotli.
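
As a rough, self-contained sketch of the resulting shape (not Node.js source:
the names FakeContext and StreamSketch are invented for illustration, and the
real classes also handle AsyncWrap, the libuv thread pool and memory tracking),
the idea is that a generic stream template only talks to a library-specific
context through a small common interface such as SetBuffers(),
DoThreadPoolWork() and GetErrorInfo():

// Illustrative sketch only; FakeContext/StreamSketch are hypothetical names.
#include <cctype>
#include <cstdint>
#include <iostream>

struct Error {  // plays the role of CompressionError: null code means success
  const char* message = nullptr;
  const char* code = nullptr;
  int err = 0;
  bool IsError() const { return code != nullptr; }
};

// Everything specific to one compression library lives in a "context".
// This toy context just upper-cases bytes so the sketch has no dependencies.
class FakeContext {
 public:
  void SetBuffers(const char* in, uint32_t in_len,
                  char* out, uint32_t out_len) {
    in_ = in; in_len_ = in_len; out_ = out; out_len_ = out_len;
  }
  void DoThreadPoolWork() {
    uint32_t n = in_len_ < out_len_ ? in_len_ : out_len_;
    for (uint32_t i = 0; i < n; i++)
      out_[i] = static_cast<char>(
          std::toupper(static_cast<unsigned char>(in_[i])));
  }
  Error GetErrorInfo() const { return Error{}; }

 private:
  const char* in_ = nullptr;
  char* out_ = nullptr;
  uint32_t in_len_ = 0;
  uint32_t out_len_ = 0;
};

// The generic part only uses the common interface, so a different
// library (e.g. a Brotli context) could be dropped in later.
template <typename Context>
class StreamSketch {
 public:
  Error Write(const char* in, uint32_t in_len, char* out, uint32_t out_len) {
    ctx_.SetBuffers(in, in_len, out, out_len);
    ctx_.DoThreadPoolWork();  // the real code runs this on the thread pool
    return ctx_.GetErrorInfo();
  }

 private:
  Context ctx_;
};

int main() {
  StreamSketch<FakeContext> stream;
  char out[16] = {};
  Error err = stream.Write("hello", 5, out, sizeof(out));
  std::cout << (err.IsError() ? err.message : out) << "\n";  // prints HELLO
}

Under that split, adding e.g. a Brotli-backed context later means implementing
the same handful of methods rather than touching the stream wrapper.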

PR-URL: https://github.com/nodejs/node/pull/23360
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Daniel Bevenius <daniel.bevenius@gmail.com>
Authored by Anna Henningsen, 2018-09-22 14:09:52 +02:00; committed by Daniel Bevenius
parent 74f854e769
commit c34eae5f88
2 changed files with 575 additions and 467 deletions


@@ -88,32 +88,75 @@ enum node_zlib_mode {
#define GZIP_HEADER_ID1 0x1f
#define GZIP_HEADER_ID2 0x8b
/**
 * Deflate/Inflate
 */
class ZCtx : public AsyncWrap, public ThreadPoolWork {
 public:
ZCtx(Environment* env, Local<Object> wrap, node_zlib_mode mode)
struct CompressionError {
CompressionError(const char* message, const char* code, int err)
: message(message), code(code), err(err) {}
CompressionError() = default;
const char* message = nullptr;
const char* code = nullptr;
int err = 0;
inline bool IsError() const { return code != nullptr; }
};
class ZlibContext : public MemoryRetainer {
 public:
ZlibContext() = default;
// Streaming-related, should be available for all compression libraries:
void Close();
void DoThreadPoolWork();
void SetBuffers(char* in, uint32_t in_len, char* out, uint32_t out_len);
void SetFlush(int flush);
void GetAfterWriteOffsets(uint32_t* avail_in, uint32_t* avail_out) const;
CompressionError GetErrorInfo() const;
// Zlib-specific:
CompressionError Init(int level, int window_bits, int mem_level, int strategy,
std::vector<unsigned char>&& dictionary);
inline void SetMode(node_zlib_mode mode) { mode_ = mode; }
void SetAllocationFunctions(alloc_func alloc, free_func free, void* opaque);
CompressionError ResetStream();
CompressionError SetParams(int level, int strategy);
SET_MEMORY_INFO_NAME(ZlibContext)
SET_SELF_SIZE(ZlibContext)
void MemoryInfo(MemoryTracker* tracker) const override {
tracker->TrackField("dictionary", dictionary_);
}
private:
CompressionError ErrorForMessage(const char* message) const;
CompressionError SetDictionary();
int err_ = 0;
int flush_ = 0;
int level_ = 0;
int mem_level_ = 0;
node_zlib_mode mode_ = NONE;
int strategy_ = 0;
int window_bits_ = 0;
unsigned int gzip_id_bytes_read_ = 0;
std::vector<unsigned char> dictionary_;
z_stream strm_;
DISALLOW_COPY_AND_ASSIGN(ZlibContext);
};
template <typename CompressionContext>
class CompressionStream : public AsyncWrap, public ThreadPoolWork {
public:
CompressionStream(Environment* env, Local<Object> wrap)
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_ZLIB),
ThreadPoolWork(env),
err_(0),
flush_(0),
init_done_(false),
level_(0),
memLevel_(0),
mode_(mode),
strategy_(0),
windowBits_(0),
write_in_progress_(false),
pending_close_(false),
refs_(0),
gzip_id_bytes_read_(0),
write_result_(nullptr) {
MakeWeak();
}
~CompressionStream() override {
~ZCtx() override {
CHECK_EQ(false, write_in_progress_ && "write in progress");
Close();
CHECK_EQ(zlib_memory_, 0);
@@ -127,27 +170,16 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
}
pending_close_ = false;
closed_ = true;
CHECK(init_done_ && "close before init");
CHECK_LE(mode_, UNZIP);
AllocScope alloc_scope(this);
int status = Z_OK;
if (mode_ == DEFLATE || mode_ == GZIP || mode_ == DEFLATERAW) {
status = deflateEnd(&strm_);
} else if (mode_ == INFLATE || mode_ == GUNZIP || mode_ == INFLATERAW ||
mode_ == UNZIP) {
status = inflateEnd(&strm_);
}
CHECK(status == Z_OK || status == Z_DATA_ERROR);
mode_ = NONE;
dictionary_.clear();
ctx_.Close();
}
static void Close(const FunctionCallbackInfo<Value>& args) {
ZCtx* ctx;
CompressionStream* ctx;
ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder());
ctx->Close();
}
@@ -198,7 +230,7 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
CHECK(Buffer::IsWithinBounds(out_off, out_len, Buffer::Length(out_buf)));
out = Buffer::Data(out_buf) + out_off;
ZCtx* ctx;
CompressionStream* ctx;
ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder());
ctx->Write<async>(flush, in, in_len, out, out_len);
@@ -211,26 +243,22 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
AllocScope alloc_scope(this);
CHECK(init_done_ && "write before init");
CHECK(mode_ != NONE && "already finalized");
CHECK(!closed_ && "already finalized");
CHECK_EQ(false, write_in_progress_);
CHECK_EQ(false, pending_close_);
write_in_progress_ = true;
Ref();
strm_.avail_in = in_len;
strm_.next_in = reinterpret_cast<Bytef*>(in);
strm_.avail_out = out_len;
strm_.next_out = reinterpret_cast<Bytef*>(out);
flush_ = flush;
ctx_.SetBuffers(in, in_len, out, out_len);
ctx_.SetFlush(flush);
if (!async) {
// sync version
env()->PrintSyncTrace();
DoThreadPoolWork();
if (CheckError()) {
write_result_[0] = strm_.avail_out;
write_result_[1] = strm_.avail_in;
UpdateWriteResult();
write_in_progress_ = false;
}
Unref();
@@ -241,11 +269,296 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
ScheduleWork();
}
void UpdateWriteResult() {
ctx_.GetAfterWriteOffsets(&write_result_[1], &write_result_[0]);
}
// thread pool!
// This function may be called multiple times on the uv_work pool
// for a single write() call, until all of the input bytes have
// been consumed.
void DoThreadPoolWork() override {
ctx_.DoThreadPoolWork();
}
bool CheckError() {
const CompressionError err = ctx_.GetErrorInfo();
if (!err.IsError()) return true;
EmitError(err);
return false;
}
// v8 land!
void AfterThreadPoolWork(int status) override {
AllocScope alloc_scope(this);
OnScopeLeave on_scope_leave([&]() { Unref(); });
write_in_progress_ = false;
if (status == UV_ECANCELED) {
Close();
return;
}
CHECK_EQ(status, 0);
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
if (!CheckError())
return;
UpdateWriteResult();
// call the write() cb
Local<Function> cb = PersistentToLocal(env()->isolate(),
write_js_callback_);
MakeCallback(cb, 0, nullptr);
if (pending_close_)
Close();
}
// TODO(addaleax): Switch to modern error system (node_errors.h).
void EmitError(const CompressionError& err) {
// If you hit this assertion, you forgot to enter the v8::Context first.
CHECK_EQ(env()->context(), env()->isolate()->GetCurrentContext());
HandleScope scope(env()->isolate());
Local<Value> args[3] = {
OneByteString(env()->isolate(), err.message),
Integer::New(env()->isolate(), err.err),
OneByteString(env()->isolate(), err.code)
};
MakeCallback(env()->onerror_string(), arraysize(args), args);
// no hope of rescue.
write_in_progress_ = false;
if (pending_close_)
Close();
}
void MemoryInfo(MemoryTracker* tracker) const override {
tracker->TrackField("compression context", ctx_);
tracker->TrackFieldWithSize("zlib_memory",
zlib_memory_ + unreported_allocations_);
}
protected:
CompressionContext* context() { return &ctx_; }
void InitStream(uint32_t* write_result, Local<Function> write_js_callback) {
write_result_ = write_result;
write_js_callback_.Reset(env()->isolate(), write_js_callback);
init_done_ = true;
}
// Allocation functions provided to zlib itself. We store the real size of
// the allocated memory chunk just before the "payload" memory we return
// to zlib.
// Because we use zlib off the thread pool, we can not report memory directly
// to V8; rather, we first store it as "unreported" memory in a separate
// field and later report it back from the main thread.
static void* AllocForZlib(void* data, uInt items, uInt size) {
CompressionStream* ctx = static_cast<CompressionStream*>(data);
size_t real_size =
MultiplyWithOverflowCheck(static_cast<size_t>(items),
static_cast<size_t>(size)) + sizeof(size_t);
char* memory = UncheckedMalloc(real_size);
if (UNLIKELY(memory == nullptr)) return nullptr;
*reinterpret_cast<size_t*>(memory) = real_size;
ctx->unreported_allocations_.fetch_add(real_size,
std::memory_order_relaxed);
return memory + sizeof(size_t);
}
static void FreeForZlib(void* data, void* pointer) {
if (UNLIKELY(pointer == nullptr)) return;
CompressionStream* ctx = static_cast<CompressionStream*>(data);
char* real_pointer = static_cast<char*>(pointer) - sizeof(size_t);
size_t real_size = *reinterpret_cast<size_t*>(real_pointer);
ctx->unreported_allocations_.fetch_sub(real_size,
std::memory_order_relaxed);
free(real_pointer);
}
// This is called on the main thread after zlib may have allocated something
// in order to report it back to V8.
void AdjustAmountOfExternalAllocatedMemory() {
ssize_t report =
unreported_allocations_.exchange(0, std::memory_order_relaxed);
if (report == 0) return;
CHECK_IMPLIES(report < 0, zlib_memory_ >= static_cast<size_t>(-report));
zlib_memory_ += report;
env()->isolate()->AdjustAmountOfExternalAllocatedMemory(report);
}
struct AllocScope {
explicit AllocScope(CompressionStream* stream) : stream(stream) {}
~AllocScope() { stream->AdjustAmountOfExternalAllocatedMemory(); }
CompressionStream* stream;
};
private:
void Ref() {
if (++refs_ == 1) {
ClearWeak();
}
}
void Unref() {
CHECK_GT(refs_, 0);
if (--refs_ == 0) {
MakeWeak();
}
}
bool init_done_ = false;
bool write_in_progress_ = false;
bool pending_close_ = false;
bool closed_ = false;
unsigned int refs_ = 0;
uint32_t* write_result_ = nullptr;
Persistent<Function> write_js_callback_;
std::atomic<ssize_t> unreported_allocations_{0};
size_t zlib_memory_ = 0;
CompressionContext ctx_;
};
class ZlibStream : public CompressionStream<ZlibContext> {
public:
ZlibStream(Environment* env, Local<Object> wrap, node_zlib_mode mode)
: CompressionStream(env, wrap) {
context()->SetMode(mode);
}
static void New(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(args[0]->IsInt32());
node_zlib_mode mode =
static_cast<node_zlib_mode>(args[0].As<Int32>()->Value());
new ZlibStream(env, args.This(), mode);
}
// just pull the ints out of the args and call the other Init
static void Init(const FunctionCallbackInfo<Value>& args) {
// Refs: https://github.com/nodejs/node/issues/16649
// Refs: https://github.com/nodejs/node/issues/14161
if (args.Length() == 5) {
fprintf(stderr,
"WARNING: You are likely using a version of node-tar or npm that "
"is incompatible with this version of Node.js.\nPlease use "
"either the version of npm that is bundled with Node.js, or "
"a version of npm (> 5.5.1 or < 5.4.0) or node-tar (> 4.0.1) "
"that is compatible with Node.js 9 and above.\n");
}
CHECK(args.Length() == 7 &&
"init(windowBits, level, memLevel, strategy, writeResult, writeCallback,"
" dictionary)");
ZlibStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
Local<Context> context = args.GetIsolate()->GetCurrentContext();
// windowBits is special. On the compression side, 0 is an invalid value.
// But on the decompression side, a value of 0 for windowBits tells zlib
// to use the window size in the zlib header of the compressed stream.
uint32_t window_bits;
if (!args[0]->Uint32Value(context).To(&window_bits)) return;
int32_t level;
if (!args[1]->Int32Value(context).To(&level)) return;
uint32_t mem_level;
if (!args[2]->Uint32Value(context).To(&mem_level)) return;
uint32_t strategy;
if (!args[3]->Uint32Value(context).To(&strategy)) return;
CHECK(args[4]->IsUint32Array());
Local<Uint32Array> array = args[4].As<Uint32Array>();
Local<ArrayBuffer> ab = array->Buffer();
uint32_t* write_result = static_cast<uint32_t*>(ab->GetContents().Data());
Local<Function> write_js_callback = args[5].As<Function>();
std::vector<unsigned char> dictionary;
if (Buffer::HasInstance(args[6])) {
unsigned char* data =
reinterpret_cast<unsigned char*>(Buffer::Data(args[6]));
dictionary = std::vector<unsigned char>(
data,
data + Buffer::Length(args[6]));
}
wrap->InitStream(write_result, write_js_callback);
AllocScope alloc_scope(wrap);
wrap->context()->SetAllocationFunctions(
AllocForZlib, FreeForZlib, static_cast<CompressionStream*>(wrap));
const CompressionError err =
wrap->context()->Init(level, window_bits, mem_level, strategy,
std::move(dictionary));
if (err.IsError())
wrap->EmitError(err);
return args.GetReturnValue().Set(!err.IsError());
}
static void Params(const FunctionCallbackInfo<Value>& args) {
CHECK(args.Length() == 2 && "params(level, strategy)");
ZlibStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
Local<Context> context = args.GetIsolate()->GetCurrentContext();
int level;
if (!args[0]->Int32Value(context).To(&level)) return;
int strategy;
if (!args[1]->Int32Value(context).To(&strategy)) return;
AllocScope alloc_scope(wrap);
const CompressionError err = wrap->context()->SetParams(level, strategy);
if (err.IsError())
wrap->EmitError(err);
}
static void Reset(const FunctionCallbackInfo<Value> &args) {
ZlibStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
AllocScope alloc_scope(wrap);
const CompressionError err = wrap->context()->ResetStream();
if (err.IsError())
wrap->EmitError(err);
}
SET_MEMORY_INFO_NAME(ZlibStream)
SET_SELF_SIZE(ZlibStream)
};
void ZlibContext::Close() {
CHECK_LE(mode_, UNZIP);
int status = Z_OK;
if (mode_ == DEFLATE || mode_ == GZIP || mode_ == DEFLATERAW) {
status = deflateEnd(&strm_);
} else if (mode_ == INFLATE || mode_ == GUNZIP || mode_ == INFLATERAW ||
mode_ == UNZIP) {
status = inflateEnd(&strm_);
}
CHECK(status == Z_OK || status == Z_DATA_ERROR);
mode_ = NONE;
dictionary_.clear();
}
void ZlibContext::DoThreadPoolWork() {
const Bytef* next_expected_header_byte = nullptr;
// If the avail_out is left at 0, then it means that it ran out
@@ -336,237 +649,146 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
// Trailing zero bytes are okay, though, since they are frequently
// used for padding.
Reset();
ResetStream();
err_ = inflate(&strm_, flush_);
}
break;
default:
UNREACHABLE();
}
// pass any errors back to the main thread to deal with.
// now After will emit the output, and
// either schedule another call to Process,
// or shift the queue and call Process.
}
bool CheckError() {
void ZlibContext::SetBuffers(char* in, uint32_t in_len,
char* out, uint32_t out_len) {
strm_.avail_in = in_len;
strm_.next_in = reinterpret_cast<Bytef*>(in);
strm_.avail_out = out_len;
strm_.next_out = reinterpret_cast<Bytef*>(out);
}
void ZlibContext::SetFlush(int flush) {
flush_ = flush;
}
void ZlibContext::GetAfterWriteOffsets(uint32_t* avail_in,
uint32_t* avail_out) const {
*avail_in = strm_.avail_in;
*avail_out = strm_.avail_out;
}
CompressionError ZlibContext::ErrorForMessage(const char* message) const {
if (strm_.msg != nullptr)
message = strm_.msg;
return CompressionError { message, ZlibStrerror(err_), err_ };
}
CompressionError ZlibContext::GetErrorInfo() const {
// Acceptable error states depend on the type of zlib stream.
switch (err_) {
case Z_OK:
case Z_BUF_ERROR:
if (strm_.avail_out != 0 && flush_ == Z_FINISH) {
Error("unexpected end of file");
return false;
return ErrorForMessage("unexpected end of file");
}
case Z_STREAM_END:
// normal statuses, not fatal
break;
case Z_NEED_DICT:
if (dictionary_.empty())
Error("Missing dictionary");
return ErrorForMessage("Missing dictionary");
else
Error("Bad dictionary");
return ErrorForMessage("Bad dictionary");
return false;
default:
// something else.
Error("Zlib error");
return false;
return ErrorForMessage("Zlib error");
}
return true;
return CompressionError {};
}
// v8 land!
void AfterThreadPoolWork(int status) override {
AllocScope alloc_scope(this);
OnScopeLeave on_scope_leave([&]() { Unref(); });
write_in_progress_ = false;
if (status == UV_ECANCELED) {
Close();
return;
}
CHECK_EQ(status, 0);
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
if (!CheckError())
return;
write_result_[0] = strm_.avail_out;
write_result_[1] = strm_.avail_in;
// call the write() cb
Local<Function> cb = PersistentToLocal(env()->isolate(),
write_js_callback_);
MakeCallback(cb, 0, nullptr);
if (pending_close_)
Close();
}
CompressionError ZlibContext::ResetStream() {
err_ = Z_OK;
switch (mode_) {
case DEFLATE:
case DEFLATERAW:
case GZIP:
err_ = deflateReset(&strm_);
break;
case INFLATE:
case INFLATERAW:
case GUNZIP:
err_ = inflateReset(&strm_);
break;
default:
break;
}
if (err_ != Z_OK)
return ErrorForMessage("Failed to reset stream");
return SetDictionary();
}
// TODO(addaleax): Switch to modern error system (node_errors.h).
void Error(const char* message) {
// If you hit this assertion, you forgot to enter the v8::Context first.
CHECK_EQ(env()->context(), env()->isolate()->GetCurrentContext());
if (strm_.msg != nullptr) {
message = strm_.msg;
}
HandleScope scope(env()->isolate());
Local<Value> args[3] = {
OneByteString(env()->isolate(), message),
Integer::New(env()->isolate(), err_),
OneByteString(env()->isolate(), ZlibStrerror(err_))
};
MakeCallback(env()->onerror_string(), arraysize(args), args);
// no hope of rescue.
write_in_progress_ = false;
if (pending_close_)
Close();
}
void ZlibContext::SetAllocationFunctions(alloc_func alloc,
free_func free,
void* opaque) {
strm_.zalloc = alloc;
strm_.zfree = free;
strm_.opaque = opaque;
}
CompressionError ZlibContext::Init(
int level, int window_bits, int mem_level, int strategy,
std::vector<unsigned char>&& dictionary) {
if (!((window_bits == 0) &&
(mode_ == INFLATE ||
mode_ == GUNZIP ||
mode_ == UNZIP))) {
static void New(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(args[0]->IsInt32());
node_zlib_mode mode =
static_cast<node_zlib_mode>(args[0].As<Int32>()->Value());
new ZCtx(env, args.This(), mode);
}
// just pull the ints out of the args and call the other Init
static void Init(const FunctionCallbackInfo<Value>& args) {
// Refs: https://github.com/nodejs/node/issues/16649
// Refs: https://github.com/nodejs/node/issues/14161
if (args.Length() == 5) {
fprintf(stderr,
"WARNING: You are likely using a version of node-tar or npm that "
"is incompatible with this version of Node.js.\nPlease use "
"either the version of npm that is bundled with Node.js, or "
"a version of npm (> 5.5.1 or < 5.4.0) or node-tar (> 4.0.1) "
"that is compatible with Node.js 9 and above.\n");
}
CHECK(args.Length() == 7 &&
"init(windowBits, level, memLevel, strategy, writeResult, writeCallback,"
" dictionary)");
ZCtx* ctx;
ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder());
Local<Context> context = args.GetIsolate()->GetCurrentContext();
// windowBits is special. On the compression side, 0 is an invalid value.
// But on the decompression side, a value of 0 for windowBits tells zlib
// to use the window size in the zlib header of the compressed stream.
uint32_t windowBits;
if (!args[0]->Uint32Value(context).To(&windowBits)) return;
if (!((windowBits == 0) &&
(ctx->mode_ == INFLATE ||
ctx->mode_ == GUNZIP ||
ctx->mode_ == UNZIP))) {
CHECK(
(windowBits >= Z_MIN_WINDOWBITS && windowBits <= Z_MAX_WINDOWBITS) &&
(window_bits >= Z_MIN_WINDOWBITS && window_bits <= Z_MAX_WINDOWBITS) &&
"invalid windowBits");
}
int level;
if (!args[1]->Int32Value(context).To(&level)) return;
CHECK((level >= Z_MIN_LEVEL && level <= Z_MAX_LEVEL) &&
"invalid compression level");
uint32_t memLevel;
if (!args[2]->Uint32Value(context).To(&memLevel)) return;
CHECK((memLevel >= Z_MIN_MEMLEVEL && memLevel <= Z_MAX_MEMLEVEL) &&
CHECK((mem_level >= Z_MIN_MEMLEVEL && mem_level <= Z_MAX_MEMLEVEL) &&
"invalid memlevel");
uint32_t strategy;
if (!args[3]->Uint32Value(context).To(&strategy)) return;
CHECK((strategy == Z_FILTERED || strategy == Z_HUFFMAN_ONLY ||
strategy == Z_RLE || strategy == Z_FIXED ||
strategy == Z_DEFAULT_STRATEGY) &&
"invalid strategy");
CHECK(args[4]->IsUint32Array());
Local<Uint32Array> array = args[4].As<Uint32Array>();
Local<ArrayBuffer> ab = array->Buffer();
uint32_t* write_result = static_cast<uint32_t*>(ab->GetContents().Data());
Local<Function> write_js_callback = args[5].As<Function>();
std::vector<unsigned char> dictionary;
if (Buffer::HasInstance(args[6])) {
unsigned char* data =
reinterpret_cast<unsigned char*>(Buffer::Data(args[6]));
dictionary = std::vector<unsigned char>(
data,
data + Buffer::Length(args[6]));
}
bool ret = ctx->Init(level, windowBits, memLevel, strategy, write_result,
write_js_callback, std::move(dictionary));
if (ret)
ctx->SetDictionary();
return args.GetReturnValue().Set(ret);
}
static void Params(const FunctionCallbackInfo<Value>& args) {
CHECK(args.Length() == 2 && "params(level, strategy)");
ZCtx* ctx;
ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder());
Environment* env = ctx->env();
int level;
if (!args[0]->Int32Value(env->context()).To(&level)) return;
int strategy;
if (!args[1]->Int32Value(env->context()).To(&strategy)) return;
ctx->Params(level, strategy);
}
static void Reset(const FunctionCallbackInfo<Value> &args) {
ZCtx* ctx;
ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder());
ctx->Reset();
ctx->SetDictionary();
}
bool Init(int level, int windowBits, int memLevel,
int strategy, uint32_t* write_result,
Local<Function> write_js_callback,
std::vector<unsigned char>&& dictionary) {
AllocScope alloc_scope(this);
level_ = level;
windowBits_ = windowBits;
window_bits_ = window_bits;
memLevel_ = memLevel;
mem_level_ = mem_level;
strategy_ = strategy;
strm_.zalloc = AllocForZlib;
strm_.zfree = FreeForZlib;
strm_.opaque = static_cast<void*>(this);
flush_ = Z_NO_FLUSH;
err_ = Z_OK;
if (mode_ == GZIP || mode_ == GUNZIP) {
windowBits_ += 16;
window_bits_ += 16;
}
if (mode_ == UNZIP) {
windowBits_ += 32;
window_bits_ += 32;
}
if (mode_ == DEFLATERAW || mode_ == INFLATERAW) {
windowBits_ *= -1;
window_bits_ *= -1;
}
switch (mode_) {
@@ -576,15 +798,15 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
err_ = deflateInit2(&strm_,
level_,
Z_DEFLATED,
windowBits_,
window_bits_,
memLevel_,
mem_level_,
strategy_);
break;
case INFLATE:
case GUNZIP:
case INFLATERAW:
case UNZIP:
err_ = inflateInit2(&strm_, windowBits_);
err_ = inflateInit2(&strm_, window_bits_);
break;
default:
UNREACHABLE();
@@ -592,23 +814,19 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
dictionary_ = std::move(dictionary);
write_in_progress_ = false;
init_done_ = true;
if (err_ != Z_OK) {
dictionary_.clear();
mode_ = NONE;
return false;
return ErrorForMessage(nullptr);
}
write_result_ = write_result;
write_js_callback_.Reset(env()->isolate(), write_js_callback);
return true;
return SetDictionary();
}
void SetDictionary() {
CompressionError ZlibContext::SetDictionary() {
if (dictionary_.empty())
return;
return CompressionError {};
err_ = Z_OK;
@@ -631,13 +849,14 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
}
if (err_ != Z_OK) {
Error("Failed to set dictionary");
return ErrorForMessage("Failed to set dictionary");
}
return CompressionError {};
}
void Params(int level, int strategy) {
AllocScope alloc_scope(this);
CompressionError ZlibContext::SetParams(int level, int strategy) {
err_ = Z_OK;
switch (mode_) {
@@ -650,141 +869,30 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
}
if (err_ != Z_OK && err_ != Z_BUF_ERROR) {
Error("Failed to set parameters");
return ErrorForMessage("Failed to set parameters");
}
}
return CompressionError {};
void Reset() {
AllocScope alloc_scope(this);
err_ = Z_OK;
switch (mode_) {
case DEFLATE:
case DEFLATERAW:
case GZIP:
err_ = deflateReset(&strm_);
break;
case INFLATE:
case INFLATERAW:
case GUNZIP:
err_ = inflateReset(&strm_);
break;
default:
break;
}
if (err_ != Z_OK) {
Error("Failed to reset stream");
}
}
void MemoryInfo(MemoryTracker* tracker) const override {
tracker->TrackField("dictionary", dictionary_);
tracker->TrackFieldWithSize("zlib_memory",
zlib_memory_ + unreported_allocations_);
}
SET_MEMORY_INFO_NAME(ZCtx)
SET_SELF_SIZE(ZCtx)
private:
void Ref() {
if (++refs_ == 1) {
ClearWeak();
}
}
void Unref() {
CHECK_GT(refs_, 0);
if (--refs_ == 0) {
MakeWeak();
}
}
// Allocation functions provided to zlib itself. We store the real size of
// the allocated memory chunk just before the "payload" memory we return
// to zlib.
// Because we use zlib off the thread pool, we can not report memory directly
// to V8; rather, we first store it as "unreported" memory in a separate
// field and later report it back from the main thread.
static void* AllocForZlib(void* data, uInt items, uInt size) {
ZCtx* ctx = static_cast<ZCtx*>(data);
size_t real_size =
MultiplyWithOverflowCheck(static_cast<size_t>(items),
static_cast<size_t>(size)) + sizeof(size_t);
char* memory = UncheckedMalloc(real_size);
if (UNLIKELY(memory == nullptr)) return nullptr;
*reinterpret_cast<size_t*>(memory) = real_size;
ctx->unreported_allocations_.fetch_add(real_size,
std::memory_order_relaxed);
return memory + sizeof(size_t);
}
static void FreeForZlib(void* data, void* pointer) {
if (UNLIKELY(pointer == nullptr)) return;
ZCtx* ctx = static_cast<ZCtx*>(data);
char* real_pointer = static_cast<char*>(pointer) - sizeof(size_t);
size_t real_size = *reinterpret_cast<size_t*>(real_pointer);
ctx->unreported_allocations_.fetch_sub(real_size,
std::memory_order_relaxed);
free(real_pointer);
}
// This is called on the main thread after zlib may have allocated something
// in order to report it back to V8.
void AdjustAmountOfExternalAllocatedMemory() {
ssize_t report =
unreported_allocations_.exchange(0, std::memory_order_relaxed);
if (report == 0) return;
CHECK_IMPLIES(report < 0, zlib_memory_ >= static_cast<size_t>(-report));
zlib_memory_ += report;
env()->isolate()->AdjustAmountOfExternalAllocatedMemory(report);
}
struct AllocScope {
explicit AllocScope(ZCtx* ctx) : ctx(ctx) {}
~AllocScope() { ctx->AdjustAmountOfExternalAllocatedMemory(); }
ZCtx* ctx;
};
std::vector<unsigned char> dictionary_;
int err_;
int flush_;
bool init_done_;
int level_;
int memLevel_;
node_zlib_mode mode_;
int strategy_;
z_stream strm_;
int windowBits_;
bool write_in_progress_;
bool pending_close_;
unsigned int refs_;
unsigned int gzip_id_bytes_read_;
uint32_t* write_result_;
Persistent<Function> write_js_callback_;
std::atomic<ssize_t> unreported_allocations_{0};
size_t zlib_memory_ = 0;
};
void Initialize(Local<Object> target,
Local<Value> unused,
Local<Context> context,
void* priv) {
Environment* env = Environment::GetCurrent(context);
Local<FunctionTemplate> z = env->NewFunctionTemplate(ZCtx::New);
Local<FunctionTemplate> z = env->NewFunctionTemplate(ZlibStream::New);
z->InstanceTemplate()->SetInternalFieldCount(1);
z->Inherit(AsyncWrap::GetConstructorTemplate(env));
env->SetProtoMethod(z, "write", ZCtx::Write<true>);
env->SetProtoMethod(z, "writeSync", ZCtx::Write<false>);
env->SetProtoMethod(z, "init", ZCtx::Init);
env->SetProtoMethod(z, "close", ZCtx::Close);
env->SetProtoMethod(z, "params", ZCtx::Params);
env->SetProtoMethod(z, "reset", ZCtx::Reset);
env->SetProtoMethod(z, "write", ZlibStream::Write<true>);
env->SetProtoMethod(z, "writeSync", ZlibStream::Write<false>);
env->SetProtoMethod(z, "close", ZlibStream::Close);
env->SetProtoMethod(z, "init", ZlibStream::Init);
env->SetProtoMethod(z, "params", ZlibStream::Params);
env->SetProtoMethod(z, "reset", ZlibStream::Reset);
Local<String> zlibString = FIXED_ONE_BYTE_STRING(env->isolate(), "Zlib");
z->SetClassName(zlibString);


@@ -4,10 +4,10 @@ require('../common');
const { validateSnapshotNodes } = require('../common/heap');
const zlib = require('zlib');
validateSnapshotNodes('Node / ZCtx', []);
validateSnapshotNodes('Node / ZlibStream', []);
// eslint-disable-next-line no-unused-vars
const gunzip = zlib.createGunzip();
validateSnapshotNodes('Node / ZCtx', [
validateSnapshotNodes('Node / ZlibStream', [
{
children: [
{ node_name: 'Zlib', edge_name: 'wrapped' },