Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from pathlib import Path
from typing import Any, Dict, Optional, Union

from pydantic import field_validator
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_
from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
from unitree_sdk2py.go2.sport.sport_client import (
SportClient,
PathPoint,
SPORT_PATH_POINT_SIZE,
)

from ....utils.logger import logging
from ....utils.registry import registry
from ...base import ArgSchema, BaseTool

CURRENT_PATH = Path(__file__).parents[0]

ARGSCHEMA = {
"switch": {
"type": "bool",
"description": "switch to enable or disable free avoid.",
"required": True,
},
}


@registry.register_tool()
class FreeAvoid(BaseTool):
"""Tool for making Unitree Go2 robot to enable or disable free avoid."""

class Config:
"""Configuration for this pydantic object."""

extra = "allow"
arbitrary_types_allowed = True

args_schema: ArgSchema = ArgSchema(**ARGSCHEMA)
description: str = "Control the Unitree Go2 robot to enable or disable free avoid."
network_interface_name: Optional[str]

def __init__(self, **data: Any) -> None:
super().__init__(**data)
ChannelFactoryInitialize(0, self.network_interface_name)
self.sport_client = SportClient()
self.sport_client.SetTimeout(10.0)
self.sport_client.Init()

@field_validator("network_interface_name")
@classmethod
def network_interface_name_validator(cls, network_interface_name: Union[str, None]) -> Union[str, None]:
if network_interface_name == None:
raise ValueError("network interface name is not provided.")
return network_interface_name

def _run(
self,
switch: bool = True
) -> Dict[str, Any]:
"""
Control the Unitree Go2 robot to free avoid.
"""

try:
self.sport_client.FreeAvoid(switch)
return {
"code": 0,
"msg": "success",
}
except Exception as e:
logging.error(f"Free avoid failed: {e}")
return {
"code": 500,
"msg": "failed",
}
89 changes: 89 additions & 0 deletions omagent-core/src/omagent_core/tool_system/tools/ut_dog/move.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from pathlib import Path
from typing import Any, Dict, Optional, Union

from pydantic import field_validator
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_
from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
from unitree_sdk2py.go2.sport.sport_client import (
SportClient,
PathPoint,
SPORT_PATH_POINT_SIZE,
)

from ....utils.logger import logging
from ....utils.registry import registry
from ...base import ArgSchema, BaseTool

CURRENT_PATH = Path(__file__).parents[0]

ARGSCHEMA = {
"vx": {
"type": "float",
"description": "The distance moved in the x-axis direction per call, measured in meters (m). A positive value indicates movement forward, while a negative value indicates movement backward.",
"required": False,
},
"vy": {
"type": "float",
"description": "The distance moved in the y-axis direction per call, measured in meters (m). A positive value indicates movement to the left, while a negative value indicates movement to the right.",
"required": False,
},
"vyaw": {
"type": "float",
"description": "The angle rotated around the z-axis per call, measured in radians (rad). A positive value indicates clockwise rotation, while a negative value indicates counterclockwise rotation.",
"required": False,
},
}


@registry.register_tool()
class Move(BaseTool):
"""Tool for making Unitree Go2 robot move."""

class Config:
"""Configuration for this pydantic object."""

extra = "allow"
arbitrary_types_allowed = True

args_schema: ArgSchema = ArgSchema(**ARGSCHEMA)
description: str = "Control the Go2 to move."
network_interface_name: Optional[str]

def __init__(self, **data: Any) -> None:
super().__init__(**data)
ChannelFactoryInitialize(0, self.network_interface_name)
self.sport_client = SportClient()
self.sport_client.SetTimeout(10.0)
self.sport_client.Init()

@field_validator("network_interface_name")
@classmethod
def network_interface_name_validator(cls, network_interface_name: Union[str, None]) -> Union[str, None]:
if network_interface_name == None:
raise ValueError("network interface name is not provided.")
return network_interface_name

def _run(
self,
vx: float = 0,
vy: float = 0,
vyaw: float = 0
) -> Dict[str, Any]:
"""
Control the Go2 to move.
"""

try:
# self.sport_client.BalanceStand()
self.sport_client.Move(vx, vy, vyaw)
return {
"code": 0,
"msg": "success",
}
except Exception as e:
logging.error(f"Move failed: {e}")
return {
"code": 500,
"msg": "failed",
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from pathlib import Path
from typing import Any, Dict, Optional, Union

from pydantic import field_validator
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_
from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
from unitree_sdk2py.go2.sport.sport_client import (
SportClient,
PathPoint,
SPORT_PATH_POINT_SIZE,
)

from ....utils.logger import logging
from ....utils.registry import registry
from ...base import ArgSchema, BaseTool

CURRENT_PATH = Path(__file__).parents[0]

ARGSCHEMA = {
}


@registry.register_tool()
class StandDown(BaseTool):
"""Tool for making Unitree Go2 robot stand down. The robot lies down, and the motor joints remain locked."""

class Config:
"""Configuration for this pydantic object."""

extra = "allow"
arbitrary_types_allowed = True

args_schema: ArgSchema = ArgSchema(**ARGSCHEMA)
description: str = "Control the Unitree Go2 robot to stand down. The robot lies down, and the motor joints remain locked."
network_interface_name: Optional[str]

def __init__(self, **data: Any) -> None:
super().__init__(**data)
ChannelFactoryInitialize(0, self.network_interface_name)
self.sport_client = SportClient()
self.sport_client.SetTimeout(10.0)
self.sport_client.Init()

@field_validator("network_interface_name")
@classmethod
def network_interface_name_validator(cls, network_interface_name: Union[str, None]) -> Union[str, None]:
if network_interface_name == None:
raise ValueError("network interface name is not provided.")
return network_interface_name

def _run(
self
) -> Dict[str, Any]:
"""
Control the Go2 to stand down.
"""

try:
self.sport_client.StandDown()
return {
"code": 0,
"msg": "success",
}
except Exception as e:
logging.error(f"Stand down failed: {e}")
return {
"code": 500,
"msg": "failed",
}

if __name__ == "__main__":
tool = StandDown(network_interface_name="eth0")
tool.run()
70 changes: 70 additions & 0 deletions omagent-core/src/omagent_core/tool_system/tools/ut_dog/stand_up.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from pathlib import Path
from typing import Any, Dict, Optional, Union

from pydantic import field_validator
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_
from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
from unitree_sdk2py.go2.sport.sport_client import (
SportClient,
PathPoint,
SPORT_PATH_POINT_SIZE,
)

from ....utils.logger import logging
from ....utils.registry import registry
from ...base import ArgSchema, BaseTool

CURRENT_PATH = Path(__file__).parents[0]

ARGSCHEMA = {
}


@registry.register_tool()
class StandUp(BaseTool):
"""Tool for making Unitree Go2 robot stand up. The robot stands up normally, with the joint motor locked. Compared to the balanced standing mode, this mode does not maintain a constant balance posture. The default standing height is 0.33m."""

class Config:
"""Configuration for this pydantic object."""

extra = "allow"
arbitrary_types_allowed = True

args_schema: ArgSchema = ArgSchema(**ARGSCHEMA)
description: str = "Control the Unitree Go2 robot to stand up. The robot stands up normally, with the joint motor locked. Compared to the balanced standing mode, this mode does not maintain a constant balance posture. The default standing height is 0.33m."
network_interface_name: Optional[str]

def __init__(self, **data: Any) -> None:
super().__init__(**data)
ChannelFactoryInitialize(0, self.network_interface_name)
self.sport_client = SportClient()
self.sport_client.SetTimeout(10.0)
self.sport_client.Init()

@field_validator("network_interface_name")
@classmethod
def network_interface_name_validator(cls, network_interface_name: Union[str, None]) -> Union[str, None]:
if network_interface_name == None:
raise ValueError("network interface name is not provided.")
return network_interface_name

def _run(
self
) -> Dict[str, Any]:
"""
Control the Go2 to stand up.
"""

try:
self.sport_client.StandUp()
return {
"code": 0,
"msg": "success",
}
except Exception as e:
logging.error(f"Stand up failed: {e}")
return {
"code": 500,
"msg": "failed",
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from pathlib import Path
from typing import Any, Dict, Optional, Union

from pydantic import field_validator
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_
from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
from unitree_sdk2py.go2.sport.sport_client import (
SportClient,
PathPoint,
SPORT_PATH_POINT_SIZE,
)

from ....utils.logger import logging
from ....utils.registry import registry
from ...base import ArgSchema, BaseTool

CURRENT_PATH = Path(__file__).parents[0]

ARGSCHEMA = {
}


@registry.register_tool()
class StopMove(BaseTool):
"""Tool for making Unitree Go2 robot stop move. This operation will stop the current motion and reset the motion command to the default value."""

class Config:
"""Configuration for this pydantic object."""

extra = "allow"
arbitrary_types_allowed = True

args_schema: ArgSchema = ArgSchema(**ARGSCHEMA)
description: str = "Control the Unitree Go2 robot to stop move. This operation will stop the current motion and reset the motion command to the default value."
network_interface_name: Optional[str]

def __init__(self, **data: Any) -> None:
super().__init__(**data)
ChannelFactoryInitialize(0, self.network_interface_name)
self.sport_client = SportClient()
self.sport_client.SetTimeout(10.0)
self.sport_client.Init()

@field_validator("network_interface_name")
@classmethod
def network_interface_name_validator(cls, network_interface_name: Union[str, None]) -> Union[str, None]:
if network_interface_name == None:
raise ValueError("network interface name is not provided.")
return network_interface_name

def _run(
self,
) -> Dict[str, Any]:
"""
Control the Go2 to stop move.
"""

try:
self.sport_client.StopMove()
return {
"code": 0,
"msg": "success",
}
except Exception as e:
logging.error(f"Stop move failed: {e}")
return {
"code": 500,
"msg": "failed",
}