Module

Pipes.Async

Package
purescript-node-stream-pipes
Repository
cakekindel/purescript-node-stream-pipes

#AsyncIO Source

type AsyncIO :: Type -> Type -> (Type -> Type) -> Type -> Typetype AsyncIO a b m r = { awaitRead :: m ReadSignal, awaitWrite :: m WriteSignal, read :: m (ReadResult b), write :: a -> m WriteResult } /\ (AsyncPipe a b m r)

#AsyncPipe Source

data AsyncPipe :: Type -> Type -> (Type -> Type) -> Type -> Typedata AsyncPipe a b m r

An AsyncPipe is a Pipe-like struct that allows concurrently reading from a Producer and writing to a Consumer.

An implementation of AsyncPipe for Node Transform streams is provided in Pipes.Node.Stream.

Constructors

  • Pure r

    A pure return value

  • M (m (AsyncPipe a b m r))

    An AsyncPipe behind a computation

  • AsyncIO (AsyncIO a b m r)

    Interface to write & read from the backing resource

Instances

#mapIO Source

mapIO :: forall aa ab ba bb m r. Monad m => (ab -> aa) -> (ba -> bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r

Modify request / response types

#bindIO Source

bindIO :: forall aa ab ba bb m r. Monad m => (ab -> m aa) -> (ba -> m bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r

Modify request / response types

#stripIO Source

stripIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m r

Remove the AsyncPipe wrapper by discarding the IO

#getAsyncIO Source

getAsyncIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m (Maybe (AsyncIO a b m r))

Execute the AsyncPipe monad stack until AsyncIO is reached (if any)

#debug Source

debug :: forall a b m r. MonadAff m => String -> AsyncPipe (Maybe a) (Maybe b) m r -> AsyncPipe (Maybe a) (Maybe b) m r

Wraps all fields of an AsyncPipe with logging to debug behavior and timing.

#sync Source

sync :: forall a b f p e m r. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe (Maybe a) (Maybe b) m r -> Pipe (Maybe a) (Maybe b) m r

Convert an AsyncPipe to a regular Pipe.

Rather than two concurrently-running halves (producer & consumer), this requires the AsyncPipe to occasionally stop awaiting data written by the upstream Producer so that it can yield to the downstream Consumer.

This implementation chooses to prioritize yielding data to the Consumer over awaiting written chunks.

Note that using this limits the potential parallelism of the entire pipeline, ex:

Pipe.FS.read "foo.csv"        -- read
  >-> sync Pipe.CSV.parse     -- parse
  >-> sync Pipe.CBOR.encode   -- encode
  >-> Pipe.FS.write "foo.bin" -- write

In the above example, this is what happens when the pipeline is executed:

  1. write asks encode "do you have any data yet?" (fast)
  2. encode asks parse "do you have any data yet?" (fast)
  3. parse asks read "do you have any data yet?" (fast)
  4. read passes 1 chunk to parse (fast)
  5. parse blocks until the chunk is parsed (slow)
  6. parse passes 1 chunk to encode (fast)
  7. encode blocks until the chunk is encoded (slow)
  8. write writes the block (fast)

For larger workloads, changing this to use asyncPipe would be preferable, ex:

Pipe.FS.read "foo.csv"        -- read
  >-/-> Pipe.CSV.parse        -- parse
  >-/-> Pipe.CBOR.encode      -- encode
  >-> Pipe.FS.write "foo.bin" -- write

With this change:

  • read will pass chunks to parse as fast as parse allows
  • parse will parse chunks and yield them to encode as soon as they're ready
  • encode will encode chunks and yield them to write as soon as they're ready

#pipeAsync Source

pipeAsync :: forall e f m a b. MonadRec m => MonadAff m => MonadBracket e f m => Producer (Maybe a) m Unit -> AsyncPipe (Maybe a) (Maybe b) m Unit -> Producer (Maybe b) m Unit

Implementation of (>-/->)

In the current MonadFork "thread", read data from the AsyncPipe as it is yielded and yield to the downstream Consumer.

Concurrently, in a separate thread, read data from the upstream Producer and write to the AsyncPipe at max throughput.

If the producing half fails, the error is caught and rethrown.

If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown.

#(>-/->) Source

Operator alias for Pipes.Async.pipeAsync (left-associative / precedence 7)