我正在编写一个音频DSP应用程序,我选择使用生产者-消费者模型。我已经阅读了很多关于volatile和其他线程问题的文章,但是我有一些关于我的案例的一些细节的问题--特别是,我需要在线程之间共享的事情之一是数组。
我有一门代表制片人的课。为了允许处理时间的变化,生产者存储n缓冲区,每当有更多的音频数据可用时,它将轮流填充这些缓冲区,并将缓冲区传递给使用者线程。
我将从我的问题开始,然后我将尝试以足够的细节解释我的系统我也非常感谢关于我的实现及其线程安全性的一般性评论。
我的缓冲区由一个volatile byte[][]数组表示。我很清楚,volatile只会使引用不稳定,但是在阅读了这样的文章和各种博客文章之后,我似乎有两种选择:
我可以用AtomicIntegerArray。但是:
如果我正确理解(例如这篇博客文章),就我的情况而言,这是一个自我赋值:buffers[currentBuffer] = buffers[currentBuffer]将确保发布,您将在下面的代码中看到这一点。
我将尝试简单地概述producer类;这些是实例变量:
// The consumer - just an interface with a process(byte[]) method
AudioInputConsumer consumer;
// The audio data source
AudioSource source;
// The number of buffers
int bufferCount;
// Controls the main producer loop
volatile boolean isRunning = false;
// The actual buffers
volatile byte[][] buffers;
// The number of buffers left to process.
// Shared counter - the producer inrements and checks it has not run
// out of buffers, while the consumer decremenets when it processes a buffer
AtomicInteger buffersToProcess = new AtomicInteger(0);
// The producer thread.
Thread producerThread;
// The consumer thread.
Thread consumerThread;一旦启动了producerThread和consumerThread,它们就分别执行producerLoop和consumerLoop方法。
producerLoop在等待音频数据的同时,读取到缓冲区,在缓冲区上执行自分配,然后使用AtomicInteger实例向使用者循环发送信号。
private void producerLoop() {
int bufferSize = source.getBufferSize();
int currentBuffer = 0;
while (isRunning) {
if (buffersToProcess.get() == bufferCount) {
//This thread must be faster than the processing thread, we have run out
// of buffers: decide what to do
System.err.println("WARNING: run out of buffers");
}
source.read(buffers[currentBuffer], 0, bufferSize); // Read data into the buffer
buffers[currentBuffer] = buffers[currentBuffer]; // Self-assignment to force publication (?)
buffersToProcess.incrementAndGet(); // Signal to the other thread that there is data to read
currentBuffer = (currentBuffer + 1) % bufferCount; // Next buffer
}
}consumerLoop等待直到AtomicInteger buffersToProcess大于零,然后调用使用者对象来对数据做它想做的任何事情。后来,buffersToProcess减少了,我们等待它再次变成非零。
private void consumerLoop() {
int currentBuffer = 0;
while (isRunning) {
if (buffersToProcess.get() > 0) {
consumer.process(buffers[currentBuffer]); // Process the data
buffersToProcess.decrementAndGet(); // Signal that we are done with this buffer
currentBuffer = (currentBuffer + 1) % bufferCount; // Next buffer
}
Thread.yield();
}
}非常感谢!
发布于 2013-08-22 09:09:46
您确实需要原子性,因为写入数组是一个非原子的过程。具体来说,Java绝对不会保证写入数组成员对其他线程是不可见的,直到您选择发布它们。
一种选择是每次创建一个新数组,完全初始化它,然后在volatile上发布,但是这可能会导致很大的成本,因为Java坚持必须首先将一个新分配的数组归零,这是由于GC开销造成的。您可以通过“双缓冲”方案克服这一问题,其中只保留两个数组并在它们之间切换。这种方法有其危险:线程可能仍在从您的写入线程已经标记为非活动线程的数组中读取。这在很大程度上取决于代码的精确细节。
唯一的其他选择是在一个经典的,枯燥的,synchronized块中完成整个阅读和写作。这具有在延迟上非常可预测的优点。就我个人而言,我会从这一点开始,如果受到实际性能问题的绝对压力,我会继续讨论任何更复杂的问题。
您也可以使用读-写锁进行锁定,但是只有当多个线程同时读取数组时才会有效果。这似乎不是你的案子。
发布于 2013-08-22 09:50:17
看看java.util.concurrent.ArrayBlockingQueue。
当您在阻塞队列上调用wait ()时,它将自动等待,直到有什么可用的东西。
只需将byte[]放到队列中,消费者就会处理它。在这种情况下,需要知道要处理多少缓冲区是不相关的。通常,队列中的“终止”项表示最后一个缓冲区。它将有助于使用布尔终止标志将byte[]包装在类中。
优素福
发布于 2013-08-23 02:33:04
除了上面解释了易失性和原子工作方式的@Marko Topolnik之外,如果您仍然希望在写入数组时实现跨线程可见性的效果,则可以在同一个系列中使用Unsafe.putLongVolatile()、Unsafe.putObjectVolatile()和所有其他方法。
是的,它不是Java喜欢的,但它解决了您的问题。
https://stackoverflow.com/questions/18375958
复制相似问题