
How to Store and Manage ROS Data

· 29 min read
Anthony Cavin
Data Scientist - ML/AI, Python, TypeScript

ROS 2 Data Storage Tutorial

In this tutorial, we will create a custom ROS 2 Humble package called rosbag2reduct that records incoming ROS 2 topics into MCAP bag files on a Raspberry Pi and automatically uploads those files to a ReductStore instance with metadata labels. We'll walk through setting up ROS 2 Humble on the Pi, interfacing a USB camera using the v4l2_camera driver, deploying a lightweight YOLOv5 (nano) object detection node (using ONNX Runtime) to produce detection metadata, and implementing the rosbag2reduct node to capture data and offload it. We will also cover installing ReductStore on the Pi, configuring replication of labeled data to a central storage on your laptop (using label-based filters via the web console). This end-to-end guide is structured with clear steps, code examples, and configuration snippets to help you build and deploy the system.

Prerequisites and Architecture

Before beginning, ensure you have the following:

  • Hardware: Raspberry Pi, a USB camera, and an internet connection.
  • OS: Ubuntu Server or Desktop (22.04 LTS or later) on the Raspberry Pi.
  • Laptop/PC: A separate machine on the same network, to serve as a central data store.

In this tutorial, we will set up the following architecture:

Tutorial Architecture

Architecture: Raspberry Pi with ROS 2, USB camera, YOLOv5n, and ReductStore; Laptop with ReductStore Web Console.

  • Raspberry Pi: Running ROS 2 Humble, interfacing with a USB camera, and running a YOLOv5n object detection node with ONNX Runtime.
  • Laptop/PC: Running a ReductStore instance (acting as a central data store) and ReductStore's Web Console for managing data.
  • Network: The Pi and the laptop are on the same network. The Pi will upload data to the ReductStore instance on the laptop automatically.

1. Install Ubuntu on Raspberry Pi

If not already done, flash Ubuntu for Raspberry Pi on a microSD card and boot your Pi. You can download Ubuntu images for Raspberry Pi from the official site. Ensure you have internet access and update the system:

# On Raspberry Pi
sudo apt update && sudo apt upgrade -y

2. Install ROS 2 Humble on Raspberry Pi

Follow the steps below to set up ROS 2 on your Raspberry Pi. This guide assumes you are using a supported 64-bit Ubuntu release (22.04 for Humble, 24.04 for Jazzy). ROS 2 is available in multiple distributions such as Jazzy, Iron, and Humble. This tutorial uses Humble, the LTS release that matches Ubuntu 22.04, which we install with the commands below.

  1. Setup Locale:

    sudo locale-gen en_US en_US.UTF-8
    sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
    export LANG=en_US.UTF-8
  2. Add ROS 2 apt Repository:

    sudo apt install software-properties-common
    sudo add-apt-repository universe
    sudo apt update && sudo apt install curl -y
    sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg
    echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] http://packages.ros.org/ros2/ubuntu $(. /etc/os-release && echo $UBUNTU_CODENAME) main" | sudo tee /etc/apt/sources.list.d/ros2.list > /dev/null
    sudo apt update
  3. Install ROS 2 Packages: For a baseline, install ROS 2 base packages (or ros-<distro>-desktop if you need GUI tools):

    export ROS_DISTRO=humble # set the target distribution before using ${ROS_DISTRO}
    sudo apt install ros-${ROS_DISTRO}-ros-base -y

    This will install core ROS 2 packages including rosbag2. You may verify the installation by sourcing the setup script:

    source /opt/ros/${ROS_DISTRO}/setup.bash
    ros2 -h # Should show ROS 2 commands
  4. Install Build Tools: We will create custom packages, so install development tools:

    sudo apt install python3-colcon-common-extensions python3-rosdep -y
    sudo rosdep init
    rosdep update

3. Create a ROS 2 Workspace

Set up a workspace for our project (if you don't have one):

# On Raspberry Pi
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws
colcon build # just to initialize, will be empty initially

Add source ~/ros2_ws/install/setup.bash to your ~/.bashrc so that the workspace is sourced on each new shell, or remember to source it in each terminal when using the workspace. We will add packages to this workspace in subsequent steps.

Setting up the USB Camera with v4l2_camera

We'll use the v4l2_camera ROS 2 package to interface with the USB camera via Video4Linux2. This package publishes images from any V4L2-compatible camera (most USB webcams) as ROS 2 image topics.

1. Install the v4l2_camera Package

On the Raspberry Pi, install the driver node via apt:

sudo apt install ros-${ROS_DISTRO}-v4l2-camera -y

This installs the v4l2_camera node and its dependencies. Alternatively, you could build it from source, but the binary is available for Humble.

2. Connect and Verify the Camera

Plug in the USB camera to the Pi. Verify that it's recognized by listing video devices:

ls /dev/video*
# You should see /dev/video0 (and possibly /dev/video1, etc. if multiple video capture interfaces are connected)

If /dev/video0 is present, the system sees at least one camera. You can also install v4l-utils and run v4l2-ctl --list-devices to see the camera name and capabilities, or v4l2-ctl --list-formats-ext to see supported resolutions and formats.

3. Run the Camera Node

Launch the camera driver to start publishing images:

# In a sourced ROS 2 environment on the Pi
ros2 run v4l2_camera v4l2_camera_node

By default, this node opens /dev/video0 and starts publishing images on the /image_raw topic (type sensor_msgs/Image) at a default resolution of 640x480, with the camera's YUYV pixel format converted to rgb8 (see the ROS 2 camera driver for Video4Linux2 documentation). You should see console output from the node indicating it opened the device and is streaming.

Open a new terminal (with ROS sourced) on the Pi (or from a laptop connected to the ROS 2 network) and verify images are coming through, e.g., by running rqt_image_view:

sudo apt install ros-${ROS_DISTRO}-rqt-image-view -y  # if not installed
ros2 run rqt_image_view rqt_image_view

In rqt_image_view, select /image_raw to view the camera feed. This confirms the camera setup is working.

Note: You can adjust parameters by remapping or via ROS 2 parameters, e.g., to change resolution or device:

ros2 run v4l2_camera v4l2_camera_node --ros-args -p image_size:="[1280,720]" -p video_device:="/dev/video0"

This would set the camera to 1280x720 resolution (if supported).

Deploying a Lightweight YOLOv5 Object Detection Node

Next, we set up an object detection node to analyze the camera images and output metadata (object_detected and confidence_score). We'll use YOLOv5n (Nano) - the smallest YOLOv5 model ("only" 1.9 million parameters) which is ideal for resource-constrained devices (see releases at ultralytics/yolov5). We will run inference using the ONNX Runtime, which allows running the model without needing the full PyTorch framework on the Pi.

1. Install ONNX Runtime and Dependencies

On the Raspberry Pi, install the ONNX Runtime Python package and OpenCV (for image processing):

pip install onnxruntime opencv-python

(If pip isn't available, use sudo apt install python3-pip to install it. You may also install numpy if not already present, as ONNX Runtime will likely need it.)

2. YOLOv5n ONNX Model

We need the YOLOv5n model in ONNX format. To do that, we can clone the YOLOv5 repository on a more powerful machine than the Pi and export the model:

# On a PC or via Colab:
git clone https://github.com/ultralytics/yolov5.git
cd yolov5
pip install -r requirements.txt # includes PyTorch
python export.py --weights yolov5n.pt --include onnx

This will create yolov5n.onnx. Transfer that file to your Raspberry Pi (e.g., via SCP).

For this tutorial, assume yolov5n.onnx is now on the Raspberry Pi (e.g., placed in ~/ros2_ws/src).

3. Create a ROS 2 Package for the YOLO Node (optional)

You can integrate the YOLO inference in the same package as rosbag2reduct, but for modularity, let's create a separate ROS 2 Python package called yolo_detector.

In the workspace src directory, run:

cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python yolo_detector --dependencies rclpy sensor_msgs std_msgs

This will create a yolo_detector folder with a Python package structure. Edit yolo_detector/package.xml to add dependencies for opencv-python and onnxruntime (since these are non-ROS dependencies, we list them for documentation; you might use pip in the installation step rather than rosdep). For example, add the following <exec_depend> tags:

<exec_depend>onnxruntime</exec_depend>
<exec_depend>opencv-python</exec_depend>

4. Implement the YOLO Detection Node

Create a file yolo_detector/yolo_detector/yolo_node.py with the following content:

YoloDetectorNode (Python code)
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import String, Float32
import cv2
import numpy as np
import onnxruntime as ort


class YoloDetectorNode(Node):
    def __init__(self):
        super().__init__('yolo_detector')
        # Load the YOLOv5n ONNX model
        model_path = '/path/to/yolov5n.onnx'  # TODO: update to actual path
        self.session = ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])
        self.get_logger().info(f"Loaded model {model_path}")

        # Get model input details for preprocessing
        model_inputs = self.session.get_inputs()
        self.input_name = model_inputs[0].name
        self.input_shape = model_inputs[0].shape  # e.g., [1, 3, 640, 640]
        self.img_height = self.input_shape[2]
        self.img_width = self.input_shape[3]

        # Subscribers and publishers
        self.subscription = self.create_subscription(Image, '/image_raw', self.image_callback, 10)
        self.pub_object = self.create_publisher(String, 'object_detected', 10)
        self.pub_conf = self.create_publisher(Float32, 'confidence_score', 10)

        # If the model requires normalization factors or specific transformations, define them:
        self.mean = np.array([0.0, 0.0, 0.0])       # YOLOv5 models assume 0-255 input, no mean subtraction
        self.std = np.array([255.0, 255.0, 255.0])  # we scale to 0-1 later by dividing by 255

    def image_callback(self, msg: Image):
        # Convert the ROS Image message to a NumPy array
        # Assuming msg.encoding is 'rgb8' as provided by v4l2_camera's default output
        img = np.frombuffer(msg.data, dtype=np.uint8).reshape(msg.height, msg.width, -1)
        # YOLOv5 models are trained on RGB images, so we keep the RGB channel order

        # Resize the image to the model input shape (a real implementation would letterbox to keep the aspect ratio)
        input_img = cv2.resize(img, (self.img_width, self.img_height))
        # Convert to float32 and normalize to 0-1
        input_img = input_img.astype('float32') / 255.0
        # Transpose to [channels, height, width]
        input_blob = np.transpose(input_img, (2, 0, 1))
        input_blob = np.expand_dims(input_blob, axis=0)  # shape [1, 3, H, W]

        # Run inference
        outputs = self.session.run(None, {self.input_name: input_blob})

        # Parse outputs to find the highest-confidence detection (for simplicity)
        # YOLOv5 ONNX output is typically a [1, num_boxes, 85] array (for COCO: 4 box coords, 1 objectness, 80 class scores)
        detections = outputs[0]
        # Filter by confidence threshold (e.g., 0.5)
        conf_threshold = 0.5
        best_label = "none"
        best_conf = 0.0
        if detections is not None:
            for det in detections[0]:
                obj_conf = det[4]
                class_conf = det[5:]  # class confidences
                score = obj_conf * np.max(class_conf)
                class_id = np.argmax(class_conf)
                if score > conf_threshold and score > best_conf:
                    best_conf = float(score)
                    best_label = str(class_id)  # or use a class id->name mapping

        # Publish results
        self.pub_object.publish(String(data=best_label))
        self.pub_conf.publish(Float32(data=best_conf))
        self.get_logger().info(f"Detected: {best_label} ({best_conf:.2f})")


def main(args=None):
    rclpy.init(args=args)
    node = YoloDetectorNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

Explanation: This node subscribes to the camera images (/image_raw), processes each frame through the YOLOv5n model, and publishes two topics:

  • object_detected (std_msgs/String): the class label (or ID) of the primary detected object (or "none" if none above threshold).
  • confidence_score (std_msgs/Float32): the confidence score of that detection.

For simplicity, we took the detection with highest confidence above a threshold. In a real scenario, you probably output multiple detections or more info (bounding boxes, etc.), but we only need metadata for this tutorial.

Make sure to adjust the model_path to the actual location of your yolov5n.onnx. Also note that without class name mapping, best_label is currently the class index (as string). You can map this index to an actual label (e.g., using the COCO class list below).

COCO Class List
names:
0: person
1: bicycle
2: car
3: motorcycle
4: airplane
5: bus
6: train
7: truck
8: boat
9: traffic light
10: fire hydrant
11: stop sign
12: parking meter
13: bench
14: bird
15: cat
16: dog
17: horse
18: sheep
19: cow
20: elephant
21: bear
22: zebra
23: giraffe
24: backpack
25: umbrella
26: handbag
27: tie
28: suitcase
29: frisbee
30: skis
31: snowboard
32: sports ball
33: kite
34: baseball bat
35: baseball glove
36: skateboard
37: surfboard
38: tennis racket
39: bottle
40: wine glass
41: cup
42: fork
43: knife
44: spoon
45: bowl
46: banana
47: apple
48: sandwich
49: orange
50: broccoli
51: carrot
52: hot dog
53: pizza
54: donut
55: cake
56: chair
57: couch
58: potted plant
59: bed
60: dining table
61: toilet
62: tv
63: laptop
64: mouse
65: remote
66: keyboard
67: cell phone
68: microwave
69: oven
70: toaster
71: sink
72: refrigerator
73: book
74: clock
75: vase
76: scissors
77: teddy bear
78: hair drier
79: toothbrush
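
With this list, you can turn the class index into a readable label before publishing it. A minimal sketch (COCO_NAMES is a hypothetical tuple you would fill from the list above; only the first entries are shown):

# Hypothetical mapping from YOLOv5/COCO class index to label name
COCO_NAMES = ("person", "bicycle", "car", "motorcycle", "airplane")  # ... extend with the full 80-class list above

def class_id_to_name(class_id: int) -> str:
    # Fall back to the raw index if the id is outside the known range
    if 0 <= class_id < len(COCO_NAMES):
        return COCO_NAMES[class_id]
    return str(class_id)

# In image_callback, instead of best_label = str(class_id):
# best_label = class_id_to_name(int(class_id))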

Also, update setup.py entry points to include our node script:

setup(
    ...
    entry_points={
        'console_scripts': [
            'yolo_node = yolo_detector.yolo_node:main',
        ],
    },
)

5. Build and Run the YOLO Node

Add onnxruntime and opencv-python to your workspace's requirements (you might include them in a requirements.txt for the package and use pip to install, since they are pip packages). For now, ensure they are installed via pip as done earlier.

Build the workspace:

cd ~/ros2_ws
colcon build --packages-select yolo_detector
source install/local_setup.bash

Run the YOLO detection node in a new terminal on the Pi:

ros2 run yolo_detector yolo_node

You should see log output from the node whenever it processes an image (every frame or at least when something is detected, depending on your logging). The node will publish messages on object_detected and confidence_score topics.

You can echo these topics in another terminal to verify:

ros2 topic echo /object_detected
ros2 topic echo /confidence_score

For example, if a person (COCO class 0) is detected with 85% confidence, you should see messages like:

data: "0"    # on /object_detected
data: 0.85   # on /confidence_score

Now we have a camera streaming images and a detector outputting metadata. Next, we'll create the rosbag2reduct package to record these data and handle file rotation and uploading to ReductStore.

Creating the rosbag2reduct Package

Our rosbag2reduct package will be a ROS 2 node that does the following:

  • Subscribe to the relevant topics (e.g. /image_raw, object_detected, confidence_score).
  • Record those topics into a bag file using the rosbag2 Python API (in MCAP format).
  • Label each bag file with the latest detection metadata.
  • After a fixed time interval (bag rotation period), close the current bag, upload it to ReductStore with the Python client SDK (including the metadata as labels), and delete the local bag file.
  • Start a new bag file and repeat.
warning

This is not optimal (writing to disk then uploading) and is done for simplicity. In a real scenario, you should stream directly to ReductStore or separate storage topics by type. More on this in the Best Practices and Further Improvements section.

1. Create the Package Structure

Run the ROS 2 package creation command:

cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python rosbag2reduct --dependencies rclpy rosbag2_py sensor_msgs std_msgs

This creates a new folder rosbag2reduct with a basic Python package setup. Update rosbag2reduct/package.xml to include <exec_depend>reduct-py</exec_depend> (ReductStore's Python SDK) since we'll use that. Also make sure rosbag2_py is listed as a dependency (the above command included it). In setup.py, add an entry point for our main node if desired (though we can also run the Python file directly with ros2 run as long as it's installed).

After creation, the structure should look like:

ros2_ws/src/rosbag2reduct/
├── package.xml
├── setup.cfg
├── setup.py
├── resource/
│   └── rosbag2reduct
└── rosbag2reduct/
    ├── __init__.py
    └── recorder_node.py  # (we will create this)

2. Install ReductStore Python SDK and MCAP Support

Before coding, install the ReductStore client library (reduct-py) in your Python environment:

pip install reduct-py

This gives us the reduct module for interacting with a ReductStore server. (We will set up the actual ReductStore server on the Pi soon, but we can write the code first.)
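
Once a ReductStore server is running (we install it on the Pi in a later section), you can sanity-check the SDK and the connection with a few lines. A minimal sketch, assuming the default local endpoint and no authentication:

import asyncio
from reduct import Client

async def main():
    # Connect to the local ReductStore instance and print basic server info
    client = Client("http://127.0.0.1:8383")
    info = await client.info()
    print(f"ReductStore version: {info.version}, uptime: {info.uptime}s")

asyncio.run(main())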

Also, install the MCAP storage format support for rosbag2:

sudo apt install ros-${ROS_DISTRO}-rosbag2-storage-mcap

3. Implementing the rosbag2reduct Node

Open a new file rosbag2reduct/recorder_node.py and add the following code from the snippet below. This code defines the Rosbag2ReductNode class, which is a ROS 2 node that subscribes to the camera images and metadata topics, records them into bag files, and uploads the bag files to ReductStore with metadata labels.

Rosbag2ReductNode (Python code)
import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Float32
from sensor_msgs.msg import Image

from rclpy.serialization import serialize_message
from rosbag2_py import SequentialWriter, StorageOptions, ConverterOptions, TopicMetadata

import os
import shutil
import datetime
import asyncio
from reduct import Client, Bucket


class Rosbag2ReductNode(Node):
    def __init__(self):
        super().__init__('rosbag2reduct')
        self.bag_duration = 60.0  # seconds per bag before rotation
        self.storage_id = 'mcap'  # or 'sqlite3'

        self.current_bag_index = 0
        self.writer = None
        self.current_bag_name = self._open_new_bag()

        self.create_subscription(Image, '/image_raw', self._image_callback, 10)
        self.create_subscription(String, 'object_detected', lambda msg: self._metadata_callback(msg, 'object_detected'), 10)
        self.create_subscription(Float32, 'confidence_score', lambda msg: self._metadata_callback(msg, 'confidence_score'), 10)

        self.last_object_detected = "none"
        self.last_confidence = 0.0

        self.create_timer(self.bag_duration, self._rotate_bag_timer)

        self.reduct_url = "http://127.0.0.1:8383"
        self.bucket_name = "pi_robot"
        self.entry_name = "rosbags"
        self.reduct_client = Client(self.reduct_url)
        asyncio.get_event_loop().run_until_complete(self._ensure_bucket())
        self.get_logger().info(
            f"rosbag2reduct node initialized, writing to {self.storage_id} bags and uploading to ReductStore bucket '{self.bucket_name}'.")

    def _open_new_bag(self):
        # Reassigning self.writer releases the previous writer, which finalizes its bag file
        self.writer = SequentialWriter()
        bag_name = f"rosbag_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}_{self.current_bag_index}"
        storage_options = StorageOptions(uri=bag_name, storage_id=self.storage_id)
        converter_options = ConverterOptions('', '')
        self.writer.open(storage_options, converter_options)

        # Register the topics we record (note: newer rosbag2 releases also expect
        # a topic id as the first positional argument of TopicMetadata)
        self.writer.create_topic(TopicMetadata(
            name='/image_raw',
            type='sensor_msgs/msg/Image',
            serialization_format='cdr'))
        self.writer.create_topic(TopicMetadata(
            name='object_detected',
            type='std_msgs/msg/String',
            serialization_format='cdr'))
        self.writer.create_topic(TopicMetadata(
            name='confidence_score',
            type='std_msgs/msg/Float32',
            serialization_format='cdr'))

        self.get_logger().info(f"Opened new bag: {bag_name}")
        return bag_name

    def _image_callback(self, msg: Image):
        self.writer.write('/image_raw', serialize_message(msg), self.get_clock().now().nanoseconds)

    def _metadata_callback(self, msg, topic_name):
        if topic_name == 'object_detected':
            self.last_object_detected = msg.data
            self.writer.write('object_detected', serialize_message(msg), self.get_clock().now().nanoseconds)
        elif topic_name == 'confidence_score':
            self.last_confidence = msg.data
            self.writer.write('confidence_score', serialize_message(msg), self.get_clock().now().nanoseconds)

    def _rotate_bag_timer(self):
        object_label = self.last_object_detected
        confidence_val = float(self.last_confidence)
        old_bag_name = self.current_bag_name

        self.current_bag_index += 1
        self.current_bag_name = self._open_new_bag()

        self.get_logger().info(f"Uploading bag {old_bag_name} with metadata: object={object_label}, confidence={confidence_val:.2f}")
        asyncio.get_event_loop().run_until_complete(
            self._upload_bag_file(old_bag_name, object_label, confidence_val))

        # Clean up the bag directory if a file was uploaded
        bag_dir = old_bag_name
        if os.path.isdir(bag_dir):
            try:
                shutil.rmtree(bag_dir)
                self.get_logger().info(f"Deleted bag directory {bag_dir} after upload.")
            except Exception as e:
                self.get_logger().warn(f"Could not delete directory {bag_dir}: {e}")
        else:
            self.get_logger().warn(f"Bag directory {bag_dir} not found.")

    async def _ensure_bucket(self):
        await self.reduct_client.create_bucket(self.bucket_name, exist_ok=True)

    async def _upload_bag_file(self, bag_name, object_label, confidence_val):
        bag_dir = bag_name
        mcap_file = None

        # Find the first .mcap file in the bag directory
        try:
            for filename in os.listdir(bag_dir):
                if filename.endswith(".mcap"):
                    mcap_file = os.path.join(bag_dir, filename)
                    break
        except FileNotFoundError:
            self.get_logger().error(f"Bag directory {bag_dir} does not exist.")
            return

        if not mcap_file:
            self.get_logger().warn(f"No .mcap file found in {bag_dir} — skipping upload.")
            return

        try:
            with open(mcap_file, 'rb') as f:
                data = f.read()
        except Exception as e:
            self.get_logger().error(f"Error reading {mcap_file}: {e}")
            return

        bucket: Bucket = await self.reduct_client.get_bucket(self.bucket_name)
        timestamp = datetime.datetime.utcnow().isoformat() + "Z"
        labels = {"object": object_label, "confidence": str(confidence_val)}
        await bucket.write(self.entry_name, data, timestamp=timestamp, labels=labels)
        self.get_logger().info(f"Uploaded {mcap_file} to ReductStore with labels {labels}.")


def main(args=None):
    rclpy.init(args=args)
    node = Rosbag2ReductNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    node.destroy_node()
    try:
        rclpy.shutdown()
    except Exception:
        pass

Let's break down key parts of this implementation:

  • We use rosbag2's Python API: SequentialWriter to record messages. We specify MCAP as the storage format when opening the bag. We explicitly register three topics (/image_raw, object_detected, confidence_score) with their message types, so we can write to them. In each subscription callback, we call self.writer.write(topic_name, serialize_message(msg), timestamp) to append to the bag.
  • We maintain last_object_detected and last_confidence variables to store the most recent detection metadata. The _metadata_callback updates these whenever a message on those topics arrives, and writes the message to the bag as well.
  • A ROS timer triggers _rotate_bag_timer() every self.bag_duration seconds (e.g., every 60 seconds). This function closes the current bag and opens a new one (by calling _open_new_bag() which increments a bag index and starts a new file). We then proceed to upload the bag file (that we just closed) to ReductStore.
  • ReductStore upload: Using reduct-py, we ensure a bucket named pi_robot exists in ReductStore (which we assume is running locally on the Pi at 127.0.0.1:8383). We then read the bag file into memory and use bucket.write() to store it in the bucket under the rosbags entry. We label each record with the detection metadata (e.g., labels={"object": "1", "confidence": "0.85"}).
  • After a successful upload, we delete the local .mcap file to save space on the Pi (since it's now stored in ReductStore).
info

This is a basic implementation. In a real-world scenario, you will likely want to handle prediction and timestamping differently, add more metadata, and of course use a more efficient way to upload data directly to ReductStore.

4. Build the rosbag2reduct Package

Ensure that in setup.py of rosbag2reduct you have an entry point if needed, e.g.:

entry_points={
    'console_scripts': [
        'rosbag2reduct = rosbag2reduct.recorder_node:main'
    ],
},

This will allow us to run ros2 run rosbag2reduct rosbag2reduct to launch the node. Now build the package:

cd ~/ros2_ws
colcon build --packages-select rosbag2reduct
source install/local_setup.bash

If everything compiles (installs) without errors, we're ready to run the system.

Installing and Configuring ReductStore on the Raspberry Pi

Before running the rosbag2reduct node, we need a ReductStore server running on the Pi to accept uploads. ReductStore is a lightweight time-series object storage, perfect for edge devices. We will install it on the Pi and create a bucket for our bag files.

1. Install ReductStore on Raspberry Pi

The easiest way on Ubuntu is to use snap or Docker. We'll use snap for simplicity:

# On Raspberry Pi
sudo apt update
sudo apt install snapd -y # if snapd is not already installed
sudo snap install reductstore

This will install ReductStore from the Snap Store. The snap should set up ReductStore as a service listening on port 8383 by default. (If using a different OS or if snap isn't desired, you can use Docker: e.g., docker run -d -p 8383:8383 -v ~/reduct_data:/data reduct/store:latest to run ReductStore in a container.)

Note: The database will listen on http://0.0.0.0:8383 (accessible to the LAN). Ensure this port is allowed through any firewall if you want external access.

Check that ReductStore is running:

sudo snap services reductstore  # should show active
curl http://127.0.0.1:8383/api/v1/info

The curl command should return JSON info about the instance (like version, uptime, etc.).

2. (Optional) Configure ReductStore

By default, ReductStore doesn't require authentication for local use (anonymous access is allowed). This is fine for our edge scenario on a local network. If you want to set up access tokens or adjust storage quotas, you can do so by following the Access Control documentation. For now, we'll use defaults.

We will use the Web Console to verify data and to set up replication later. The web console is accessible from a browser at the server's address (it's the same as the API endpoint). For example, on the Pi, open http://<raspberrypi_ip>:8383 in a browser - you should see the ReductStore Web Console interface (a simple GUI).

3. Create a Bucket for ROS bag data

Our rosbag2reduct code will attempt to create a bucket named "pi_robot". We called create_bucket("pi_robot", exist_ok=True) in the code, so the bucket will be created on first run if it doesn't exist. You can also create it manually via the web console:

  • Navigate to Buckets and create a new bucket named “pi_robot”. (You can set a quota, e.g., a FIFO quota to avoid filling up the disk on the Pi.)

ReductStore Web Console Bucket

ReductStore Web Console: Creating a bucket named "pi_robot" for storing ROS bags.
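
You can also create the bucket from Python instead of the web console, for example with a FIFO quota so that old records are evicted before the disk fills up. A sketch using reduct-py (the ~1 GB quota is an arbitrary example value):

import asyncio
from reduct import Client, BucketSettings, QuotaType

async def main():
    client = Client("http://127.0.0.1:8383")
    # Create the bucket with a FIFO quota; exist_ok avoids an error if it already exists
    settings = BucketSettings(quota_type=QuotaType.FIFO, quota_size=1_000_000_000)  # ~1 GB
    await client.create_bucket("pi_robot", settings, exist_ok=True)

asyncio.run(main())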

Now, ReductStore is set up on the Pi and ready to accept data. We can run the complete system to record ROS data, detect objects, and upload to ReductStore.

Running the Complete System

We have three ROS 2 nodes to run on the Raspberry Pi:

  • The camera driver (v4l2_camera_node)
  • The YOLO detection node (YoloDetectorNode)
  • The rosbag2reduct recorder/uploader node (Rosbag2ReductNode)

It's best to run each in its own terminal (or use a launch file to launch them together). For clarity, we'll do it step-by-step:

Terminal 1: Camera node

# Terminal 1 on Pi (source ROS 2 and workspace)
ros2 run v4l2_camera v4l2_camera_node

Terminal 2: YOLO detection node

# Terminal 2 on Pi (source ROS 2 and workspace)
ros2 run yolo_detector yolo_node

(This uses the yolo_node console-script entry point we added to the yolo_detector package's setup.py.)

Terminal 3: rosbag2reduct recorder node

# Terminal 3 on Pi (source ROS 2 and workspace)
ros2 run rosbag2reduct rosbag2reduct

(This uses the rosbag2reduct console-script entry point we defined in setup.py.)
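
Alternatively, a single ROS 2 launch file can start all three nodes together. A minimal sketch (save it, for example, as ~/ros2_ws/launch/pipeline.launch.py and start it with ros2 launch ~/ros2_ws/launch/pipeline.launch.py; it assumes the entry points defined earlier):

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        # USB camera driver
        Node(package='v4l2_camera', executable='v4l2_camera_node', name='camera'),
        # YOLOv5n detector publishing object_detected / confidence_score
        Node(package='yolo_detector', executable='yolo_node', name='yolo_detector'),
        # Recorder/uploader node
        Node(package='rosbag2reduct', executable='rosbag2reduct', name='rosbag2reduct'),
    ])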

Now monitor the outputs:

  • The camera node should just stream (no text output unless error or warning).
  • The YOLO node will log detections (as we coded with get_logger().info on each detection).
  • The rosbag2reduct node will log bag rotations and uploads. For example, you should see logs like “Opened new bag: rosbag_20230325_101500_0”, later “Uploading bag rosbag_20230325_101500_0 with metadata: object=0, confidence=0.85”, and then “Uploaded rosbag... to ReductStore with labels ...”, etc.

Let this run for a while. By default, every 60 seconds it will finalize a bag and upload it. If you want to trigger a rotation sooner (for testing), you could reduce bag_duration or even manually call the rotation function (not easily via ROS, unless you expose a service but it's not implemented here).

1. Verify Data in ReductStore

On the Pi (or from any machine that can access the Pi's port 8383), open the ReductStore Web Console in a browser: http://<raspberrypi_ip>:8383. You should see the bucket “pi_robot”. Click it to see the list of entries.

Each uploaded bag file appears as a record in the rosbags entry of the pi_robot bucket. Each record is identified by its upload timestamp, and you should be able to see its labels, e.g., object: 1 and confidence: 0.85, attached to it.
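
You can also inspect the records programmatically. A small sketch with reduct-py that lists the records in the rosbags entry together with their labels (assuming the default local endpoint on the Pi):

import asyncio
from reduct import Client

async def main():
    client = Client("http://127.0.0.1:8383")
    bucket = await client.get_bucket("pi_robot")
    # Iterate over the records in the 'rosbags' entry and print their metadata
    async for record in bucket.query("rosbags"):
        print(record.timestamp, record.size, record.labels)

asyncio.run(main())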

You have successfully set up the edge device to capture ROS data and push it to ReductStore with metadata labels!

Setting up Replication to a Central ReductStore (Laptop)

With data being collected on the Pi, we likely want to aggregate it on a central server (e.g., your laptop or a cloud instance) for analysis or long-term storage.

ReductStore's replication feature allows the Pi (source) to push new records to another ReductStore instance (destination) in real-time, filtering by labels so that, for example, only important events (e.g., specific objects or high-confidence detections) are sent for long-term storage.

info

Replication lets you stream only the data you need from the edge to the cloud or between edge devices. You can filter by label and push data without constant polling. If the device is offline or the destination is down, the data waits and replicates later.

In this section, we'll:

  • Run ReductStore on the laptop.
  • Use the ReductStore Web Console to create a replication task on the Pi's instance that filters and forwards data to the laptop's instance based on labels.
  • Verify that replication works.

1. Install/Run ReductStore on Laptop

On your laptop (assuming Ubuntu 22.04 or any system with Docker or Snap):

  • Option A: Docker - run ReductStore in a container:

    docker run -d -p 8383:8383 -v ~/reduct_data:/data reduct/store:latest

    This runs ReductStore locally on port 8383 and stores data in ~/reduct_data on your laptop.

  • Option B: Native - you could similarly install via Snap (sudo snap install reductstore) or use a binary. Docker is quick and easy.

After starting it, ensure you can access it:

curl http://127.0.0.1:8383/api/v1/info

should return info as before (but for the laptop's instance).

Open the web console on the laptop: http://localhost:8383 and keep it open for monitoring.

Create a bucket named “pi_robot” on the laptop as well, otherwise the replication task will fail (it needs the destination bucket to exist).

tip

Use provisioning to automate bucket creation and other setup steps in a real deployment. See Configuration/Provisioning Documentation for more.

Networking: Make sure your laptop is accessible from the Pi. If both are on the same LAN, you might use the laptop's IP (e.g., 192.168.x.x). If the laptop's ReductStore is in Docker, ensure the port 8383 is open (it is published in the run command above). For testing, you might temporarily disable firewall or ensure port 8383 is allowed.

Find your laptop's IP address (e.g., hostname -I on Linux) - let's say it's 192.168.1.100 for example.

2. Configure Replication on the Pi via Web Console

On the Pi's web console (http://<raspberrypi_ip>:8383), open “Replications” and click the “+” button to add a replication task. We want to create a replication task that sends data to the laptop.

Fill in the replication settings as:

  • Source Bucket: pi_robot (on Pi)
  • Destination URL: http://<laptop_ip>:8383 (e.g., http://192.168.1.100:8383)
  • Destination Bucket: pi_robot (on laptop)
  • Replication Name: (give it a name like “to_laptop”)
  • Entries: rosbags (or whatever entry name you used in bucket.write()), or leave blank to replicate all entries.
  • Filter Records: add the "Exclude" filter rule for object equals "none".

Start the replication task. The Pi's ReductStore will now start forwarding new records that meet the criteria to the laptop, in real-time. It's a push model from Pi to laptop, so the laptop doesn't need to know about the Pi or poll it - the Pi will push new records as they arrive.

ReductStore Web Console Replication

ReductStore Web Console: Setting up a replication task to forward data to a central storage.
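
If you prefer to provision the replication task from code rather than the web console, reduct-py also exposes a replication API. The sketch below is an assumption-laden outline: the exact settings fields vary between SDK versions, so check the SDK reference before relying on it.

import asyncio
from reduct import Client, ReplicationSettings

async def main():
    # Run against the Pi's instance (the replication source)
    client = Client("http://127.0.0.1:8383")
    settings = ReplicationSettings(
        src_bucket="pi_robot",                  # source bucket on the Pi
        dst_bucket="pi_robot",                  # destination bucket on the laptop
        dst_host="http://192.168.1.100:8383",   # laptop's ReductStore URL (example IP)
        dst_token="",                           # set if the destination requires a token
        entries=["rosbags"],
        exclude={"object": "none"},             # skip records labeled object=none
    )
    await client.create_replication("to_laptop", settings)

asyncio.run(main())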

3. Test Replication

Back on the Pi, ensure the rosbag2reduct node is still running and creating new records. When the next bag file is uploaded on the Pi, if it meets the replication filter conditions, the Pi's ReductStore replication task will immediately send it to the laptop.

On the laptop's web console, open the “pi_robot” bucket. You should start seeing records appear that correspond to those on the Pi (with a slight delay for transfer). The labels should also be present. If you configured a filter (e.g., confidence > 0.8), try to produce a detection above that confidence on the Pi (point the camera at an easily recognized object or adjust threshold on the Pi code for testing).

Records not meeting the condition will stay only on the Pi and eventually be overwritten if Pi's FIFO quota is set.

You can also check the replication status on the Pi's web console; it may show last replicated record timestamp etc., indicating it's working.

At this point, we have a robust pipeline:

  • Raspberry Pi captures camera data and detection metadata.
  • rosbag2reduct segments the data into time-based bags, labels them, and pushes to local storage.
  • ReductStore on Pi retains recent data and automatically forwards critical data to the laptop's ReductStore based on labels.
  • The laptop accumulates the forwarded data in its own ReductStore bucket, which you can browse or integrate with other systems.

Best Practices and Further Improvements

This tutorial only scratches the surface of building a robust data acquisition and storage pipeline for robotics. Here are some best practices and further improvements to consider:

📊 Separate Storage Topics by Types

Separate storage streams for different topic categories (e.g., telemetry vs. raw sensor data) to optimise storage and retrieval. This allows you to apply different retention policies, access controls, or replication rules to each category.

Here are some common data categories and their characteristics to consider:

Category | Examples | Characteristics
Lightweight telemetry | GPS, IMU, joint states, system status | Low bandwidth, near real-time, useful for business analytics
Downsampled sensor data | Lower framerate/resolution camera or lidar data | Mid-size, great for monitoring and incident triage
Full-resolution data | Raw camera frames, high-fps lidar, depth maps | High volume (up to 1TB/hour), needed for debugging or model retraining
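
One way to implement this with ReductStore is to write each category to its own entry in the bucket so that retention, access, and replication rules can differ per entry. A short sketch (the entry names and categories are hypothetical):

from reduct import Bucket

async def store_by_category(bucket: Bucket, category: str, data: bytes, labels: dict):
    # Route each data category to its own entry (hypothetical names)
    entry_by_category = {
        "telemetry": "telemetry",              # GPS, IMU, joint states, system status
        "downsampled": "camera_downsampled",   # low-rate camera frames for monitoring
        "full": "rosbags_full",                # full-resolution MCAP bags for debugging/retraining
    }
    await bucket.write(entry_by_category[category], data, labels=labels)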

🗜️ Compress High-Bandwidth Topics

We can use ROS 2's built-in compression mechanisms for image topics and large messages to reduce bandwidth and storage costs.

  • 🖼️ Example: Use /image_raw/compressed instead of raw image streams
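
In the recorder node (recorder_node.py), switching to the compressed stream only changes the topic name and message type. A sketch of the additions (assuming a /image_raw/compressed publisher is available, e.g., from image_transport republishing):

from sensor_msgs.msg import CompressedImage

# In _open_new_bag(), register the compressed topic instead of the raw one:
#     self.writer.create_topic(TopicMetadata(
#         name='/image_raw/compressed',
#         type='sensor_msgs/msg/CompressedImage',
#         serialization_format='cdr'))

# In __init__(), subscribe to the compressed stream:
#     self.create_subscription(CompressedImage, '/image_raw/compressed', self._compressed_callback, 10)

# New callback on Rosbag2ReductNode:
def _compressed_callback(self, msg: CompressedImage):
    # A JPEG payload is typically an order of magnitude smaller than a raw frame
    self.writer.write('/image_raw/compressed', serialize_message(msg), self.get_clock().now().nanoseconds)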

🔐 Use Token-Based Authentication

This is especially relevant for cloud storage or remote ReductStore instances. Use access tokens to secure data transfers and prevent unauthorized access.
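
With reduct-py, an access token is passed when creating the client. A minimal sketch (the environment variable name is a placeholder):

import os
from reduct import Client

# Read the access token from the environment rather than hard-coding it
token = os.environ.get("REDUCT_API_TOKEN", "")
client = Client("http://192.168.1.100:8383", api_token=token)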

⚙️ Use Non-Blocking Operations

When uploading large files or performing storage operations, use non-blocking (asynchronous) methods to avoid freezing your ROS 2 nodes. This keeps your system responsive and prevents dropped messages or missed frames.
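
For example, instead of running the upload coroutine to completion inside the timer callback (as the simple recorder above does), you can keep an asyncio event loop in a background thread and hand uploads to it, so bag rotation never blocks the ROS executor. A sketch:

import asyncio
import threading

class BackgroundUploader:
    """Runs an asyncio loop in a separate thread and schedules upload coroutines on it."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    def submit(self, coro):
        # Returns a concurrent.futures.Future; the ROS callback does not wait on it
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

# In the recorder node:
#     self.uploader = BackgroundUploader()
#     ...
#     self.uploader.submit(self._upload_bag_file(old_bag_name, object_label, confidence_val))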

📉 Combine Downsampling with Replication

This is typical in ELT (Extract-Load-Transform) pipelines. The idea is to save everything locally (on the robot) at high resolution and framerate, then stream part of the data to the cloud or a central server at a lower resolution or lower frequency.
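
On the ROS side, a small relay node can republish every Nth frame on a separate topic; you then record that topic to its own entry and replicate it, while the full-rate stream stays on the robot. A sketch (the topic names and the keep-every-10th rate are arbitrary examples):

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image

class Downsampler(Node):
    def __init__(self):
        super().__init__('image_downsampler')
        self.keep_every = 10  # forward 1 out of every 10 frames
        self.counter = 0
        self.pub = self.create_publisher(Image, '/image_raw/downsampled', 10)
        self.create_subscription(Image, '/image_raw', self.callback, 10)

    def callback(self, msg: Image):
        self.counter += 1
        if self.counter % self.keep_every == 0:
            self.pub.publish(msg)

def main(args=None):
    rclpy.init(args=args)
    rclpy.spin(Downsampler())
    rclpy.shutdown()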

❄️ Offload Data to Cold Storage

For long-term archiving, consider offloading data to cold storage to reduce costs while keeping data accessible.

Conclusion

In this tutorial, we set up a complete pipeline on a Raspberry Pi running ROS 2 Humble that captures camera data and AI output, records it to MCAP bag files with a custom Python node, and automatically offloads those files to a time-series object store. We added labels to each record (object ID detected and YOLO's confidence level). We configured replication to forward labeled data to a central instance on a laptop, filtering by labels to reduce bandwidth - for example, forwarding only "interesting" events (excluding "none" detections).

The end-to-end system demonstrates how to build a robust data logging and centralized storage solution for robotics. Of course, this tutorial must be adapted to your specific use case and environment. Keep in mind that the pipeline is highly customizable and can be adapted to different scenarios (separating topics by type, using specific filters, adding more metadata, etc.).

We hope this end-to-end tutorial helps you build your own ROS 2 data acquisition system. Happy hacking!


Thanks for reading, I hope this article helps you build the right storage strategy for your ROS 2 data. If you have any questions or comments, feel free to use the ReductStore Community Forum.
