Skip to main content
Version: Next

Bucket Module

Bucket module for ReductStore HTTP API

Bucket​

class Bucket()

A bucket of data in Reduct Storage

get_settings​

async def get_settings() -> BucketSettings

Get current bucket settings

Returns:

  • BucketSettings - the bucket settings

Raises:

  • ReductError - if there is an HTTP error

set_settings​

async def set_settings(settings: BucketSettings)

Update bucket settings

Arguments:

  • settings - new settings

Raises:

  • ReductError - if there is an HTTP error

info​

async def info() -> BucketInfo

Get statistics about bucket

Returns:

  • BucketInfo - the bucket information

Raises:

  • ReductError - if there is an HTTP error

get_entry_list​

async def get_entry_list() -> List[EntryInfo]

Get list of entries with their stats

Returns:

  • List[EntryInfo] - the list of entries with stats

Raises:

  • ReductError - if there is an HTTP error

remove​

async def remove()

Remove bucket

Raises:

  • ReductError - if there is an HTTP error

remove_entry​

async def remove_entry(entry_name: str)

Remove entry from bucket

Arguments:

  • entry_name - name of entry

Raises:

  • ReductError - if there is an HTTP error

remove_record​

async def remove_record(entry_name: str, timestamp: Union[int, datetime, float,
str])

Remove record from entry

Arguments:

  • entry_name - name of entry
  • timestamp - timestamp of record

Raises:

  • ReductError - if there is an HTTP error

remove_batch​

async def remove_batch(entry_name: str,
batch: Batch) -> Dict[int, ReductError]

Remove batch of records from entries in a sole request

Arguments:

  • entry_name - name of entry in the bucket
  • batch - list of timestamps

Returns:

Dict[int, ReductError]: the dictionary of errors with record timestamps as keys

Raises:

  • ReductError - if there is an HTTP error

remove_query​

async def remove_query(entry_name: str,
start: Optional[Union[int, datetime, float,
str]] = None,
stop: Optional[Union[int, datetime, float, str]] = None,
when: Optional[Dict] = None,
**kwargs) -> int

Query data to remove within a time interval The time interval is defined by the start and stop parameters that can be: int (UNIX timestamp in microseconds), datetime, float (UNIX timestamp in seconds) or str (ISO 8601 string).

Arguments:

  • entry_name - name of entry in the bucket
  • start - the beginning of the time interval. If None, then from the first record
  • stop - the end of the time interval. If None, then to the latest record
  • when - condtion to filter

Arguments:

  • include dict - remove records which have all labels from this dict (DEPRECATED use when)
  • exclude dict - remove records which doesn't have all labels from this (DEPRECATED use when) each_s(Union[int, float]): remove a record for each S seconds
  • each_n(int) - remove each N-th record
  • strict(bool) - if True: strict query

Returns:

number of removed records

rename_entry​

async def rename_entry(old_name: str, new_name: str)

Rename entry

Arguments:

  • old_name - old name of entry
  • new_name - new name of entry

Raises:

  • ReductError - if there is an HTTP error

rename​

async def rename(new_name: str)

Rename bucket

Arguments:

  • new_name - new name of bucket

Raises:

  • ReductError - if there is an HTTP error

read​

@asynccontextmanager
async def read(entry_name: str,
timestamp: Optional[Union[int, datetime, float, str]] = None,
head: bool = False) -> AsyncIterator[Record]

Read a record from entry

Arguments:

  • entry_name - name of entry in the bucket. If None: get the latest record. The timestamp can be int (UNIX timestamp in microseconds), datetime, float (UNIX timestamp in seconds), or str (ISO 8601 string).
  • timestamp - UNIX timestamp in microseconds - if None: get the latest record
  • head - if True: get only the header of a recod with metadata

Returns:

  • AsyncIterator[Record] - the record object

Raises:

  • ReductError - if there is an HTTP error

Example:

      async def reader():
async with bucket.read("entry", timestamp=123456789) as record:
data = await record.read_all()

write​

async def write(entry_name: str,
data: Union[bytes, AsyncIterator[bytes]],
timestamp: Optional[Union[int, datetime, float, str]] = None,
content_length: Optional[int] = None,
**kwargs)

Write a record to entry

Arguments:

  • entry_name - name of entry in the bucket
  • data - bytes to write or async iterator
  • timestamp - timestamp of record. int (UNIX timestamp in microseconds), datetime, float (UNIX timestamp in seconds), str (ISO 8601 string). If None: current time
  • content_length - content size in bytes, needed only when the data is an iterator

Arguments:

  • labels dict - labels as key-values
  • content_type str - content type of data

Raises:

  • ReductError - if there is an HTTP error

Example:

      await bucket.write("entry-1", b"some_data",
timestamp="2021-09-10T10:30:00")

# You can write data chunk-wise using an asynchronous iterator and the
# size of the content:

async def sender():
for chunk in [b"part1", b"part2", b"part3"]:
yield chunk
await bucket.write("entry-1", sender(), content_length=15)

write_batch​

async def write_batch(entry_name: str, batch: Batch) -> Dict[int, ReductError]

Write a batch of records to entries in a sole request

Arguments:

  • entry_name - name of entry in the bucket
  • batch - list of records

Returns:

Dict[int, ReductError]: the dictionary of errors with record timestamps as keys

Raises:

  • ReductError - if there is an HTTP or communication error

update​

async def update(entry_name: str, timestamp: Union[int, datetime, float, str],
labels: Dict[str, str])

Update labels of an existing record If a label doesn't exist, it will be created. If a label is empty, it will be removed.

Arguments:

  • entry_name - name of entry in the bucket
  • timestamp - timestamp of record in microseconds
  • labels - new labels

Raises:

  • ReductError - if there is an HTTP error

Example:

      await bucket.update("entry-1", "2022-01-01T01:00:00",
  • {"label1" - "value1", "label2": ""})

update_batch​

async def update_batch(entry_name: str,
batch: Batch) -> Dict[int, ReductError]

Update labels of existing records If a label doesn't exist, it will be created. If a label is empty, it will be removed.

Arguments:

  • entry_name - name of entry in the bucket
  • batch - dict of timestamps as keys and labels as values

Returns:

Dict[int, ReductError]: the dictionary of errors with record timestamps as keys

Raises:

  • ReductError - if there is an HTTP error

Example:

      batch = Batch()
batch.add(1640995200000000, labels={"label1": "value1", "label2": ""})
await bucket.update_batch("entry-1", batch)

query​

async def query(entry_name: str,
start: Optional[Union[int, datetime, float, str]] = None,
stop: Optional[Union[int, datetime, float, str]] = None,
ttl: Optional[int] = None,
when: Optional[Dict] = None,
**kwargs) -> AsyncIterator[Record]

Query data for a time interval The time interval is defined by the start and stop parameters that can be: int (UNIX timestamp in microseconds), datetime, float (UNIX timestamp in seconds) or str (ISO 8601 string).

Arguments:

  • entry_name - name of entry in the bucket
  • start - the beginning of the time interval. If None, then from the first record
  • stop - the end of the time interval. If None, then to the latest record
  • ttl - Time To Live of the request in seconds
  • when - condtion to filter records

Arguments:

  • include dict - query records which have all labels from this dict (DEPRECATED use when)
  • exclude dict - query records which doesn't have all labels from this (DEPRECATED use when)
  • head bool - if True: get only the header of a recod with metadata each_s(Union[int, float]): return a record for each S seconds
  • each_n(int) - return each N-th record
  • limit int - limit the number of records
  • strict(bool) - if True: strict query

Returns:

  • AsyncIterator[Record] - iterator to the records

Example:

      async for record in bucket.query("entry-1", stop=time.time_ns() / 1000):
data: bytes = record.read_all()
# or
async for chunk in record.read(n=1024):
print(chunk)

get_full_info​

async def get_full_info() -> BucketFullInfo

Get full information about bucket (settings, statistics, entries)

Returns:

  • BucketFullInfo - the full information about the bucket

subscribe​

async def subscribe(entry_name: str,
start: Optional[Union[int, datetime, float, str]] = None,
poll_interval=1.0,
when: Optional[Dict] = None,
**kwargs) -> AsyncIterator[Record]

Query records from the start timestamp and wait for new records The time interval is defined by the start and stop parameters that can be: int (UNIX timestamp in microseconds) datetime, float (UNIX timestamp in seconds) or str (ISO 8601 string).

Arguments:

  • entry_name - name of entry in the bucket
  • start - the beginning timestamp to read records. If None, then from the first record.
  • poll_interval - inteval to ask new records in seconds
  • when - condtion to filter records

Arguments:

  • include dict - query records which have all labels from this dict (DEPRECATED use when)
  • exclude dict - query records which doesn't have all labels from this (DEPRECATED use when)
  • head bool - if True: get only the header of a recod with metadata
  • strict(bool) - if True: strict query

Returns:

  • AsyncIterator[Record] - iterator to the records

Example:

      async for record in bucket.subscribes("entry-1"):
data: bytes = record.read_all()
# or
async for chunk in record.read(n=1024):
print(chunk)