//===----------------------------------------------------------------------===// // DuckDB // // duckdb/storage/compression/chimp/chimp_compress.hpp // // //===----------------------------------------------------------------------===// #pragma once #include "duckdb/storage/compression/chimp/chimp.hpp" #include "duckdb/function/compression_function.hpp" #include "duckdb/storage/compression/chimp/chimp_analyze.hpp" #include "duckdb/common/helper.hpp" #include "duckdb/common/limits.hpp" #include "duckdb/common/types/null_value.hpp" #include "duckdb/function/compression/compression.hpp" #include "duckdb/main/config.hpp" #include "duckdb/storage/buffer_manager.hpp" #include "duckdb/storage/table/column_data_checkpointer.hpp" #include "duckdb/storage/table/column_segment.hpp" #include "duckdb/common/operator/subtract.hpp" #include namespace duckdb { template struct ChimpCompressionState : public CompressionState { public: using CHIMP_TYPE = typename ChimpType::type; explicit ChimpCompressionState(ColumnDataCheckpointer &checkpointer, ChimpAnalyzeState *analyze_state) : checkpointer(checkpointer), function(checkpointer.GetCompressionFunction(CompressionType::COMPRESSION_CHIMP)) { CreateEmptySegment(checkpointer.GetRowGroup().start); // These buffers are recycled for every group, so they only have to be set once state.AssignLeadingZeroBuffer((uint8_t *)leading_zero_blocks); state.AssignFlagBuffer((uint8_t *)flags); state.AssignPackedDataBuffer((uint16_t *)packed_data_blocks); } ColumnDataCheckpointer &checkpointer; CompressionFunction &function; unique_ptr current_segment; BufferHandle handle; idx_t group_idx = 0; uint8_t flags[ChimpPrimitives::CHIMP_SEQUENCE_SIZE / 4]; uint8_t leading_zero_blocks[ChimpPrimitives::LEADING_ZERO_BLOCK_BUFFERSIZE]; uint16_t packed_data_blocks[ChimpPrimitives::CHIMP_SEQUENCE_SIZE]; // Ptr to next free spot in segment; data_ptr_t segment_data; data_ptr_t metadata_ptr; uint32_t next_group_byte_index_start = ChimpPrimitives::HEADER_SIZE; // The total size of metadata in the current segment idx_t metadata_byte_size = 0; ChimpState state; public: idx_t RequiredSpace() const { idx_t required_space = ChimpPrimitives::MAX_BYTES_PER_VALUE; // Any value could be the last, // so the cost of flushing metadata should be factored into the cost // byte offset of data required_space += sizeof(byte_index_t); // amount of leading zero blocks required_space += sizeof(uint8_t); // first leading zero block required_space += 3; // amount of flag bytes required_space += sizeof(uint8_t); // first flag byte required_space += 1; return required_space; } // How many bytes the data occupies for the current segment idx_t UsedSpace() const { return state.chimp.output.BytesWritten(); } idx_t RemainingSpace() const { return metadata_ptr - (handle.Ptr() + UsedSpace()); } idx_t CurrentGroupMetadataSize() const { idx_t metadata_size = 0; metadata_size += 3 * state.chimp.leading_zero_buffer.BlockCount(); metadata_size += state.chimp.flag_buffer.BytesUsed(); metadata_size += 2 * state.chimp.packed_data_buffer.index; return metadata_size; } // The current segment has enough space to fit this new value bool HasEnoughSpace() { if (handle.Ptr() + AlignValue(ChimpPrimitives::HEADER_SIZE + UsedSpace() + RequiredSpace()) >= (metadata_ptr - CurrentGroupMetadataSize())) { return false; } return true; } void CreateEmptySegment(idx_t row_start) { group_idx = 0; metadata_byte_size = 0; auto &db = checkpointer.GetDatabase(); auto &type = checkpointer.GetType(); auto compressed_segment = ColumnSegment::CreateTransientSegment(db, type, row_start); compressed_segment->function = function; current_segment = std::move(compressed_segment); next_group_byte_index_start = ChimpPrimitives::HEADER_SIZE; auto &buffer_manager = BufferManager::GetBufferManager(db); handle = buffer_manager.Pin(current_segment->block); segment_data = handle.Ptr() + current_segment->GetBlockOffset() + ChimpPrimitives::HEADER_SIZE; metadata_ptr = handle.Ptr() + current_segment->GetBlockOffset() + Storage::BLOCK_SIZE; state.AssignDataBuffer(segment_data); state.chimp.Reset(); } void Append(UnifiedVectorFormat &vdata, idx_t count) { auto data = UnifiedVectorFormat::GetData(vdata); for (idx_t i = 0; i < count; i++) { auto idx = vdata.sel->get_index(i); WriteValue(data[idx], vdata.validity.RowIsValid(idx)); } } void WriteValue(CHIMP_TYPE value, bool is_valid) { if (!HasEnoughSpace()) { // Segment is full auto row_start = current_segment->start + current_segment->count; FlushSegment(); CreateEmptySegment(row_start); } current_segment->count++; if (is_valid) { T floating_point_value = Load(const_data_ptr_cast(&value)); NumericStats::Update(current_segment->stats.statistics, floating_point_value); } else { //! FIXME: find a cheaper alternative to storing a NULL // store this as "value_identical", only using 9 bits for a NULL value = state.chimp.previous_value; } Chimp128Compression::Store(value, state.chimp); group_idx++; if (group_idx == ChimpPrimitives::CHIMP_SEQUENCE_SIZE) { FlushGroup(); } } void FlushGroup() { // Has to be called first to flush the last values in the LeadingZeroBuffer state.chimp.Flush(); metadata_ptr -= sizeof(byte_index_t); metadata_byte_size += sizeof(byte_index_t); // Store where this groups data starts, relative to the start of the segment Store(next_group_byte_index_start, metadata_ptr); next_group_byte_index_start = UsedSpace(); const uint8_t leading_zero_block_count = state.chimp.leading_zero_buffer.BlockCount(); // Every 8 values are packed in one block D_ASSERT(leading_zero_block_count <= ChimpPrimitives::CHIMP_SEQUENCE_SIZE / 8); metadata_ptr -= sizeof(uint8_t); metadata_byte_size += sizeof(uint8_t); // Store how many leading zero blocks there are Store(leading_zero_block_count, metadata_ptr); const uint64_t bytes_used_by_leading_zero_blocks = 3 * leading_zero_block_count; metadata_ptr -= bytes_used_by_leading_zero_blocks; metadata_byte_size += bytes_used_by_leading_zero_blocks; // Store the leading zeros (8 per 3 bytes) for this group memcpy((void *)metadata_ptr, (void *)leading_zero_blocks, bytes_used_by_leading_zero_blocks); //! This is max 1024, because it's the amount of flags there are, not the amount of bytes that takes up const uint16_t flag_bytes = state.chimp.flag_buffer.BytesUsed(); #ifdef DEBUG const idx_t padding = (current_segment->count % ChimpPrimitives::CHIMP_SEQUENCE_SIZE) == 0 ? ChimpPrimitives::CHIMP_SEQUENCE_SIZE : 0; const idx_t size_of_group = padding + current_segment->count % ChimpPrimitives::CHIMP_SEQUENCE_SIZE; D_ASSERT((AlignValue(size_of_group - 1) / 4) == flag_bytes); #endif metadata_ptr -= flag_bytes; metadata_byte_size += flag_bytes; // Store the flags (4 per byte) for this group memcpy((void *)metadata_ptr, (void *)flags, flag_bytes); // Store the packed data blocks (2 bytes each) // We dont need to store an extra count for this, // as the count can be derived from unpacking the flags and counting the '1' flags // FIXME: this does stop us from skipping groups with point queries, // because the metadata has a variable size, and we have to extract all flags + iterate them to know this size const uint16_t packed_data_blocks_count = state.chimp.packed_data_buffer.index; metadata_ptr -= packed_data_blocks_count * 2; metadata_byte_size += packed_data_blocks_count * 2; if ((uint64_t)metadata_ptr & 1) { // Align on a two-byte boundary metadata_ptr--; metadata_byte_size++; } memcpy((void *)metadata_ptr, (void *)packed_data_blocks, packed_data_blocks_count * sizeof(uint16_t)); state.chimp.Reset(); group_idx = 0; } // FIXME: only do this if the wasted space meets a certain threshold (>= 20%) void FlushSegment() { if (group_idx) { // Only call this when the group actually has data that needs to be flushed FlushGroup(); } state.chimp.output.Flush(); auto &checkpoint_state = checkpointer.GetCheckpointState(); auto dataptr = handle.Ptr(); // Compact the segment by moving the metadata next to the data. idx_t bytes_used_by_data = ChimpPrimitives::HEADER_SIZE + UsedSpace(); idx_t metadata_offset = AlignValue(bytes_used_by_data); // Verify that the metadata_ptr does not cross this threshold D_ASSERT(dataptr + metadata_offset <= metadata_ptr); idx_t metadata_size = dataptr + Storage::BLOCK_SIZE - metadata_ptr; idx_t total_segment_size = metadata_offset + metadata_size; #ifdef DEBUG uint32_t verify_bytes; memcpy((void *)&verify_bytes, metadata_ptr, 4); #endif memmove(dataptr + metadata_offset, metadata_ptr, metadata_size); #ifdef DEBUG D_ASSERT(verify_bytes == *(uint32_t *)(dataptr + metadata_offset)); #endif // Store the offset of the metadata of the first group (which is at the highest address). Store(metadata_offset + metadata_size, dataptr); handle.Destroy(); checkpoint_state.FlushSegment(std::move(current_segment), total_segment_size); } void Finalize() { FlushSegment(); current_segment.reset(); } }; // Compression Functions template unique_ptr ChimpInitCompression(ColumnDataCheckpointer &checkpointer, unique_ptr state) { return make_uniq>(checkpointer, (ChimpAnalyzeState *)state.get()); } template void ChimpCompress(CompressionState &state_p, Vector &scan_vector, idx_t count) { auto &state = (ChimpCompressionState &)state_p; UnifiedVectorFormat vdata; scan_vector.ToUnifiedFormat(count, vdata); state.Append(vdata, count); } template void ChimpFinalizeCompress(CompressionState &state_p) { auto &state = (ChimpCompressionState &)state_p; state.Finalize(); } } // namespace duckdb