commit 372fe204598c8e52d76f8dbc4d082c1d2075f2b5
parent cae3767e43481d30eab1dadba1d6e55659de2ead
Author: Alexander Kojevnikov <alexander@kojevnikov.com>
Date: Fri, 9 Jul 2010 13:16:33 +1000
Decode audio and run FFTs in different threads (issue 11)
Diffstat:
1 file changed, 100 insertions(+), 29 deletions(-)
diff --git a/src/spek-pipeline.vala b/src/spek-pipeline.vala
@@ -38,11 +38,20 @@ namespace Spek {
private uint8[] buffer;
private Fft.Plan fft;
- private int nfft;
+ private int nfft; // Size of the FFT transform.
+ private const int NFFT = 64; // Number of FFTs to pre-fetch.
+ private int input_size;
+ private int input_pos;
private float[] input;
private float[] output;
private unowned Thread reader_thread = null;
+ private unowned Thread worker_thread;
+ private Mutex reader_mutex;
+ private Cond reader_cond;
+ private Mutex worker_mutex;
+ private Cond worker_cond;
+ private bool worker_done = false;
private bool quit = false;
public Pipeline (string file_name, int bands, int samples, int threshold, Callback cb) {
@@ -83,7 +92,8 @@ namespace Spek {
this.buffer = new uint8[cx.buffer_size];
this.nfft = 2 * bands - 2;
this.fft = new Fft.Plan (nfft, threshold);
- this.input = new float[nfft];
+ this.input_size = nfft * (NFFT * 2 + 1);
+ this.input = new float[input_size];
this.output = new float[bands];
this.cx.start (samples);
}
@@ -95,6 +105,12 @@ namespace Spek {
public void start () {
stop ();
+ input_pos = 0;
+ reader_mutex = new Mutex ();
+ reader_cond = new Cond ();
+ worker_mutex = new Mutex ();
+ worker_cond = new Cond ();
+
try {
reader_thread = Thread.create (reader_func, true);
} catch (ThreadError e) {
@@ -114,41 +130,101 @@ namespace Spek {
}
private void * reader_func () {
- int pos = 0;
- int sample = 0;
- int64 frames = 0;
- int64 num_fft = 0;
- int64 acc_error = 0;
- float cf = 2f * (float) Math.PI / nfft;
+ int pos = 0, prev_pos = 0;
+ int block_size = cx.width * cx.channels / 8;
int size;
- Memory.set (output, 0, sizeof (float) * bands);
+ try {
+ worker_thread = Thread.create (worker_func, true);
+ } catch (ThreadError e) {
+ return null;
+ }
while ((size = cx.read (this.buffer)) > 0) {
- lock (quit) {
- if (quit) {
- return null;
- }
- }
+ lock (quit) if (quit) break;
uint8 *buffer = (uint8 *) this.buffer;
- var block_size = cx.width * cx.channels / 8;
while (size >= block_size) {
input[pos] = average_input (buffer);
buffer += block_size;
size -= block_size;
- pos = (pos + 1) % nfft;
+ pos = (pos + 1) % input_size;
+
+ // Wake up the worker if we have enough data.
+ if ((pos > prev_pos ? pos : pos + input_size) - prev_pos == nfft * NFFT) {
+ reader_sync (prev_pos = pos);
+ }
+ }
+ assert (size == 0);
+ }
+
+ if (pos != prev_pos) {
+ // Process the remaining data.
+ reader_sync (pos);
+ }
+ // Force the worker to quit.
+ reader_sync (-1);
+ worker_thread.join ();
+
+ return null;
+ }
+
+ private void reader_sync (int pos) {
+ reader_mutex.lock ();
+ while (!worker_done) reader_cond.wait (reader_mutex);
+ worker_done = false;
+ reader_mutex.unlock ();
+
+ worker_mutex.lock ();
+ input_pos = pos;
+ worker_cond.signal ();
+ worker_mutex.unlock ();
+ }
+
+ private void * worker_func () {
+ int sample = 0;
+ int64 frames = 0;
+ int64 num_fft = 0;
+ int64 acc_error = 0;
+ float cf = 2f * (float) Math.PI / nfft;
+ int head = 0, tail = 0;
+ int prev_head = 0;
+
+ Memory.set (output, 0, sizeof (float) * bands);
+
+ while (true) {
+ reader_mutex.lock ();
+ worker_done = true;
+ reader_cond.signal ();
+ reader_mutex.unlock ();
+
+ worker_mutex.lock ();
+ while (tail == input_pos) worker_cond.wait (worker_mutex);
+ tail = input_pos;
+ worker_mutex.unlock ();
+
+ if (tail == -1) {
+ return null;
+ }
+
+ while (true) {
+ head = (head + 1) % input_size;
+ if (head == tail) {
+ head = prev_head;
+ break;
+ }
frames++;
// If we have enough frames for an FFT or we
// have all frames required for the interval run
// an FFT. In the last case we probably take the
// FFT of frames that we already handled.
- if (frames % nfft == 0 ||
- acc_error < cx.error_base && frames == cx.frames_per_interval ||
- acc_error >= cx.error_base && frames == 1 + cx.frames_per_interval) {
+ bool int_full = acc_error < cx.error_base && frames == cx.frames_per_interval;
+ bool int_over = acc_error >= cx.error_base && frames == 1 + cx.frames_per_interval;
+ if (frames % nfft == 0 || int_full || int_over) {
+ prev_head = head;
for (int i = 0; i < nfft; i++) {
- float val = input[(pos + i) % nfft];
+ float val = input[(input_size + head - nfft + i) % input_size];
// Hamming window.
val *= 0.53836f - 0.46164f * Math.cosf (cf * i);
fft.input[i] = val;
@@ -160,10 +236,8 @@ namespace Spek {
}
}
// Do we have the FFTs for one interval?
- if (acc_error < cx.error_base && frames == cx.frames_per_interval ||
- acc_error >= cx.error_base && frames == 1 + cx.frames_per_interval) {
-
- if (acc_error >= cx.error_base) {
+ if (int_full || int_over) {
+ if (int_over) {
acc_error -= cx.error_base;
} else {
acc_error += cx.error_per_interval;
@@ -173,18 +247,15 @@ namespace Spek {
output[i] /= num_fft;
}
+ if (sample == samples) break;
cb (sample++, output);
- if (sample == samples) {
- return null;
- }
+
Memory.set (output, 0, sizeof (float) * bands);
frames = 0;
num_fft = 0;
}
}
- assert (size == 0);
}
- return null;
}
private float average_input (uint8 *buffer) {