Version: 1.9.x

Data Ingestion With ReductStore

Data ingestion is the process of collecting, transferring, and loading data into a system. In ReductStore, data ingestion is the first step in storing data. This guide provides an overview of data ingestion in ReductStore and explains how to ingest data using the ReductStore SDKs or the HTTP API.

Concepts

Data ingestion in ReductStore is based on the HTTP API. Each record is sent as a binary object in the body of a POST request and must carry the following information:

  • Bucket name, which must exist in the instance.
  • Entry name, which identifies the entry that the record belongs to in the bucket. If the entry doesn't exist, it will be created.
  • Timestamp as a Unix timestamp in microseconds, which is used for sorting and querying data. It must be unique for each record in the entry.
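
Because the timestamp is expressed in microseconds rather than seconds, it helps to convert it explicitly. The following is a minimal sketch using only the Python standard library; the helper name `to_unix_microseconds` is hypothetical, and treating naive datetimes as UTC is an assumption of this example, not a ReductStore rule.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_microseconds(moment: datetime) -> int:
    """Convert a datetime to a Unix timestamp in microseconds."""
    if moment.tzinfo is None:
        # Assumption of this sketch: naive datetimes are interpreted as UTC
        moment = moment.replace(tzinfo=timezone.utc)
    delta = moment - EPOCH
    # Integer arithmetic avoids the rounding that float seconds can introduce
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds


ts = to_unix_microseconds(datetime(2024, 2, 2, 10, 0, 0, tzinfo=timezone.utc))
```

Working with integers end to end keeps two records that are one microsecond apart from collapsing into the same timestamp.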

Additionally, a writer can add the following information:

  • Labels as key-value pairs, which can be used for annotating and querying data.
  • Content type, which can be used for data interpretation.

ReductStore uses a streaming approach to ingest data. The storage engine receives the data in chunks and streams them to the file system. This keeps ingestion efficient and reduces the memory footprint of the server.

Info: For small records, ReductStore provides a batch ingestion mode. In this mode, the server receives a batch of records in a single request, which reduces the HTTP protocol overhead.

Limitations

The following limitations apply to the data ingestion process:

  • Currently, records and their metadata are immutable. Once a record is ingested successfully, it cannot be overwritten or changed.
  • ReductStore doesn't limit the size of a record. However, the metadata is sent in the HTTP headers of the request, and the size of the headers can be limited by the server or client configuration.
  • The storage engine needs to know the total size of a record in advance, even if it's sent in chunks. This allows it to reserve the right amount of space for optimal performance.
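
When the data comes from a seekable source such as a file or an in-memory buffer, the total size can be determined without reading the content into memory. The sketch below uses only the standard library; `remaining_size` is a hypothetical helper name and is not part of any ReductStore SDK.

```python
import io
from typing import BinaryIO


def remaining_size(stream: BinaryIO) -> int:
    """Return the number of bytes between the current position and the end."""
    start = stream.tell()
    end = stream.seek(0, io.SEEK_END)  # seek() returns the new absolute position
    stream.seek(start)  # Restore the position so the data can still be read
    return end - start


buffer = io.BytesIO(b"Some binary data")
size = remaining_size(buffer)
```

For sources that are not seekable, such as a network socket, the size has to come from the producer of the data, for example from a length field in its own protocol.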

Typical Data Ingestion Cases

In this section, you can learn how to implement typical data ingestion cases using the ReductStore SDKs or HTTP API. Please note that all examples have been created for a local ReductStore instance accessible at http://127.0.0.1:8383 using the API token 'my-token'.

For more information on how to set up a local ReductStore instance, refer to the Getting Started guide.

Simple Data Ingestion

The simplest way to write data into ReductStore is by sending an entire single record. In this case, the record is transmitted as a binary object in the body of the POST request.

import time
import asyncio
from reduct import Client, Bucket


async def main():
    # Create a client instance, then get or create a bucket
    client = Client("http://127.0.0.1:8383", api_token="my-token")
    bucket: Bucket = await client.create_bucket("my-bucket", exist_ok=True)

    # Send a record to the "py-example" entry with the current timestamp
    ts = time.time()
    await bucket.write("py-example", b"Some binary data", ts)


# Run the coroutine in a fresh event loop
asyncio.run(main())
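
The same write can be expressed directly against the HTTP API. The sketch below only builds the request with the standard library and does not send it. The endpoint path `/api/v1/b/<bucket>/<entry>?ts=<timestamp>`, the `Bearer` authorization scheme, and the `x-reduct-label-<name>` header prefix are assumptions based on the ReductStore HTTP API and should be checked against the API reference for your version; `build_write_request` is a hypothetical helper.

```python
import urllib.request
from typing import Dict, Optional
from urllib.parse import quote

BASE_URL = "http://127.0.0.1:8383"
API_TOKEN = "my-token"


def build_write_request(
    bucket: str,
    entry: str,
    data: bytes,
    ts_us: int,
    labels: Optional[Dict[str, str]] = None,
    content_type: Optional[str] = None,
) -> urllib.request.Request:
    """Build (but do not send) a POST request that writes one record."""
    # Assumed endpoint layout: /api/v1/b/<bucket>/<entry>?ts=<microseconds>
    url = f"{BASE_URL}/api/v1/b/{quote(bucket)}/{quote(entry)}?ts={ts_us}"
    request = urllib.request.Request(url, data=data, method="POST")
    request.add_header("Authorization", f"Bearer {API_TOKEN}")
    # The total record size is declared up front
    request.add_header("Content-Length", str(len(data)))
    if content_type is not None:
        request.add_header("Content-Type", content_type)
    # Assumed header prefix for labels: x-reduct-label-<name>
    for name, value in (labels or {}).items():
        request.add_header(f"x-reduct-label-{name}", value)
    return request


request = build_write_request(
    "my-bucket", "py-example", b"Some binary data", 1706868000000000,
    labels={"name": "example"}, content_type="text/plain",
)
```

Passing the resulting object to `urllib.request.urlopen` would send it to a running instance. Note that labels travel as headers, which is why the header size limits mentioned in the limitations apply to them.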

Streaming Data

For large records, using the streaming approach is recommended. Here, a client application sends a record in chunks, avoiding loading the entire record into memory. This approach is particularly beneficial for real-time data ingestion, where data is received from a stream and needs to be sent to ReductStore without buffering.

Info: Although the record is sent in chunks, its total size must be known beforehand because of the limitation described above.

import io
import time
import asyncio
from reduct import Client, Bucket

IO_BUFFER = io.BytesIO(b"Some let's say huge binary data")


async def main():
    # Create a client instance, then get or create a bucket
    client = Client("http://127.0.0.1:8383", api_token="my-token")
    bucket: Bucket = await client.create_bucket("my-bucket", exist_ok=True)

    # Async iterator that reads data from the buffer in chunks
    async def data_reader():
        while True:
            data = IO_BUFFER.read(5)  # Read in chunks of 5 bytes
            if not data:
                break
            yield data

    # The total size must be known before streaming starts.
    # IO_BUFFER.tell() would return 0 here because nothing has been read yet.
    content_length = len(IO_BUFFER.getvalue())

    # Stream the buffer to the "py-example" entry with the current timestamp
    ts = time.time()
    await bucket.write("py-example", data_reader(), ts, content_length=content_length)


# Run the coroutine in a fresh event loop
asyncio.run(main())

Annotating Data

ReductStore allows you to annotate records by adding labels. Labels, which are key-value pairs, can be used to filter and query data. You can also specify the data type by adding a content type to the record, which can help interpret the data on the client side.

import time
import asyncio
from reduct import Client, Bucket


async def main():
    # Create a client instance, then get or create a bucket
    client = Client("http://127.0.0.1:8383", api_token="my-token")
    bucket: Bucket = await client.create_bucket("my-bucket", exist_ok=True)

    # Send a record with labels and content type
    ts = time.time()
    await bucket.write(
        "py-example",
        b"Some binary data",
        ts,
        labels={"name": "example", "type": "simple"},
        content_type="text/plain",
    )


# Run the coroutine in a fresh event loop
asyncio.run(main())
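
When records originate from files, the content type can often be derived from the file name instead of being hard-coded. The following is a minimal sketch with the standard `mimetypes` module; `guess_content_type` is a hypothetical helper, and falling back to `application/octet-stream` is a choice made for this example.

```python
import mimetypes


def guess_content_type(filename: str) -> str:
    """Guess a MIME type from a file name, with a generic binary fallback."""
    content_type, _encoding = mimetypes.guess_type(filename)
    # Unknown or missing extensions are treated as opaque binary data
    return content_type or "application/octet-stream"


png_type = guess_content_type("frame.png")
raw_type = guess_content_type("rawdata")
```

The returned string could then be passed as the `content_type` argument of `bucket.write`, as in the example above.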

Batching Data

For smaller records, it is recommended to use batch ingestion mode. This mode allows a client application to send a batch of records in a single request, reducing the HTTP protocol overhead.

If the request is valid but one or more records contain errors, the server does not return an HTTP error. Instead, it provides a map of the records with errors so that the client side can verify them.

import asyncio
from typing import Dict

from reduct import Client, Bucket, Batch, ReductError


async def main():
    # Create a client instance, then get or create a bucket
    client = Client("http://127.0.0.1:8383", api_token="my-token")
    bucket: Bucket = await client.create_bucket("my-bucket", exist_ok=True)

    # Prepare a batch of records
    batch = Batch()
    batch.add(
        "2024-02-02T10:00:00",
        b"Records #1",
    )
    batch.add(
        "2024-02-02T10:00:01",
        b"Records #2",
    )
    batch.add(
        "2024-02-02T10:00:02",
        b"Records #3",
    )

    # Write the batch to the "py-example" entry of the bucket
    errors: Dict[int, ReductError] = await bucket.write_batch("py-example", batch)

    # Check statuses and raise the first error, if any
    for timestamp, err in errors.items():
        raise err


# Run the coroutine in a fresh event loop
asyncio.run(main())
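
Raising the first error discards the information about which records were stored. An alternative is to split the sent timestamps into written and failed ones by using the error map. The sketch below is independent of the SDK: `split_by_status` is a hypothetical helper, and a plain `Exception` stands in for `ReductError` so that the example runs without a server.

```python
from typing import Dict, Iterable, List, Tuple


def split_by_status(
    timestamps: Iterable[int], errors: Dict[int, Exception]
) -> Tuple[List[int], List[int]]:
    """Split sent timestamps into (written, failed) using the error map."""
    written: List[int] = []
    failed: List[int] = []
    for ts in timestamps:
        # A timestamp that appears in the error map was rejected by the server
        (failed if ts in errors else written).append(ts)
    return written, failed


sent = [1706868000000000, 1706868001000000, 1706868002000000]
# Stand-in for the map returned by write_batch: only the second record failed
errors = {1706868001000000: RuntimeError("record already exists")}
written, failed = split_by_status(sent, errors)
```

The failed timestamps can then be logged or retried individually, while the written ones need no further action.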