How to Store and Manage ROS Data
In this tutorial, we will create a custom ROS 2 Humble package called `rosbag2reduct` that records incoming ROS 2 topics into MCAP bag files on a Raspberry Pi and automatically uploads those files to a ReductStore instance with metadata labels. We'll walk through setting up ROS 2 Humble on the Pi, interfacing a USB camera using the `v4l2_camera` driver, deploying a lightweight YOLOv5 (nano) object detection node (using ONNX Runtime) to produce detection metadata, and implementing the `rosbag2reduct` node to capture data and offload it. We will also cover installing ReductStore on the Pi and configuring replication of labeled data to a central store on your laptop (using label-based filters via the web console). This end-to-end guide is structured with clear steps, code examples, and configuration snippets to help you build and deploy the system.
Prerequisites and Architecture
Before beginning, ensure you have the following:
- Hardware: Raspberry Pi, a USB camera, and an internet connection.
- OS: Ubuntu Server or Desktop (22.04 LTS or later) on the Raspberry Pi.
- Laptop/PC: A separate machine on the same network, to serve as a central data store.
In this tutorial, we will set up the following architecture:
Architecture: Raspberry Pi with ROS 2, USB camera, YOLOv5n, and ReductStore; Laptop with ReductStore Web Console.
- Raspberry Pi: Running ROS 2 Humble, interfacing with a USB camera, and running a YOLOv5n object detection node with ONNX Runtime.
- Laptop/PC: Running a ReductStore instance (acting as a central data store) and ReductStore's Web Console for managing data.
- Network: The Pi and the laptop are on the same network. The Pi will upload data to the ReductStore instance on the laptop automatically.
1. Install Ubuntu on Raspberry Pi
If not already done, flash Ubuntu for Raspberry Pi on a microSD card and boot your Pi. You can download Ubuntu images for Raspberry Pi from the official site. Ensure you have internet access and update the system:
# On Raspberry Pi
sudo apt update && sudo apt upgrade -y
2. Install ROS 2 Humble on Raspberry Pi
Follow the steps below to set up ROS 2 on your Raspberry Pi. This guide assumes you are using a supported 64-bit Ubuntu OS (e.g., 22.04, 24.04). ROS 2 is available in multiple distributions such as Jazzy, Iron, and Humble (Noetic is a ROS 1 distribution). If you're unsure, we recommend the latest LTS version; this tutorial uses Humble, which pairs with Ubuntu 22.04.
- Setup Locale:
sudo locale-gen en_US en_US.UTF-8
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
export LANG=en_US.UTF-8
- Add ROS 2 apt Repository:
sudo apt install software-properties-common
sudo add-apt-repository universe
sudo apt update && sudo apt install curl -y
sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] http://packages.ros.org/ros2/ubuntu $(. /etc/os-release && echo $UBUNTU_CODENAME) main" | sudo tee /etc/apt/sources.list.d/ros2.list > /dev/null
sudo apt update
- Install ROS 2 Packages: For a baseline, install the ROS 2 base packages (or `ros-humble-desktop` if you need GUI tools):
sudo apt install ros-humble-ros-base -y
(Note: `${ROS_DISTRO}` is only set after sourcing the ROS setup script, so we spell out `humble` here; later commands can use the variable.) This installs core ROS 2 packages including `rosbag2`. You can verify the installation by sourcing the setup script:
source /opt/ros/humble/setup.bash
ros2 -h  # Should show ROS 2 commands
- Install Build Tools: We will create custom packages, so install development tools:
sudo apt install python3-colcon-common-extensions python3-rosdep -y
sudo rosdep init
rosdep update
3. Create a ROS 2 Workspace
Set up a workspace for our project (if you don't have one):
# On Raspberry Pi
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws
colcon build # just to initialize, will be empty initially
Add `source ~/ros2_ws/install/setup.bash` to your `~/.bashrc` so that the workspace is sourced in each new shell, or remember to source it manually in each terminal where you use the workspace. We will add packages to this workspace in subsequent steps.
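If you prefer a one-liner, you can append it like this (adjust the path if your workspace lives elsewhere):

```bash
echo "source ~/ros2_ws/install/setup.bash" >> ~/.bashrc
source ~/.bashrc
```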
Setting up the USB Camera with v4l2_camera
We'll use the `v4l2_camera` ROS 2 package to interface with the USB camera via Video4Linux2. This package publishes images from any V4L2-compatible camera (most USB webcams) as ROS 2 image topics.
1. Install the `v4l2_camera` Package
On the Raspberry Pi, install the driver node via apt:
sudo apt install ros-${ROS_DISTRO}-v4l2-camera -y
This installs the `v4l2_camera` node and its dependencies. Alternatively, you could build it from source, but a binary package is available for Humble.
2. Connect and Verify the Camera
Plug in the USB camera to the Pi. Verify that it's recognized by listing video devices:
ls /dev/video*
# You should see /dev/video0 (and possibly /dev/video1, etc. if multiple video capture interfaces are connected)
If `/dev/video0` is present, the system sees the camera (at least one capture device). You can also install `v4l-utils` and run `v4l2-ctl --list-devices` to see the camera name and capabilities, or `v4l2-ctl --list-formats-ext` to see supported resolutions and formats.
3. Run the Camera Node
Launch the camera driver to start publishing images:
# In a sourced ROS 2 environment on the Pi
ros2 run v4l2_camera v4l2_camera_node
By default, this node will open `/dev/video0` and start publishing images to the `image_raw` topic (type `sensor_msgs/msg/Image`, i.e., `/image_raw` when the node runs in the root namespace) at a default resolution of 640x480, with the camera's YUYV pixel format converted to `rgb8` (see the ROS 2 camera driver for Video4Linux2 documentation). You should see console output from the node indicating it opened the device and is streaming.
Open a new terminal (with ROS sourced) on the Pi (or on a laptop connected to the ROS 2 network) and verify images are coming through, e.g., by running `rqt_image_view`:
sudo apt install ros-${ROS_DISTRO}-rqt-image-view -y # if not installed
ros2 run rqt_image_view rqt_image_view
In `rqt_image_view`, select `/image_raw` to view the camera feed. This confirms the camera setup is working.
Note: You can adjust parameters by remapping or via ROS 2 parameters, e.g., to change resolution or device:
ros2 run v4l2_camera v4l2_camera_node --ros-args -p image_size:="[1280,720]" -p video_device:="/dev/video0"
This would set the camera to 1280x720 resolution (if supported).
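If you find yourself passing several parameters, a small launch file keeps them in one place. A minimal sketch (the file name `camera.launch.py` and its location are our own choice, not part of the `v4l2_camera` package):

```python
# camera.launch.py - hypothetical launch file for the camera driver
from launch import LaunchDescription
from launch_ros.actions import Node


def generate_launch_description():
    return LaunchDescription([
        Node(
            package='v4l2_camera',
            executable='v4l2_camera_node',
            parameters=[{
                'video_device': '/dev/video0',
                'image_size': [1280, 720],  # must be a mode your camera supports
            }],
        ),
    ])
```

Launch it with `ros2 launch ./camera.launch.py`, or install it in a package's `launch/` directory.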
Deploying a Lightweight YOLOv5 Object Detection Node
Next, we set up an object detection node to analyze the camera images and output metadata (`object_detected` and `confidence_score`). We'll use YOLOv5n (Nano), the smallest YOLOv5 model ("only" 1.9 million parameters), which is ideal for resource-constrained devices (see releases at ultralytics/yolov5). We will run inference using ONNX Runtime, which allows running the model without needing the full PyTorch framework on the Pi.
1. Install ONNX Runtime and Dependencies
On the Raspberry Pi, install the ONNX Runtime Python package and OpenCV (for image processing):
pip install onnxruntime opencv-python
(If `pip` isn't available, use `sudo apt install python3-pip` to install it. You may also need to install `numpy` if it isn't already present, as ONNX Runtime depends on it.)
2. YOLOv5n ONNX Model
We need the YOLOv5n model in ONNX format. To obtain it, clone the YOLOv5 repository on a machine more powerful than the Pi and export the model:
# On a PC or via Colab:
git clone https://github.com/ultralytics/yolov5.git
cd yolov5
pip install -r requirements.txt # includes PyTorch
python export.py --weights yolov5n.pt --include onnx
This will create `yolov5n.onnx`. Transfer that file to your Raspberry Pi (e.g., via SCP). For this tutorial, assume `yolov5n.onnx` is now on the Raspberry Pi (e.g., placed in `~/ros2_ws/src`).
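For example, from the machine where you exported the model (assuming the Pi's user is `ubuntu`; substitute your Pi's IP address):

```bash
scp yolov5n.onnx ubuntu@<raspberrypi_ip>:~/ros2_ws/src/
```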
3. Create a ROS 2 Package for the YOLO Node (optional)
You can integrate the YOLO inference in the same package as `rosbag2reduct`, but for modularity, let's create a separate ROS 2 Python package called `yolo_detector`.
In the workspace src directory, run:
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python yolo_detector --dependencies rclpy sensor_msgs std_msgs
This will create a `yolo_detector` folder with a Python package structure. Edit `yolo_detector/package.xml` to add dependencies for `opencv-python` and `onnxruntime` (since these are non-ROS dependencies, we list them for documentation; you might install them with `pip` rather than rosdep). For example, inside `<exec_depend>` tags, add:
<exec_depend>onnxruntime</exec_depend>
<exec_depend>opencv-python</exec_depend>
4. Implement the YOLO Detection Node
Create a file `yolo_detector/yolo_detector/yolo_node.py` with the following content:
YoloDetectorNode (Python code)

```python
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import String, Float32
import cv2
import numpy as np
import onnxruntime as ort


class YoloDetectorNode(Node):
    def __init__(self):
        super().__init__('yolo_detector')
        # Load the YOLOv5n ONNX model
        model_path = '/path/to/yolov5n.onnx'  # TODO: update to actual path
        self.session = ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])
        self.get_logger().info(f"Loaded model {model_path}")
        # Get model input details for preprocessing
        model_inputs = self.session.get_inputs()
        self.input_name = model_inputs[0].name
        self.input_shape = model_inputs[0].shape  # e.g., [1, 3, 640, 640]
        self.img_height = self.input_shape[2]
        self.img_width = self.input_shape[3]
        # Subscribers and publishers
        self.subscription = self.create_subscription(Image, '/image_raw', self.image_callback, 10)
        self.pub_object = self.create_publisher(String, 'object_detected', 10)
        self.pub_conf = self.create_publisher(Float32, 'confidence_score', 10)

    def image_callback(self, msg: Image):
        # Convert the ROS Image message to a NumPy array
        # (assuming msg.encoding is 'rgb8', the v4l2_camera default output;
        # YOLOv5 expects RGB input, so no channel swap is needed)
        img = np.frombuffer(msg.data, dtype=np.uint8).reshape(msg.height, msg.width, -1)
        # Resize to the model input shape (letterboxing would preserve aspect ratio)
        input_img = cv2.resize(img, (self.img_width, self.img_height))
        # Convert to float32 and scale to 0-1
        # (YOLOv5 takes 0-255 input divided by 255, no mean subtraction)
        input_img = input_img.astype('float32') / 255.0
        # Transpose to [channels, height, width] and add a batch dimension -> [1, 3, H, W]
        input_blob = np.transpose(input_img, (2, 0, 1))
        input_blob = np.expand_dims(input_blob, axis=0)
        # Run inference
        outputs = self.session.run(None, {self.input_name: input_blob})
        # Parse outputs to find the highest-confidence detection (for simplicity).
        # YOLOv5 ONNX output is typically a [1, num_boxes, 85] array
        # (for COCO: 4 box coords, 1 objectness score, 80 class scores)
        detections = outputs[0]
        # Filter by confidence threshold (e.g., 0.5)
        conf_threshold = 0.5
        best_label = "none"
        best_conf = 0.0
        if detections is not None:
            for det in detections[0]:
                obj_conf = det[4]
                class_conf = det[5:]  # per-class confidences
                score = obj_conf * np.max(class_conf)
                class_id = int(np.argmax(class_conf))
                if score > conf_threshold and score > best_conf:
                    best_conf = float(score)
                    best_label = str(class_id)  # or use a class id -> name mapping
        # Publish results
        self.pub_object.publish(String(data=best_label))
        self.pub_conf.publish(Float32(data=best_conf))
        self.get_logger().info(f"Detected: {best_label} ({best_conf:.2f})")


def main(args=None):
    rclpy.init(args=args)
    node = YoloDetectorNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
```
Explanation: This node subscribes to the camera images (`/image_raw`), processes each frame through the YOLOv5n model, and publishes two topics:
- `object_detected` (std_msgs/String): the class label (or ID) of the primary detected object (or `"none"` if nothing is above the threshold).
- `confidence_score` (std_msgs/Float32): the confidence score of that detection.
For simplicity, we take the detection with the highest confidence above a threshold. In a real scenario, you would probably output multiple detections or richer information (bounding boxes, etc.), but we only need metadata for this tutorial.
Make sure to adjust `model_path` to the actual location of your `yolov5n.onnx`. Also note that without a class-name mapping, `best_label` is currently the class index (as a string). You can map this index to an actual label using the COCO class list below; see the mapping sketch after the list.
COCO Class List
```yaml
names:
0: person
1: bicycle
2: car
3: motorcycle
4: airplane
5: bus
6: train
7: truck
8: boat
9: traffic light
10: fire hydrant
11: stop sign
12: parking meter
13: bench
14: bird
15: cat
16: dog
17: horse
18: sheep
19: cow
20: elephant
21: bear
22: zebra
23: giraffe
24: backpack
25: umbrella
26: handbag
27: tie
28: suitcase
29: frisbee
30: skis
31: snowboard
32: sports ball
33: kite
34: baseball bat
35: baseball glove
36: skateboard
37: surfboard
38: tennis racket
39: bottle
40: wine glass
41: cup
42: fork
43: knife
44: spoon
45: bowl
46: banana
47: apple
48: sandwich
49: orange
50: broccoli
51: carrot
52: hot dog
53: pizza
54: donut
55: cake
56: chair
57: couch
58: potted plant
59: bed
60: dining table
61: toilet
62: tv
63: laptop
64: mouse
65: remote
66: keyboard
67: cell phone
68: microwave
69: oven
70: toaster
71: sink
72: refrigerator
73: book
74: clock
75: vase
76: scissors
77: teddy bear
78: hair drier
79: toothbrush
```
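To publish readable labels instead of raw indices, you can embed this list in the node and replace `str(class_id)` with a lookup. A minimal sketch (the `class_name` helper is our own addition; the names are transcribed from the list above):

```python
# Map a YOLOv5/COCO class index to its name (indices match the list above).
COCO_NAMES = [
    "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck",
    "boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench",
    "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra",
    "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee",
    "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove",
    "skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup",
    "fork", "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange",
    "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch",
    "potted plant", "bed", "dining table", "toilet", "tv", "laptop", "mouse",
    "remote", "keyboard", "cell phone", "microwave", "oven", "toaster", "sink",
    "refrigerator", "book", "clock", "vase", "scissors", "teddy bear",
    "hair drier", "toothbrush",
]


def class_name(class_id: int) -> str:
    """Return the COCO label for a class index, or the raw index if out of range."""
    if 0 <= class_id < len(COCO_NAMES):
        return COCO_NAMES[class_id]
    return str(class_id)
```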
Also, update the `setup.py` entry points to include our node script:
```python
setup(
    ...
    entry_points={
        'console_scripts': [
            'yolo_node = yolo_detector.yolo_node:main',
        ],
    },
)
```
5. Build and Run the YOLO Node
Add `onnxruntime` and `opencv-python` to your workspace's requirements (you might include them in a `requirements.txt` for the package and install them with `pip`, since they are pip packages). For now, ensure they are installed via pip as done earlier.
Build the workspace:
cd ~/ros2_ws
colcon build --packages-select yolo_detector
source install/local_setup.bash
Run the YOLO detection node in a new terminal on the Pi:
ros2 run yolo_detector yolo_node
You should see log output from the node whenever it processes a frame (every frame, or at least when something is detected, depending on your logging). The node will publish messages on the `object_detected` and `confidence_score` topics.
You can echo these topics in another terminal to verify:
ros2 topic echo /object_detected
ros2 topic echo /confidence_score
For example, if a person (class index 0) is detected with 85% confidence, you should see messages like:
data: "0"
data: 0.85
Now we have a camera streaming images and a detector outputting metadata. Next, we'll create the `rosbag2reduct` package to record these data, handle file rotation, and upload to ReductStore.
Creating the `rosbag2reduct` Package
Our `rosbag2reduct` package will be a ROS 2 node that does the following:
- Subscribe to the relevant topics (e.g. `/image_raw`, `object_detected`, `confidence_score`).
- Record those topics into a bag file using the rosbag2 Python API (in MCAP format).
- Label each bag file with the latest detection metadata.
- After a fixed time interval (the bag rotation period), close the current bag, upload it to ReductStore with the Python client SDK (including the metadata as labels), and delete the local bag file.
- Start a new bag file and repeat.
This is not optimal (writing to disk and then uploading) and is done for simplicity. In a real scenario, you could stream directly to ReductStore or separate storage topics by type. More on this in the Best Practices and Further Improvements section.
1. Create the Package Structure
Run the ROS 2 package creation command:
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python rosbag2reduct --dependencies rclpy rosbag2_py sensor_msgs std_msgs
This creates a new folder `rosbag2reduct` with a basic Python package setup. Update `rosbag2reduct/package.xml` to include `<exec_depend>reduct-py</exec_depend>` (ReductStore's Python SDK), since we'll use it. Also make sure `rosbag2_py` is listed as a dependency (the command above included it). In `setup.py`, add an entry point for our main node if desired (we can also run the Python file directly with `ros2 run` as long as it's installed).
After creation, the structure should look like:
ros2_ws/src/rosbag2reduct/
├── package.xml
├── setup.cfg
├── setup.py
├── resource/
│ └── rosbag2reduct
└── rosbag2reduct
├── __init__.py
└── recorder_node.py # (we will create this)
2. Install ReductStore Python SDK and MCAP Support
Before coding, install the ReductStore client library (`reduct-py`) in your Python environment:
pip install reduct-py
This gives us the `reduct` module for interacting with a ReductStore server. (We will set up the actual ReductStore server on the Pi soon, but we can write the code first.)
Also, install the MCAP storage format support for rosbag2:
sudo apt install ros-${ROS_DISTRO}-rosbag2-storage-mcap
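To confirm the plugin is installed, you can record a short test bag in MCAP format with the rosbag2 CLI (stop it with Ctrl+C):

```bash
ros2 bag record -s mcap -o /tmp/mcap_test /image_raw
```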
3. Implementing the `rosbag2reduct` Node
Open a new file `rosbag2reduct/recorder_node.py` and add the code from the snippet below. It defines the `Rosbag2ReductNode` class: a ROS 2 node that subscribes to the camera image and metadata topics, records them into bag files, and uploads the bag files to ReductStore with metadata labels.
Rosbag2ReductNode (Python code)
```python
import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Float32
from sensor_msgs.msg import Image
from rclpy.serialization import serialize_message
from rosbag2_py import SequentialWriter, StorageOptions, ConverterOptions, TopicMetadata
import os
import shutil
import datetime
import asyncio
from reduct import Client, Bucket


class Rosbag2ReductNode(Node):
    def __init__(self):
        super().__init__('rosbag2reduct')
        self.bag_duration = 60.0  # seconds per bag before rotation
        self.storage_id = 'mcap'  # or 'sqlite3'
        self.current_bag_index = 0
        self.writer = None
        self.current_bag_name = self._open_new_bag()
        self.create_subscription(Image, '/image_raw', self._image_callback, 10)
        self.create_subscription(
            String, 'object_detected',
            lambda msg: self._metadata_callback(msg, 'object_detected'), 10)
        self.create_subscription(
            Float32, 'confidence_score',
            lambda msg: self._metadata_callback(msg, 'confidence_score'), 10)
        self.last_object_detected = "none"
        self.last_confidence = 0.0
        self.create_timer(self.bag_duration, self._rotate_bag_timer)
        self.reduct_url = "http://127.0.0.1:8383"
        self.bucket_name = "pi_robot"
        self.entry_name = "rosbags"
        self.reduct_client = Client(self.reduct_url)
        asyncio.get_event_loop().run_until_complete(self._ensure_bucket())
        self.get_logger().info(
            f"rosbag2reduct node initialized, writing to {self.storage_id} bags "
            f"and uploading to ReductStore bucket '{self.bucket_name}'.")

    def _open_new_bag(self):
        # Re-assigning self.writer drops the last reference to the previous
        # writer, which finalizes (closes) the old bag file.
        self.writer = SequentialWriter()
        bag_name = f"rosbag_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}_{self.current_bag_index}"
        storage_options = StorageOptions(uri=bag_name, storage_id=self.storage_id)
        converter_options = ConverterOptions('', '')
        self.writer.open(storage_options, converter_options)
        # Register the topics we intend to write (Humble API: keyword arguments)
        self.writer.create_topic(TopicMetadata(
            name='/image_raw',
            type='sensor_msgs/msg/Image',
            serialization_format='cdr'))
        self.writer.create_topic(TopicMetadata(
            name='object_detected',
            type='std_msgs/msg/String',
            serialization_format='cdr'))
        self.writer.create_topic(TopicMetadata(
            name='confidence_score',
            type='std_msgs/msg/Float32',
            serialization_format='cdr'))
        self.get_logger().info(f"Opened new bag: {bag_name}")
        return bag_name

    def _image_callback(self, msg: Image):
        self.writer.write('/image_raw', serialize_message(msg),
                          self.get_clock().now().nanoseconds)

    def _metadata_callback(self, msg, topic_name):
        # Remember the latest metadata for labeling, then record the message
        if topic_name == 'object_detected':
            self.last_object_detected = msg.data
        elif topic_name == 'confidence_score':
            self.last_confidence = msg.data
        self.writer.write(topic_name, serialize_message(msg),
                          self.get_clock().now().nanoseconds)

    def _rotate_bag_timer(self):
        object_label = self.last_object_detected
        confidence_val = float(self.last_confidence)
        old_bag_name = self.current_bag_name
        self.current_bag_index += 1
        self.current_bag_name = self._open_new_bag()
        self.get_logger().info(
            f"Uploading bag {old_bag_name} with metadata: "
            f"object={object_label}, confidence={confidence_val:.2f}")
        asyncio.get_event_loop().run_until_complete(
            self._upload_bag_file(old_bag_name, object_label, confidence_val))
        # Clean up the bag directory after upload
        bag_dir = old_bag_name
        if os.path.isdir(bag_dir):
            try:
                shutil.rmtree(bag_dir)
                self.get_logger().info(f"Deleted bag directory {bag_dir} after upload.")
            except Exception as e:
                self.get_logger().warn(f"Could not delete directory {bag_dir}: {e}")
        else:
            self.get_logger().warn(f"Bag directory {bag_dir} not found.")

    async def _ensure_bucket(self):
        await self.reduct_client.create_bucket(self.bucket_name, exist_ok=True)

    async def _upload_bag_file(self, bag_name, object_label, confidence_val):
        bag_dir = bag_name
        mcap_file = None
        # Find the first .mcap file in the bag directory
        try:
            for filename in os.listdir(bag_dir):
                if filename.endswith(".mcap"):
                    mcap_file = os.path.join(bag_dir, filename)
                    break
        except FileNotFoundError:
            self.get_logger().error(f"Bag directory {bag_dir} does not exist.")
            return
        if not mcap_file:
            self.get_logger().warn(f"No .mcap file found in {bag_dir}; skipping upload.")
            return
        try:
            with open(mcap_file, 'rb') as f:
                data = f.read()
        except Exception as e:
            self.get_logger().error(f"Error reading {mcap_file}: {e}")
            return
        bucket: Bucket = await self.reduct_client.get_bucket(self.bucket_name)
        timestamp = datetime.datetime.utcnow().isoformat() + "Z"
        labels = {"object": object_label, "confidence": str(confidence_val)}
        await bucket.write(self.entry_name, data, timestamp=timestamp, labels=labels)
        self.get_logger().info(f"Uploaded {mcap_file} to ReductStore with labels {labels}.")


def main(args=None):
    rclpy.init(args=args)
    node = Rosbag2ReductNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    node.destroy_node()
    try:
        rclpy.shutdown()
    except Exception:
        pass


if __name__ == '__main__':
    main()
```
Let's break down key parts of this implementation:
- We use rosbag2's Python API: `SequentialWriter` to record messages. We specify MCAP as the storage format when opening the bag, and explicitly register three topics (`/image_raw`, `object_detected`, `confidence_score`) with their message types so we can write to them. In each subscription callback, we call `self.writer.write(topic_name, serialize_message(msg), timestamp)` to append to the bag.
- We maintain `last_object_detected` and `last_confidence` variables to store the most recent detection metadata. The `_metadata_callback` updates these whenever a message on those topics arrives, and writes the message to the bag as well.
- A ROS timer triggers `_rotate_bag_timer()` every `self.bag_duration` seconds (e.g., every 60 seconds). This function closes the current bag and opens a new one (by calling `_open_new_bag()`, which increments a bag index and starts a new file). We then upload the bag file we just closed to ReductStore.
- ReductStore upload: using `reduct-py`, we ensure a bucket named `pi_robot` exists in ReductStore (which we assume is running locally on the Pi at `127.0.0.1:8383`). We then read the bag file into memory and use `bucket.write()` to store it in the bucket under the `rosbags` entry. We label each record with the detection metadata (e.g., `labels={"object": "1", "confidence": "0.85"}`).
- After a successful upload, we delete the local `.mcap` file to save space on the Pi (since it's now stored in ReductStore).
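As a quick sanity check outside the web console, you can read the records back with the same SDK. A minimal sketch, assuming the bucket and entry names used above (the `query` iterator is part of reduct-py; check your SDK version's docs for the exact signature):

```python
import asyncio
from reduct import Client


async def main():
    # Connect to the Pi's local ReductStore instance (assumed URL)
    client = Client("http://127.0.0.1:8383")
    bucket = await client.get_bucket("pi_robot")
    # Iterate over records in the 'rosbags' entry and print their metadata
    async for record in bucket.query("rosbags"):
        data = await record.read_all()
        print(record.timestamp, record.labels, f"{len(data)} bytes")


asyncio.run(main())
```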
This is a basic implementation. In a real-world scenario, you will likely want to handle timestamping differently, add more metadata, and of course use a more efficient way to upload data to ReductStore.
4. Build the `rosbag2reduct` Package
Ensure that in the `setup.py` of `rosbag2reduct` you have an entry point if needed, e.g.:
```python
entry_points={
    'console_scripts': [
        'rosbag2reduct = rosbag2reduct.recorder_node:main'
    ],
},
```
This will allow us to run `ros2 run rosbag2reduct rosbag2reduct` to launch the node. Now build the package:
cd ~/ros2_ws
colcon build --packages-select rosbag2reduct
source install/local_setup.bash
If everything compiles (installs) without errors, we're ready to run the system.
Installing and Configuring ReductStore on the Raspberry Pi
Before running the `rosbag2reduct` node, we need a ReductStore server running on the Pi to accept uploads. ReductStore is a lightweight time-series object store, well suited to edge devices. We will install it on the Pi and create a bucket for our bag files.
1. Install ReductStore on Raspberry Pi
The easiest way on Ubuntu is to use snap or Docker. We'll use snap for simplicity:
# On Raspberry Pi
sudo apt update
sudo apt install snapd -y # if snapd is not already installed
sudo snap install reductstore
This will install ReductStore from the Snap Store. The snap sets up ReductStore as a service listening on port 8383 by default. (If you're on a different OS or prefer not to use snap, you can run ReductStore in Docker instead, e.g., `docker run -d -p 8383:8383 -v ~/reduct_data:/data reduct/store:latest`.)
Note: The database will listen on `http://0.0.0.0:8383` (accessible to the LAN). Ensure this port is allowed through any firewall if you want external access.
Check that ReductStore is running:
sudo snap services reductstore # should show active
curl http://127.0.0.1:8383/api/v1/info
The `curl` command should return JSON info about the instance (version, uptime, etc.).
2. (Optional) Configure ReductStore
By default, ReductStore doesn't require authentication for local use (anonymous access is allowed). This is fine for our edge scenario on a local network. If you want to set up access tokens or adjust storage quotas, you can do so by following the Access Control documentation. For now, we'll use defaults.
We will use the Web Console to verify data and to set up replication later. The web console is served from the same address as the API endpoint; for example, open `http://<raspberrypi_ip>:8383` in a browser - you should see the ReductStore Web Console interface (a simple GUI).
3. Create a Bucket for ROS bag data
Our `rosbag2reduct` code will attempt to create a bucket named `"pi_robot"`. We call `create_bucket("pi_robot", exist_ok=True)` in the code, so the bucket will be created on first run if it doesn't exist. You can also create it manually via the web console:
- Navigate to Buckets and create a new bucket named “pi_robot”. (You can set a quota, e.g., a FIFO quota to avoid filling up the disk on the Pi.)
ReductStore Web Console: Creating a bucket named "pi_robot" for storing ROS bags.
Now, ReductStore is set up on the Pi and ready to accept data. We can run the complete system to record ROS data, detect objects, and upload to ReductStore.
Running the Complete System
We have three ROS 2 nodes to run on the Raspberry Pi:
- The camera driver (`v4l2_camera_node`)
- The YOLO detection node (`YoloDetectorNode`)
- The rosbag2reduct recorder/uploader node (`Rosbag2ReductNode`)
It's best to run each in its own terminal (or use a launch file to launch them together). For clarity, we'll do it step-by-step:
Terminal 1: Camera node
# Terminal 1 on Pi (source ROS 2 and workspace)
ros2 run v4l2_camera v4l2_camera_node
Terminal 2: YOLO detection node
# Terminal 2 on Pi (source ROS 2 and workspace)
ros2 run yolo_detector yolo_node
(This uses the `yolo_node` console-script entry point we defined in `setup.py` earlier.)
Terminal 3: rosbag2reduct recorder node
# Terminal 3 on Pi (source ROS 2 and workspace)
ros2 run rosbag2reduct rosbag2reduct
(This uses the console-script entry point we defined. Alternatively, use `ros2 run rosbag2reduct recorder_node.py` if you did not configure the entry point.)
Now monitor the outputs:
- The camera node should just stream (no text output unless error or warning).
- The YOLO node will log detections (as we coded with `get_logger().info` on each frame).
- The rosbag2reduct node will log bag rotations and uploads. For example, you should see logs like "Opened new bag: rosbag_20230325_101500_0", later "Uploading bag rosbag_20230325_101500_0 with metadata: object=0, confidence=0.85", and then "Uploaded ... to ReductStore with labels ...".
Let this run for a while. By default, every 60 seconds it will finalize a bag and upload it. If you want to trigger a rotation sooner (for testing), reduce `bag_duration`, or expose a ROS service that calls the rotation function (not implemented here).
1. Verify Data in ReductStore
On the Pi (or from any machine that can access the Pi's port 8383), open the ReductStore Web Console in a browser: `http://<raspberrypi_ip>:8383`. You should see the bucket "pi_robot". Click it to see the list of entries.
Each uploaded bag file appears as a record in the `rosbags` entry of the `pi_robot` bucket. Records are keyed by their upload timestamp, and you should be able to see the labels attached to each record, e.g., `object: 1` and `confidence: 0.85`.
You have successfully set up the edge device to capture ROS data and push it to ReductStore with metadata labels!
Setting up Replication to a Central ReductStore (Laptop)
With data being collected on the Pi, we likely want to aggregate it on a central server (e.g., your laptop or a cloud instance) for analysis or long-term storage.
ReductStore's replication feature allows the Pi (source) to push new records to another ReductStore instance (destination) in real-time, filtering by labels so that, for example, only important events (e.g., specific objects or high-confidence detections) are sent for long-term storage.
Replication lets you stream only the data you need from the edge to the cloud or between edge devices. You can filter by label and push data without constant polling. If the device is offline or the destination is down, the data waits and replicates later.
In this section, we'll:
- Run ReductStore on the laptop.
- Use the ReductStore Web Console to create a replication task on the Pi's instance that filters and forwards data to the laptop's instance based on labels.
- Verify that replication works.
1. Install/Run ReductStore on Laptop
On your laptop (assuming Ubuntu 22.04 or any system with Docker or Snap):
- Option A: Docker - run ReductStore in a container:
docker run -d -p 8383:8383 -v ~/reduct_data:/data reduct/store:latest
This runs ReductStore locally on port 8383 and stores data in `~/reduct_data` on your laptop.
- Option B: Native - install via Snap (`sudo snap install reductstore`) or use a binary. Docker is quick and easy.
After starting it, ensure you can access it: `curl http://127.0.0.1:8383/api/v1/info` should return info as before (but for the laptop's instance). Open the web console on the laptop at `http://localhost:8383` and keep it open for monitoring.
Create a bucket named “pi_robot” on the laptop as well, otherwise the replication task will fail (it needs the destination bucket to exist).
Use provisioning to automate bucket creation and other setup steps in a real deployment. See Configuration/Provisioning Documentation for more.
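For example, a Docker-based deployment might provision the bucket through environment variables at startup. A sketch following the provisioning docs (the quota values are example assumptions):

```bash
docker run -d -p 8383:8383 -v ~/reduct_data:/data \
  -e RS_BUCKET_1_NAME=pi_robot \
  -e RS_BUCKET_1_QUOTA_TYPE=FIFO \
  -e RS_BUCKET_1_QUOTA_SIZE=50GB \
  reduct/store:latest
```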
Networking: Make sure your laptop is accessible from the Pi. If both are on the same LAN, you might use the laptop's IP (e.g., 192.168.x.x). If the laptop's ReductStore is in Docker, ensure the port 8383 is open (it is published in the run command above). For testing, you might temporarily disable firewall or ensure port 8383 is allowed.
Find your laptop's IP address (e.g., `hostname -I` on Linux) - let's say it's `192.168.1.100` for this example.
2. Configure Replication on the Pi via Web Console
On the Pi's web console (`http://<raspberrypi_ip>:8383`), open "Replications" and click the "+" button to add a replication task that sends data to the laptop.
Fill in the replication settings as:
- Source Bucket: pi_robot (on the Pi)
- Destination URL: `http://<laptop_ip>:8383` (e.g., `http://192.168.1.100:8383`)
- Destination Bucket: pi_robot (on the laptop)
- Replication Name: give it a name like "to_laptop"
- Entries: `rosbags` (or whatever entry name you used in `bucket.write()`), or leave blank to replicate all entries.
- Filter Records: add an "Exclude" filter rule for `object` equals `"none"`.
The same task can also be created programmatically with the Python SDK; see the sketch after this list.
Start the replication task. The Pi's ReductStore will now start forwarding new records that meet the criteria to the laptop, in real-time. It's a push model from Pi to laptop, so the laptop doesn't need to know about the Pi or poll it - the Pi will push new records as they arrive.
ReductStore Web Console: Setting up a replication task to forward data to a central storage.
3. Test Replication
Back on the Pi, ensure the `rosbag2reduct` node is still running and creating new records. When the next bag file is uploaded on the Pi, if it meets the replication filter conditions, the Pi's ReductStore replication task will immediately send it to the laptop.
On the laptop's web console, open the “pi_robot” bucket. You should start seeing records appear that correspond to those on the Pi (with a slight delay for transfer). The labels should also be present. If you configured a filter (e.g., confidence > 0.8), try to produce a detection above that confidence on the Pi (point the camera at an easily recognized object or adjust threshold on the Pi code for testing).
Records not meeting the condition will stay only on the Pi and will eventually be overwritten if the Pi's bucket has a FIFO quota set.
You can also check the replication status on the Pi's web console; it may show last replicated record timestamp etc., indicating it's working.
At this point, we have a robust pipeline:
- The Raspberry Pi captures camera data and detection metadata.
- `rosbag2reduct` segments the data into time-based bags, labels them, and pushes them to local storage.
- ReductStore on the Pi retains recent data and automatically forwards critical data to the laptop's ReductStore based on labels.
- The laptop accumulates the forwarded data in its own ReductStore bucket, which you can browse or integrate with other systems.
Best Practices and Further Improvements
This tutorial only scratches the surface of building a robust data acquisition and storage pipeline for robotics. Here are some best practices and further improvements to consider:
📊 Separate Storage Topics by Types
Separate storage streams for different topic categories (e.g., telemetry vs. raw sensor data) to optimise storage and retrieval. This allows you to apply different retention policies, access controls, or replication rules to each category.
- 📖 See 3 Ways to Store ROS Topics
- 📘 Example: How to Store Images in ROS 2
Here are some common data categories and their characteristics to consider:
| Category | Examples | Characteristics |
|---|---|---|
| Lightweight telemetry | GPS, IMU, joint states, system status | Low bandwidth, near real-time, useful for business analytics |
| Downsampled sensor data | Lower framerate/resolution camera or lidar data | Mid-size, great for monitoring and incident triage |
| Full-resolution data | Raw camera frames, high-fps lidar, depth maps | High volume (up to 1 TB/hour), needed for debugging or model retraining |
🗜️ Compress High-Bandwidth Topics
You can use ROS 2's built-in compression mechanisms for image topics and large messages to reduce bandwidth and storage costs.
- 🖼️ Example: Use `/image_raw/compressed` instead of raw image streams
🔐 Use Token-Based Authentication
This is especially relevant for cloud storage or remote ReductStore instances. Use access tokens to secure data transfers and prevent unauthorized access.
⚙️ Use Non-Blocking Operations
When uploading large files or performing storage operations, use non-blocking (asynchronous) methods to avoid freezing your ROS 2 nodes. This keeps your system responsive and prevents dropped messages or missed frames.
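For example, instead of calling `run_until_complete()` inside the rotation callback (which blocks the executor while the upload runs), you could run the asyncio loop in a background thread and hand uploads off to it. A minimal sketch of the idea (simplified; not wired into the node above):

```python
import asyncio
import threading


class BackgroundUploader:
    """Runs an asyncio loop in a worker thread so uploads don't block ROS callbacks."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    def submit(self, coro):
        # Schedule the coroutine on the background loop and return immediately
        return asyncio.run_coroutine_threadsafe(coro, self.loop)


# Hypothetical usage inside the recorder node:
#   self.uploader = BackgroundUploader()
#   self.uploader.submit(self._upload_bag_file(old_bag_name, label, conf))
```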
📉 Combine Downsampling with Replication
This is typical in ELT (Extract-Load-Transform) pipelines. The idea is to save everything locally (on the robot) at high resolution and framerate, then stream part of the data to the cloud or a central server at a lower resolution or lower frequency.
- ➡️ Example: Store 1 FPS video in cloud, keep 30 FPS original on robot SSD
- 🔄 See example in: Building a Data Acquisition System for Manufacturing
❄️ Offload Data to Cold Storage
For long-term archiving, consider offloading data to cold storage to reduce costs while keeping data accessible.
- ⚖️ Example: Keep 30 days of data locally, archive older data to Google Cloud Storage.
- 📖 Guide: Deploy on ReductStore Cloud
Conclusion
In this tutorial, we set up a complete pipeline on a Raspberry Pi running ROS 2 Humble that captures camera data and AI output, records it to MCAP bag files with a custom Python node, and automatically offloads those files to a time-series object store. We added labels to each record (object ID detected and YOLO's confidence level). We configured replication to forward labeled data to a central instance on a laptop, filtering by labels to reduce bandwidth - for example, forwarding only "interesting" events (excluding "none" detections).
The end-to-end system demonstrates how to build a robust data logging and centralized storage solution for robotics. Of course, this tutorial must be adapted to your specific use case and environment. Keep in mind that the pipeline is highly customizable and can be adapted to different scenarios (separating topics by type, using specific filters, adding more metadata, etc.).
We hope this end-to-end tutorial helps you build your own ROS 2 data acquisition system. Happy hacking!
Thanks for reading! I hope this article helps you build the right storage strategy for your ROS data. If you have any questions or comments, feel free to use the ReductStore Community Forum.