package com.conversantmedia.util.concurrent;

/*
 * #%L
 * Conversant Disruptor
 * ~~
 * Conversantmedia.com © 2016, Conversant, Inc. Conversant® is a trademark of Conversant, Inc.
 * ~~
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.util.concurrent.atomic.LongAdder;

This is the disruptor implemented for multiple simultaneous reader and writer threads. This data structure approaches 20-40ns for transfers on fast hardware. This code is optimized and tested using a 64bit HotSpot JVM on an Intel x86-64 environment. Other environments should be carefully tested before using in production. Created by jcairns on 5/29/14.
/** * This is the disruptor implemented for multiple simultaneous reader and writer threads. * * This data structure approaches 20-40ns for transfers on fast hardware. * * This code is optimized and tested using a 64bit HotSpot JVM on an Intel x86-64 environment. Other * environments should be carefully tested before using in production. * * * Created by jcairns on 5/29/14. */
public class MultithreadConcurrentQueue<E> implements ConcurrentQueue<E> { /* * Note to future developers/maintainers - This code is highly tuned * and possibly non-intuitive. Rigorous performance and functional * testing should accompany any proposed change * */ // maximum allowed capacity // this must always be a power of 2 // protected final int size; // we need to compute a position in the ring buffer // modulo size, since size is a power of two // compute the bucket position with x&(size-1) // aka x&mask final long mask; // the sequence number of the end of the queue final LongAdder tail = new LongAdder(); final ContendedAtomicLong tailCursor = new ContendedAtomicLong(0L); // use the value in the L1 cache rather than reading from memory when possible long p1, p2, p3, p4, p5, p6, p7; long tailCache = 0L; long a1, a2, a3, a4, a5, a6, a7, a8; // a ring buffer representing the queue final E[] buffer; long r1, r2, r3, r4, r5, r6, r7; long headCache = 0L; long c1, c2, c3, c4, c5, c6, c7, c8; // the sequence number of the start of the queue final LongAdder head = new LongAdder(); final ContendedAtomicLong headCursor = new ContendedAtomicLong(0L);
Construct a blocking queue of the given fixed capacity. Note: actual capacity will be the next power of two larger than capacity.
Params:
  • capacity – maximum capacity of this queue
/** * Construct a blocking queue of the given fixed capacity. * * Note: actual capacity will be the next power of two * larger than capacity. * * @param capacity maximum capacity of this queue */
public MultithreadConcurrentQueue(final int capacity) { size = Capacity.getCapacity(capacity); mask = size - 1L; buffer = (E[])new Object[size]; } @Override public boolean offer(E e) { int spin = 0; for(;;) { final long tailSeq = tail.sum(); // never offer onto the slot that is currently being polled off final long queueStart = tailSeq - size; // will this sequence exceed the capacity if((headCache > queueStart) || ((headCache = head.sum()) > queueStart)) { // does the sequence still have the expected // value if(tailCursor.compareAndSet(tailSeq, tailSeq + 1L)) { try { // tailSeq is valid // and we got access without contention // convert sequence number to slot id final int tailSlot = (int)(tailSeq&mask); buffer[tailSlot] = e; return true; } finally { tail.increment(); } } // else - sequence misfire, somebody got our spot, try again } else { // exceeded capacity return false; } spin = Condition.progressiveYield(spin); } } @Override public E poll() { int spin = 0; for(;;) { final long head = this.head.sum(); // is there data for us to poll if((tailCache > head) || (tailCache = tail.sum()) > head) { // check if we can update the sequence if(headCursor.compareAndSet(head, head+1L)) { try { // copy the data out of slot final int pollSlot = (int)(head&mask); final E pollObj = (E) buffer[pollSlot]; // got it, safe to read and free buffer[pollSlot] = null; return pollObj; } finally { this.head.increment(); } } // else - somebody else is reading this spot already: retry } else { return null; // do not notify - additional capacity is not yet available } // this is the spin waiting for access to the queue spin = Condition.progressiveYield(spin); } } @Override public final E peek() { return buffer[(int)(head.sum()&mask)]; } @Override // drain the whole queue at once public int remove(final E[] e) { /* This employs a "batch" mechanism to load all objects from the ring * in a single update. This could have significant cost savings in comparison * with poll */ final int maxElements = e.length; int spin = 0; for(;;) { final long pollPos = head.sum(); // prepare to qualify? // is there data for us to poll // note we must take a difference in values here to guard against // integer overflow final int nToRead = Math.min((int)(tail.sum() - pollPos), maxElements); if(nToRead > 0 ) { for(int i=0; i<nToRead;i++) { final int pollSlot = (int)((pollPos+i)&mask); e[i] = buffer[pollSlot]; } // if we still control the sequence, update and return if(headCursor.compareAndSet(pollPos, pollPos+nToRead)) { head.add(nToRead); return nToRead; } } else { // nothing to read now return 0; } // wait for access spin = Condition.progressiveYield(spin); } }
This implemention is known to be broken if preemption were to occur after reading the tail pointer. Code should not depend on size for a correct result.
Returns:int - possibly the size, or possibly any value less than capacity()
/** * This implemention is known to be broken if preemption were to occur after * reading the tail pointer. * * Code should not depend on size for a correct result. * * @return int - possibly the size, or possibly any value less than capacity() */
@Override public final int size() { // size of the ring // note these values can roll from positive to // negative, this is properly handled since // it is a difference return (int)Math.max((tail.sum() - head.sum()), 0); } @Override public int capacity() { return size; } @Override public final boolean isEmpty() { return tail.sum() == head.sum(); } @Override public void clear() { int spin = 0; for(;;) { final long head = this.head.sum(); if(headCursor.compareAndSet(head, head+1)) { for(;;) { final long tail = this.tail.sum(); if (tailCursor.compareAndSet(tail, tail + 1)) { // we just blocked all changes to the queue // remove leaked refs for (int i = 0; i < buffer.length; i++) { buffer[i] = null; } // advance head to same location as current end this.tail.increment(); this.head.add(tail-head+1); headCursor.set(tail + 1); return; } spin = Condition.progressiveYield(spin); } } spin = Condition.progressiveYield(spin); } } @Override public final boolean contains(Object o) { for(int i=0; i<size(); i++) { final int slot = (int)((head.sum() + i) & mask); if(buffer[slot]!= null && buffer[slot].equals(o)) return true; } return false; } long sumToAvoidOptimization() { return p1+p2+p3+p4+p5+p6+p7+a1+a2+a3+a4+a5+a6+a7+a8+r1+r2+r3+r4+r5+r6+r7+c1+c2+c3+c4+c5+c6+c7+c8+headCache+tailCache; } }