From 20169b3280f461f7fb898a2472fdc64cdfd50cc8 Mon Sep 17 00:00:00 2001 From: lmangani Date: Tue, 4 Mar 2025 16:58:50 +0000 Subject: [PATCH] HTTP WRITE support --- extension/httpfs/httpfs.cpp | 109 +++++++++++++++++++++++++- extension/httpfs/httpfs_extension.cpp | 3 + extension/httpfs/include/httpfs.hpp | 14 +++- extension_config.cmake | 2 +- 4 files changed, 123 insertions(+), 5 deletions(-) diff --git a/extension/httpfs/httpfs.cpp b/extension/httpfs/httpfs.cpp index 6e1f635..b93b9ce 100644 --- a/extension/httpfs/httpfs.cpp +++ b/extension/httpfs/httpfs.cpp @@ -12,6 +12,7 @@ #include "duckdb/main/client_context.hpp" #include "duckdb/main/database.hpp" #include "duckdb/main/secret/secret_manager.hpp" +#include "duckdb/storage/buffer_manager.hpp" #include "http_state.hpp" #include @@ -59,6 +60,7 @@ HTTPParams HTTPParams::ReadFrom(optional_ptr opener, optional_ptr(); + + // Check if HTTP write is enabled + if (!hfh.http_params.enable_http_write) { + throw NotImplementedException("Writing to HTTP files not implemented"); + } + + if (!buffer || nr_bytes <= 1) { + return; + } + + // Initialize the write buffer if it is not already done + if (hfh.write_buffer.empty()) { + hfh.write_buffer.resize(hfh.WRITE_BUFFER_LEN); + hfh.write_buffer_idx = 0; + } + + idx_t bytes_to_copy = nr_bytes; + idx_t buffer_offset = 0; + + // Accumulate data into the write buffer + while (bytes_to_copy > 0) { + idx_t space_in_buffer = hfh.WRITE_BUFFER_LEN - hfh.write_buffer_idx; + idx_t copy_amount = MinValue(space_in_buffer, bytes_to_copy); + + // Copy data to the write buffer + memcpy(hfh.write_buffer.data() + hfh.write_buffer_idx, (char *)buffer + buffer_offset, copy_amount); + hfh.write_buffer_idx += copy_amount; + bytes_to_copy -= copy_amount; + buffer_offset += copy_amount; + + // std::cout << "Write buffer idx after write: " << hfh.write_buffer_idx << std::endl; + + // If the buffer is full, send the data + if (hfh.write_buffer_idx == hfh.WRITE_BUFFER_LEN) { + // Perform the HTTP POST request + FlushBuffer(hfh); + } + } + + // Update the file offset + hfh.file_offset += nr_bytes; + + // std::cout << "Completed Write operation. Total bytes written: " << nr_bytes << std::endl; +} + +void HTTPFileSystem::FlushBuffer(HTTPFileHandle &hfh) { + // If no data in buffer, return + if (hfh.write_buffer_idx <= 1) { + return; + } + + // Prepare the URL and headers for the HTTP POST request + string path, proto_host_port; + ParseUrl(hfh.path, path, proto_host_port); + + HeaderMap header_map; + auto headers = InitializeHeaders(header_map, hfh.http_params); + + // Define the request lambda + std::function request([&]() { + auto client = GetClient(hfh.http_params, proto_host_port.c_str(), &hfh); + duckdb_httplib_openssl::Request req; + req.method = "POST"; + req.path = path; + req.headers = *headers; + req.headers.emplace("Content-Type", "application/octet-stream"); + + // Prepare the request body from the write buffer + req.body = std::string(reinterpret_cast(hfh.write_buffer.data()), hfh.write_buffer_idx); + + // std::cout << "Sending request with " << hfh.write_buffer_idx << " bytes of data" << std::endl; + + return client->send(req); + }); + + // Perform the HTTP POST request and handle retries + auto response = RunRequestWithRetry(request, hfh.path, "POST", hfh.http_params); + + // Check if the response was successful (HTTP 200-299 status code) + if (response->code < 200 || response->code >= 300) { + throw HTTPException(*response, "HTTP POST request failed to '%s' with status code: %d", hfh.path.c_str(), + response->code); + } + + // Reset the write buffer index after sending data + hfh.write_buffer_idx = 0; +} + +void HTTPFileHandle::Close() { + auto &fs = (HTTPFileSystem &)file_system; + if (flags.OpenForWriting()) { + fs.FlushBuffer(*this); + } } int64_t HTTPFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes) { @@ -829,5 +924,15 @@ ResponseWrapper::ResponseWrapper(duckdb_httplib_openssl::Response &res, string & body = res.body; } -HTTPFileHandle::~HTTPFileHandle() = default; +HTTPFileHandle::~HTTPFileHandle() { + if (Exception::UncaughtException()) { + return; + } + + try { + Close(); + } catch (...) { // NOLINT + } +} + } // namespace duckdb diff --git a/extension/httpfs/httpfs_extension.cpp b/extension/httpfs/httpfs_extension.cpp index 536dce4..828eb5e 100644 --- a/extension/httpfs/httpfs_extension.cpp +++ b/extension/httpfs/httpfs_extension.cpp @@ -39,6 +39,9 @@ static void LoadInternal(DatabaseInstance &instance) { LogicalType::BOOLEAN, Value(false)); config.AddExtensionOption("ca_cert_file", "Path to a custom certificate file for self-signed certificates.", LogicalType::VARCHAR, Value("")); + // Experimental HTTPFS write + config.AddExtensionOption("enable_http_write", "Enable HTTPFS POST write", LogicalType::BOOLEAN, Value(false)); + // Global S3 config config.AddExtensionOption("s3_region", "S3 Region", LogicalType::VARCHAR, Value("us-east-1")); config.AddExtensionOption("s3_access_key_id", "S3 Access Key ID", LogicalType::VARCHAR); diff --git a/extension/httpfs/include/httpfs.hpp b/extension/httpfs/include/httpfs.hpp index 9dc9eda..161aabd 100644 --- a/extension/httpfs/include/httpfs.hpp +++ b/extension/httpfs/include/httpfs.hpp @@ -42,6 +42,7 @@ struct HTTPParams { static constexpr uint64_t DEFAULT_RETRY_WAIT_MS = 100; static constexpr float DEFAULT_RETRY_BACKOFF = 4; static constexpr bool DEFAULT_FORCE_DOWNLOAD = false; + static constexpr bool DEFAULT_ENABLE_HTTP_WRITE = false; static constexpr bool DEFAULT_KEEP_ALIVE = true; static constexpr bool DEFAULT_ENABLE_SERVER_CERT_VERIFICATION = false; static constexpr uint64_t DEFAULT_HF_MAX_PER_PAGE = 0; @@ -52,6 +53,7 @@ struct HTTPParams { uint64_t retry_wait_ms = DEFAULT_RETRY_WAIT_MS; float retry_backoff = DEFAULT_RETRY_BACKOFF; bool force_download = DEFAULT_FORCE_DOWNLOAD; + bool enable_http_write = DEFAULT_ENABLE_HTTP_WRITE; bool keep_alive = DEFAULT_KEEP_ALIVE; bool enable_server_cert_verification = DEFAULT_ENABLE_SERVER_CERT_VERIFICATION; idx_t hf_max_per_page = DEFAULT_HF_MAX_PER_PAGE; @@ -116,6 +118,12 @@ class HTTPFileHandle : public FileHandle { duckdb::unique_ptr read_buffer; constexpr static idx_t READ_BUFFER_LEN = 1000000; + // duckdb::unique_ptr write_buffer; + constexpr static idx_t WRITE_BUFFER_LEN = 1000000; + std::vector write_buffer; // Use a vector instead of a fixed-size array + idx_t write_buffer_idx = 0; // Tracks the current index in the buffer + idx_t current_buffer_len; + shared_ptr state; void AddHeaders(HeaderMap &map); @@ -126,8 +134,7 @@ class HTTPFileHandle : public FileHandle { void StoreClient(unique_ptr client); public: - void Close() override { - } + void Close() override; protected: //! Create a new Client @@ -139,6 +146,8 @@ class HTTPFileHandle : public FileHandle { }; class HTTPFileSystem : public FileSystem { + friend HTTPFileHandle; + public: static duckdb::unique_ptr GetClient(const HTTPParams &http_params, const char *proto_host_port, optional_ptr hfs); @@ -211,6 +220,7 @@ class HTTPFileSystem : public FileSystem { // Global cache mutex global_cache_lock; duckdb::unique_ptr global_metadata_cache; + void FlushBuffer(HTTPFileHandle &hfh); }; } // namespace duckdb diff --git a/extension_config.cmake b/extension_config.cmake index 0046a0b..9ccc06b 100644 --- a/extension_config.cmake +++ b/extension_config.cmake @@ -9,7 +9,7 @@ else () endif() duckdb_extension_load(httpfs - DONT_LINK +### DONT_LINK SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR} INCLUDE_DIR ${CMAKE_CURRENT_LIST_DIR}/extension/httpfs/include ${LOAD_HTTPFS_TESTS}