Pipes.Async
- Package
- purescript-node-stream-pipes
- Repository
- cakekindel/purescript-node-stream-pipes
#WriteSignal Source
#ReadSignal Source
#WriteResult Source
#ReadResult Source
data ReadResult a
Constructors
Instances
Generic (ReadResult a) _
(Eq a) => Eq (ReadResult a)
(Ord a) => Ord (ReadResult a)
Functor ReadResult
Foldable ReadResult
Traversable ReadResult
(Show a) => Show (ReadResult a)
#AsyncIO Source
type AsyncIO :: Type -> Type -> (Type -> Type) -> Type -> Type
type AsyncIO a b m r = { awaitRead :: m ReadSignal, awaitWrite :: m WriteSignal, read :: m (ReadResult b), write :: a -> m WriteResult } /\ (AsyncPipe a b m r)
#AsyncPipe Source
data AsyncPipe :: Type -> Type -> (Type -> Type) -> Type -> Type
data AsyncPipe a b m r
An AsyncPipe
is a Pipe
-like struct that allows
concurrently reading from a Producer
and writing to a Consumer
.
An implementation of AsyncPipe
for Node Transform
streams
is provided in Pipes.Node.Stream
.
Constructors
Pure r
M (m (AsyncPipe a b m r))
An
AsyncPipe
behind a computationAsyncIO (AsyncIO a b m r)
Interface to write & read from the backing resource
Instances
MonadTrans (AsyncPipe a b)
MFunctor (AsyncPipe a b)
(Monad m) => Functor (AsyncPipe a b m)
(Monad m) => Apply (AsyncPipe a b m)
(Monad m) => Applicative (AsyncPipe a b m)
(Monad m) => Bind (AsyncPipe a b m)
(Monad m) => Monad (AsyncPipe a b m)
(MonadThrow e m) => MonadThrow e (AsyncPipe a b m)
(MonadError e m) => MonadError e (AsyncPipe a b m)
(MonadEffect m) => MonadEffect (AsyncPipe a b m)
(MonadAff m) => MonadAff (AsyncPipe a b m)
#getAsyncIO Source
getAsyncIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m (Maybe (AsyncIO a b m r))
Execute the AsyncPipe
monad stack until AsyncIO
is reached (if any)
#sync Source
sync :: forall a b f p e m r. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe (Maybe a) (Maybe b) m r -> Pipe (Maybe a) (Maybe b) m r
Convert an AsyncPipe
to a regular Pipe
.
Rather than two concurrently-running halves (producer & consumer),
this requires the AsyncPipe
to occasionally stop await
ing data
written by the upstream Producer
so that it can yield
to the downstream Consumer
.
This implementation chooses to prioritize yield
ing data to the Consumer
over
await
ing written chunks.
Note that using this limits the potential parallelism of the entire pipeline, ex:
Pipe.FS.read "foo.csv" -- read
>-> sync Pipe.CSV.parse -- parse
>-> sync Pipe.CBOR.encode -- encode
>-> Pipe.FS.write "foo.bin" -- write
In the above example, this is what happens when the pipeline is executed:
write
asksencode
"do you have any data yet?" (fast)encode
asksparse
"do you have any data yet?" (fast)parse
asksread
"do you have any data yet?" (fast)read
passes 1 chunk toparse
(fast)parse
blocks until the chunk is parsed (slow)parse
passes 1 chunk toencode
(fast)encode
blocks until the chunk is encoded (slow)write
writes the block (fast)
For larger workloads, changing this to use asyncPipe
would be preferable, ex:
Pipe.FS.read "foo.csv" -- read
>-/-> Pipe.CSV.parse -- parse
>-/-> Pipe.CBOR.encode -- encode
>-> Pipe.FS.write "foo.bin" -- write
With this change:
read
will pass chunks toparse
as fast asparse
allowsparse
will parse chunks and yield them toencode
as soon as they're readyencode
will encode chunks and yield them towrite
as soon as they're ready
#pipeAsync Source
pipeAsync :: forall e f m a b. MonadRec m => MonadAff m => MonadBracket e f m => Producer (Maybe a) m Unit -> AsyncPipe (Maybe a) (Maybe b) m Unit -> Producer (Maybe b) m Unit
Implementation of (>-/->)
In the current MonadFork
"thread", read data from the AsyncPipe
as it
is yielded and yield
to the downstream Consumer
.
Concurrently, in a separate thread, read data from the upstream Producer
and write to the AsyncPipe
at max throughput.
If the producing half fails, the error is caught and rethrown.
If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown.
A pure return value