任务看起来是这样的。我们有很多线程运行某种方法。同时运行它是可以的,但前提是一些条件已经满了。如果没有-线程应该等待。
这里是我所说的例子。
我们有一个耗时的方法doStuff,它以一个关键实例作为参数.只有当提供的密钥不相等时,它才能在多线程环境中正常工作。如果同时处理两个相等的键,则会失败。我们需要编写一个代码来阻止具有相同键的线程同时调用此方法。我编写了三种实现:通过带有这些键的ConcurrentHashMap,通过键索引的AtomicIntegerArray,以及通过简单的同步块来检查正在处理的密钥集。
public class KeyProblem {
static class Key {
private int index;
Key() {
this.index = (int) (Math.random() * 10) % 10;
}
public int getIndex() {
return index;
}
@Override
public String toString() {
return "Key{" +
"index=" + index +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Key key = (Key) o;
if (index != key.index) return false;
return true;
}
@Override
public int hashCode() {
return index;
}
}
private static ConcurrentHashMap<Key, Object> keysProcessedRightNowForCheck = new ConcurrentHashMap<Key, Object>();
public static void doStuff(Key key) {
Object sentinel = keysProcessedRightNowForCheck.putIfAbsent(key, new Object());
if (sentinel != null) {
System.out.println("ERROR! Equal keys! " + key + " " + Thread.currentThread().getName());
}
try {
System.out.println(String.format(" started by %s with %s ", Thread.currentThread().getName(), key));
Thread.sleep(500);
System.out.println(String.format(" finished by %s with %s ", Thread.currentThread().getName(), key));
} catch (InterruptedException e) {
}
keysProcessedRightNowForCheck.remove(key);
}
//first version: via ConcurrentHashMap
private static ConcurrentHashMap map = new ConcurrentHashMap();
private static Object waiter = new Object();
public static void viaConcurrentHashMap(Key key) throws InterruptedException {
while (map.putIfAbsent(key, new Object()) != null) {
synchronized (waiter) {
System.out.println("wait with key " + key);
waiter.wait();
System.out.println("done waiting with key " + key);
}
}
System.out.println("started stuff with " + key);
doStuff(key);
map.remove(key);
synchronized (waiter) {
System.out.println("notified after stuff with " + key);
waiter.notifyAll();
System.out.println("done waiting with key " + key);
}
}
//second version: via AtomicIntegerArray for a fixed number of keys
private static AtomicIntegerArray keyProcessed = new AtomicIntegerArray(10);
public static void viaAtomicIntegerArray(Key key) throws InterruptedException {
while (!keyProcessed.compareAndSet(key.getIndex(), 0, 1)) {
synchronized (waiter) {
System.out.println("wait with key " + key);
waiter.wait();
}
}
doStuff(key);
keyProcessed.decrementAndGet(key.getIndex());
synchronized (waiter) {
System.out.println("notified after stuff with " + key);
waiter.notifyAll();
}
}
//third version: via a simple lock
private static Object lock = new Object();
private static Set<Key> keys = new HashSet<Key>();
public static void viaSimpleSynchronized(Key key) throws InterruptedException {
synchronized (lock) {
while (keys.contains(key)) {
lock.wait();
}
keys.add(key);
}
doStuff(key);
synchronized (lock) {
keys.remove(key);
lock.notifyAll();
}
}
private static CyclicBarrier barrier;
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
final int MAX = 100;
List<Key> keys = new ArrayList<Key>() {{
for (int i = 0; i < MAX; i++) add(new Key());
}};
barrier = new CyclicBarrier(MAX + 1);
long start = System.currentTimeMillis();
for (final Key key : keys) {
Thread t = new Thread() {
public void run() {
try {
// viaConcurrentHashMap(key);
viaSimpleSynchronized(key);
// viaAtomicIntegerArray(key);
barrier.await();
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
};
t.start();
}
barrier.await();
System.out.println("system time [ms] " + (System.currentTimeMillis() - start));
//7 for array
}
}对于100个线程,运行时间约为7s,第三个版本的运行速度比预期稍慢。
最后,问题是:
1)我的代码是正确的还是线程安全的?
2)你能提出更好的实施建议吗?
3)在java.util.concurrent中是否有一些类以广义的方式解决了这一任务?我指的是一种障碍,只有在满足某些条件的情况下,才能让线程走。
发布于 2014-08-24 11:38:09
如果您能够同时保存内存中的所有键,并且作为单例(至少这就是示例的工作方式),那么一个非常简单的解决方案似乎如下所示:
synchronized (key) {}块中执行逻辑。发布于 2014-08-25 13:17:37
您的viaSimpleSynchronized方法是唯一没有中断的方法。另外两个人也有同样的错误:您假设执行操作和随后进入synchronized块在某种程度上是紧密的,而实际上,它们是不同的、不耦合的操作。
例如:
while (map.putIfAbsent(key, new Object()) != null) {
// in-between right at this point the other thread can do both, remove
// the key AND execute it’s synchronized(waiter) { waiter.notifyAll(); }
synchronized (waiter) {
System.out.println("wait with key " + key);
waiter.wait(); // therefore this can enter a wait lasting forever
System.out.println("done waiting with key " + key);
}
}在大量线程执行大量操作的测试中,您不会发现这样的错误,因为每个未争用的线程在调用notifyAll()时都会释放所有等待的线程,从而隐藏线程等待已经发生的通知的错误。
因此,这里有一些代码,当活动很少时,代码会意外地中断。
通常,当您在synchronized块之外执行检查时,即使它使用的是像ConcurrentHashMap这样的线程安全构造,也必须重新检查synchronized块中的条件。
唯一的例外是已知的永不恢复的条件(例如,一个“就绪”标志将从false传递到true,但永远不会返回到false)。
https://stackoverflow.com/questions/25471215
复制相似问题