blockingqueue如何使用Disruptor从Ringbuffer读取

blockingqueue  时间:2021-07-12  阅读:()

java 中怎么使用queue

展开全部 Queue接口与List、Set同一级别,都是继承了Collection接口。

LinkedList实现了Queue接 口。

Queue接口窄化了对LinkedList的方法的访问权限(即在方法中的参数类型如果是Queue时,就完全只能访问Queue接口所定义的方法 了,而不能直接访问 LinkedList的非Queue的方法),以使得只有恰当的方法才可以使用。

BlockingQueue 继承了Queue接口。

队列是一种数据结构.它有两个基本操作:在队列尾部加人一个元素,和从队列头部移除一个元素就是说,队列以一种先进先出的方式管理数据,如果你试图向一个 已经满了的阻塞队列中添加一个元素或者是从一个空的阻塞队列中移除一个元索,将导致线程阻塞.在多线程进行合作时,阻塞队列是很有用的工具。

工作者线程可 以定期地把中间结果存到阻塞队列中而其他工作者线线程把中间结果取出并在将来修改它们。

队列会自动平衡负载。

如果第一个线程集运行得比第二个慢,则第二个 线程集在等待结果时就会阻塞。

如果第一个线程集运行得快,那么它将等待第二个线程集赶上来。

下表显示了jdk1.5中的阻塞队列的操作: add 增加一个元索 如果队列已满,则抛出一个IIIegaISlabEepeplian异常 remove 移除并返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常 element 返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常 offer 添加一个元素并返回true 如果队列已满,则返回false poll 移除并返问队列头部的元素 如果队列为空,则返回null peek 返回队列头部的元素 如果队列为空,则返回null put 添加一个元素 如果队列满,则阻塞 take 移除并返回队列头部的元素 如果队列为空,则阻塞 remove、element、offer 、poll、peek 其实是属于Queue接口。

阻塞队列的操作可以根据它们的响应方式分为以下三类:aad、removee和element操作在你试图为一个已满的队列增加元素或从空队列取得元素时 抛出异常。

当然,在多线程程序中,队列在任何时间都可能变成满的或空的,所以你可能想使用offer、poll、peek方法。

这些方法在无法完成任务时 只是给出一个出错示而不会抛出异常。

java.util.concurrent的队列

java.util.concurrent ConcurrentLinkedQueue 类提供了高效的、可伸缩的、线程安全的非阻塞 FIFO 队列。

java.util.concurrent 中的五个实现都支持扩展的 BlockingQueue 接口,该接口定义了 put 和 take 的阻塞版本:LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue、PriorityBlockingQueue 和 DelayQueue。

这些不同的类覆盖了生产者-使用者、消息传递、并行任务执行和相关并发设计的大多数常见使用的上下文。

java中的queue类是什么,啥作用?

java5中新增加了java.util.Queue接口,用以支持队列的常见操作。

该接口扩展了java.util.Collection接口。

Queue使用时要尽量避免Collection的add()和remove()方法,而是要使用offer()来加入元素,使用poll()来获取并移出元素。

它们的优 点是通过返回值可以判断成功与否,add()和remove()方法在失败的时候会抛出异常。

如果要使用前端而不移出该元素,使用 element()或者peek()方法。

值得注意的是LinkedList类实现了Queue接口,因此我们可以把LinkedList当成Queue来用。

小例子: /** * * @author Zang XT */ import java.util.Queue; import java.util.LinkedList; public class TestQueue { public static void main(String[] args) { Queue<String> queue = new LinkedList<String>(); queue.offer("Hello"); queue.offer("World!"); queue.offer("你好!"); System.out.println(queue.size()); String str; while((str=queue.poll())!=null){ System.out.print(str); } System.out.println(); System.out.println(queue.size()); } } offer,add区别: 一些队列有大小限制,因此如果想在一个满的队列中加入一个新项,多出的项就会被拒绝。

这时新的 offer 方法就可以起作用了。

它不是对调用 add() 方法抛出一个 unchecked 异常,而只是得到由 offer() 返回的 false。

poll,remove区别: remove() 和 poll() 方法都是从队列中删除第一个元素(head)。

remove() 的行为与 Collection 接口的版本相似, 但是新的 poll() 方法在用空集合调用时不是抛出异常,只是返回 null。

因此新的方法更适合容易出现异常条件的情况。

peek,element区别: element() 和 peek() 用于在e69da5e887aa62616964757a686964616f31333332613666队列的头部查询元素。

与 remove() 方法类似,在队列为空时, element() 抛出一个异常,而 peek() 返回 null。

处理InterruptedException,捕捉到它,然后怎么处理它? 详细??

您不能忽略这个异常, 因为它是一个检查异常(checked excepti on) 。

但是应该如何处理它呢? 在本月的 Java 理论与实践中, 并发专家 Bri an Goetz 将解释 I nterruptedExcepti on 的含义, 为什么会抛出 I nterruptedExcepti on, 以及在捕捉到该异常时应该怎么做。

这样的情景您也许并不陌生: 您在编写一个测试程序, 程序需要暂停一段时间, 于是调用 Thread. sleep( ) 。

但是编译器或 I DE 报错说没有处理检查到的 InterruptedException 。

InterruptedException 是什么呢, 为什么必须处理它? 对于 InterruptedException , 一种常见的处理方式是 “生吞(swal l ow) ” 它 —— 捕捉它, 然后什么也不做(或者记录下它, 不过这也好不到哪去) —— 就像后面的 清单 4 一样。

不幸的是, 这种方法忽略了这样一个事实: 这期间可能发生中断, 而中断可能导致应用程序丧失及时取消活动或关闭的能力。

当一个方法抛出 InterruptedException 时, 它不仅告诉您它可以抛出一个特定的检查异常, 而且还告诉您其他一些事情。

例如, 它告诉您它是一个阻塞(blocking) 方法, 如果您响应得当的话, 它将尝试消除阻塞并尽早返回。

阻塞方法不同于一般的要运行较长时间的方法。

一般方法的完成只取决于它所要做的事情,以及是否有足够多可用的计算资源(CPU 周期和内存) 。

而阻塞方法的完成还取决于一些外部的事件, 例如计时器到期, I /O 完成, 或者另一个线程的动作(释放一个锁, 设置一个标志, 或者将一个任务放在一个工作队列中) 。

一般方法在它们的工作做完后即可结束,而阻塞方法较难于预测, 因为它们取决于外部事件。

阻塞方法可能影响响应能力, 因为难于预测它们何时会结束。

阻塞方法可能因为等不到所等的事件而无法终止, 因此令阻塞方法可取消 就非常有用(如果长时间运行的非阻塞方法是可取消的, 那么通常也非常有用) 。

可取消操作是指能从外部使之在正常完成之前终止的操作。

由 Thread 提供并受 Thread. sleep( ) 和 Obj ect. wait( ) 支持的中断机制就是一种取消机制; 它允许一个线程请求另一个线程停止它正在做的事情。

当一个方法抛出 InterruptedException 时, 它是在告诉您, 如果执行该方法的线程被中断, 它将尝试停止它正在做的事情而提前返回, 并通过抛出 InterruptedException 表明它提前返回。

行为良好的阻塞库方法应该能对中断作出响应并抛出 InterruptedException , 以便能够用于可取消活动中, 而不至于影响响应。

每个线程都有一个与之相关联的 Bool ean 属性, 用于表示线程的中断状态(interrupted status) 。

中断状态初始时为 fal se; 当另一个线程通过调用 Thread. interrupt( ) 中断一个线程时, 会出现以下两种情况之一。

如果那个线程在执行一个低级可中断阻塞方法, 例如 Thread. sleep( ) 、 Thread. j oin( ) 或 Obj ect. wait( ) , 那么它将取消阻塞并抛出 InterruptedException 。

否则, interrupt( ) 只是设置线程的中断状态。

在被中断线程中运行的代码以后可以轮询中断状态, 看看它是否被请求停止正在做的事情。

中断状态可以通过 Thread. isInterrupted( ) 来读取, 并且可以通过一个名为 Thread. interrupted( ) 的操作读取和清除。

中断是一种协作机制。

当一个线程中断另一个线程时, 被中断的线程不一定要立即停止正在做的事情。

相反, 中断是礼貌地请求另一个线程在它愿意并且方便的时候停止它正在做的事情。

有些方法, 例如 Thread. sleep( ) , 很认真地对待这样的请求, 但每个方法不是一定要对中断作出响应。

对于中断请求, 不阻塞但是仍然要花较长时间执行的方法可以轮询中断状态, 并在被中断的时候提前返回。

您可以随意忽略中断请求, 但是这样做的话会影响响应。

中断的协作特性所带来的一个好处是, 它为安全地构造可取消活动提供更大的灵活性。

我们很少希望一个活动立即停止; 如果活动在正在进行更新的时候被取消, 那么程序数据结构可能处于不一致状态。

中断允许一个可取消活动来清理正在进行的工作, 恢复不变量, 通知其他活动它要被取消, 然后才终止。

InterruptedExcepti on 如果抛出 InterruptedException 意味着一个方法是阻塞方法, 那么调用一个阻塞方法则意味着您的方法也是一个阻塞方法, 而且您应该有某种策略来处理 InterruptedException 。

通常最容易的策略是自己抛出 InterruptedException ,如清单 1 中 putTask( ) 和 getTask( ) 方法中的代码所示。

这样做可以使方法对中断作出响应, 并且只需将 InterruptedException 添加到 throws 子句。

1 . InterruptedExcepti on publ i c cl ass TaskQueue { pri vate stati c fi nal i nt MAX_TASKS = 1000; pri vate Bl ocki ngQueue queue = new Li nkedBl ocki ngQueue(MAX_TASKS) ; publ i c voi d putTask(Task r) { queue. put(r) ; } publ i c Task getTask() { return queue. take() ; } } 有时候需要在传播异常之前进行一些清理工作。

在这种情况下, 可以捕捉 InterruptedException , 执行清理, 然后抛出异常。

清单 2 演示了这种技术, 该代码是用于匹配在线游戏服务中的玩家的一种机制。

matchPlayers( ) 方法等待两个玩家到来, 然后开始一个新游戏。

如果在一个玩家已到来, 但是另一个玩家仍未到来之际该方法被中断, 那么它会将那个玩家放回队列中, 然后重新抛出 InterruptedException , 这样那个玩家对游戏的请求就不至于丢失。

2. InterruptedExcepti on publ i c cl ass Pl ayerMatcher { pri vate Pl ayerSource pl ayers; publ i c Pl ayerMatcher(Pl ayerSource pl ayers) { thi s. pl ayers = pl ayers; } publ i c voi d matchPl ayers() { try { Pl ayer pl ayerOne, pl ayerTwo; whi l e (true) { pl ayerOne = pl ayerTwo = nul l ; // Wai t for two pl ayers to arri ve and start a new game pl ayerOne = pl ayers. wai tForPl ayer() ; // coul d throw IE pl ayerTwo = pl ayers. wai tForPl ayer() ; // coul d throw IE startNewGame(pl ayerOne, pl ayerTwo) ; } } catch (InterruptedExcepti on e) { // If we got one pl ayer and were i nterrupted, put that pl ayer back i f (pl ayerOne ! = nul l ) pl ayers. addFi rst(pl ayerOne) ; // Then propagate the excepti on } } } 有时候抛出 InterruptedException 并不合适, 例如当由 Runnable 定义的任务调用一个可中断的方法时, 就是如此。

在这种情况下, 不能重新抛出 InterruptedException , 但是您也不想什么都不做。

当一个阻塞方法检测到中断并抛出 InterruptedException 时, 它清除中断状态。

如果捕捉到 InterruptedException 但是不能重新抛出它, 那么应该保留中断发生的证据, 以便调用栈中更高层的代码能知道中断, 并对中断作出响应。

该任务可以通过调用 interrupt( ) 以 “重新中断” 当前线程来完成, 如清单 3 所示。

至少, 每当捕捉到 InterruptedException 并且不重新抛出它时, 就在返回之前重新中断当前线程。

3. InterruptedExcepti on publ i c cl ass TaskRunner i mpl ements Runnabl e { pri vate Bl ocki ngQueue queue; publ i c TaskRunner(Bl ocki ngQueue queue) { thi s. queue = queue; } publ i c voi d run() { try { whi l e (true) { Task task = queue. take(10, Ti meUni t. SECONDS) ; task. execute() ; } } catch (InterruptedExcepti on e) { // Restore the i nterrupted status } } } 处理 InterruptedException 时采取的最糟糕的做法是生吞它 —— 捕捉它, 然后既不重新抛出它, 也不重新断言线程的中断状态。

对于不知如何处理的异常, 最标准的处理方法是捕捉它, 然后记录下它, 但是这种方法仍然无异于生吞中断, 因为调用栈中更高层的代码还是无法获得关于该异常的信息。

(仅仅记录 InterruptedException 也不是明智的做法, 因为等到人来读取日志的时候, 再来对它作出处理就为时已晚了。

) 清单 4 展示了一种使用得很广泛的模式, 这也是生吞中断的一种模式: 4. // Don' t do thi s publ i c cl ass TaskRunner i mpl ements Runnabl e { pri vate Bl ocki ngQueue queue; publ i c TaskRunner(Bl ocki ngQueue queue) { thi s. queue = queue; } publ i c voi d run() { try { whi l e (true) { Task task = queue. take(10, Ti meUni t. SECONDS) ; task. execute() ; } } catch (InterruptedExcepti on swal l owed) { /* DON' T DO THIS - RESTORE THE INTERRUPTED STATUS INSTEAD */ } } } 如果不能重新抛出 InterruptedException , 不管您是否计划处理中断请求, 仍然需要重新中断当前线程, 因为一个中断请求可能有多个 “接收者” 。

标准线程池( ThreadPoolExecutor ) worker 线程实现负责中断, 因此中断一个运行在线程池中的任务可以起到双重效果, 一是取消任务, 二是通知执行线程线程池正要关闭。

如果任务生吞中断请求, 则 worker 线程将不知道有一个被请求的中断, 从而耽误应用程序或服务的关闭。

语言规范中并没有为中断提供特定的语义, 但是在较大的程序中, 难于维护除取消外的任何中断语义。

取决于是什么活动, 用户可以通过一个 GUI 或通过网络机制, 例如 JMX 或 Web 服务来请求取消。

程序逻辑也可以请求取消。

例如, 一个 Web 爬行器(crawl er) 如果检测到磁盘已满, 它会自动关闭自己, 否则一个并行算法会启动多个线程来搜索解决方案空间的不同区域, 一旦其中一个线程找到一个解决方案, 就取消那些线程。

仅仅因为一个任务是可取消的, 并不意味着需要立即 对中断请求作出响应。

对于执行一个循环中的代码的任务, 通常只需为每一个循环迭代检查一次中断。

取决于循环执行的时间有多长, 任何代码可能要花一些时间才能注意到线程已经被中断(或者是通过调用 Thread. isInterrupted( ) 方法轮询中断状态, 或者是调用一个阻塞方法) 。

如果任务需要提高响应能力, 那么它可以更频繁地轮询中断状态。

阻塞方法通常在入口就立即轮询中断状态, 并且, 如果它被设置来改善响应能力, 那么还会抛出 InterruptedException 。

惟一可以生吞中断的时候是您知道线程正要退出。

只有当调用可中断方法的类是 Thread 的一部分, 而不是 Runnable 或通用库代码的情况下, 才会发生这样的场景, 清单 5 演示了这种情况。

清单 5 创建一个线程, 该线程列举素数, 直到被中断, 这里还允许该线程在被中断时退出。

用于搜索素数的循环在两个地方检查是否有中断: 一处是在 whi l e 循环的头部轮询 isInterrupted( ) 方法, 另一处是调用阻塞方法 BlockingQueue. put( ) 。

5. publ i c cl ass Pri meProducer extends Thread { pri vate fi nal Bl ocki ngQueue queue; Pri meProducer(Bl ocki ngQueue queue) { thi s. queue = queue; } publ i c voi d run() { try { Bi gInteger p = Bi gInteger. ONE; whi l e (! Thread. currentThread() . i sInterrupted() ) queue. put(p = p. nextProbabl ePri me() ) ; } catch (InterruptedExcepti on consumed) { /* Al l ow thread to exi t */ } } publ i c voi d cancel () { i nterrupt() ; } } 并非所有的阻塞方法都抛出 InterruptedException 。

输入和输出流类会阻塞等待 I /O 完成, 但是它们不抛出 InterruptedException , 而且在被中断的情况下也不会提前返回。

然而, 对于套接字 I /O, 如果一个线程关闭套接字, 则那个套接字上的阻塞 I /O 操作将提前结束, 并抛出一个 SocketException 。

j ava. nio 中的非阻塞 I /O 类也不支持可中断 I /O, 但是同样可以通过关闭通道或者请求 Selector 上的唤醒来取消阻塞操作。

类似地, 尝试获取一个内部锁的操作(进入一个 synchronized 块) 是不能被中断的, 但是 ReentrantLock 支持可中断的获取模式。

有些任务拒绝被中断, 这使得它们是不可取消的。

但是, 即使是不可取消的任务也应该尝试保留中断状态, 以防在不可取消的任务结束之后, 调用栈上更高层的代码需要对中断进行处理。

清单 6 展示了一个方法, 该方法等待一个阻塞队列, 直到队列中出现一个可用项目,而不管它是否被中断。

为了方便他人, 它在结束后在一个 fi nal l y 块中恢复中断状态, 以免剥夺中断请求的调用者的权利。

(它不能在更早的时候恢复中断状态, 因为那将导致无限循环 —— BlockingQueue. take( ) 将在入口处立即轮询中断状态, 并且, 如果发现中断状态集, 就会抛出 InterruptedException 。

) 6. publ i c Task getNextTask(Bl ocki ngQueue queue) { bool ean i nterrupted = fal se; try { whi l e (true) { try { return queue. take() ; } catch (InterruptedExcepti on e) { i nterrupted = true; // fal l through and retry } } } fi nal l y { i f (i nterrupted) Thread. currentThread() . i nterrupt() ; } } 您可以用 Java 平台提供的协作中断机制来构造灵活的取消策略。

各活动可以自行决定它们是可取消的还是不可取消的, 以及如何对中断作出响应, 如果立即返回会危害应用程序完整性的话, 它们还可以推迟中断。

BlockingQueue可用于什么场合

BlockingQueue是一个由数组支持的有界阻塞队列,也就是说当一个线程向一个固定大小的BlockingQueue队列里面不停地存放数据,另一个线程不停的向这个队列里面取数据,如果队列满了,还继续存放数据,此时出现阻塞,直到队列有空闲的位置;反之,如果队列为空,还继续取数据,则会出现阻塞,直到队列中有数据为止

如何使用Disruptor从Ringbuffer读取

它可以用来替代队列,同时有很多SEDA和Actors模式的特性。

和队列比较:Disruptor可以向其他线程发送消息,并在需要的时候唤醒其他线程(和BlockingQueue相似)。

不过,他们之间有三个主要的区别。

2. 把消息放入Disruptor需要2个步骤

LOCVPS:VPS主机全场8折,德国/荷兰/美国KVM终身7折

LOCVPS发来了针对元旦新年的促销活动,除了全场VPS主机8折优惠外,针对德国/荷兰KVM #1/美国KVM#2 VPS提供终身7折优惠码(限量50名,先到先得)。LOCVPS是一家成立于2012年的国人VPS服务商,提供中国香港、韩国、美国、日本、新加坡、德国、荷兰、俄罗斯等地区VPS服务器,基于KVM或XEN架构(推荐优先选择KVM),均选择直连或者优化线路,国内延迟低,适合建站或远程办公使...

腾讯云CVM云服务器大硬盘方案400GB和800GB数据盘方案

最近看到群里的不少网友在搭建大数据内容网站,内容量有百万篇幅,包括图片可能有超过50GB,如果一台服务器有需要多个站点的话,那肯定默认的服务器50GB存储空间是不够用的。如果单独在购买数据盘会成本提高不少。这里我们看到腾讯云促销活动中有2款带大数据盘的套餐还是比较实惠的,一台是400GB数据盘,一台是800GB数据盘,适合他们的大数据网站。 直达链接 - 腾讯云 大数据盘套餐服务器这里我们看到当前...

TmhHost 全场八折优惠且充值返10% 多款CN2线路

TmhHost 商家是一家成立于2019年的国人主机品牌。目前主营的是美国VPS以及美国、香港、韩国、菲律宾的独立服务器等,其中VPS业务涵盖香港CN2、香港NTT、美国CN2回程高防、美国CN2 GIA、日本软银、韩国cn2等,均为亚太中国直连优质线路,TmhHost提供全中文界面,支持支付宝付款。 TmhHost黑五优惠活动发布了,全场云服务器、独立服务器提供8折,另有充值返现、特价服务器促销...

blockingqueue为你推荐
soapui下载手机系统用户界面软件下载akf德州水份检测仪价格,AKF系列卡尔费休水份测定仪和世界顶级进口品牌相比怎么样?知识库管理系统什么是知识管理rdlDVD±RW/±RDL/RAM 具体什么意思jqlJQL JINQILIN注册过商标吗?还有哪些分类可以注册?inode智能客户端iNode 智能客户端windows7上网方法vipjrvipjr跟哒哒英语比,两家公司的区别在哪里?各自的特点有哪些?ruby语言Ruby语言输入方法法qq业务中心QQ业务怎么开通?基础设施即服务基础设施、 产品服务、 财务和 () 这几个问题是商业模式设计需要去主要解决的。
租服务器价格 阿里云os jsp主机 payoneer xfce 华为云主机 网页背景图片 网通服务器ip 免费活动 免费全能主机 南通服务器 稳定免费空间 免费高速空间 东莞服务器 免费邮件服务器 双线空间 云服务是什么意思 海外加速 apache启动失败 alexa搜 更多