Skip to content

feat: impl NgramIndex for FuseTable, improve like query performance #17852

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion src/query/ee/tests/it/inverted_index/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn apply_block_pruning(
let segment_locs = table_snapshot.segments.clone();
let segment_locs = create_segment_location_vector(segment_locs, None);

FusePruner::create(&ctx, dal, schema, push_down, bloom_index_cols, None)?
FusePruner::create(&ctx, dal, schema, push_down, bloom_index_cols, vec![], None)?
.read_pruning(segment_locs)
.await
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/functions/src/scalars/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,13 @@ where for<'a> T::ScalarRef<'a>: DFHash {
);
}

/// Stateful 64-bit hasher used by the scalar hash functions in this module.
/// Holds the caller-supplied seed and the running hash value.
/// NOTE(review): made `pub` (with `pub fn with_seed`) so it can be re-exported
/// from the crate root and reused by the ngram/bloom index code.
pub struct CityHasher64 {
    // Seed mixed into the hash computation; fixed at construction time.
    seed: u64,
    // Accumulated hash value; starts at zero for a fresh hasher.
    value: u64,
}

impl CityHasher64 {
    /// Creates a hasher initialized with seed `s` and a zeroed hash value.
    pub fn with_seed(s: u64) -> Self {
        Self { seed: s, value: 0 }
    }
}
Expand Down
2 changes: 2 additions & 0 deletions src/query/functions/src/scalars/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ mod vector;
pub use comparison::ALL_COMP_FUNC_NAMES;
use databend_functions_scalar_arithmetic::arithmetic;
use databend_functions_scalar_numeric_basic_arithmetic::register_numeric_basic_arithmetic;
pub use hash::CityHasher64;
pub use hash::DFHash;
pub use string::ALL_STRING_FUNC_NAMES;

pub fn register(registry: &mut FunctionRegistry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use databend_common_expression::ComputedExpr;
use databend_common_expression::DataBlock;
use databend_common_expression::Scalar;
use databend_common_expression::Value;
use databend_common_meta_app::schema::TableIndexType;
use databend_common_sql::plans::ShowCreateTablePlan;
use databend_common_storages_fuse::FUSE_OPT_KEY_ATTACH_COLUMN_IDS;
use databend_common_storages_stream::stream_table::StreamTable;
Expand Down Expand Up @@ -242,9 +243,14 @@ impl ShowCreateTableInterpreter {
let option = format!("{} = '{}'", key, value);
options.push(option);
}
let index_type = match index_field.index_type {
TableIndexType::Inverted => "INVERTED",
TableIndexType::Ngram => "NGRAM",
};
let mut index_str = format!(
" {} INVERTED INDEX {} ({})",
" {} {} INDEX {} ({})",
sync,
index_type,
display_ident(
&index_field.name,
force_quoted_ident,
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/test_kits/block_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ impl<'a> BlockWriter<'a> {
let bloom_index_cols = BloomIndexColumns::All;
let bloom_columns_map =
bloom_index_cols.bloom_index_fields(schema.clone(), BloomIndex::supported_type)?;
let mut builder = BloomIndexBuilder::create(FunctionContext::default(), bloom_columns_map);
let mut builder =
BloomIndexBuilder::create(FunctionContext::default(), bloom_columns_map, &[])?;
builder.add_block(block)?;
let maybe_bloom_index = builder.finalize()?;
if let Some(bloom_index) = maybe_bloom_index {
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/tests/it/storages/fuse/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn apply_block_pruning(
let ctx: Arc<dyn TableContext> = ctx;
let segment_locs = table_snapshot.segments.clone();
let segment_locs = create_segment_location_vector(segment_locs, None);
FusePruner::create(&ctx, op, schema, push_down, bloom_index_cols, None)?
FusePruner::create(&ctx, op, schema, push_down, bloom_index_cols, vec![], None)?
.read_pruning(segment_locs)
.await
.map(|v| v.into_iter().map(|(_, v)| v).collect())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ async fn apply_snapshot_pruning(
schema.clone(),
push_down,
bloom_index_cols,
vec![],
None,
)?);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ async fn apply_snapshot_pruning(
schema,
push_down,
bloom_index_cols,
vec![],
None,
)?);

Expand Down
47 changes: 43 additions & 4 deletions src/query/sql/src/planner/binder/ddl/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ use crate::MetadataRef;
use crate::RefreshAggregatingIndexRewriter;
use crate::SUPPORTED_AGGREGATING_INDEX_FUNCTIONS;

const MAXIMUM_BLOOM_SIZE: u64 = 10 * 1024 * 1024;
const MINIMUM_BLOOM_SIZE: u64 = 512;

// valid values for inverted index option tokenizer
static INDEX_TOKENIZER_VALUES: LazyLock<HashSet<&'static str>> = LazyLock::new(|| {
let mut r = HashSet::new();
Expand Down Expand Up @@ -580,13 +583,49 @@ impl Binder {
let value = val.to_lowercase();
match key.as_str() {
"gram_size" => {
if value.parse::<u32>().is_err() {
return Err(ErrorCode::IndexOptionInvalid(format!(
"value `{value}` is not a legal number",
)));
match value.parse::<usize>() {
Ok(num) => {
if num == 0 {
return Err(ErrorCode::IndexOptionInvalid(
"`gram_size` cannot be 0",
));
}
}
Err(_) => {
return Err(ErrorCode::IndexOptionInvalid(format!(
"value `{value}` is not a legal number",
)));
}
}
options.insert("gram_size".to_string(), value);
}
"bloom_size" => {
match value.parse::<u64>() {
Ok(num) => {
if num == 0 {
return Err(ErrorCode::IndexOptionInvalid(
"`bloom_size` cannot be 0",
));
}
if num < MINIMUM_BLOOM_SIZE {
return Err(ErrorCode::IndexOptionInvalid(format!(
"bloom_size: `{num}` is too small (bloom_size is minimum: {MINIMUM_BLOOM_SIZE})",
)));
}
if num > MAXIMUM_BLOOM_SIZE {
return Err(ErrorCode::IndexOptionInvalid(format!(
"bloom_size: `{num}` is too large (bloom_size is maximum: {MAXIMUM_BLOOM_SIZE})",
)));
}
}
Err(_) => {
return Err(ErrorCode::IndexOptionInvalid(format!(
"value `{value}` is not a legal number",
)));
}
}
options.insert("bloom_size".to_string(), value);
}
_ => {
return Err(ErrorCode::IndexOptionInvalid(format!(
"index option `{key}` is invalid key for create ngram index statement",
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/common/cache/src/cache_items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub use databend_common_catalog::plan::PartStatistics;
pub use databend_common_catalog::plan::Partitions;
pub use databend_common_catalog::table::Table;
use databend_common_exception::ErrorCode;
pub use databend_storages_common_index::filters::Xor8Filter;
pub use databend_storages_common_index::filters::FilterImpl;
pub use databend_storages_common_index::BloomIndexMeta;
pub use databend_storages_common_index::InvertedIndexFile;
pub use databend_storages_common_index::InvertedIndexMeta;
Expand Down
10 changes: 5 additions & 5 deletions src/query/storages/common/cache/src/caches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub type TableSnapshotCache = InMemoryLruCache<TableSnapshot>;
pub type TableSnapshotStatisticCache = InMemoryLruCache<TableSnapshotStatistics>;
/// In memory object cache of bloom filter.
/// For each indexed data block, the bloom xor8 filter of column is cached individually
pub type BloomIndexFilterCache = HybridCache<Xor8Filter>;
pub type BloomIndexFilterCache = HybridCache<FilterImpl>;
/// In memory object cache of parquet FileMetaData of bloom index data
pub type BloomIndexMetaCache = HybridCache<BloomIndexMeta>;

Expand Down Expand Up @@ -123,7 +123,7 @@ impl CachedObject<(PartStatistics, Partitions)> for (PartStatistics, Partitions)
}
}

impl CachedObject<Xor8Filter> for Xor8Filter {
impl CachedObject<FilterImpl> for FilterImpl {
type Cache = BloomIndexFilterCache;
fn cache() -> Option<Self::Cache> {
CacheManager::instance().get_bloom_index_filter_cache()
Expand Down Expand Up @@ -235,10 +235,10 @@ impl From<TableSnapshotStatistics> for CacheValue<TableSnapshotStatistics> {
}
}

impl From<Xor8Filter> for CacheValue<Xor8Filter> {
fn from(value: Xor8Filter) -> Self {
impl From<FilterImpl> for CacheValue<FilterImpl> {
fn from(value: FilterImpl) -> Self {
CacheValue {
mem_bytes: std::mem::size_of::<Xor8Filter>() + value.filter.finger_prints.len(),
mem_bytes: value.mem_bytes(),
inner: Arc::new(value),
}
}
Expand Down
Loading
Loading