探索数据上Node.js流的触发时间和顺序
上一次写Stream管道的细节,在源代码中发现了一段无用的逻辑,由此引发了对Stream数据事件的触发时序和顺序的探索。
无用的逻辑
当时基于Node.js v8.11.1的源代码研究了管道的细节,其中上游ondata事件处理如下:
//如果用户在我们向dest写入数据时推送更多数据,那么我们将再次在ondata中结束//。但是,我们只想增加一次awaitDrain,因为//dest对于多次写入只会发出一个“Drain”事件。//=在增加awaitDrain.var增加waitDrain=false上引入一个保护;src.on('data ',on data);函数on data(chunk){ debug(' on data ');递增等待排出=假;var ret=dest . write(chunk);if (false===ret!递增等待排干){ if(((state . pipes count===1 state . pipes===dest)| |(state . pipes count 1 state . pipes . indexof(dest)!==-1)) !cleanedUp) { debug('假写响应,暂停',src。_ readablestate . await drain);src。_ readableState.awaitDrain递增等待消耗=真;} src . pause();}}关注increasedAwaitDrain变量,了解这个变量期望实现什么,然后仔细阅读代码,就会发现if (false===ret!语句必须为false,因为变量只在前一行被赋值为false,所以变量变得毫无意义。
递增等待排出=假;var ret=dest . write(chunk);if (false===ret!incrementedwaitrain){ }以上是关键的三行代码,因为Node.js是单线程,变量incrementedwaitrain的值在dest.write(chunk)中没有修改,那么if语句中的incrementedwaitrain的值肯定还是false,也就是说,incrementedwaitrain的相关逻辑没有达到预期的目标。
为什么会出现无用的代码
虽然上一段已经分析过了,增加waitDrain不管用,但是作者为什么要写这样的逻辑呢?实际上,在定义increasedAwaitDrain语句的上面,作者说可能会出现这样的情况:“当我们收到一个上游ondata事件,试图向下游写入数据时,上游可能会触发另一个数据事件,两个ondata的数据在写入下游时都可能返回false,从而导致src。_readableState.awaitDrain将被执行两次”。
作者不希望执行两次“等待草稿”,因为当下游触发“草稿”事件时,“等待草稿”将减少1,而上游在其值为0之前不会再次流动。如果等待草稿执行两次,下游仅触发一次草稿事件,则等待草稿将不会为0,上游将无法继续读取数据而不会再次流动。
探索真相的过程
虽然理性地认为增加等待流失没有起到作用,但我不能确定增加绝对。我自己试着求助,没有师傅指出问题。不过听了我的描述,有同事说可能是BUG。虽然他认为不太可能,但他还是抱着试试看的态度转到了大师分支。随即发现最新代码中没有类似于incremedwaitrain的逻辑,间接说明v8.11.1分支中与incremedwaitrain相关的逻辑真的没用。
虽然这里有一段无用的代码,但是我们应该如何理解作者在递增等待流失上面的评论呢?为了进一步揭示真相,我继续花时间查看流的相关代码。可读,想知道如何决定数据事件的触发时间和顺序。
可读流的简单原理
在进一步解释数据事件的触发顺序之前,我们先简单说说可读流的实现原理。如果我们需要自己实现一个可读的流,我们可以使用新的流。可读(选项)方法,其中选项可以包含四个属性:高水位线、编码、对象模式和读取。最重要的属性是read属性。当流的用户需要数据时,使用read方法从数据源获取数据,然后通过this.push(chunk)将数据传递给用户。如果没有更多数据要读取,this.push(null)表示读取结束。
常量可读=必需('流')。可读;字母=ABCDEFG。拆分(“”);让索引=0;const RS=new Readable({ read(size){ this . push(字母[index]| | null);}});rs.on('data ',chunk={ console . log(chunk . tostring());});//输出//A//B//C//.这里,虽然ondata没有明显调用read方法,但它仍然通过调用read方法结合this.push在内部输出数据,在源代码内部,可以发现参数传递的read方法实际上是赋值给这个的。_阅读,然后是这个。_ read在Readable.prototype.read中调用以获取数据。
灵魂密码
为了进一步说明数据流的数据事件触发顺序和场景。可读,相关官方源代码修改删除如下:
函数可读(选项){ this。_ read=options.read//将参数传递的read函数赋给这个。_read}//用户可以读取数据,prototype . read=function(size){ var state=this。_ readablestate通过调用read方法;//模拟锁定,一旦不返回_read if (this.push),后续读取将不会继续调用_read读取数据if(!state . reading){ state . reading=true;state.sync=true//在push方法中使用sync来指示是否pushthis。_read (size)在_read内部同步调用;state.sync=false} //_read,如果同步调用push,数据会放入缓冲区。//_read,如果异步调用push且缓冲区中没有内容,数据可能会发出数据。//尝试从缓冲区(state.buffer)中获取具有大小的数据,如果成功,则在(ret) this.emit('data ',ret)中触发数据事件;返回ret};//输出数据可读. prototype.push=function (chunk,encoding) {var state=this。_在执行此过程中的readablestate。_ readtime _ read获取数据,并打开锁。state.reading=false//如果流模式缓冲区中没有异步返回的数据,则数据事件if (state。flowingstate.length===0!state.sync) { stream.emit('data ',chunk);stream . read(0);//触发下一次读取,如果异步推送_read,它还是会来到这里,像保持流出一样} else {//将数据放入缓冲区状态. length=chunk . length;state . buffer . push(chunk);}};//pause flow可读. prototype . pause=function(){ if(this。_ readablestate.flowing!==false) {这个。_ readableState.flowing=falsethis . emit(' pause ');}退回这个;};函数流(stream) { const state=stream。_ readableStatewhile(state . flow stream . read()!==null);}数据事件的触发时间和顺序
定时
数据只有两个触发器:
如果流模式下缓冲区中没有数据,将异步调用push,数据不会通过缓冲区。如果直接触发数据事件不满足上述情况,推送的数据会放入缓冲区,然后尝试从缓冲区读取指定大小的数据,触发数据事件序列。
数据的触发序列实际上是由发射序列决定的。为了讨论原问题:“为什么递增waitDrain的相关逻辑可以删除?”为了简化代码:
让计数=0;src.on('data ',chunk={ let ret=dest . write(chunk);if(!ret){ count;src . pause();}});在监控流的数据事件时,流最终会通过恢复和调用流函数进入流模式,即不断调用read方法读取数据。然后分析以下场景。当dest.write(chunk)返回false时,count将被执行几次。注意上面提到的灵魂密码。
场景一:每次_read同步推送数据,第一次读取发生时,数据同步推送至缓冲区,然后从缓冲区读取数据,通过emit data传输至ondata。如果dest.write(chunk)此时返回false,count将执行一次,然后调用stream.pause()。而条件state . flow为false,这将导致不再调用stream.read,并且在流再次流动之前,count的值不会继续增加。
场景二:每次read _异步推送数据,当第一次读取发生时,异步推送的数据会通过emit data直接传输到ondata,但是read函数中的emit不会被触发,因为它无法从缓冲区读取数据,同时read返回null,导致while循环相应停止。在这种情况下,在异步推送触发数据事件之后,下面的stream.read(0)将继续保持流的流动。当dest.write(chunk)返回false时,count将执行一次,流将被挂起,然后read将继续被调用一次,但这一次数据将被放入缓冲区,而不会触发数据事件,count仍将只执行一次。
场景流暂停一次后再次流动时,数据消耗方式会与之前不同,缓冲区数据会优先消耗,直到清空,但这不会导致count多次执行。
场景3:per _read多次同步推送数据与场景1类似,只是per _ read会多次将数据写入缓冲区,最后从缓冲区读取数据后触发data事件。
场景4:每次_read异步推送数据类似于场景2。假设one _read中有两个异步推送。执行第一次异步推送时,会触发数据事件,其中的dest.write(chunk)返回false,这将导致count同时挂起。当执行第二次异步推送时,数据将被写入缓冲区,而不是触发数据事件,因此count将只执行一次。
场景5:读操作可以是同步或异步推送。无论是同步推送还是异步推送,当ondata中的flow设置为暂停模式一次时,flow函数中的while条件state . flow为false,将导致stream.read不再被调用,异步推送的emit data判断条件也不再满足,也就是目前内部不会触发数据事件,直到外部再次间接或直接调用read方法。
模拟以上五种场景来分析这个问题。其实只要你能理解第五种场景,你就能理解一切。
摘要
文章最后的内容偏离了我的初衷,不知道如何评价这篇文章的质量。然而,在业余时间花两天时间深入了解stream是一件非常值得的事情。为了写好这篇文章,可读性更强,我更坚定自己能在写文章上走得更远。
PS:猜猜为什么会有烂片。可能是因为导演长期对创作的投入,会让他迷失内心,找不到问题。写文章也是。他辛辛苦苦写的文章,通过阅读很难优化。
PS:下图属于美团博客。可能我写了这么多,但是配不上这张图。解释很重要。
摘要
以上就是本文的全部内容。希望本文的内容对大家的学习或工作有一定的参考价值。谢谢你的支持。如果你想了解更多,请查看下面的相关链接
版权声明:探索数据上Node.js流的触发时间和顺序是由宝哥软件园云端程序自动收集整理而来。如果本文侵犯了你的权益,请联系本站底部QQ或者邮箱删除。