Pipes.Async
- Package: purescript-node-stream-pipes
- Repository: cakekindel/purescript-node-stream-pipes
WriteSignal
ReadSignal
WriteResult
ReadResult
data ReadResult a
Constructors
Instances
- Generic (ReadResult a) _
- (Eq a) => Eq (ReadResult a)
- (Ord a) => Ord (ReadResult a)
- Functor ReadResult
- Foldable ReadResult
- Traversable ReadResult
- (Show a) => Show (ReadResult a)
AsyncIO
type AsyncIO :: Type -> Type -> (Type -> Type) -> Type -> Type
type AsyncIO a b m r = { awaitRead :: m ReadSignal, awaitWrite :: m WriteSignal, read :: m (ReadResult b), write :: a -> m WriteResult } /\ (AsyncPipe a b m r)
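For illustration, a rough sketch (not part of this module) of driving the record half of an AsyncIO by hand; real consumers such as sync below also inspect the WriteResult and ReadSignal values instead of discarding them:

import Prelude

import Data.Tuple.Nested ((/\))
import Pipes.Async (AsyncIO, ReadResult)

-- Hypothetical helper: write one chunk, wait for the backing resource to
-- report readable data, then attempt a read.
writeThenRead :: forall a b m r. Monad m => AsyncIO a b m r -> a -> m (ReadResult b)
writeThenRead (io /\ _) chunk = do
  _ <- io.write chunk -- WriteResult (backpressure hint) discarded for brevity
  _ <- io.awaitRead   -- ReadSignal discarded for brevity
  io.read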
AsyncPipe
data AsyncPipe :: Type -> Type -> (Type -> Type) -> Type -> Type
data AsyncPipe a b m r
An AsyncPipe is a Pipe-like structure that allows reading from a Producer
and writing to a Consumer concurrently.
An implementation of AsyncPipe for Node Transform streams
is provided in Pipes.Node.Stream.
Constructors
- Pure r
  A pure return value
- M (m (AsyncPipe a b m r))
  An AsyncPipe behind a computation
- AsyncIO (AsyncIO a b m r)
  Interface to write & read from the backing resource
Instances
- MonadTrans (AsyncPipe a b)
- (Monad m) => Functor (AsyncPipe a b m)
- (Monad m) => Apply (AsyncPipe a b m)
- (Monad m) => Applicative (AsyncPipe a b m)
- (Monad m) => Bind (AsyncPipe a b m)
- (Monad m) => Monad (AsyncPipe a b m)
- (MonadThrow e m) => MonadThrow e (AsyncPipe a b m)
- (MonadError e m) => MonadError e (AsyncPipe a b m)
- (MonadEffect m) => MonadEffect (AsyncPipe a b m)
- (MonadAff m) => MonadAff (AsyncPipe a b m)
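A minimal sketch of the constructors above: an AsyncPipe that only performs an effect and then finishes can be built from M and Pure alone, with no backing I/O resource:

import Prelude

import Effect.Class (class MonadEffect)
import Effect.Class.Console as Console
import Pipes.Async (AsyncPipe(..))

-- Runs an effect, then finishes with a pure result; no AsyncIO involved.
logThenDone :: forall a b m. MonadEffect m => AsyncPipe a b m Unit
logThenDone = M do
  Console.log "no async I/O here"
  pure (Pure unit)

Because AsyncPipe also has Monad and MonadEffect instances, the same pipe could be written directly with do-notation and liftEffect.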
getAsyncIO
getAsyncIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m (Maybe (AsyncIO a b m r))
Execute the AsyncPipe monad stack until AsyncIO is reached (if any)
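For example, a hypothetical predicate built on it that reports whether a pipe exposes a live I/O interface:

import Prelude

import Data.Maybe (isJust)
import Pipes.Async (AsyncPipe, getAsyncIO)

-- True when running the pipe's M layers ends at an AsyncIO constructor.
hasBackingIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m Boolean
hasBackingIO p = isJust <$> getAsyncIO p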
sync
sync :: forall a b f p e m r. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe (Maybe a) (Maybe b) m r -> Pipe (Maybe a) (Maybe b) m r
Convert an AsyncPipe to a regular Pipe.
Rather than two concurrently-running halves (producer & consumer),
this requires the AsyncPipe to occasionally stop awaiting data
written by the upstream Producer so that it can yield to the downstream Consumer.
This implementation chooses to prioritize yielding data to the Consumer over
awaiting written chunks.
Note that using this limits the potential parallelism of the entire pipeline, ex:
Pipe.FS.read "foo.csv" -- read
  >-> sync Pipe.CSV.parse -- parse
  >-> sync Pipe.CBOR.encode -- encode
  >-> Pipe.FS.write "foo.bin" -- write
In the above example, this is what happens when the pipeline is executed:
- write asks encode "do you have any data yet?" (fast)
- encode asks parse "do you have any data yet?" (fast)
- parse asks read "do you have any data yet?" (fast)
- read passes 1 chunk to parse (fast)
- parse blocks until the chunk is parsed (slow)
- parse passes 1 chunk to encode (fast)
- encode blocks until the chunk is encoded (slow)
- write writes the block (fast)
For larger workloads, changing this to use pipeAsync ((>-/->)) would be preferable, ex:
Pipe.FS.read "foo.csv" -- read
  >-/-> Pipe.CSV.parse -- parse
  >-/-> Pipe.CBOR.encode -- encode
  >-> Pipe.FS.write "foo.bin" -- write
With this change:
- read will pass chunks to parse as fast as parse allows
- parse will parse chunks and yield them to encode as soon as they're ready
- encode will encode chunks and yield them to write as soon as they're ready
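To actually run such a pipeline, the composed pipes Effect can be executed with runEffect from purescript-pipes. A sketch, assuming the illustrative Pipe.FS / Pipe.CSV / Pipe.CBOR aliases above and that (>-/->) is exported from this module alongside pipeAsync:

import Prelude

import Effect.Aff (Aff)
import Pipes ((>->))
import Pipes.Async ((>-/->))
import Pipes.Core (runEffect)

transcode :: Aff Unit
transcode =
  runEffect $
    Pipe.FS.read "foo.csv"
      >-/-> Pipe.CSV.parse
      >-/-> Pipe.CBOR.encode
      >-> Pipe.FS.write "foo.bin"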
pipeAsync
pipeAsync :: forall f m a b. MonadRec m => MonadAff m => MonadBracket Error f m => Producer (Maybe a) m Unit -> AsyncPipe (Maybe a) (Maybe b) m Unit -> Producer (Maybe b) m Unit
Implementation of (>-/->)
In the current MonadFork "thread", read data from the AsyncPipe as it
is yielded and yield it to the downstream Consumer.
Concurrently, in a separate thread, read data from the upstream Producer
and write to the AsyncPipe at max throughput.
If the producing half fails, the error is caught and rethrown.
If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown.
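Since pipeAsync is the function behind (>-/->), the first stage of the example above can be spelled out explicitly. A sketch, where CsvRow and the Pipe.FS / Pipe.CSV aliases are hypothetical placeholders:

import Prelude

import Data.Maybe (Maybe)
import Effect.Aff (Aff)
import Pipes.Async (pipeAsync)
import Pipes.Core (Producer)

-- A producer of parsed rows; further stages attach with (>->) or (>-/->).
parsedRows :: Producer (Maybe CsvRow) Aff Unit
parsedRows = Pipe.FS.read "foo.csv" `pipeAsync` Pipe.CSV.parse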