Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/viam/components/camera/camera.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import abc
import sys
from typing import Any, Dict, Final, List, Optional, Tuple
from typing import Any, Dict, Final, Optional, Sequence, Tuple

from viam.media.video import NamedImage, ViamImage
from viam.proto.common import ResponseMetadata
Expand Down Expand Up @@ -66,11 +66,11 @@ async def get_image(
async def get_images(
self,
*,
filter_source_names: Optional[List[str]] = None,
filter_source_names: Optional[Sequence[str]] = None,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> Tuple[List[NamedImage], ResponseMetadata]:
) -> Tuple[Sequence[NamedImage], ResponseMetadata]:
"""Get simultaneous images from different imagers, along with associated metadata.
This should not be used for getting a time series of images from the same imager.

Expand All @@ -82,9 +82,13 @@ async def get_images(
first_image = images[0]
timestamp = metadata.captured_at

Args:
filter_source_names (Sequence[str]): The filter_source_names parameter can be used to filter only the images from the specified
source names. When unspecified, all images are returned.

Returns:
Tuple[List[NamedImage], ResponseMetadata]: A tuple containing two values; the first [0] a list of images
returned from the camera system, and the second [1] the metadata associated with this response.
Tuple[Sequence[NamedImage], ResponseMetadata]: A tuple containing two values; the first [0] a list of images
returned from the camera system, and the second [1] the metadata associated with this response.

For more information, see `Camera component <https://docs.viam.com/dev/reference/apis/components/camera/#getimages>`_.
"""
Expand Down
16 changes: 9 additions & 7 deletions src/viam/components/camera/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Mapping, Optional, Tuple
from typing import Any, Dict, Mapping, Optional, Sequence, Tuple

from grpclib.client import Channel

Expand Down Expand Up @@ -41,26 +41,26 @@ async def get_image(
md = kwargs.get("metadata", self.Metadata()).proto
request = GetImageRequest(name=self.name, mime_type=mime_type, extra=dict_to_struct(extra))
response: GetImageResponse = await self.client.GetImage(request, timeout=timeout, metadata=md)
return ViamImage(response.image, response.mime_type)
return ViamImage(response.image, CameraMimeType.from_string(response.mime_type))

async def get_images(
self,
*,
filter_source_names: Optional[List[str]] = None,
filter_source_names: Optional[Sequence[str]] = None,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> Tuple[List[NamedImage], ResponseMetadata]:
) -> Tuple[Sequence[NamedImage], ResponseMetadata]:
md = kwargs.get("metadata", self.Metadata()).proto
request = GetImagesRequest(name=self.name, extra=dict_to_struct(extra), filter_source_names=filter_source_names)
response: GetImagesResponse = await self.client.GetImages(request, timeout=timeout, metadata=md)
imgs = []
for img_data in response.images:
if img_data.mime_type:
mime_type = img_data.mime_type
mime_type = CameraMimeType.from_string(img_data.mime_type)
else:
# TODO(RSDK-11728): remove this once we deleted the format field
mime_type = str(CameraMimeType.from_proto(img_data.format))
mime_type = CameraMimeType.from_proto(img_data.format)
img = NamedImage(img_data.source_name, img_data.image, mime_type)
imgs.append(img)
resp_metadata: ResponseMetadata = response.response_metadata
Expand Down Expand Up @@ -99,6 +99,8 @@ async def do_command(
response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout, metadata=md)
return struct_to_dict(response.result)

async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> List[Geometry]:
async def get_geometries(
self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs
) -> Sequence[Geometry]:
md = kwargs.get("metadata", self.Metadata())
return await get_geometries(self.client, self.name, extra, timeout, md)
12 changes: 4 additions & 8 deletions src/viam/components/camera/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from viam.proto.common import DoCommandRequest, DoCommandResponse, GetGeometriesRequest, GetGeometriesResponse
from viam.proto.component.camera import (
CameraServiceBase,
Format,
GetImageRequest,
GetImageResponse,
GetImagesRequest,
Expand Down Expand Up @@ -54,16 +53,13 @@ async def GetImages(self, stream: Stream[GetImagesRequest, GetImagesResponse]) -
timeout=timeout,
metadata=stream.metadata,
extra=struct_to_dict(request.extra),
filter_source_names=list(request.filter_source_names),
filter_source_names=request.filter_source_names,
)
img_bytes_lst = []
for img in images:
try:
mime_type = CameraMimeType.from_string(img.mime_type) # this can ValueError if the mime_type is not a CameraMimeType
fmt = mime_type.to_proto()
except ValueError:
# TODO(RSDK-11728): remove this once we deleted the format field
fmt = Format.FORMAT_UNSPECIFIED
mime_type = CameraMimeType.from_string(img.mime_type)
# TODO(RSDK-11728): remove this fmt logic once we deleted the format field
fmt = mime_type.to_proto() # Will be Format.FORMAT_UNSPECIFIED if an unsupported/custom mime type is set

img_bytes = img.data
img_bytes_lst.append(Image(source_name=name, mime_type=img.mime_type, format=fmt, image=img_bytes))
Expand Down
4 changes: 2 additions & 2 deletions src/viam/components/component_base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import abc
from logging import Logger
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Mapping, Optional, SupportsBytes, SupportsFloat, Union, cast
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Mapping, Optional, Sequence, SupportsBytes, SupportsFloat, Union, cast

from typing_extensions import Self

Expand Down Expand Up @@ -46,7 +46,7 @@ def from_robot(cls, robot: "RobotClient", name: str) -> Self:
async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **kwargs) -> Mapping[str, ValueTypes]:
raise NotImplementedError()

async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None) -> List[Geometry]:
async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None) -> Sequence[Geometry]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why these Sequence changes? Does it type-hint grpc repeated sets better?

Copy link
Member Author

@njooma njooma Sep 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea exactly -- a superset of list that allows us to save some casts

"""
Get all geometries associated with the component, in their current configuration, in the
`frame <https://docs.viam.com/operate/mobility/define-geometry/>`__ of the component.
Expand Down
6 changes: 5 additions & 1 deletion src/viam/media/utils/pil/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@ def pil_to_viam_image(image: Image.Image, mime_type: CameraMimeType) -> ViamImag
Returns:
ViamImage: The resulting ViamImage
"""
# Make sure at runtime the mime_type string is actually a CameraMimeType
if not isinstance(mime_type, CameraMimeType):
raise ValueError(f"Cannot encode to unsupported mimetype: {mime_type}")

if mime_type.name in LIBRARY_SUPPORTED_FORMATS:
buf = BytesIO()
if image.mode == "RGBA" and mime_type == CameraMimeType.JPEG:
image = image.convert("RGB")
image.save(buf, format=mime_type.name)
data = buf.getvalue()
else:
raise ValueError(f"Cannot encode image to {mime_type}")
raise ValueError(f"Cannot encode to unsupported mimetype: {mime_type}")

return ViamImage(data, mime_type)
101 changes: 76 additions & 25 deletions src/viam/media/video.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,61 @@
from array import array
from enum import Enum
from typing import List, Optional, Tuple
from typing import Any, List, Optional, Tuple

from typing_extensions import Self
from typing_extensions import ClassVar, Self

from viam.errors import NotSupportedError
from viam.proto.component.camera import Format

from .viam_rgba import RGBA_HEADER_LENGTH, RGBA_MAGIC_NUMBER


class CameraMimeType(str, Enum):
VIAM_RGBA = "image/vnd.viam.rgba"
VIAM_RAW_DEPTH = "image/vnd.viam.dep"
JPEG = "image/jpeg"
PNG = "image/png"
PCD = "pointcloud/pcd"
class _FrozenClassAttributesMeta(type):
"""
A metaclass that prevents the reassignment of existing class attributes.
"""

def __setattr__(cls, name: str, value: Any):
# Check if the attribute `name` already exists on the class
if name in cls.__dict__:
# If it exists, raise an error to prevent overwriting
raise AttributeError(f"Cannot reassign constant '{name}'")
# If it's a new attribute, allow it to be set
super().__setattr__(name, value)
Comment on lines +12 to +23
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooh this is clever



class CameraMimeType(str, metaclass=_FrozenClassAttributesMeta):
"""
The compatible mime-types for cameras and vision services.

You can use the `CameraMimeType.CUSTOM(...)` method to use an unlisted mime-type.
"""

VIAM_RGBA: ClassVar[Self]
VIAM_RAW_DEPTH: ClassVar[Self]
JPEG: ClassVar[Self]
PNG: ClassVar[Self]
PCD: ClassVar[Self]

@property
def name(self) -> str:
for key, value in self.__class__.__dict__.items():
if value == self:
return key
return "CUSTOM"

@property
def value(self) -> str:
return self

@classmethod
def CUSTOM(cls, mime_type: str) -> Self:
"""
Create a custom mime type.

Args:
mime_type (str): The mimetype as a string
"""
return cls.from_string(mime_type)

@classmethod
def from_string(cls, value: str) -> Self:
Expand All @@ -28,13 +68,10 @@ def from_string(cls, value: str) -> Self:
Self: The mimetype
"""
value_mime = value[:-5] if value.endswith("+lazy") else value # ViamImage lazy encodes by default
try:
return cls(value_mime)
except ValueError:
raise ValueError(f"Invalid mimetype: {value}")
return cls(value_mime)

@classmethod
def from_proto(cls, format: Format.ValueType) -> "CameraMimeType":
def from_proto(cls, format: Format.ValueType) -> Self:
"""Returns the mimetype from a proto enum.

Args:
Expand All @@ -44,14 +81,15 @@ def from_proto(cls, format: Format.ValueType) -> "CameraMimeType":
Self: The mimetype.
"""
mimetypes = {
Format.FORMAT_RAW_RGBA: CameraMimeType.VIAM_RGBA,
Format.FORMAT_RAW_DEPTH: CameraMimeType.VIAM_RAW_DEPTH,
Format.FORMAT_JPEG: CameraMimeType.JPEG,
Format.FORMAT_PNG: CameraMimeType.PNG,
Format.FORMAT_RAW_RGBA: cls.VIAM_RGBA,
Format.FORMAT_RAW_DEPTH: cls.VIAM_RAW_DEPTH,
Format.FORMAT_JPEG: cls.JPEG,
Format.FORMAT_PNG: cls.PNG,
}
return mimetypes.get(format, CameraMimeType.JPEG)
return cls(mimetypes.get(format, cls.JPEG))

def to_proto(self) -> Format.ValueType:
@property
def proto(self) -> Format.ValueType:
"""Returns the mimetype in a proto enum.

Returns:
Expand All @@ -65,6 +103,19 @@ def to_proto(self) -> Format.ValueType:
}
return formats.get(self, Format.FORMAT_UNSPECIFIED)

def to_proto(self) -> Format.ValueType:
"""
DEPRECATED: Use `CameraMimeType.proto`
"""
return self.proto


CameraMimeType.VIAM_RGBA = CameraMimeType.from_string("image/vnd.viam.rgba")
CameraMimeType.VIAM_RAW_DEPTH = CameraMimeType.from_string("image/vnd.viam.dep")
CameraMimeType.JPEG = CameraMimeType.from_string("image/jpeg")
CameraMimeType.PNG = CameraMimeType.from_string("image/png")
CameraMimeType.PCD = CameraMimeType.from_string("pointcloud/pcd")


class ViamImage:
"""A native implementation of an image.
Expand All @@ -73,11 +124,11 @@ class ViamImage:
"""

_data: bytes
_mime_type: str
_mime_type: CameraMimeType
_height: Optional[int] = None
_width: Optional[int] = None

def __init__(self, data: bytes, mime_type: str) -> None:
def __init__(self, data: bytes, mime_type: CameraMimeType) -> None:
self._data = data
self._mime_type = mime_type
self._width, self._height = _getDimensions(data, mime_type)
Expand All @@ -88,7 +139,7 @@ def data(self) -> bytes:
return self._data

@property
def mime_type(self) -> str:
def mime_type(self) -> CameraMimeType:
"""The mime type of the image"""
return self._mime_type

Expand Down Expand Up @@ -131,12 +182,12 @@ class NamedImage(ViamImage):
"""The name of the image
"""

def __init__(self, name: str, data: bytes, mime_type: str) -> None:
def __init__(self, name: str, data: bytes, mime_type: CameraMimeType) -> None:
self.name = name
super().__init__(data, mime_type)


def _getDimensions(image: bytes, mime_type: str) -> Tuple[Optional[int], Optional[int]]:
def _getDimensions(image: bytes, mime_type: CameraMimeType) -> Tuple[Optional[int], Optional[int]]:
try:
if mime_type == CameraMimeType.JPEG:
return _getDimensionsFromJPEG(image)
Expand Down
6 changes: 5 additions & 1 deletion src/viam/services/vision/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ async def capture_all_from_camera(
result = CaptureAllResult()
result.extra = struct_to_dict(response.extra)
if return_image:
mime_type = CameraMimeType.from_proto(response.image.format)
# TODO(RSDK-11728): remove this branching logic once we deleted the format field
if response.image.mime_type:
mime_type = CameraMimeType.from_string(response.image.mime_type)
else:
mime_type = CameraMimeType.from_proto(response.image.format)
img = ViamImage(response.image.image, mime_type)
result.image = img
if return_classifications:
Expand Down
14 changes: 8 additions & 6 deletions src/viam/services/vision/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from .vision import Vision


class VisionRPCService(UnimplementedVisionServiceBase, ResourceRPCServiceBase):
class VisionRPCService(UnimplementedVisionServiceBase, ResourceRPCServiceBase[Vision]):
"""
gRPC service for a Vision service
"""
Expand All @@ -50,9 +50,11 @@ async def CaptureAllFromCamera(self, stream: Stream[CaptureAllFromCameraRequest,
)
img = None
if result.image is not None:
fmt = result.image.mime_type.to_proto()
mime_type = CameraMimeType.from_string(result.image.mime_type)
# TODO(RSDK-11728): remove this fmt logic once we deleted the format field
fmt = mime_type.to_proto() # Will be Format.FORMAT_UNSPECIFIED if an unsupported/custom mime type is set
img_bytes = result.image.data
img = Image(source_name=request.camera_name, format=fmt, image=img_bytes)
img = Image(source_name=request.camera_name, mime_type=mime_type, format=fmt, image=img_bytes)
response = CaptureAllFromCameraResponse(
image=img,
detections=result.detections,
Expand All @@ -79,7 +81,7 @@ async def GetDetections(self, stream: Stream[GetDetectionsRequest, GetDetections
extra = struct_to_dict(request.extra)
timeout = stream.deadline.time_remaining() if stream.deadline else None

image = ViamImage(request.image, request.mime_type)
image = ViamImage(request.image, CameraMimeType.from_string(request.mime_type))

result = await vision.get_detections(image, extra=extra, timeout=timeout)
response = GetDetectionsResponse(detections=result)
Expand All @@ -104,7 +106,7 @@ async def GetClassifications(self, stream: Stream[GetClassificationsRequest, Get
extra = struct_to_dict(request.extra)
timeout = stream.deadline.time_remaining() if stream.deadline else None

image = ViamImage(request.image, request.mime_type)
image = ViamImage(request.image, CameraMimeType.from_string(request.mime_type))

result = await vision.get_classifications(image, request.n, extra=extra, timeout=timeout)
response = GetClassificationsResponse(classifications=result)
Expand All @@ -117,7 +119,7 @@ async def GetObjectPointClouds(self, stream: Stream[GetObjectPointCloudsRequest,
extra = struct_to_dict(request.extra)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await vision.get_object_point_clouds(request.camera_name, extra=extra, timeout=timeout)
response = GetObjectPointCloudsResponse(mime_type=CameraMimeType.PCD.value, objects=result)
response = GetObjectPointCloudsResponse(mime_type=CameraMimeType.PCD, objects=result)
await stream.send_message(response)

async def GetProperties(self, stream: Stream[GetPropertiesRequest, GetPropertiesResponse]) -> None:
Expand Down
2 changes: 1 addition & 1 deletion src/viam/services/worldstatestore/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from viam.proto.service.worldstatestore import StreamTransformChangesResponse, TransformChangeType
from viam.resource.registry import Registry, ResourceRegistration

from .client import WorldStateStoreClient
from .service import WorldStateStoreService
from .worldstatestore import WorldStateStore
from viam.proto.service.worldstatestore import StreamTransformChangesResponse, TransformChangeType

__all__ = [
"WorldStateStore",
Expand Down
Loading
Loading