LoadPipelineStepTask

class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStream, U : Any>(batchAccumulator: BatchAccumulator<S, K1, T, U>, inputFlow: Flow<PipelineEvent<K1, T>>, batchUpdateQueue: QueueWriter<BatchUpdate>, outputPartitioner: OutputPartitioner<K1, T, K2, U>?, outputQueue: PartitionedQueue<PipelineEvent<K2, U>>?, flushStrategy: PipelineFlushStrategy?, part: Int, numWorkers: Int, stepId: String, streamCompletions: ConcurrentHashMap<Pair<String, DestinationStream.Descriptor>, AtomicInteger>, maxNumConcurrentKeys: Int? = null) : Task

A long-running task that implements a single load pipeline step.

Constructors

constructor(batchAccumulator: BatchAccumulator<S, K1, T, U>, inputFlow: Flow<PipelineEvent<K1, T>>, batchUpdateQueue: QueueWriter<BatchUpdate>, outputPartitioner: OutputPartitioner<K1, T, K2, U>?, outputQueue: PartitionedQueue<PipelineEvent<K2, U>>?, flushStrategy: PipelineFlushStrategy?, part: Int, numWorkers: Int, stepId: String, streamCompletions: ConcurrentHashMap<Pair<String, DestinationStream.Descriptor>, AtomicInteger>, maxNumConcurrentKeys: Int? = null)

Types

data class StateStore<K1, S : AutoCloseable>(val stateWithCounts: MutableMap<K1, StateWithCounts<S>> = mutableMapOf(), val streamCounts: MutableMap<DestinationStream.Descriptor, Long> = mutableMapOf(), val streamsEnded: MutableSet<DestinationStream.Descriptor> = mutableSetOf())

Task-global state: a map from every key seen to its accumulator state and bookkeeping info, plus a per-stream count of inputs seen and the set of streams that have ended. Receiving input for a stream that has already ended is a critical error, since it likely means the bookkeeping is wrong.
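
The per-stream bookkeeping described above can be illustrated with a minimal sketch. This is not the CDK's implementation: `StreamId` is a hypothetical stand-in for `DestinationStream.Descriptor`, and `SketchStateStore`, `recordInput`, and `endStream` are invented names for illustration. The per-key accumulator map (`stateWithCounts`) is omitted, because the shape of `StateWithCounts` is not shown on this page.

```kotlin
// Hypothetical stand-in for DestinationStream.Descriptor (assumption).
data class StreamId(val namespace: String?, val name: String)

// Simplified sketch of the stream-level bookkeeping only; the real
// StateStore also tracks accumulator state per key.
class SketchStateStore {
    val streamCounts: MutableMap<StreamId, Long> = mutableMapOf()
    val streamsEnded: MutableSet<StreamId> = mutableSetOf()

    // Counts one input for the stream and returns the running total.
    // Input for an already-ended stream is treated as a critical error.
    fun recordInput(stream: StreamId): Long {
        check(stream !in streamsEnded) { "Received input for ended stream $stream" }
        val next = (streamCounts[stream] ?: 0L) + 1L
        streamCounts[stream] = next
        return next
    }

    // Marks the stream as ended; later inputs for it will fail.
    fun endStream(stream: StreamId) {
        streamsEnded.add(stream)
    }
}

fun main() {
    val store = SketchStateStore()
    val users = StreamId("public", "users")
    store.recordInput(users)
    store.recordInput(users)
    store.endStream(users)
    // The third input arrives after end-of-stream and is rejected.
    val failure = runCatching { store.recordInput(users) }.exceptionOrNull()
    println(store.streamCounts[users])            // prints 2
    println(failure is IllegalStateException)     // prints true
}
```

Failing fast with `check` is one possible reading of "critical error" here; the actual task may surface the failure differently.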

Properties

Functions

open suspend override fun execute()
suspend fun handleOutput(inputKey: K1, checkpointCounts: Map<CheckpointId, CheckpointValue>, output: U, inputCount: Long, context: PipelineContext? = null)
open override fun toString(): String