Bucket Module
Bucket module for ReductStore HTTP API
Bucket
class Bucket()
A bucket of data in ReductStore
get_settings
async def get_settings() -> BucketSettings
Get current bucket settings
Returns:
BucketSettings
- the bucket settings
Raises:
ReductError
- if there is an HTTP error
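Example (a minimal sketch; assumes `bucket` was obtained beforehand, e.g. via `Client.get_bucket`, and that `BucketSettings` exposes `quota_type` and `quota_size` fields):
settings = await bucket.get_settings()
print(settings.quota_type, settings.quota_size)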
set_settings
async def set_settings(settings: BucketSettings)
Update bucket settings
Arguments:
settings
- new settings
Raises:
ReductError
- if there is an HTTP error
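Example (illustrative sketch; assumes `BucketSettings` and `QuotaType` can be imported from the `reduct` package):
from reduct import BucketSettings, QuotaType

# switch the bucket to a 1 GB FIFO quota
await bucket.set_settings(BucketSettings(quota_type=QuotaType.FIFO,
                                         quota_size=1_000_000_000))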
info
async def info() -> BucketInfo
Get statistics about bucket
Returns:
BucketInfo
- the bucket information
Raises:
ReductError
- if there is an HTTP error
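Example (sketch; the `name`, `entry_count` and `size` attributes of `BucketInfo` are assumed here):
bucket_info = await bucket.info()
print(f"{bucket_info.name}: {bucket_info.entry_count} entries, {bucket_info.size} bytes")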
get_entry_list
async def get_entry_list() -> List[EntryInfo]
Get list of entries with their stats
Returns:
List[EntryInfo]
- the list of entries with stats
Raises:
ReductError
- if there is an HTTP error
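Example (sketch; assumes each `EntryInfo` exposes `name` and `record_count`):
for entry in await bucket.get_entry_list():
    print(entry.name, entry.record_count)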
remove
async def remove()
Remove bucket
Raises:
ReductError
- if there is an HTTP error
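Example (illustrative; removes the bucket together with all its entries):
await bucket.remove()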
remove_entry
async def remove_entry(entry_name: str)
Remove entry from bucket
Arguments:
entry_name
- name of entry
Raises:
ReductError
- if there is an HTTP error
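Example (illustrative; "entry-1" is a placeholder entry name):
await bucket.remove_entry("entry-1")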
remove_record
async def remove_record(entry_name: str,
                        timestamp: Union[int, datetime, float, str])
Remove a record from an entry
Arguments:
entry_name
- name of entry
timestamp
- timestamp of record
Raises:
ReductError
- if there is an HTTP error
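Example (sketch; the timestamps are arbitrary placeholders):
await bucket.remove_record("entry-1", 1640995200000000)
# timestamps may also be passed as ISO 8601 strings
await bucket.remove_record("entry-1", "2022-01-01T00:00:10")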
remove_batch
async def remove_batch(entry_name: str,
                       batch: Batch) -> Dict[int, ReductError]
Remove a batch of records from an entry in a single request
Arguments:
entry_name
- name of entry in the bucket
batch
- list of timestamps
Returns:
Dict[int, ReductError]: the dictionary of errors with record timestamps as keys
Raises:
ReductError
- if there is an HTTP error
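Example (sketch; assumes `Batch` is importable from the `reduct` package and that `Batch.add` accepts bare timestamps when no data is needed):
from reduct import Batch

batch = Batch()
batch.add(1640995200000000)
batch.add(1640995201000000)
errors = await bucket.remove_batch("entry-1", batch)
print(errors)  # timestamps of records that could not be removed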
remove_query
async def remove_query(entry_name: str,
                       start: Optional[Union[int, datetime, float, str]] = None,
                       stop: Optional[Union[int, datetime, float, str]] = None,
                       when: Optional[Dict] = None,
                       **kwargs) -> int
Query data to remove within a time interval. The time interval is defined by the start and stop parameters, which can be: int (UNIX timestamp in microseconds), datetime, float (UNIX timestamp in seconds), or str (ISO 8601 string).
Arguments:
entry_name
- name of entry in the bucket
start
- the beginning of the time interval. If None, then from the first record
stop
- the end of the time interval. If None, then to the latest record
when
- condition to filter records
Keyword Arguments:
include
dict - remove records which have all labels from this dict (DEPRECATED, use when)
exclude
dict - remove records which don't have all labels from this dict (DEPRECATED, use when)
each_s
Union[int, float] - remove a record for each S seconds
each_n
int - remove each N-th record
strict
bool - if True: strict query
Returns:
number of removed records
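Example (illustrative sketch; the `when` condition follows the ReductStore conditional query syntax, and the label name is a placeholder):
removed = await bucket.remove_query(
    "entry-1",
    start="2022-01-01T00:00:00",
    stop="2022-01-02T00:00:00",
    when={"&label1": {"$eq": "value1"}},
)
print(f"removed {removed} records")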
rename_entry
async def rename_entry(old_name: str, new_name: str)
Rename entry
Arguments:
old_name
- old name of entry
new_name
- new name of entry
Raises:
ReductError
- if there is an HTTP error
rename
async def rename(new_name: str)
Rename bucket
Arguments:
new_name
- new name of bucket
Raises:
ReductError
- if there is an HTTP error
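Example (sketch; the new names are placeholders):
await bucket.rename_entry("entry-1", "entry-1-archived")
await bucket.rename("my-bucket-v2")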
read
@asynccontextmanager
async def read(entry_name: str,
               timestamp: Optional[Union[int, datetime, float, str]] = None,
               head: bool = False) -> AsyncIterator[Record]
Read a record from an entry
Arguments:
entry_name
- name of entry in the bucket
timestamp
- timestamp of the record: int (UNIX timestamp in microseconds), datetime, float (UNIX timestamp in seconds), or str (ISO 8601 string). If None: get the latest record
head
- if True: get only the header of a record with metadata
Returns:
AsyncIterator[Record]
- the record object
Raises:
ReductError
- if there is an HTTP error
Example:
async def reader():
    async with bucket.read("entry", timestamp=123456789) as record:
        data = await record.read_all()
write
async def write(entry_name: str,
                data: Union[bytes, AsyncIterator[bytes]],
                timestamp: Optional[Union[int, datetime, float, str]] = None,
                content_length: Optional[int] = None,
                **kwargs)
Write a record to an entry
Arguments:
entry_name
- name of entry in the bucket
data
- bytes to write or an async iterator
timestamp
- timestamp of the record: int (UNIX timestamp in microseconds), datetime, float (UNIX timestamp in seconds), or str (ISO 8601 string). If None: current time
content_length
- content size in bytes, needed only when the data is an iterator
Keyword Arguments:
labels
dict - labels as key-values
content_type
str - content type of data
Raises:
ReductError
- if there is an HTTP error
Example:
await bucket.write("entry-1", b"some_data",
                   timestamp="2021-09-10T10:30:00")
# You can write data chunk-wise using an asynchronous iterator and the
# size of the content:
async def sender():
    for chunk in [b"part1", b"part2", b"part3"]:
        yield chunk
await bucket.write("entry-1", sender(), content_length=15)
write_batch
async def write_batch(entry_name: str, batch: Batch) -> Dict[int, ReductError]
Write a batch of records to an entry in a single request
Arguments:
entry_name
- name of entry in the bucket
batch
- list of records
Returns:
Dict[int, ReductError]: the dictionary of errors with record timestamps as keys
Raises:
ReductError
- if there is an HTTP or communication error
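Example (sketch; assumes `Batch.add` takes a timestamp plus optional `data` and `labels`, as in the `update_batch` example below):
from reduct import Batch

batch = Batch()
batch.add(1640995200000000, data=b"record-1", labels={"source": "sensor-1"})
batch.add(1640995201000000, data=b"record-2")
errors = await bucket.write_batch("entry-1", batch)
for timestamp, error in errors.items():
    print(f"failed to write record {timestamp}: {error}")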
update
async def update(entry_name: str, timestamp: Union[int, datetime, float, str],
                 labels: Dict[str, str])
Update labels of an existing record. If a label doesn't exist, it will be created. If a label is empty, it will be removed.
Arguments:
entry_name
- name of entry in the bucket
timestamp
- timestamp of record in microseconds
labels
- new labels
Raises:
ReductError
- if there is an HTTP error
Example:
await bucket.update("entry-1", "2022-01-01T01:00:00",
                    {"label1": "value1", "label2": ""})
update_batch
async def update_batch(entry_name: str,
                       batch: Batch) -> Dict[int, ReductError]
Update labels of existing records. If a label doesn't exist, it will be created. If a label is empty, it will be removed.
Arguments:
entry_name
- name of entry in the bucket
batch
- dict of timestamps as keys and labels as values
Returns:
Dict[int, ReductError]: the dictionary of errors with record timestamps as keys
Raises:
ReductError
- if there is an HTTP error
Example:
batch = Batch()
batch.add(1640995200000000, labels={"label1": "value1", "label2": ""})
await bucket.update_batch("entry-1", batch)
query
async def query(entry_name: str,
                start: Optional[Union[int, datetime, float, str]] = None,
                stop: Optional[Union[int, datetime, float, str]] = None,
                ttl: Optional[int] = None,
                when: Optional[Dict] = None,
                **kwargs) -> AsyncIterator[Record]
Query data for a time interval. The time interval is defined by the start and stop parameters, which can be: int (UNIX timestamp in microseconds), datetime, float (UNIX timestamp in seconds), or str (ISO 8601 string).
Arguments:
entry_name
- name of entry in the bucket
start
- the beginning of the time interval. If None, then from the first record
stop
- the end of the time interval. If None, then to the latest record
ttl
- Time To Live of the request in seconds
when
- condition to filter records
Keyword Arguments:
include
dict - query records which have all labels from this dict (DEPRECATED, use when)
exclude
dict - query records which don't have all labels from this dict (DEPRECATED, use when)
head
bool - if True: get only the header of a record with metadata
each_s
Union[int, float] - return a record for each S seconds
each_n
int - return each N-th record
limit
int - limit the number of records
strict
bool - if True: strict query
Returns:
AsyncIterator[Record]
- iterator to the records
Example:
async for record in bucket.query("entry-1", stop=int(time.time_ns() / 1000)):
    data: bytes = await record.read_all()
    # or read the record chunk-wise
    async for chunk in record.read(n=1024):
        print(chunk)
get_full_info
async def get_full_info() -> BucketFullInfo
Get full information about bucket (settings, statistics, entries)
Returns:
BucketFullInfo
- the full information about the bucket
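Example (sketch; assumes `BucketFullInfo` exposes `info`, `settings` and `entries` fields matching the description above):
full = await bucket.get_full_info()
print(full.settings)
print(full.info)
for entry in full.entries:
    print(entry.name)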