Yoga.Om.Strom
- Package
- purescript-yoga-om-strom
- Repository
- rowtype-yoga/purescript-yoga-om-strom
#Strom Source
newtype Strom :: Type -> Row Type -> Type -> Type
newtype Strom ctx err a
A Strom is a stream of values that can:
- Emit zero or more values of type a
- Fail with errors tracked in the err row
- Require a context ctx
- Perform effects in Om
Think of it as: Om ctx err (Array a) but lazy and chunked
The Step type is interpreted as:
- Loop (chunk, next) => emit chunk (if present) and continue with next
- Done finalChunk => emit final chunk (if present) and stop
Instances
#fromFoldable Source
fromFoldable :: forall ctx err f a. Foldable f => f a -> Strom ctx err a
Create a stream from any foldable
#rangeStrom Source
rangeStrom :: forall ctx err. Int -> Int -> Strom ctx err Int
Create a stream of integers in a range [start, end)
#iterateStrom Source
iterateStrom :: forall ctx err a. (a -> a) -> a -> Strom ctx err a
Iterate producing values - limited to ~10,000 elements for stack safety
For truly infinite streams, use iterateStromInfinite (with Aff overhead)
#iterateStromInfinite Source
iterateStromInfinite :: forall ctx err a. (a -> a) -> a -> Strom ctx err a
Truly infinite iteration - stack-safe via Aff async boundaries
Adds tiny delay (0ms) at each chunk to reset the stack
Use this when you need more than 10k elements
#repeatStrom Source
repeatStrom :: forall ctx err a. a -> Strom ctx err a
Repeat a value - limited to ~10,000 elements for stack safety
#repeatStromInfinite Source
repeatStromInfinite :: forall ctx err a. a -> Strom ctx err a
Truly infinite repeat - stack-safe via Aff async boundaries
Adds tiny delay (0ms) at each chunk to reset the stack
#repeatOmStrom Source
repeatOmStrom :: forall ctx err a. Om ctx err a -> Strom ctx err a
Repeat an Om computation - limited to 100 iterations for stack safety
For truly infinite repetition, use repeatOmStromInfinite (with Aff overhead)
#repeatOmStromInfinite Source
repeatOmStromInfinite :: forall ctx err a. Om ctx err a -> Strom ctx err a
Truly infinite Om repetition - stack-safe via Aff async boundaries
Adds tiny delay (0ms) at each pull to reset the stack
#unfoldStrom Source
unfoldStrom :: forall ctx err a b. (b -> Maybe (a /\ b)) -> b -> Strom ctx err a
Unfold a stream from a seed value - can be infinite if f never returns Nothing
Emits chunks lazily, terminates when f returns Nothing
#unfoldOmStrom Source
unfoldOmStrom :: forall ctx err a b. (b -> Om ctx err (Maybe (a /\ b))) -> b -> Strom ctx err a
Unfold a stream with an effectful function
#runCollect Source
runCollect :: forall ctx err a. Strom ctx err a -> Om ctx err (Array a)
Collect all elements into an array (O(n) using chunk accumulation)
#mapAccumStrom Source
mapAccumStrom :: forall ctx err a b s. (s -> a -> (s /\ b)) -> s -> Strom ctx err a -> Strom ctx err b
Map with accumulator (stateful map)
#filterStrom Source
filterStrom :: forall ctx err a. (a -> Boolean) -> Strom ctx err a -> Strom ctx err a
Filter elements
#takeWhileStrom Source
takeWhileStrom :: forall ctx err a. (a -> Boolean) -> Strom ctx err a -> Strom ctx err a
Take while predicate is true
#takeUntilStrom Source
takeUntilStrom :: forall ctx err a. (a -> Boolean) -> Strom ctx err a -> Strom ctx err a
Take until predicate is true (includes the matching element)
#dropWhileStrom Source
dropWhileStrom :: forall ctx err a. (a -> Boolean) -> Strom ctx err a -> Strom ctx err a
Drop while predicate is true
#filterMStrom Source
filterMStrom :: forall ctx err a. (a -> Om ctx err Boolean) -> Strom ctx err a -> Strom ctx err a
Filter with monadic predicate
#collectStrom Source
collectStrom :: forall ctx err a b. (a -> Maybe b) -> Strom ctx err a -> Strom ctx err b
Collect with partial function (mapMaybe)
#collectMStrom Source
collectMStrom :: forall ctx err a b. (a -> Om ctx err (Maybe b)) -> Strom ctx err a -> Strom ctx err b
Collect with monadic partial function
#changesStrom Source
changesStrom :: forall ctx err a. Eq a => Strom ctx err a -> Strom ctx err a
Remove consecutive duplicates
#groupedStrom Source
groupedStrom :: forall ctx err a. Int -> Strom ctx err a -> Strom ctx err (Array a)
Group elements into fixed-size arrays
#chunkedStrom Source
chunkedStrom :: forall ctx err a. Int -> Strom ctx err a -> Strom ctx err (Array a)
Alias for groupedStrom
#groupByStrom Source
groupByStrom :: forall ctx err a k. Eq k => (a -> k) -> Strom ctx err a -> Strom ctx err (Array a)
Group consecutive elements by a key function
Emits groups when the key changes
#debounce Source
debounce :: forall ctx err a. Milliseconds -> Strom ctx err a -> Strom ctx err a
Debounce a stream - adds a delay after each element
Useful for rate-limiting or giving downstream time to process
#throttle Source
throttle :: forall ctx err a. Milliseconds -> Strom ctx err a -> Strom ctx err a
Throttle a stream - emit at most once per duration period
Tracks last emission time and skips elements that arrive too quickly
Note: Uses wall-clock time, not processing time
#delayStrom Source
delayStrom :: forall ctx err a. Milliseconds -> Strom ctx err a -> Strom ctx err a
Delay each element by a duration
#appendStrom Source
appendStrom :: forall ctx err a. Strom ctx err a -> Strom ctx err a -> Strom ctx err a
Run a stream in the background, returning a Fiber that can be cancelled
To use cancellable streams, convert your stream to Om and use Om.launchOm:
Example:
fiber <- Om.launchOm ctx handlers (Strom.runCollect myStream)
-- ... later ...
killFiber (error "Cancelled") fiber
Or for subscription-style:
fiber <- Om.launchOm ctx handlers (Strom.traverseStrom_ callback myStream)
-- ... later ...
killFiber (error "Unsubscribed") fiber
Note: You can use Om.launchOm directly with any Om-returning function like:
- runCollect - collect results in background
- runDrain - run for side effects
- traverseStrom_ - process each element with callback
- runFold - fold with accumulator

Append two streams
Now properly handles multi-chunk streams with new Step protocol
Note: s2 is appended TO s1 (s1 comes first, then s2)
This matches pipeline semantics: stream # appendStrom other means stream followed by other
#concatStrom Source
concatStrom :: forall ctx err a. Array (Strom ctx err a) -> Strom ctx err a
Concatenate an array of streams
Uses foldr with flip to match appendStrom's pipeline-friendly parameter order
#zipWithStrom Source
zipWithStrom :: forall ctx err a b c. (a -> b -> c) -> Strom ctx err a -> Strom ctx err b -> Strom ctx err c
Zip two streams with a function
#interleave Source
interleave :: forall ctx err a. Strom ctx err a -> Strom ctx err a -> Strom ctx err a
Interleave two streams - alternates between elements from each stream
Similar to merge but deterministic - always alternates
#intersperse Source
intersperse :: forall ctx err a. a -> Strom ctx err a -> Strom ctx err a
Intersperse a separator element between each element of the stream
#HaltStrategy Source
#mergeWith Source
mergeWith :: forall ctx err a b c. HaltStrategy -> (a -> c) -> (b -> c) -> Strom ctx err a -> Strom ctx err b -> Strom ctx err c
Merge with halt strategy and mapping functions (ZStream-like)
#mergeHaltEither Source
mergeHaltEither :: forall ctx err a. Strom ctx err a -> Strom ctx err a -> Strom ctx err a
#mergeHaltLeft Source
mergeHaltLeft :: forall ctx err a. Strom ctx err a -> Strom ctx err a -> Strom ctx err a
#mergeHaltRight Source
mergeHaltRight :: forall ctx err a. Strom ctx err a -> Strom ctx err a -> Strom ctx err a
#mergeND Source
mergeND :: forall ctx err a. Strom ctx err a -> Strom ctx err a -> Strom ctx err a
Non-deterministic merge: Merge two streams concurrently, emitting elements as soon as they're available
Unlike merge, this runs both streams in parallel and elements may arrive in any order
The stream that produces results faster will have its elements appear first
Optimized: Spawns producer fibers once and uses a queue for communication
#mergeNDWith Source
mergeNDWith :: forall ctx err a. Int -> Strom ctx err a -> Strom ctx err a -> Strom ctx err a
Non-deterministic merge with configurable buffer capacity (chunk queue)
#mergeAllND Source
mergeAllND :: forall ctx err a. Array (Strom ctx err a) -> Strom ctx err a
Non-deterministic merge of multiple streams
All streams run concurrently and emit elements as soon as they're available
#mergeAllNDWith Source
mergeAllNDWith :: forall ctx err a. Int -> Array (Strom ctx err a) -> Strom ctx err a
Non-deterministic merge of multiple streams with configurable buffer capacity
#mapParUnordered Source
mapParUnordered :: forall ctx err a b. Int -> (a -> Om ctx err b) -> Strom ctx err a -> Strom ctx err b
Unordered parallel map (currently same as mapPar)
#mapMParUnordered Source
mapMParUnordered :: forall ctx err a b. Int -> (a -> Om ctx err b) -> Strom ctx err a -> Strom ctx err b
Alias for unordered mapPar with explicit "M" naming
#bufferChunks Source
bufferChunks :: forall ctx err a. Int -> Strom ctx err a -> Strom ctx err a
Buffer stream chunks with backpressure (preserves chunking)
#timeout Source
timeout :: forall ctx err a. Milliseconds -> Strom ctx err a -> Strom ctx err a
Add a timeout to stream operations
If the stream doesn't complete within the duration, it terminates as empty