# Source code for rofunc.devices.mmodal.sync

import sys
import datetime

import numpy as np
import pandas as pd

from rofunc.devices.xsens.process import load_mvnx
from rofunc.devices.zed.export import progress_bar
from pytz import timezone


# 1. get the time of each device
def get_optitrack_time(optitrack_input_path, time_reference='PM', time_zone='Etc/GMT-8'):
    """
    Build the Unix timestamp (in milliseconds) of every frame in an Optitrack csv file.

    The capture start time is taken from the first 23 characters of the 12th
    header field of the csv, read as a fixed-width
    ``YYYY?MM?DD?hh?mm?ss?mmm`` string (``?`` being any single separator
    character). The per-frame offsets come from the ``Time (Seconds)`` column
    found after skipping the first 6 rows.

    Args:
        optitrack_input_path: the path of optitrack csv file
        time_reference: the time reference of optitrack csv file, 'PM' or 'AM'
        time_zone: the time zone of optitrack csv file, check your timezone name by pytz.all_timezones

    Returns:
        the time list of optitrack (np.array of int64, milliseconds since the Unix epoch)

    Raises:
        ValueError: if ``time_reference`` is neither 'PM' nor 'AM'
    """
    # obtain the time of the first frame of optitrack
    header = pd.read_csv(optitrack_input_path, nrows=0)
    start_text = header.columns[11][:23]

    year = int(start_text[:4])
    month = int(start_text[5:7])
    day = int(start_text[8:10])
    hour = int(start_text[11:13])
    if time_reference == 'PM':
        # 12 PM is noon and must stay 12; unconditionally adding 12 would
        # produce hour 24, which datetime rejects.
        if hour < 12:
            hour += 12
    elif time_reference == 'AM':
        # 12 AM is midnight, i.e. hour 0 on a 24-hour clock.
        if hour == 12:
            hour = 0
    else:
        raise ValueError('time_reference should be PM or AM')
    minute = int(start_text[14:16])
    second = int(start_text[17:19])
    millisecond = int(start_text[20:23])

    # pytz zones must be attached with localize(); datetime.replace(tzinfo=...)
    # picks the zone's first recorded (LMT) offset for most named zones.
    tz = timezone(time_zone)
    start_local = tz.localize(datetime.datetime(year, month, day, hour, minute, second))
    # Whole seconds convert to an exact integer; the milliseconds are added as
    # an integer afterwards so no float truncation can drop a millisecond.
    init_unix_time = int(start_local.timestamp()) * 1000 + millisecond

    # obtain the time of each frame of optitrack
    timestamp_dataframe = pd.read_csv(optitrack_input_path, skiprows=6, usecols=['Time (Seconds)'])
    # ravel() keeps a 1-D array even when the file holds a single frame.
    frame_seconds = timestamp_dataframe.to_numpy(dtype=float).ravel()
    return (init_unix_time + frame_seconds * 1000).astype(np.int64)
def get_xsens_time(mvnx_input_path):
    """
    Build the absolute timestamp of every frame in an Xsens mvnx file.

    Each per-frame ``time`` value is added to the recording's ``start_time``
    taken from the file's meta data (presumably both in milliseconds —
    confirm against the mvnx export).

    Args:
        mvnx_input_path: the path of xsens mvnx file

    Returns:
        the time list of xsens (np.array)
    """
    mvnx = load_mvnx(mvnx_input_path)
    data = mvnx.file_data

    # Per-frame offsets first, then the recording start they are relative to.
    frame_offsets = [int(raw_time) for raw_time in data['frames']['time']]
    start_time = int(data['meta_data']['start_time'])

    return np.array([start_time + offset for offset in frame_offsets])
def get_zed_time(svo_input_path):
    """
    Read the image timestamp (in milliseconds) of every frame in a ZED svo file.

    The file is played back frame by frame (not in real time) and the image
    timestamp of each successfully grabbed frame is collected. A progress bar
    is printed while reading.

    Args:
        svo_input_path: the path of zed svo file

    Returns:
        the time list of zed (np.array)

    Raises:
        SystemExit: if the svo file cannot be opened (the error code is
            written to stdout first)
    """
    import pyzed.sl as sl

    # Specify SVO path parameter
    init_params = sl.InitParameters()
    init_params.set_from_svo_file(str(svo_input_path))
    init_params.svo_real_time_mode = False  # Don't convert in realtime
    init_params.coordinate_units = sl.UNIT.MILLIMETER  # Use millimeter units (for depth measurements)

    # Create ZED objects
    zed = sl.Camera()

    # Open the SVO file specified as a parameter
    err = zed.open(init_params)
    if err != sl.ERROR_CODE.SUCCESS:
        sys.stdout.write(repr(err))
        zed.close()
        sys.exit()

    zed_timelist = []
    try:
        rt_param = sl.RuntimeParameters()
        rt_param.sensing_mode = sl.SENSING_MODE.FILL

        nb_frames = zed.get_svo_number_of_frames()

        while True:
            # Grab exactly once per iteration and branch on that single
            # result: calling grab() again just to test for end-of-file
            # would consume a frame whose timestamp is never recorded.
            grab_status = zed.grab(rt_param)
            if grab_status == sl.ERROR_CODE.SUCCESS:
                svo_position = zed.get_svo_position()
                zed_time = zed.get_timestamp(sl.TIME_REFERENCE.IMAGE)
                zed_timelist.append(zed_time.get_milliseconds())

                progress_bar((svo_position + 1) / nb_frames * 100, 30)

                # Check if we have reached the end of the video
                if svo_position >= (nb_frames - 1):  # End of SVO
                    sys.stdout.write("\nSVO end has been reached. Exiting now.\n")
                    break
            elif grab_status == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
                sys.stdout.write("\nSVO end has been reached. Exiting now.\n")
                break
            # NOTE(review): any other grab status is retried on the next
            # iteration; confirm whether a persistent error should abort.
    finally:
        # Release the camera handle on every exit path.
        zed.close()

    return np.array(zed_timelist)
# time sync: get synced index table of each device
def data_sync(optitrack_input_path, mvnx_input_path, svo_input_path, time_reference='PM', time_zone='Etc/GMT-8'):
    """
    Match every ZED frame with the closest-in-time Xsens and Optitrack frames.

    The timestamps of the three recordings are loaded, then for each ZED
    timestamp the index of the Xsens timestamp and of the Optitrack timestamp
    with the smallest absolute difference is selected.

    Args:
        optitrack_input_path: the path of optitrack csv file
        mvnx_input_path: the path of xsens mvnx file
        svo_input_path: the path of zed svo file
        time_reference: forwarded to get_optitrack_time, 'PM' or 'AM'
        time_zone: forwarded to get_optitrack_time, the time zone of the optitrack csv file

    Returns:
        (xsens_index_list, optitrack_index_list): two lists with one entry per
        zed frame, holding the index of the nearest xsens / optitrack frame
    """
    zed_time_array = get_zed_time(svo_input_path)
    xsens_time_array = get_xsens_time(mvnx_input_path)
    optitrack_time_array = get_optitrack_time(optitrack_input_path,
                                              time_reference=time_reference,
                                              time_zone=time_zone)

    # Nearest-neighbour lookup in time: argmin over |zed_time - device_times|.
    xsens_index_list = [np.abs(zed_time - xsens_time_array).argmin()
                        for zed_time in zed_time_array]
    optitrack_index_list = [np.abs(zed_time - optitrack_time_array).argmin()
                            for zed_time in zed_time_array]
    return xsens_index_list, optitrack_index_list