Module

Yoga.Kafka.Consumer

Package
purescript-yoga-kafka
Repository
rowtype-yoga/purescript-yoga-kafka

#Consumer Source

#ConsumerOptionsImpl Source

type ConsumerOptionsImpl :: Row Typetype ConsumerOptionsImpl = (heartbeatInterval :: Milliseconds, rebalanceTimeout :: Milliseconds, sessionTimeout :: Milliseconds)

#createConsumerImpl Source

createConsumerImpl :: forall opts. EffectFn2 Kafka { groupId :: ConsumerGroupId | opts } Consumer

#createConsumer Source

createConsumer :: forall opts opts_. Union opts opts_ ConsumerOptionsImpl => { groupId :: ConsumerGroupId | opts } -> Kafka -> Effect Consumer

#SubscribeOptionsImpl Source

type SubscribeOptionsImpl :: Row Typetype SubscribeOptionsImpl = (fromBeginning :: Boolean)

#subscribeImpl Source

subscribeImpl :: forall opts. EffectFn2 Consumer { topic :: TopicName | opts } (Promise Unit)

#subscribe Source

subscribe :: forall opts opts_. Union opts opts_ SubscribeOptionsImpl => { topic :: TopicName | opts } -> Consumer -> Aff Unit

#KafkaMessageFFI Source

type KafkaMessageFFI = { message :: { headers :: Object HeaderValue, key :: Nullable Key, offset :: Offset, timestamp :: Timestamp, value :: Value }, partition :: PartitionId, topic :: TopicName }

#BatchMessage Source

type BatchMessage = { headers :: Object HeaderValue, key :: Maybe Key, offset :: Offset, timestamp :: Timestamp, value :: Value }

#Batch Source

type Batch = { commitOffsetsIfNecessary :: Effect (Promise Unit), heartbeat :: Aff Unit, highWatermark :: Offset, isRunning :: Effect Boolean, isStale :: Effect Boolean, messages :: Array BatchMessage, partition :: PartitionId, pause :: Effect (Effect Unit), resolveOffset :: Offset -> Effect Unit, topic :: TopicName, uncommittedOffsets :: Effect Foreign }

#RunOptionsImpl Source

type RunOptionsImpl :: Row Typetype RunOptionsImpl = (autoCommit :: Boolean, autoCommitInterval :: Milliseconds, autoCommitThreshold :: Int, eachBatch :: Batch -> Aff Unit, eachBatchAutoResolve :: Boolean, eachMessage :: KafkaMessageFFI -> Aff Unit, partitionsConsumedConcurrently :: Int)

#run Source

run :: forall opts opts_. Union opts opts_ RunOptionsImpl => Record opts -> Consumer -> Aff Unit