# Pipes.Async

- Package: purescript-node-stream-pipes
- Repository: cakekindel/purescript-node-stream-pipes

### WriteSignal

### ReadSignal

### WriteResult

### ReadResult

`data ReadResult a`

#### Constructors

#### Instances

`Generic (ReadResult a) _`

`(Eq a) => Eq (ReadResult a)`

`(Ord a) => Ord (ReadResult a)`

`Functor ReadResult`

`Foldable ReadResult`

`Traversable ReadResult`

`(Show a) => Show (ReadResult a)`

### AsyncIO

`type AsyncIO :: Type -> Type -> (Type -> Type) -> Type -> Type`

`type AsyncIO a b m r = { awaitRead :: m ReadSignal, awaitWrite :: m WriteSignal, read :: m (ReadResult b), write :: a -> m WriteResult } /\ (AsyncPipe a b m r)`
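The record half of an `AsyncIO` is the raw interface to the backing resource. As a rough sketch (not part of the library), a driver could perform one synchronous round-trip through it like so, waiting for writability, writing a value, then waiting for readability and reading:

```
import Prelude
import Data.Tuple.Nested ((/\))
import Pipes.Async (AsyncIO, ReadResult)

-- Hedged sketch: one round-trip through the `AsyncIO` interface.
-- The `AsyncPipe` half of the tuple is ignored here.
driveOnce :: forall a b m r. Monad m => AsyncIO a b m r -> a -> m (ReadResult b)
driveOnce (io /\ _) a = do
  _ <- io.awaitWrite -- wait until the resource can accept a write
  _ <- io.write a    -- write one value, discarding the WriteResult
  _ <- io.awaitRead  -- wait until the resource has output
  io.read            -- read one result
```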

### AsyncPipe

`data AsyncPipe :: Type -> Type -> (Type -> Type) -> Type -> Type`

`data AsyncPipe a b m r`

An `AsyncPipe` is a `Pipe`-like struct that allows concurrently reading from a `Producer` and writing to a `Consumer`.

An implementation of `AsyncPipe` for Node `Transform` streams is provided in `Pipes.Node.Stream`.

#### Constructors

`Pure r`

`M (m (AsyncPipe a b m r))`

An `AsyncPipe` behind a computation

`AsyncIO (AsyncIO a b m r)`

Interface to write & read from the backing resource

#### Instances

`MonadTrans (AsyncPipe a b)`

`MFunctor (AsyncPipe a b)`

`(Monad m) => Functor (AsyncPipe a b m)`

`(Monad m) => Apply (AsyncPipe a b m)`

`(Monad m) => Applicative (AsyncPipe a b m)`

`(Monad m) => Bind (AsyncPipe a b m)`

`(Monad m) => Monad (AsyncPipe a b m)`

`(MonadThrow e m) => MonadThrow e (AsyncPipe a b m)`

`(MonadError e m) => MonadError e (AsyncPipe a b m)`

`(MonadEffect m) => MonadEffect (AsyncPipe a b m)`

`(MonadAff m) => MonadAff (AsyncPipe a b m)`

### getAsyncIO

`getAsyncIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m (Maybe (AsyncIO a b m r))`

Execute the `AsyncPipe` monad stack until `AsyncIO` is reached (if any).
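For illustration, a hedged sketch (not from the library) of a helper built on `getAsyncIO`: if the pipe exposes an `AsyncIO`, block until its backing resource signals it can accept a write; otherwise do nothing.

```
import Prelude
import Data.Maybe (Maybe(..))
import Data.Tuple.Nested ((/\))
import Pipes.Async (AsyncPipe, getAsyncIO)

-- Hypothetical helper, assuming only the documented record fields.
awaitWritable :: forall a b m r. Monad m => AsyncPipe a b m r -> m Unit
awaitWritable p = getAsyncIO p >>= case _ of
  Just (io /\ _) -> void io.awaitWrite -- wait for the write signal
  Nothing -> pure unit                 -- pipe already finished pure
```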

### sync

`sync :: forall a b f p e m r. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe (Maybe a) (Maybe b) m r -> Pipe (Maybe a) (Maybe b) m r`

Convert an `AsyncPipe` to a regular `Pipe`.

Rather than two concurrently-running halves (producer & consumer),
this requires the `AsyncPipe` to occasionally stop `await`ing data
written by the upstream `Producer` so that it can `yield` to the
downstream `Consumer`.

This implementation chooses to prioritize `yield`ing data to the
`Consumer` over `await`ing written chunks.

Note that using this limits the potential parallelism of the entire pipeline, ex:

```
Pipe.FS.read "foo.csv" -- read
>-> sync Pipe.CSV.parse -- parse
>-> sync Pipe.CBOR.encode -- encode
>-> Pipe.FS.write "foo.bin" -- write
```

In the above example, this is what happens when the pipeline is executed:

- `write` asks `encode` "do you have any data yet?" (fast)
- `encode` asks `parse` "do you have any data yet?" (fast)
- `parse` asks `read` "do you have any data yet?" (fast)
- `read` passes 1 chunk to `parse` (fast)
- `parse` blocks until the chunk is parsed (slow)
- `parse` passes 1 chunk to `encode` (fast)
- `encode` blocks until the chunk is encoded (slow)
- `write` writes the block (fast)

For larger workloads, changing this to use `pipeAsync` would be preferable, ex:

```
Pipe.FS.read "foo.csv" -- read
>-/-> Pipe.CSV.parse -- parse
>-/-> Pipe.CBOR.encode -- encode
>-> Pipe.FS.write "foo.bin" -- write
```

With this change:

- `read` will pass chunks to `parse` as fast as `parse` allows
- `parse` will parse chunks and yield them to `encode` as soon as they're ready
- `encode` will encode chunks and yield them to `write` as soon as they're ready
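The resulting mixed pipeline is still an ordinary `Effect` in the `pipes` sense, so it can be run with `runEffect` as usual. A hedged sketch, assuming the placeholder module names from the example (`Pipe.FS`, `Pipe.CSV`, `Pipe.CBOR` are illustrative, not necessarily real modules in this package):

```
import Prelude
import Effect (Effect)
import Effect.Aff (launchAff_)
import Pipes ((>->))
import Pipes.Core (runEffect)
import Pipes.Async ((>-/->))

main :: Effect Unit
main = launchAff_ $ runEffect
  $ Pipe.FS.read "foo.csv"    -- read
  >-/-> Pipe.CSV.parse        -- parse (concurrent)
  >-/-> Pipe.CBOR.encode      -- encode (concurrent)
  >-> Pipe.FS.write "foo.bin" -- write
```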

### pipeAsync

`pipeAsync :: forall e f m a b. MonadRec m => MonadAff m => MonadBracket e f m => Producer (Maybe a) m Unit -> AsyncPipe (Maybe a) (Maybe b) m Unit -> Producer (Maybe b) m Unit`

Implementation of `(>-/->)`.

In the current `MonadFork` "thread", read data from the `AsyncPipe` as it
is yielded and `yield` to the downstream `Consumer`.

Concurrently, in a separate thread, read data from the upstream `Producer`
and write to the `AsyncPipe` at max throughput.

If the producing half fails, the error is caught and rethrown.

If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown.
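Since `pipeAsync` is the named form of the operator, the two spellings are interchangeable. A sketch, with `src :: Producer (Maybe a) m Unit` and `tf :: AsyncPipe (Maybe a) (Maybe b) m Unit` assumed in scope for an `m` satisfying the constraints above:

```
-- Both produce the same downstream producer of `Maybe b`:
out1 = pipeAsync src tf
out2 = src >-/-> tf
```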

A pure return value