Strom is a powerful, ZIO ZStream-inspired streaming library for yoga-om that brings pull-based, composable, and resource-safe stream processing to PureScript.
Just as Om represents your effect type, Strom represents streaming effects. The name is the German/Swedish word for "stream" or "current" - fitting for a streaming library!
Strom brings the best ideas from Scala's ZIO ZStream to PureScript:
- Pull-based streams - Efficient, backpressure-aware processing
- Chunked execution - Batch processing for optimal performance
- Resource safety - Proper cleanup and error handling
- Type-safe - Leverages Om's context and error tracking
- Construction: `fromArray`, `range`, `iterate`, `repeat`, `unfold`, `repeatOm`
- Transformation: `map`, `mapM`, `mapParallel`, `bind` (or `>>=`), `scan`, `mapAccum`
- Selection: `take`, `takeWhile`, `takeUntil`, `drop`, `dropWhile`, `filter`, `collect`, `changes`
- Combination: `append`, `concat`, `merge`, `zip`, `zipWith`, `interleave`, `race`
- Grouping: `grouped`, `chunked`, `partition`
- Execution: `runCollect`, `runDrain`, `runFold`, `traverse_`, `for_`, `subscribe`
- Error Handling: `catchAll`, `orElse`
```
spago install yoga-om-core yoga-om-strom
```

```purescript
import Yoga.Om.Strom as Strom

-- Simple transformation pipeline
result <-
  Strom.range 1 10
    # Strom.map (_ * 2)
    # Strom.filter (_ > 10)
    # Strom.runCollect
-- Output: [12, 14, 16, 18]
```
```purescript
import Yoga.Om.Strom.Do (guard)

-- Pythagorean triples
triples <-
  ( do
      a <- Strom.range 1 20
      b <- Strom.range a 20
      c <- Strom.range b 20
      guard (a * a + b * b == c * c)
      pure (Tuple a (Tuple b c))
  ) # Strom.runCollect
-- Output: [(3,4,5), (5,12,13), (6,8,10), (8,15,17), ...]
```

See DO_NOTATION.md for comprehensive examples!
```purescript
-- Map with monadic effects
users <-
  Strom.range 1 100
    # Strom.mapM (\id -> fetchUser id) -- Om ctx err User
    # Strom.filter (_.active)
    # Strom.runCollect
```

```purescript
-- Process up to 10 items concurrently
results <-
  Strom.fromArray urls
    # Strom.mapParallel 10 fetchUrl
    # Strom.runCollect
```

```purescript
-- Running total with scan
totals <-
  Strom.fromArray [1, 2, 3, 4, 5]
    # Strom.scan (+) 0
    # Strom.runCollect
-- Output: [1, 3, 6, 10, 15]
```
```purescript
-- Stateful map with accumulator
indexed <-
  Strom.fromArray ["a", "b", "c"]
    # Strom.mapAccum (\i x -> Tuple (i + 1) (show i <> ": " <> x)) 1
    # Strom.runCollect
-- Output: ["1: a", "2: b", "3: c"]
```

```purescript
-- Zip two streams
pairs <-
  Strom.zip
    (Strom.range 1 5)
    (Strom.fromArray ["a", "b", "c", "d", "e"])
    # Strom.runCollect
-- Output: [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")]
```
```purescript
-- Interleave deterministically
mixed <-
  Strom.interleave
    (Strom.fromArray [1, 3, 5])
    (Strom.fromArray [2, 4, 6])
    # Strom.runCollect
-- Output: [1, 2, 3, 4, 5, 6]
```

```purescript
-- Merge non-deterministically (race-based)
merged <-
  Strom.merge stream1 stream2
    # Strom.runCollect
```

```purescript
-- Take from infinite stream
first10 <-
  Strom.iterate (_ + 1) 0
    # Strom.take 10
    # Strom.runCollect
-- Output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```
```purescript
-- Fibonacci sequence
fibonacci <-
  Strom.unfold
    (\(Tuple a b) -> Just (Tuple a (Tuple b (a + b))))
    (Tuple 0 1)
    # Strom.take 10
    # Strom.runCollect
-- Output: [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
```

```purescript
-- Process in batches
Strom.range 1 100
  # Strom.grouped 10
  # Strom.traverse_ (\batch -> do
      Console.log $ "Processing batch of " <> show (length batch)
      processBatch batch
    )
```
```purescript
-- Catch and recover
results <-
  riskyStream
    # Strom.catchAll (\err -> Strom.succeed defaultValue)
    # Strom.runCollect
```

```purescript
-- Provide alternative stream
results <-
  primaryStream `Strom.orElse` fallbackStream
    # Strom.runCollect
```
```purescript
type Event = { timestamp :: Int, userId :: String, action :: String }
type Context = { logger :: String -> Aff Unit, db :: Database }

processEvents :: Om Context () Unit
processEvents = do
  fetchEventStream
    # Strom.filter (\e -> e.action /= "spam") -- Filter spam
    # Strom.changes -- Deduplicate consecutive
    # Strom.mapM enrichEvent -- Fetch additional data
    # Strom.mapParallel 5 validateEvent -- Validate concurrently
    # Strom.collect identity -- Keep only valid (Maybe)
    # Strom.grouped 50 -- Batch for DB
    # Strom.tapM (\batch -> logBatch batch) -- Log each batch
    # Strom.traverse_ (\batch -> saveBatch batch) -- Save to DB
```
```purescript
-- Unfold paginated results into a stream
fetchAllPages :: Om Context () (Array Item)
fetchAllPages = do
  Strom.unfoldOm (\pageToken -> do
      page <- fetchPage pageToken
      case page.nextToken of
        Nothing -> pure Nothing
        Just token -> pure $ Just $ Tuple page.items token
    ) initialToken
    >>= Strom.fromArray -- Flatten pages
    # Strom.runCollect
```
```purescript
-- Process with delays between items
processWithRateLimit :: Om Context () Unit
processWithRateLimit = do
  Strom.fromArray items
    # Strom.mapM (\item -> do
        result <- callAPI item
        Om.delay (Milliseconds 100.0) -- Rate limit
        pure result
      )
    # Strom.traverse_ handleResult
```
```purescript
-- ❌ Arrays: Load everything into memory
processArrays :: Om ctx err (Array Result)
processArrays = do
  items <- fetchAllItems -- Loads ALL items
  Array.traverse processItem items -- Sequential

-- ✅ Strom: Efficient streaming
processStream :: Om ctx err Unit
processStream = do
  Strom.unfoldOm fetchNextBatch initialState
    >>= Strom.fromArray
    # Strom.mapParallel 10 processItem -- Parallel!
    # Strom.runDrain
```
Bolson Events are push-based (FRP): great for UI, but with less control over backpressure. Strom is pull-based: the consumer controls when to pull the next items, which makes it a better fit for batch processing, APIs, and file I/O.

Construction:

- `empty :: Strom ctx err a` - Empty stream
- `succeed :: a -> Strom ctx err a` - Single element
- `fromArray :: Array a -> Strom ctx err a` - From array
- `fromFoldable :: Foldable f => f a -> Strom ctx err a` - From any foldable (sketch below)
- `fromOm :: Om ctx err a -> Strom ctx err a` - From Om effect (sketch below)
- `fromAff :: Aff a -> Strom ctx err a` - From Aff
- `range :: Int -> Int -> Strom ctx err Int` - Range of integers
- `iterate :: (a -> a) -> a -> Strom ctx err a` - Infinite iteration
- `repeat :: a -> Strom ctx err a` - Infinite repetition
- `repeatOm :: Om ctx err a -> Strom ctx err a` - Infinite Om repetition
- `unfold :: (b -> Maybe (Tuple a b)) -> b -> Strom ctx err a` - Unfold pattern
- `unfoldOm :: (b -> Om ctx err (Maybe (Tuple a b))) -> b -> Strom ctx err a` - Effectful unfold
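`fromFoldable` and `fromOm` are not covered by the examples above; a minimal sketch using only the signatures listed here:

```purescript
-- Any Foldable works as a source, e.g. Maybe or List (sketch)
xs <-
  Strom.fromFoldable (Just 1)
    # Strom.runCollect
-- Output: [1]

-- A single-element stream produced by an Om effect (sketch)
answer <-
  Strom.fromOm (pure 42)
    # Strom.runCollect
-- Output: [42]
```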
Execution:

- `runCollect :: Strom ctx err a -> Om ctx err (Array a)` - Collect all elements
- `runDrain :: Strom ctx err a -> Om ctx err Unit` - Run and discard
- `runFold :: b -> (b -> a -> b) -> Strom ctx err a -> Om ctx err b` - Fold stream (sketch below)
- `traverse_ :: (a -> Om ctx err Unit) -> Strom ctx err a -> Om ctx err Unit` - Traverse with effects
- `for_ :: Strom ctx err a -> (a -> Om ctx err Unit) -> Om ctx err Unit` - Flipped `traverse_`
- `subscribe :: (a -> Om ctx err Unit) -> Strom ctx err a -> Om ctx err (Om ctx err Unit)` - Subscribe with cancellation
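For example, `runFold` collapses a stream into a single value without materializing it as an array; a minimal sketch using the signature above:

```purescript
-- Sum a stream with runFold (initial accumulator first, then step function)
total <-
  Strom.fromArray [1, 2, 3, 4]
    # Strom.runFold 0 (+)
-- Output: 10
```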
Transformation:

- `map :: (a -> b) -> Strom ctx err a -> Strom ctx err b` - Pure map
- `mapM :: (a -> Om ctx err b) -> Strom ctx err a -> Strom ctx err b` - Monadic map
- `mapParallel :: Int -> (a -> Om ctx err b) -> Strom ctx err a -> Strom ctx err b` - Parallel map
- `bind :: (a -> Strom ctx err b) -> Strom ctx err a -> Strom ctx err b` - Monadic bind (use the `>>=` operator)
- `scan :: (b -> a -> b) -> b -> Strom ctx err a -> Strom ctx err b` - Running fold
- `mapAccum :: (s -> a -> Tuple s b) -> s -> Strom ctx err a -> Strom ctx err b` - Stateful map
- `tap :: (a -> Unit) -> Strom ctx err a -> Strom ctx err a` - Observe without modifying
- `tapM :: (a -> Om ctx err Unit) -> Strom ctx err a -> Strom ctx err a` - Monadic tap
Selection:

- `take :: Int -> Strom ctx err a -> Strom ctx err a` - Take n elements
- `takeWhile :: (a -> Boolean) -> Strom ctx err a -> Strom ctx err a` - Take while true
- `takeUntil :: (a -> Boolean) -> Strom ctx err a -> Strom ctx err a` - Take until true (inclusive)
- `drop :: Int -> Strom ctx err a -> Strom ctx err a` - Drop n elements
- `dropWhile :: (a -> Boolean) -> Strom ctx err a -> Strom ctx err a` - Drop while true
- `filter :: (a -> Boolean) -> Strom ctx err a -> Strom ctx err a` - Filter elements
- `filterM :: (a -> Om ctx err Boolean) -> Strom ctx err a -> Strom ctx err a` - Monadic filter (sketch below)
- `collect :: (a -> Maybe b) -> Strom ctx err a -> Strom ctx err b` - Filter + map
- `collectM :: (a -> Om ctx err (Maybe b)) -> Strom ctx err a -> Strom ctx err b` - Monadic collect
- `changes :: Eq a => Strom ctx err a -> Strom ctx err a` - Remove consecutive duplicates
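`filterM` behaves like `filter`, but the predicate may run Om effects. A sketch, where `userIds` and `isStillActive :: UserId -> Om ctx err Boolean` are hypothetical:

```purescript
-- Keep only the ids whose effectful check returns true
-- (`userIds` and `isStillActive` are hypothetical)
activeIds <-
  Strom.fromArray userIds
    # Strom.filterM isStillActive
    # Strom.runCollect
```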
Combination:

- `append :: Strom ctx err a -> Strom ctx err a -> Strom ctx err a` - Sequential concatenation
- `concat :: Array (Strom ctx err a) -> Strom ctx err a` - Concat multiple streams
- `merge :: Strom ctx err a -> Strom ctx err a -> Strom ctx err a` - Non-deterministic merge
- `zip :: Strom ctx err a -> Strom ctx err b -> Strom ctx err (Tuple a b)` - Zip two streams
- `zipWith :: (a -> b -> c) -> Strom ctx err a -> Strom ctx err b -> Strom ctx err c` - Zip with function (sketch below)
- `interleave :: Strom ctx err a -> Strom ctx err a -> Strom ctx err a` - Deterministic interleave
- `race :: Array (Strom ctx err a) -> Strom ctx err a` - Race multiple streams
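`zipWith` combines two streams element by element with a function; a minimal sketch using the signature above:

```purescript
-- Pairwise sums; the result ends when the shorter stream ends
sums <-
  Strom.zipWith (+)
    (Strom.fromArray [1, 2, 3])
    (Strom.fromArray [10, 20, 30])
    # Strom.runCollect
-- Output: [11, 22, 33]
```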
Grouping:

- `grouped :: Int -> Strom ctx err a -> Strom ctx err (Array a)` - Fixed-size chunks
- `chunked :: Int -> Strom ctx err a -> Strom ctx err (Array a)` - Alias for `grouped`
- `partition :: (a -> Boolean) -> Strom ctx err a -> Tuple (Strom ctx err a) (Strom ctx err a)` - Split by predicate (sketch below)
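`partition` splits one stream into two halves by a predicate, each of which is then consumed on its own; a sketch based on the signature above:

```purescript
-- Split into evens and odds, then collect each half separately
let split = Strom.partition (\n -> n `mod` 2 == 0) (Strom.fromArray [1, 2, 3, 4, 5])
evenNumbers <- Strom.runCollect (fst split)
-- Output: [2, 4]
oddNumbers <- Strom.runCollect (snd split)
-- Output: [1, 3, 5]
```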
Error Handling:

- `catchAll :: (Record err -> Strom ctx err2 a) -> Strom ctx err a -> Strom ctx err2 a` - Catch and recover
- `orElse :: Strom ctx err a -> Strom ctx err a -> Strom ctx err a` - Alternative on failure
- Chunked processing: Operations process arrays internally for efficiency
- Pull-based: Backpressure naturally handled - consumer controls pace
- Lazy: Elements only computed when pulled
- Resource-safe: Om's error handling ensures cleanup
Performance benchmarks on a modern system (median of 10 runs):
| Operation | Dataset | Median Time |
|---|---|---|
| Simple Ops | | |
| map | 2M elements | <1ms |
| filter (50%) | 2M elements | <1ms |
| map chain (3x) | 2M elements | <1ms |
| pipeline (map+filter+map) | 2M elements | <1ms |
| Aggregation | | |
| fold (sum) | 1M elements | <1ms |
| collect | 50k elements | <1ms |
| scan (running sum) | 2M elements | <1ms |
| Construction | | |
| fromArray | 1M elements | <1ms |
| iterateStrom | 10k elements | <1ms |
| repeatStrom | 10k elements | <1ms |
| unfold | 10k elements | <1ms |
| iterateInfinite + take | 10k elements | 1ms |
| Selection | | |
| take | 5k from 2M | <1ms |
| takeWhile | ~50k from 1M | <1ms |
| drop | 5k from 1M | <1ms |
| dropWhile | 500k from 1M | <1ms |
| Transform | | |
| mapM (effect) | 50k elements | <1ms |
| tap | 50k elements | <1ms |
| collect (filter+map) | 50k elements | <1ms |
| changes (dedup) | 50k elements | <1ms |
| Grouping | | |
| grouped (chunks of 100) | 50k elements | <1ms |
| partition (even/odd) | 50k elements | <1ms |
| Combining | | |
| append | 2x500k streams | <1ms |
| concat | 10x10k streams | <1ms |
| zip | 2x500k streams | <1ms |
| zipWith | 2x500k streams | <1ms |
| bind (flatMap) | 5k×10 | <1ms |
| Concurrent (Pure) | | |
| mergeND (pure functions) | 2x1M streams | 485ms ⚠️ |
| mapPar (pure, concurrency=4) | 5k elements | 355ms ⚠️ |
| mergeND (with 1ms delays) | 2x100 elements | ~200ms ✅ |
| mapPar (with 10ms delays, conc=4) | 100 elements | ~250ms ✅ |
Key Takeaways:
- ⚡ Most operations are sub-millisecond thanks to chunked processing and STArray optimizations
- 🚀 Excellent throughput: >2M elements/ms for simple transformations
- 📦 Efficient chunking (10,000 elements per chunk) minimizes overhead
- ⚠️ Concurrent operations (`mergeND`, `mapPar`) have coordination overhead per chunk/group
  - With pure functions: overhead dominates (use sequential operations instead!)
  - With async I/O: parallelism provides a huge speedup (e.g., 100 parallel 10ms tasks ≈ 250ms vs 1000ms sequential)
- 💡 Use concurrent ops for: API calls, database queries, file I/O
- 💡 Use sequential ops for: Pure transformations, CPU-bound work
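As a rule of thumb, reach for `mapParallel` when each element involves real I/O and plain `map`/`mapM` when the work is pure or CPU-bound. A sketch, where `userIds`, `numbers`, and `fetchProfile` are hypothetical:

```purescript
-- I/O-bound: concurrency pays for its coordination overhead
profiles <-
  Strom.fromArray userIds
    # Strom.mapParallel 10 fetchProfile -- hypothetical API call
    # Strom.runCollect

-- CPU-bound / pure: the sequential, chunked path is already fast
squares <-
  Strom.fromArray numbers
    # Strom.map (\n -> n * n)
    # Strom.runCollect
```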
Strom is built on a simple but powerful abstraction:
```purescript
newtype Strom ctx err a = Strom
  { pull :: Om ctx err (Step (Strom ctx err a) (Maybe (Chunk a)))
  }

type Chunk a = Array a
```

Each pull:

- Returns a `Step` - either `Done` (finished) or `Loop` (continue)
- May emit a `Chunk` of values (or `Nothing` if no values yet)
- Can perform Om effects (context access, error handling, etc.)
This design enables:
- Efficient batching
- Natural backpressure
- Resource safety
- Composability
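For example, an infinite effectful source is only evaluated as far as the consumer pulls it. A sketch, where `pollSensor` is a hypothetical Om action:

```purescript
-- The infinite repeatOm source is never evaluated in full: elements are
-- produced only as the consumer pulls them, and take stops pulling early
readings <-
  Strom.repeatOm pollSensor
    # Strom.take 3
    # Strom.runCollect
```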
- INDEX.md - Start here! Navigation and overview
- README.md - This file (complete guide)
- QUICKREF.md - One-page API reference
- DO_NOTATION.md - Do-notation guide with examples
- DEMO.md - API showcase with usage patterns
- COMPARISON.md - Detailed comparison with ZIO ZStream
- TESTING.md - Test coverage summary
- Examples.purs - 18 runnable examples
- yoga-om-core - Core Om effect system (required)
- yoga-om-node - Node.js integrations
- yoga-om-rom - Reactive Om: Bolson FRP integration (complementary, push-based)
Inspired by ZIO ZStream from the Scala ZIO ecosystem.