At present it is surprisingly difficult to obtain live audio and video data in Python. For example, many deep learning methods assume you have easy access to your data in the form of a numpy array; often you don't. Building on the good work of others online, this post presents a set of Python classes to address this issue.
General Interface
First, we use threads to continuously update sensor data in the background; we can then read that data asynchronously. Second, we define a general interface for sensor data sources.
import threading


class SensorSource:
    """Abstract object for a sensory modality."""

    def __init__(self):
        """Initialise object."""
        self.started = False
        self.thread = None

    def start(self):
        """Start capture source."""
        if self.started:
            print('[!] Asynchronous capturing has already been started.')
            return None
        self.started = True
        self.thread = threading.Thread(
            target=self.update,
            args=()
        )
        self.thread.start()
        return self

    def update(self):
        """Update data. Overridden by derived classes."""
        pass

    def read(self):
        """Read data. Overridden by derived classes."""
        pass

    def stop(self):
        """Stop the background thread."""
        self.started = False
        self.thread.join()
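To see the interface in action, here is a minimal sketch of a toy subclass (CounterSource is a hypothetical example, not part of the capture code) that counts upwards in a background thread:

import time

class CounterSource(SensorSource):
    """Toy source that counts in the background (illustration only)."""
    def __init__(self):
        super().__init__()
        self.count = 0

    def update(self):
        # Runs in the background thread until stop() is called
        while self.started:
            self.count += 1
            time.sleep(0.01)

    def read(self):
        return True, self.count

counter = CounterSource().start()  # start() returns self, so calls can chain
time.sleep(0.1)
print(counter.read())  # e.g. (True, 9)
counter.stop()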
Video
For our video capture class, we can use OpenCV. You can install this in a conda environment using conda install opencv, or via pip using pip install opencv-python. Either provides access to the cv2 library.
Beware: you may need to do a bit of tweaking to get your video capture working – different cameras and system configurations need different settings.
# Video source
import cv2


class VideoSource(SensorSource):
    """Object for video using OpenCV."""

    def __init__(self, src=0):
        """Initialise video capture."""
        super().__init__()
        # width=640, height=480
        self.src = src
        self.cap = cv2.VideoCapture(self.src)
        # self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        # self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        self.grabbed, self.frame = self.cap.read()
        self.read_lock = threading.Lock()

    def update(self):
        """Update based on new video data."""
        while self.started:
            grabbed, frame = self.cap.read()
            with self.read_lock:
                self.grabbed = grabbed
                self.frame = frame

    def read(self):
        """Read video."""
        with self.read_lock:
            # Copy so the caller's frame is not mutated by the update thread
            frame = self.frame.copy() if self.frame is not None else None
            grabbed = self.grabbed
        return grabbed, frame

    def __del__(self):
        """Release the camera when the object is garbage collected."""
        self.cap.release()

    def __exit__(self, exec_type, exc_value, traceback):
        self.cap.release()
The initialisation sets up the camera and the threading lock. The update method runs as part of the thread to continuously refresh the self.frame data. The data may then be accessed asynchronously using the read() method on the object. The __del__ and __exit__ methods release the camera resource when the object is deleted or the Python kernel is stopped, so you can then use the camera in other applications.
Beware: I had issues setting the width and height so I have commented out those lines. Also remember OpenCV provides the data in BGR format – so channels 0, 1, 2 correspond to Blue, Green and Red rather than RGB. You might also want to set to YUV mode by adding the following to the __init__ method:
self.cap.set(cv2.CAP_PROP_CONVERT_RGB, 0)  # property 16: disable RGB conversion
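To check the class works, here is a minimal sketch of reading a frame, assuming a webcam is available at index 0:

video = VideoSource(src=0)  # 0 is usually the default webcam
video.start()
grabbed, frame = video.read()
if grabbed:
    print(frame.shape)  # e.g. (480, 640, 3), with channels in BGR order
video.stop()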
Audio
You’ll see many posts online that use pyaudio for audio capture. I couldn’t get this to work in a conda environment due to an issue with the underlying PortAudio library. I had more success with alsaaudio, installed via conda install alsaaudio or pip install pyalsaaudio (the PyPI package that provides the alsaaudio module).
# Audio source
import struct
from collections import deque
import logging

import numpy as np
import alsaaudio


class AudioSource(SensorSource):
    """Object for audio using alsaaudio."""

    def __init__(self, sample_freq=44100, nb_samples=65536):
        """Initialise audio capture."""
        super().__init__()
        # Initialise audio
        self.inp = alsaaudio.PCM(
            alsaaudio.PCM_CAPTURE,
            alsaaudio.PCM_NORMAL,
            device="default"
        )
        # Set attributes: mono, frequency, 16-bit little-endian samples
        self.inp.setchannels(1)
        self.inp.setrate(sample_freq)
        self.inp.setformat(alsaaudio.PCM_FORMAT_S16_LE)
        self.inp.setperiodsize(512)
        # Create a FIFO structure to buffer the samples
        self._s_fifo = deque([0] * nb_samples, maxlen=nb_samples)
        self.l = 0  # length of the last read
        self.read_lock = threading.Lock()

    def update(self):
        """Update based on new audio data."""
        while self.started:
            self.l, data = self.inp.read()
            if self.l > 0:
                # Extract and format samples (little-endian 16-bit)
                raw_smp_l = struct.unpack(f'<{self.l}h', data)
                with self.read_lock:
                    self._s_fifo.extend(raw_smp_l)
            else:
                logging.error(
                    f'Sampler error occurred (l={self.l} and len data={len(data)})'
                )

    def read(self):
        """Read audio."""
        with self.read_lock:
            return self.l, np.asarray(self._s_fifo, dtype=np.int16)
The approach for audio is similar to video. We set up an audio input source and a threading lock in the __init__ method. In the audio case, we are recording a (time) series of audio samples, so we do this in a buffer of length nb_samples. The deque object acts as a FIFO queue and provides this buffer. The update method is run continuously in the background within the thread and adds new samples to the queue over time, with old samples falling off the back of the queue. The struct library is used to decode the binary data from the alsaaudio object and convert it into integer values that we can add to the queue. When we read the data, we convert the queue to a 16-bit integer numpy array.
In both cases, the read() method returns a tuple: (data_check_value, data) where the data_check_value is a value returned from the underlying capture objects. It is often useful for debugging.
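As a quick check, here is a minimal sketch of reading audio, assuming a working ALSA "default" capture device:

import time

audio = AudioSource()
audio.start()
time.sleep(1)  # give the buffer time to fill
length, samples = audio.read()
print(samples.shape, samples.dtype)  # (65536,) int16
# A rough loudness estimate: root-mean-square of the buffer
print(np.sqrt(np.mean(samples.astype(np.float64) ** 2)))
audio.stop()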
Combining and Simplifying
Now that we have defined individual sensor data sources, we can combine them so that a single read() call obtains data from all of them. To do this, we create a wrapper object that iterates through each added sensor data source.
class CombinedSource:
    """Object to combine multiple modalities."""

    def __init__(self):
        """Initialise."""
        self.sources = dict()

    def add_source(self, source, name=None):
        """Add a source object.

        source is a derived class from SensorSource.
        name is an optional string name.
        """
        if not name:
            name = source.__class__.__name__
        self.sources[name] = source

    def start(self):
        """Start all sources."""
        for source in self.sources.values():
            source.start()

    def read(self):
        """Read from all sources.

        Return a dict mapping each source name to its data.
        """
        data = dict()
        for name, source in self.sources.items():
            data[name] = source.read()[1]
        return data

    def stop(self):
        """Stop all sources."""
        for source in self.sources.values():
            source.stop()

    def __del__(self):
        for source in self.sources.values():
            if source.__class__.__name__ == "VideoSource":
                source.cap.release()

    def __exit__(self, exec_type, exc_value, traceback):
        for source in self.sources.values():
            if source.__class__.__name__ == "VideoSource":
                source.cap.release()
The delete and exit logic is added to clean up the camera object – without these the camera is kept open and locked, which can cause problems. Data is returned as a dictionary, indexed by a string name for the data source.
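For instance, a minimal sketch of combining sources under custom names (using the VideoSource class from above):

combined = CombinedSource()
combined.add_source(VideoSource(), name="webcam")
combined.start()
data = combined.read()
print(data.keys())  # dict_keys(['webcam'])
combined.stop()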
We can simplify things even further by creating a derived class that automatically adds an audio and video capture object.
class AVCapture(CombinedSource):
    """Auto-populate with audio and video."""

    def __init__(self):
        """Initialise."""
        super().__init__()
        a = AudioSource()
        self.add_source(a, "audio")
        v = VideoSource()
        self.add_source(v, "video")
This then allows us to access audio and video data in a couple of lines.
av = AVCapture()
av.start()
data = av.read()
We can then take a look at the outputs using matplotlib:
import matplotlib.pyplot as plt
plt.imshow(data["video"])
plt.plot(data["audio"])
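Note that because OpenCV returns BGR data, the colours in the matplotlib image will look wrong unless you convert the channel order first, for example:

rgb_frame = cv2.cvtColor(data["video"], cv2.COLOR_BGR2RGB)
plt.imshow(rgb_frame)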
Finishing Off
You can find the code in a Gist here, together with some testing lines that you could easily convert into a library.
You can also expand the sensor classes to capture other data. I plan to create a class to capture CPU and memory use information.
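As a rough sketch of how that might look, the following subclass samples CPU and memory usage with the psutil library (an assumed extra dependency: pip install psutil); treat it as an illustration rather than a tested implementation:

import time
import psutil  # assumed dependency: pip install psutil

class SystemSource(SensorSource):
    """Sketch of a source for CPU and memory usage percentages."""
    def __init__(self, nb_samples=256, interval=0.1):
        super().__init__()
        self.interval = interval
        # FIFO buffer of (cpu_percent, memory_percent) readings
        self._fifo = deque([(0.0, 0.0)] * nb_samples, maxlen=nb_samples)
        self.read_lock = threading.Lock()

    def update(self):
        while self.started:
            cpu = psutil.cpu_percent(interval=None)  # % since the last call
            mem = psutil.virtual_memory().percent
            with self.read_lock:
                self._fifo.append((cpu, mem))
            time.sleep(self.interval)

    def read(self):
        with self.read_lock:
            # Shape (nb_samples, 2): column 0 is CPU %, column 1 is memory %
            return len(self._fifo), np.asarray(self._fifo, dtype=np.float64)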