DataChannelBeanFactory

Responsible for all wiring that depends directly on the data channel medium.

Constructors

constructor()

Functions

@Singleton
@Named(value = "dataChannelFormat")
fun dataChannelFormat(@Value(value = "${airbyte.destination.core.data-channel.format}") dataChannelFormat: DataChannelFormat): DataChannelFormat
@Singleton
@Named(value = "dataChannelInputFlows")
fun dataChannelInputFlows(catalog: DestinationCatalog, @Named(value = "globalMemoryManager") queueMemoryManager: ReservationManager, @Named(value = "_pipelineInputQueue") pipelineInputQueue: PartitionedQueue<PipelineInputEvent>? = null, dataChannelMedium: DataChannelMedium, dataChannelReader: DataChannelReader, pipelineEventBookkeepingRouter: PipelineEventBookkeepingRouter, @Named(value = "dataChannelSocketPaths") socketPaths: List<String>, @Value(value = "${airbyte.destination.core.data-channel.socket-buffer-size-bytes}") bufferSizeBytes: Int, @Value(value = "${airbyte.destination.core.data-channel.socket-connection-timeout-ms}") socketConnectionTimeoutMs: Long, @Named(value = "logPerNRecords") logPerNRecords: Long): Array<Flow<PipelineInputEvent>>

The input flows from which the pipeline reads. The size of the array is always equal to the value of the @Named("numInputPartitions") numInputPartitions bean.
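The invariant that the array size matches numInputPartitions can be sketched as follows. This is a hypothetical illustration, not the factory's code: Kotlin's Sequence stands in for kotlinx.coroutines Flow so the sketch has no dependencies, InputEvent is a simplified stand-in for PipelineInputEvent, and linesForPartition is an invented supplier representing either a queue partition (STDIO) or a socket.

```kotlin
// Simplified, hypothetical stand-in for PipelineInputEvent.
data class InputEvent(val partition: Int, val payload: String)

// Builds one lazy source per partition. Sequence replaces Flow here only to
// avoid a kotlinx.coroutines dependency. `linesForPartition` is a hypothetical
// supplier of raw input for a given partition index.
fun buildInputSources(
    numInputPartitions: Int,
    linesForPartition: (Int) -> List<String>
): Array<Sequence<InputEvent>> {
    require(numInputPartitions > 0) { "numInputPartitions must be positive" }
    // Array(size) { index -> ... } guarantees exactly numInputPartitions entries.
    return Array(numInputPartitions) { partition ->
        linesForPartition(partition).asSequence().map { InputEvent(partition, it) }
    }
}
```

Allocating the array with Array(size) { ... } makes the size invariant structural rather than something each caller has to check.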

@Singleton
@Named(value = "dataChannelMedium")
fun dataChannelMedium(@Value(value = "${airbyte.destination.core.data-channel.medium}") dataChannelMedium: DataChannelMedium): DataChannelMedium

The medium used for the data channel, one of the DataChannelMedium values. It is resolved here so that there is a single source of truth for it.
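A minimal sketch of the single-source-of-truth idea, assuming a hypothetical mirror of DataChannelMedium with STDIO and SOCKET constants (the real enum's constants may differ). The configured string, standing in for airbyte.destination.core.data-channel.medium, is parsed once, and every other piece of wiring branches on that one resolved value.

```kotlin
// Hypothetical mirror of DataChannelMedium; the real constants may differ.
enum class DataChannelMedium { STDIO, SOCKET }

// Parses the configured medium exactly once. `property` stands in for the
// value of airbyte.destination.core.data-channel.medium.
fun resolveDataChannelMedium(property: String): DataChannelMedium =
    DataChannelMedium.valueOf(property.trim().uppercase())

// Illustrative downstream wiring that depends only on the resolved value.
fun describeWiring(medium: DataChannelMedium): String =
    when (medium) {
        DataChannelMedium.STDIO -> "one stdin reader, fanned out to a partitioned queue"
        DataChannelMedium.SOCKET -> "one reader per socket path"
    }
```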

@Singleton
fun dataChannelReader(@Named(value = "dataChannelFormat") dataChannelFormat: DataChannelFormat, destinationMessageFactory: DestinationMessageFactory, @Named(value = "dataChannelMedium") dataChannelMedium: DataChannelMedium): DataChannelReader
@Singleton
@Named(value = "dataChannelSocketPaths")
fun dataChannelSocketPaths(@Value(value = "${airbyte.destination.core.data-channel.socket-paths}") socketPaths: List<String>): List<String>
@Singleton
@Named(value = "fileMessageQueue")
fun fileMessageQueue(config: DestinationConfiguration): MultiProducerChannel<FileTransferQueueMessage>
@Singleton
@Named(value = "logPerNRecords")
fun logPerNRecords(@Value(value = "${airbyte.destination.core.data-channel.log-per-n-records:100000}") logPerNRecords: Long): Long
@Singleton
@Named(value = "markEndOfStreamAtEndOfSync")
fun markEndOfStreamAtEndOfSync(@Named(value = "dataChannelMedium") dataChannelMedium: DataChannelMedium): Boolean
@Singleton
fun namespaceMapper(@Named(value = "dataChannelMedium") dataChannelMedium: DataChannelMedium, @Value(value = "${airbyte.destination.core.mappers.namespace-mapping-config-path}") namespaceMappingConfigPath: String): NamespaceMapper
@Singleton
@Named(value = "numDataChannels")
fun numDataChannels(@Named(value = "dataChannelMedium") dataChannelMedium: DataChannelMedium, @Named(value = "numInputPartitions") numInputPartitions: Int): Int
@Singleton
@Named(value = "numInputPartitions")
fun numInputPartitions(loadStrategy: LoadStrategy? = null, @Named(value = "isFileTransfer") isFileTransfer: Boolean = false, dataChannelMedium: DataChannelMedium, @Named(value = "dataChannelSocketPaths") dataChannelSocketPaths: List<String>): Int

The number of input partitions used by the pipeline. For STDIO syncs, this is the number of partitions into which the input stream is split. For socket syncs, this is the number of socket flows.
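The rule above can be sketched as a single when expression. This is a hypothetical illustration: the enum is a stand-in for DataChannelMedium, and the STDIO partition count is modeled as one invented parameter, stdioPartitions, whereas the real factory presumably derives it from loadStrategy and isFileTransfer in ways not documented here.

```kotlin
// Hypothetical mirror of DataChannelMedium.
enum class DataChannelMedium { STDIO, SOCKET }

// STDIO: the single input stream is split into `stdioPartitions` partitions
// (a hypothetical stand-in for the loadStrategy/isFileTransfer logic).
// SOCKET: one partition per socket path, i.e. one flow per socket.
fun numInputPartitions(
    medium: DataChannelMedium,
    socketPaths: List<String>,
    stdioPartitions: Int = 1
): Int = when (medium) {
    DataChannelMedium.STDIO -> stdioPartitions
    DataChannelMedium.SOCKET -> socketPaths.size
}
```

Tying the socket case to socketPaths.size keeps the partition count and the number of socket flows from drifting apart.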

@Singleton
@Named(value = "_pipelineInputQueue")
@Requires(property = "airbyte.destination.core.data-channel.medium", value = "STDIO")
fun pipelineInputQueue(@Named(value = "numInputPartitions") numInputPartitions: Int): PartitionedQueue<PipelineInputEvent>

PRIVATE: Do not use outside this factory.

@Singleton
@Named(value = "requireCheckpointIdOnRecordAndKeyOnState")
fun requireCheckpointIdOnRecord(@Named(value = "dataChannelMedium") dataChannelMedium: DataChannelMedium): Boolean

Because sockets use multiple threads, state must be kept coherent by
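One way to read the bean name requireCheckpointIdOnRecordAndKeyOnState is sketched below. Everything here is an assumption inferred from that name: that the flag is enabled only for sockets, and that it means each record must carry a checkpoint ID and each state message a key. The Record and StateMessage types and their fields are hypothetical.

```kotlin
// Hypothetical mirror of DataChannelMedium.
enum class DataChannelMedium { STDIO, SOCKET }

// Hypothetical message shapes; field names are illustrative only.
data class Record(val stream: String, val checkpointId: String?)
data class StateMessage(val key: String?)

// Assumption inferred from the bean name: the requirement applies to sockets only.
fun requireCheckpointIdOnRecordAndKeyOnState(medium: DataChannelMedium): Boolean =
    medium == DataChannelMedium.SOCKET

// Rejects a record without a checkpoint ID when the requirement is enabled.
fun validate(record: Record, required: Boolean) {
    if (required) requireNotNull(record.checkpointId) { "record on ${record.stream} lacks a checkpoint id" }
}

// Rejects a state message without a key when the requirement is enabled.
fun validate(state: StateMessage, required: Boolean) {
    if (required) requireNotNull(state.key) { "state message lacks a key" }
}
```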

@Singleton
@Requires(property = "airbyte.destination.core.data-channel.medium", value = "STDIO")
fun stdioHeartbeatTask(@Named(value = "_pipelineInputQueue") pipelineInputQueue: PartitionedQueue<PipelineInputEvent>? = null, config: DestinationConfiguration, checkpointManager: CheckpointManager): HeartbeatTask

Because sockets will be implemented as cold flows, the heartbeat behavior will have to reside in the readers.
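A hypothetical sketch of the STDIO case, where a separate task can push heartbeats into the shared queue. SimplePartitionedQueue is an invented stand-in for PartitionedQueue (one ArrayDeque per partition), and a single heartbeatTick call models one iteration of what would really be a timed loop. The assumed purpose is to let downstream consumers act on time-based triggers while stdin is quiet.

```kotlin
// Hypothetical event types standing in for PipelineInputEvent.
sealed interface Event
data class Data(val payload: String) : Event
object Heartbeat : Event

// Hypothetical stand-in for PartitionedQueue: one deque per partition.
class SimplePartitionedQueue(partitionCount: Int) {
    val partitions: List<ArrayDeque<Event>> = List(partitionCount) { ArrayDeque() }

    // Delivers the same event to every partition.
    fun broadcast(event: Event) = partitions.forEach { it.addLast(event) }
}

// One iteration of a heartbeat loop. If at least intervalMs has passed since
// the last activity, a Heartbeat is broadcast to every partition. Returns the
// timestamp to use as "last activity" on the next iteration.
fun heartbeatTick(
    queue: SimplePartitionedQueue,
    lastActivityMs: Long,
    nowMs: Long,
    intervalMs: Long
): Long =
    if (nowMs - lastActivityMs >= intervalMs) {
        queue.broadcast(Heartbeat)
        nowMs
    } else {
        lastActivityMs
    }
```

With sockets there is no shared queue to inject into, which is consistent with the note above that each socket reader would have to emit heartbeats itself.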

@Singleton
@Requires(property = "airbyte.destination.core.data-channel.medium", value = "STDIO")
fun stdioInputConsumerTask(inputFlow: ReservingDeserializingInputFlow, @Named(value = "_pipelineInputQueue") pipelineInputQueue: PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>? = null, partitioner: InputPartitioner, pipelineEventBookkeepingRouter: PipelineEventBookkeepingRouter): InputConsumerTask

Sockets will be implemented as cold flows, so a task is only needed for reading from STDIO.
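The STDIO consumer's job (drain one input and fan records out across partitions) can be sketched as below. This is a hypothetical illustration, not the task's implementation. StreamKey and RawRecord are simplified stand-ins for the real StreamKey and DestinationRecordRaw, and hashing the stream key is an assumed partitioning strategy that may differ from the real InputPartitioner.

```kotlin
// Hypothetical stand-ins for StreamKey and DestinationRecordRaw.
data class StreamKey(val namespace: String?, val name: String)
data class RawRecord(val key: StreamKey, val json: String)

// Assumed partitioner: hash the stream key into [0, numPartitions).
// floorMod keeps the index non-negative for negative hash codes.
fun partitionFor(record: RawRecord, numPartitions: Int): Int =
    Math.floorMod(record.key.hashCode(), numPartitions)

// Single consumer: drains the one STDIO input and routes each record to its
// partition, so downstream partitions can be processed independently.
fun consumeInput(input: Sequence<RawRecord>, numPartitions: Int): List<List<RawRecord>> {
    require(numPartitions > 0) { "numPartitions must be positive" }
    val partitions = List(numPartitions) { mutableListOf<RawRecord>() }
    for (record in input) {
        partitions[partitionFor(record, numPartitions)].add(record)
    }
    return partitions
}
```

Keying on the stream keeps all records of one stream in the same partition and in arrival order.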