From c34eae5f882c8bb4d58b492caf97cdb08b1dbbcb Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 22 Sep 2018 14:09:52 +0200 Subject: [PATCH] zlib: refactor zlib internals Split out things that are specific to zlib as a specific compression library, vs. the interface that is common to most C compression libraries. This should pave the way for including support for e.g. brotli. PR-URL: https://github.com/nodejs/node/pull/23360 Reviewed-By: James M Snell Reviewed-By: Daniel Bevenius --- src/node_zlib.cc | 928 ++++++++++++++++------------ test/parallel/test-heapdump-zlib.js | 4 +- 2 files changed, 520 insertions(+), 412 deletions(-) diff --git a/src/node_zlib.cc b/src/node_zlib.cc index 0935d3f731ded4..8b257cf0d47934 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -88,32 +88,75 @@ enum node_zlib_mode { #define GZIP_HEADER_ID1 0x1f #define GZIP_HEADER_ID2 0x8b -/** - * Deflate/Inflate - */ -class ZCtx : public AsyncWrap, public ThreadPoolWork { +struct CompressionError { + CompressionError(const char* message, const char* code, int err) + : message(message), code(code), err(err) {} + CompressionError() = default; + + const char* message = nullptr; + const char* code = nullptr; + int err = 0; + + inline bool IsError() const { return code != nullptr; } +}; + +class ZlibContext : public MemoryRetainer { + public: + ZlibContext() = default; + + // Streaming-related, should be available for all compression libraries: + void Close(); + void DoThreadPoolWork(); + void SetBuffers(char* in, uint32_t in_len, char* out, uint32_t out_len); + void SetFlush(int flush); + void GetAfterWriteOffsets(uint32_t* avail_in, uint32_t* avail_out) const; + CompressionError GetErrorInfo() const; + + // Zlib-specific: + CompressionError Init(int level, int window_bits, int mem_level, int strategy, + std::vector&& dictionary); + inline void SetMode(node_zlib_mode mode) { mode_ = mode; } + void SetAllocationFunctions(alloc_func alloc, free_func free, void* opaque); + CompressionError ResetStream(); + CompressionError SetParams(int level, int strategy); + + SET_MEMORY_INFO_NAME(ZlibContext) + SET_SELF_SIZE(ZlibContext) + + void MemoryInfo(MemoryTracker* tracker) const override { + tracker->TrackField("dictionary", dictionary_); + } + + private: + CompressionError ErrorForMessage(const char* message) const; + CompressionError SetDictionary(); + + int err_ = 0; + int flush_ = 0; + int level_ = 0; + int mem_level_ = 0; + node_zlib_mode mode_ = NONE; + int strategy_ = 0; + int window_bits_ = 0; + unsigned int gzip_id_bytes_read_ = 0; + std::vector dictionary_; + + z_stream strm_; + + DISALLOW_COPY_AND_ASSIGN(ZlibContext); +}; + +template +class CompressionStream : public AsyncWrap, public ThreadPoolWork { public: - ZCtx(Environment* env, Local wrap, node_zlib_mode mode) + CompressionStream(Environment* env, Local wrap) : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_ZLIB), ThreadPoolWork(env), - err_(0), - flush_(0), - init_done_(false), - level_(0), - memLevel_(0), - mode_(mode), - strategy_(0), - windowBits_(0), - write_in_progress_(false), - pending_close_(false), - refs_(0), - gzip_id_bytes_read_(0), write_result_(nullptr) { MakeWeak(); } - - ~ZCtx() override { + ~CompressionStream() override { CHECK_EQ(false, write_in_progress_ && "write in progress"); Close(); CHECK_EQ(zlib_memory_, 0); @@ -127,27 +170,16 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { } pending_close_ = false; + closed_ = true; CHECK(init_done_ && "close before init"); - CHECK_LE(mode_, UNZIP); AllocScope alloc_scope(this); - int status = Z_OK; - if (mode_ == DEFLATE || mode_ == GZIP || mode_ == DEFLATERAW) { - status = deflateEnd(&strm_); - } else if (mode_ == INFLATE || mode_ == GUNZIP || mode_ == INFLATERAW || - mode_ == UNZIP) { - status = inflateEnd(&strm_); - } - - CHECK(status == Z_OK || status == Z_DATA_ERROR); - mode_ = NONE; - - dictionary_.clear(); + ctx_.Close(); } static void Close(const FunctionCallbackInfo& args) { - ZCtx* ctx; + CompressionStream* ctx; ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder()); ctx->Close(); } @@ -198,7 +230,7 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { CHECK(Buffer::IsWithinBounds(out_off, out_len, Buffer::Length(out_buf))); out = Buffer::Data(out_buf) + out_off; - ZCtx* ctx; + CompressionStream* ctx; ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder()); ctx->Write(flush, in, in_len, out, out_len); @@ -211,26 +243,22 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { AllocScope alloc_scope(this); CHECK(init_done_ && "write before init"); - CHECK(mode_ != NONE && "already finalized"); + CHECK(!closed_ && "already finalized"); CHECK_EQ(false, write_in_progress_); CHECK_EQ(false, pending_close_); write_in_progress_ = true; Ref(); - strm_.avail_in = in_len; - strm_.next_in = reinterpret_cast(in); - strm_.avail_out = out_len; - strm_.next_out = reinterpret_cast(out); - flush_ = flush; + ctx_.SetBuffers(in, in_len, out, out_len); + ctx_.SetFlush(flush); if (!async) { // sync version env()->PrintSyncTrace(); DoThreadPoolWork(); if (CheckError()) { - write_result_[0] = strm_.avail_out; - write_result_[1] = strm_.avail_in; + UpdateWriteResult(); write_in_progress_ = false; } Unref(); @@ -241,142 +269,24 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { ScheduleWork(); } + void UpdateWriteResult() { + ctx_.GetAfterWriteOffsets(&write_result_[1], &write_result_[0]); + } + // thread pool! // This function may be called multiple times on the uv_work pool // for a single write() call, until all of the input bytes have // been consumed. void DoThreadPoolWork() override { - const Bytef* next_expected_header_byte = nullptr; - - // If the avail_out is left at 0, then it means that it ran out - // of room. If there was avail_out left over, then it means - // that all of the input was consumed. - switch (mode_) { - case DEFLATE: - case GZIP: - case DEFLATERAW: - err_ = deflate(&strm_, flush_); - break; - case UNZIP: - if (strm_.avail_in > 0) { - next_expected_header_byte = strm_.next_in; - } - - switch (gzip_id_bytes_read_) { - case 0: - if (next_expected_header_byte == nullptr) { - break; - } - - if (*next_expected_header_byte == GZIP_HEADER_ID1) { - gzip_id_bytes_read_ = 1; - next_expected_header_byte++; - - if (strm_.avail_in == 1) { - // The only available byte was already read. - break; - } - } else { - mode_ = INFLATE; - break; - } - - // fallthrough - case 1: - if (next_expected_header_byte == nullptr) { - break; - } - - if (*next_expected_header_byte == GZIP_HEADER_ID2) { - gzip_id_bytes_read_ = 2; - mode_ = GUNZIP; - } else { - // There is no actual difference between INFLATE and INFLATERAW - // (after initialization). - mode_ = INFLATE; - } - - break; - default: - CHECK(0 && "invalid number of gzip magic number bytes read"); - } - - // fallthrough - case INFLATE: - case GUNZIP: - case INFLATERAW: - err_ = inflate(&strm_, flush_); - - // If data was encoded with dictionary (INFLATERAW will have it set in - // SetDictionary, don't repeat that here) - if (mode_ != INFLATERAW && - err_ == Z_NEED_DICT && - !dictionary_.empty()) { - // Load it - err_ = inflateSetDictionary(&strm_, - dictionary_.data(), - dictionary_.size()); - if (err_ == Z_OK) { - // And try to decode again - err_ = inflate(&strm_, flush_); - } else if (err_ == Z_DATA_ERROR) { - // Both inflateSetDictionary() and inflate() return Z_DATA_ERROR. - // Make it possible for After() to tell a bad dictionary from bad - // input. - err_ = Z_NEED_DICT; - } - } - - while (strm_.avail_in > 0 && - mode_ == GUNZIP && - err_ == Z_STREAM_END && - strm_.next_in[0] != 0x00) { - // Bytes remain in input buffer. Perhaps this is another compressed - // member in the same archive, or just trailing garbage. - // Trailing zero bytes are okay, though, since they are frequently - // used for padding. - - Reset(); - err_ = inflate(&strm_, flush_); - } - break; - default: - UNREACHABLE(); - } - - // pass any errors back to the main thread to deal with. - - // now After will emit the output, and - // either schedule another call to Process, - // or shift the queue and call Process. + ctx_.DoThreadPoolWork(); } bool CheckError() { - // Acceptable error states depend on the type of zlib stream. - switch (err_) { - case Z_OK: - case Z_BUF_ERROR: - if (strm_.avail_out != 0 && flush_ == Z_FINISH) { - Error("unexpected end of file"); - return false; - } - case Z_STREAM_END: - // normal statuses, not fatal - break; - case Z_NEED_DICT: - if (dictionary_.empty()) - Error("Missing dictionary"); - else - Error("Bad dictionary"); - return false; - default: - // something else. - Error("Zlib error"); - return false; - } - - return true; + const CompressionError err = ctx_.GetErrorInfo(); + if (!err.IsError()) return true; + EmitError(err); + return false; } @@ -400,8 +310,7 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { if (!CheckError()) return; - write_result_[0] = strm_.avail_out; - write_result_[1] = strm_.avail_in; + UpdateWriteResult(); // call the write() cb Local cb = PersistentToLocal(env()->isolate(), @@ -413,19 +322,15 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { } // TODO(addaleax): Switch to modern error system (node_errors.h). - void Error(const char* message) { + void EmitError(const CompressionError& err) { // If you hit this assertion, you forgot to enter the v8::Context first. CHECK_EQ(env()->context(), env()->isolate()->GetCurrentContext()); - if (strm_.msg != nullptr) { - message = strm_.msg; - } - HandleScope scope(env()->isolate()); Local args[3] = { - OneByteString(env()->isolate(), message), - Integer::New(env()->isolate(), err_), - OneByteString(env()->isolate(), ZlibStrerror(err_)) + OneByteString(env()->isolate(), err.message), + Integer::New(env()->isolate(), err.err), + OneByteString(env()->isolate(), err.code) }; MakeCallback(env()->onerror_string(), arraysize(args), args); @@ -435,12 +340,107 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { Close(); } + void MemoryInfo(MemoryTracker* tracker) const override { + tracker->TrackField("compression context", ctx_); + tracker->TrackFieldWithSize("zlib_memory", + zlib_memory_ + unreported_allocations_); + } + + protected: + CompressionContext* context() { return &ctx_; } + + void InitStream(uint32_t* write_result, Local write_js_callback) { + write_result_ = write_result; + write_js_callback_.Reset(env()->isolate(), write_js_callback); + init_done_ = true; + } + + // Allocation functions provided to zlib itself. We store the real size of + // the allocated memory chunk just before the "payload" memory we return + // to zlib. + // Because we use zlib off the thread pool, we can not report memory directly + // to V8; rather, we first store it as "unreported" memory in a separate + // field and later report it back from the main thread. + static void* AllocForZlib(void* data, uInt items, uInt size) { + CompressionStream* ctx = static_cast(data); + size_t real_size = + MultiplyWithOverflowCheck(static_cast(items), + static_cast(size)) + sizeof(size_t); + char* memory = UncheckedMalloc(real_size); + if (UNLIKELY(memory == nullptr)) return nullptr; + *reinterpret_cast(memory) = real_size; + ctx->unreported_allocations_.fetch_add(real_size, + std::memory_order_relaxed); + return memory + sizeof(size_t); + } + + static void FreeForZlib(void* data, void* pointer) { + if (UNLIKELY(pointer == nullptr)) return; + CompressionStream* ctx = static_cast(data); + char* real_pointer = static_cast(pointer) - sizeof(size_t); + size_t real_size = *reinterpret_cast(real_pointer); + ctx->unreported_allocations_.fetch_sub(real_size, + std::memory_order_relaxed); + free(real_pointer); + } + + // This is called on the main thread after zlib may have allocated something + // in order to report it back to V8. + void AdjustAmountOfExternalAllocatedMemory() { + ssize_t report = + unreported_allocations_.exchange(0, std::memory_order_relaxed); + if (report == 0) return; + CHECK_IMPLIES(report < 0, zlib_memory_ >= static_cast(-report)); + zlib_memory_ += report; + env()->isolate()->AdjustAmountOfExternalAllocatedMemory(report); + } + + struct AllocScope { + explicit AllocScope(CompressionStream* stream) : stream(stream) {} + ~AllocScope() { stream->AdjustAmountOfExternalAllocatedMemory(); } + CompressionStream* stream; + }; + + private: + void Ref() { + if (++refs_ == 1) { + ClearWeak(); + } + } + + void Unref() { + CHECK_GT(refs_, 0); + if (--refs_ == 0) { + MakeWeak(); + } + } + + bool init_done_ = false; + bool write_in_progress_ = false; + bool pending_close_ = false; + bool closed_ = false; + unsigned int refs_ = 0; + uint32_t* write_result_ = nullptr; + Persistent write_js_callback_; + std::atomic unreported_allocations_{0}; + size_t zlib_memory_ = 0; + + CompressionContext ctx_; +}; + +class ZlibStream : public CompressionStream { + public: + ZlibStream(Environment* env, Local wrap, node_zlib_mode mode) + : CompressionStream(env, wrap) { + context()->SetMode(mode); + } + static void New(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); CHECK(args[0]->IsInt32()); node_zlib_mode mode = static_cast(args[0].As()->Value()); - new ZCtx(env, args.This(), mode); + new ZlibStream(env, args.This(), mode); } // just pull the ints out of the args and call the other Init @@ -459,42 +459,25 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { "init(windowBits, level, memLevel, strategy, writeResult, writeCallback," " dictionary)"); - ZCtx* ctx; - ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder()); + ZlibStream* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); Local context = args.GetIsolate()->GetCurrentContext(); // windowBits is special. On the compression side, 0 is an invalid value. // But on the decompression side, a value of 0 for windowBits tells zlib // to use the window size in the zlib header of the compressed stream. - uint32_t windowBits; - if (!args[0]->Uint32Value(context).To(&windowBits)) return; - - if (!((windowBits == 0) && - (ctx->mode_ == INFLATE || - ctx->mode_ == GUNZIP || - ctx->mode_ == UNZIP))) { - CHECK( - (windowBits >= Z_MIN_WINDOWBITS && windowBits <= Z_MAX_WINDOWBITS) && - "invalid windowBits"); - } + uint32_t window_bits; + if (!args[0]->Uint32Value(context).To(&window_bits)) return; - int level; + int32_t level; if (!args[1]->Int32Value(context).To(&level)) return; - CHECK((level >= Z_MIN_LEVEL && level <= Z_MAX_LEVEL) && - "invalid compression level"); - uint32_t memLevel; - if (!args[2]->Uint32Value(context).To(&memLevel)) return; - CHECK((memLevel >= Z_MIN_MEMLEVEL && memLevel <= Z_MAX_MEMLEVEL) && - "invalid memlevel"); + uint32_t mem_level; + if (!args[2]->Uint32Value(context).To(&mem_level)) return; uint32_t strategy; if (!args[3]->Uint32Value(context).To(&strategy)) return; - CHECK((strategy == Z_FILTERED || strategy == Z_HUFFMAN_ONLY || - strategy == Z_RLE || strategy == Z_FIXED || - strategy == Z_DEFAULT_STRATEGY) && - "invalid strategy"); CHECK(args[4]->IsUint32Array()); Local array = args[4].As(); @@ -512,261 +495,385 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { data + Buffer::Length(args[6])); } - bool ret = ctx->Init(level, windowBits, memLevel, strategy, write_result, - write_js_callback, std::move(dictionary)); - if (ret) - ctx->SetDictionary(); + wrap->InitStream(write_result, write_js_callback); + + AllocScope alloc_scope(wrap); + wrap->context()->SetAllocationFunctions( + AllocForZlib, FreeForZlib, static_cast(wrap)); + const CompressionError err = + wrap->context()->Init(level, window_bits, mem_level, strategy, + std::move(dictionary)); + if (err.IsError()) + wrap->EmitError(err); - return args.GetReturnValue().Set(ret); + return args.GetReturnValue().Set(!err.IsError()); } static void Params(const FunctionCallbackInfo& args) { CHECK(args.Length() == 2 && "params(level, strategy)"); - ZCtx* ctx; - ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder()); - Environment* env = ctx->env(); + ZlibStream* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + Local context = args.GetIsolate()->GetCurrentContext(); int level; - if (!args[0]->Int32Value(env->context()).To(&level)) return; + if (!args[0]->Int32Value(context).To(&level)) return; int strategy; - if (!args[1]->Int32Value(env->context()).To(&strategy)) return; - ctx->Params(level, strategy); + if (!args[1]->Int32Value(context).To(&strategy)) return; + + AllocScope alloc_scope(wrap); + const CompressionError err = wrap->context()->SetParams(level, strategy); + if (err.IsError()) + wrap->EmitError(err); } static void Reset(const FunctionCallbackInfo &args) { - ZCtx* ctx; - ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder()); - ctx->Reset(); - ctx->SetDictionary(); + ZlibStream* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + + AllocScope alloc_scope(wrap); + const CompressionError err = wrap->context()->ResetStream(); + if (err.IsError()) + wrap->EmitError(err); } - bool Init(int level, int windowBits, int memLevel, - int strategy, uint32_t* write_result, - Local write_js_callback, - std::vector&& dictionary) { - AllocScope alloc_scope(this); - level_ = level; - windowBits_ = windowBits; - memLevel_ = memLevel; - strategy_ = strategy; + SET_MEMORY_INFO_NAME(ZlibStream) + SET_SELF_SIZE(ZlibStream) +}; - strm_.zalloc = AllocForZlib; - strm_.zfree = FreeForZlib; - strm_.opaque = static_cast(this); - flush_ = Z_NO_FLUSH; +void ZlibContext::Close() { + CHECK_LE(mode_, UNZIP); - err_ = Z_OK; + int status = Z_OK; + if (mode_ == DEFLATE || mode_ == GZIP || mode_ == DEFLATERAW) { + status = deflateEnd(&strm_); + } else if (mode_ == INFLATE || mode_ == GUNZIP || mode_ == INFLATERAW || + mode_ == UNZIP) { + status = inflateEnd(&strm_); + } - if (mode_ == GZIP || mode_ == GUNZIP) { - windowBits_ += 16; - } + CHECK(status == Z_OK || status == Z_DATA_ERROR); + mode_ = NONE; - if (mode_ == UNZIP) { - windowBits_ += 32; - } + dictionary_.clear(); +} - if (mode_ == DEFLATERAW || mode_ == INFLATERAW) { - windowBits_ *= -1; - } - switch (mode_) { - case DEFLATE: - case GZIP: - case DEFLATERAW: - err_ = deflateInit2(&strm_, - level_, - Z_DEFLATED, - windowBits_, - memLevel_, - strategy_); - break; - case INFLATE: - case GUNZIP: - case INFLATERAW: - case UNZIP: - err_ = inflateInit2(&strm_, windowBits_); - break; - default: - UNREACHABLE(); - } +void ZlibContext::DoThreadPoolWork() { + const Bytef* next_expected_header_byte = nullptr; + + // If the avail_out is left at 0, then it means that it ran out + // of room. If there was avail_out left over, then it means + // that all of the input was consumed. + switch (mode_) { + case DEFLATE: + case GZIP: + case DEFLATERAW: + err_ = deflate(&strm_, flush_); + break; + case UNZIP: + if (strm_.avail_in > 0) { + next_expected_header_byte = strm_.next_in; + } - dictionary_ = std::move(dictionary); + switch (gzip_id_bytes_read_) { + case 0: + if (next_expected_header_byte == nullptr) { + break; + } - write_in_progress_ = false; - init_done_ = true; + if (*next_expected_header_byte == GZIP_HEADER_ID1) { + gzip_id_bytes_read_ = 1; + next_expected_header_byte++; - if (err_ != Z_OK) { - dictionary_.clear(); - mode_ = NONE; - return false; - } + if (strm_.avail_in == 1) { + // The only available byte was already read. + break; + } + } else { + mode_ = INFLATE; + break; + } - write_result_ = write_result; - write_js_callback_.Reset(env()->isolate(), write_js_callback); - return true; - } + // fallthrough + case 1: + if (next_expected_header_byte == nullptr) { + break; + } - void SetDictionary() { - if (dictionary_.empty()) - return; + if (*next_expected_header_byte == GZIP_HEADER_ID2) { + gzip_id_bytes_read_ = 2; + mode_ = GUNZIP; + } else { + // There is no actual difference between INFLATE and INFLATERAW + // (after initialization). + mode_ = INFLATE; + } - err_ = Z_OK; + break; + default: + CHECK(0 && "invalid number of gzip magic number bytes read"); + } - switch (mode_) { - case DEFLATE: - case DEFLATERAW: - err_ = deflateSetDictionary(&strm_, - dictionary_.data(), - dictionary_.size()); - break; - case INFLATERAW: - // The other inflate cases will have the dictionary set when inflate() - // returns Z_NEED_DICT in Process() + // fallthrough + case INFLATE: + case GUNZIP: + case INFLATERAW: + err_ = inflate(&strm_, flush_); + + // If data was encoded with dictionary (INFLATERAW will have it set in + // SetDictionary, don't repeat that here) + if (mode_ != INFLATERAW && + err_ == Z_NEED_DICT && + !dictionary_.empty()) { + // Load it err_ = inflateSetDictionary(&strm_, dictionary_.data(), dictionary_.size()); - break; - default: - break; - } + if (err_ == Z_OK) { + // And try to decode again + err_ = inflate(&strm_, flush_); + } else if (err_ == Z_DATA_ERROR) { + // Both inflateSetDictionary() and inflate() return Z_DATA_ERROR. + // Make it possible for After() to tell a bad dictionary from bad + // input. + err_ = Z_NEED_DICT; + } + } - if (err_ != Z_OK) { - Error("Failed to set dictionary"); - } + while (strm_.avail_in > 0 && + mode_ == GUNZIP && + err_ == Z_STREAM_END && + strm_.next_in[0] != 0x00) { + // Bytes remain in input buffer. Perhaps this is another compressed + // member in the same archive, or just trailing garbage. + // Trailing zero bytes are okay, though, since they are frequently + // used for padding. + + ResetStream(); + err_ = inflate(&strm_, flush_); + } + break; + default: + UNREACHABLE(); } +} - void Params(int level, int strategy) { - AllocScope alloc_scope(this); - err_ = Z_OK; +void ZlibContext::SetBuffers(char* in, uint32_t in_len, + char* out, uint32_t out_len) { + strm_.avail_in = in_len; + strm_.next_in = reinterpret_cast(in); + strm_.avail_out = out_len; + strm_.next_out = reinterpret_cast(out); +} + + +void ZlibContext::SetFlush(int flush) { + flush_ = flush; +} - switch (mode_) { - case DEFLATE: - case DEFLATERAW: - err_ = deflateParams(&strm_, level, strategy); - break; - default: - break; - } - if (err_ != Z_OK && err_ != Z_BUF_ERROR) { - Error("Failed to set parameters"); +void ZlibContext::GetAfterWriteOffsets(uint32_t* avail_in, + uint32_t* avail_out) const { + *avail_in = strm_.avail_in; + *avail_out = strm_.avail_out; +} + + +CompressionError ZlibContext::ErrorForMessage(const char* message) const { + if (strm_.msg != nullptr) + message = strm_.msg; + + return CompressionError { message, ZlibStrerror(err_), err_ }; +} + + +CompressionError ZlibContext::GetErrorInfo() const { + // Acceptable error states depend on the type of zlib stream. + switch (err_) { + case Z_OK: + case Z_BUF_ERROR: + if (strm_.avail_out != 0 && flush_ == Z_FINISH) { + return ErrorForMessage("unexpected end of file"); } + case Z_STREAM_END: + // normal statuses, not fatal + break; + case Z_NEED_DICT: + if (dictionary_.empty()) + return ErrorForMessage("Missing dictionary"); + else + return ErrorForMessage("Bad dictionary"); + default: + // something else. + return ErrorForMessage("Zlib error"); } - void Reset() { - AllocScope alloc_scope(this); + return CompressionError {}; +} - err_ = Z_OK; - - switch (mode_) { - case DEFLATE: - case DEFLATERAW: - case GZIP: - err_ = deflateReset(&strm_); - break; - case INFLATE: - case INFLATERAW: - case GUNZIP: - err_ = inflateReset(&strm_); - break; - default: - break; - } - if (err_ != Z_OK) { - Error("Failed to reset stream"); - } +CompressionError ZlibContext::ResetStream() { + err_ = Z_OK; + + switch (mode_) { + case DEFLATE: + case DEFLATERAW: + case GZIP: + err_ = deflateReset(&strm_); + break; + case INFLATE: + case INFLATERAW: + case GUNZIP: + err_ = inflateReset(&strm_); + break; + default: + break; } - void MemoryInfo(MemoryTracker* tracker) const override { - tracker->TrackField("dictionary", dictionary_); - tracker->TrackFieldWithSize("zlib_memory", - zlib_memory_ + unreported_allocations_); + if (err_ != Z_OK) + return ErrorForMessage("Failed to reset stream"); + + return SetDictionary(); +} + + +void ZlibContext::SetAllocationFunctions(alloc_func alloc, + free_func free, + void* opaque) { + strm_.zalloc = alloc; + strm_.zfree = free; + strm_.opaque = opaque; +} + + +CompressionError ZlibContext::Init( + int level, int window_bits, int mem_level, int strategy, + std::vector&& dictionary) { + if (!((window_bits == 0) && + (mode_ == INFLATE || + mode_ == GUNZIP || + mode_ == UNZIP))) { + CHECK( + (window_bits >= Z_MIN_WINDOWBITS && window_bits <= Z_MAX_WINDOWBITS) && + "invalid windowBits"); } - SET_MEMORY_INFO_NAME(ZCtx) - SET_SELF_SIZE(ZCtx) + CHECK((level >= Z_MIN_LEVEL && level <= Z_MAX_LEVEL) && + "invalid compression level"); - private: - void Ref() { - if (++refs_ == 1) { - ClearWeak(); - } + CHECK((mem_level >= Z_MIN_MEMLEVEL && mem_level <= Z_MAX_MEMLEVEL) && + "invalid memlevel"); + + CHECK((strategy == Z_FILTERED || strategy == Z_HUFFMAN_ONLY || + strategy == Z_RLE || strategy == Z_FIXED || + strategy == Z_DEFAULT_STRATEGY) && + "invalid strategy"); + + level_ = level; + window_bits_ = window_bits; + mem_level_ = mem_level; + strategy_ = strategy; + + flush_ = Z_NO_FLUSH; + + err_ = Z_OK; + + if (mode_ == GZIP || mode_ == GUNZIP) { + window_bits_ += 16; } - void Unref() { - CHECK_GT(refs_, 0); - if (--refs_ == 0) { - MakeWeak(); - } + if (mode_ == UNZIP) { + window_bits_ += 32; } - // Allocation functions provided to zlib itself. We store the real size of - // the allocated memory chunk just before the "payload" memory we return - // to zlib. - // Because we use zlib off the thread pool, we can not report memory directly - // to V8; rather, we first store it as "unreported" memory in a separate - // field and later report it back from the main thread. - static void* AllocForZlib(void* data, uInt items, uInt size) { - ZCtx* ctx = static_cast(data); - size_t real_size = - MultiplyWithOverflowCheck(static_cast(items), - static_cast(size)) + sizeof(size_t); - char* memory = UncheckedMalloc(real_size); - if (UNLIKELY(memory == nullptr)) return nullptr; - *reinterpret_cast(memory) = real_size; - ctx->unreported_allocations_.fetch_add(real_size, - std::memory_order_relaxed); - return memory + sizeof(size_t); + if (mode_ == DEFLATERAW || mode_ == INFLATERAW) { + window_bits_ *= -1; } - static void FreeForZlib(void* data, void* pointer) { - if (UNLIKELY(pointer == nullptr)) return; - ZCtx* ctx = static_cast(data); - char* real_pointer = static_cast(pointer) - sizeof(size_t); - size_t real_size = *reinterpret_cast(real_pointer); - ctx->unreported_allocations_.fetch_sub(real_size, - std::memory_order_relaxed); - free(real_pointer); + switch (mode_) { + case DEFLATE: + case GZIP: + case DEFLATERAW: + err_ = deflateInit2(&strm_, + level_, + Z_DEFLATED, + window_bits_, + mem_level_, + strategy_); + break; + case INFLATE: + case GUNZIP: + case INFLATERAW: + case UNZIP: + err_ = inflateInit2(&strm_, window_bits_); + break; + default: + UNREACHABLE(); } - // This is called on the main thread after zlib may have allocated something - // in order to report it back to V8. - void AdjustAmountOfExternalAllocatedMemory() { - ssize_t report = - unreported_allocations_.exchange(0, std::memory_order_relaxed); - if (report == 0) return; - CHECK_IMPLIES(report < 0, zlib_memory_ >= static_cast(-report)); - zlib_memory_ += report; - env()->isolate()->AdjustAmountOfExternalAllocatedMemory(report); + dictionary_ = std::move(dictionary); + + if (err_ != Z_OK) { + dictionary_.clear(); + mode_ = NONE; + return ErrorForMessage(nullptr); } - struct AllocScope { - explicit AllocScope(ZCtx* ctx) : ctx(ctx) {} - ~AllocScope() { ctx->AdjustAmountOfExternalAllocatedMemory(); } - ZCtx* ctx; - }; + return SetDictionary(); +} - std::vector dictionary_; - int err_; - int flush_; - bool init_done_; - int level_; - int memLevel_; - node_zlib_mode mode_; - int strategy_; - z_stream strm_; - int windowBits_; - bool write_in_progress_; - bool pending_close_; - unsigned int refs_; - unsigned int gzip_id_bytes_read_; - uint32_t* write_result_; - Persistent write_js_callback_; - std::atomic unreported_allocations_{0}; - size_t zlib_memory_ = 0; -}; + +CompressionError ZlibContext::SetDictionary() { + if (dictionary_.empty()) + return CompressionError {}; + + err_ = Z_OK; + + switch (mode_) { + case DEFLATE: + case DEFLATERAW: + err_ = deflateSetDictionary(&strm_, + dictionary_.data(), + dictionary_.size()); + break; + case INFLATERAW: + // The other inflate cases will have the dictionary set when inflate() + // returns Z_NEED_DICT in Process() + err_ = inflateSetDictionary(&strm_, + dictionary_.data(), + dictionary_.size()); + break; + default: + break; + } + + if (err_ != Z_OK) { + return ErrorForMessage("Failed to set dictionary"); + } + + return CompressionError {}; +} + + +CompressionError ZlibContext::SetParams(int level, int strategy) { + err_ = Z_OK; + + switch (mode_) { + case DEFLATE: + case DEFLATERAW: + err_ = deflateParams(&strm_, level, strategy); + break; + default: + break; + } + + if (err_ != Z_OK && err_ != Z_BUF_ERROR) { + return ErrorForMessage("Failed to set parameters"); + } + + return CompressionError {}; +} void Initialize(Local target, @@ -774,17 +881,18 @@ void Initialize(Local target, Local context, void* priv) { Environment* env = Environment::GetCurrent(context); - Local z = env->NewFunctionTemplate(ZCtx::New); + Local z = env->NewFunctionTemplate(ZlibStream::New); z->InstanceTemplate()->SetInternalFieldCount(1); z->Inherit(AsyncWrap::GetConstructorTemplate(env)); - env->SetProtoMethod(z, "write", ZCtx::Write); - env->SetProtoMethod(z, "writeSync", ZCtx::Write); - env->SetProtoMethod(z, "init", ZCtx::Init); - env->SetProtoMethod(z, "close", ZCtx::Close); - env->SetProtoMethod(z, "params", ZCtx::Params); - env->SetProtoMethod(z, "reset", ZCtx::Reset); + env->SetProtoMethod(z, "write", ZlibStream::Write); + env->SetProtoMethod(z, "writeSync", ZlibStream::Write); + env->SetProtoMethod(z, "close", ZlibStream::Close); + + env->SetProtoMethod(z, "init", ZlibStream::Init); + env->SetProtoMethod(z, "params", ZlibStream::Params); + env->SetProtoMethod(z, "reset", ZlibStream::Reset); Local zlibString = FIXED_ONE_BYTE_STRING(env->isolate(), "Zlib"); z->SetClassName(zlibString); diff --git a/test/parallel/test-heapdump-zlib.js b/test/parallel/test-heapdump-zlib.js index f79e345821ea50..0f86576bd1f2fa 100644 --- a/test/parallel/test-heapdump-zlib.js +++ b/test/parallel/test-heapdump-zlib.js @@ -4,10 +4,10 @@ require('../common'); const { validateSnapshotNodes } = require('../common/heap'); const zlib = require('zlib'); -validateSnapshotNodes('Node / ZCtx', []); +validateSnapshotNodes('Node / ZlibStream', []); // eslint-disable-next-line no-unused-vars const gunzip = zlib.createGunzip(); -validateSnapshotNodes('Node / ZCtx', [ +validateSnapshotNodes('Node / ZlibStream', [ { children: [ { node_name: 'Zlib', edge_name: 'wrapped' },