多线程队列
约 4689 字大约 16 分钟
2024-12-27
1. 简介
,栈是先进后出。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列:
- 阻塞队列的典型例子是 BlockingQueue
- 非阻塞队列的典型例子是 ConcurrentLinkedQueue
线程安全: 线程安全的类,指的是类内共享的全局变量的访问必须保证是不受多线程形式影响的。 如果由于多线程的访问(比如修改、遍历、查看)而使这些变量结构被破坏或者针对这些变量操作的原子性被破坏,则这个类就不是线程安全的。
在并发的队列上jdk提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列, 无论在那种都继承自Queue。
1.1. 队列的使用场景
最典型的就是线程池,不同的线程池都是基于不同的队列来实现多任务等待的。
1.LinkedBlockingQueue使用场景:
在项目的一些核心业务且生产和消费速度相似的场景中:订单完成的邮件/短信提醒。 订单系统中当用户下单成功后,将信息放入ArrayBlockingQueue中, 由消息推送系统取出数据进行消息推送提示用户下单成功。如果订单的成交量非常大,那么使用ArrayBlockingQueue就会有一些问题, 固定数组很容易被使用完,此时调用的线程会进入阻塞,那么可能无法及时将消息推送出去,所以使用LinkedBlockingQueue比较合适, 但是要注意消费速度不能太低,不然很容易内存被使用完。
2.PriorityBlockingQueue使用场景:
在项目上存在优先级的业务:VIP排队购票 用户购票的时候,根据用户不同的等级,优先放到队伍的前面,当存在票源的时候,根据优先级分配
3.DelayQueue使用场景 :
由于是基于优先级队列实现,但是它比较的是时间,我们可以根据需要去倒叙或者正序排列(一般都是倒叙,用于倒计时)。所以适用于: 订单超时取消功能、网站刷题倒计时 用户下订单未支付开始倒计时,超时则释放订单中的资源,如果取消或者完成支付,我们再将队列中的数据移除掉。
4.SynchronousQueue使用场景:
参考线程池newCachedThreadPool()。 如果我们不确定每一个来自生产者请求数量但是需要很快的处理掉,那么配合SynchronousQueue为每个生产 者请求分配一个消费线程是最简洁的办法。
2. 阻塞队列
阻塞队列(Blocking Queue)提供了可阻塞的 put 和 take 方法,它们与可定时的 offer 和 poll 是等价的。如果队列满了 put 方法会被阻塞等到有空间可用再将元素插入;如果队列是空的,那么 take 方法也会阻塞,直到有元素可用。当队列永远不会被充满时,put 方法和 take 方法就永远不会阻塞
可能报异常 | 返回布尔值 | 可能阻塞 | 设定等待时间 | |
---|---|---|---|---|
入队 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
出队 | remove() | poll() | take() | poll(timeout, unit) |
查看 | element() | peek() | - | - |
注意:
add(e) remove() element() 方法不会阻塞线程。当不满足约束条件时,会抛出IllegalStateException 异常。例如:当队列被元素填满后,再调用add(e),则会抛出异常。
offer(e) poll() peek() 方法即不会阻塞线程,也不会抛出异常。例如:当队列被元素填满后,再调用offer(e),则不会插入元素,函数返回false。
要想要实现阻塞功能,需要调用put(e) take() 方法。当不满足约束条件时,会阻塞线程。
BlockingQueue 阻塞算法。
BlockingQueue作为线程容器,可以为线程同步提供有力的保障。
2.1. ArrayBlockingQueue
详情:
- 基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
- ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;
- 按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。
- ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。
- 这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
2.2. LinkedBlockingQueue
详情:
- 基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;
- 只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。
- 而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
- 作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。
2.3. PriorityBlockingQueue
详情: 基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定,也就是说传入队列的对象必须实现Comparable接口),在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列。
2.4. DelayQueue
详情: 带有延迟时间的Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue中的元素必须实现Delayed接口,DelayQueue是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等等。
2.5. SynchronousQueue
一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作,否则插入操作一直处于阻塞状态。
2.6. LinkedTransferQueue
一个基于链表结构的无界阻塞队列,支持生产者消费者模式。
2.7. LinkedBlockingDeque
详情:
- LinkedBlockingDeque是一个线程安全的双端队列实现,由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。
- 双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。可以说他是最为复杂的一种队列,在内部实现维护了前端和后端节点,但是其没有实现读写分离,因此同一时间只能有一个线程对其讲行操作。在高并发中性能要远低于其它BlockingQueue。更要低于ConcurrentLinkedQueue,jdk早期有一个非线程安全的Deque就是ArryDeque了, java6里添加了LinkBlockingDeque来弥补多线程场景下线程安全的问题。
- 相比于其他阻塞队列,LinkedBlockingDeque多了addFirst、addLast、peekFirst、peekLast等方法,以first结尾的方法,表示插入、获取获移除双端队列的第一个元素。以last结尾的方法,表示插入、获取获移除双端队列的最后一个元素。
- LinkedBlockingDeque还是可选容量的,防止过度膨胀,默认等于Integer.MAX_VALUE。
主要方法:
方法 | 解释 |
---|---|
akeFirst(),takeLast() | 分别返回类表中第一个和最后一个元素,返回的元素会从类表中移除。如果列表为空,调用的方法的线程将会被阻塞直达列表中有可用元素 |
getFirst()和getLast() | 分别返回类表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,则抛出NoSuckElementException异常。 |
peek()、peekFirst()和peekLast() | 分别返回列表中第一个元素和最后一个元素,返回元素不会被移除。如果列表为空返回null。 |
poll()、pollFirst()和pollLast() | 分别返回类表中第一个和最后一个元素,返回的元素会从列表中移除。如果列表为空,返回Null。 |
// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingDeque。
LinkedBlockingDeque()
// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingDeque,最初包含给定 collection 的元素,以该 collection 迭代器的遍历顺序添加。
LinkedBlockingDeque(Collection<? extends E> c)
// 创建一个具有给定(固定)容量的 LinkedBlockingDeque。
LinkedBlockingDeque(int capacity)
// 在不违反容量限制的情况下,将指定的元素插入此双端队列的末尾。
boolean add(E e)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的开头;如果当前没有空间可用,则抛出 IllegalStateException。
void addFirst(E e)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的末尾;如果当前没有空间可用,则抛出 IllegalStateException。
void addLast(E e)
// 以原子方式 (atomically) 从此双端队列移除所有元素。
void clear()
// 如果此双端队列包含指定的元素,则返回 true。
boolean contains(Object o)
// 返回在此双端队列的元素上以逆向连续顺序进行迭代的迭代器。
Iterator<E> descendingIterator()
// 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。
int drainTo(Collection<? super E> c)
// 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 获取但不移除此双端队列表示的队列的头部。
E element()
// 获取,但不移除此双端队列的第一个元素。
E getFirst()
// 获取,但不移除此双端队列的最后一个元素。
E getLast()
// 返回在此双端队列元素上以恰当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),并在成功时返回 true;如果当前没有空间可用,则返回 false。
boolean offer(E e)
// 将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),必要时将在指定的等待时间内一直等待可用空间。
boolean offer(E e, long timeout, TimeUnit unit)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的开头,并在成功时返回 true;如果当前没有空间可用,则返回 false。
boolean offerFirst(E e)
// 将指定的元素插入此双端队列的开头,必要时将在指定的等待时间内等待可用空间。
boolean offerFirst(E e, long timeout, TimeUnit unit)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的末尾,并在成功时返回 true;如果当前没有空间可用,则返回 false。
boolean offerLast(E e)
// 将指定的元素插入此双端队列的末尾,必要时将在指定的等待时间内等待可用空间。
boolean offerLast(E e, long timeout, TimeUnit unit)
// 获取但不移除此双端队列表示的队列的头部(即此双端队列的第一个元素);如果此双端队列为空,则返回 null。
E peek()
// 获取,但不移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。
E peekFirst()
// 获取,但不移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。
E peekLast()
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素);如果此双端队列为空,则返回 null。
E poll()
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),如有必要将在指定的等待时间内等待可用元素。
E poll(long timeout, TimeUnit unit)
// 获取并移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。
E pollFirst()
// 获取并移除此双端队列的第一个元素,必要时将在指定的等待时间等待可用元素。
E pollFirst(long timeout, TimeUnit unit)
// 获取并移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。
E pollLast()
// 获取并移除此双端队列的最后一个元素,必要时将在指定的等待时间内等待可用元素。
E pollLast(long timeout, TimeUnit unit)
// 从此双端队列所表示的堆栈中弹出一个元素。
E pop()
// 将元素推入此双端队列表示的栈。
void push(E e)
// 将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),必要时将一直等待可用空间。
void put(E e)
// 将指定的元素插入此双端队列的开头,必要时将一直等待可用空间。
void putFirst(E e)
// 将指定的元素插入此双端队列的末尾,必要时将一直等待可用空间。
void putLast(E e)
// 返回理想情况下(没有内存和资源约束)此双端队列可不受阻塞地接受的额外元素数。
int remainingCapacity()
// 获取并移除此双端队列表示的队列的头部。
E remove()
// 从此双端队列移除第一次出现的指定元素。
boolean remove(Object o)
// 获取并移除此双端队列第一个元素。
E removeFirst()
// 从此双端队列移除第一次出现的指定元素。
boolean removeFirstOccurrence(Object o)
// 获取并移除此双端队列的最后一个元素。
E removeLast()
// 从此双端队列移除最后一次出现的指定元素。
boolean removeLastOccurrence(Object o)
// 返回此双端队列中的元素数。
int size()
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),必要时将一直等待可用元素。
E take()
// 获取并移除此双端队列的第一个元素,必要时将一直等待可用元素。
E takeFirst()
// 获取并移除此双端队列的最后一个元素,必要时将一直等待可用元素。
E takeLast()
// 返回以恰当顺序(从第一个元素到最后一个元素)包含此双端队列所有元素的数组。
Object[] toArray()
// 返回以恰当顺序包含此双端队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()
3. 非阻塞队列
所有无Blocking Queue的都是非阻塞,并且它不会包含 put 和 take 方法。
**定义:**基于链接节点的、无界的、线程安全。此队列按照 FIFO(先进先出)原则对元素进行排序。
队列的头部 是队列中时间最长的元素。
队列的尾部 是队列中时间最短的元素。
新的元素插入到队列的尾部,队列检索操作从队列头部获得元素。
当许多线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许 null 元素。
3.1. ConcurrentLinkedQueue
详情: 是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。
ConcurrentLinkedQueue重要方法:
add()和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中,这两个方法投有任何区别)
poll()和peek() 都是取头元素节点,区别在于前者会删除元素,后者不会,相当于查看。
在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。
使用非阻塞队列,虽然能即时返回结果(消费结果),但必须自行编码解决返回为空的情况处理(以及消费重试等问题)。另外它们都是线程安全的,不用考虑线程同步问题。