Unable to Successfully Write ROS2 Bag File
Description: I am encountering an issue while attempting to manipulate ROS2 bag files. Specifically, my workflow involves reading a ROS2 bag file, extracting all its topics, selecting a single topic, and then writing the selected topic into another ROS2 bag file. However, despite following this process, I am unable to successfully write the data into the new bag file.
Actual Behavior: The data writing process fails to complete successfully. Instead, it partially executes by creating a folder with a metadata.yaml file and a db3 file, yet it does not write the intended data into the bag file.
ROS2 Version: Humble Operating System: Ubuntu 22.04
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import CameraInfo
import os
from rosbags.rosbag2 import Reader, Writer, WriterError
from rosbags.serde import deserialize_cdr, serialize_cdr
from rosbags.rosbag2 import Writer
from rosbags.typesys.types import sensor_msgs__msg__CameraInfo as CameraInfo
from ament_index_python.packages import get_package_share_directory
import yaml
import shutil
class ReadWriteBag(Node):
def __init__(self):
super().__init__("read_write_bag_node")
# Load the configuration file
config = self.load_config()
output_bag_folder = '/home/bagfolder'
TOPIC = '/flir_boson/camera_info'
# Remove the existing output bag folder, if it exists
if os.path.exists(output_bag_folder):
shutil.rmtree(output_bag_folder)
# Open the input bag file for reading
with Reader(config['rosbag']['file_location']) as reader:
# Register connections before writing
for connection in reader.connections:
print(f"Connection ID: {connection.id}, Topic: {connection.topic}, Message Type: {connection.msgtype}")
# write the TOPIC into another bag file
with Writer(output_bag_folder) as writer:
writer.open()
conn = writer.add_connection(TOPIC, CameraInfo.__msgtype__)
for rawdata in reader.messages():
msg = deserialize_cdr(rawdata, connection.msgtype)
writer.write(conn, serialize_cdr(msg, msg.__msgtype__))
def load_config(self):
package_name = 'py_testing_pkg'
config_file = os.path.join(get_package_share_directory(package_name), 'config', 'camera_config.yaml')
with open(config_file, 'r') as stream:
config = yaml.safe_load(stream)
return config
def main(args=None):
rclpy.init(args=args)
node = ReadWriteBag()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
if rclpy.ok():
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Any assistance or insights into resolving this problem would be greatly appreciated. Thank you