Module

Yoga.Om.Strom

Package
purescript-yoga-om-strom
Repository
rowtype-yoga/purescript-yoga-om-strom

#Strom Source

newtype Strom :: Type -> Row Type -> Type -> Typenewtype Strom ctx err a

A Strom is a stream of values that can:

  • Emit zero or more values of type a
  • Fail with errors tracked in the err row
  • Require a context ctx
  • Perform effects in Om

Think of it as: Om ctx err (Array a) but lazy and chunked

The Step type is interpreted as:

  • Loop (chunk, next) => emit chunk (if present) and continue with next
  • Done finalChunk => emit final chunk (if present) and stop

Instances

#empty Source

empty :: forall ctx err a. Strom ctx err a

An empty stream

#succeed Source

succeed :: forall ctx err a. a -> Strom ctx err a

A stream with a single element

#fail Source

fail :: forall ctx err a. Strom ctx err a

A stream that fails immediately with an empty result (no error thrown)

#fromAff Source

fromAff :: forall ctx err a. Aff a -> Strom ctx err a

Create a stream from an Aff computation

#fromOm Source

fromOm :: forall ctx err a. Om ctx err a -> Strom ctx err a

Create a stream from an Om computation

#fromArray Source

fromArray :: forall ctx err a. Array a -> Strom ctx err a

Create a stream from an array

#fromFoldable Source

fromFoldable :: forall ctx err f a. Foldable f => f a -> Strom ctx err a

Create a stream from any foldable

#rangeStrom Source

rangeStrom :: forall ctx err. Int -> Int -> Strom ctx err Int

Create a stream of integers in a range [start, end)

#iterateStrom Source

iterateStrom :: forall ctx err a. (a -> a) -> a -> Strom ctx err a

Iterate producing values - limited to ~10,000 elements for stack safety For truly infinite streams, use iterateStromInfinite (with Aff overhead)

#iterateStromInfinite Source

iterateStromInfinite :: forall ctx err a. (a -> a) -> a -> Strom ctx err a

Truly infinite iteration - stack-safe via Aff async boundaries Adds tiny delay (0ms) at each chunk to reset the stack Use this when you need more than 10k elements

#repeatStrom Source

repeatStrom :: forall ctx err a. a -> Strom ctx err a

Repeat a value - limited to ~10,000 elements for stack safety

#repeatStromInfinite Source

repeatStromInfinite :: forall ctx err a. a -> Strom ctx err a

Truly infinite repeat - stack-safe via Aff async boundaries Adds tiny delay (0ms) at each chunk to reset the stack

#repeatOmStrom Source

repeatOmStrom :: forall ctx err a. Om ctx err a -> Strom ctx err a

Repeat an Om computation - limited to 100 iterations for stack safety For truly infinite repetition, use repeatOmStromInfinite (with Aff overhead)

#repeatOmStromInfinite Source

repeatOmStromInfinite :: forall ctx err a. Om ctx err a -> Strom ctx err a

Truly infinite Om repetition - stack-safe via Aff async boundaries Adds tiny delay (0ms) at each pull to reset the stack

#unfoldStrom Source

unfoldStrom :: forall ctx err a b. (b -> Maybe (a /\ b)) -> b -> Strom ctx err a

Unfold a stream from a seed value - can be infinite if f never returns Nothing Emits chunks lazily, terminates when f returns Nothing

#unfoldOmStrom Source

unfoldOmStrom :: forall ctx err a b. (b -> Om ctx err (Maybe (a /\ b))) -> b -> Strom ctx err a

Unfold a stream with an effectful function

#runCollect Source

runCollect :: forall ctx err a. Strom ctx err a -> Om ctx err (Array a)

Collect all elements into an array (O(n) using chunk accumulation)

#runDrain Source

runDrain :: forall ctx err a. Strom ctx err a -> Om ctx err Unit

Run the stream and discard all elements

#runFold Source

runFold :: forall ctx err a b. b -> (b -> a -> b) -> Strom ctx err a -> Om ctx err b

Fold over all elements

#traverseStrom_ Source

traverseStrom_ :: forall ctx err a. (a -> Om ctx err Unit) -> Strom ctx err a -> Om ctx err Unit

Traverse the stream with effects, discarding results

#forStrom_ Source

forStrom_ :: forall ctx err a. Strom ctx err a -> (a -> Om ctx err Unit) -> Om ctx err Unit

Alias for traverseStrom_ with arguments flipped

#forMStrom_ Source

forMStrom_ :: forall ctx err a. Strom ctx err a -> (a -> Om ctx err Unit) -> Om ctx err Unit

Alias for traverseStrom_ (FP convention)

#traverseMStrom_ Source

traverseMStrom_ :: forall ctx err a. (a -> Om ctx err Unit) -> Strom ctx err a -> Om ctx err Unit

Alias for traverseStrom_ (explicit monadic naming)

#mapStrom Source

mapStrom :: forall ctx err a b. (a -> b) -> Strom ctx err a -> Strom ctx err b

Map over elements

#mapMStrom Source

mapMStrom :: forall ctx err a b. (a -> Om ctx err b) -> Strom ctx err a -> Strom ctx err b

Map with a monadic effect

#bindStrom Source

bindStrom :: forall ctx err a b. (a -> Strom ctx err b) -> Strom ctx err a -> Strom ctx err b

Bind for streams (monadic flatMap/chain) Use >>= operator for cleaner syntax: stream >>= f

#scanStrom Source

scanStrom :: forall ctx err a b. (b -> a -> b) -> b -> Strom ctx err a -> Strom ctx err b

Scan with accumulator (like foldl but emits intermediate results)

#mapAccumStrom Source

mapAccumStrom :: forall ctx err a b s. (s -> a -> (s /\ b)) -> s -> Strom ctx err a -> Strom ctx err b

Map with accumulator (stateful map)

#tapStrom Source

tapStrom :: forall ctx err a. (a -> Unit) -> Strom ctx err a -> Strom ctx err a

Tap (observe without modifying)

#tapMStrom Source

tapMStrom :: forall ctx err a. (a -> Om ctx err Unit) -> Strom ctx err a -> Strom ctx err a

Tap with monadic effect (observe with effects)

#filterStrom Source

filterStrom :: forall ctx err a. (a -> Boolean) -> Strom ctx err a -> Strom ctx err a

Filter elements

#takeStrom Source

takeStrom :: forall ctx err a. Int -> Strom ctx err a -> Strom ctx err a

Take n elements (with proper state tracking across chunks)

#takeWhileStrom Source

takeWhileStrom :: forall ctx err a. (a -> Boolean) -> Strom ctx err a -> Strom ctx err a

Take while predicate is true

#takeUntilStrom Source

takeUntilStrom :: forall ctx err a. (a -> Boolean) -> Strom ctx err a -> Strom ctx err a

Take until predicate is true (includes the matching element)

#dropStrom Source

dropStrom :: forall ctx err a. Int -> Strom ctx err a -> Strom ctx err a

Drop n elements (with proper state tracking across chunks)

#dropWhileStrom Source

dropWhileStrom :: forall ctx err a. (a -> Boolean) -> Strom ctx err a -> Strom ctx err a

Drop while predicate is true

#filterMStrom Source

filterMStrom :: forall ctx err a. (a -> Om ctx err Boolean) -> Strom ctx err a -> Strom ctx err a

Filter with monadic predicate

#collectStrom Source

collectStrom :: forall ctx err a b. (a -> Maybe b) -> Strom ctx err a -> Strom ctx err b

Collect with partial function (mapMaybe)

#collectMStrom Source

collectMStrom :: forall ctx err a b. (a -> Om ctx err (Maybe b)) -> Strom ctx err a -> Strom ctx err b

Collect with monadic partial function

#changesStrom Source

changesStrom :: forall ctx err a. Eq a => Strom ctx err a -> Strom ctx err a

Remove consecutive duplicates

#groupedStrom Source

groupedStrom :: forall ctx err a. Int -> Strom ctx err a -> Strom ctx err (Array a)

Group elements into fixed-size arrays

#chunkedStrom Source

chunkedStrom :: forall ctx err a. Int -> Strom ctx err a -> Strom ctx err (Array a)

Alias for groupedStrom

#groupByStrom Source

groupByStrom :: forall ctx err a k. Eq k => (a -> k) -> Strom ctx err a -> Strom ctx err (Array a)

Group consecutive elements by a key function Emits groups when the key changes

#partition Source

partition :: forall ctx err a. (a -> Boolean) -> Strom ctx err a -> ((Strom ctx err a) /\ (Strom ctx err a))

Partition a stream by a predicate into two separate operations Returns a tuple of (trues, falses) Note: This requires running the stream twice, so it's not lazy

#partitionMap Source

partitionMap :: forall ctx err a b c. (a -> Either b c) -> Strom ctx err a -> ((Strom ctx err b) /\ (Strom ctx err c))

Partition and map in one pass

#debounce Source

debounce :: forall ctx err a. Milliseconds -> Strom ctx err a -> Strom ctx err a

Debounce a stream - adds a delay after each element Useful for rate-limiting or giving downstream time to process

#throttle Source

throttle :: forall ctx err a. Milliseconds -> Strom ctx err a -> Strom ctx err a

Throttle a stream - emit at most once per duration period Tracks last emission time and skips elements that arrive too quickly Note: Uses wall-clock time, not processing time

#delayStrom Source

delayStrom :: forall ctx err a. Milliseconds -> Strom ctx err a -> Strom ctx err a

Delay each element by a duration

#appendStrom Source

appendStrom :: forall ctx err a. Strom ctx err a -> Strom ctx err a -> Strom ctx err a

Run a stream in the background, returning a Fiber that can be cancelled

To use cancellable streams, convert your stream to Om and use Om.launchOm:

Example:

fiber <- Om.launchOm ctx handlers (Strom.runCollect myStream)
-- ... later ...
killFiber (error "Cancelled") fiber

Or for subscription-style:

fiber <- Om.launchOm ctx handlers (Strom.traverseStrom_ callback myStream)
-- ... later ...  
killFiber (error "Unsubscribed") fiber

Note: You can use Om.launchOm directly with any Om-returning function like:

  • runCollect - collect results in background
  • runDrain - run for side effects
  • traverseStrom_ - process each element with callback
  • runFold - fold with accumulator Append two streams Now properly handles multi-chunk streams with new Step protocol Note: s2 is appended TO s1 (s1 comes first, then s2) This matches pipeline semantics: stream # appendStrom other means stream followed by other

#concatStrom Source

concatStrom :: forall ctx err a. Array (Strom ctx err a) -> Strom ctx err a

Concatenate an array of streams Uses foldr with flip to match appendStrom's pipeline-friendly parameter order

#zipStrom Source

zipStrom :: forall ctx err a b. Strom ctx err a -> Strom ctx err b -> Strom ctx err (a /\ b)

Zip two streams together

#zipWithStrom Source

zipWithStrom :: forall ctx err a b c. (a -> b -> c) -> Strom ctx err a -> Strom ctx err b -> Strom ctx err c

Zip two streams with a function

#interleave Source

interleave :: forall ctx err a. Strom ctx err a -> Strom ctx err a -> Strom ctx err a

Interleave two streams - alternates between elements from each stream Similar to merge but deterministic - always alternates

#intersperse Source

intersperse :: forall ctx err a. a -> Strom ctx err a -> Strom ctx err a

Intersperse a separator element between each element of the stream

#HaltStrategy Source

data HaltStrategy

Merge termination strategy (like ZStream HaltStrategy)

Constructors

Instances

#merge Source

merge :: forall ctx err a. Strom ctx err a -> Strom ctx err a -> Strom ctx err a

Merge two streams concurrently (nondeterministic like ZStream merge) Elements are emitted as soon as they're available from either stream

#mergeAll Source

mergeAll :: forall ctx err a. Array (Strom ctx err a) -> Strom ctx err a

Merge multiple streams concurrently

#mergeWith Source

mergeWith :: forall ctx err a b c. HaltStrategy -> (a -> c) -> (b -> c) -> Strom ctx err a -> Strom ctx err b -> Strom ctx err c

Merge with halt strategy and mapping functions (ZStream-like)

#mergeHaltEither Source

mergeHaltEither :: forall ctx err a. Strom ctx err a -> Strom ctx err a -> Strom ctx err a

#mergeHaltLeft Source

mergeHaltLeft :: forall ctx err a. Strom ctx err a -> Strom ctx err a -> Strom ctx err a

#mergeHaltRight Source

mergeHaltRight :: forall ctx err a. Strom ctx err a -> Strom ctx err a -> Strom ctx err a

#mergeND Source

mergeND :: forall ctx err a. Strom ctx err a -> Strom ctx err a -> Strom ctx err a

Non-deterministic merge: Merge two streams concurrently, emitting elements as soon as they're available Unlike merge, this runs both streams in parallel and elements may arrive in any order The stream that produces results faster will have its elements appear first Optimized: Spawns producer fibers once and uses a queue for communication

#mergeNDWith Source

mergeNDWith :: forall ctx err a. Int -> Strom ctx err a -> Strom ctx err a -> Strom ctx err a

Non-deterministic merge with configurable buffer capacity (chunk queue)

#mergeAllND Source

mergeAllND :: forall ctx err a. Array (Strom ctx err a) -> Strom ctx err a

Non-deterministic merge of multiple streams All streams run concurrently and emit elements as soon as they're available

#mergeAllNDWith Source

mergeAllNDWith :: forall ctx err a. Int -> Array (Strom ctx err a) -> Strom ctx err a

Non-deterministic merge of multiple streams with configurable buffer capacity

#race Source

race :: forall ctx err a. Strom ctx err a -> Strom ctx err a -> Strom ctx err a

Race two streams - returns elements from whichever produces first Uses Om.race for true concurrent racing

#raceAll Source

raceAll :: forall ctx err a. Array (Strom ctx err a) -> Strom ctx err a

Race multiple streams - returns elements from whichever produces first

#mapPar Source

mapPar :: forall ctx err a b. Int -> (a -> Om ctx err b) -> Strom ctx err a -> Strom ctx err b

Map over elements in parallel with bounded concurrency Processes chunks in parallel groups up to the concurrency limit

#mapMPar Source

mapMPar :: forall ctx err a b. Int -> (a -> Om ctx err b) -> Strom ctx err a -> Strom ctx err b

Alias for mapPar with explicit "M" naming

#foreachPar Source

foreachPar :: forall ctx err a. Int -> (a -> Om ctx err Unit) -> Strom ctx err a -> Om ctx err Unit

For each element, execute an effect in parallel with bounded concurrency

#mapParUnordered Source

mapParUnordered :: forall ctx err a b. Int -> (a -> Om ctx err b) -> Strom ctx err a -> Strom ctx err b

Unordered parallel map (currently same as mapPar)

#mapMParUnordered Source

mapMParUnordered :: forall ctx err a b. Int -> (a -> Om ctx err b) -> Strom ctx err a -> Strom ctx err b

Alias for unordered mapPar with explicit "M" naming

#foreachParUnordered Source

foreachParUnordered :: forall ctx err a. Int -> (a -> Om ctx err Unit) -> Strom ctx err a -> Om ctx err Unit

Unordered parallel foreach

#buffer Source

buffer :: forall ctx err a. Int -> Strom ctx err a -> Strom ctx err a

Buffer stream elements with backpressure (destroys chunking)

#bufferChunks Source

bufferChunks :: forall ctx err a. Int -> Strom ctx err a -> Strom ctx err a

Buffer stream chunks with backpressure (preserves chunking)

#catchAll Source

catchAll :: forall ctx err a. (Variant (exception :: Error | err) -> Strom ctx err a) -> Strom ctx err a -> Strom ctx err a

Catch all errors and recover with a handler function Note: Uses handleErrors' for full error handling

#retry Source

retry :: forall ctx err a. Strom ctx err a -> Strom ctx err a

Retry a stream on failure up to 3 times

#retryN Source

retryN :: forall ctx err a. Int -> Strom ctx err a -> Strom ctx err a

Retry a stream N times on failure

#timeout Source

timeout :: forall ctx err a. Milliseconds -> Strom ctx err a -> Strom ctx err a

Add a timeout to stream operations If the stream doesn't complete within the duration, it terminates as empty

#ensuring Source

ensuring :: forall ctx err a. Om ctx err Unit -> Strom ctx err a -> Strom ctx err a

Run a finaliser after the stream completes (success or failure)

#bracket Source

bracket :: forall ctx err a b. Om ctx err a -> (a -> Om ctx err Unit) -> (a -> Strom ctx err b) -> Strom ctx err b

Bracket pattern: acquire, use, release Guarantees release runs even if stream fails

#bracketExit Source

bracketExit :: forall ctx err a b. Om ctx err a -> (a -> Maybe (Variant (exception :: Error | err)) -> Om ctx err Unit) -> (a -> Strom ctx err b) -> Strom ctx err b

Bracket with exit information (whether stream succeeded or failed)

#acquireRelease Source

acquireRelease :: forall ctx err a b. Om ctx err a -> (a -> Om ctx err Unit) -> (a -> Strom ctx err b) -> Strom ctx err b

Simplified acquire-release pattern

#mkStrom Source

mkStrom :: forall ctx err a. Om ctx err (Step ((Maybe (Chunk a)) /\ (Strom ctx err a)) (Maybe (Chunk a))) -> Strom ctx err a

Helper to create a Strom from a pull function

#runStrom Source

runStrom :: forall ctx err a. Strom ctx err a -> Om ctx err (Step ((Maybe (Chunk a)) /\ (Strom ctx err a)) (Maybe (Chunk a)))

Helper to unwrap a Strom