diff --git a/src/arduino/app_bricks/wave_generator/README.md b/src/arduino/app_bricks/wave_generator/README.md new file mode 100644 index 00000000..c0b64a7c --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/README.md @@ -0,0 +1,108 @@ +# Wave Generator brick + +This brick provides continuous wave generation for real-time audio synthesis with multiple waveform types and smooth transitions. + +## Overview + +The Wave Generator brick allows you to: + +- Generate continuous audio waveforms in real-time +- Select between different waveform types (sine, square, sawtooth, triangle) +- Control frequency and amplitude dynamically during playback +- Configure smooth transitions with attack, release, and glide (portamento) parameters +- Stream audio to USB speakers with minimal latency + +It runs continuously in a background thread, producing audio blocks at a steady rate with configurable envelope parameters for professional-sounding synthesis. + +## Features + +- Four waveform types: sine, square, sawtooth, and triangle +- Real-time frequency and amplitude control with smooth transitions +- Configurable envelope parameters (attack, release, glide) +- Hardware volume control support +- Thread-safe operation for concurrent access +- Efficient audio generation using NumPy vectorization +- Custom speaker configuration support + +## Prerequisites + +Before using the Wave Generator brick, ensure you have the following: + +- USB-C® Hub with external power supply (5V, 3A) +- USB audio device (USB speaker or USB-C → 3.5mm adapter) +- Arduino UNO Q running in Network Mode or SBC Mode (USB-C port needed for the hub) + +## Code example and usage + +Here is a basic example for generating a 440 Hz sine wave tone: + +```python +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_utils import App + +wave_gen = WaveGenerator() + +App.start_brick(wave_gen) + +# Set frequency to A4 note (440 Hz) +wave_gen.set_frequency(440.0) + +# Set amplitude to 80% +wave_gen.set_amplitude(0.8) + +App.run() +``` + +You can customize the waveform type and envelope parameters: + +```python +wave_gen = WaveGenerator( + wave_type="square", + attack=0.01, + release=0.03, + glide=0.02 +) + +App.start_brick(wave_gen) + +# Change waveform during playback +wave_gen.set_wave_type("triangle") + +# Adjust envelope parameters +wave_gen.set_envelope_params(attack=0.05, release=0.1, glide=0.05) + +App.run() +``` + +For specific hardware configurations, you can provide a custom Speaker instance: + +```python +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_peripherals.speaker import Speaker +from arduino.app_utils import App + +speaker = Speaker( + device=Speaker.USB_SPEAKER_2, + sample_rate=16000, + channels=1, + format="S16_LE" +) + +wave_gen = WaveGenerator(sample_rate=16000, speaker=speaker) + +App.start_brick(wave_gen) +wave_gen.set_frequency(440.0) +wave_gen.set_amplitude(0.7) + +App.run() +``` + +## Understanding Wave Generation + +The Wave Generator brick produces audio through continuous waveform synthesis. + +The `frequency` parameter controls the pitch of the output sound, measured in Hertz (Hz), where typical audible frequencies range from 20 Hz to 8000 Hz. + +The `amplitude` parameter controls the volume as a value between 0.0 (silent) and 1.0 (maximum), with smooth transitions handled by the attack and release envelope parameters. + +The `glide` parameter (also known as portamento) smoothly transitions between frequencies over time, creating sliding pitch effects similar to a theremin or synthesizer. Setting glide to 0 disables this effect but may cause audible clicks during fast frequency changes. diff --git a/src/arduino/app_bricks/wave_generator/__init__.py b/src/arduino/app_bricks/wave_generator/__init__.py new file mode 100644 index 00000000..ab8a5764 --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/__init__.py @@ -0,0 +1,7 @@ +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) +# +# SPDX-License-Identifier: MPL-2.0 + +from .wave_generator import * + +__all__ = ["WaveGenerator"] diff --git a/src/arduino/app_bricks/wave_generator/brick_config.yaml b/src/arduino/app_bricks/wave_generator/brick_config.yaml new file mode 100644 index 00000000..0d8ae661 --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/brick_config.yaml @@ -0,0 +1,4 @@ +id: arduino:wave_generator +name: Wave Generator +description: "Continuous wave generator for audio synthesis. Generates sine, square, sawtooth, and triangle waveforms with smooth frequency and amplitude transitions." +category: audio diff --git a/src/arduino/app_bricks/wave_generator/examples/01_basic_tone.py b/src/arduino/app_bricks/wave_generator/examples/01_basic_tone.py new file mode 100644 index 00000000..5acd492f --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/examples/01_basic_tone.py @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) +# +# SPDX-License-Identifier: MPL-2.0 + +""" +Basic Wave Generator Example + +Generates a simple 440Hz sine wave (A4 note) and demonstrates +basic frequency and amplitude control. +""" + +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_utils import App + +# Create wave generator with default settings +wave_gen = WaveGenerator( + sample_rate=16000, + wave_type="sine", + glide=0.02, # 20ms smooth frequency transitions +) + +# Start the generator +App.start_brick(wave_gen) + +# Set initial frequency and amplitude +wave_gen.set_frequency(440.0) # A4 note (440 Hz) +wave_gen.set_amplitude(0.7) # 70% amplitude +wave_gen.set_volume(80) # 80% hardware volume + +print("Playing 440Hz sine wave (A4 note)") +print("Press Ctrl+C to stop") + +# Keep the application running +App.run() diff --git a/src/arduino/app_bricks/wave_generator/examples/02_waveform_types.py b/src/arduino/app_bricks/wave_generator/examples/02_waveform_types.py new file mode 100644 index 00000000..320757c3 --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/examples/02_waveform_types.py @@ -0,0 +1,41 @@ +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) +# +# SPDX-License-Identifier: MPL-2.0 + +""" +Waveform Comparison Example + +Cycles through different waveform types to hear the difference +between sine, square, sawtooth, and triangle waves. +""" + +import time +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_utils import App + +wave_gen = WaveGenerator(sample_rate=16000, glide=0.02) +App.start_brick(wave_gen) + +# Set constant frequency and amplitude +wave_gen.set_frequency(440.0) +wave_gen.set_amplitude(0.6) + +waveforms = ["sine", "square", "sawtooth", "triangle"] + + +def cycle_waveforms(): + """Cycle through different waveform types.""" + for wave_type in waveforms: + print(f"Playing {wave_type} wave...") + wave_gen.set_wave_type(wave_type) + time.sleep(3) + # Silence + wave_gen.set_amplitude(0.0) + time.sleep(2) + + +print("Cycling through waveforms:") +print("sine → square → sawtooth → triangle") +print("Press Ctrl+C to stop") + +App.run(user_loop=cycle_waveforms) diff --git a/src/arduino/app_bricks/wave_generator/examples/03_frequency_sweep.py b/src/arduino/app_bricks/wave_generator/examples/03_frequency_sweep.py new file mode 100644 index 00000000..614d9962 --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/examples/03_frequency_sweep.py @@ -0,0 +1,51 @@ +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) +# +# SPDX-License-Identifier: MPL-2.0 + +""" +Frequency Sweep Example + +Demonstrates smooth frequency transitions (glide/portamento effect) +by sweeping through different frequency ranges. +""" + +import time +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_utils import App + +wave_gen = WaveGenerator( + wave_type="sine", + glide=0.05, # 50ms glide for noticeable portamento +) + +App.start_brick(wave_gen) +wave_gen.set_amplitude(0.7) + + +def frequency_sweep(): + """Sweep through frequency ranges.""" + + # Low to high sweep + print("Sweeping low to high (220Hz → 880Hz)...") + for freq in range(220, 881, 20): + wave_gen.set_frequency(float(freq)) + time.sleep(0.1) + + time.sleep(0.5) + + # High to low sweep + print("Sweeping high to low (880Hz → 220Hz)...") + for freq in range(880, 219, -20): + wave_gen.set_frequency(float(freq)) + time.sleep(0.1) + # Fade out + print("Fading out...") + wave_gen.set_amplitude(0.0) + time.sleep(2) + + +print("Frequency sweep demonstration") +print("Listen for smooth glide between frequencies") +print("Press Ctrl+C to stop") + +App.run(user_loop=frequency_sweep) diff --git a/src/arduino/app_bricks/wave_generator/examples/04_envelope_control.py b/src/arduino/app_bricks/wave_generator/examples/04_envelope_control.py new file mode 100644 index 00000000..395099fb --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/examples/04_envelope_control.py @@ -0,0 +1,63 @@ +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) +# +# SPDX-License-Identifier: MPL-2.0 + +""" +Envelope Control Example + +Demonstrates amplitude envelope control with different +attack and release times for various sonic effects. +""" + +import time +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_utils import App + +wave_gen = WaveGenerator(wave_type="sine") +App.start_brick(wave_gen) + +wave_gen.set_frequency(440.0) +wave_gen.set_volume(80) + + +def envelope_demo(): + """Demonstrate different envelope settings.""" + + # Fast attack, fast release (percussive) + print("1. Percussive (fast attack/release)...") + wave_gen.set_envelope_params(attack=0.001, release=0.01, glide=0.0) + wave_gen.set_amplitude(0.8) + time.sleep(0.5) + wave_gen.set_amplitude(0.0) + time.sleep(1) + + # Slow attack, fast release (pad-like) + print("2. Pad-like (slow attack, fast release)...") + wave_gen.set_envelope_params(attack=0.2, release=0.05, glide=0.0) + wave_gen.set_amplitude(0.8) + time.sleep(1) + wave_gen.set_amplitude(0.0) + time.sleep(1) + + # Fast attack, slow release (sustained) + print("3. Sustained (fast attack, slow release)...") + wave_gen.set_envelope_params(attack=0.01, release=0.3, glide=0.0) + wave_gen.set_amplitude(0.8) + time.sleep(0.5) + wave_gen.set_amplitude(0.0) + time.sleep(1.5) + + # Medium attack and release (balanced) + print("4. Balanced (medium attack/release)...") + wave_gen.set_envelope_params(attack=0.05, release=0.05, glide=0.0) + wave_gen.set_amplitude(0.8) + time.sleep(0.8) + wave_gen.set_amplitude(0.0) + time.sleep(2) + + +print("Envelope Control Demonstration") +print("Listen to different attack/release characteristics") +print("Press Ctrl+C to stop") + +App.run(user_loop=envelope_demo) diff --git a/src/arduino/app_bricks/wave_generator/examples/05_external_speaker.py b/src/arduino/app_bricks/wave_generator/examples/05_external_speaker.py new file mode 100644 index 00000000..8d9a0968 --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/examples/05_external_speaker.py @@ -0,0 +1,66 @@ +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) +# +# SPDX-License-Identifier: MPL-2.0 + +""" +Custom Speaker Configuration Example + +Demonstrates how to use a pre-configured Speaker instance with WaveGenerator. +Use this approach when you need: +- Specific USB speaker selection (USB_SPEAKER_2, etc.) +- Different audio format (S16_LE, etc.) +- Explicit device name ("plughw:CARD=Device,DEV=0") +""" + +import time +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_peripherals.speaker import Speaker +from arduino.app_utils import App + +# List available USB speakers +available_speakers = Speaker.list_usb_devices() +print(f"Available USB speakers: {available_speakers}") + +# Create and configure a Speaker with specific parameters +speaker = Speaker( + device=Speaker.USB_SPEAKER_1, # or None for auto-detect, or specific device + sample_rate=16000, + channels=1, + format="FLOAT_LE", +) + +# Create WaveGenerator with the external speaker +# WaveGenerator will manage the speaker's lifecycle (start/stop) +wave_gen = WaveGenerator( + sample_rate=16000, + speaker=speaker, # Pass pre-configured speaker + wave_type="sine", + glide=0.02, +) + +# Start the WaveGenerator (which will also start the speaker) +App.start_brick(wave_gen) + + +def play_sequence(): + """Play a simple frequency sequence.""" + frequencies = [261.63, 293.66, 329.63, 349.23, 392.00, 440.00, 493.88, 523.25] # C4 to C5 + + for freq in frequencies: + print(f"Playing {freq:.2f} Hz") + wave_gen.set_frequency(freq) + wave_gen.set_amplitude(0.7) + time.sleep(0.5) + + # Fade out + wave_gen.set_amplitude(0.0) + time.sleep(1) + + +print("Playing musical scale with external speaker...") +print("Press Ctrl+C to stop") + +App.run(user_loop=play_sequence) + +# WaveGenerator automatically stops the speaker when it stops +print("Done") diff --git a/src/arduino/app_bricks/wave_generator/wave_generator.py b/src/arduino/app_bricks/wave_generator/wave_generator.py new file mode 100644 index 00000000..8df33cd4 --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/wave_generator.py @@ -0,0 +1,390 @@ +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) +# +# SPDX-License-Identifier: MPL-2.0 + +import logging +import math +import threading +import time +import numpy as np +from typing import Literal +from arduino.app_utils import Logger, brick +from arduino.app_peripherals.speaker import Speaker + +logger = Logger("WaveGenerator", logging.INFO) + + +WaveType = Literal["sine", "square", "sawtooth", "triangle"] + + +@brick +class WaveGenerator: + """Continuous wave generator brick for audio synthesis. + + This brick generates continuous audio waveforms (sine, square, sawtooth, triangle) + and streams them to a USB speaker in real-time. It provides smooth transitions + between frequency and amplitude changes using configurable envelope parameters. + + The generator runs continuously in a background thread, producing audio blocks + at a steady rate with minimal latency. + + Attributes: + sample_rate (int): Audio sample rate in Hz (default: 16000). + wave_type (WaveType): Type of waveform to generate. + frequency (float): Current output frequency in Hz. + amplitude (float): Current output amplitude (0.0-1.0). + """ + + def __init__( + self, + sample_rate: int = 16000, + wave_type: WaveType = "sine", + block_duration: float = 0.03, + attack: float = 0.01, + release: float = 0.03, + glide: float = 0.02, + speaker: Speaker = None, + ): + """Initialize the WaveGenerator brick. + + Args: + sample_rate (int): Audio sample rate in Hz (default: 16000). + wave_type (WaveType): Initial waveform type (default: "sine"). + block_duration (float): Duration of each audio block in seconds (default: 0.03). + attack (float): Attack time for amplitude envelope in seconds (default: 0.01). + release (float): Release time for amplitude envelope in seconds (default: 0.03). + glide (float): Frequency glide time (portamento) in seconds (default: 0.02). + speaker (Speaker, optional): Pre-configured Speaker instance. If None, a new Speaker + will be created with default settings (auto-detect device, FLOAT_LE format). + WaveGenerator will manage the speaker's lifecycle (calling start/stop). + + Raises: + SpeakerException: If no USB speaker is found or device is busy. + """ + self.sample_rate = int(sample_rate) + self.block_duration = float(block_duration) + self.wave_type = wave_type + + # Envelope parameters + self.attack = float(attack) + self.release = float(release) + self.glide = float(glide) + + # Target state (updated by user) + self._target_freq = 440.0 + self._target_amp = 0.0 + + # Current state (internal, smoothed) + self._current_freq = 440.0 + self._current_amp = 0.0 + self._phase = 0.0 + + # Pre-allocated buffers + self._buf_N = 0 + self._buf_phase_incs = None + self._buf_phases = None + self._buf_envelope = None + self._buf_samples = None + + # Speaker setup + if speaker is not None: + # Use externally provided Speaker instance + self._speaker = speaker + logger.info("Using externally provided Speaker instance") + else: + # Create internal Speaker instance with default settings + self._speaker = Speaker( + device=None, # Auto-detect first available USB speaker + sample_rate=sample_rate, + channels=1, + format="FLOAT_LE", + ) + logger.info( + "Created internal Speaker: device=auto-detect, sample_rate=%d, format=FLOAT_LE", + sample_rate, + ) + + # Producer thread control + self._running = threading.Event() + self._producer_thread = None + self._state_lock = threading.Lock() + + logger.info( + "WaveGenerator initialized: sample_rate=%d, wave_type=%s, block_dur=%.3fs", + sample_rate, + wave_type, + block_duration, + ) + + def start(self): + """Start the wave generator and audio output. + + This starts the speaker device and launches the producer thread that + continuously generates and streams audio blocks. + """ + if self._running.is_set(): + logger.warning("WaveGenerator is already running") + return + + logger.info("Starting WaveGenerator...") + self._speaker.start() + + # Set hardware speaker volume to maximum (100%) + try: + self._speaker.set_volume(100) + logger.info("Speaker hardware volume set to 100%") + except Exception as e: + logger.warning(f"Could not set speaker volume: {e}") + + self._running.set() + + self._producer_thread = threading.Thread(target=self._producer_loop, daemon=True, name="WaveGenerator-Producer") + self._producer_thread.start() + + logger.info("WaveGenerator started") + + def stop(self): + """Stop the wave generator and audio output. + + This stops the producer thread and closes the speaker device. + """ + if not self._running.is_set(): + logger.warning("WaveGenerator is not running") + return + + logger.info("Stopping WaveGenerator...") + self._running.clear() + + if self._producer_thread: + self._producer_thread.join(timeout=5) + if self._producer_thread.is_alive(): + logger.warning("Producer thread did not terminate in time") + self._producer_thread = None + + self._speaker.stop() + logger.info("WaveGenerator stopped") + + def set_frequency(self, frequency: float): + """Set the target output frequency. + + The frequency will smoothly transition to the new value over the + configured glide time. + + Args: + frequency (float): Target frequency in Hz (typically 20-8000 Hz). + """ + with self._state_lock: + self._target_freq = float(max(0.0, frequency)) + + def set_amplitude(self, amplitude: float): + """Set the target output amplitude. + + The amplitude will smoothly transition to the new value over the + configured attack/release time. + + Args: + amplitude (float): Target amplitude in range [0.0, 1.0]. + """ + with self._state_lock: + self._target_amp = float(max(0.0, min(1.0, amplitude))) + + def set_wave_type(self, wave_type: WaveType): + """Change the waveform type. + + Args: + wave_type (WaveType): One of "sine", "square", "sawtooth", "triangle". + + Raises: + ValueError: If wave_type is not valid. + """ + valid_types = ["sine", "square", "sawtooth", "triangle"] + if wave_type not in valid_types: + raise ValueError(f"Invalid wave_type '{wave_type}'. Must be one of {valid_types}") + + with self._state_lock: + self.wave_type = wave_type + logger.info(f"Wave type changed to: {wave_type}") + + def set_volume(self, volume: int): + """Set the speaker volume level. + + This is a wrapper that controls the hardware volume of the USB speaker device. + + Args: + volume (int): Hardware volume level (0-100). + + Raises: + SpeakerException: If the mixer is not available or if volume cannot be set. + """ + self._speaker.set_volume(volume) + logger.info(f"Speaker volume set to {volume}%") + + def get_volume(self) -> int: + """Get the current speaker volume level. + + Returns: + int: Current hardware volume level (0-100). + """ + try: + return self._speaker._mixer.getvolume()[0] if self._speaker._mixer else 100 + except Exception: + return 100 + + def set_envelope_params(self, attack: float = None, release: float = None, glide: float = None): + """Update envelope parameters. + + Args: + attack (float, optional): Attack time in seconds. + release (float, optional): Release time in seconds. + glide (float, optional): Frequency glide time in seconds. + """ + with self._state_lock: + if attack is not None: + self.attack = float(max(0.0, attack)) + if release is not None: + self.release = float(max(0.0, release)) + if glide is not None: + self.glide = float(max(0.0, glide)) + + def get_state(self) -> dict: + """Get current generator state. + + Returns: + dict: Dictionary containing current frequency, amplitude, wave type, etc. + """ + with self._state_lock: + return { + "frequency": self._current_freq, + "amplitude": self._current_amp, + "wave_type": self.wave_type, + "volume": self.get_volume(), + "phase": self._phase, + } + + def _producer_loop(self): + """Main producer loop running in background thread. + + Continuously generates audio blocks at a steady cadence and streams + them to the speaker device. + """ + logger.debug("Producer loop started") + next_time = time.perf_counter() + block_count = 0 + + while self._running.is_set(): + next_time += self.block_duration + + # Read target state + with self._state_lock: + target_freq = self._target_freq + target_amp = self._target_amp + wave_type = self.wave_type + + # Log every 100 blocks or when amplitude changes + block_count += 1 + if block_count % 100 == 0 or (block_count < 5): + logger.debug(f"Producer: block={block_count}, freq={target_freq:.1f}Hz, amp={target_amp:.3f}") + + # Generate audio block + try: + audio_block = self._generate_block(target_freq, target_amp, wave_type) + self._speaker.play(audio_block, block_on_queue=False) + except Exception as e: + logger.error(f"Error generating audio block: {e}") + + # Wait until next scheduled time + now = time.perf_counter() + sleep_time = next_time - now + if sleep_time > 0: + time.sleep(sleep_time) + else: + # We're falling behind, reset timing + next_time = now + + logger.debug("Producer loop terminated") + + def _generate_block(self, freq_target: float, amp_target: float, wave_type: str) -> np.ndarray: + """Generate a single audio block. + + Args: + freq_target (float): Target frequency in Hz. + amp_target (float): Target amplitude (0.0-1.0). + wave_type (str): Waveform type. + + Returns: + np.ndarray: Audio samples as float32 array. + """ + N = max(1, int(self.sample_rate * self.block_duration)) + + # Ensure buffers are allocated + if N > self._buf_N: + self._buf_N = N + self._buf_phase_incs = np.empty(self._buf_N, dtype=np.float32) + self._buf_phases = np.empty(self._buf_N, dtype=np.float32) + self._buf_envelope = np.empty(self._buf_N, dtype=np.float32) + self._buf_samples = np.empty(self._buf_N, dtype=np.float32) + + phases = self._buf_phases[:N] + envelope = self._buf_envelope[:N] + samples = self._buf_samples[:N] + + # === AMPLITUDE SMOOTHING === + amp_current = self._current_amp + if amp_target == amp_current or (self.attack <= 0.0 and self.release <= 0.0): + envelope.fill(amp_target) + else: + ramp = self.attack if amp_target > amp_current else self.release + if ramp <= 0.0: + envelope.fill(amp_target) + else: + frac = min(1.0, self.block_duration / ramp) + next_amp = amp_current + (amp_target - amp_current) * frac + envelope[:] = np.linspace(amp_current, next_amp, N, dtype=np.float32) + amp_current = float(envelope[-1]) + + # === FREQUENCY GLIDE (PORTAMENTO) === + freq_current = self._current_freq + phase_incs = self._buf_phase_incs[:N] + + if self.glide > 0.0 and freq_current != freq_target: + # Apply glide smoothing over time + frac = min(1.0, self.block_duration / self.glide) + next_freq = freq_current + (freq_target - freq_current) * frac + + # Linear interpolation within block + freq_ramp = np.linspace(freq_current, next_freq, N, dtype=np.float32) + phase_incs[:] = 2.0 * math.pi * freq_ramp / float(self.sample_rate) + + freq_current = float(next_freq) + else: + # No glide or already at target + phase_incr = 2.0 * math.pi * freq_target / float(self.sample_rate) + phase_incs.fill(phase_incr) + freq_current = freq_target + + # === PHASE ACCUMULATION === + np.cumsum(phase_incs, dtype=np.float32, out=phases) + phases += self._phase + self._phase = float(phases[-1] % (2.0 * math.pi)) + + # === WAVEFORM GENERATION === + if wave_type == "sine": + np.sin(phases, out=samples) + elif wave_type == "square": + samples[:] = np.where(np.sin(phases) >= 0, 1.0, -1.0) + elif wave_type == "sawtooth": + samples[:] = 2.0 * (phases / (2.0 * math.pi) % 1.0) - 1.0 + elif wave_type == "triangle": + samples[:] = 2.0 * np.abs(2.0 * (phases / (2.0 * math.pi) % 1.0) - 1.0) - 1.0 + else: + # Fallback to sine + np.sin(phases, out=samples) + + # === APPLY ENVELOPE AND GAIN === + np.multiply(samples, envelope, out=samples) + + # Update internal state + self._current_amp = amp_current + self._current_freq = freq_current + + return samples diff --git a/src/arduino/app_utils/ledmatrix.py b/src/arduino/app_utils/ledmatrix.py index 0f8d402b..e2866486 100644 --- a/src/arduino/app_utils/ledmatrix.py +++ b/src/arduino/app_utils/ledmatrix.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) # # SPDX-License-Identifier: MPL-2.0 diff --git a/tests/arduino/app_bricks/wave_generator/test_wave_generator.py b/tests/arduino/app_bricks/wave_generator/test_wave_generator.py new file mode 100644 index 00000000..12c3131a --- /dev/null +++ b/tests/arduino/app_bricks/wave_generator/test_wave_generator.py @@ -0,0 +1,485 @@ +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) +# +# SPDX-License-Identifier: MPL-2.0 + +import pytest +import numpy as np +import threading +import time +from arduino.app_bricks.wave_generator import WaveGenerator +import arduino.app_utils.app as app +from arduino.app_utils import AppController + + +@pytest.fixture +def app_instance(monkeypatch): + """Provides a fresh AppController instance for each test.""" + instance = AppController() + monkeypatch.setattr(app, "App", instance) + return instance + + +@pytest.fixture(autouse=True) +def mock_speaker(monkeypatch): + """Mock Speaker to avoid hardware dependencies.""" + + class FakeSpeaker: + def __init__(self, device=None, sample_rate=16000, channels=1, format="FLOAT_LE"): + self.device = device or "fake_device" + self.sample_rate = sample_rate + self.channels = channels + self.format = format + self._is_started = False + self._played_data = [] + self._mixer = FakeMixer() + + def start(self): + self._is_started = True + + def stop(self): + self._is_started = False + + def play(self, data, block_on_queue=False): + if self._is_started: + self._played_data.append(data) + + def set_volume(self, volume: int): + self._mixer.setvolume(volume) + + def is_started(self): + return self._is_started + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + return False + + class FakeMixer: + def __init__(self): + self._volume = 100 + + def setvolume(self, volume: int): + self._volume = max(0, min(100, volume)) + + def getvolume(self): + return [self._volume] + + # Patch Speaker in the wave_generator module + monkeypatch.setattr("arduino.app_bricks.wave_generator.wave_generator.Speaker", FakeSpeaker) + return FakeSpeaker + + +def test_wave_generator_initialization_default(mock_speaker): + """Test WaveGenerator initializes with default parameters.""" + wave_gen = WaveGenerator() + + assert wave_gen.sample_rate == 16000 + assert wave_gen.wave_type == "sine" + assert wave_gen.block_duration == 0.03 + assert wave_gen.attack == 0.01 + assert wave_gen.release == 0.03 + assert wave_gen.glide == 0.02 + assert wave_gen._speaker is not None + assert wave_gen._speaker.sample_rate == 16000 + + +def test_wave_generator_initialization_custom(mock_speaker): + """Test WaveGenerator initializes with custom parameters.""" + wave_gen = WaveGenerator( + sample_rate=48000, + wave_type="square", + block_duration=0.05, + attack=0.02, + release=0.05, + glide=0.03, + ) + + assert wave_gen.sample_rate == 48000 + assert wave_gen.wave_type == "square" + assert wave_gen.block_duration == 0.05 + assert wave_gen.attack == 0.02 + assert wave_gen.release == 0.05 + assert wave_gen.glide == 0.03 + assert wave_gen._speaker.sample_rate == 48000 + + +def test_wave_generator_with_external_speaker(mock_speaker): + """Test WaveGenerator with externally provided Speaker.""" + external_speaker = mock_speaker(device="external_device", sample_rate=16000) + wave_gen = WaveGenerator(speaker=external_speaker) + + assert wave_gen._speaker is external_speaker + assert wave_gen._speaker.device == "external_device" + + +def test_wave_generator_start_stop(app_instance, mock_speaker): + """Test WaveGenerator start and stop methods.""" + wave_gen = WaveGenerator() + + # Initially not running + assert not wave_gen._running.is_set() + + # Start the generator + wave_gen.start() + assert wave_gen._running.is_set() + assert wave_gen._speaker.is_started() + assert wave_gen._producer_thread is not None + assert wave_gen._producer_thread.is_alive() + + time.sleep(0.1) # Let it run briefly + + # Stop the generator + wave_gen.stop() + assert not wave_gen._running.is_set() + assert not wave_gen._speaker.is_started() + + # Wait for thread to finish + time.sleep(0.1) + + +def test_set_frequency(mock_speaker): + """Test setting frequency.""" + wave_gen = WaveGenerator() + + wave_gen.set_frequency(440.0) + assert wave_gen._target_freq == 440.0 + + wave_gen.set_frequency(880.0) + assert wave_gen._target_freq == 880.0 + + # Test negative frequency (should be clamped to 0) + wave_gen.set_frequency(-100.0) + assert wave_gen._target_freq == 0.0 + + +def test_set_amplitude(mock_speaker): + """Test setting amplitude.""" + wave_gen = WaveGenerator() + + wave_gen.set_amplitude(0.5) + assert wave_gen._target_amp == 0.5 + + wave_gen.set_amplitude(1.0) + assert wave_gen._target_amp == 1.0 + + # Test out of range (should be clamped) + wave_gen.set_amplitude(1.5) + assert wave_gen._target_amp == 1.0 + + wave_gen.set_amplitude(-0.5) + assert wave_gen._target_amp == 0.0 + + +def test_set_wave_type(mock_speaker): + """Test setting wave type.""" + wave_gen = WaveGenerator() + + wave_gen.set_wave_type("sine") + assert wave_gen.wave_type == "sine" + + wave_gen.set_wave_type("square") + assert wave_gen.wave_type == "square" + + wave_gen.set_wave_type("sawtooth") + assert wave_gen.wave_type == "sawtooth" + + wave_gen.set_wave_type("triangle") + assert wave_gen.wave_type == "triangle" + + # Test invalid wave type + with pytest.raises(ValueError): + wave_gen.set_wave_type("invalid") + + +def test_set_volume(mock_speaker): + """Test setting hardware volume.""" + wave_gen = WaveGenerator() + + wave_gen.set_volume(70) + assert wave_gen._speaker._mixer._volume == 70 + + wave_gen.set_volume(100) + assert wave_gen._speaker._mixer._volume == 100 + + # Test get_volume + assert wave_gen.get_volume() == 100 + + wave_gen.set_volume(50) + assert wave_gen.get_volume() == 50 + + +def test_set_envelope_params(mock_speaker): + """Test setting envelope parameters.""" + wave_gen = WaveGenerator() + + wave_gen.set_envelope_params(attack=0.05) + assert wave_gen.attack == 0.05 + + wave_gen.set_envelope_params(release=0.1) + assert wave_gen.release == 0.1 + + wave_gen.set_envelope_params(glide=0.04) + assert wave_gen.glide == 0.04 + + # Test all at once + wave_gen.set_envelope_params(attack=0.02, release=0.06, glide=0.03) + assert wave_gen.attack == 0.02 + assert wave_gen.release == 0.06 + assert wave_gen.glide == 0.03 + + # Test negative values (should be clamped to 0) + wave_gen.set_envelope_params(attack=-0.01) + assert wave_gen.attack == 0.0 + + +def test_get_state(mock_speaker): + """Test getting current generator state.""" + wave_gen = WaveGenerator() + + wave_gen.set_frequency(440.0) + wave_gen.set_amplitude(0.8) + wave_gen.set_wave_type("square") + wave_gen.set_volume(90) + + state = wave_gen.get_state() + + assert "frequency" in state + assert "amplitude" in state + assert "wave_type" in state + assert state["wave_type"] == "square" + assert "volume" in state + assert state["volume"] == 90 + assert "phase" in state + + +def test_generate_block_sine(mock_speaker): + """Test generating a sine wave block.""" + wave_gen = WaveGenerator(sample_rate=16000) + + # Generate a block + block = wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="sine") + + # Check block properties + assert isinstance(block, np.ndarray) + assert block.dtype == np.float32 + expected_samples = int(16000 * 0.03) # block_duration = 0.03 + assert len(block) == expected_samples + # Check amplitude is within range + assert np.max(np.abs(block)) <= 0.5 + + +def test_generate_block_square(mock_speaker): + """Test generating a square wave block.""" + wave_gen = WaveGenerator(sample_rate=16000) + + block = wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="square") + + assert isinstance(block, np.ndarray) + # Square wave has envelope applied, so check amplitude range + assert np.max(np.abs(block)) <= 0.5 + + +def test_generate_block_sawtooth(mock_speaker): + """Test generating a sawtooth wave block.""" + wave_gen = WaveGenerator(sample_rate=16000) + + _ = wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="sawtooth") + + # Verify internal state updated correctly + assert wave_gen._buf_samples is not None + + +def test_generate_block_triangle(mock_speaker): + """Test generating a triangle wave block.""" + wave_gen = WaveGenerator(sample_rate=16000) + + _ = wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="triangle") + + # Verify internal state updated correctly + assert wave_gen._buf_samples is not None + + +def test_frequency_glide(mock_speaker): + """Test frequency glide (portamento) effect.""" + wave_gen = WaveGenerator(sample_rate=16000, glide=0.1) + + # Set initial frequency + wave_gen._current_freq = 220.0 + + # Generate block with new target frequency + _ = wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="sine") + + # Current frequency should have moved towards target but not reached it + # (because glide time is longer than block duration) + assert wave_gen._current_freq > 220.0 + assert wave_gen._current_freq < 440.0 + + +def test_amplitude_envelope(mock_speaker): + """Test amplitude envelope (attack/release).""" + wave_gen = WaveGenerator(sample_rate=16000, attack=0.1, release=0.1) + + # Set initial amplitude + wave_gen._current_amp = 0.0 + + # Generate block with new target amplitude + _ = wave_gen._generate_block(freq_target=440.0, amp_target=0.8, wave_type="sine") + + # Current amplitude should have moved towards target but not reached it + assert wave_gen._current_amp > 0.0 + assert wave_gen._current_amp < 0.8 + + +def test_producer_loop_generates_audio(app_instance, mock_speaker): + """Test that producer loop generates and plays audio.""" + wave_gen = WaveGenerator() + + wave_gen.set_frequency(440.0) + wave_gen.set_amplitude(0.5) + wave_gen.start() + + # Let it run for a bit + time.sleep(0.2) + + # Check that audio was played + assert len(wave_gen._speaker._played_data) > 0 + + wave_gen.stop() + + +def test_thread_safety(mock_speaker): + """Test thread-safe access to parameters.""" + wave_gen = WaveGenerator() + + def set_params(): + for i in range(100): + wave_gen.set_frequency(440.0 + i) + wave_gen.set_amplitude(0.5) + time.sleep(0.001) + + def get_state(): + for i in range(100): + state = wave_gen.get_state() + assert "frequency" in state + time.sleep(0.001) + + wave_gen.start() + + # Start multiple threads accessing the generator + threads = [ + threading.Thread(target=set_params), + threading.Thread(target=get_state), + ] + + for t in threads: + t.start() + + for t in threads: + t.join(timeout=5) + + wave_gen.stop() + + +def test_buffer_preallocation(mock_speaker): + """Test that buffers are pre-allocated and reused.""" + wave_gen = WaveGenerator(sample_rate=16000) + + # Generate first block + block1 = wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="sine") + + # Check buffers are allocated + assert wave_gen._buf_N > 0 + assert wave_gen._buf_phase_incs is not None + assert wave_gen._buf_phases is not None + assert wave_gen._buf_envelope is not None + assert wave_gen._buf_samples is not None + + # Generate second block + _ = wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="sine") + + # Buffers should still be the same size (reused) + assert wave_gen._buf_N == len(block1) + + +def test_phase_continuity(mock_speaker): + """Test that phase is continuous across blocks.""" + wave_gen = WaveGenerator(sample_rate=16000) + + initial_phase = wave_gen._phase + + # Generate multiple blocks + for _ in range(10): + wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="sine") + + # Phase should have advanced + assert wave_gen._phase != initial_phase + # Phase should be wrapped to [0, 2π] + assert 0.0 <= wave_gen._phase < 2 * np.pi + + +def test_zero_amplitude_produces_silence(mock_speaker): + """Test that zero amplitude produces silent output.""" + wave_gen = WaveGenerator(sample_rate=16000) + + block = wave_gen._generate_block(freq_target=440.0, amp_target=0.0, wave_type="sine") + + # All samples should be zero or very close to zero + assert np.allclose(block, 0.0, atol=1e-6) + + +def test_app_controller_integration(app_instance, mock_speaker): + """Test integration with AppController (start/stop via App).""" + wave_gen = WaveGenerator() + + # Register manually to avoid auto-registration + app_instance.unregister(wave_gen) + app_instance.start_brick(wave_gen) + + assert wave_gen._running.is_set() + assert wave_gen._speaker.is_started() + + time.sleep(0.1) + + app_instance.stop_brick(wave_gen) + + assert not wave_gen._running.is_set() + assert not wave_gen._speaker.is_started() + + +def test_multiple_start_stop_cycles(app_instance, mock_speaker): + """Test starting and stopping multiple times.""" + wave_gen = WaveGenerator() + + for _ in range(3): + wave_gen.start() + assert wave_gen._running.is_set() + time.sleep(0.05) + + wave_gen.stop() + assert not wave_gen._running.is_set() + time.sleep(0.05) + + +def test_double_start_warning(app_instance, mock_speaker): + """Test that starting an already running generator logs a warning.""" + wave_gen = WaveGenerator() + + wave_gen.start() + assert wave_gen._running.is_set() + + # Try to start again (should warn but not crash) + wave_gen.start() + assert wave_gen._running.is_set() + + wave_gen.stop() + + +def test_double_stop_warning(app_instance, mock_speaker): + """Test that stopping a non-running generator logs a warning.""" + wave_gen = WaveGenerator() + + # Try to stop before starting (should warn but not crash) + wave_gen.stop() + assert not wave_gen._running.is_set() diff --git a/tests/arduino/app_utils/ledmatrix/test_frame.py b/tests/arduino/app_utils/ledmatrix/test_frame.py index ca5fe9b3..047c7e97 100644 --- a/tests/arduino/app_utils/ledmatrix/test_frame.py +++ b/tests/arduino/app_utils/ledmatrix/test_frame.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) # # SPDX-License-Identifier: MPL-2.0