package com.conversantmedia.util.concurrent;

/*
 * #%L
 * Conversant Disruptor
 * ~~
 * Conversantmedia.com © 2016, Conversant, Inc. Conversant® is a trademark of Conversant, Inc.
 * ~~
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */


Dmitry Vyukov, Bounded MPMC queue - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue Added for benchmarking and comparison. MultithreadConcurrentQueue performs better in the regimes I have tested. Created by jcairns on 5/29/14.
/** * Dmitry Vyukov, Bounded MPMC queue - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue * * Added for benchmarking and comparison. MultithreadConcurrentQueue performs better in the regimes I have tested. * * Created by jcairns on 5/29/14. */
class MPMCConcurrentQueue<E> implements ConcurrentQueue<E> { protected final int size; final long mask; // a ring buffer representing the queue final Cell<E>[] buffer; final ContendedAtomicLong head = new ContendedAtomicLong(0L); final ContendedAtomicLong tail = new ContendedAtomicLong(0L);
Construct a blocking queue of the given fixed capacity. Note: actual capacity will be the next power of two larger than capacity.
Params:
  • capacity – maximum capacity of this queue
/** * Construct a blocking queue of the given fixed capacity. * * Note: actual capacity will be the next power of two * larger than capacity. * * @param capacity maximum capacity of this queue */
public MPMCConcurrentQueue(final int capacity) { // capacity of at least 2 is assumed int c = 2; while(c < capacity) c <<=1; size = c; mask = size - 1L; buffer = new Cell[size]; for(int i=0; i<size; i++) { buffer[i] = new Cell<E>(i); } } @Override public boolean offer(E e) { Cell<E> cell; long tail = this.tail.get(); for(;;) { cell = buffer[(int)(tail & mask)]; final long seq = cell.seq.get(); final long dif = seq - tail; if(dif == 0) { if(this.tail.compareAndSet(tail, tail+1)) { break; } } else if(dif < 0) { return false; } else { tail = this.tail.get(); } } cell.entry = e; cell.seq.set(tail + 1); return true; }; @Override public E poll() { Cell<E> cell; long head = this.head.get(); for(;;) { cell = buffer[(int)(head & mask)]; long seq = cell.seq.get(); final long dif = seq - (head+1L); if(dif == 0) { if(this.head.compareAndSet(head, head+1)) { break; } } else if(dif < 0) { return null; } else { head = this.head.get(); } } try { return cell.entry; } finally { cell.entry = null; cell.seq.set(head + mask + 1L); } } @Override public final E peek() { return buffer[(int)(head.get()&mask)].entry; } @Override // drain the whole queue at once public int remove(final E[] e) { int nRead = 0; while(nRead < e.length && !isEmpty()) { final E entry = poll(); if(entry != null) { e[nRead++] = entry; } } return nRead; } @Override public final int size() { return (int)Math.max((tail.get() - head.get()), 0); } @Override public int capacity() { return size; } @Override public final boolean isEmpty() { return head.get() == tail.get(); } @Override public void clear() { while(!isEmpty()) poll(); } @Override public final boolean contains(Object o) { for(int i=0; i<size(); i++) { final int slot = (int)((head.get() + i) & mask); if(buffer[slot].entry != null && buffer[slot].entry.equals(o)) return true; } return false; } protected static final class Cell<R> { final ContendedAtomicLong seq = new ContendedAtomicLong(0L); public long p1, p2, p3, p4, p5, p6, p7; R entry; public long a1, a2, a3, a4, a5, a6, a7, a8; Cell(final long s) { seq.set(s); entry = null; } public long sumToAvoidOptimization() { return p1+p2+p3+p4+p5+p6+p7+a1+a2+a3+a4+a5+a6+a7+a8; } } }