# Source code for rofunc.devices.optitrack.record
import ast
import os
import re
import socket
import threading
from typing import Tuple, List

import numpy as np

import rofunc as rf
def data_process(data: str) -> Tuple[List, List]:
    """
    Parse one message from the optitrack server into rigid-body poses.

    The message is split on the literal text ``ID``. Whatever precedes the
    first ``ID`` is discarded; every following chunk is treated as one rigid
    body and must contain a ``Position : <literal>`` and an
    ``Orientation : <literal>`` field, each running to the end of its line.

    Args:
        data: string received from optitrack win server
    Returns:
        position, orientation: two lists of equal length holding the parsed
        position and orientation literal of every rigid body, in the order
        they appear in the message. Both are empty when the message contains
        no ``ID``.
    Raises:
        IndexError: a chunk lacks a ``Position`` or ``Orientation`` field.
        ValueError, SyntaxError: a field value is not a plain Python literal.
    """
    data = data.split('ID')
    # Number of rigid bodies found in this message (diagnostic output).
    print(len(data[1:]))
    position_list = []
    orientation_list = []
    for i in data[1:]:
        position = re.findall(r"Position\s*:\s*(.*)", i)[0]
        orientation = re.findall(r"Orientation\s*:\s*(.*)", i)[0]
        # SECURITY: this text comes straight off a network socket. It used to
        # be passed to eval(), which would execute arbitrary code sent by the
        # peer. ast.literal_eval only accepts literals (numbers, tuples,
        # lists, ...), which is all a pose needs. strip() drops a trailing
        # '\r' that the greedy '(.*)' capture keeps on CRLF-terminated lines.
        position_list.append(ast.literal_eval(position.strip()))
        orientation_list.append(ast.literal_eval(orientation.strip()))
    return position_list, orientation_list
def opti_run(root_dir: str, exp_name: str, ip: str, port: int) -> None:
    """
    Receive poses from the optitrack server over TCP and store them on disk.

    Connects to ``ip:port``, then repeatedly reads a message, parses it with
    ``data_process``, replies ``"ok"`` and appends the flattened positions
    followed by the flattened orientations to one growing 1-D array. The
    array is written to ``<root_dir>/<exp_name>/opti_data.npy`` after every
    message. The loop ends when the server closes the connection.

    Args:
        root_dir: root directory
        exp_name: directory (under ``root_dir``) in which the npy file is saved
        ip: ip address
        port: port
    Returns:
        None
    """
    save_path = root_dir + '/' + exp_name + '/' + 'opti_data.npy'
    opti_data = np.array([])
    # The context manager closes the socket on every exit path, including
    # exceptions raised while parsing a message.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect((ip, port))
        print("Connected to socket: {}:{}".format(ip, port))
        while True:
            packet = client.recv(1024)
            if not packet:
                # recv() returns b"" once the peer has closed the connection;
                # without this check the loop would spin forever on empty data.
                break
            # NOTE(review): assumes every recv() yields exactly one complete
            # message of at most 1024 bytes - TODO confirm against the server.
            raw_position, raw_orientation = data_process(packet.decode("utf-8"))
            # sendall() keeps writing until the whole acknowledgement is sent.
            client.sendall("ok".encode("utf-8"))
            # np.append flattens both inputs: [positions..., orientations...]
            raw_pose = np.append(raw_position, raw_orientation)
            opti_data = np.append(opti_data, raw_pose, axis=0)
            # Saved after every message so the file is current even if the
            # process is stopped abruptly.
            np.save(save_path, opti_data)
def record(root_dir: str, exp_name: str, ip: str, port: int) -> None:
    """
    Create a fresh experiment folder and record optitrack data into it.

    Refuses to reuse an existing ``<root_dir>/<exp_name>`` path, creates the
    folder otherwise, then runs ``opti_run`` in a worker thread and blocks
    until that thread ends.

    Args:
        root_dir: root directory
        exp_name: npy file location
        ip: ip address of server computer
        port: port number of optitrack server
    Returns: None
    Raises:
        FileExistsError: ``<root_dir>/<exp_name>`` already exists.
    """
    exp_dir = '{}/{}'.format(root_dir, exp_name)
    if os.path.exists(exp_dir):
        # FileExistsError is still an Exception, so existing handlers that
        # catch Exception keep working, while new callers can be specific.
        raise FileExistsError(
            'There are already some files in {}, please rename the exp_name.'.format(exp_dir))
    rf.oslab.create_dir(exp_dir)
    print('Recording folder: {}'.format(exp_dir))
    opti_thread = threading.Thread(target=opti_run, args=(root_dir, exp_name, ip, port))
    opti_thread.start()
    # Block until the recording thread has finished.
    opti_thread.join()
    print('Optitrack record finished')