ObjectLoaderPipeline

@Singleton
@Requires(bean = ObjectLoader::class)
class ObjectLoaderPipeline<K : WithStream, T : RemoteObject<*>>(routeEventStep: RouteEventStep?, fileChunkStep: FileChunkStep<T>?, @Named(value = "filePartLoaderStep") fileChunkUploader: ObjectLoaderPartLoaderStep<T>?, @Named(value = "fileUploadCompleterStep") fileCompleterStep: ObjectLoaderUploadCompleterStep<K, T>?, forwardFileRecordStep: ForwardFileRecordStep<T>?, @Named(value = "fileRecordPartFormatterStep") fileRecordFormatStep: ObjectLoaderPartFormatterStep?, @Named(value = "recordPartFormatterStep") recordFormatStep: ObjectLoaderPartFormatterStep, @Named(value = "recordPartLoaderStep") recordUploadStep: ObjectLoaderPartLoaderStep<T>, @Named(value = "recordUploadCompleterStep") recordCompleterStep: ObjectLoaderUploadCompleterStep<K, T>, @Value(value = "${airbyte.destination.core.file-transfer.enabled}") isLegacyFileTransfer: Boolean, processFileTaskLegacyStep: ProcessFileTaskLegacyStep, @Named(value = "isFileTransfer") isFileTransfer: Boolean, @Named(value = "oneShotObjectLoaderStep") oneShotObjectLoaderStep: ObjectLoaderOneShotUploaderStep<K, T>, @Named(value = "dataChannelMedium") dataChannelMedium: DataChannelMedium) : LoadPipeline

The default record flow has three steps:

  1. format records into loadable parts (byte arrays destined for specific object keys)

  2. stage the parts in object storage

  3. finish the uploads as all parts become available

Single-partition queues sit between steps 1 and 2 and between steps 2 and 3:

  • formatted parts are put on the first queue as they are completed; the queue's size is scaled to the available memory and the part size

  • the upload workers take parts as they become available and upload them, then put a fact-of-upload message on the second queue

  • a single completer worker reads the second queue and completes the uploads

  • state is acked only when the completer finishes each upload
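The queue layout above can be sketched roughly as follows. This is a minimal illustration, not the CDK's implementation: it uses plain threads and `java.util.concurrent.ArrayBlockingQueue` rather than the real step classes, and `Part`, `Uploaded`, `queueCapacity`, `runPipeline`, and the stop-marker key are all hypothetical names. The sizing rule (memory budget divided by part size) is an assumption about what "scaled to the available memory and part size" could mean.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Hypothetical types for illustration only; not the CDK's own classes.
data class Part(val key: String, val index: Int, val bytes: ByteArray, val isFinal: Boolean)
data class Uploaded(val key: String, val index: Int, val isFinal: Boolean)

// Hypothetical marker key that tells workers to shut down.
val STOP_KEY = "__stop__"

// Assumed sizing rule: queued parts may occupy at most the memory budget.
fun queueCapacity(memoryBudgetBytes: Long, partSizeBytes: Long): Int =
    (memoryBudgetBytes / partSizeBytes).coerceAtLeast(1L).toInt()

fun runPipeline(
    payloads: Map<String, String>,
    partSizeBytes: Int,
    uploadWorkers: Int,
    memoryBudgetBytes: Long = 1024,
): List<String> {
    val capacity = queueCapacity(memoryBudgetBytes, partSizeBytes.toLong())
    val partQueue = ArrayBlockingQueue<Part>(capacity)          // between steps 1 and 2
    val uploadedQueue = ArrayBlockingQueue<Uploaded>(capacity)  // between steps 2 and 3
    val completed = mutableListOf<String>()

    // Step 2: several upload workers drain the first queue.
    val uploaders = List(uploadWorkers) {
        thread {
            while (true) {
                val part = partQueue.take()
                if (part.key == STOP_KEY) {
                    uploadedQueue.put(Uploaded(STOP_KEY, 0, false))
                    break
                }
                // A real worker would send part.bytes to object storage here.
                uploadedQueue.put(Uploaded(part.key, part.index, part.isFinal))
            }
        }
    }

    // Step 3: a single completer finishes an upload once all of its parts have arrived.
    val completer = thread {
        val seen = HashMap<String, Int>()
        val expected = HashMap<String, Int>()
        var stopped = 0
        while (stopped < uploadWorkers) {
            val fact = uploadedQueue.take()
            if (fact.key == STOP_KEY) {
                stopped++
                continue
            }
            seen[fact.key] = (seen[fact.key] ?: 0) + 1
            if (fact.isFinal) expected[fact.key] = fact.index + 1
            // State would be acked at this point, and only at this point.
            if (seen[fact.key] == expected[fact.key]) completed += fact.key
        }
    }

    // Step 1: format each payload into fixed-size parts destined for one object key.
    for ((key, payload) in payloads) {
        val chunks = payload.toByteArray().toList().chunked(partSizeBytes)
            .ifEmpty { listOf(emptyList()) }
        chunks.forEachIndexed { i, chunk ->
            partQueue.put(Part(key, i, chunk.toByteArray(), i == chunks.lastIndex))
        }
    }
    repeat(uploadWorkers) { partQueue.put(Part(STOP_KEY, 0, ByteArray(0), false)) }

    uploaders.forEach { it.join() }
    completer.join()
    return completed.sorted()
}
```

Because both queues are bounded, a slow uploader or completer applies backpressure to the formatter instead of letting parts pile up in memory. The completer counts parts per key rather than relying on arrival order, since parallel upload workers may finish out of order.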

The file and record flow has 8 steps: 5 new steps (File Pipe) that feed into the same 3 steps described above (Record Pipe).

The new steps are as follows:

  1. Route the record message either through the file pipe or, if it is not related to a file-based stream, straight to the record pipe.

  2. Read the file reference from the incoming record, open the file, and read it into chunks, emitting them downstream as "Part"s.

  3. Upload the file parts.

  4. Complete the multipart file uploads.

  5. Pass the related record on to the record pipe (see above).
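As a rough illustration of the five file steps, the sketch below routes one record, chunks its file, and returns a trace of what would be uploaded, completed, and forwarded. It is a simplified, synchronous stand-in under stated assumptions: `Record`, `usesFilePipe`, `chunkFile`, and `trace` are hypothetical names, and the routing rule (stream is file-based and the record carries a file path) is assumed rather than taken from `RouteEventStep`.

```kotlin
import java.io.File

// Hypothetical record shape for illustration; not the CDK's own type.
data class Record(val stream: String, val data: String, val filePath: String? = null)

// Step 1: decide whether a record takes the file pipe or goes straight to the record pipe.
fun usesFilePipe(record: Record, fileStreams: Set<String>): Boolean =
    record.stream in fileStreams && record.filePath != null

// Step 2: read the referenced file into chunks of at most partSize bytes.
fun chunkFile(file: File, partSize: Int): List<ByteArray> {
    val parts = mutableListOf<ByteArray>()
    file.inputStream().use { input ->
        val buffer = ByteArray(partSize)
        while (true) {
            // readNBytes (Java 9+) fills the buffer unless the end of the file is reached.
            val n = input.readNBytes(buffer, 0, partSize)
            if (n == 0) break
            parts += buffer.copyOf(n)
        }
    }
    return parts
}

// Steps 3 to 5, reduced to a trace of what would happen for one record.
fun trace(record: Record, fileStreams: Set<String>, partSize: Int): List<String> {
    val steps = mutableListOf<String>()
    if (usesFilePipe(record, fileStreams)) {
        val file = File(record.filePath!!)
        chunkFile(file, partSize).forEachIndexed { i, part ->
            steps += "upload part $i of ${file.name} (${part.size} bytes)"
        }
        steps += "complete upload of ${file.name}"
    }
    // Every record ends up in the record pipe, with or without a file detour.
    steps += "hand ${record.stream} record to record pipe"
    return steps
}
```

A record from a non-file stream yields a single trace entry, while a record referencing a 10-byte file with a part size of 4 yields three upload entries, one completion entry, and the final hand-off.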

Constructors

constructor(routeEventStep: RouteEventStep?, fileChunkStep: FileChunkStep<T>?, @Named(value = "filePartLoaderStep") fileChunkUploader: ObjectLoaderPartLoaderStep<T>?, @Named(value = "fileUploadCompleterStep") fileCompleterStep: ObjectLoaderUploadCompleterStep<K, T>?, forwardFileRecordStep: ForwardFileRecordStep<T>?, @Named(value = "fileRecordPartFormatterStep") fileRecordFormatStep: ObjectLoaderPartFormatterStep?, @Named(value = "recordPartFormatterStep") recordFormatStep: ObjectLoaderPartFormatterStep, @Named(value = "recordPartLoaderStep") recordUploadStep: ObjectLoaderPartLoaderStep<T>, @Named(value = "recordUploadCompleterStep") recordCompleterStep: ObjectLoaderUploadCompleterStep<K, T>, @Value(value = "${airbyte.destination.core.file-transfer.enabled}") isLegacyFileTransfer: Boolean, processFileTaskLegacyStep: ProcessFileTaskLegacyStep, @Named(value = "isFileTransfer") isFileTransfer: Boolean, @Named(value = "oneShotObjectLoaderStep") oneShotObjectLoaderStep: ObjectLoaderOneShotUploaderStep<K, T>, @Named(value = "dataChannelMedium") dataChannelMedium: DataChannelMedium)

Types

object Companion

Functions

suspend fun start(launcher: suspend (Task) -> Unit)
open suspend fun stop()