Source code for natnet.protocol.ModelDefinitionsMessage

# coding: utf-8
"""ModelDef message implementation.

Copyright (c) 2017, Matthew Edwards.  This file is subject to the 3-clause BSD
license, as found in the LICENSE file in the top-level directory of this
distribution and at https://github.com/mje-nz/python_natnet/blob/master/LICENSE.
No part of python_natnet, including this file, may be copied, modified,
propagated, or distributed except according to the terms contained in the
LICENSE file.

This message contains descriptions of all tracked models (rigid bodies, skeletons, markersets) and devices.

It is not sent automatically when tracked models changes. The next FrameOfData will have a flag set, then the client
sends a RequestModelDefinitions message to prompt the server to send this.
"""

__all__ = ['ModelDefinitionsMessage', 'MarkersetDescription', 'RigidBodyDescription', 'SkeletonDescription',
           'ForcePlateDescription', 'DeviceDescription']
try:
    # Only need this for type annotations
    from typing import Optional  # noqa: F401
except ImportError:
    pass

import enum

import attr

from .common import (MessageId, SerDesRegistry, Version, int32_t, register_message, uint32_t,
                     vector3_t)


class ModelType(enum.IntEnum):
    MarkerSet = 0
    RigidBody = 1
    Skeleton = 2
    # I assume:
    ForcePlate = 3
    Device = 4


class ModelRegistry(SerDesRegistry):
    """Abuse SerDesRegistry a bit to use for model types."""

    @staticmethod
    def serialize(model):
        model_type = model.message_id
        payload = model.serialize()
        return uint32_t.pack(model_type) + payload

    def deserialize_header(*args, **kwargs):
        raise NotImplementedError()

    def deserialize_payload(*args, **kwargs):
        raise NotImplementedError()

    def deserialize(self, data, version=None, strict=None):
        model_type = data.unpack(uint32_t)
        try:
            impl = self._implementation_types[model_type]
        except KeyError:
            raise ValueError('Unknown model definition type {}'.format(model_type))
        return impl.deserialize(data, version)


_registry = ModelRegistry()


[docs]@_registry.register_message(ModelType.MarkerSet) @attr.s class MarkersetDescription(object): """Description of a markerset. Attributes: name (str): marker_names: (list[str]) """ name = attr.ib() marker_names = attr.ib() @classmethod def deserialize(cls, data, version=None): name = data.unpack_cstr() marker_count = data.unpack(uint32_t) marker_names = [data.unpack_cstr() for i in range(marker_count)] return cls(name, marker_names) def serialize(self): return self.name.encode('utf-8') + b'\0' + uint32_t.pack(len(self.marker_names)) + \ b''.join(m.encode('utf-8') + b'\0' for m in self.marker_names)
[docs]@_registry.register_message(ModelType.RigidBody) @attr.s class RigidBodyDescription(object): """Description of a rigid body. Attributes: name (str): Rigid body name if available id\_ (int): Streaming ID parent_id (int): For a rigid body which is part of a hierarchy (i.e., a skeleton), the ID of the parent rigid body offset_from_parent (tuple[float, float, float]): (x, y, z) offset relative to parent marker_positions (list[tuple[float, float, float]]): List of marker positions, if available required_active_labels (list[int]): List of expected active marker labels, if available """ name = attr.ib() # type: Optional[str] id_ = attr.ib() parent_id = attr.ib() offset_from_parent = attr.ib() marker_positions = attr.ib() # type: Optional[list[float, float, float]] required_active_labels = attr.ib() @classmethod def deserialize(cls, data, version, skip_markers=None): name = None if version >= Version(2): name = data.unpack_cstr() id_ = data.unpack(int32_t) parent_id = data.unpack(int32_t) offset_from_parent = data.unpack(vector3_t) marker_positions = [] required_active_labels = [] # Marker positions are included in version 3, but apparently not when this rigid body is # inside a skeleton? I don't have the equipment to test this. if skip_markers is None: skip_markers = version < Version(3) if not skip_markers: marker_count = data.unpack(uint32_t) marker_positions = [data.unpack(vector3_t) for i in range(marker_count)] required_active_labels = [data.unpack(uint32_t) for i in range(marker_count)] return cls(name, id_, parent_id, offset_from_parent, marker_positions, required_active_labels) def serialize(self): return self.name.encode('utf-8') + b'\0' + int32_t.pack(self.id_) + \ int32_t.pack(self.parent_id) + vector3_t.pack(*self.offset_from_parent) + \ uint32_t.pack(len(self.marker_positions)) + \ b''.join(vector3_t.pack(*m) for m in self.marker_positions) + \ b''.join(uint32_t.pack(l) for l in self.required_active_labels)
[docs]@_registry.register_message(ModelType.Skeleton) @attr.s class SkeletonDescription(object): """Description of a skeleton. Attributes: name (str): id\_ (int): Streaming ID rigid_bodies (list[:class:`RigidBodyDescription`]): """ name = attr.ib() id_ = attr.ib() rigid_bodies = attr.ib() @classmethod def deserialize(cls, data, version=None): name = data.unpack_cstr() id_ = data.unpack(int32_t) rigid_body_count = data.unpack(int32_t) rigid_bodies = [RigidBodyDescription.deserialize(data, version, skip_markers=True) for i in range(rigid_body_count)] return cls(name, id_, rigid_bodies)
@_registry.register_message(ModelType.ForcePlate) @attr.s class ForcePlateDescription(object): id_ = attr.ib() serial_number = attr.ib() width = attr.ib() length = attr.ib() origin = attr.ib() calibration_matrix = attr.ib() corners = attr.ib() plate_type = attr.ib() channel_data_type = attr.ib() channels = attr.ib() channel_names = attr.ib() @classmethod def deserialize(cls, data, version=None): raise NotImplementedError @_registry.register_message(ModelType.Device) @attr.s class DeviceDescription(object): id_ = attr.ib() # type: int name = attr.ib() # type: str serial_number = attr.ib() # type: str device_type_ = attr.ib() # type: int channel_data_type = attr.ib() # type: int channel_names = attr.ib() # type: list[str] @classmethod def deserialize(cls, data, version=None): raise NotImplementedError
[docs]@register_message(MessageId.ModelDef) @attr.s class ModelDefinitionsMessage(object): """Tracked model definitions. Attributes: models: Mixed list of :class:`MarkersetDescription`, :class:`RigidBodyDescription`, :class:`SkeletonDescription`, :class:`ForcePlateDescription`, and :class:`DeviceDescription`.""" models = attr.ib() # type: list @classmethod def deserialize(cls, data, version): models = [] definition_count = data.unpack(uint32_t) for i in range(definition_count): models.append(_registry.deserialize(data, version)) return cls(models) def serialize(self): return uint32_t.pack(len(self.models)) + b''.join(_registry.serialize(m) for m in self.models)