首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >实现条件线程屏障的最佳方法

实现条件线程屏障的最佳方法
EN

Stack Overflow用户
提问于 2014-08-24 11:26:34
回答 2查看 689关注 0票数 1

任务看起来是这样的。我们有很多线程运行某种方法。同时运行它是可以的,但前提是一些条件已经满了。如果没有-线程应该等待。

这里是我所说的例子。

我们有一个耗时的方法doStuff,它以一个关键实例作为参数.只有当提供的密钥不相等时,它才能在多线程环境中正常工作。如果同时处理两个相等的键,则会失败。我们需要编写一个代码来阻止具有相同键的线程同时调用此方法。我编写了三种实现:通过带有这些键的ConcurrentHashMap,通过键索引的AtomicIntegerArray,以及通过简单的同步块来检查正在处理的密钥集。

代码语言:javascript
复制
public class KeyProblem {

    static class Key {

        private int index;

        Key() {
            this.index = (int) (Math.random() * 10) % 10;
        }

        public int getIndex() {
            return index;
        }

        @Override
        public String toString() {
            return "Key{" +
                    "index=" + index +
                    '}';
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;

            Key key = (Key) o;

            if (index != key.index) return false;

            return true;
        }

        @Override
        public int hashCode() {
            return index;
        }
    }

    private static ConcurrentHashMap<Key, Object> keysProcessedRightNowForCheck = new ConcurrentHashMap<Key, Object>();

    public static void doStuff(Key key) {
        Object sentinel = keysProcessedRightNowForCheck.putIfAbsent(key, new Object());
        if (sentinel != null) {
            System.out.println("ERROR! Equal keys! " + key + " " + Thread.currentThread().getName());
        }
        try {
            System.out.println(String.format("   started by %s with %s ", Thread.currentThread().getName(), key));
            Thread.sleep(500);
            System.out.println(String.format("   finished by %s with %s ", Thread.currentThread().getName(), key));
        } catch (InterruptedException e) {
        }
        keysProcessedRightNowForCheck.remove(key);
    }

    //first version: via ConcurrentHashMap

    private static ConcurrentHashMap map = new ConcurrentHashMap();
    private static Object waiter = new Object();

    public static void viaConcurrentHashMap(Key key) throws InterruptedException {
        while (map.putIfAbsent(key, new Object()) != null) {
            synchronized (waiter) {
                System.out.println("wait with key " + key);
                waiter.wait();
                System.out.println("done waiting with key " + key);
            }
        }
        System.out.println("started stuff with " + key);
        doStuff(key);
        map.remove(key);
        synchronized (waiter) {
            System.out.println("notified after stuff with " + key);
            waiter.notifyAll();
            System.out.println("done waiting with key " + key);
        }
    }

    //second version: via AtomicIntegerArray for a fixed number of keys

    private static AtomicIntegerArray keyProcessed = new AtomicIntegerArray(10);

    public static void viaAtomicIntegerArray(Key key) throws InterruptedException {

        while (!keyProcessed.compareAndSet(key.getIndex(), 0, 1)) {
            synchronized (waiter) {
                System.out.println("wait with key " + key);
                waiter.wait();
            }
        }

        doStuff(key);
        keyProcessed.decrementAndGet(key.getIndex());

        synchronized (waiter) {
            System.out.println("notified after stuff with " + key);
            waiter.notifyAll();
        }

    }

    //third version: via a simple lock

    private static Object lock = new Object();
    private static Set<Key> keys = new HashSet<Key>();

    public static void viaSimpleSynchronized(Key key) throws InterruptedException {
        synchronized (lock) {
            while (keys.contains(key)) {
                lock.wait();
            }
            keys.add(key);
        }

        doStuff(key);
        synchronized (lock) {
            keys.remove(key);
            lock.notifyAll();
        }
    }

    private static CyclicBarrier barrier;

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {

        final int MAX = 100;

        List<Key> keys = new ArrayList<Key>() {{
            for (int i = 0; i < MAX; i++) add(new Key());
        }};

        barrier = new CyclicBarrier(MAX + 1);

        long start = System.currentTimeMillis();

        for (final Key key : keys) {
            Thread t = new Thread() {
                public void run() {
                    try {
//                        viaConcurrentHashMap(key);
                        viaSimpleSynchronized(key);
//                        viaAtomicIntegerArray(key);
                        barrier.await();
                    } catch (InterruptedException e) {
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                    }

                }
            };
            t.start();
        }

        barrier.await();
        System.out.println("system time [ms] " + (System.currentTimeMillis() - start));
        //7 for array
    }
}

对于100个线程,运行时间约为7s,第三个版本的运行速度比预期稍慢。

最后,问题是:

1)我的代码是正确的还是线程安全的?

2)你能提出更好的实施建议吗?

3)在java.util.concurrent中是否有一些类以广义的方式解决了这一任务?我指的是一种障碍,只有在满足某些条件的情况下,才能让线程走。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2014-08-24 11:38:09

如果您能够同时保存内存中的所有键,并且作为单例(至少这就是示例的工作方式),那么一个非常简单的解决方案似乎如下所示:

  1. 取取适当的钥匙;
  2. synchronized (key) {}块中执行逻辑。
票数 2
EN

Stack Overflow用户

发布于 2014-08-25 13:17:37

您的viaSimpleSynchronized方法是唯一没有中断的方法。另外两个人也有同样的错误:您假设执行操作和随后进入synchronized块在某种程度上是紧密的,而实际上,它们是不同的、不耦合的操作。

例如:

代码语言:javascript
复制
while (map.putIfAbsent(key, new Object()) != null) {
    // in-between right at this point the other thread can do both, remove
    // the key AND execute it’s synchronized(waiter) { waiter.notifyAll(); }
    synchronized (waiter) {
        System.out.println("wait with key " + key);
        waiter.wait(); // therefore this can enter a wait lasting forever
        System.out.println("done waiting with key " + key);
    }
}

在大量线程执行大量操作的测试中,您不会发现这样的错误,因为每个未争用的线程在调用notifyAll()时都会释放所有等待的线程,从而隐藏线程等待已经发生的通知的错误。

因此,这里有一些代码,当活动很少时,代码会意外地中断。

通常,当您在synchronized块之外执行检查时,即使它使用的是像ConcurrentHashMap这样的线程安全构造,也必须重新检查synchronized块中的条件。

唯一的例外是已知的永不恢复的条件(例如,一个“就绪”标志将从false传递到true,但永远不会返回到false)。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/25471215

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档