PipelineEventBookkeepingRouter

@Singleton
class PipelineEventBookkeepingRouter(catalog: DestinationCatalog, syncManager: SyncManager, checkpointQueue: QueueWriter<Reserved<CheckpointMessageWrapped>>, openStreamQueue: QueueWriter<DestinationStream>, fileTransferQueue: MessageQueue<FileTransferQueueMessage>, @Named(value = "batchStateUpdateQueue") batchStateUpdateQueue: ChannelMessageQueue<BatchUpdate>, @Named(value = "numDataChannels") numDataChannels: Int, @Named(value = "markEndOfStreamAtEndOfSync") markEndOfStreamAtEndOfSync: Boolean, namespaceMapper: NamespaceMapper) : CloseableCoroutine

The stdio and socket input channels differ in subtle and critical ways:

  • record-wise, sockets are cold flows, stdio publishes to a partitioned channel

  • sockets receive checkpointId on the record, stdio infers from state message order

  • speed will receive complete/incomplete on all sockets (no need to publish)

  • because they are flows, sockets don't need memory management; the buffer can serve as the backpressure scheme (note: the exceptions are file transfer, which forwards the record after handling the file, and, of course, state)
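The last point, a bounded buffer acting as the backpressure scheme, can be illustrated with a minimal sketch. It uses a plain JDK `ArrayBlockingQueue` as a stand-in rather than the flows this class actually consumes, and the helper `offerAll` is hypothetical. With a suspending flow buffer the producer would wait instead of receiving `false`, but the principle is the same: once the buffer is full, nothing more is accepted until the consumer drains it, so no separate memory reservation is needed.

```kotlin
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical helper: tries to enqueue each record without blocking and
// reports, per record, whether the bounded buffer accepted it.
fun offerAll(buffer: ArrayBlockingQueue<String>, records: List<String>): List<Boolean> =
    records.map { buffer.offer(it) }

fun main() {
    // Capacity 2: the buffer itself caps how much data is held in memory.
    val buffer = ArrayBlockingQueue<String>(2)

    // The third record is refused because the buffer is full.
    println(offerAll(buffer, listOf("r1", "r2", "r3"))) // [true, true, false]

    // Once the consumer takes a record, the producer can make progress again.
    buffer.take()
    println(buffer.offer("r3")) // true
}
```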

These differences might diverge or converge as we tune (e.g., because of lock contention with multiple sockets, or because the socket pattern ends up working for stdio as well). For now, since the main difference is what is done with the pipeline events, we'll consolidate bookkeeping into a single class that yields events from DestinationMessage(s). A checkpoint ID can be inferred if the record does not provide one.
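As a rough illustration of the fallback described above, the sketch below uses the ID carried on the record when there is one and otherwise infers it from state message order. All types here (`StreamKey`, `Message`, `Record`, `State`, `CheckpointIdInferrer`) are hypothetical stand-ins, not the CDK's classes, and the counting rule (each state message closes the current checkpoint for its stream and opens the next) is an assumption about how order-based inference could work.

```kotlin
// Hypothetical stand-ins for illustration; not the CDK's actual types.
data class StreamKey(val namespace: String?, val name: String)

sealed interface Message { val stream: StreamKey }
data class Record(override val stream: StreamKey, val checkpointId: Int? = null) : Message
data class State(override val stream: StreamKey) : Message

class CheckpointIdInferrer {
    // Index of the checkpoint currently being filled, per stream.
    private val current = mutableMapOf<StreamKey, Int>()

    /** Returns the checkpoint ID the message belongs to. */
    fun checkpointIdFor(message: Message): Int = when (message) {
        // Socket-style: the record carries its own ID, so use it as-is.
        // Stdio-style: fall back to the checkpoint currently open for the stream.
        is Record -> message.checkpointId ?: (current[message.stream] ?: 0)
        // Assumed rule: a state message closes the open checkpoint and opens the next.
        is State -> {
            val closed = current[message.stream] ?: 0
            current[message.stream] = closed + 1
            closed
        }
    }
}

fun main() {
    val users = StreamKey(null, "users")
    val inferrer = CheckpointIdInferrer()
    val ids = listOf(
        Record(users),                   // inferred: checkpoint 0
        Record(users),                   // inferred: checkpoint 0
        State(users),                    // closes checkpoint 0
        Record(users),                   // inferred: checkpoint 1
        Record(users, checkpointId = 7)  // provided on the record
    ).map(inferrer::checkpointIdFor)
    println(ids) // [0, 0, 0, 1, 7]
}
```

Keeping the counter per stream means interleaved streams do not disturb each other's inferred IDs.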

Constructors

constructor(catalog: DestinationCatalog, syncManager: SyncManager, checkpointQueue: QueueWriter<Reserved<CheckpointMessageWrapped>>, openStreamQueue: QueueWriter<DestinationStream>, fileTransferQueue: MessageQueue<FileTransferQueueMessage>, @Named(value = "batchStateUpdateQueue") batchStateUpdateQueue: ChannelMessageQueue<BatchUpdate>, @Named(value = "numDataChannels") numDataChannels: Int, @Named(value = "markEndOfStreamAtEndOfSync") markEndOfStreamAtEndOfSync: Boolean, namespaceMapper: NamespaceMapper)

Functions

open suspend override fun close()
inline suspend fun CloseableCoroutine.closeFinally(cause: Throwable?)
suspend fun handleCheckpoint(reservation: Reserved<CheckpointMessage>)
suspend fun handleStreamMessage(message: DestinationStreamAffinedMessage, postProcessingCallback: suspend () -> Unit = {}): PipelineInputEvent