feat(query): iceberg data cache #17787

Merged: 29 commits, May 6, 2025

445 changes: 294 additions & 151 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions Cargo.toml
@@ -337,12 +337,15 @@ http = "1"
 humantime = "2.1.0"
 hyper = "1"
 hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] }
-iceberg = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "864a8b05f", features = [
+
+## in branch dev
+iceberg = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "cce89a3", features = [
     "storage-all",
 ] }
-iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "864a8b05f" }
-iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "864a8b05f" }
-iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "864a8b05f" }
+iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "cce89a3" }
+iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "cce89a3" }
+iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "cce89a3" }
+
 indexmap = "2.0.0"
 indicatif = "0.17.5"
 itertools = "0.13.0"
@@ -450,7 +453,7 @@ reqwest = { version = "0.12", default-features = false, features = [
 reqwest-hickory-resolver = "0.2"
 ringbuffer = "0.14.2"
 rmp-serde = "1.1.1"
-roaring = { version = "0.10.1", features = ["serde"] }
+roaring = { version = "^0.10", features = ["serde"] }
 rotbl = { version = "0.1.2", features = [] }
 rust_decimal = "1.26"
 rustix = "0.38.37"
23 changes: 21 additions & 2 deletions src/common/base/src/rangemap/range_merger.rs
@@ -46,8 +46,15 @@ pub struct RangeMerger
 }
 
 impl RangeMerger {
-    pub fn from_iter<I>(iter: I, max_gap_size: u64, max_range_size: u64) -> Self
-    where I: IntoIterator<Item = Range<u64>> {
+    pub fn from_iter<I>(
+        iter: I,
+        max_gap_size: u64,
+        max_range_size: u64,
+        whole_read_size: Option<u64>,
+    ) -> Self
+    where
+        I: IntoIterator<Item = Range<u64>>,
+    {
         let mut raw_ranges: Vec<_> = iter.into_iter().collect();
         raw_ranges.sort_by(|a, b| a.start.cmp(&b.start));
 
@@ -57,6 +64,18 @@ impl RangeMerger
             ranges: Vec::with_capacity(raw_ranges.len()),
         };
 
+        if let Some(whole_read_size) = whole_read_size {
+            if !raw_ranges.is_empty() {
+                let max_end = raw_ranges.iter().map(|r| r.end).max().unwrap_or(0);
+
+                if max_end - raw_ranges[0].start <= whole_read_size {
+                    let r = raw_ranges.first().unwrap().start..max_end;
+                    rs.ranges = vec![r];
+                    return rs;
+                }
+            }
+        }
+
         for range in &raw_ranges {
            rs.add(range);
        }
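For context, a minimal sketch (not part of this PR) of what the new whole_read_size fast path does, assuming ranges() returns the merged Vec<Range<u64>> as in the tests below:

    // The requested ranges span [1, 100); 100 - 1 = 99 <= 128, so the merger
    // short-circuits to a single whole read instead of merging gap by gap.
    let mr = RangeMerger::from_iter(vec![1..5, 20..30, 90..100], 0, 1024, Some(128));
    assert_eq!(mr.ranges(), vec![1..100]);

    // With the fast path disabled (None) and max_gap_size = 0, the three
    // non-overlapping ranges stay separate.
    let mr = RangeMerger::from_iter(vec![1..5, 20..30, 90..100], 0, 1024, None);
    assert_eq!(mr.ranges(), vec![1..5, 20..30, 90..100]);

This trades some over-read (the gaps) for one sequential request, a sensible exchange when the whole object is about to be cached anyway, which appears to be the motivation here.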
10 changes: 5 additions & 5 deletions src/common/base/tests/it/range_merger.rs
@@ -32,7 +32,7 @@ impl fmt::Display for Array {
 fn test_range_merger() -> Result<()> {
     let v = [3..6, 1..5, 7..11, 8..9, 9..12, 4..8, 13..15, 18..20];
 
-    let mr = RangeMerger::from_iter(v, 0, 100);
+    let mr = RangeMerger::from_iter(v, 0, 100, None);
     let actual = format!("{}", Array(mr.ranges()));
     let expect = "[1,12] [13,15] [18,20] ";
     assert_eq!(actual, expect);
@@ -47,7 +47,7 @@ fn test_range_merger_with_gap() -> Result<()> {
 
     // max_gap_size = 1
     {
-        let mr = RangeMerger::from_iter(v.clone(), 1, 100);
+        let mr = RangeMerger::from_iter(v.clone(), 1, 100, None);
         let actual = format!("{}", Array(mr.ranges()));
         let expect = "[1,15] [18,20] ";
         assert_eq!(actual, expect);
@@ -65,7 +65,7 @@
 
     // max_gap_size = 2
     {
-        let mr = RangeMerger::from_iter(v.clone(), 2, 100);
+        let mr = RangeMerger::from_iter(v.clone(), 2, 100, None);
         let actual = format!("{}", Array(mr.ranges()));
         let expect = "[1,15] [18,20] ";
         assert_eq!(actual, expect);
@@ -83,7 +83,7 @@
 
     // max_gap_size = 3
     {
-        let mr = RangeMerger::from_iter(v.clone(), 3, 100);
+        let mr = RangeMerger::from_iter(v.clone(), 3, 100, None);
         let actual = format!("{}", Array(mr.ranges()));
         let expect = "[1,20] ";
         assert_eq!(actual, expect);
@@ -101,7 +101,7 @@
 
     // max_gap_size = 3, max_range_size = 5
    {
-        let mr = RangeMerger::from_iter(v.clone(), 3, 4);
+        let mr = RangeMerger::from_iter(v.clone(), 3, 4, None);
         let actual = format!("{}", Array(mr.ranges()));
         let expect = "[1,5] [3,8] [7,11] [8,12] [13,20] ";
         assert_eq!(actual, expect);
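A hypothetical extra case (not in this PR) in the same style, exercising the new parameter: the whole span [1, 20) fits within whole_read_size = 32, so the gap and range-size limits are bypassed and a single read is produced:

    // max_gap_size = 0, max_range_size = 4, whole_read_size = 32
    {
        let mr = RangeMerger::from_iter(v.clone(), 0, 4, Some(32));
        let actual = format!("{}", Array(mr.ranges()));
        let expect = "[1,20] ";
        assert_eq!(actual, expect);
    }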
1 change: 1 addition & 0 deletions src/common/storage/Cargo.toml
@@ -26,6 +26,7 @@ databend-common-native = { workspace = true }
 databend-enterprise-storage-encryption = { workspace = true }
 futures = { workspace = true }
 http = { workspace = true }
+iceberg = { workspace = true }
 log = { workspace = true }
 opendal = { workspace = true }
 parquet = { workspace = true }
1 change: 1 addition & 0 deletions src/common/storage/src/lib.rs
@@ -43,6 +43,7 @@ pub use operator::build_operator;
 pub use operator::check_operator;
 pub use operator::init_operator;
 pub use operator::DataOperator;
+pub use operator::OperatorRegistry;
 
 pub mod metrics;
 pub use crate::metrics::StorageMetrics;
27 changes: 27 additions & 0 deletions src/common/storage/src/operator.rs
@@ -535,3 +535,30 @@ pub async fn check_operator(
             ))
         })
 }
+
+pub trait OperatorRegistry: Send + Sync {
+    fn get_operator_path<'a>(&self, _location: &'a str) -> Result<(Operator, &'a str)>;
+}
+
+impl OperatorRegistry for Operator {
+    fn get_operator_path<'a>(&self, location: &'a str) -> Result<(Operator, &'a str)> {
+        Ok((self.clone(), location))
+    }
+}
+
+impl OperatorRegistry for DataOperator {
+    fn get_operator_path<'a>(&self, location: &'a str) -> Result<(Operator, &'a str)> {
+        Ok((self.operator.clone(), location))
+    }
+}
+
+impl OperatorRegistry for iceberg::io::FileIO {
+    fn get_operator_path<'a>(&self, location: &'a str) -> Result<(Operator, &'a str)> {
+        let file_io = self
+            .new_input(location)
+            .map_err(|err| std::io::Error::new(ErrorKind::Unsupported, err.message()))?;
+
+        let pos = file_io.relative_path_pos();
+        Ok((file_io.get_operator().clone(), &location[pos..]))
+    }
+}
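A minimal consumer sketch (assumed usage, not code from this PR): a reader can now accept any OperatorRegistry, resolve the backend once, and read by the relative path. The usual ErrorCode: From<opendal::Error> conversion is assumed for the ? on the OpenDAL call:

    async fn read_all(registry: &dyn OperatorRegistry, location: &str) -> Result<Vec<u8>> {
        // Resolve the concrete backend plus the path relative to its root.
        let (op, rel_path) = registry.get_operator_path(location)?;
        // For an iceberg FileIO registry, this reads through the operator
        // that FileIO built for the table's storage.
        Ok(op.read(rel_path).await?.to_vec())
    }

The point of the trait is that the same reader code serves plain Operator, DataOperator, and iceberg FileIO callers without knowing which one it was handed.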
14 changes: 11 additions & 3 deletions src/common/storage/src/stage.rs
@@ -66,12 +66,20 @@ impl StageFileInfo {
         }
     }
 
-    pub fn dedup_key(&self) -> Option<String> {
+    pub fn dedup_key(&self) -> String {
         // should not use last_modified because the accuracy is in seconds for S3.
         if let Some(md5) = &self.md5 {
-            Some(md5.clone())
+            md5.clone()
         } else {
-            self.etag.clone()
+            let last_modified = self
+                .last_modified
+                .as_ref()
+                .map(|x| x.to_string())
+                .unwrap_or_default();
+
+            self.etag
+                .clone()
+                .unwrap_or(format!("{}/{last_modified}/{}", self.path, self.size))
         }
     }
 }
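A worked example (hypothetical values, not from this PR) of the new fallback chain, md5 first, then etag, then a synthesized key, which makes dedup_key total instead of optional:

    // md5 = None, etag = None,
    // path = "data/part-0001.parquet", size = 1024,
    // last_modified = 2025-05-01 00:00:00 UTC (chrono Display format)
    // => dedup_key() == "data/part-0001.parquet/2025-05-01 00:00:00 UTC/1024"
    let key = info.dedup_key();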
26 changes: 22 additions & 4 deletions src/query/catalog/src/catalog/manager.rs
@@ -38,6 +38,7 @@ use databend_common_meta_store::MetaStore;
 use databend_common_meta_store::MetaStoreProvider;
 use databend_common_meta_types::anyerror::func_name;
 use databend_storages_common_session::SessionState;
+use parking_lot::RwLock;
 
 use super::Catalog;
 use super::CatalogCreator;
@@ -54,6 +55,7 @@ pub struct CatalogManager {
 
     /// catalog_creators is the catalog creators that registered.
     pub catalog_creators: HashMap<CatalogType, Arc<dyn CatalogCreator>>,
+    pub catalog_caches: RwLock<HashMap<String, Arc<dyn Catalog>>>,
 }
 
 impl CatalogManager {
@@ -121,6 +123,7 @@ impl CatalogManager {
             default_catalog,
             external_catalogs,
             catalog_creators,
+            catalog_caches: Default::default(),
         };
 
         Ok(Arc::new(catalog_manager))
@@ -141,16 +144,33 @@ impl CatalogManager {
         session_state: SessionState,
     ) -> Result<Arc<dyn Catalog>> {
         let typ = info.meta.catalog_option.catalog_type();
 
         if typ == CatalogType::Default {
             return self.get_default_catalog(session_state);
         }
 
+        let tid = std::thread::current().id();
+        let key = format!(
+            "{:?}_{}_{:?}",
+            info.catalog_name(),
+            info.meta.created_on.timestamp(),
+            tid
+        );
+
+        {
+            let r = self.catalog_caches.read();
+            if let Some(v) = r.get(&key) {
+                return Ok(v.clone());
+            }
+        }
         let creator = self
             .catalog_creators
             .get(&typ)
             .ok_or_else(|| ErrorCode::BadArguments(format!("unknown catalog type: {:?}", typ)))?;
-        creator.try_create(info)
+        let v = creator.try_create(info)?;
+        let mut w = self.catalog_caches.write();
+        w.insert(key, v.clone());
+        Ok(v)
     }
 
     /// Get a catalog from manager.
@@ -174,10 +194,8 @@
         if let Some(ctl) = self.external_catalogs.get(catalog_name) {
             return Ok(ctl.clone());
         }
-
         let tenant = Tenant::new_or_err(tenant, func_name!())?;
         let ident = CatalogNameIdent::new(tenant, catalog_name);
-
         // Get catalog from metasrv.
         let info = self.meta.get_catalog(&ident).await?;
         self.build_catalog(info, session_state)
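For reference, a sketch (assumed, not from this PR) of the shape of the cache key: Debug-quoted catalog name, creation timestamp, and current thread id. Scoping the key by thread id presumably gives each worker thread its own catalog instance, so concurrent sessions do not share one client:

    let tid = std::thread::current().id();
    // For a catalog named my_iceberg created at unix time 1714963200 on
    // thread 7, this yields "\"my_iceberg\"_1714963200_ThreadId(7)".
    let key = format!("{:?}_{}_{:?}", "my_iceberg", 1714963200, tid);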
Changes to ParquetTableInfo (src/query/storages/parquet; file header not rendered)
@@ -17,7 +17,6 @@ use std::io::Cursor;
 use std::sync::Arc;
 
 use arrow_schema::Schema as ArrowSchema;
-use databend_common_base::base::tokio::sync::Mutex;
 use databend_common_expression::ColumnId;
 use databend_common_expression::TableField;
 use databend_common_expression::TableSchema;
@@ -73,10 +72,6 @@ pub struct ParquetTableInfo {
     pub compression_ratio: f64,
     pub leaf_fields: Arc<Vec<TableField>>,
 
-    // These fields are only used in coordinator node of the cluster,
-    // so we don't need to serialize them.
-    #[serde(skip)]
-    pub parquet_metas: Arc<Mutex<Vec<Arc<FullParquetMeta>>>>,
     #[serde(skip)]
     pub need_stats_provider: bool,
     #[serde(skip)]
@@ -136,7 +131,6 @@ mod tests {
     use std::sync::Arc;
 
     use arrow_schema::Schema as ArrowSchema;
-    use databend_common_base::base::tokio::sync::Mutex;
     use databend_common_storage::StageFilesInfo;
     use parquet::basic::ConvertedType;
     use parquet::basic::Repetition;
@@ -214,7 +208,6 @@
             files_to_read: None,
             schema_from: "".to_string(),
             compression_ratio: 0.0,
-            parquet_metas: Arc::new(Mutex::new(vec![])),
             need_stats_provider: false,
             max_threads: 1,
             max_memory_usage: 10000,