feat: new fuse table option enable_parquet_encoding #17675

Draft · wants to merge 5 commits into main
3 changes: 2 additions & 1 deletion src/query/formats/src/output_format/parquet.rs
@@ -53,7 +53,8 @@ impl OutputFormat for ParquetOutputFormat {
return Ok(vec![]);
}
let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd)?;
// When unloading data, enable encoding unconditionally in this case, since ...
let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd, true)?;
Ok(buf)
}
}
@@ -26,6 +26,7 @@ use databend_common_sql::BloomIndexColumns;
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING;
use databend_common_storages_fuse::FUSE_OPT_KEY_FILE_SIZE;
use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;
use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
@@ -75,6 +76,7 @@ pub static CREATE_FUSE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new(
r.insert("transient");
r.insert(OPT_KEY_TEMP_PREFIX);
r.insert(OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH);
r.insert(FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING);
r
});

@@ -196,10 +198,7 @@ pub fn is_valid_bloom_index_columns(
pub fn is_valid_change_tracking(
options: &BTreeMap<String, String>,
) -> databend_common_exception::Result<()> {
if let Some(value) = options.get(OPT_KEY_CHANGE_TRACKING) {
value.to_lowercase().parse::<bool>()?;
}
Ok(())
is_valid_bool_opt(OPT_KEY_CHANGE_TRACKING, options)
}

pub fn is_valid_random_seed(
@@ -210,3 +209,19 @@ pub fn is_valid_random_seed(
}
Ok(())
}

pub fn is_valid_fuse_parquet_encoding_opt(
options: &BTreeMap<String, String>,
) -> databend_common_exception::Result<()> {
is_valid_bool_opt(FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING, options)
}

fn is_valid_bool_opt(
key: &str,
options: &BTreeMap<String, String>,
) -> databend_common_exception::Result<()> {
if let Some(value) = options.get(key) {
// Lowercase first so that values like "TRUE" stay accepted,
// matching the previous behavior of is_valid_change_tracking.
value.to_lowercase().parse::<bool>()?;
}
Ok(())
}
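
For a quick sense of what this validation accepts, here is a minimal standalone sketch of the same logic. `check_bool_opt` is a hypothetical stand-in for the helper above; the real one returns `databend_common_exception::Result`, while this sketch uses std's `ParseBoolError` instead:

~~~
use std::collections::BTreeMap;
use std::str::ParseBoolError;

// Standalone stand-in for is_valid_bool_opt above.
fn check_bool_opt(key: &str, options: &BTreeMap<String, String>) -> Result<(), ParseBoolError> {
    if let Some(value) = options.get(key) {
        // bool::from_str only accepts "true"/"false", so lowercase first.
        value.to_lowercase().parse::<bool>()?;
    }
    Ok(())
}

fn main() {
    let mut opts = BTreeMap::new();
    opts.insert("enable_parquet_encoding".to_string(), "TRUE".to_string());
    assert!(check_bool_opt("enable_parquet_encoding", &opts).is_ok());

    opts.insert("enable_parquet_encoding".to_string(), "invalid".to_string());
    // This is the case the sqllogic test below exercises (error 1001).
    assert!(check_bool_opt("enable_parquet_encoding", &opts).is_err());

    // An absent key is fine: the option simply was not set.
    assert!(check_bool_opt("enable_parquet_encoding", &BTreeMap::new()).is_ok());
}
~~~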
@@ -64,6 +64,7 @@ use crate::interpreters::common::table_option_validation::is_valid_bloom_index_columns;
use crate::interpreters::common::table_option_validation::is_valid_change_tracking;
use crate::interpreters::common::table_option_validation::is_valid_create_opt;
use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_encoding_opt;
use crate::interpreters::common::table_option_validation::is_valid_random_seed;
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
@@ -425,6 +426,8 @@ impl CreateTableInterpreter {
is_valid_random_seed(&table_meta.options)?;
// check table level data_retention_period_in_hours
is_valid_data_retention_period(&table_meta.options)?;
// check enable_parquet_encoding
is_valid_fuse_parquet_encoding_opt(&table_meta.options)?;

for table_option in table_meta.options.iter() {
let key = table_option.0.to_lowercase();
@@ -35,6 +35,7 @@ use crate::interpreters::common::table_option_validation::is_valid_block_per_segment;
use crate::interpreters::common::table_option_validation::is_valid_bloom_index_columns;
use crate::interpreters::common::table_option_validation::is_valid_create_opt;
use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_encoding_opt;
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
@@ -71,6 +72,8 @@ impl Interpreter for SetOptionsInterpreter {
is_valid_row_per_block(&self.plan.set_options)?;
// check data_retention_period
is_valid_data_retention_period(&self.plan.set_options)?;
// check enable_parquet_encoding
is_valid_fuse_parquet_encoding_opt(&self.plan.set_options)?;

// check storage_format
let error_str = "invalid opt for fuse table in alter table statement";
1 change: 1 addition & 0 deletions src/query/service/src/test_kits/block_writer.rs
@@ -143,6 +143,7 @@ impl<'a> BlockWriter<'a> {
vec![index_block],
&mut data,
TableCompression::None,
false,
)?;
let size = data.len() as u64;
data_accessor.write(&location.0, data).await?;
35 changes: 30 additions & 5 deletions src/query/storages/common/blocks/src/parquet_rs.rs
@@ -22,6 +22,7 @@ use parquet::arrow::ArrowWriter;
use parquet::basic::Encoding;
use parquet::file::properties::EnabledStatistics;
use parquet::file::properties::WriterProperties;
use parquet::file::properties::WriterVersion;
use parquet::format::FileMetaData;

/// Serialize data blocks to parquet format.
@@ -30,17 +31,41 @@ pub fn blocks_to_parquet(
blocks: Vec<DataBlock>,
write_buffer: &mut Vec<u8>,
compression: TableCompression,
enable_encoding: bool,
) -> Result<FileMetaData> {
assert!(!blocks.is_empty());
let props = WriterProperties::builder()
let builder = WriterProperties::builder()
.set_compression(compression.into())
// use `usize::MAX` to effectively limit the number of row groups to 1
.set_max_row_group_size(usize::MAX)
.set_encoding(Encoding::PLAIN)
.set_dictionary_enabled(false)
.set_statistics_enabled(EnabledStatistics::None)
.set_bloom_filter_enabled(false)
.build();
.set_bloom_filter_enabled(false);

let builder = if enable_encoding {
// Enable dictionary encoding and fallback encodings.
//
// Memo for quick lookup:
// The fallback encoding "strategy" used by parquet-54.2.1 is:
//
// ~~~
// (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
// (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
// (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
// (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
// (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
// _ => Encoding::PLAIN,
// ~~~
//
builder
.set_writer_version(WriterVersion::PARQUET_2_0)
.set_dictionary_enabled(true)
} else {
builder
.set_dictionary_enabled(false)
.set_encoding(Encoding::PLAIN)
};

let props = builder.build();
let batches = blocks
.into_iter()
.map(|block| block.to_record_batch(table_schema))
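
To see what the switch buys outside of Databend, here is a minimal sketch against the `parquet`, `arrow-array`, and `arrow-schema` crates directly (crate names and the exact column sizes are assumptions; illustrative only). With `PARQUET_2_0` and dictionaries enabled, a monotonically increasing INT32 column overflows the dictionary and falls back to `DELTA_BINARY_PACKED`, typically producing a far smaller file than `PLAIN`:

~~~
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::basic::Encoding;
use parquet::file::properties::{WriterProperties, WriterVersion};

// Write one INT32 column (0..100_000) with the given properties and
// return the resulting file size in bytes.
fn file_size(props: WriterProperties) -> usize {
    let schema = Arc::new(Schema::new(vec![Field::new("c", DataType::Int32, false)]));
    let col: ArrayRef = Arc::new(Int32Array::from((0..100_000).collect::<Vec<i32>>()));
    let batch = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    buf.len()
}

fn main() {
    // The "encoding off" profile: PLAIN everywhere, no dictionaries.
    let plain = WriterProperties::builder()
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .build();
    // The enable_parquet_encoding = 'true' profile sketched above.
    let encoded = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_dictionary_enabled(true)
        .build();
    // A monotonically increasing column delta-encodes extremely well,
    // so the second file should be much smaller than the first.
    println!("plain: {}, encoded: {}", file_size(plain), file_size(encoded));
}
~~~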
3 changes: 1 addition & 2 deletions src/query/storages/fuse/src/constants.rs
@@ -18,10 +18,9 @@ pub const FUSE_OPT_KEY_ROW_PER_BLOCK: &str = "row_per_block";
pub const FUSE_OPT_KEY_ROW_PER_PAGE: &str = "row_per_page";
pub const FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD: &str = "row_avg_depth_threshold";
pub const FUSE_OPT_KEY_FILE_SIZE: &str = "file_size";

pub const FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS: &str = "data_retention_period_in_hours";

pub const FUSE_OPT_KEY_ATTACH_COLUMN_IDS: &str = "attach_column_ids";
pub const FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING: &str = "enable_parquet_encoding";

pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b";
pub const FUSE_TBL_BLOCK_INDEX_PREFIX: &str = "_i";
4 changes: 4 additions & 0 deletions src/query/storages/fuse/src/fuse_table.rs
@@ -124,6 +124,7 @@ use crate::FUSE_OPT_KEY_ATTACH_COLUMN_IDS;
use crate::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
use crate::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
use crate::FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING;
use crate::FUSE_OPT_KEY_FILE_SIZE;
use crate::FUSE_OPT_KEY_ROW_PER_BLOCK;
use crate::FUSE_OPT_KEY_ROW_PER_PAGE;
@@ -272,11 +273,14 @@ impl FuseTable {
let block_per_seg =
self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);

let enable_parquet_encoding = self.get_option(FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING, false);

WriteSettings {
storage_format: self.storage_format,
table_compression: self.table_compression,
max_page_size,
block_per_seg,
enable_parquet_encoding,
}
}

10 changes: 8 additions & 2 deletions src/query/storages/fuse/src/io/write/block_writer.rs
@@ -71,8 +71,13 @@ pub fn serialize_block(
let schema = Arc::new(schema.remove_virtual_computed_fields());
match write_settings.storage_format {
FuseStorageFormat::Parquet => {
let result =
blocks_to_parquet(&schema, vec![block], buf, write_settings.table_compression)?;
let result = blocks_to_parquet(
&schema,
vec![block],
buf,
write_settings.table_compression,
write_settings.enable_parquet_encoding,
)?;
let meta = column_parquet_metas(&result, &schema)?;
Ok(meta)
}
@@ -210,6 +215,7 @@ impl BloomIndexState {
vec![index_block],
&mut data,
TableCompression::None,
false,
)?;
let data_size = data.len() as u64;
Ok(Self {
2 changes: 2 additions & 0 deletions src/query/storages/fuse/src/io/write/write_settings.rs
@@ -26,6 +26,7 @@ pub struct WriteSettings {
pub max_page_size: usize,

pub block_per_seg: usize,
pub enable_parquet_encoding: bool,
}

impl Default for WriteSettings {
@@ -35,6 +36,7 @@ impl Default for WriteSettings {
table_compression: TableCompression::default(),
max_page_size: DEFAULT_ROW_PER_PAGE,
block_per_seg: DEFAULT_BLOCK_PER_SEGMENT,
enable_parquet_encoding: false,
}
}
}
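
Since `Default` now carries the flag, call sites can opt in without spelling out every field. A sketch, assuming crate-internal context (in practice `FuseTable::get_write_settings` derives the flag from the table option, as shown above):

~~~
// Sketch: override only the new flag and inherit the remaining defaults.
fn encoded_write_settings() -> WriteSettings {
    WriteSettings {
        enable_parquet_encoding: true,
        ..Default::default()
    }
}
// The default stays conservative: encoding is off unless the table opts in.
assert!(!WriteSettings::default().enable_parquet_encoding);
~~~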
2 changes: 2 additions & 0 deletions src/query/storages/result_cache/src/write/writer.rs
@@ -72,11 +72,13 @@ impl ResultCacheWriter {
#[async_backtrace::framed]
pub async fn write_to_storage(&self) -> Result<String> {
let mut buf = Vec::with_capacity(self.current_bytes);
// TODO: document why encoding is not enabled here
let _ = blocks_to_parquet(
&self.schema,
self.blocks.clone(),
&mut buf,
TableCompression::None,
false,
)?;

let file_location = format!("{}/{}.parquet", self.location, Uuid::new_v4().as_simple());
@@ -0,0 +1,124 @@
statement ok
create or replace database test_tbl_opt_parquet_encoding;

statement ok
use test_tbl_opt_parquet_encoding;

#############################################
# Create table with parquet encoding option #
#############################################

statement ok
create or replace table t_encoded (c int, s string) enable_parquet_encoding = 'true' compression = 'lz4';

statement ok
create or replace table t(c int, s string) compression = 'lz4';

statement ok
insert into t_encoded(c, s) select number as c, to_string(number) as s from numbers(1000000);

statement ok
optimize table t_encoded compact;

statement ok
insert into t(c, s) select number as c, to_string(number) as s from numbers(1000000);

statement ok
optimize table t compact;

# In this case, lz4 with encoding produces smaller block files
query T
with
e as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 't_encoded') limit 1),
p as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 't') limit 1)
select e.c < p.c from e, p
----
1


################################
# Alter table parquet encoding #
################################


# 1. prepare PLAIN-encoded data and record the file size
statement ok
create or replace table tbl (c int, s string) compression = 'lz4';

statement ok
insert into tbl(c, s) select number as c, to_string(number) as s from numbers(1000000);

# insertion might be executed in a distributed manner; in that case data blocks might be fragmented, so compact them
statement ok
optimize table tbl compact;

statement ok
create temp table tbl_size(s uint64);

statement ok
insert into tbl_size select bytes_compressed from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1;


# 2. truncate table data and insert the same data with parquet encoding enabled
statement ok
truncate table tbl;

statement ok
ALTER TABLE tbl SET OPTIONS (enable_parquet_encoding = 'true');

statement ok
insert into tbl(c, s) select number as c, to_string(number) as s from numbers(1000000);

# insertion might be executed in a distributed manner; in that case data blocks might be fragmented, so compact them
statement ok
optimize table tbl compact;


# 3. check that file size of newly created blocks with encoding is smaller

query T
with
e as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1),
p as (select s as c from tbl_size)
select e.c < p.c from e,p
----
1

# record the size; it will be used later
statement ok
create temp table e_tbl_size(s uint64);

statement ok
insert into e_tbl_size select bytes_compressed from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1;

# 4. check that table option `enable_parquet_encoding` can be turned off

statement ok
truncate table tbl;

statement ok
ALTER TABLE tbl SET OPTIONS (enable_parquet_encoding = 'false');

statement ok
insert into tbl(c, s) select number as c, to_string(number) as s from numbers(1000000);

statement ok
optimize table tbl compact;


# check that the newly written blocks (encoding disabled) are larger than the encoded ones recorded in e_tbl_size
query T
with
p as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1),
e as (select s as c from e_tbl_size)
select e.c < p.c from e,p
----
1


# Test invalid option value

statement error 1001
ALTER TABLE tbl SET OPTIONS (enable_parquet_encoding = 'invalid');

