diff --git a/src/query/formats/src/output_format/parquet.rs b/src/query/formats/src/output_format/parquet.rs
index 589a76250edba..86398eb3709c1 100644
--- a/src/query/formats/src/output_format/parquet.rs
+++ b/src/query/formats/src/output_format/parquet.rs
@@ -53,7 +53,8 @@ impl OutputFormat for ParquetOutputFormat {
             return Ok(vec![]);
         }
         let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-        let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd)?;
+        // Unloading data: enable encoding unconditionally in this case, since ...
+        let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd, true)?;
         Ok(buf)
     }
 }
diff --git a/src/query/service/src/interpreters/common/table_option_validation.rs b/src/query/service/src/interpreters/common/table_option_validation.rs
index a55a4383d468c..adb01d3e46870 100644
--- a/src/query/service/src/interpreters/common/table_option_validation.rs
+++ b/src/query/service/src/interpreters/common/table_option_validation.rs
@@ -26,6 +26,7 @@ use databend_common_sql::BloomIndexColumns;
 use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
 use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
 use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
+use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING;
 use databend_common_storages_fuse::FUSE_OPT_KEY_FILE_SIZE;
 use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;
 use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
@@ -75,6 +76,7 @@ pub static CREATE_FUSE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new(
     r.insert("transient");
     r.insert(OPT_KEY_TEMP_PREFIX);
     r.insert(OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH);
+    r.insert(FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING);
     r
 });

@@ -196,10 +198,7 @@ pub fn is_valid_bloom_index_columns(
 pub fn is_valid_change_tracking(
     options: &BTreeMap<String, String>,
 ) -> databend_common_exception::Result<()> {
-    if let Some(value) = options.get(OPT_KEY_CHANGE_TRACKING) {
-        value.to_lowercase().parse::<bool>()?;
-    }
-    Ok(())
+    is_valid_bool_opt(OPT_KEY_CHANGE_TRACKING, options)
 }
 
 pub fn is_valid_random_seed(
@@ -210,3 +209,19 @@ pub fn is_valid_random_seed(
     }
     Ok(())
 }
+
+pub fn is_valid_fuse_parquet_encoding_opt(
+    options: &BTreeMap<String, String>,
+) -> databend_common_exception::Result<()> {
+    is_valid_bool_opt(FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING, options)
+}
+
+fn is_valid_bool_opt(
+    key: &str,
+    options: &BTreeMap<String, String>,
+) -> databend_common_exception::Result<()> {
+    if let Some(value) = options.get(key) {
+        value.parse::<bool>()?;
+    }
+    Ok(())
+}
diff --git a/src/query/service/src/interpreters/interpreter_table_create.rs b/src/query/service/src/interpreters/interpreter_table_create.rs
index 31c3991187a94..2103a81d4b4b8 100644
--- a/src/query/service/src/interpreters/interpreter_table_create.rs
+++ b/src/query/service/src/interpreters/interpreter_table_create.rs
@@ -64,6 +64,7 @@ use crate::interpreters::common::table_option_validation::is_valid_bloom_index_c
 use crate::interpreters::common::table_option_validation::is_valid_change_tracking;
 use crate::interpreters::common::table_option_validation::is_valid_create_opt;
 use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
+use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_encoding_opt;
 use crate::interpreters::common::table_option_validation::is_valid_random_seed;
 use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
 use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
@@ -425,6 +426,8 @@ impl CreateTableInterpreter {
         is_valid_random_seed(&table_meta.options)?;
         // check table level data_retention_period_in_hours
         is_valid_data_retention_period(&table_meta.options)?;
+        // check enable_parquet_encoding
+        is_valid_fuse_parquet_encoding_opt(&table_meta.options)?;
 
         for table_option in table_meta.options.iter() {
             let key = table_option.0.to_lowercase();
diff --git a/src/query/service/src/interpreters/interpreter_table_set_options.rs b/src/query/service/src/interpreters/interpreter_table_set_options.rs
index 2147a607ae162..2435565ce588b 100644
--- a/src/query/service/src/interpreters/interpreter_table_set_options.rs
+++ b/src/query/service/src/interpreters/interpreter_table_set_options.rs
@@ -35,6 +35,7 @@ use crate::interpreters::common::table_option_validation::is_valid_block_per_seg
 use crate::interpreters::common::table_option_validation::is_valid_bloom_index_columns;
 use crate::interpreters::common::table_option_validation::is_valid_create_opt;
 use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
+use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_encoding_opt;
 use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
 use crate::interpreters::Interpreter;
 use crate::pipelines::PipelineBuildResult;
@@ -71,6 +72,8 @@ impl Interpreter for SetOptionsInterpreter {
         is_valid_row_per_block(&self.plan.set_options)?;
         // check data_retention_period
         is_valid_data_retention_period(&self.plan.set_options)?;
+        // check enable_parquet_encoding
+        is_valid_fuse_parquet_encoding_opt(&self.plan.set_options)?;
 
         // check storage_format
         let error_str = "invalid opt for fuse table in alter table statement";
diff --git a/src/query/service/src/test_kits/block_writer.rs b/src/query/service/src/test_kits/block_writer.rs
index 68a5d829e1a82..2794f5e8d2799 100644
--- a/src/query/service/src/test_kits/block_writer.rs
+++ b/src/query/service/src/test_kits/block_writer.rs
@@ -143,6 +143,7 @@ impl<'a> BlockWriter<'a> {
             vec![index_block],
             &mut data,
             TableCompression::None,
+            false,
         )?;
         let size = data.len() as u64;
         data_accessor.write(&location.0, data).await?;
diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs
index 53c32579a2ea8..3c6524487b41e 100644
--- a/src/query/storages/common/blocks/src/parquet_rs.rs
+++ b/src/query/storages/common/blocks/src/parquet_rs.rs
@@ -22,6 +22,7 @@ use parquet::arrow::ArrowWriter;
 use parquet::basic::Encoding;
 use parquet::file::properties::EnabledStatistics;
 use parquet::file::properties::WriterProperties;
+use parquet::file::properties::WriterVersion;
 use parquet::format::FileMetaData;
 
 /// Serialize data blocks to parquet format.
@@ -30,17 +31,41 @@ pub fn blocks_to_parquet(
     blocks: Vec<DataBlock>,
     write_buffer: &mut Vec<u8>,
     compression: TableCompression,
+    enable_encoding: bool,
 ) -> Result<FileMetaData> {
     assert!(!blocks.is_empty());
-    let props = WriterProperties::builder()
+    let builder = WriterProperties::builder()
         .set_compression(compression.into())
         // use `usize::MAX` to effectively limit the number of row groups to 1
         .set_max_row_group_size(usize::MAX)
-        .set_encoding(Encoding::PLAIN)
-        .set_dictionary_enabled(false)
         .set_statistics_enabled(EnabledStatistics::None)
-        .set_bloom_filter_enabled(false)
-        .build();
+        .set_bloom_filter_enabled(false);
+
+    let builder = if enable_encoding {
+        // Enable dictionary encoding and fallback encodings.
+        //
+        // Memo for quick lookup:
+        // The fallback encoding "strategy" used by parquet-54.2.1 is:
+        //
+        // ~~~
+        // (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
+        // (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
+        // (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
+        // (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
+        // (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
+        // _ => Encoding::PLAIN,
+        // ~~~
+        //
+        builder
+            .set_writer_version(WriterVersion::PARQUET_2_0)
+            .set_dictionary_enabled(true)
+    } else {
+        builder
+            .set_dictionary_enabled(false)
+            .set_encoding(Encoding::PLAIN)
+    };
+
+    let props = builder.build();
     let batches = blocks
         .into_iter()
         .map(|block| block.to_record_batch(table_schema))
diff --git a/src/query/storages/fuse/src/constants.rs b/src/query/storages/fuse/src/constants.rs
index 17ee47854b374..1ecd05dad7c3b 100644
--- a/src/query/storages/fuse/src/constants.rs
+++ b/src/query/storages/fuse/src/constants.rs
@@ -18,10 +18,9 @@ pub const FUSE_OPT_KEY_ROW_PER_BLOCK: &str = "row_per_block";
 pub const FUSE_OPT_KEY_ROW_PER_PAGE: &str = "row_per_page";
 pub const FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD: &str = "row_avg_depth_threshold";
 pub const FUSE_OPT_KEY_FILE_SIZE: &str = "file_size";
-
 pub const FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS: &str = "data_retention_period_in_hours";
-
 pub const FUSE_OPT_KEY_ATTACH_COLUMN_IDS: &str = "attach_column_ids";
+pub const FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING: &str = "enable_parquet_encoding";
 
 pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b";
 pub const FUSE_TBL_BLOCK_INDEX_PREFIX: &str = "_i";
diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs
index e3c5bb81682ca..783ca99203da1 100644
--- a/src/query/storages/fuse/src/fuse_table.rs
+++ b/src/query/storages/fuse/src/fuse_table.rs
@@ -124,6 +124,7 @@ use crate::FUSE_OPT_KEY_ATTACH_COLUMN_IDS;
 use crate::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
 use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
 use crate::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
+use crate::FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING;
 use crate::FUSE_OPT_KEY_FILE_SIZE;
 use crate::FUSE_OPT_KEY_ROW_PER_BLOCK;
 use crate::FUSE_OPT_KEY_ROW_PER_PAGE;
@@ -272,11 +273,14 @@ impl FuseTable {
         let block_per_seg =
             self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
 
+        let enable_parquet_encoding = self.get_option(FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING, false);
+
         WriteSettings {
             storage_format: self.storage_format,
             table_compression: self.table_compression,
             max_page_size,
             block_per_seg,
+            enable_parquet_encoding,
         }
     }
diff --git a/src/query/storages/fuse/src/io/write/block_writer.rs b/src/query/storages/fuse/src/io/write/block_writer.rs
index 7d3519aa42d82..955627a668adb 100644
--- a/src/query/storages/fuse/src/io/write/block_writer.rs
+++ b/src/query/storages/fuse/src/io/write/block_writer.rs
@@ -71,8 +71,13 @@ pub fn serialize_block(
     let schema = Arc::new(schema.remove_virtual_computed_fields());
     match write_settings.storage_format {
         FuseStorageFormat::Parquet => {
-            let result =
-                blocks_to_parquet(&schema, vec![block], buf, write_settings.table_compression)?;
+            let result = blocks_to_parquet(
+                &schema,
+                vec![block],
+                buf,
+                write_settings.table_compression,
+                write_settings.enable_parquet_encoding,
+            )?;
             let meta = column_parquet_metas(&result, &schema)?;
             Ok(meta)
         }
@@ -210,6 +215,7 @@ impl BloomIndexState {
             vec![index_block],
             &mut data,
             TableCompression::None,
+            false,
         )?;
         let data_size = data.len() as u64;
         Ok(Self {
diff --git a/src/query/storages/fuse/src/io/write/write_settings.rs b/src/query/storages/fuse/src/io/write/write_settings.rs
index f49a6c9d5b007..ccf6e40e154ee 100644
--- a/src/query/storages/fuse/src/io/write/write_settings.rs
+++ b/src/query/storages/fuse/src/io/write/write_settings.rs
@@ -26,6 +26,7 @@ pub struct WriteSettings {
     pub max_page_size: usize,
 
     pub block_per_seg: usize,
+    pub enable_parquet_encoding: bool,
 }
 
 impl Default for WriteSettings {
@@ -35,6 +36,7 @@ impl Default for WriteSettings {
             table_compression: TableCompression::default(),
             max_page_size: DEFAULT_ROW_PER_PAGE,
             block_per_seg: DEFAULT_BLOCK_PER_SEGMENT,
+            enable_parquet_encoding: false,
         }
     }
 }
diff --git a/src/query/storages/result_cache/src/write/writer.rs b/src/query/storages/result_cache/src/write/writer.rs
index 0ba10167d2762..a2299b2832daa 100644
--- a/src/query/storages/result_cache/src/write/writer.rs
+++ b/src/query/storages/result_cache/src/write/writer.rs
@@ -72,11 +72,13 @@ impl ResultCacheWriter {
     #[async_backtrace::framed]
     pub async fn write_to_storage(&self) -> Result<String> {
         let mut buf = Vec::with_capacity(self.current_bytes);
+        // TODO: document why encoding is not enabled here
         let _ = blocks_to_parquet(
             &self.schema,
             self.blocks.clone(),
             &mut buf,
             TableCompression::None,
+            false,
         )?;
 
         let file_location = format!("{}/{}.parquet", self.location, Uuid::new_v4().as_simple());
diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test
new file mode 100644
index 0000000000000..e80ae5b6c1a62
--- /dev/null
+++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test
@@ -0,0 +1,124 @@
+statement ok
+create or replace database test_tbl_opt_parquet_encoding;
+
+statement ok
+use test_tbl_opt_parquet_encoding;
+
+#############################################
+# Create table with parquet encoding option #
+#############################################
+
+statement ok
+create or replace table t_encoded (c int, s string) enable_parquet_encoding = 'true' compression = 'lz4';
+
+statement ok
+create or replace table t(c int, s string) compression = 'lz4';
+
+statement ok
+insert into t_encoded(c, s) select number as c, to_string(number) as s from numbers(1000000);
+
+statement ok
+optimize table t_encoded compact;
+
+statement ok
+insert into t(c, s) select number as c, to_string(number) as s from numbers(1000000);
+
+statement ok
+optimize table t compact;
+
+# In this case, lz4 with encoding produces smaller block files
+query T
+with
+  e as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 't_encoded') limit 1),
+  p as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 't') limit 1)
+  select e.c < p.c from e, p
+----
+1
+
+
+################################
+# Alter table parquet encoding #
+################################
+
+
+# 1. prepare plain encoded data and record the file size
+statement ok
+create or replace table tbl (c int, s string) compression = 'lz4';
+
+statement ok
+insert into tbl(c, s) select number as c, to_string(number) as s from numbers(1000000);
+
+# insertion might be executed in a distributed manner; in that case, data blocks might be fragmented
+statement ok
+optimize table tbl compact;
+
+statement ok
+create temp table tbl_size(s uint64);
+
+statement ok
+insert into tbl_size select bytes_compressed from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1;
+
+
+# 2. truncate table data and insert the same data with parquet encoding enabled
+statement ok
+truncate table tbl;
+
+statement ok
+ALTER TABLE tbl SET OPTIONS (enable_parquet_encoding = 'true');
+
+statement ok
+insert into tbl(c, s) select number as c, to_string(number) as s from numbers(1000000);
+
+# insertion might be executed in a distributed manner; in that case, data blocks might be fragmented, so compact them
+statement ok
+optimize table tbl compact;
+
+
+# 3. check that the file size of newly created blocks with encoding is smaller than the recorded plain size
+
+query T
+with
+  e as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1),
+  p as (select s as c from tbl_size)
+  select e.c < p.c from e,p
+----
+1
+
+# record the size; it will be used later
+statement ok
+create temp table e_tbl_size(s uint64);
+
+statement ok
+insert into e_tbl_size select bytes_compressed from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1;
+
+# 4. check that table option `enable_parquet_encoding` can be turned off
+
+statement ok
+truncate table tbl;
+
+statement ok
+ALTER TABLE tbl SET OPTIONS (enable_parquet_encoding = 'false');
+
+statement ok
+insert into tbl(c, s) select number as c, to_string(number) as s from numbers(1000000);
+
+statement ok
+optimize table tbl compact;
+
+
+# 5. check that with encoding turned off, newly created blocks are larger than the encoded ones
+query T
+with
+  p as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1),
+  e as (select s as c from e_tbl_size)
+  select e.c < p.c from e,p
+----
+1
+
+
+# Test invalid option value
+
+statement error 1001
+ALTER TABLE tbl SET OPTIONS (enable_parquet_encoding = 'invalid');
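
Note: the sqllogictest above only compares `bytes_compressed` between snapshots. To confirm directly which encodings ended up in a block file, the parquet footer can be inspected with the same `parquet` crate the writer uses. Below is a minimal sketch, not part of this change; the path `block.parquet` is a hypothetical local copy of a fuse block file:

~~~rust
use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical local copy of a block file written by a fuse table.
    let file = File::open("block.parquet")?;
    let reader = SerializedFileReader::new(file)?;

    let meta = reader.metadata();
    for rg in 0..meta.num_row_groups() {
        for col in meta.row_group(rg).columns() {
            // Each column chunk records the encodings actually used to write it.
            println!("{}: {:?}", col.column_path().string(), col.encodings());
        }
    }
    Ok(())
}
~~~

With `enable_parquet_encoding = 'true'` the columns should report dictionary encoding and/or the PARQUET_2_0 fallback encodings listed in the memo comment above (e.g. `DELTA_BINARY_PACKED`, `DELTA_BYTE_ARRAY`); with the option off, only `PLAIN`.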