InputBeanFactory

@Factory
class InputBeanFactory

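Conditionally creates the connector's input streams — backed by sockets when the data-channel medium is SOCKET, or by standard input when it is STDIO — and then wires up one pipeline per input. Each pipeline gets its own aggregate store, while the state stores are shared across all pipelines.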

Constructors

Link copied to clipboard
constructor()

Functions

Link copied to clipboard
@Singleton
fun aggregateStoreFactory(aggFactory: AggregateFactory, aggregatePublishingConfig: AggregatePublishingConfig): AggregateStoreFactory
Link copied to clipboard
@Singleton
fun inputFlows(@Named(value = "messageFlows") messageFlows: List<Flow<DestinationMessage>>, stateStore: StateStore, stateKeyClient: StateKeyClient, completionTracker: StreamCompletionTracker, statsStore: EmittedStatsStore): List<DataFlowPipelineInputFlow>
Link copied to clipboard
@Named(value = "messageFlows")
@Singleton
fun messageFlows(@Named(value = "inputStreams") inputStreams: ConnectorInputStreams, @Value(value = "${airbyte.destination.core.data-channel.format}") dataChannelFormat: DataChannelFormat, deserializer: ProtocolMessageDeserializer, destinationMessageFactory: DestinationMessageFactory): List<Flow<DestinationMessage>>
Link copied to clipboard
@Singleton
fun pipes(inputFlows: List<DataFlowPipelineInputFlow>, @Named(value = "parse") parse: DataFlowStage, @Named(value = "flush") flush: DataFlowStage, @Named(value = "state") state: DataFlowStage, aggregateStoreFactory: AggregateStoreFactory, stateHistogramStore: StateHistogramStore, statsStore: CommittedStatsStore, aggregatePublishingConfig: AggregatePublishingConfig, @Named(value = "aggregationDispatcher") aggregationDispatcher: CoroutineDispatcher, @Named(value = "flushDispatcher") flushDispatcher: CoroutineDispatcher): List<DataFlowPipeline>
Link copied to clipboard
@Requires(property = "airbyte.destination.core.data-channel.medium", value = "SOCKET")
@Singleton
fun sockets(@Value(value = "${airbyte.destination.core.data-channel.socket-paths}") socketPaths: List<String>, @Value(value = "${airbyte.destination.core.data-channel.socket-buffer-size-bytes}") bufferSizeBytes: Int, @Value(value = "${airbyte.destination.core.data-channel.socket-connection-timeout-ms}") socketConnectionTimeoutMs: Long): List<ClientSocket>
Link copied to clipboard
@Requires(property = "airbyte.destination.core.data-channel.medium", value = "SOCKET")
@Named(value = "inputStreams")
@Singleton
fun socketStreams(sockets: List<ClientSocket>): ConnectorInputStreams
Link copied to clipboard
@Requires(property = "airbyte.destination.core.data-channel.medium", value = "STDIO")
@Named(value = "inputStreams")
@Singleton
fun stdInStreams(): ConnectorInputStreams