Easy Audio/Video Capture with Python

At present it is difficult to obtain audio/video data in Python. For example, many deep learning methods assume you have easy access to your data in the form of a numpy array. Often you don’t. Based on the good efforts of those online, this post presents a number of Python classes to address this issue.

Just give me the code.

General Interface

Firstly, we can use threads to constantly update sensor data in the background. We can then read this data asynchronously.

Secondly, we can define a general interface for sensor data.

import threading

class SensorSource:
    """Abstract object for a sensory modality."""
    def __init__(self):
        """Initialise object."""
        self.started = False
        self.thread = None

    def start(self):
        """Start capture source."""
        if self.started:
            print('[!] Asynchronous capturing has already been started.')
            return None
        self.started = True
        self.thread = threading.Thread(
            target=self.update,
            args=(),
            daemon=True
        )
        self.thread.start()
        return self

    def update(self):
        """Update data."""
        pass

    def read(self):
        """Read data."""
        pass

    def stop(self):
        """Stop daemon thread."""
        self.started = False
        self.thread.join()

Video

For our video capture class, we can use OpenCV. You can install this in a conda environment using conda install opencv, or via pip using pip install opencv-python. Either route gives access to the cv2 library.
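
To check the install has worked, a quick sanity check is:

import cv2
print(cv2.__version__)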

Beware: you may need to do a bit of tweaking to get your video capture working – different cameras / system configurations need different tweaks.

# Video source

import cv2

class VideoSource(SensorSource):
    """Object for video using OpenCV."""
    def __init__(self, src=0):
        """Initialise video capture."""
        super().__init__()
        # width=640, height=480
        self.src = src
        self.cap = cv2.VideoCapture(self.src)
        #self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        #self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        self.grabbed, self.frame = self.cap.read()
        self.read_lock = threading.Lock()

    def update(self):
        """Update based on new video data."""
        while self.started:
            grabbed, frame = self.cap.read()
            with self.read_lock:
                self.grabbed = grabbed
                self.frame = frame

    def read(self):
        """Read latest video frame."""
        with self.read_lock:
            frame = self.frame.copy()
            grabbed = self.grabbed
        return grabbed, frame

    def __del__(self):
        self.cap.release()

    def __exit__(self, exc_type, exc_value, traceback):
        self.cap.release()

The initialisation sets up the camera and the threading lock. The update method runs as part of the thread to continuously update the self.frame data. The data may then be (asynchronously) accessed using the read() method on the object. The __del__ and __exit__ methods mean that the camera resource is released when the object is garbage-collected or its context exits (e.g. when the Python kernel is stopped), so you can then use the camera in other applications.
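
As a minimal usage sketch (assuming a camera at device index 0):

video = VideoSource(src=0)
video.start()                  # launch the background capture thread
grabbed, frame = video.read()  # latest frame as a BGR numpy array
print(grabbed, frame.shape)    # e.g. True (480, 640, 3)
video.stop()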

Beware: I had issues setting the width and height so I have commented out those lines. Also remember OpenCV provides the data in BGR format – so channels 0, 1, 2 correspond to Blue, Green and Red rather than RGB. You may also want raw YUV frames (where the camera supplies them); you can disable OpenCV's automatic colour conversion by adding the following to the __init__ method:

self.cap.set(cv2.CAP_PROP_CONVERT_RGB, 0)  # property 16 – disables conversion to BGR

Audio

You’ll see many posts online that use pyaudio for audio capture. I couldn’t get this to work in a conda environment due to an issue with the underlying PortAudio library. I had more success with alsaaudio, which you can install via pip using pip install pyalsaaudio (the package is imported as alsaaudio).

# Audio source
import struct
from collections import deque
import logging

import numpy as np
import alsaaudio

class AudioSource(SensorSource):
    """Object for audio using alsaaudio."""
    def __init__(self, sample_freq=44100, nb_samples=65536):
        """Initialise audio capture."""
        super().__init__()
        # Initialise audio
        self.inp = alsaaudio.PCM(
            alsaaudio.PCM_CAPTURE,
            alsaaudio.PCM_NORMAL,
            device="default"
        )
        # Set attributes: mono, frequency, 16 bit little endian samples
        self.inp.setchannels(1)
        self.inp.setrate(sample_freq)
        self.inp.setformat(alsaaudio.PCM_FORMAT_S16_LE)
        self.inp.setperiodsize(512)
        # Create a FIFO structure to buffer the samples
        self._s_fifo = deque([0] * nb_samples, maxlen=nb_samples)
        self.l = 0
        self.read_lock = threading.Lock()

    def update(self):
        """Update based on new audio data."""
        while self.started:
            self.l, data = self.inp.read()
            if self.l > 0:
                # Extract and format samples (l frames of 16-bit mono)
                raw_smp_l = struct.unpack('h' * self.l, data)
                with self.read_lock:
                    self._s_fifo.extend(raw_smp_l)
            else:
                logging.error(
                    f'Sampler error occurred (l={self.l} and len data={len(data)})'
                )

    def read(self):
        """Read audio."""
        with self.read_lock:
            return self.l, np.asarray(self._s_fifo, dtype=np.int16)

The approach for audio is similar to video. We set up an audio input source and a threading lock in the __init__ method. In the audio case, we are recording a (time) series of audio samples, so we do this in a buffer of length nb_samples. The deque object acts as a FIFO queue and provides this buffer. The update method is run continuously in the background within the thread and adds new samples to the queue over time, with old samples falling off the back of the queue. The struct library is used to decode the binary data from the alsaaudio object and convert it into integer values that we can add to the queue. When we read the data, we convert the queue to a 16-bit integer numpy array.
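
A minimal usage sketch (assuming a working ALSA "default" capture device):

import time

audio = AudioSource()
audio.start()                 # launch the background capture thread
time.sleep(1)                 # give the buffer time to fill
length, samples = audio.read()
print(length, samples.shape)  # e.g. 512 (65536,)
audio.stop()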

In both cases, the read() method returns a tuple: (data_check_value, data). The data_check_value is a value returned from the underlying capture object – whether a frame was grabbed for video, and the number of samples in the last device read for audio. It is often useful for debugging.

Combining and Simplifying

Now that we have defined our sensor data sources, we can combine them so that we only need to perform one read() call to obtain data from all sources. To do this, we create a wrapper object that lets us iterate through each added sensor data source.

class CombinedSource:
    """Object to combine multiple modalities."""
    def __init__(self):
        """Initialise."""
        self.sources = dict()

    def add_source(self, source, name=None):
        """Add a source object.

        source is a derived class from SensorSource;
        name is an optional string name.
        """
        if not name:
            name = source.__class__.__name__
        self.sources[name] = source

    def start(self):
        """Start all sources."""
        for source in self.sources.values():
            source.start()

    def read(self):
        """Read from all sources.

        Return a dict of data keyed by source name.
        """
        data = dict()
        for name, source in self.sources.items():
            data[name] = source.read()[1]
        return data

    def stop(self):
        """Stop all sources."""
        for source in self.sources.values():
            source.stop()

    def __del__(self):
        for source in self.sources.values():
            if source.__class__.__name__ == "VideoSource":
                source.cap.release()

    def __exit__(self, exc_type, exc_value, traceback):
        for source in self.sources.values():
            if source.__class__.__name__ == "VideoSource":
                source.cap.release()

The delete and exit logic is added to clean up the camera object – without these the camera is kept open and locked, which can cause problems. Data is returned as a dictionary, indexed by a string name for the data source.
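
For example, a hypothetical combination of the two sources under custom names might look like this:

combined = CombinedSource()
combined.add_source(VideoSource(), name="webcam")
combined.add_source(AudioSource(), name="mic")
combined.start()
data = combined.read()  # {"webcam": <frame array>, "mic": <sample array>}
combined.stop()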

We can simplify things even further by creating a derived class that automatically adds an audio and video capture object.

class AVCapture(CombinedSource):
    """Auto-populate with audio and video."""
    def __init__(self):
        """Initialise with audio and video sources."""
        super().__init__()
        self.add_source(AudioSource(), "audio")
        self.add_source(VideoSource(), "video")

This then allows us to access audio and video data in a couple of lines.

av = AVCapture()
av.start()
data = av.read()

Here are some outputs from:

import matplotlib.pyplot as plt
plt.imshow(data["video"])
plt.plot(data["audio"])

[Image: the captured frame – colours are crazy because imshow expects RGB, not BGR!]
[Image: the audio buffer – BBC Radio 6 Music in graphical form]

Finishing Off

You can find the code in a Gist here, together with some testing lines that you could easily convert into a library.

You can also expand the sensor classes to capture other data. I plan to create a class to capture CPU and memory use information, along the lines of the sketch below.
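
As a rough illustration only (a sketch, not a final implementation), such a class could subclass SensorSource and poll the psutil package – the SystemSource name, buffer size and polling interval here are all illustrative choices:

# Sketch of a system stats source – assumes psutil is installed
import psutil

class SystemSource(SensorSource):
    """Object for CPU and memory usage via psutil."""
    def __init__(self, nb_samples=256):
        """Initialise FIFO buffers for CPU and memory readings."""
        super().__init__()
        self._cpu = deque([0.0] * nb_samples, maxlen=nb_samples)
        self._mem = deque([0.0] * nb_samples, maxlen=nb_samples)
        self.read_lock = threading.Lock()

    def update(self):
        """Poll CPU and memory usage in the background."""
        while self.started:
            cpu = psutil.cpu_percent(interval=0.1)  # blocks for ~0.1s
            mem = psutil.virtual_memory().percent
            with self.read_lock:
                self._cpu.append(cpu)
                self._mem.append(mem)

    def read(self):
        """Read as (data_check_value, data), like the other sources."""
        with self.read_lock:
            data = np.stack([
                np.asarray(self._cpu, dtype=np.float32),
                np.asarray(self._mem, dtype=np.float32),
            ])
        return data.shape[1], data  # data is a 2 x nb_samples array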
