[Mondrian] Implementation of ResponseQueue

Julian Hyde jhyde at pentaho.com
Mon Jun 10 17:29:11 EDT 2013


I can't see anything wrong with it. You're correct that the response queue needs to be more of a "map" than a "queue". So it makes sense to use a map, with SlotFuture acting as a "queue of one" for each request.

It is worth making explicit the assumption that two requests will only be equal if they are the same object. I don't think the previous implementation made that assumption. But it's not too onerous -- none of the current request classes override equals().
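
To make the assumption concrete, here is a minimal sketch of how a ConcurrentHashMap treats keys with and without an equals() override. IdentityRequest, ValueRequest, and distinctSlots are hypothetical names invented for the illustration; they are not Mondrian classes.

```java
import java.util.concurrent.ConcurrentHashMap;

class RequestIdentityDemo {
    // Hypothetical request that inherits Object.equals(): identity semantics.
    static class IdentityRequest {
        final String segment;
        IdentityRequest(String segment) { this.segment = segment; }
    }

    // Hypothetical request that overrides equals()/hashCode(): value semantics.
    static class ValueRequest {
        final String segment;
        ValueRequest(String segment) { this.segment = segment; }
        @Override public boolean equals(Object o) {
            return o instanceof ValueRequest
                && ((ValueRequest) o).segment.equals(segment);
        }
        @Override public int hashCode() { return segment.hashCode(); }
    }

    // Registers a slot for each request and reports how many slots exist.
    static int distinctSlots(Object a, Object b) {
        ConcurrentHashMap<Object, String> slots =
            new ConcurrentHashMap<Object, String>();
        slots.putIfAbsent(a, "slot for a");
        slots.putIfAbsent(b, "slot for b");
        return slots.size();
    }
}
```

Two IdentityRequest instances with the same content get two slots, so each caller waits on its own response. Two equal ValueRequest instances would share one slot, and one of the two callers could end up waiting on a slot that the other has already removed.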

This implementation will allow more concurrency. It may allow requests to be serviced in a different order than received, and this is a good thing, but there may be code that assumes a particular order. Watch out for bugs caused by this.

It is worth writing a small test that stresses the queue from a number of threads, and adding that test to the suite.
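
A rough sketch of what such a test might look like, assuming Java 8 or later. It does not use the real classes: the nested ResponseQueue mirrors the proposed put/take logic, with java.util.concurrent.CompletableFuture standing in for SlotFuture and a timed get() standing in for Util.safeGet. The class name, the run method, and the pool sizes are all made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ResponseQueueStress {
    // Stand-in for the proposed class; CompletableFuture replaces SlotFuture.
    static class ResponseQueue<K, V> {
        final ConcurrentHashMap<K, CompletableFuture<V>> queue =
            new ConcurrentHashMap<K, CompletableFuture<V>>();

        void put(K k, V v) {
            queue.putIfAbsent(k, new CompletableFuture<V>());
            queue.get(k).complete(v);
        }

        V take(K k) throws Exception {
            queue.putIfAbsent(k, new CompletableFuture<V>());
            // Time out rather than hang forever if a response is lost.
            V v = queue.get(k).get(10, TimeUnit.SECONDS);
            queue.remove(k);
            return v;
        }
    }

    /** Returns wrong responses plus leftover slots; 0 means success. */
    static int run(int requests, int threads) {
        final ResponseQueue<Object, Integer> rq =
            new ResponseQueue<Object, Integer>();
        // Separate pools, so blocked takers can never starve the producers.
        ExecutorService consumers = Executors.newFixedThreadPool(threads);
        ExecutorService producers = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Boolean>> results = new ArrayList<Future<Boolean>>();
            List<Runnable> puts = new ArrayList<Runnable>();
            for (int i = 0; i < requests; i++) {
                final Object request = new Object(); // identity-based key
                final int expected = i;
                Callable<Boolean> check = () -> rq.take(request) == expected;
                results.add(consumers.submit(check));
                puts.add(() -> rq.put(request, expected));
            }
            // Respond in a different order than the requests were issued.
            Collections.shuffle(puts, new Random(42));
            for (Runnable put : puts) {
                producers.submit(put);
            }
            int failures = 0;
            for (Future<Boolean> result : results) {
                if (!result.get(30, TimeUnit.SECONDS)) {
                    failures++;
                }
            }
            // Every slot should have been removed by its take().
            return failures + rq.queue.size();
        } catch (Exception e) {
            throw new IllegalStateException("stress run failed", e);
        } finally {
            consumers.shutdownNow();
            producers.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("failures: " + run(1000, 8));
    }
}
```

A version for the suite would exercise the real ResponseQueue and SlotFuture instead of the stand-ins, and would probably repeat the run several times, since races of this kind rarely show up on the first attempt.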

Julian


On Jun 10, 2013, at 12:28 PM, Luc Boudreau <lucboudreau at gmail.com> wrote:

> Matt and I have been looking into performance bottlenecks. One that we found was in the way that the Actor of SegmentCacheManagerImpl implements the ResponseQueue. Because it synchronized the call to ResponseQueue.take, it was possible for a long-running command to prevent other threads from obtaining the response that they needed out of the queue.
> 
> Another problem was that it relied on a WeakHashMap, and the GC was responsible for cleaning it up periodically.
> 
> We came up with the following replacement for the ResponseQueue and were curious if someone can spot a potential problem with this new brew.
> 
> Cheers!
> 
> --------------------------------------------------------------------------------
>    private static class ResponseQueue<K, V> {
>        private final ConcurrentHashMap<K, SlotFuture<V>> queue;
> 
>        /**
>         * Creates a ResponseQueue with given capacity.
>         *
>         * @param capacity Capacity
>         */
>        public ResponseQueue(int capacity) {
>            queue = new ConcurrentHashMap<K, SlotFuture<V>>(capacity);
>        }
> 
>        /**
>         * Places a (request, response) pair onto the queue.
>         *
>         * @param k Request
>         * @param v Response
>         * @throws InterruptedException if interrupted while waiting
>         */
>        public void put(K k, V v) throws InterruptedException {
>            queue.putIfAbsent(k, new SlotFuture<V>());
>            queue.get(k).put(v);
>        }
> 
>        /**
>         * Retrieves the response from the queue matching the given key,
>         * blocking until it is received.
>         *
>         * @param k Request
>         * @return Response
>         * @throws InterruptedException if interrupted while waiting
>         */
>        public V take(K k) throws InterruptedException {
>            queue.putIfAbsent(k, new SlotFuture<V>());
>            V v = Util.safeGet(queue.get(k), "");
>            queue.remove(k);
>            return v;
>        }
>    }
> --------------------------------------------------------------------------------

Julian Hyde
jhyde at pentaho.com




