Skip to main content

Quick Start with the ReductStore Python SDK

· 6 min read
Alexey Timin
Software Engineer - Database, Rust, C++

This quick start guide will walk you through the process of installing and using the ReductStore Python client SDK to interact with a ReductStore instance.

Installing the SDK

To install the ReductStore SDK, you will need to have Python 3.7 or higher installed on your machine. Once Python is installed, you can use the pip package manager to install the reduct-py package:

pip install reduct-py

Running ReductStore as a Docker Container

If you don't already have a ReductStore instance running, you can easily spin one up as a Docker container. To do so, run the following command:

docker run -p 8383:8383 reductstore/reductstore

This will start a ReductStore instance listening on port 8383 of your local machine.

Using the SDK

Now that you have the reduct-py SDK installed and a ReductStore instance running, you can start using the SDK to interact with the ReductStore service.

Here is an example of using the SDK to perform a few different operations on a bucket:

from pathlib import Path
from time import time_ns

from reduct import Client, BucketSettings, QuotaType

CURRENT_FILE = Path(__file__)


async def main():
# Create a ReductStore client
client = Client("http://localhost:8383")

# Get or create a bucket with 1Gb quota
bucket = await client.create_bucket(
"my-bucket",
BucketSettings(quota_type=QuotaType.FIFO, quota_size=1_000_000_000),
exist_ok=True,
)

# The simplest case. Write some data with the current timestamp
await bucket.write("entry-1", b"Hello, World!")

# More complex case. Upload a file in chunks with a custom timestamp unix timestamp in microseconds
async def file_reader():
"""Read the current example in chunks of 50 bytes"""
with open(CURRENT_FILE, "rb") as file:
while True:
data = file.read(50) # Read in chunks of 50 bytes
if not data:
break
yield data

ts = int(time_ns() / 10000)
await bucket.write(
"entry-1",
file_reader(),
timestamp=ts,
content_length=CURRENT_FILE.stat().st_size,
)

# The simplest case. Read the data by a certain ts
async with bucket.read("entry-1", timestamp=ts) as record:
print(f"Record timestamp: {record.timestamp}")
print(f"Record size: {record.size}")
print(await record.read_all())

# More complex case. Iterate over all records in the entry and read them in chunks
async for record in bucket.query("entry-1"):
print(f"Record timestamp: {record.timestamp}")
print(f"Record size: {record.size}")
async for chunk in record.read(50):
print(chunk)


if __name__ == "__main__":
import asyncio

asyncio.run(main())

Let's break down what this example is doing.

Creating a Client

To create a ReductStore client, you can use the Client class from the reduct module. Pass the URL of the ReductStore instance you want to connect to as an argument to the Client constructor:

# Create a ReductStore client
client = Client("http://localhost:8383")

Creating a Bucket

To get or create a bucket, you can use the create_bucket method on a Client instance. Pass the name of the bucket you want to get or create as an argument, along with a BucketSettings object to specify the desired quota for the bucket. Set the exist_ok argument to True to create the bucket if it does not exist:

# Get or create a bucket with 1Gb quota
bucket = await client.create_bucket(
"my-bucket",
BucketSettings(quota_type=QuotaType.FIFO, quota_size=1_000_000_000),
exist_ok=True,
)

Writing data to a Bucket

To write data to an entry in a bucket with the current timestamp, you can use the write method on a Bucket instance. Pass the name of the entry you want to write to as an argument, along with the data you want to write. The write method will automatically use the current timestamp when writing the data to the entry:

# The simplest case. Write some data with the current timestamp
await bucket.write("entry-1", b"Hello, World!")

This is the simplest case for writing data to an entry in a bucket using the SDK. The write method also allows you to specify a custom timestamp. Let's look at an example of uploading data in chunks with a custom timestamp:

To upload a file in chunks to an entry in a bucket, pass the name of the entry you want to write to as an argument, along with a generator function that yields the file data in chunks. Specify the total size of the file with the content_length argument, and specify a custom timestamp with the timestamp argument:

# More complex case. Upload a file in chunks with a custom timestamp UNIX timestamp in microseconds
async def file_reader():
"""Read the current example in chunks of 50 bytes"""
with open(CURRENT_FILE, "rb") as file:
while True:
data = file.read(50) # Read in chunks of 50 bytes
if not data:
break
yield data

ts = int(time_ns() / 10000)
await bucket.write(
"entry-1",
file_reader(),
timestamp=ts,
content_length=CURRENT_FILE.stat().st_size,
)

This code snippet shows how to use the write method to upload a file in chunks to an entry in a bucket. The generator function file_reader reads the file in chunks of 50 bytes and yields the data. The write method reads the data from the generator and writes it to the entry in the bucket.

Reading data from a Bucket

To read data from an entry in a bucket, you can use the read method on a Bucket instance. Pass the name of the entry you want to read from and the timestamp of the specific record you want to read as arguments. Use the async with statement to open the record, and then use the read_all method to read all of the data in the record:

# The simplest case. Read the data by a certain ts
async with bucket.read("entry-1", timestamp=ts) as record:
print(f"Record timestamp: {record.timestamp}")
print(f"Record size: {record.size}")
print(await record.read_all())

To iterate over all records in an entry, you can use the query method on a Bucket instance. Pass the name of the entry you want to iterate over as an argument. Use a for loop to iterate over the records, and use the read method on each record to read the data in chunks:

# More complex case. Iterate over all records in the entry and read them in chunks
async for record in bucket.query("entry-1"):
print(f"Record timestamp: {record.timestamp}")
print(f"Record size: {record.size}")
async for chunk in record.read(50):
print(chunk)

The query method returns an async iterator to the records in the entry. By default, the method returns all records in the entry. However, you can use the start and stop arguments to specify a time interval for the records you want to retrieve. The start argument specifies the beginning of the time interval, and the stop argument specifies the end of the time interval. Both arguments are timestamps in UNIX microseconds.

start_ts = 1609459200000000  # January 1, 2021 at 00:00:00
stop_ts = 1609827200000000 # January 31, 2021 at 23:59:59
async for record in bucket.query("entry-1", start=start_ts, stop=stop_ts):
# Process the record

if you have any questions or feedback, don't hesitate to use the ReductStore Community forum.

Share
Subscribe to our blogs