Skip to content

Conversation

@carboni123
Copy link

Previously, using picamera2.Encoder with camera.start_recording would raise a RuntimeError: Must pass Output if a proper output class wasn’t provided. This patch introduces a workaround that bypasses the need for picamera2.Encoder by allowing you to supply a custom output class—one that can simply inherit from io.BufferedIOBase with a write method.

I originally developed this class for a real-time image processing project where the overhead of an encoder was undesirable, especially when handling complex images with many varying edges. This change also makes it easier to transition from the picamera1 API, which only required a basic output class with a write method.

@carboni123 carboni123 closed this Feb 14, 2025
previously, it was only returning the red channel
@carboni123 carboni123 reopened this Feb 14, 2025
@davidplowman
Copy link
Collaborator

Hi, and thanks for the PR. I was just wanting to understand a little better what this class does.

Currently, you can derive from io.BufferedIOBase and wrap that in a FileOutput. Does this class do the same kind of thing, or does it enable something different?

I'm also a bit vague on the different between NoEncoder and the plain Encoder. If the issue with Encoder is simply that it requires you to pass an output, maybe it's worth changing the code so that you don't? Though I'm not really clear why an encoder without an output is useful - perhaps for testing? Maybe some examples of things that you couldn't do otherwise, or which would be more difficult, would help.

Thanks!

@carboni123
Copy link
Author

It wasn't clear in the documentation on how to provide a proper output (such as wrapping the output in a FileOutput, as you suggested). It seems that using NoEncoder allows direct access to the bytes stream, which could improve latency and flexibility in handling the stream. It also keeps the same logic as in picamera1 (bytes stream output with write method), which is the main reason I created the class.

About changing the Encoder class to allow any output with a write function, it certainly is a possibility, however I feel like passing the NoEncoder class seems straightforward and less disruptive than altering the Encoder class.

For context:
I've been working extensively with picamera and picamera2 libraries for a weed detection system prototype. In my original setup, I used camera.start_recording by simply passing a bytes stream to a custom VideoOutput class (inherited from SharedFrame - class shared between threads for image processing).

Picamera1 Implementation

self.stream = self.camera.start_recording(self.output, format="bgr")

Picamera2 Implementation

self.camera.start_recording(NoEncoder(), self.output)

Below are the two versions of the VideoOutput class (self.output) that I'm using:

Picamera1 Implementation

class VideoOutput(SharedFrame):
    def __init__(self):
        super().__init__()
        self.condition = Condition()
        self.timeout = 1
        self.processed = False

    def write(self, img):
        np_image = np.frombuffer(img, np.uint8).reshape((480, 640, 3)) if img is not None else self.green_image
        with self.condition:
            self.set_img(np_image)
            self.processed = False
            self.condition.notify_all()

    def wait_main(self):
        with self.condition:
            if self.processed:
                self.condition.wait(self.timeout)
            self.processed = True
    
    def wait(self):
        with self.condition:
            return self.condition.wait(self.timeout)

Picamera2 Implementation

class VideoOutput(SharedFrame, io.BufferedIOBase):
    def __init__(self):
        super().__init__()
        self.condition = Condition()
        self.timeout = 1
        self.processed = False

    def write(self, img):
        np_image = img.reshape((480, 640, 3)) if img is not None else self.green_image
        with self.condition:
            self.set_img(np_image)
            self.processed = False
            self.condition.notify_all()
    
    def wait_main(self):
        with self.condition:
            if self.processed:
                self.condition.wait(self.timeout)
            self.processed = True

    def wait(self):
        with self.condition:
            return self.condition.wait(self.timeout)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants