# coding: utf-8
"""FrameOfData message implementation.
Copyright (c) 2017, Matthew Edwards. This file is subject to the 3-clause BSD
license, as found in the LICENSE file in the top-level directory of this
distribution and at https://github.com/mje-nz/python_natnet/blob/master/LICENSE.
No part of python_natnet, including this file, may be copied, modified,
propagated, or distributed except according to the terms contained in the
LICENSE file.
This is the most complicated message.
All positions are given as (x, y, z) tuples of floats, in meters, in whatever co-ordinate frame
Motive is using (Y-up or Z-up depending on the streaming settings). All orientations are given in
quaternion form as (x, y, z, w) tuples of floats.
Note that I have only tested rigid bodies -- skeletons, force plates and peripheral devices may or
may not work.
"""
__all__ = ['Markerset', 'RigidBody', 'Skeleton', 'LabelledMarker', 'AnalogChannelData', 'Device',
'TimingInfo', 'MocapFrameMessage']
try:
# Only need this for type annotations
from typing import Optional # noqa: F401
except ImportError:
pass
import attr
from .common import (MessageId, Version, double_t, float_t, int16_t, quaternion_t, register_message,
uint16_t, uint32_t, uint64_t, vector3_t)
[docs]@attr.s
class Markerset(object):
"""Vaguely-defined grouping of markers which doesn't seem to serve any useful purpose.
Attributes:
name (str): Markerset name
markers (list[tuple[float, float, float]]): Position for each marker
"""
name = attr.ib()
markers = attr.ib()
[docs] @classmethod
def deserialize(cls, data, version=None):
"""Deserialize a Markerset from a ParseBuffer."""
name = data.unpack_cstr()
marker_count = data.unpack(uint32_t)
markers = [data.unpack(vector3_t) for i in range(marker_count)]
return Markerset(name, markers)
def serialize(self):
return self.name.encode('utf-8') + b'\0' + uint32_t.pack(len(self.markers)) + \
b''.join(vector3_t.pack(*marker) for marker in self.markers)
[docs]@attr.s
class RigidBody(object):
"""Rigid body data.
Note that as of NatNet 3, the individual marker positions, IDs and sizes are not included here
(and if you don't need them you can prevent Motive from streaming them at all). If you need
them, search the list of LabelledMarkers for markers with the right model ID.
Attributes:
id (int): Streaming ID
position (tuple[float, float, float]):
orientation (tuple([float, float, float, float]):
mean_error (float or None): Mean error per marker, if available
"""
id_ = attr.ib() # type: int
position = attr.ib()
orientation = attr.ib()
mean_error = attr.ib() # type: Optional[float]
_params = attr.ib() # type: Optional[int]
[docs] @classmethod
def deserialize(cls, data, version):
"""Deserialize a RigidBody from a ParseBuffer."""
id_ = data.unpack(uint32_t)
position = data.unpack(vector3_t)
orientation = data.unpack(quaternion_t)
if version < Version(3):
# TODO: Store these?
marker_count = data.unpack(uint32_t)
marker_positions = [data.unpack(vector3_t) for i in range(marker_count)] # noqa: F841
if version >= Version(2):
marker_ids = [data.unpack(uint32_t) for i in range(marker_count)] # noqa: F841
marker_sizes = [data.unpack(float_t) for i in range(marker_count)] # noqa: F841
# TODO: Padding is in 3.0.1 SDK sample but not 3.1.0?
padding = data.unpack(uint32_t) # noqa: F841
mean_error = None
if version >= Version(2):
mean_error = data.unpack(float_t)
params = None
if version >= Version(2, 6) or version.major == 0:
# TODO: Shouldn't this be a uint16_t?
params = data.unpack(int16_t)
return cls(id_, position, orientation, mean_error, params)
def serialize(self):
return uint32_t.pack(self.id_) + vector3_t.pack(*self.position) + \
quaternion_t.pack(*self.orientation) + float_t.pack(self.mean_error) + \
int16_t.pack(self._params)
@property
def tracking_valid(self):
"""True if rigid body is being tracked successfully."""
assert self._params is not None
return (self._params & 0x01) != 0
[docs]@attr.s
class Skeleton(object):
"""Skeleton data, which consists of a set of rigid bodies.
Apparently the rigid bodies will have their ID equal to skeleton_id << 16 + bone_id.
Attributes:
id\_ (int): Skeleton ID (I'm not sure where this is set)
rigid_bodies (list[:class:`RigidBody`]):
"""
id_ = attr.ib() # type: int
rigid_bodies = attr.ib() # type: list[RigidBody]
[docs] @classmethod
def deserialize(cls, data, version=None):
"""Deserialize a Skeleton from a ParseBuffer."""
id_ = data.unpack(uint32_t)
rigid_body_count = data.unpack(uint32_t)
rigid_bodies = [RigidBody.deserialize(data, version) for i in range(rigid_body_count)]
return cls(id_, rigid_bodies)
[docs]@attr.s
class LabelledMarker(object):
"""A single marker and associated information.
Note that this is **not** only markers that are part of rigid bodies.
Attributes:
model_id (int): ID of containing rigid body, or 0 if the marker is not part of a rigid body
marker_id (int): Marker ID (starting at 0 for rigid body markers, or a large number
otherwise)
position (tuple[float, float, float]):
size (float): Estimated marker size in meters
residual (float or None): Marker error in mm/ray, if available
"""
model_id = attr.ib()
marker_id = attr.ib()
position = attr.ib()
size = attr.ib()
_params = attr.ib() # type: Optional[int]
residual = attr.ib()
[docs] @classmethod
def deserialize(cls, data, version):
"""Deserialize a LabelledMarker from a ParseBuffer."""
marker_id = data.unpack(uint16_t)
model_id = data.unpack(uint16_t)
position = data.unpack(vector3_t)
size = data.unpack(float_t)
params = None
if version >= Version(2, 6) or version.major == 0:
# TODO: Shouldn't this be a uint16_t?
params = data.unpack(int16_t)
residual = None
if version >= Version(3) or version.major == 0:
residual = data.unpack(float_t)
return cls(model_id, marker_id, position, size, params, residual)
def serialize(self):
return uint16_t.pack(self.marker_id) + uint16_t.pack(self.model_id) + \
vector3_t.pack(*self.position) + float_t.pack(self.size) + \
int16_t.pack(self._params) + float_t.pack(self.residual)
_OCCLUDED = 0x01
_POINT_CLOUD_SOLVED = 0x02
_MODEL_SOLVED = 0x04
_HAS_MODEL = 0x08
_UNLABELLED = 0x10
_ACTIVE = 0x20
@property
def occluded(self):
"""True if the marker is occluded."""
assert self._params is not None
return (self._params & self._OCCLUDED) != 0
@property
def point_cloud_solved(self):
"""True if the marker is "point cloud solved" i.e. its position was calculated directly."""
assert self._params is not None
return (self._params & self._POINT_CLOUD_SOLVED) != 0
@property
def model_solved(self):
"""True if the marker is "model solved" i.e. its position was calculated from a rigid body."""
assert self._params is not None
return (self._params & self._MODEL_SOLVED) != 0
@property
def has_model(self):
"""True if the marker has an associated asset in the data stream (e.g. rigid body)."""
assert self._params is not None
return (self._params & self._HAS_MODEL) != 0
@property
def unlabelled(self):
"""True if the marker is 'unlabelled', but has a point cloud ID i.e. does not have an associated asset."""
assert self._params is not None
return (self._params & self._UNLABELLED) != 0
@property
def active(self):
"""True if the marker is an actively labeled LED marker."""
assert self._params is not None
return (self._params & self._ACTIVE) != 0
@attr.s
class AnalogChannelData(object):
values = attr.ib() # type: list[int]
@classmethod
def deserialize(cls, data, version=None):
frame_count = data.unpack(uint32_t)
values = [data.unpack(uint32_t) for i in range(frame_count)]
return cls(values)
@attr.s
class Device(object):
id_ = attr.ib() # type: int
channels = attr.ib() # type: list[AnalogChannelData]
@classmethod
def deserialize(cls, data, version=None):
id_ = data.unpack(uint32_t)
channel_count = data.unpack(uint32_t)
channels = [AnalogChannelData.deserialize(data, version) for i in range(channel_count)]
return cls(id_, channels)
[docs]@attr.s
class TimingInfo(object):
"""Timing information.
Attributes:
timecode (int): SMPTE timecode, if available
timecode_subframe (int): SMPTE timecode subframe, if available
timestamp (float): Software timestamp (in seconds since software startup)
camera_mid_exposure_timestamp (int or None): Camera mid exposure time (in performance
counter ticks), if available
camera_data_received_timestamp (int or None): Time camera data was received (in performance
counter ticks), if available
transmit_timestamp (int or None): Time frame was transmitted (in performance counter ticks),
if available
"""
timecode = attr.ib()
timecode_subframe = attr.ib()
timestamp = attr.ib()
camera_mid_exposure_timestamp = attr.ib()
camera_data_received_timestamp = attr.ib()
transmit_timestamp = attr.ib()
[docs] @classmethod
def deserialize(cls, data, version):
"""Deserialize timing information from a ParseBuffer."""
timecode = data.unpack(uint32_t)
timecode_subframe = data.unpack(uint32_t)
if version >= Version(2, 7):
timestamp = data.unpack(double_t)
else:
timestamp = data.unpack(float_t)
camera_mid_exposure_timestamp = None
camera_data_received_timestamp = None
transmit_timestamp = None
if version >= Version(3) or version.major == 0:
camera_mid_exposure_timestamp = data.unpack(uint64_t)
camera_data_received_timestamp = data.unpack(uint64_t)
transmit_timestamp = data.unpack(uint64_t)
return cls(timecode, timecode_subframe, timestamp, camera_mid_exposure_timestamp,
camera_data_received_timestamp, transmit_timestamp)
def serialize(self):
return uint32_t.pack(self.timecode) + uint32_t.pack(self.timecode_subframe) + \
double_t.pack(self.timestamp) + uint64_t.pack(self.camera_mid_exposure_timestamp) + \
uint64_t.pack(self.camera_data_received_timestamp) + \
uint64_t.pack(self.transmit_timestamp)
[docs]@register_message(MessageId.FrameOfData)
@attr.s
class MocapFrameMessage(object):
"""Frame of mocap data.
Attributes:
frame_number (int):
markersets (list of :class:`Markerset`):
rigid_bodies (list of :class:`RigidBody`):
skeletons (list of :class:`Skeleton`):
labelled_markers (list of :class:`LabelledMarker`): A LabelledMarker instance for each
tracked marker, whether part of a rigid body or not
force_plates (list of :class:`Device`):
devices (list of :class:`Device`):
timing_info (:class:`TimingInfo`): Timestamps (in server time)
"""
frame_number = attr.ib()
markersets = attr.ib() # type: list[Markerset]
rigid_bodies = attr.ib() # type: list[RigidBody]
skeletons = attr.ib() # type: list[Skeleton]
labelled_markers = attr.ib() # type: list[LabelledMarker]
force_plates = attr.ib() # type: list[Device]
devices = attr.ib() # type: list[Device]
timing_info = attr.ib() # type: TimingInfo
_params = attr.ib() # type: int
[docs] @classmethod
def deserialize(cls, data, version):
"""Deserialize a FrameOfData message.
Args:
data (:class:`~natnet.protocol.common.ParseBuffer`):
version (:class:`~natnet.protocol.common.Version`):
Returns:
MocapFrameMessage: Deserialized message
"""
frame_number = data.unpack(uint32_t)
markerset_count = data.unpack(uint32_t)
markersets = [Markerset.deserialize(data, version) for i in range(markerset_count)]
unlabelled_markers_count = data.unpack(uint32_t)
data.skip(vector3_t, unlabelled_markers_count)
rigid_body_count = data.unpack(uint32_t)
rigid_bodies = [RigidBody.deserialize(data, version) for i in range(rigid_body_count)]
skeletons = []
if version > Version(2):
# TODO: Original version check here contradicted comment
skeleton_count = data.unpack(uint32_t)
skeletons = [Skeleton.deserialize(data, version) for i in range(skeleton_count)]
labelled_markers = []
if version >= Version(2, 3):
# TODO: Original version check here contradicted PacketClient
labelled_marker_count = data.unpack(uint32_t)
labelled_markers = [LabelledMarker.deserialize(data, version) for i in range(labelled_marker_count)]
force_plates = []
if version >= Version(2, 9):
force_plate_count = data.unpack(uint32_t)
# Force plates and devices have the same data
force_plates = [Device.deserialize(data, version) for i in range(force_plate_count)]
devices = []
if version >= Version(2, 11):
device_count = data.unpack(uint32_t)
devices = [Device.deserialize(data, version) for i in range(device_count)]
timing_info = TimingInfo.deserialize(data, version)
# TODO: Shouldn't this be a uint16_t?
params = data.unpack(int16_t)
# No idea what this is, but this is how long packets are
unknown = data.unpack(uint32_t) # noqa: F841
return cls(frame_number, markersets, rigid_bodies, skeletons, labelled_markers,
force_plates, devices, timing_info, params)
def serialize(self, include_unlabelled=False):
frame_number = uint32_t.pack(self.frame_number)
markersets = uint32_t.pack(len(self.markersets)) + \
b''.join(m.serialize() for m in self.markersets)
unlabelled_markers = uint32_t.pack(0)
if include_unlabelled:
# Hack to match recorded packet in tests
positions = [l.position for l in self.labelled_markers if l.model_id != 0]
unlabelled_markers = uint32_t.pack(len(positions)) + \
b''.join(vector3_t.pack(*p) for p in positions)
rigid_bodies = uint32_t.pack(len(self.rigid_bodies)) + \
b''.join(r.serialize() for r in self.rigid_bodies)
skeletons = uint32_t.pack(len(self.skeletons)) + \
b''.join(s.serialize() for s in self.skeletons)
labelled_markers = uint32_t.pack(len(self.labelled_markers)) + \
b''.join(l.serialize() for l in self.labelled_markers)
force_plates = uint32_t.pack(len(self.force_plates)) + \
b''.join(f.serialize() for f in self.force_plates)
devices = uint32_t.pack(len(self.devices)) + \
b''.join(d.serialize() for d in self.devices)
timing_info = self.timing_info.serialize()
params = uint16_t.pack(self._params)
unknown = uint32_t.pack(0)
return frame_number + markersets + unlabelled_markers + rigid_bodies + skeletons + \
labelled_markers + force_plates + devices + timing_info + params + unknown
@property
def is_recording(self):
"""True if Motive is recording."""
assert self._params is not None
return (self._params & 0x01) != 0
@property
def tracked_models_changed(self):
"""True if the tracked models have changed since the last frame."""
assert self._params is not None
return (self._params & 0x02) != 0