In my robotics projects I want to capture live audio and video data and convert it into Numpy multi-dimensional arrays for further processing. To save you several days, this blog post explains how I go about doing this.
Audio / Video Not Audio + Video
A first realisation is that you need to capture audio and video independently. You can record movie files with audio, but as far as I could find there is no simple way to capture live audio and video data together in a single stream.
Video
For video, I found there were two approaches that could be used to capture and process the data:

- OpenCV in Python; and
- wrapping FFmpeg using subprocess.
OpenCV
The default library for video processing in Python is OpenCV. Things have come a long way since my early experiences with OpenCV in C++ over a decade ago. Now there is a nice Python wrapper and you don’t need to touch any low-level code. The tutorials here are a good place to start.
I generally use Conda/Anaconda these days to manage my Python environments (the alternative being old skool virtual environments). Setting up a new environment with Jupyter Notebook and OpenCV is now straightforward:
conda install opencv jupyter
As a note: installing OpenCV in Conda seems to have been a pain until a few years ago, so several out-of-date Stack Overflow answers still come up in searches that refer to installing from specific channels (e.g. from menpo). This no longer appears to be necessary.
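A quick way to check the install worked is to import cv2 and try grabbing a single frame. This is a minimal sketch, assuming a webcam is available as device 0:

import cv2

print(cv2.__version__)           # Confirm the Conda package imports correctly

cam = cv2.VideoCapture(0)        # Device 0 is usually the built-in webcam
ret, frame = cam.read()          # ret is False if no frame could be grabbed
cam.release()

print("frame captured:", ret, "shape:", frame.shape if ret else None)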
One problem I had in Linux (Ubuntu 18.04) is that the GTK libraries didn't play nicely in the Conda environment. I could capture images from the webcam but not display them in a window. This led me to look for the alternative visualisation strategies that I describe below.
A good place to start with OpenCV is this video tutorial. As drawing windows led to errors, I designed a workaround that uses PIL (the Python Imaging Library) and IPython to generate an image from the Numpy array and show it at about 30 fps. The code below separates out each of the YUV components and displays them next to each other, which is useful for bio-inspired processing.
# Imports
import io
import time

import cv2
import numpy as np
import PIL.Image
from IPython import display

# Function to convert an array to a JPEG for display as a video frame
def showarray(a, fmt='jpeg'):
    f = io.BytesIO()
    PIL.Image.fromarray(a).save(f, fmt)
    display.display(display.Image(data=f.getvalue()))

# Initialise camera
cam = cv2.VideoCapture(0)
# Optional - disable RGB conversion to get raw YUV frames (remove for BGR)
cam.set(cv2.CAP_PROP_CONVERT_RGB, 0)

# These allow a frame rate to be printed
t1 = time.time()

# Loop until an interrupt
try:
    while True:
        t2 = time.time()
        # Capture frame-by-frame
        ret, frame = cam.read()
        # Join the Y component and the two interleaved chroma components horizontally
        joined_array = np.concatenate(
            (
                frame[:, :, 0],
                frame[:, 1::2, 1],
                frame[:, 0::2, 1]
            ), axis=1)
        # Use the function above to show the array
        showarray(joined_array)
        # Print the frame rate
        print(f"{int(1/(t2-t1))} FPS")
        # Display the frame until a new frame is available
        display.clear_output(wait=True)
        t1 = t2
except KeyboardInterrupt:
    # Release the camera when interrupted
    cam.release()
    print("Stream stopped")
In the above code, "frame" is a three-dimensional tensor or array: the first dimension indexes rows of the image (i.e. the y-direction), the second dimension indexes columns (i.e. the x-direction) and the third dimension indexes the colour channels. For image processing it is often useful to separate out the channels and work on a single channel at a time (equivalent to a 2D matrix or grayscale image).
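As a minimal sketch of that idea (assuming the default BGR capture, i.e. without the YUV tweak above), pulling out one channel or converting the whole frame to grayscale looks like this:

import cv2

cam = cv2.VideoCapture(0)
ret, frame = cam.read()                             # frame.shape == (rows, cols, 3), BGR order
cam.release()

if ret:
    blue = frame[:, :, 0]                           # 2D array: just the blue channel
    grey = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # single-channel grayscale image
    print(frame.shape, blue.shape, grey.shape)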
FFmpeg
An alternative to using OpenCV is to wrap FFmpeg, a command-line video and audio processing utility, using subprocess.
This is a little trickier as it involves accessing the raw video buffers. I have based my solution on this guide by Zulko here.
# Imports
import subprocess as sp

import matplotlib.pyplot as plt
import numpy as np

FFMPEG_BIN = "ffmpeg"

# Define the command line command
# (on some systems '-f', 'v4l2' may need to be added before '-i' so that
#  FFmpeg treats /dev/video0 as a webcam rather than a file)
command = [
    FFMPEG_BIN,
    '-i', '/dev/video0',
    '-f', 'image2pipe',
    '-pix_fmt', 'rgb24',
    '-an', '-sn',  # -an, -sn disable audio and subtitle processing respectively
    '-vcodec', 'rawvideo', '-'
]

# Open a pipe to FFmpeg
pipe = sp.Popen(command, stdout=sp.PIPE, bufsize=(640*480*3))

# Display a few frames
no_of_frames = 5
fig, axes = plt.subplots(no_of_frames, 1)

for i in range(no_of_frames):
    # Get the raw byte values from the buffer (one 640x480 RGB frame)
    raw_image = pipe.stdout.read(640*480*3)
    # Transform the bytes read into a numpy array
    image = np.frombuffer(raw_image, dtype='uint8')
    image = image.reshape((480, 640, 3))
    # Flush the pipe
    pipe.stdout.flush()
    axes[i].imshow(image)
I had issues flushing the pipe in a Jupyter notebook, so I ended up using the OpenCV method instead. It is also trickier to work out the byte structure for YUV data with this approach.
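For what it is worth, here is a rough, untested sketch of how the YUV case might look with this approach, assuming the pixel format is switched to packed yuyv422 (two bytes per pixel, with a Y value for every pixel and U/V shared between pixel pairs):

import subprocess as sp
import numpy as np

FFMPEG_BIN = "ffmpeg"
WIDTH, HEIGHT = 640, 480

# Ask FFmpeg for packed YUYV output instead of rgb24
command = [
    FFMPEG_BIN,
    '-i', '/dev/video0',
    '-f', 'image2pipe',
    '-pix_fmt', 'yuyv422',
    '-an', '-sn',
    '-vcodec', 'rawvideo', '-'
]
pipe = sp.Popen(command, stdout=sp.PIPE, bufsize=WIDTH*HEIGHT*2)

# One YUYV frame is 2 bytes per pixel
raw = pipe.stdout.read(WIDTH*HEIGHT*2)
frame = np.frombuffer(raw, dtype='uint8').reshape((HEIGHT, WIDTH, 2))

y = frame[:, :, 0]       # full-resolution luma
u = frame[:, 0::2, 1]    # chroma, shared between pixel pairs
v = frame[:, 1::2, 1]
print(y.shape, u.shape, v.shape)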
Audio

For audio, there are also a number of options. I have tried:

- PyAudio; and
- AlsaAudio.
PyAudio
PyAudio now appears to be the preferred option. However, I am quickly learning that audio / video processing in Python is not yet as polished as pure image processing or building a neural network.
PyAudio provides a series of wrappers around the PortAudio libraries. However, I had issues getting this to work in a Conda environment. Initially, no audio devices showed up. After a long time working through Stack Overflow, I found that installing from the Conda-Forge channel did allow me to find audio devices (see here). But even though I could then see the audio devices, I still had errors when opening an audio stream. (One tip for both audio and video: look at your terminal output when capturing, as the low-level errors will be displayed there rather than in a Jupyter notebook.)
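As a quick sanity check, a minimal sketch using PyAudio's standard device-enumeration calls will tell you whether your environment can actually see any sound hardware:

import pyaudio

pa = pyaudio.PyAudio()
# List every device PortAudio can see, with the number of input channels;
# an empty list here means the Conda environment cannot reach the hardware
for i in range(pa.get_device_count()):
    info = pa.get_device_info_by_index(i)
    print(i, info['name'], 'inputs:', info['maxInputChannels'])
pa.terminate()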
AlsaAudio
Given my difficulties with PyAudio, I then tried AlsaAudio and had more success.
My starting point was the example code for recording audio provided in the AlsaAudio GitHub repository. The code below records a snippet of audio and then loads it from the file into a Numpy array. It became the starting point for a streaming solution.
# Imports
import alsaaudio
import time
import numpy as np

# Set up audio for capture
inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NONBLOCK, device="default")
inp.setchannels(1)
inp.setrate(44100)
inp.setformat(alsaaudio.PCM_FORMAT_S16_LE)
inp.setperiodsize(160)

# Record a snippet
with open("test.wav", 'wb') as f:
    loops = 1000000  # number of read attempts (reduce for a shorter snippet)
    while loops > 0:
        loops -= 1
        # Read data from device
        l, data = inp.read()
        if l:
            f.write(data)
        time.sleep(.001)

f = open("test.wav", 'rb')

# Open the device in playback mode.
out = alsaaudio.PCM(alsaaudio.PCM_PLAYBACK, device="default")

# Set attributes: Mono, 44100 Hz, 16 bit little endian frames
out.setchannels(1)
out.setrate(44100)
out.setformat(alsaaudio.PCM_FORMAT_S16_LE)

# The period size controls the internal number of frames per period.
# The significance of this parameter is documented in the ALSA API.
# We also have 2 bytes per sample, so 160*2 = 320 = number of bytes read from the buffer.
out.setperiodsize(160)

# Read data from the file, play it back and build up a Numpy array
data = f.read(320)
numpy_array = np.frombuffer(data, dtype='<i2')
while data:
    out.write(data)
    data = f.read(320)
    decoded_block = np.frombuffer(data, dtype='<i2')
    numpy_array = np.concatenate((numpy_array, decoded_block))
numpy_array is then one long one-dimensional array of sound amplitudes (signed 16-bit samples).
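As a small sketch of what you can do with it next (assuming the recording code above has been run with the 44100 Hz, 16-bit mono settings), normalising the samples and checking the duration looks like this:

import numpy as np

SAMPLE_RATE = 44100

# numpy_array holds signed 16-bit samples; scale to the -1.0..1.0 range
normalised = numpy_array.astype(np.float32) / 32768.0

duration_s = len(normalised) / SAMPLE_RATE
print(f"{len(normalised)} samples = {duration_s:.2f} seconds")
print("peak amplitude:", np.abs(normalised).max())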
Sampler Object
I found a nice little Gist for computing the FFT here. This uses a Sampler object to wrap the AlsaAudio object.
from collections import deque
import struct
import sys
import threading

import alsaaudio
import numpy as np

# some constants
# 44100 Hz sampling rate (for a 0-22050 Hz view, 0.0227 ms/sample)
SAMPLE_FREQ = 44100
# 66000 samples buffer size (near 1.5 seconds)
NB_SAMPLE = 66000


class Sampler(threading.Thread):
    def __init__(self):
        # init thread
        threading.Thread.__init__(self)
        self.daemon = True
        # init ALSA audio
        self.inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NORMAL, device="default")
        # set attributes: Mono, frequency, 16 bit little endian samples
        self.inp.setchannels(1)
        self.inp.setrate(SAMPLE_FREQ)
        self.inp.setformat(alsaaudio.PCM_FORMAT_S16_LE)
        self.inp.setperiodsize(512)
        # sample FIFO
        self._s_lock = threading.Lock()
        self._s_fifo = deque([0] * NB_SAMPLE, maxlen=NB_SAMPLE)

    def get_sample(self):
        with self._s_lock:
            return list(self._s_fifo)

    def run(self):
        while True:
            # read data from device
            l, data = self.inp.read()
            if l > 0:
                # extract and format samples (normalise each sample to a -1.0/1.0 float)
                raw_smp_l = struct.unpack('h' * l, data)
                smp_l = (float(raw_smp) / 32767 for raw_smp in raw_smp_l)
                with self._s_lock:
                    self._s_fifo.extend(smp_l)
            else:
                print('sampler error occur (l=%s and len data=%s)' % (l, len(data)), file=sys.stderr)
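A minimal usage sketch (my own illustration, not part of the Gist, and assuming the Sampler class above is defined in the same session): start the thread, let the FIFO fill, then take a real FFT of the buffered samples with Numpy:

import time
import numpy as np

sampler = Sampler()
sampler.start()          # run() loops in the background, filling the FIFO
time.sleep(2)            # give the buffer time to fill

samples = np.array(sampler.get_sample())
# Real FFT of the buffered window, plus the frequency (in Hz) for each bin
spectrum = np.abs(np.fft.rfft(samples))
freqs = np.fft.rfftfreq(len(samples), d=1.0 / SAMPLE_FREQ)
print("dominant frequency: %.1f Hz" % freqs[spectrum.argmax()])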
Next Steps
This is where I am so far.
The next steps are:
- look into threading and multiprocessing so that we can run parallel audio and video sampling routines (a rough sketch of this follows below);
- extend the audio (and video?) processing to obtain the FFT; and
- optimise for speed of capture.
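On the first of those, here is a rough, hypothetical sketch of the direction I have in mind, assuming the Sampler class above is available and OpenCV can reach the webcam; it simply runs audio and video capture in separate daemon threads:

import threading
import time
import cv2

latest_frame = None

def video_loop():
    # Keep grabbing frames and stash the most recent one as a Numpy array
    global latest_frame
    cam = cv2.VideoCapture(0)
    try:
        while True:
            ret, frame = cam.read()
            if ret:
                latest_frame = frame
    finally:
        cam.release()

sampler = Sampler()      # audio thread from the Gist above
sampler.start()

video_thread = threading.Thread(target=video_loop, daemon=True)
video_thread.start()

time.sleep(2)
print("audio samples buffered:", len(sampler.get_sample()))
print("latest video frame shape:", None if latest_frame is None else latest_frame.shape)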