The pub/sub model captures a simple concept - assign "handlers" to an event manager of some kind, and delegate messages to those handlers by issuing events. This is also considered a "channel" in Haskell - something that stores messages until they are read.
import Queue (Queue, readOnly, writeOnly, READ, WRITE, new, on, put) main :: Effect Unit main = do (q :: Queue (read :: READ, write :: WRITE) Int) <- new -- assign a handler on (readOnly q) logShow -- put messages to the queue put (writeOnly q) 1 put q 2 -- Doesn't need to be strictly write-only
The calls to
writeOnlyaren't necessary; they're just to demonstrate the ability to quarantine sections of your code, in which you only which to expose
(read :: READ)or
(write :: WRITE)access, as the only facilities available to the queue.
The primary purpose of a queue is to decouple function invocation from parameter application - if I don't have a function yet, but I have inputs for it, then I'll just write them to the queue. If I have a function, but no inputs yet, then I'll just add the handler to the queue to await inputs. Furthermore, if you want to remove a handler, you should be able to.
It tries to imitate similar functionality to
Chans from Haskell, but isn't nearly as cool; the
IO monad in Haskell can block indefinitely while other threads write to the same channel, while our
Queues can only do that in the
Aff monad (single-threaded, but asynchronous). This functionality
is achieved using the
Queue.Aff module and its siblings.
There are three flavors of
Queue, sorted by their module:
module Queue.One- the simplest version - in only allows for at-most one handler - useful when you know there's only going to be one listener to the queue, but you don't know when the input will be available.
module Queue- similar to
Queue.One, except it allows for a set of handlers at a time - you may additively include individual handlers at any time, but because they can't be distinguished from each other as values (
Eq), you can only delete all of them at once. This may be useful when you know in-advance that you won't be deleting any handlers.
module IxQueue- similar to
Queue, except it makes you index your handlers by some
String. This names them, and allows you to delete individual handlers at any time without disrupting other ones.
newcreates a new, empty queue.
putinserts data into the queue. If there's at least one listener, it will supply that data to it. If not, it will be cached in-order (oldest to youngest).
drawtakes the oldest data from the queue, in
onassigns a handler that doesn't die by itself.
onceassigns a handler that dies after receiving one input.
Some less important functions:
takeremoves the oldest message, without invoking a handler.
popremoves the youngest message, without invoking a handler.
drainadds a handler that does nothing - just empties any residual data.
putManyadds multiple values by traversing the handlers over the container.
popManyreturns an array of values.
delremoves all handlers from the queue, but not affecting the cache if there is one.
readobserves the values in the queue, without affecting it's cache.
lengthreturns the length of the cache.
Initially, when using
new, a queue is both read and write accessible, through the type-level flags
This may be undesirable for complex networks of queues, where one section of code clearly only supplies data, while another
one clearly only consumes it. There are functions for changing this:
read :: READlabel from the row type
write :: WRITElabel from the row type
read :: READlabel back to the row type
write :: WRITElabel back to the row type
This makes the type signature for a queue look something like
Queue (read :: READ, write :: WRITE) Foo.
There is also some extra kit defined in
module Queue.Types -
These functions take similar arguments - some time value, and a readable queue, and return a writable queue, with some thread (fiber).
Generally, in your code, you will be listening to the queue you provide, while writing to the queue these functions return, to get their intended effects.
debounceStaticis intended to drop messages sent before the time parameter - useful for user interface applications.
throttleStaticis intended to delay all messages sent before the time parameter, without loss. Useful for network applications.
intersperseStaticis intended to create messages if none are sent after the time parameter, without loss. Useful for connectivity in network applications (pings).
Note: these variants are called "fooStatic" because there's only a single time value used, and wouldn't be capable of something more advanced like exponential falloff.
This library's additional goal was to aid in asynchronous interop; having some source data originate asynchronously,
and the ability to handle it spontaneously. Through the
IxQueue.IOQueues modules, we can treat
message passing to queues at a higher level, as procedure invocations. We do this by creating two queues - one
for handling inputs to the handler(s), and one for returning results from the handler(s).
module IOQueues and
module IxQueue.IOQueues modules, there's some somewhat confusing nomenclature:
IOQueues- a pair of queues; one for input, one for output.
callAsyncputs an input in the
IOQueues, and blocks until an output is available in
registerSyncattaches a processing function to the
IOQueues, taking an input and returning an output, but does so in lock-step - atomically adding a function to the system synchronously.
callAsyncEffdoes the same thing as
callAsync, but can't block in
Affand only operates in
registerSyncOncedoes the same thing as
registerSync, but removes itself after being invoked once.
import Queue.One (Queue) as One -- ^ most lightweight implementation, only one handler import IOQueues (new, registerSync, callAsync, IOQueues) import Effect.Aff (runAff_) main = do (io :: IOQueues One.Queue Int Int) <- newIOQueues -- "IOQueues queue input output" means "using 'queue', take 'input' and make 'output'." let handler :: Int -> Effect Int handler i = do log $ "input: " <> show i let o = i + 1 log $ "incremented: " <> show o pure o registerSync io handler -- attach the handler in Effect -- `resolveAff` does nothing - it's needed by `runAff_` - see `Effect.Aff` for details let resolveAff :: Either String Unit -> Effect Unit resolveAff _ = pure unit runAff_ resolveAff do result <- callAsync io 20 -- invoke delegated computation in Aff liftEffect $ log $ "Should be 21 - Result: " <> show result
module IxQueue.IOQueues is useful when you need async
IOQueues calls, but need to integrate into an
IxQueue with a network of handlers and data supplied. The primary difference with this is that
IxQueue.IOQueues, you need to pass a
String reference to identify which handlers will be targeted
by what data.
module Queue as an underlying
IOQueues queue might be pretty confusing - it allows for multiple
registerSync handlers waiting for the same input, which would cause a race condition for the
invocation. However, if you mess with the internal output queue in the
IOQueues and add extra handlers,
you may broadcast the results of the
registerSync handler to multiple areas, without race conditions
callAsync only reads from the output queue once). Either way, this use case would probably not be stable
or in your favor, and should generally be avoided.
This library has grown quite a lot over the years, and I still find it very useful. purescript-react-queue was spawned from this, and I still use it all the time for managing react components.
If you have any ideas you'd like to see in this library, please file an issue or drop me a line. Thanks for using it!