Skip to content

HTTP WRITE support #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 107 additions & 2 deletions extension/httpfs/httpfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "duckdb/main/client_context.hpp"
#include "duckdb/main/database.hpp"
#include "duckdb/main/secret/secret_manager.hpp"
#include "duckdb/storage/buffer_manager.hpp"
#include "http_state.hpp"

#include <chrono>
Expand Down Expand Up @@ -59,6 +60,7 @@ HTTPParams HTTPParams::ReadFrom(optional_ptr<FileOpener> opener, optional_ptr<Fi
info);
FileOpener::TryGetCurrentSetting(opener, "ca_cert_file", result.ca_cert_file, info);
FileOpener::TryGetCurrentSetting(opener, "hf_max_per_page", result.hf_max_per_page, info);
FileOpener::TryGetCurrentSetting(opener, "enable_http_write", result.enable_http_write, info);

// HTTP Secret lookups
KeyValueSecretReader settings_reader(*opener, info, "http");
Expand Down Expand Up @@ -576,7 +578,100 @@ int64_t HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes)
}

//! Buffered write to an HTTP file.
//!
//! Incoming bytes are staged in the handle's write buffer; every time the buffer reaches
//! WRITE_BUFFER_LEN bytes it is sent to the server through FlushBuffer. Any remainder stays
//! staged until a later call fills the buffer or the handle is flushed elsewhere.
//!
//! \param handle    File handle; must be an HTTPFileHandle.
//! \param buffer    Source bytes. A null pointer makes the call a no-op.
//! \param nr_bytes  Number of bytes to take from \p buffer. Values <= 0 make the call a no-op.
//! \param location  Not consulted by this implementation: bytes are appended in call order.
//! \throws NotImplementedException when the "enable_http_write" setting is off.
//! \throws whatever FlushBuffer throws when a full buffer cannot be sent.
void HTTPFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) {
	auto &hfh = handle.Cast<HTTPFileHandle>();

	// Writing is opt-in; without the setting keep the historical "not implemented" behaviour
	if (!hfh.http_params.enable_http_write) {
		throw NotImplementedException("Writing to HTTP files not implemented");
	}

	// Only a null buffer or a non-positive length is a no-op; a single byte is a valid write
	if (!buffer || nr_bytes <= 0) {
		return;
	}

	// Allocate the staging buffer on first use
	if (hfh.write_buffer.empty()) {
		hfh.write_buffer.resize(hfh.WRITE_BUFFER_LEN);
		hfh.write_buffer_idx = 0;
	}

	const auto *source = static_cast<const char *>(buffer);
	idx_t bytes_to_copy = static_cast<idx_t>(nr_bytes); // safe: nr_bytes > 0 was checked above
	idx_t buffer_offset = 0;

	// Copy the input in chunks that fit the remaining buffer space
	while (bytes_to_copy > 0) {
		idx_t space_in_buffer = hfh.WRITE_BUFFER_LEN - hfh.write_buffer_idx;
		idx_t copy_amount = MinValue<idx_t>(space_in_buffer, bytes_to_copy);

		memcpy(hfh.write_buffer.data() + hfh.write_buffer_idx, source + buffer_offset, copy_amount);
		hfh.write_buffer_idx += copy_amount;
		bytes_to_copy -= copy_amount;
		buffer_offset += copy_amount;

		// A full buffer is sent immediately; FlushBuffer resets write_buffer_idx on success
		if (hfh.write_buffer_idx == hfh.WRITE_BUFFER_LEN) {
			FlushBuffer(hfh);
		}
	}

	// Advance the handle's offset by the number of bytes accepted
	hfh.file_offset += nr_bytes;
}

//! Sends the bytes currently staged in the handle's write buffer as one HTTP POST request
//! to the handle's URL and, on success, marks the buffer as empty.
//!
//! \param hfh  Handle whose first write_buffer_idx bytes of write_buffer are sent.
//! \throws HTTPException when the server answers with a status outside 200-299. The staged
//!         bytes are left in the buffer in that case because the index is only reset on success.
void HTTPFileSystem::FlushBuffer(HTTPFileHandle &hfh) {
	// Nothing staged means nothing to send; a single staged byte must still be sent
	if (hfh.write_buffer_idx == 0) {
		return;
	}

	// Split the handle's URL into the host part and the request path
	string path, proto_host_port;
	ParseUrl(hfh.path, path, proto_host_port);

	HeaderMap header_map;
	auto headers = InitializeHeaders(header_map, hfh.http_params);

	// The request is built inside the callable so it can be re-issued from scratch
	// (presumably once per retry attempt - confirm against RunRequestWithRetry)
	std::function<duckdb_httplib_openssl::Result(void)> request([&]() {
		auto client = GetClient(hfh.http_params, proto_host_port.c_str(), &hfh);
		duckdb_httplib_openssl::Request req;
		req.method = "POST";
		req.path = path;
		req.headers = *headers;
		req.headers.emplace("Content-Type", "application/octet-stream");

		// Only the filled prefix of the buffer is the payload
		req.body.assign(reinterpret_cast<const char *>(hfh.write_buffer.data()), hfh.write_buffer_idx);

		return client->send(req);
	});

	auto response = RunRequestWithRetry(request, hfh.path, "POST", hfh.http_params);

	// Anything outside the 2xx range is treated as a failed write
	if (response->code < 200 || response->code >= 300) {
		throw HTTPException(*response, "HTTP POST request failed to '%s' with status code: %d", hfh.path.c_str(),
		                    response->code);
	}

	// The payload was accepted: the buffer can be reused from the start
	hfh.write_buffer_idx = 0;
}

//! Closes the handle. For handles opened for writing, any bytes still staged in the
//! write buffer are handed to HTTPFileSystem::FlushBuffer; other handles do nothing.
void HTTPFileHandle::Close() {
	// Handles not opened for writing have nothing to flush
	if (!flags.OpenForWriting()) {
		return;
	}
	auto &http_fs = (HTTPFileSystem &)file_system;
	http_fs.FlushBuffer(*this);
}

int64_t HTTPFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes) {
Expand Down Expand Up @@ -829,5 +924,15 @@ ResponseWrapper::ResponseWrapper(duckdb_httplib_openssl::Response &res, string &
body = res.body;
}

HTTPFileHandle::~HTTPFileHandle() = default;
//! Destructor: makes a best-effort attempt to close the handle.
//! Close() is skipped while another exception is in flight, and any exception raised by
//! Close() itself is discarded so that nothing escapes the destructor.
HTTPFileHandle::~HTTPFileHandle() {
	if (!Exception::UncaughtException()) {
		try {
			Close();
		} catch (...) { // NOLINT
			// Deliberately ignored: a failing close must not propagate out of a destructor
		}
	}
}

} // namespace duckdb
3 changes: 3 additions & 0 deletions extension/httpfs/httpfs_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ static void LoadInternal(DatabaseInstance &instance) {
LogicalType::BOOLEAN, Value(false));
config.AddExtensionOption("ca_cert_file", "Path to a custom certificate file for self-signed certificates.",
LogicalType::VARCHAR, Value(""));
// Experimental HTTPFS write
config.AddExtensionOption("enable_http_write", "Enable HTTPFS POST write", LogicalType::BOOLEAN, Value(false));

// Global S3 config
config.AddExtensionOption("s3_region", "S3 Region", LogicalType::VARCHAR, Value("us-east-1"));
config.AddExtensionOption("s3_access_key_id", "S3 Access Key ID", LogicalType::VARCHAR);
Expand Down
14 changes: 12 additions & 2 deletions extension/httpfs/include/httpfs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct HTTPParams {
static constexpr uint64_t DEFAULT_RETRY_WAIT_MS = 100;
static constexpr float DEFAULT_RETRY_BACKOFF = 4;
static constexpr bool DEFAULT_FORCE_DOWNLOAD = false;
static constexpr bool DEFAULT_ENABLE_HTTP_WRITE = false;
static constexpr bool DEFAULT_KEEP_ALIVE = true;
static constexpr bool DEFAULT_ENABLE_SERVER_CERT_VERIFICATION = false;
static constexpr uint64_t DEFAULT_HF_MAX_PER_PAGE = 0;
Expand All @@ -52,6 +53,7 @@ struct HTTPParams {
uint64_t retry_wait_ms = DEFAULT_RETRY_WAIT_MS;
float retry_backoff = DEFAULT_RETRY_BACKOFF;
bool force_download = DEFAULT_FORCE_DOWNLOAD;
bool enable_http_write = DEFAULT_ENABLE_HTTP_WRITE;
bool keep_alive = DEFAULT_KEEP_ALIVE;
bool enable_server_cert_verification = DEFAULT_ENABLE_SERVER_CERT_VERIFICATION;
idx_t hf_max_per_page = DEFAULT_HF_MAX_PER_PAGE;
Expand Down Expand Up @@ -116,6 +118,12 @@ class HTTPFileHandle : public FileHandle {
duckdb::unique_ptr<data_t[]> read_buffer;
constexpr static idx_t READ_BUFFER_LEN = 1000000;

//! Capacity in bytes of the staging buffer used for outgoing data
constexpr static idx_t WRITE_BUFFER_LEN = 1000000;
//! Staging area for bytes that have been written but not yet sent
std::vector<data_t> write_buffer;
//! Number of valid bytes currently held at the front of write_buffer
idx_t write_buffer_idx = 0;
//! NOTE(review): no use of this field is visible in the write path - confirm whether it is needed.
//! Initialised so it never holds an indeterminate value.
idx_t current_buffer_len = 0;

shared_ptr<HTTPState> state;

void AddHeaders(HeaderMap &map);
Expand All @@ -126,8 +134,7 @@ class HTTPFileHandle : public FileHandle {
void StoreClient(unique_ptr<duckdb_httplib_openssl::Client> client);

public:
void Close() override {
}
void Close() override;

protected:
//! Create a new Client
Expand All @@ -139,6 +146,8 @@ class HTTPFileHandle : public FileHandle {
};

class HTTPFileSystem : public FileSystem {
friend HTTPFileHandle;

public:
static duckdb::unique_ptr<duckdb_httplib_openssl::Client>
GetClient(const HTTPParams &http_params, const char *proto_host_port, optional_ptr<HTTPFileHandle> hfs);
Expand Down Expand Up @@ -211,6 +220,7 @@ class HTTPFileSystem : public FileSystem {
// Global cache
mutex global_cache_lock;
duckdb::unique_ptr<HTTPMetadataCache> global_metadata_cache;
void FlushBuffer(HTTPFileHandle &hfh);
};

} // namespace duckdb
2 changes: 1 addition & 1 deletion extension_config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ else ()
endif()

duckdb_extension_load(httpfs
DONT_LINK
### DONT_LINK
SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}
INCLUDE_DIR ${CMAKE_CURRENT_LIST_DIR}/extension/httpfs/include
${LOAD_HTTPFS_TESTS}
Expand Down
Loading