RxJS.Observable
- Package: purescript-rxjs
- Repository: jasonzoladz/purescript-rxjs
#Observable Source
data Observable :: Type -> Type
Note: A couple of operators are not wrapped (namely, bindCallback and bindNodeCallback) because RxJS
implementation details prevent giving the operators an "honest" PureScript type.
However, such operators are easily replaced using Aff with the AsyncSubject module.
Please see the RxJS Version 5.* documentation for
additional details on proper usage of the library.
Instances
Monoid (Observable a)
Functor Observable
Apply Observable
Applicative Observable
Bind Observable
Monad Observable
Semigroup (Observable a)
Alt Observable
Plus Observable
Alternative Observable
MonadZero Observable
MonadPlus Observable
MonadError Error Observable
MonadThrow Error Observable
(Arbitrary a) => Arbitrary (Observable a)
#observeOn Source
observeOn :: forall a. Scheduler -> Observable a -> Observable a
Makes every next call run in the new Scheduler.
#subscribeOn Source
subscribeOn :: forall a. Scheduler -> Observable a -> Observable a
Makes subscription happen on a given Scheduler.
#subscribe Source
subscribe :: forall e a. Subscriber a -> Observable a -> Eff e Subscription
Subscribing to an Observable is like calling a function, providing
next, error and completed effects to which the data will be delivered.
#subscribeNext Source
subscribeNext :: forall e a. (a -> Eff e Unit) -> Observable a -> Eff e Subscription
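The subscribeNext signature carries no description, so here is a minimal sketch of how it might be used, based only on the types shown on this page. It assumes the Eff-era packages purescript-eff and purescript-console are available; the module name Main is illustrative.

```purescript
module Main where

import Prelude

import Control.Monad.Eff (Eff)
import Control.Monad.Eff.Console (CONSOLE, logShow)
import RxJS.Observable (fromArray, subscribeNext)

-- Subscribes to a three-element Observable and logs each value.
-- `void` discards the returned Subscription; bind it instead if the
-- subscription needs to be cancelled later.
main :: forall e. Eff (console :: CONSOLE | e) Unit
main = void $ subscribeNext logShow (fromArray [1, 2, 3])
```

Only a next handler is supplied here; use subscribe with a full Subscriber when error and completed notifications also need handling.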
#ajaxWithBody Source
ajaxWithBody :: forall e. Request -> Eff e (Observable Response)
#fromArray Source
fromArray :: forall a. Array a -> Observable a
Creates an Observable from an Array.
#fromEvent Source
fromEvent :: forall e. EventTarget -> EventType -> Eff e (Observable Event)
Creates an Observable that emits events of the specified type coming from the given event target.
#interval Source
interval :: Int -> Observable Int
Returns an Observable that emits an infinite sequence of ascending integers, with a constant interval of time of your choosing between those emissions.
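Because interval never completes on its own, it is usually bounded with another operator. The sketch below pairs it with take; it assumes the Int argument is a period in milliseconds, as in the underlying RxJS operator (this page only states the unit for timer).

```purescript
module Main where

import Prelude

import Control.Monad.Eff (Eff)
import Control.Monad.Eff.Console (CONSOLE, logShow)
import RxJS.Observable (Observable, interval, subscribeNext, take)

-- An otherwise infinite sequence, cut off after three emissions.
-- Assumption: the argument to interval is in milliseconds.
ticks :: Observable Int
ticks = take 3 (interval 1000)

main :: forall e. Eff (console :: CONSOLE | e) Unit
main = void $ subscribeNext logShow ticks
```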
#just Source
just :: forall a. a -> Observable a
Creates an Observable that emits the value specified,
and then emits a complete notification. An alias for of.
#never Source
never :: forall a. Observable a
Creates an Observable that emits no items. Subscriptions to it must be disposed of manually.
#range Source
range :: Int -> Int -> Observable Int
The range operator emits a range of sequential integers, in order, where you select the start of the range and its length. ![marble diagram](http://reactivex.io/rxjs/img/range.png)
#throw Source
throw :: forall a. Error -> Observable a
Creates an Observable that immediately sends an error notification.
#timer Source
timer :: Int -> Int -> Observable Int
Creates an Observable that, upon subscription, emits an infinite sequence of ascending integers, after a specified delay, every specified period. Delay and period are in milliseconds.
#buffer Source
buffer :: forall b a. Observable a -> Observable b -> Observable (Array a)
Collects values from the first Observable into an Array, and emits that array only when the second Observable emits.
#bufferCount Source
bufferCount :: forall a. Int -> Int -> Observable a -> Observable (Array a)
Collects values from the past as an array, emits that array when its size reaches the specified buffer size (arg1), and starts a new buffer. The new buffer starts with the nth (arg2) element of the Observable, counting from the beginning of the last buffer.
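The interaction of the two Int arguments is easiest to see side by side. In this sketch, which follows the description above, a step equal to the buffer size should give non-overlapping chunks, while a step of 1 should give a sliding window. The names chunks and sliding are illustrative.

```purescript
module Main where

import Prelude

import Control.Monad.Eff (Eff)
import Control.Monad.Eff.Console (CONSOLE, logShow)
import RxJS.Observable (Observable, bufferCount, fromArray, subscribeNext)

-- Buffer size 2, new buffer every 2 elements: non-overlapping chunks.
chunks :: Observable (Array Int)
chunks = bufferCount 2 2 (fromArray [1, 2, 3, 4])

-- Buffer size 2, new buffer every 1 element: overlapping (sliding) buffers.
sliding :: Observable (Array Int)
sliding = bufferCount 2 1 (fromArray [1, 2, 3, 4])

main :: forall e. Eff (console :: CONSOLE | e) Unit
main = do
  _ <- subscribeNext logShow chunks
  _ <- subscribeNext logShow sliding
  pure unit
```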
#bufferToggle Source
bufferToggle :: forall c b a. (Observable a) -> (Observable b) -> (b -> Observable c) -> (Observable (Array a))
Collects values from the source Observable (arg1) as an array. Starts collecting only when the opening (arg2) Observable emits, and calls the closingSelector function (arg3) to get an Observable that decides when to close the buffer. Another buffer opens when the opening Observable emits its next value.
#bufferWhen Source
bufferWhen :: forall b a. Observable a -> (a -> Observable b) -> Observable (Array a)
Collects values from the past as an array. When it starts collecting values, it calls a function that returns an Observable that emits to close the buffer and restart collecting.
#concatMap Source
concatMap :: forall b a. Observable a -> (a -> Observable b) -> Observable b
Equivalent to mergeMap (a.k.a. >>=) EXCEPT that, unlike mergeMap,
the next bind will not run until the Observable generated by the projection function (arg2)
completes. That is, composition is sequential, not concurrent.
Warning: if source values arrive endlessly and faster than their corresponding
inner Observables can complete, it will result in memory issues as inner
Observables amass in an unbounded buffer waiting for their turn to be subscribed to.
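The difference between sequential and concurrent composition only shows up when inner Observables take time to complete. The sketch below gives earlier source values longer delays: with concatMap the results should arrive in source order, whereas with mergeMap the shortest delay would be expected to win. The helper slowEcho is hypothetical and exists only for this illustration.

```purescript
module Main where

import Prelude

import Control.Monad.Eff (Eff)
import Control.Monad.Eff.Console (CONSOLE, logShow)
import RxJS.Observable (Observable, concatMap, delay, fromArray, just, mergeMap, subscribeNext)

-- Hypothetical helper: emits x after a delay that shrinks as x grows.
slowEcho :: Int -> Observable Int
slowEcho x = delay (100 * (4 - x)) (just x)

-- Sequential: each inner Observable must complete before the next starts.
sequential :: Observable Int
sequential = concatMap (fromArray [1, 2, 3]) slowEcho

-- Concurrent: all inner Observables are subscribed to as values arrive.
concurrent :: Observable Int
concurrent = mergeMap (fromArray [1, 2, 3]) slowEcho

main :: forall e. Eff (console :: CONSOLE | e) Unit
main = do
  _ <- subscribeNext logShow sequential
  _ <- subscribeNext logShow concurrent
  pure unit
```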
#concatMapTo Source
concatMapTo :: forall c b a. Observable a -> Observable b -> (a -> b -> Observable c) -> Observable c
The type signature explains it best. Warning: Like concatMap, composition is sequential.
#exhaustMap Source
exhaustMap :: forall b a. Observable a -> (a -> Observable b) -> Observable b
It's like concatMap, EXCEPT that it ignores every new projected
Observable if the previous projected Observable has not yet completed.
#expand Source
expand :: forall a. Observable a -> (a -> Observable a) -> Observable a
It's similar to mergeMap, but applies the projection function to every source value as well as every output value. It's recursive.
#groupBy Source
groupBy :: forall b a. (a -> b) -> Observable a -> Observable (Observable a)
Groups the items emitted by an Observable (arg2) according to the value returned by the grouping function (arg1). Each group becomes its own Observable.
#mapTo Source
mapTo :: forall b a. b -> Observable a -> Observable b
Emits the given constant value on the output Observable every time the source Observable emits a value.
#mergeMap Source
mergeMap :: forall b a. Observable a -> (a -> Observable b) -> Observable b
Maps each value to an Observable, then flattens all of these Observables
using mergeAll. It's just monadic bind.
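Since mergeMap is monadic bind, do notation can be used in place of nested calls. The sketch below writes the same computation both ways; it relies only on the Monad instance and the signatures listed on this page, and the names viaDo and viaMergeMap are illustrative.

```purescript
module Main where

import Prelude

import Control.Monad.Eff (Eff)
import Control.Monad.Eff.Console (CONSOLE, logShow)
import RxJS.Observable (Observable, fromArray, just, mergeMap, subscribeNext)

-- Written with do notation, which desugars to >>= (mergeMap).
viaDo :: Observable Int
viaDo = do
  x <- fromArray [1, 2]
  y <- fromArray [10, 20]
  pure (x + y)

-- The same computation with explicit mergeMap calls.
viaMergeMap :: Observable Int
viaMergeMap =
  mergeMap (fromArray [1, 2]) \x ->
    mergeMap (fromArray [10, 20]) \y ->
      just (x + y)

main :: forall e. Eff (console :: CONSOLE | e) Unit
main = do
  _ <- subscribeNext logShow viaDo
  _ <- subscribeNext logShow viaMergeMap
  pure unit
```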
#mergeMapTo Source
mergeMapTo :: forall b a. Observable a -> Observable b -> Observable b
Maps each value of the Observable (arg1) to the same inner Observable (arg2), then flattens the result.
#pairwise Source
pairwise :: forall a. Observable a -> Observable (Array a)
Puts the current value and previous value together as an array, and emits that.
#partition Source
partition :: forall a. (a -> Boolean) -> Observable a -> Array (Observable a)
Given a predicate function (arg1), and an Observable (arg2), it outputs a two element array of partitioned values (i.e., [ Observable valuesThatPassPredicate, Observable valuesThatFailPredicate ]).
#scan Source
scan :: forall b a. (a -> b -> b) -> b -> Observable a -> Observable b
Given an accumulator function (arg1), an initial value (arg2), and a source Observable (arg3), it returns an Observable that emits the current accumulation whenever the source emits a value.
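The accumulator's type, a -> b -> b, means the source value comes first and the running accumulation second. A minimal sketch of a running total, with the lambda's parameters named to make that order explicit:

```purescript
module Main where

import Prelude

import Control.Monad.Eff (Eff)
import Control.Monad.Eff.Console (CONSOLE, logShow)
import RxJS.Observable (Observable, fromArray, scan, subscribeNext)

-- The lambda receives the source value first and the accumulator second,
-- matching the (a -> b -> b) argument in the scan signature.
runningTotal :: Observable Int
runningTotal = scan (\value acc -> acc + value) 0 (fromArray [1, 2, 3])

main :: forall e. Eff (console :: CONSOLE | e) Unit
main = void $ subscribeNext logShow runningTotal
```

Use reduce with the same arguments when only the final accumulation is needed.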
#switchMap Source
switchMap :: forall b a. Observable a -> (a -> Observable b) -> Observable b
Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable.
#switchMapTo Source
switchMapTo :: forall b a. Observable a -> Observable b -> Observable b
It's like switchMap, but maps each value to the same inner Observable.
#window Source
window :: forall b a. Observable a -> Observable b -> Observable (Observable a)
It's like buffer, but emits a nested Observable instead of an array.
#windowCount Source
windowCount :: forall a. Int -> Int -> Observable a -> Observable (Observable a)
It's like bufferCount, but emits a nested Observable instead of an array.
#windowTime Source
windowTime :: forall a. Int -> Int -> Observable a -> Observable (Observable a)
It's like bufferTime, but emits a nested Observable instead of an array, and it doesn't take a maximum size parameter. arg1 is how long to buffer items into a new Observable, arg2 is when the next buffer should begin, and arg3 is the source Observable.
#windowToggle Source
windowToggle :: forall c b a. (Observable a) -> (Observable b) -> (b -> Observable c) -> (Observable (Array a))
It's like bufferToggle, but emits a nested Observable instead of an array.
#windowWhen Source
windowWhen :: forall b a. Observable a -> Observable b -> Observable (Observable a)
It's like bufferWhen, but emits a nested Observable instead of an array.
#audit Source
audit :: forall b a. Observable a -> (a -> Observable b) -> Observable a
It's like auditTime, but the silencing duration is determined by a second Observable.
#auditTime Source
auditTime :: forall a. Int -> Observable a -> Observable a
Ignores source values for duration milliseconds, then emits the most recent value from the source Observable, then repeats this process.
#debounce Source
debounce :: forall a. Observable a -> (a -> Observable Int) -> Observable a
It's like debounceTime, but the time span of emission silence is determined by a second Observable. Allows for a variable debounce rate.
#debounceTime Source
debounceTime :: forall a. Int -> Observable a -> Observable a
It's like delay, but passes only the most recent value from each burst of emissions.
#distinct Source
distinct :: forall a. Observable a -> Observable a
Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.
#distinctUntilChanged Source
distinctUntilChanged :: forall a. Observable a -> Observable a
Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item.
#elementAt Source
elementAt :: forall a. Observable a -> Int -> Observable a
Emits the single value at the specified index in a sequence of emissions from the source Observable.
#filter Source
filter :: forall a. (a -> Boolean) -> Observable a -> Observable a
Filter items emitted by the source Observable by only emitting those that satisfy a specified predicate.
#ignoreElements Source
ignoreElements :: forall a. Observable a -> Observable a
Ignores all items emitted by the source Observable and only passes calls of complete or error.
#last Source
last :: forall a. Observable a -> (a -> Boolean) -> Observable a
Returns an Observable that emits only the last item emitted by the source Observable that satisfies the given predicate.
#sample Source
sample :: forall b a. Observable a -> Observable b -> Observable a
It's like sampleTime, but samples whenever the notifier Observable emits something.
#sampleTime Source
sampleTime :: forall a. Int -> Observable a -> Observable a
Periodically looks at the source Observable and emits whichever value it has most recently emitted since the previous sampling, unless the source has not emitted anything since the previous sampling.
#skip Source
skip :: forall a. Int -> Observable a -> Observable a
Returns an Observable that skips n items emitted by an Observable.
#skipUntil Source
skipUntil :: forall b a. Observable a -> Observable b -> Observable a
Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
#skipWhile Source
skipWhile :: forall a. (a -> Boolean) -> Observable a -> Observable a
Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds true, but emits all further source items as soon as the condition becomes false.
#take Source
take :: forall a. Int -> Observable a -> Observable a
Emits only the first n values emitted by the source Observable.
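Operators such as filter and take accept the source Observable as their last argument, so they can be chained with the Prelude's # operator. The sketch below shows one possible pipeline; note that some operators on this page (for example elementAt or first) take the source first and would not chain this way.

```purescript
module Main where

import Prelude

import Control.Monad.Eff (Eff)
import Control.Monad.Eff.Console (CONSOLE, logShow)
import RxJS.Observable (Observable, filter, fromArray, subscribeNext, take)

-- Keep even numbers, then stop after the first two of them.
firstTwoEvens :: Observable Int
firstTwoEvens =
  fromArray [1, 2, 3, 4, 5, 6]
    # filter (\n -> n `mod` 2 == 0)
    # take 2

main :: forall e. Eff (console :: CONSOLE | e) Unit
main = void $ subscribeNext logShow firstTwoEvens
```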
#takeUntil Source
takeUntil :: forall b a. Observable a -> Observable b -> Observable a
Lets values pass until a second Observable emits something. Then, it completes. ![marble diagram](https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png)
#takeWhile Source
takeWhile :: forall a. (a -> Boolean) -> Observable a -> Observable a
Emits values emitted by the source Observable so long as each value satisfies the given predicate, and then completes as soon as this predicate is not satisfied.
#throttle Source
throttle :: forall b a. Observable a -> (a -> Observable b) -> Observable a
It's like throttleTime, but the silencing duration is determined by a second Observable. ![marble diagram](http://reactivex.io/rxjs/img/throttle.png)
#throttleTime Source
throttleTime :: forall a. Int -> Observable a -> Observable a
Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.
#combineLatest Source
combineLatest :: forall c b a. (a -> b -> c) -> Observable a -> Observable b -> Observable c
An Observable of projected values from the most recent values from each input Observable.
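As a rough illustration, the sketch below combines two timers with a projection function. Each time either input emits, the function is applied to the latest value from both. The periods are arbitrary, and take bounds the otherwise infinite result.

```purescript
module Main where

import Prelude

import Control.Monad.Eff (Eff)
import Control.Monad.Eff.Console (CONSOLE, logShow)
import RxJS.Observable (Observable, combineLatest, interval, subscribeNext, take)

-- Sums the most recent values of two independent timers.
-- Nothing is emitted until both inputs have produced at least one value.
latestSum :: Observable Int
latestSum =
  take 5 (combineLatest (\a b -> a + b) (interval 1000) (interval 1500))

main :: forall e. Eff (console :: CONSOLE | e) Unit
main = void $ subscribeNext logShow latestSum
```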
#combineLatest3 Source
combineLatest3 :: forall d c b a. (a -> b -> c -> d) -> Observable a -> Observable b -> Observable c -> Observable d
#concat Source
concat :: forall a. Observable a -> Observable a -> Observable a
Concatenates two Observables together by sequentially emitting their values, one Observable after the other.
#concatAll Source
concatAll :: forall a. Observable (Observable a) -> Observable a
Converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.
#exhaust Source
exhaust :: forall a. Observable (Observable a) -> Observable a
Flattens an Observable-of-Observables by dropping the next inner Observables while the current inner is still executing.
#merge Source
merge :: forall a. Observable a -> Observable a -> Observable a
Creates an output Observable which concurrently emits all values from each input Observable.
#mergeAll Source
mergeAll :: forall a. Observable (Observable a) -> Observable a
Converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables.
#race Source
race :: forall a. Array (Observable a) -> Observable a
Returns an Observable that mirrors the first source Observable to emit an item from the array of Observables.
#startWithMany Source
startWithMany :: forall a f. Foldable f => f a -> Observable a -> Observable a
Returns an Observable that emits the items in the given Foldable before it begins to emit items emitted by the source Observable.
#startWith Source
startWith :: forall a. a -> Observable a -> Observable a
Returns an Observable that emits the item given before it begins to emit items emitted by the source Observable.
#withLatestFrom Source
withLatestFrom :: forall c b a. (a -> b -> c) -> Observable a -> Observable b -> Observable c
Combines each value from the source Observables using a project function to determine the value to be emitted on the output Observable. ![marble diagram](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/withLatestFrom.png)
#zip Source
zip :: forall a. Array (Observable a) -> Observable (Array a)
Waits for each Observable to emit a value. Once this occurs, all values with the corresponding index will be emitted. This will continue until at least one inner observable completes.
#catch Source
catch :: forall a. (Observable a) -> (Error -> Observable a) -> (Observable a)
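The catch signature has no description. Judging from its type and from the RxJS operator of the same name, it presumably replaces a failed Observable with the one returned by the handler. The sketch below is written under that assumption; error comes from Control.Monad.Eff.Exception in purescript-exceptions.

```purescript
module Main where

import Prelude

import Control.Monad.Eff (Eff)
import Control.Monad.Eff.Console (CONSOLE, logShow)
import Control.Monad.Eff.Exception (error)
import RxJS.Observable (Observable, catch, just, subscribeNext, throw)

-- An Observable that fails immediately, with a fallback value supplied
-- by the handler. Assumption: the handler's Observable takes over once
-- the source signals an error.
recovered :: Observable Int
recovered = catch (throw (error "boom")) (\_ -> just 0)

main :: forall e. Eff (console :: CONSOLE | e) Unit
main = void $ subscribeNext logShow recovered
```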
#retry Source
retry :: forall a. Int -> Observable a -> Observable a
If the source Observable calls error, this method will resubscribe to the source Observable n times rather than propagating the error call.
#delay Source
delay :: forall a. Int -> Observable a -> Observable a
Time shifts each item by a specified number of milliseconds.
#delayWhen Source
delayWhen :: forall b a. Observable a -> (a -> Observable b) -> Observable a
Delays the emission of items from the source Observable by a given time span determined by the emissions of another Observable.
#dematerialize Source
dematerialize :: forall a. Observable (Notification a) -> Observable a
Returns an Observable that reverses the effect of materialize
by transforming the Notification objects emitted by the source Observable into the items
or notifications they represent.
#materialize Source
materialize :: forall a. Observable a -> Observable (Notification a)
Turns all of the notifications from a source Observable into onNext emissions,
and marks them with their original notification types within Notification objects.
#performEach Source
performEach :: forall e a. Observable a -> (a -> Eff e Unit) -> Eff e (Observable a)
Performs the effect on each value of the Observable. An alias for do.
Useful for testing (transparently performing an effect outside of a subscription).
#toArray Source
toArray :: forall a. Observable a -> Observable (Array a)
Returns an Observable that emits a single item, an array composed of all the items emitted by the source Observable.
#defaultIfEmpty Source
defaultIfEmpty :: forall a. Observable a -> a -> Observable a
Returns an Observable that emits the items emitted by the source Observable, or a specified default item if the source Observable is empty.
Takes a defaultValue (arg2), which is the item to emit if the source Observable emits no items.
#every Source
every :: forall a. Observable a -> (a -> Boolean) -> Observable Boolean
Determines whether all elements of an observable sequence satisfy a condition. Returns an observable sequence containing a single element determining whether all elements in the source sequence pass the test in the specified predicate.
#isEmpty Source
isEmpty :: forall a. Observable a -> Observable Boolean
Tests whether this Observable emits no elements.
Returns an Observable emitting one single Boolean, which is true
if this Observable emits no elements, and false otherwise.
#first Source
first :: forall a. Observable a -> (a -> Boolean) -> Observable a
Returns an Observable that emits only the first item emitted by the source Observable that satisfies the given predicate.
#count Source
count :: forall a. Observable a -> Observable Int
Counts the number of emissions on the source and emits that number when the source completes.
#reduce Source
reduce :: forall b a. (a -> b -> b) -> b -> Observable a -> Observable b
Applies an accumulator function over the source Observable, and returns the accumulated result when the source completes, given a seed value.
#unwrap Source
unwrap :: forall e a. Observable (Eff e a) -> Eff e (Observable a)
Runs an Observable of effects.
NOTE: The Semigroup instance uses merge, NOT concat.
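To make the distinction concrete, the sketch below combines the same two timed Observables with <> and with concat. Based on the note above, the first should interleave emissions as they arrive, while the second should emit everything from the first Observable before subscribing to the second. The names fast and slow and the periods are illustrative.

```purescript
module Main where

import Prelude

import Control.Monad.Eff (Eff)
import Control.Monad.Eff.Console (CONSOLE, log)
import RxJS.Observable (Observable, concat, interval, mapTo, subscribeNext, take)

fast :: Observable String
fast = mapTo "fast" (take 3 (interval 100))

slow :: Observable String
slow = mapTo "slow" (take 3 (interval 250))

-- Semigroup append: both inputs run concurrently (merge).
merged :: Observable String
merged = fast <> slow

-- Explicit concat: slow is only subscribed to after fast completes.
sequenced :: Observable String
sequenced = concat fast slow

main :: forall e. Eff (console :: CONSOLE | e) Unit
main = do
  _ <- subscribeNext log merged
  _ <- subscribeNext log sequenced
  pure unit
```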