Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 77 additions & 64 deletions pybricksdev/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from pybricksdev.connections.pybricks import (
HubDisconnectError,
HubPowerButtonPressedError,
PybricksHub,
)

PROG_NAME = (
Expand Down Expand Up @@ -184,70 +185,7 @@ def add_parser(self, subparsers: argparse._SubParsersAction):

async def run(self, args: argparse.Namespace):

# Pick the right connection
if args.conntype == "ble":
from pybricksdev.ble import find_device as find_ble
from pybricksdev.connections.pybricks import PybricksHubBLE

# It is a Pybricks Hub with BLE. Device name or address is given.
print(f"Searching for {args.name or 'any hub with Pybricks service'}...")
device_or_address = await find_ble(args.name)
hub = PybricksHubBLE(device_or_address)
elif args.conntype == "usb":
from usb.core import find as find_usb

from pybricksdev.connections.pybricks import PybricksHubUSB
from pybricksdev.usb import (
EV3_USB_PID,
LEGO_USB_VID,
MINDSTORMS_INVENTOR_USB_PID,
NXT_USB_PID,
SPIKE_ESSENTIAL_USB_PID,
SPIKE_PRIME_USB_PID,
)

def is_pybricks_usb(dev):
return (
(dev.idVendor == LEGO_USB_VID)
and (
dev.idProduct
in [
NXT_USB_PID,
EV3_USB_PID,
SPIKE_PRIME_USB_PID,
SPIKE_ESSENTIAL_USB_PID,
MINDSTORMS_INVENTOR_USB_PID,
]
)
and dev.product.endswith("Pybricks")
)

device_or_address = find_usb(custom_match=is_pybricks_usb)

if device_or_address is None:
print("Pybricks Hub not found.", file=sys.stderr)
exit(1)

hub = PybricksHubUSB(device_or_address)
else:
raise ValueError(f"Unknown connection type: {args.conntype}")

# Connect to the address and run the script
await hub.connect()
try:
with _get_script_path(args.file) as script_path:
if args.start:
await hub.run(script_path, args.wait or args.stay_connected)
else:
if args.stay_connected:
# if the user later starts the program by pressing the button on the hub,
# we still want the hub stdout to print to Python's stdout
hub.print_output = True
hub._enable_line_handler = True
await hub.download(script_path)

if not args.stay_connected:
return
async def stay_connected_menu(hub: PybricksHub):

class ResponseOptions(IntEnum):
RECOMPILE_RUN = 0
Expand Down Expand Up @@ -359,6 +297,10 @@ async def reconnect_hub():
case _:
return

except SyntaxError as e:
print("\nA syntax error occurred while parsing your program:")
print(e, "\n")

except HubPowerButtonPressedError:
# This means the user pressed the button on the hub to re-start the
# current program, so the menu was canceled and we are now printing
Expand All @@ -375,6 +317,77 @@ async def reconnect_hub():
await asyncio.sleep(0.3)
hub = await reconnect_hub()

# Pick the right connection
if args.conntype == "ble":
from pybricksdev.ble import find_device as find_ble
from pybricksdev.connections.pybricks import PybricksHubBLE

# It is a Pybricks Hub with BLE. Device name or address is given.
print(f"Searching for {args.name or 'any hub with Pybricks service'}...")
device_or_address = await find_ble(args.name)
hub = PybricksHubBLE(device_or_address)
elif args.conntype == "usb":
from usb.core import find as find_usb

from pybricksdev.connections.pybricks import PybricksHubUSB
from pybricksdev.usb import (
EV3_USB_PID,
LEGO_USB_VID,
MINDSTORMS_INVENTOR_USB_PID,
NXT_USB_PID,
SPIKE_ESSENTIAL_USB_PID,
SPIKE_PRIME_USB_PID,
)

def is_pybricks_usb(dev):
return (
(dev.idVendor == LEGO_USB_VID)
and (
dev.idProduct
in [
NXT_USB_PID,
EV3_USB_PID,
SPIKE_PRIME_USB_PID,
SPIKE_ESSENTIAL_USB_PID,
MINDSTORMS_INVENTOR_USB_PID,
]
)
and dev.product.endswith("Pybricks")
)

device_or_address = find_usb(custom_match=is_pybricks_usb)

if device_or_address is None:
print("Pybricks Hub not found.", file=sys.stderr)
exit(1)

hub = PybricksHubUSB(device_or_address)
else:
raise ValueError(f"Unknown connection type: {args.conntype}")

# Connect to the address and run the script
await hub.connect()
try:
with _get_script_path(args.file) as script_path:
if args.start:
await hub.run(script_path, args.wait or args.stay_connected)
else:
if args.stay_connected:
# if the user later starts the program by pressing the button on the hub,
# we still want the hub stdout to print to Python's stdout
hub.print_output = True
hub._enable_line_handler = True
await hub.download(script_path)

if args.stay_connected:
await stay_connected_menu(hub)

except SyntaxError as e:
print("\nA syntax error occurred while parsing your program:")
print(e, "\n")
if args.stay_connected:
await stay_connected_menu(hub)

finally:
await hub.disconnect()

Expand Down