Data Replication With ReductStore
Data replication is a process of copying data from one database to another. ReductStore provides simple and efficient append-only replication to stream data from one bucket to another one.
Concepts
The data replication in ReductStore is based on the concept of a Replication Task. A replication task is a configurable thread that filters and copies records from a source bucket to a target bucket. The buckets can belong to the same or different ReductStore instances. For more information on buckets, see the Buckets guide.
Once a replication task is created, a ReductStore instance starts a new thread that waits for new records in the source bucket. When a new record arrives, the HTTP frontend stores the record in the source bucket and registers it in a transaction log. The replication task periodically checks the transaction log for new records and replicates them to the target bucket. For efficiency, the replication task replicates multiple records in a single batch. Once the record has been successfully replicated, the replication task deletes the record from the transaction log. This approach ensures that data is replicated in real time and that the replication process is fault-tolerant and can recover from failures.
Conditional Replication
A replication task can filter records before replicating them to the target bucket. You can specify the following filters:
Parameter | Description | Type |
---|---|---|
entries | A list of entries that the replication task will use to filter records. Only records with these entries will be replicated. If the list is empty, all records will be replicated. You can use the * wildcard to match any entry. | List of strings |
include | A list of key-value pairs that the replication task will use to filter records. Only records with these labels will be replicated. | List of key-value pairs |
exclude | A list of key-value pairs that the replication task will use to filter records. Records with these labels will not be replicated. | List of key-value pairs |
each_s | Replicate a record every S seconds | Float |
each_n | Replicate only every N record | Integer |
Usage Example
Data replication may seem complex, but it is actually quite simple. Let's take a simple example:
Imagine we collect high frequency vibration sensor data from an engine in the sensor-data
bucket.
The data from each sensor is stored in a separate record.
We want to replicate only the data from the sensor-1
entry to the remote-data
bucket in another ReductStore instance.
However, we only want to replicate the records if the engine is working and the sensor data is not corrupted.
In this case, the conditional replication settings will be:
entries: ["sensor-1"]
include:
engine_status: "working"
exclude:
sensor_status: "corrupted"
See the next section for more information on how to create a replication task with conditional replication settings.
Managing Data Replication Tasks
Here you will find examples of how to create, list, retrieve, update, and delete replication tasks using the ReductStore SDKs, REST API, CLI and Web Console.
Pay attention that all the examples are written for a local ReductStore instance available at http://127.0.0.1:8383
with API token my-token
.
For more information on setting up a local ReductStore instance, see the Getting Started guide.
Creating a Replication Task
To spin up a new replication task, you must provide the following information:
- Source Bucket: The name of the bucket in the source database from which data will be replicated.
- Remote Bucket: The name of the bucket in the target database to which data will be replicated.
- Remote URL: The URL of the target database.
- Remote Token: The API token of the target database.
- Filter Settings: See the Conditional Replication section for more information.
Let's create a replication task that replicates all records from the source-bucket
to the remote-bucket
by using
the ReductStore SDKs, REST API, CLI and Web Console. You can also provision a Replication Task by using environment variables.
A created replication task replicates only new records written to the source bucket after the task is created. It doesn't replicate existing records in the source bucket. However, you can manually replicate existing records using the Manual Data Replication feature.
- CLI
- Web Console
- Python
- JavaScript
- Rust
- C++
- cURL
- Provisioning
reduct-cli alias add local -L http://localhost:8383 -t "my-token"
# Create a source bucket
reduct-cli bucket create local/src-bucket
# Create a replication between the source bucket and the demo bucket at https://play.reduct.store
reduct-cli replica create local/my-replication src-bucket https://demo@play.reduct.store/demo
Steps to create a replication task using the Web Console: 1. Open the Web Console at http://127.0.0.1:8383 in your browser. 2. Enter the API token if the authorization is enabled. 3. Click on the "Replication" tab in the left sidebar. 4. Click on the plus icon in the top right corner to create a new replication task: 5. In the "Create a new replication" dialog, entre the name of the replication name and settings:
- Click on the "Create Replication" button to create the replication task.
import asyncio
from reduct import Client, ReplicationSettings
async def main():
# Create a client instance, then create a bucket as source bucket
client = Client("http://127.0.0.1:8383", api_token="my-token")
await client.create_bucket("my-bucket", exist_ok=True)
# Set up a replication to a destination bucket for records
# from the "py-example" entry and with labels "anomaly=1"
replication_settings = ReplicationSettings(
src_bucket="my-bucket",
dst_bucket="demo",
dst_host="https://play.reduct.store",
dst_api_token="reductstore",
entries=["py-example"],
include={"anomaly": "1"},
)
await client.create_replication("my-replication", replication_settings)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import { Client, QuotaType, ReplicationSettings } from "reduct-js";
import assert from "node:assert";
// Create a client instance, then create a bucket as source bucket
const client = new Client("http://127.0.0.1:8383", { apiToken: "my-token" });
await client.getOrCreateBucket("my-bucket");
// Set up a replication to a destination bucket for records
// from the "js-example" entry and with labels "anomaly=1"
const settings = {
srcBucket: "my-bucket",
dstBucket: "demo",
dstHost: "https://play.reduct.store",
dstToken: "reductstore",
entries: ["js-entry"],
include: { anomaly: "1" },
};
await client.createReplication("my-replication", settings);
use reduct_rs::{Labels, ReductClient, ReductError};
use tokio;
#[tokio::main]
async fn main() -> Result<(), ReductError> {
// Create a client instance, then create a bucket as source bucket
let client = ReductClient::builder()
.url("http://127.0.0.1:8383")
.api_token("my-token")
.build();
let _ = client
.create_bucket("my-bucket")
.exist_ok(true)
.send()
.await?;
// Set up a replication to a destination bucket for records
// from the "rs-example" entry and with labels "anomaly=1"
let _ = client
.create_replication("my-bucket")
.src_bucket("my-bucket")
.dst_bucket("demo")
.dst_host("https://play.reduct.store")
.dst_token("reductstore")
.entries(vec!["rs-example".to_string()])
.include(Labels::from_iter(vec![(
"anomaly".to_string(),
"1".to_string(),
)]))
.send()
.await?;
Ok(())
}
#include <reduct/client.h>
#include <iostream>
#include <cassert>
using reduct::IBucket;
using reduct::IClient;
using reduct::Error;
int main() {
// Create a client instance, then create a bucket as source bucket
auto client = IClient::Build("http://127.0.0.1:8383", {
.api_token = "my-token"
});
auto [bucket, create_err] = client->GetOrCreateBucket("my-bucket");
assert(create_err == Error::kOk);
// Set up a replication to a destination bucket
// for records from the "cpp-example" entry and with labels "anomaly=1"
auto repl_err = client->CreateReplication("my-replication", IClient::ReplicationSettings{
.src_bucket = "my-bucket",
.dst_bucket = "demo",
.dst_host = "https://play.reduct.store",
.dst_token = "reductstore",
.entries = {"cpp-example"},
.include = {{"anomaly", "1"}},
});
assert(repl_err == Error::kOk);
}
#!/bin/bash
set -e -x
API_PATH="http://127.0.0.1:8383/api/v1"
AUTH_HEADER="Authorization: Bearer my-token"
curl -X POST \
-d '{"src_bucket":"example-bucket", "dst_bucket":"demo", "dst_host": "https://play.reduct.store/", "dst_token": "my-token"}' \
-H "${AUTH_HEADER}" \
-a "${API_PATH}"/replications/my_replication
version: "3"
services:
reductstore:
image: reduct/store:latest
ports:
- "8383:8383"
volumes:
- ./data:/data
environment:
- RS_API_TOKEN=my-api-token
- RS_BUCKET_1_NAME=src_bucket
- RS_REPLICATION_1_NAME=my_replication
- RS_REPLICATION_1_SRC_BUCKET=src_bucket
- RS_REPLICATION_1_DST_BUCKET=demo
- RS_REPLICATION_1_DST_HOST=https://play.reduct.store
- RS_REPLICATION_1_DST_TOKEN=reductstore
- RS_REPLICATION_1_ENTRIES=exampl-*
- RS_REPLICATION_1_INCLUDE_anomaly=1
Browse Replication Tasks
You can list all replication tasks and get detailed information about a specific replication task using the ReductStore SDKs, REST API, CLI and Web Console. The detailed information includes status, current settings and statistics of the replication task:
- Status: The status of the replication task. It can be
Active
orInactive
. Inactive replication tasks are paused and don't replicate data usually because the target database is unreachable. - Provisioned: Whether the replication task is provisioned or not. Provisioned replication tasks are created using environment variables.
- Number of Pending Records: The number of records that are waiting to be replicated.
- Number of Failed Records: The number of records that failed to be replicated for in the last hour
- Number of Replicated Records: The number of records that were successfully replicated for the last hour
- Error List: A list of errors that occurred during the replication process for the last hour
For the first hour, the Number of Failed Records
and Number of Replicated Records
are interpolated.
- CLI
- Web Console
- Python
- JavaScript
- Rust
- C++
- cURL
reduct-cli alias add local -L http://localhost:8383 -t "my-token"
# List all replications
reduct-cli replica ls local --full
# Browse a specific replication
reduct-cli replica show local/example-replication
Steps to browse a replication task using the Web Console: 1. Open the Web Console at http://127.0.0.1:8383 in your browser. 2. Enter the API token if the authorization is enabled. 3. Click on the "Replication" tab in the left sidebar. 4. You will see a list of all replication tasks with their status 5. Click on a specific replication task in the list: 6. You will see the details of the replication task: 7. You can also see or update the replication task settings by clicking he cog icon(⚙️) in the replication task panel.
import asyncio
from reduct import Client, ReplicationSettings
async def main():
# Create a client instance
client = Client("http://127.0.0.1:8383", api_token="my-token")
# List all replications
for replication in await client.get_replications():
print("Replication: ", replication.name)
print("Active: ", replication.is_active)
print("Pending records: ", replication.pending_records)
print("Provisioned: ", replication.is_provisioned)
# Get all details of a replication
replication = await client.get_replication_detail("example-replication")
print("Replication: ", replication.info.name)
print("Settings: ", replication.settings)
print("Failed records (last hour): ", replication.diagnostics.hourly.errored)
print("Successful records (last hour): ", replication.diagnostics.hourly.ok)
print("Errors (last hour): ", replication.diagnostics.hourly.errors)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import { Client, QuotaType, ReplicationSettings } from "reduct-js";
import assert from "node:assert";
// Create a client instance
const client = new Client("http://127.0.0.1:8383", { apiToken: "my-token" });
// List all replications
for (const replication of await client.getReplicationList()) {
console.log("Replication: ", replication.name);
console.log("Active: ", replication.isActive);
console.log("Pending records: ", replication.pendingRecords);
console.log("Provisioned: ", replication.isActive);
}
// Get all details of a replication
const replication = await client.getReplication("example-replication");
console.log("Replication: ", replication.info.name);
console.log("Settings: ", replication.settings);
console.log(
"Failed records (last hour): ",
replication.diagnostics.hourly.errored,
);
console.log(
"Successful records (last hour): ",
replication.diagnostics.hourly.ok,
);
console.log("Errors (last hour): ", replication.diagnostics.hourly.errors);
use reduct_rs::{Labels, ReductClient, ReductError};
use tokio;
#[tokio::main]
async fn main() -> Result<(), ReductError> {
// Create a client instance
let client = ReductClient::builder()
.url("http://127.0.0.1:8383")
.api_token("my-token")
.build();
// List all replications
for replication in client.list_replications().await? {
println!("Replication: {}", replication.name);
println!("Active: {}", replication.is_active);
println!("Pending records: {}", replication.pending_records);
println!("Provisioned: {}", replication.is_provisioned);
}
// Get all details of a replication
let replication = client.get_replication("example-replication").await?;
println!("Replication: {}", replication.info.name);
println!("Settings: {:?}", replication.settings);
println!(
"Failed records (last hour): {}",
replication.diagnostics.hourly.errored
);
println!(
"Successful records (last hour): {}",
replication.diagnostics.hourly.ok
);
println!(
"Errors (last hour): {:?}",
replication.diagnostics.hourly.errors
);
Ok(())
}
#include <reduct/client.h>
#include <iostream>
#include <cassert>
using reduct::IBucket;
using reduct::IClient;
using reduct::Error;
int main() {
// Create a client instance
auto client = IClient::Build("http://127.0.0.1:8383", {
.api_token = "my-token"
});
// List all replications
auto [replications, list_err] = client->GetReplicationList();
for (auto& replication : replications) {
std::cout << "Replication: " << replication.name << std::endl;
std::cout << "Active: " << replication.is_active << std::endl;
std::cout << "Pending records: " << replication.pending_records << std::endl;
std::cout << "Provisioned: " << replication.is_provisioned << std::endl;
}
// Get all details of a replication
auto [replication, detail_err] = client->GetReplication("example-replication");
std::cout << "Replication: " << replication.info.name << std::endl;
std::cout << "Source Bucket: " << replication.settings.src_bucket << std::endl;
std::cout << "Failed records (last hour): " << replication.diagnostics.hourly.errored << std::endl;
std::cout << "Successful records (last hour): " << replication.diagnostics.hourly.ok << std::endl;
}
#!/bin/bash
set -e -x
API_PATH="http://127.0.0.1:8383/api/v1"
AUTH_HEADER="Authorization: Bearer my-token"
# List all replications
curl -H "${AUTH_HEADER}" \
-a "${API_PATH}"/replications/
# Browse a specific replication
curl -H "${AUTH_HEADER}" \
-a "${API_PATH}"/replications/example-replication
Removing a Replication Task
You can remove a replication task by using the ReductStore SDKs, REST API, CLI and Web Console. Once you remove a replication task, the replication process stops immediately, and the transaction log is deleted from the database.
You can't remove a provisioned replication task. Before removing a provisioned replication task, you need to unset the corresponding environment variables and restart the ReductStore instance.
- CLI
- Web Console
- Python
- JavaScript
- Rust
- C++
- cURL
reduct-cli alias add local -L http://localhost:8383 -t "my-token"
reduct-cli replica rm local/repl-to-remove --yes
Steps to remove a replication task using the Web Console: 1. Open the Web Console at http://127.0.0.1:8383 in your browser. 2. Enter the API token if the authorization is enabled. 3. Click on the "Replication" tab in the left sidebar. 4. You will see a list of all replication tasks with their status 5. Click on a specific replication task in the list: 6. Click on the "Remove" button in the replication task panel. 7. Confirm the deletion by typing the replication task name and clicking on the Remove button:
import asyncio
from reduct import Client, ReplicationSettings
async def main():
# Create a client instance
client = Client("http://127.0.0.1:8383", api_token="my-token")
# Remove the `repl-to-remove` replication
await client.delete_replication("repl-to-remove")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import { Client, QuotaType, ReplicationSettings } from "reduct-js";
import assert from "node:assert";
// Create a client instance
const client = new Client("http://127.0.0.1:8383", { apiToken: "my-token" });
// Remove the `repl-to-remove` replication
await client.deleteReplication("repl-to-remove");
use reduct_rs::{ReductClient, ReductError};
use tokio;
#[tokio::main]
async fn main() -> Result<(), ReductError> {
// Create a client instance
let client = ReductClient::builder()
.url("http://127.0.0.1:8383")
.api_token("my-token")
.build();
// Remove the `repl-to-remove` replication
client.delete_replication("repl-to-remove").await?;
Ok(())
}
#include <reduct/client.h>
#include <iostream>
#include <cassert>
using reduct::IBucket;
using reduct::IClient;
using reduct::Error;
int main() {
// Create a client instance
auto client = IClient::Build("http://127.0.0.1:8383", {
.api_token = "my-token"
});
// Remove the `repl-to-remove` replication
auto err = client->RemoveReplication("repl-to-remove");
assert(err == Error::kOk);
}
#!/bin/bash
set -e -x
API_PATH="http://127.0.0.1:8383/api/v1"
AUTH_HEADER="Authorization: Bearer my-token"
curl -X DELETE \
-H "${AUTH_HEADER}" \
-a "${API_PATH}"/replications/repl-to-remove
Manual Data Replication
You can also manually replicate data if you need to copy specific time periods or records from one bucket to another.
To do this, you can use ReducerCLI's cp
command. Here we'll copy all records from the src-instance/example-bucket
to the dst-instance/demo
bucket that have the anomaly=true
label and do not have the status=ok
label.
- CLI
reduct-cli alias add src-instance -L http://localhost:8383 -t my-token
reduct-cli alias add dst-instance -L https://play.reduct.store -t reductstore
reduct-cli cp src-instance/example-bucket dst-instance/demo --include "anomaly=true" --exclude "status=ok"