????LinkedBlockingQueue
????LinkedBlockingQueue ?????????????????в????????????С? ??????????????????????????? ??
??????????????
/** The capacity bound?? or Integer.MAX_VALUE if none ??????С */
private final int capacity;
/** Current number of elements ?????????????2?????????????????????AtomicInteger */
private final AtomicInteger count = new AtomicInteger(0);
/**
* Head of linked list.
* Invariant: head.item == null
* ????
*/
private transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
* β???
*/
private transient Node<E> last;
/** Lock held by take?? poll?? etc ??????? */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes ????????????? */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put?? offer?? etc ???????? */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts ?????????????? */
private final Condition notFull = putLock.newCondition();
????ArrayBlockingQueue ???1??????????????????????????????1??????У????????????С?
?????? LinkedBlockingQueue ??2?????????????????????????????????????????????н??е?????????????????????????????1??????????С?
????add ??????????? offer ??????
public boolean offer(E e) {
if (e == null) throw new NullPointerException(); // ??????????
final AtomicInteger count = this.count;
if (count.get() == capacity) // ????????????????false
return false;
int c = -1;
Node<E> node = new Node(e); // ?????????????????????
final ReentrantLock putLock = this.putLock;
putLock.lock(); // ?????????????????offer????????????1?????
try {
// ????ж??????????????????????????????????????????????????????
if (count.get() < capacity) {
enqueue(node); // ???????????β??
c = count.getAndIncrement(); // ??????+1
if (c + 1 < capacity) // ????????????
notFull.signal(); // ???????????????notFull??????????????????????????????????????????
}
} finally {
putLock.unlock(); // ???????????????????????offer????
}
// ?????????????????????????????????????????????????count??仯???????if???????????????л???1??????
if (c == 0)
// ????????????????notEmpty?????????????1?????????????????1??????????????????
signalNotEmpty();
return c >= 0; // ?????????true????????false
}
????put ??????
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException(); // ??????????
int c = -1;
Node<E> node = new Node(e); // ????????????
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); // ?????????????????put????????????1?????
try {
while (count.get() == capacity) { // ???????????
notFull.await(); // ??????????????
}
enqueue(node); // ???????????β??
c = count.getAndIncrement(); // ??????+1
if (c + 1 < capacity) // ????????????
// ???????????????notFull????????????????????????????????????????????????л????
notFull.signal();
} finally {
putLock.unlock(); // ???????????????????????put????
}
// ??????????????????????????????????????????count??仯???????if???????????????л???1??????
if (c == 0)
// ????????????????notEmpty?????????????1?????????????????1??????????????????
signalNotEmpty();
}
????ArrayBlockingQueue ?з??????????????????????????????????
??????LinkedBlockingQueue?з??????????????????????????2???????????????з?????????????????????????????????????л???????????????????????????????????????????????????????????
????poll ??????
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0) // ??????????0
return null; // ????null
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock(); // ?????????????????poll????????????1?????
try {
if (count.get() > 0) { // ?ж???????????????
x = dequeue(); // ???????
c = count.getAndDecrement(); // ??????-1
if (c > 1) // ??????????????
// ????????????????notEmpty????????????????????????????????????????????
notEmpty.signal();
}
} finally {
takeLock.unlock(); // ????????????????????????poll????
}
// ????????????????????????????????????????count??仯???????if???????????????л??????????????
if (c == capacity)
// ???????????????notFull?????????????1???????????????????????????
signalNotFull();
return x;
}
????take ??????
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); // ?????????????????take????????????1?????
try {
while (count.get() == 0) { // ????????????????????
notEmpty.await(); // ??????????????
}
x = dequeue(); // ???????
c = count.getAndDecrement(); // ??????-1
if (c > 1) // ??????????????
// ????????????????notEmpty????????????????????????????????????????????
notEmpty.signal();
} finally {
takeLock.unlock(); // ????????????????????????take????
}
// ????????????????????????????????????????count??仯???????if???????????????л??????????????
if (c == capacity)
// ???????????????notFull?????????????1???????????????????????????
signalNotFull();
return x;
}
????remove ??????
public boolean remove(Object o) {
if (o == null) return false;
fullyLock(); // remove??????????λ?ò??????2?????????????
try {
for (Node<E> trail = head?? p = trail.next; // ???????????????
p != null;
trail = p?? p = p.next) {
if (o.equals(p.item)) { // ?ж???????????
unlink(p?? trail); // ?????????????????????notFull??signal????
return true;
}
}
return false;
} finally {
fullyUnlock(); // 2????????
}
}
????LinkedBlockingQueue ??take??????????????????????????poll???????????????remove???????????????
?????????????remove???????????????????λ?ò?????????2????????????