Data Ingestion With ReductStore
Data ingestion is the process of collecting, transferring, and loading data into a system. In ReductStore, data ingestion is the first step in storing data. This guide provides an overview of data ingestion in ReductStore and explains how to ingest data using the ReductStore SDKs or the HTTP API.
Concepts
Data ingestion in ReductStore is based on the HTTP API. Each record is sent as a binary object in the body of a POST request and must have the following information:
- Bucket name, which must exist in the instance.
- Entry name, which uniquely identifies a data source within the bucket. If the entry doesn't exist, it is created.
- Timestamp, a Unix timestamp in microseconds, which is used for sorting and querying data. It must be unique for each record within the entry.
Optionally, a writer can also add:
- Labels as key-value pairs, which can be used for annotating and querying data.
- Content type, which can be used to interpret the data.
ReductStore ingests data in a streaming fashion: the storage engine receives the data in chunks and writes them to the file system as they arrive. This keeps ingestion efficient and reduces the memory footprint of the server.
For small records, ReductStore provides a batch ingestion mode, in which the server receives several records in a single request to reduce the overhead of the HTTP protocol.
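The timestamp requirement can be met with plain integer arithmetic. The following minimal sketch, using only the Python standard library, converts a timezone-aware datetime to microseconds without float rounding and adds a hypothetical UniqueMicrosClock helper that bumps the value by one microsecond whenever two records would otherwise share a timestamp. Both names are illustrative assumptions, not part of any ReductStore SDK, and the helper only guarantees uniqueness within one process, so it assumes one instance per entry and writer.

```python
import time
from datetime import datetime, timezone
from typing import Callable, Optional

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_us(dt: datetime) -> int:
    """Convert a timezone-aware datetime to a Unix timestamp in microseconds."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    delta = dt - _EPOCH
    # Integer arithmetic on the timedelta avoids float rounding errors
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds


class UniqueMicrosClock:
    """Hypothetical helper that returns strictly increasing microsecond timestamps."""

    def __init__(self, clock: Optional[Callable[[], int]] = None):
        # The clock returns nanoseconds since the epoch (time.time_ns by default)
        self._clock = clock or time.time_ns
        self._last = -1

    def now(self) -> int:
        now_us = self._clock() // 1_000
        # If the clock did not advance (or stepped back), bump by 1 microsecond
        self._last = max(now_us, self._last + 1)
        return self._last
```

The resulting integer can be used, for example, as the ts query parameter in the HTTP requests shown in the cURL examples.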
Limitations
The following limitations apply to the data ingestion process:
- Records and their metadata are currently immutable. Once a record has been ingested successfully, it cannot be overwritten or changed.
- ReductStore doesn't limit the size of a record. However, the metadata is sent in the HTTP headers of the request, and the header size may be limited by the server or client configuration.
- The storage engine needs to know the total size of a record in advance, even if it is sent in chunks. This allows it to reserve the right amount of space for optimal performance.
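Because labels travel in HTTP headers, it can help to estimate their size before writing. The following Python sketch assumes one `x-reduct-label-<name>: <value>` header line per label, as in the cURL examples, and a hypothetical 8 KiB budget; the function names and the budget are illustrative, and the real limit depends on your HTTP client, any proxies, and the server configuration.

```python
from typing import Mapping, Optional, Union

LabelValue = Union[str, int, float, bool]

# Hypothetical budget for illustration only; the real limit depends on the
# HTTP client, any proxies, and the server configuration.
HEADER_BUDGET_BYTES = 8 * 1024


def metadata_header_size(
    labels: Mapping[str, LabelValue], content_type: Optional[str] = None
) -> int:
    """Estimate how many header bytes the labels and content type occupy."""
    # One "x-reduct-label-<name>: <value>" line per label, each ending in CRLF
    lines = [f"x-reduct-label-{name}: {value}\r\n" for name, value in labels.items()]
    if content_type is not None:
        lines.append(f"Content-Type: {content_type}\r\n")
    return sum(len(line.encode("utf-8")) for line in lines)


def check_metadata(
    labels: Mapping[str, LabelValue],
    content_type: Optional[str] = None,
    budget: int = HEADER_BUDGET_BYTES,
) -> None:
    """Raise ValueError if the metadata is unlikely to fit into the header budget."""
    size = metadata_header_size(labels, content_type)
    if size > budget:
        raise ValueError(f"metadata needs about {size} header bytes, budget is {budget}")
```

This is only an estimate: an SDK may format label values differently, and other headers (authorization, host, and so on) share the same budget.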
Typical Data Ingestion Cases
This section shows how to implement typical data ingestion cases with the ReductStore SDKs or the HTTP API. All examples assume a local ReductStore instance available at http://127.0.0.1:8383 and the API token 'my-token'.
For more information on how to set up a local ReductStore instance, refer to the Getting Started guide.
Simple Data Ingestion
The simplest way to write data into ReductStore is to send a whole record in a single request. In this case, the record is transmitted as a binary object in the body of the POST request.
- Python
- JavaScript
- Rust
- C++
- cURL
import time
import asyncio
from reduct import Client, Bucket
async def main():
    # Create a client instance, then get or create a bucket
    async with Client("http://127.0.0.1:8383", api_token="my-token") as client:
        bucket: Bucket = await client.create_bucket("my-bucket", exist_ok=True)
        # Send a record to the "py-example" entry with the current timestamp
        ts = time.time()
        await bucket.write("py-example", b"Some binary data", ts)
asyncio.run(main())
import { Client } from "reduct-js";
// Create a client instance, then get or create a bucket
const client = new Client("http://127.0.0.1:8383", { apiToken: "my-token" });
const bucket = await client.getOrCreateBucket("bucket");
// Send a record to the "entry-1" entry with the current timestamp in microseconds
const timestamp = BigInt(Date.now()) * 1000n;
let record = await bucket.beginWrite("entry-1", timestamp);
await record.write("Some binary data");
use std::time::SystemTime;
use reduct_rs::{ReductClient, ReductError};
use tokio;
#[tokio::main]
async fn main() -> Result<(), ReductError> {
    // Create a client instance, then get or create a bucket
    let client = ReductClient::builder()
        .url("http://127.0.0.1:8383")
        .api_token("my-token")
        .build();
    let bucket = client.create_bucket("test").exist_ok(true).send().await?;
    // Send a record to the "rs-example" entry with the current timestamp
    let timestamp = SystemTime::now();
    bucket
        .write_record("rs-example")
        .timestamp(timestamp)
        .data("Some binary data")
        .send()
        .await?;
    Ok(())
}
#include <reduct/client.h>
#include <iostream>
#include <cassert>
using reduct::IBucket;
using reduct::IClient;
using reduct::Error;
int main() {
  // Create a client instance, then get or create a bucket
  auto client = IClient::Build("http://127.0.0.1:8383", {.api_token="my-token"});
  auto [bucket, create_err] = client->GetOrCreateBucket("my-bucket");
  assert(create_err == Error::kOk);
  // Send a record to the "cpp-example" entry with the current timestamp
  IBucket::Time ts = IBucket::Time::clock::now();
  auto err = bucket->Write("cpp-example", ts, [](auto rec) {
    rec->WriteAll("Some binary data");
  });
  assert(err == Error::kOk);
  return 0;
}
#!/bin/bash
set -e -x
API_PATH="http://127.0.0.1:8383/api/v1"
AUTH_HEADER="Authorization: Bearer my-token"
# Write a record to bucket "example-bucket" (which must already exist) and entry "entry_1"
TIME=$(date +%s000000)  # current Unix time in microseconds (second precision)
curl -d "Some binary data" \
  -H "${AUTH_HEADER}" \
  -X POST "${API_PATH}/b/example-bucket/entry_1?ts=${TIME}"
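The cURL example shows the underlying HTTP request: a POST to /api/v1/b/<bucket>/<entry>?ts=<timestamp> with a bearer token. As a minimal sketch, the same request can be built with Python's standard urllib module. The function name build_write_request is hypothetical, and the code only builds the request so it can be inspected before sending.

```python
import urllib.parse
import urllib.request


def build_write_request(
    base_url: str,
    token: str,
    bucket: str,
    entry: str,
    ts_us: int,
    data: bytes,
    content_type: str = "application/octet-stream",
) -> urllib.request.Request:
    """Build (but do not send) a POST request that writes one record."""
    # Path pattern taken from the cURL example: /api/v1/b/<bucket>/<entry>
    path = f"/api/v1/b/{urllib.parse.quote(bucket)}/{urllib.parse.quote(entry)}"
    # The timestamp (Unix time in microseconds) goes into the "ts" query parameter
    query = urllib.parse.urlencode({"ts": ts_us})
    return urllib.request.Request(
        f"{base_url.rstrip('/')}{path}?{query}",
        data=data,
        headers={"Authorization": f"Bearer {token}", "Content-Type": content_type},
        method="POST",
    )
```

To send the request, pass it to `urllib.request.urlopen(req)`; urllib raises `urllib.error.HTTPError` for error status codes, for example if the bucket does not exist.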
Streaming Data
For large records, the streaming approach is recommended. The client application sends a record in chunks, so the entire record never has to be loaded into memory. This is particularly useful for real-time data ingestion, where data arrives from a stream and should be forwarded to ReductStore without buffering.
Even though the record is sent in chunks, its total size must be known beforehand because of the limitation described above.
- Python
- JavaScript
- Rust
- C++
import io
import time
import asyncio
from reduct import Client, Bucket
IO_BUFFER = io.BytesIO(b"Some let's say huge binary data")
async def main():
    # Create a client instance, then get or create a bucket
    async with Client("http://127.0.0.1:8383", api_token="my-token") as client:
        bucket: Bucket = await client.create_bucket("my-bucket", exist_ok=True)
        # Async generator that reads the buffer in chunks
        async def data_reader():
            while True:
                data = IO_BUFFER.read(5)  # Read in chunks of 5 bytes
                if not data:
                    break
                yield data
        # The total size must be known before streaming starts
        # (IO_BUFFER.tell() would still be 0 at this point)
        content_length = len(IO_BUFFER.getvalue())
        # Stream the buffer to the "py-example" entry with the current timestamp
        ts = time.time()
        await bucket.write(
            "py-example", data_reader(), ts, content_length=content_length
        )
asyncio.run(main())
import { Client } from "reduct-js";
import { Readable } from "stream";
const DATA = ["Some", "let's", "say", "huge", "binary", "data"];
const stream_to_send = Readable.from(DATA);
// Create a client instance, then get or create a bucket
const client = new Client("http://127.0.0.1:8383", { apiToken: "my-token" });
const bucket = await client.getOrCreateBucket("bucket");
// Stream the data to the "entry-1" entry with the current timestamp in microseconds
const timestamp = BigInt(Date.now()) * 1000n;
let record = await bucket.beginWrite("entry-1", timestamp);
await record.write(stream_to_send, DATA.join("").length);
use bytes::Bytes;
use futures::Stream;
use reduct_rs::{ReductClient, ReductError};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::SystemTime;
use tokio;
// A custom stream that will stream items of a vector of bytes
struct CustomStream {
    data_to_stream: Vec<Bytes>,
    data_len: u64,
}
impl Stream for CustomStream {
    // Pay attention to a result with the ReductError type
    type Item = Result<Bytes, ReductError>;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.data_to_stream.is_empty() {
            Poll::Ready(None)
        } else {
            let data = this.data_to_stream.remove(0);
            Poll::Ready(Some(Ok(data)))
        }
    }
}
impl CustomStream {
    // Move the data to stream and calculate the total length
    fn new(data: Vec<&'static str>) -> Self {
        let data_len = data.iter().map(|s| s.len()).sum::<usize>() as u64;
        CustomStream {
            data_to_stream: data.iter().map(|s| Bytes::from(*s)).collect::<Vec<Bytes>>(),
            data_len,
        }
    }
}
#[tokio::main]
async fn main() -> Result<(), ReductError> {
    let stream = CustomStream::new(vec!["Some", "let's", "say", "huge", "binary", "data"]);
    let data_len = stream.data_len;
    // Create a client instance, then get or create a bucket
    let client = ReductClient::builder()
        .url("http://127.0.0.1:8383")
        .api_token("my-token")
        .build();
    let bucket = client.create_bucket("test").exist_ok(true).send().await?;
    // Send a record to the "rs-example" entry with the current timestamp
    let timestamp = SystemTime::now();
    bucket
        .write_record("rs-example")
        .timestamp(timestamp)
        .stream(stream)
        .content_length(data_len)
        .send()
        .await?;
    Ok(())
}
#include <reduct/client.h>
#include <iostream>
#include <cassert>
using reduct::IBucket;
using reduct::IClient;
using reduct::Error;
int main() {
  std::string data = "Some let's say huge binary data";
  // Create a client instance, then get or create a bucket
  auto client = IClient::Build("http://127.0.0.1:8383", {.api_token="my-token"});
  auto [bucket, create_err] = client->GetOrCreateBucket("my-bucket");
  assert(create_err == Error::kOk);
  // Stream the data to the "cpp-example" entry with the current timestamp
  IBucket::Time ts = IBucket::Time::clock::now();
  auto err = bucket->Write("cpp-example", ts, [data](auto rec) {
    rec->Write(data.size(),
               [data](auto offset, auto size) {
                 // this lambda is called multiple times when a chunk of data is needed to be sent
                 return std::pair{true, data.substr(offset, size)};
               });
  });
  assert(err == Error::kOk);
  return 0;
}
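For seekable sources such as files, the total size can be obtained without reading the data by seeking to the end and back. The following standard-library sketch combines that with a chunk generator and builds a urllib request whose body is streamed chunk by chunk. The function names are hypothetical, and the URL is assumed to follow the pattern of the cURL examples.

```python
import io
import os
import urllib.request
from typing import BinaryIO, Iterator


def content_length(stream: BinaryIO) -> int:
    """Return the number of bytes left in a seekable stream without reading it."""
    start = stream.tell()
    end = stream.seek(0, os.SEEK_END)
    stream.seek(start)  # restore the original position
    return end - start


def iter_chunks(stream: BinaryIO, chunk_size: int = 64 * 1024) -> Iterator[bytes]:
    """Yield the stream in fixed-size chunks so it is never fully loaded in memory."""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            return
        yield chunk


def build_stream_request(
    url: str, token: str, stream: BinaryIO, chunk_size: int = 64 * 1024
) -> urllib.request.Request:
    """Build a POST request that streams `stream` with a known Content-Length."""
    # The size is computed before any chunk is read; the generator is lazy.
    # With an explicit Content-Length header, urllib sends the iterable body
    # as-is instead of switching to chunked transfer encoding.
    return urllib.request.Request(
        url,
        data=iter_chunks(stream, chunk_size),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/octet-stream",
            "Content-Length": str(content_length(stream)),
        },
        method="POST",
    )
```

For example, `build_stream_request("http://127.0.0.1:8383/api/v1/b/my-bucket/py-example?ts=...", "my-token", open("big.bin", "rb"))` (file name hypothetical) could then be sent with `urllib.request.urlopen`.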
Annotating Data
ReductStore allows you to annotate records by adding labels. Labels, which are key-value pairs, can be used to filter and query data. You can also specify the data type by adding a content type to the record, which can help interpret the data on the client side.
- Python
- JavaScript
- Rust
- C++
- cURL
import time
import asyncio
from reduct import Client, Bucket
async def main():
    # Create a client instance, then get or create a bucket
    async with Client("http://127.0.0.1:8383", api_token="my-token") as client:
        bucket: Bucket = await client.create_bucket("my-bucket", exist_ok=True)
        # Send a record with labels and content type
        ts = time.time()
        await bucket.write(
            "py-example",
            b"Some binary data",
            ts,
            labels={"name": "example", "score": 0.9},
            content_type="text/plain",
        )
asyncio.run(main())
import { Client } from "reduct-js";
// Create a client instance, then get or create a bucket
const client = new Client("http://127.0.0.1:8383", { apiToken: "my-token" });
const bucket = await client.getOrCreateBucket("bucket");
// Send a record with labels and content type
const timestamp = BigInt(Date.now()) * 1000n;
let record = await bucket.beginWrite("entry-1", {
  timestamp: timestamp,
  contentType: "text/plain",
  labels: {
    name: "example",
    type: "simple",
  },
});
await record.write("Some binary data");
use std::time::SystemTime;
use bytes::Bytes;
use reduct_rs::{ReductClient, ReductError};
use tokio;
#[tokio::main]
async fn main() -> Result<(), ReductError> {
    // Create a client instance, then get or create a bucket
    let client = ReductClient::builder()
        .url("http://127.0.0.1:8383")
        .api_token("my-token")
        .build();
    let bucket = client.create_bucket("test").exist_ok(true).send().await?;
    // Send a record with labels and content type
    let timestamp = SystemTime::now();
    bucket
        .write_record("rs-example")
        .timestamp(timestamp)
        .data(Bytes::from("Hello, World!"))
        .add_label("name", "example")
        .add_label("type", "simple")
        .content_type("text/plain")
        .send()
        .await?;
    Ok(())
}
#include <reduct/client.h>
#include <iostream>
#include <cassert>
using reduct::IBucket;
using reduct::IClient;
using reduct::Error;
int main() {
  // Create a client instance, then get or create a bucket
  auto client = IClient::Build("http://127.0.0.1:8383", {.api_token="my-token"});
  auto [bucket, create_err] = client->GetOrCreateBucket("my-bucket");
  assert(create_err == Error::kOk);
  // Send a record with labels and content type
  IBucket::Time ts = IBucket::Time::clock::now();
  auto err = bucket->Write("cpp-example", {
      .timestamp = ts,
      .labels = {{"name", "example"},
                 {"type", "simple"}},
      .content_type = "text/plain",
  }, [](auto rec) {
    rec->WriteAll("Some binary data");
  });
  assert(err == Error::kOk);
  return 0;
}
#!/bin/bash
set -e -x
API_PATH="http://127.0.0.1:8383/api/v1"
AUTH_HEADER="Authorization: Bearer my-token"
# Send a record with labels and content type
TIME=$(date +%s000000)  # current Unix time in microseconds (second precision)
curl -d "Some binary data" \
  -H "${AUTH_HEADER}" \
  -H "x-reduct-label-name: example" \
  -H "x-reduct-label-score: 0.9" \
  -H "Content-Type: text/plain" \
  -X POST "${API_PATH}/b/example-bucket/entry_1?ts=${TIME}"
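The cURL example shows how annotations travel over HTTP: each label becomes an `x-reduct-label-<name>` header, and the content type goes into the standard Content-Type header. The following sketch builds these headers from a dictionary. The function name is hypothetical, restricting label names to letters, digits, `_` and `-` is a conservative assumption made here to keep them valid header names, and rendering booleans as lowercase text is an arbitrary choice of this sketch.

```python
import re
from typing import Dict, Mapping, Optional, Union

LabelValue = Union[str, int, float, bool]

# Conservative assumption: only allow characters that are safe in header names
_LABEL_NAME = re.compile(r"[A-Za-z0-9_-]+")


def annotation_headers(
    labels: Mapping[str, LabelValue], content_type: Optional[str] = None
) -> Dict[str, str]:
    """Turn labels and a content type into HTTP headers for a write request."""
    headers: Dict[str, str] = {}
    for name, value in labels.items():
        if not _LABEL_NAME.fullmatch(name):
            raise ValueError(f"unsupported label name: {name!r}")
        # Label values are sent as text; booleans are rendered as "true"/"false"
        text = str(value).lower() if isinstance(value, bool) else str(value)
        headers[f"x-reduct-label-{name}"] = text
    if content_type is not None:
        headers["Content-Type"] = content_type
    return headers
```

The returned dictionary can be merged into the headers of a request such as the urllib sketch for simple ingestion, or turned into `-H` options for cURL.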
Batching Data
For small records, batch ingestion mode is recommended. It allows a client application to send several records in a single request, reducing the overhead of the HTTP protocol.
If the request itself is valid but one or more records contain errors, the server does not return an HTTP error. Instead, it reports a map of the failed records and their errors, so the client can check them.
- Python
- JavaScript
- Rust
- C++
import asyncio
from typing import Dict
from reduct import Client, Bucket, Batch, ReductError
async def main():
    # Create a client instance, then get or create a bucket
    async with Client("http://127.0.0.1:8383", api_token="my-token") as client:
        bucket: Bucket = await client.create_bucket("my-bucket", exist_ok=True)
        # Prepare a batch of records
        batch = Batch()
        batch.add(
            "2024-02-02T10:00:00",
            b"Records #1",
        )
        batch.add(
            "2024-02-02T10:00:01",
            b"Records #2",
        )
        batch.add(
            "2024-02-02T10:00:02",
            b"Records #3",
        )
        # Write the batch to the "py-example" entry of the bucket
        errors: Dict[int, ReductError] = await bucket.write_batch("py-example", batch)
        # Check the failed records and raise the first error, if any
        for timestamp, err in errors.items():
            raise err
asyncio.run(main())
import { Client } from "reduct-js";
// Create a client instance, then get or create a bucket
const client = new Client("http://127.0.0.1:8383", { apiToken: "my-token" });
const bucket = await client.getOrCreateBucket("bucket");
// Prepare a batch of records
const batch = await bucket.beginWriteBatch("js-example");
const timestamp = BigInt(Date.now()) * 1000n; // Current timestamp in microseconds
batch.add(timestamp, "Records #1");
batch.add(timestamp + 1000000n, "Records #2");
// Send the batch
const errors = await batch.write();
// Check for errors and throw first one
for (const [_timestamp, err] of errors.entries()) {
  throw err;
}
use reduct_rs::{RecordBuilder, ReductClient, ReductError};
use tokio;
#[tokio::main]
async fn main() -> Result<(), ReductError> {
    // Create a client instance, then get or create a bucket
    let client = ReductClient::builder()
        .url("http://127.0.0.1:8383")
        .api_token("my-token")
        .build();
    let bucket = client.create_bucket("test").exist_ok(true).send().await?;
    // Prepare a batch of records
    let record_1 = RecordBuilder::new()
        .timestamp_us(1_000_000)
        .data("Records #1")
        .build();
    let record_2 = RecordBuilder::new()
        .timestamp_us(2_000_000)
        .data("Records #2")
        .build();
    // Send the batch of records to the "rs-example" entry
    let errors = bucket
        .write_batch("rs-example")
        .add_records(vec![record_1, record_2])
        .send()
        .await?;
    // Return the first error, if any record failed
    if let Some((_timestamp, error)) = errors.into_iter().next() {
        return Err(error);
    }
    Ok(())
}
#include <reduct/client.h>
#include <iostream>
#include <cassert>
using reduct::IBucket;
using reduct::IClient;
using reduct::Error;
using std::chrono_literals::operator ""s;
int main() {
  // Create a client instance, then get or create a bucket
  auto client = IClient::Build("http://127.0.0.1:8383", {.api_token="my-token"});
  auto [bucket, create_err] = client->GetOrCreateBucket("my-bucket");
  assert(create_err == Error::kOk);
  // Send a batch of records to the "cpp-example" entry
  auto [record_errors, http_err] = bucket->WriteBatch("cpp-example", [](auto batch) {
    IBucket::Time ts = IBucket::Time::clock::now();
    batch->AddRecord(ts, "Records #1");
    batch->AddRecord(ts + 1s, "Records #2");
  });
  assert(http_err == Error::kOk);
  // Check if all records were sent successfully
  for (auto& [ts, err] : record_errors) {
    assert(err == Error::kOk);
  }
  return 0;
}
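When records arrive one by one, the client has to decide how many of them to put into each batch. The following sketch groups (timestamp, payload) pairs into batches capped by total payload size and record count. The function name and both default limits are hypothetical values chosen for illustration, not ReductStore settings, so tune them for your setup.

```python
from typing import Iterable, Iterator, List, Tuple

Record = Tuple[int, bytes]  # (timestamp in microseconds, payload)


def split_into_batches(
    records: Iterable[Record],
    max_bytes: int = 8 * 1024 * 1024,  # hypothetical payload cap per batch
    max_records: int = 1000,  # hypothetical record cap per batch
) -> Iterator[List[Record]]:
    """Group small records into batches bounded by total payload size and count.

    A record larger than max_bytes ends up in a batch of its own; such records
    are better written individually or streamed.
    """
    batch: List[Record] = []
    size = 0
    for ts, payload in records:
        # Close the current batch if adding this record would exceed a limit
        if batch and (size + len(payload) > max_bytes or len(batch) >= max_records):
            yield batch
            batch, size = [], 0
        batch.append((ts, payload))
        size += len(payload)
    if batch:
        yield batch
```

Each resulting batch could then be written with an SDK, for example by adding its records to a `Batch` and calling `bucket.write_batch(...)` as in the Python example, and checking the returned error map after every call.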