import enum
import multiprocessing
import os
import sys
import cv2
import numpy as np
import rofunc as rf
class AppType(enum.Enum):
    """Pair of streams that an SVO export writes for every frame."""

    # Left image next to the right image
    LEFT_AND_RIGHT = 1
    # Left image next to the rendered depth view
    LEFT_AND_DEPTH = 2
    # Left image plus the depth measure saved as a 16-bit image
    LEFT_AND_DEPTH_16 = 3
def progress_bar(percent_done, bar_length=50):
    """
    Draw a single-line text progress bar on stdout.

    The line ends with a carriage return (no newline), so consecutive calls
    overwrite each other in a terminal.

    Args:
        percent_done: progress in percent; printed as given, but the filled part
            of the bar is clamped to the range [0, 100]
        bar_length: total number of characters between the brackets
    """
    # Clamp the fill ratio so the bar is always exactly ``bar_length`` characters wide,
    # even when the caller passes a value below 0 or above 100.
    clamped_percent = min(max(percent_done, 0), 100)
    done_length = int(bar_length * clamped_percent / 100)
    bar = '=' * done_length + '-' * (bar_length - done_length)
    sys.stdout.write('[%s] %f%s\r' % (bar, percent_done, '%'))
    sys.stdout.flush()
def _write_time_table(table_path, time_table):
    """Write the collected ``(filename, timestamp)`` pairs as comma-separated text."""
    with open(table_path, "w") as f:
        f.write("filename,timestamp\n")
        f.writelines("{},{}\n".format(frame, timestamp) for frame, timestamp in time_table)


def export(filepath, mode=1):
    """
    Export the svo file with specific mode.

    The output goes to ``<filepath without .svo>_export/``: either one AVI file with
    the two views side by side (modes 0 and 1) or a sub-directory of PNG images
    (modes 2, 3 and 4). A ``time_table.txt`` file is written in the export directory;
    it lists the left image file names and their timestamps in milliseconds and is
    only filled by the image sequence modes.

    Args:
        filepath: SVO file path (input) : path/to/file.svo
        mode: Export mode:  0=Export LEFT+RIGHT AVI.
                            1=Export LEFT+DEPTH_VIEW AVI.
                            2=Export LEFT+RIGHT image sequence.
                            3=Export LEFT+DEPTH_VIEW image sequence.
                            4=Export LEFT+DEPTH_16Bit image sequence.

    Returns:
        0 once the conversion has finished.

    Raises:
        ValueError: if ``mode`` is not one of 0, 1, 2, 3, 4.
        RuntimeError: if the SVO file or the AVI video writer cannot be opened.
    """
    # Validate the mode first, before importing the SDK or creating any directory
    mode_names = {
        0: 'left_and_right.avi',
        1: 'left_and_depth.avi',
        2: 'left_and_right_img_seq',
        3: 'left_and_depth_img_seq',
        4: 'left_and_depth_16_img_seq',
    }
    try:
        mode_name = mode_names[mode]
    except (KeyError, TypeError):
        raise ValueError('Wrong mode index, should be one of [0, 1, 2, 3, 4]') from None

    # Imported lazily so the module can be imported without the ZED SDK installed
    import pyzed.sl as sl

    if mode in (0, 2):
        app_type = AppType.LEFT_AND_RIGHT
    elif mode in (1, 3):
        app_type = AppType.LEFT_AND_DEPTH
    else:
        app_type = AppType.LEFT_AND_DEPTH_16
    # Modes 0 and 1 export an AVI, the others an image sequence
    output_as_video = mode in (0, 1)

    # Get input parameters
    svo_input_path = filepath
    # Strip only a trailing '.svo' so that '.svo' inside a directory name does not cut the path short
    root_path = filepath[:-len('.svo')] if filepath.endswith('.svo') else filepath
    output_dir = root_path + '_export'
    rf.oslab.create_dir(output_dir)
    output_path = os.path.join(output_dir, mode_name)
    if not output_as_video:
        rf.oslab.create_dir(output_path)

    # Specify SVO path parameter
    init_params = sl.InitParameters()
    init_params.set_from_svo_file(str(svo_input_path))
    init_params.svo_real_time_mode = False  # Don't convert in realtime
    init_params.coordinate_units = sl.UNIT.MILLIMETER  # Use millimeter units (for depth measurements)

    # Create ZED objects
    zed = sl.Camera()
    video_writer = None
    try:
        # Open the SVO file specified as a parameter
        err = zed.open(init_params)
        if err != sl.ERROR_CODE.SUCCESS:
            sys.stdout.write(repr(err))
            # Raise instead of calling exit(): this function also runs inside pool workers,
            # where SystemExit would kill the worker instead of reporting the failure.
            raise RuntimeError('Cannot open SVO file {}: {}'.format(svo_input_path, repr(err)))

        # Get image size
        image_size = zed.get_camera_information().camera_resolution
        width = image_size.width
        height = image_size.height
        width_sbs = width * 2

        # Prepare side by side image container equivalent to CV_8UC4
        svo_image_sbs_rgba = np.zeros((height, width_sbs, 4), dtype=np.uint8)

        # Prepare single image containers
        left_image = sl.Mat()
        right_image = sl.Mat()
        depth_image = sl.Mat()

        if output_as_video:
            # Create video writer with MPEG-4 part 2 codec
            video_writer = cv2.VideoWriter(str(output_path),
                                           cv2.VideoWriter_fourcc('M', '4', 'S', '2'),
                                           max(zed.get_camera_information().camera_fps, 25),
                                           (width_sbs, height))
            if not video_writer.isOpened():
                message = ("OpenCV video writer cannot be opened. Please check the .avi file path and write "
                           "permissions.\n")
                sys.stdout.write(message)
                raise RuntimeError(message.strip())

        rt_param = sl.RuntimeParameters()
        rt_param.sensing_mode = sl.SENSING_MODE.FILL

        # Start SVO conversion to AVI/SEQUENCE
        sys.stdout.write("Converting SVO to {}. Use Ctrl-C to interrupt conversion.\n".format(mode_name))

        nb_frames = zed.get_svo_number_of_frames()
        time_table = []

        while True:
            # Grab exactly once per iteration and branch on the stored status,
            # so that a failed grab never consumes an extra frame.
            grab_status = zed.grab(rt_param)
            if grab_status == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
                sys.stdout.write("\nSVO end has been reached. Exiting now.\n")
                break
            if grab_status != sl.ERROR_CODE.SUCCESS:
                # Any other grab error: try again with the next grab
                continue

            svo_position = zed.get_svo_position()

            # Retrieve SVO images
            zed.retrieve_image(left_image, sl.VIEW.LEFT)
            timestamp = zed.get_timestamp(sl.TIME_REFERENCE.IMAGE)  # Get the image timestamp

            if app_type == AppType.LEFT_AND_RIGHT:
                zed.retrieve_image(right_image, sl.VIEW.RIGHT)
            elif app_type == AppType.LEFT_AND_DEPTH:
                zed.retrieve_image(right_image, sl.VIEW.DEPTH)
            elif app_type == AppType.LEFT_AND_DEPTH_16:
                zed.retrieve_measure(depth_image, sl.MEASURE.DEPTH)

            if output_as_video:
                # Copy the left image to the left side of SBS image
                svo_image_sbs_rgba[0:height, 0:width, :] = left_image.get_data()
                # Copy the right image to the right side of SBS image
                svo_image_sbs_rgba[0:, width:, :] = right_image.get_data()
                # Drop the fourth channel before writing
                ocv_image_sbs_rgb = cv2.cvtColor(svo_image_sbs_rgba, cv2.COLOR_RGBA2RGB)
                # Write the 3-channel image in the video
                video_writer.write(ocv_image_sbs_rgb)
            else:
                # Generate file names
                frame_id = str(svo_position).zfill(6)
                filename1 = os.path.join(output_path, "left_{}.png".format(frame_id))
                second_prefix = "right" if app_type == AppType.LEFT_AND_RIGHT else "depth"
                filename2 = os.path.join(output_path, "{}_{}.png".format(second_prefix, frame_id))
                time_table.append((filename1, str(timestamp.get_milliseconds())))

                # Save Left images
                cv2.imwrite(str(filename1), left_image.get_data())

                if app_type != AppType.LEFT_AND_DEPTH_16:
                    # Save right images
                    cv2.imwrite(str(filename2), right_image.get_data())
                else:
                    # Save depth images (convert to uint16)
                    # NOTE(review): presumably the depth measure is floating point; confirm how
                    # invalid pixels should be handled before the uint16 cast.
                    cv2.imwrite(str(filename2), depth_image.get_data().astype(np.uint16))

            # Display progress
            progress_bar((svo_position + 1) / nb_frames * 100, 30)

            # Check if we have reached the end of the video
            if svo_position >= (nb_frames - 1):  # End of SVO
                sys.stdout.write("\nSVO end has been reached. Exiting now.\n")
                break

        _write_time_table(os.path.join(output_dir, "time_table.txt"), time_table)
    finally:
        # Release the writer and the camera even when the conversion is interrupted or fails
        if video_writer is not None:
            video_writer.release()
        zed.close()

    return 0
def parallel(z):
    """
    Single-argument adapter around ``export`` for use with ``Pool.map``.

    Args:
        z: indexable pair ``(filepath, mode)``

    Returns:
        Whatever ``export(filepath, mode)`` returns.
    """
    svo_path = z[0]
    export_mode = z[1]
    return export(svo_path, export_mode)
def export_batch(filedir, all_mode=True, mode_lst=None, core_num=10):
    """
    Export every ``.svo`` file located directly inside a directory, using a process pool.

    One task is created per (file, mode) combination and dispatched to ``parallel``.

    Args:
        filedir: directory that is listed (non-recursively) for files ending in ``.svo``
        all_mode: if True and ``mode_lst`` is None, all export modes 0-4 are run
        mode_lst: explicit iterable of export modes; when given it takes precedence
            over ``all_mode``
        core_num: number of worker processes in the pool

    Raises:
        ValueError: if ``all_mode`` is False and ``mode_lst`` is None.
    """
    # Resolve the requested modes first so that bad arguments never start a pool
    if mode_lst is not None:
        modes = list(mode_lst)
    elif all_mode:
        modes = list(range(5))
    else:
        raise ValueError('Wrong parameters')

    # Require the real '.svo' extension, not just a name that happens to end in 'svo'
    svo_files = [name for name in os.listdir(filedir) if name.endswith('.svo')]
    tasks = [(os.path.join(filedir, name), mode) for mode in modes for name in svo_files]
    if not tasks:
        # Nothing to export: do not spawn worker processes for an empty job list
        return

    # The context manager guarantees the workers are cleaned up even if a task raises
    with multiprocessing.Pool(core_num) as pool:
        pool.map(parallel, tasks)
        pool.close()
        pool.join()