import numpy as np
import socket
import struct
import os
import threading
from typing import List
import rofunc as rf
from rofunc.utils.logger.beauty_logger import beauty_print
event = threading.Event()
[docs]def byte_to_str(data, n):
fmt = "!{}c".format(n)
char_array = struct.unpack(fmt, data)
str_out = ""
for c in char_array:
s = c.decode("utf-8")
str_out += s
return str_out
[docs]def byte_to_float(data):
return struct.unpack("!f", data)[0]
[docs]def byte_to_uint32(data):
return struct.unpack("!I", data)[0]
[docs]def byte_to_uint16(data):
return struct.unpack("!H", data)[0]
[docs]def byte_to_uint8(data):
return struct.unpack("!B", data)[0]
[docs]class Datagram(object):
def __init__(self, header, payload):
self.header = header
self.payload = payload
@property
def is_object(self):
return self.header.is_object
[docs] def decode_to_pose_array_msg(
self, ref_frame, ref_frame_id=None, scaling_factor=1.0
):
"""
Decode the bytes in the streaming data to pose array message.
Args:
ref_frame: str Reference frame name of the generated pose array message.
ref_frame_id: None/int If not None, all poses will be shifted subject to
the frame with this ID. This frame should belong to the human.
scaling_factor: float Scale the position of the pose if src_frame_id is not None.
Its value equals to the robot/human body dimension ratio
Returns:
pose_array_msg:
"""
pose_array_msg = []
for i in range(self.header.item_num):
item = self.payload[
i * self.header.item_size: (i + 1) * self.header.item_size
]
pose_msg = self._decode_to_pose_msg(item)
if pose_msg is None:
return None
pose_array_msg.append(pose_msg)
return pose_array_msg
@staticmethod
def _decode_to_pose_msg(item: str) -> List:
"""
Decode a type 02 stream to pose.
Args:
item: stream item from xsens
Returns:
pose: human pose
"""
if len(item) != 32:
print(
"XSensInterface: Payload pose data size is not 32: {}".format(len(item))
)
return None
x = byte_to_float(item[4:8])
y = byte_to_float(item[8:12])
z = byte_to_float(item[12:16])
qw = byte_to_float(item[16:20])
qx = byte_to_float(item[20:24])
qy = byte_to_float(item[24:28])
qz = byte_to_float(item[28:32])
pose = [x, y, z, qx, qy, qz, qw]
return pose
[docs]class XsensInterface(object):
def __init__(
self,
udp_ip,
udp_port,
ref_frame,
scaling=1.0,
buffer_size=4096,
**kwargs # DO NOT REMOVE
):
super(XsensInterface, self).__init__()
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
self._sock.bind((udp_ip, udp_port))
self._buffer_size = buffer_size
self._body_frames = {
"pelvis": 0,
"l5": 1,
"l3": 2,
"t12": 3,
"t8": 4,
"neck": 5,
"head": 6,
"right_shoulder": 7,
"right_upper_arm": 8,
"right_forearm": 9,
"right_hand": 10,
"left_shoulder": 11,
"left_upper_arm": 12,
"left_forearm": 13,
"left_hand": 14,
"right_upper_leg": 15,
"right_lower_leg": 16,
"right_foot": 17,
"right_toe": 18,
"left_upper_leg": 19,
"left_lower_leg": 20,
"left_foot": 21,
"left_toe": 22,
}
if ref_frame is not None:
ref_frame_lowercase = ref_frame.lower()
if ref_frame_lowercase in self._body_frames:
self.ref_frame = ref_frame_lowercase
self.ref_frame_id = self._body_frames[ref_frame_lowercase]
elif ref_frame_lowercase == "" or ref_frame_lowercase == "world":
print("XSensInterface: Reference frame is the world frame")
self.ref_frame = "world"
self.ref_frame_id = None
else:
print("XSensInterface: Using customized reference frame {}".format(ref_frame_lowercase))
self.ref_frame = ref_frame_lowercase
self.ref_frame_id = None
else:
print("XSensInterface: Reference frame is the world frame")
self.ref_frame = "world"
self.ref_frame_id = None
self.scaling_factor = scaling
self.header = None
self.object_poses = None
self.body_segments_poses = None
[docs] def get_datagram(self):
"""[Main entrance function] Get poses from the datagram."""
data, _ = self._sock.recvfrom(self._buffer_size)
datagram = self._get_datagram(data)
if datagram is not None:
pose_array_msg = datagram.decode_to_pose_array_msg(
self.ref_frame, self.ref_frame_id
)
return pose_array_msg
else:
return None
[docs] def save_file_thread(self, root_dir: str, exp_name: str) -> None:
"""
save xsens motion data to the file
Args:
root_dir: root dictionary
exp_name: dictionary saving the npy file, named according to time
Returns:
None
"""
xsens_data = []
while True:
data = self.get_datagram()
print(type(data))
if type(data) == list:
print(data)
xsens_data.append(data)
if event.isSet():
np.save(root_dir + '/' + exp_name + '/' + 'xsens_data.npy', np.array(xsens_data))
break
@staticmethod
def _get_header(data):
"""
Get the header data from the received MVN Awinda datagram.
:param data: Tuple From self._sock.recvfrom(self._buffer_size)
:return: header
"""
if len(data) < 24:
print(
"XSensInterface: Data length {} is less than 24".format(len(data))
)
return None
id_str = byte_to_str(data[0:6], 6)
sample_counter = byte_to_uint32(data[6:10])
datagram_counter = struct.unpack("!B", data[10].to_bytes(1, 'big'))
item_number = byte_to_uint8(data[11:12])
time_code = byte_to_uint32(data[12:16])
character_id = byte_to_uint8(data[16:17])
body_segments_num = byte_to_uint8(data[17:18])
props_num = byte_to_uint8(data[18:19])
finger_segments_num = byte_to_uint8(data[19:20])
# 20 21 are reserved for future use
payload_size = byte_to_uint16(data[22:24])
header = Header(
[
id_str,
sample_counter,
datagram_counter,
item_number,
time_code,
character_id,
body_segments_num,
props_num,
finger_segments_num,
payload_size,
]
)
# rospy.logdebug(header.__repr__())
return header
def _get_datagram(self, data):
header = self._get_header(data)
if header is not None and header.is_valid:
self.header = header
return Datagram(header, data[24:])
else:
return None
[docs]def on_press(key):
from pynput.keyboard import Key
# When pressing the Esc key, stop recording
if key == Key.esc:
event.set()
beauty_print("Esc button is pressed, stop recording", type="info")
return False
[docs]def record(root_dir: str, exp_name: str, ip: str, port: int, ref_frame: str = None) -> None:
"""
record xsens motion data
:param root_dir: root directory
:param exp_name: npy file location
:param ip: ip address of server computer
:param port: port number of optitrack server
:param ref_frame: reference frame
:return:
"""
from pynput.keyboard import Listener
if os.path.exists('{}/{}'.format(root_dir, exp_name)):
raise Exception('There are already some files in {}, please rename the exp_name.'.format(
'{}/{}'.format(root_dir, exp_name)))
else:
rf.oslab.create_dir('{}/{}'.format(root_dir, exp_name))
beauty_print('Recording folder: {}/{}'.format(root_dir, exp_name), type='info')
interface = XsensInterface(ip, port, ref_frame=ref_frame)
listener = Listener(on_press=on_press)
listener.start()
xsens_thread = threading.Thread(target=interface.save_file_thread, args=(root_dir, exp_name))
xsens_thread.start()
xsens_thread.join()
beauty_print('Xsens record finished', type='info')