ObjectLoaderQueueBeanFactory

@Factory
class ObjectLoaderQueueBeanFactory(val loader: ObjectLoader)

Constructors

constructor(loader: ObjectLoader)

Types

object Companion

Properties

val log: KLogger

Functions

Completed file uploads.

@Singleton
@Named(value = "fileLoadedPartQueue")
@Requires(condition = IsFileTransferCondition::class)
fun <T : RemoteObject<*>> fileLoadedPartQueue(): PartitionedQueue<PipelineEvent<ObjectKey, ObjectLoaderPartLoader.PartResult<T>>>

Queue between the file part upload step and the upload completer. It holds only the fact that an upload completed, so in theory it could be Channel.UNLIMITED; to be safe, it is limited to 10,000 queued completions.
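To illustrate the trade-off described above, here is a minimal sketch of a bounded completion queue. It is not the factory's implementation: `PartCompleted`, `MAX_QUEUED_COMPLETIONS`, and `offerAll` are hypothetical names, and the sketch swaps in `java.util.concurrent.ArrayBlockingQueue` so that it runs on the standard library alone. The reference to Channel.UNLIMITED suggests the real queue is channel-based, where a full queue would make the producer wait rather than reject the element.

```kotlin
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical stand-in for a completion event: it records only that a part
// finished, not the part's bytes, so each element is small.
data class PartCompleted(val objectKey: String, val partIndex: Int)

// Assumed cap, taken from the description above.
const val MAX_QUEUED_COMPLETIONS = 10_000

// Tries to enqueue `attempts` completions into a queue with the given capacity
// and returns how many were accepted. offer() returns false once the queue is full.
fun offerAll(capacity: Int, attempts: Int): Int {
    val completions = ArrayBlockingQueue<PartCompleted>(capacity)
    var accepted = 0
    for (i in 0 until attempts) {
        if (completions.offer(PartCompleted("stream/object-1", i))) accepted++
    }
    return accepted
}

fun main() {
    // With nobody draining the queue, only the first 10,000 offers succeed.
    println(offerAll(MAX_QUEUED_COMPLETIONS, MAX_QUEUED_COMPLETIONS + 5))
}
```

A hard cap costs little here because each element is tiny, and it prevents a stalled completer from letting the queue grow without limit.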

@Singleton
@Named(value = "filePartQueue")
@Requires(condition = IsFileTransferCondition::class)
fun fileObjectLoaderPartQueue(@Named(value = "globalMemoryManager") globalMemoryManager: ReservationManager): ResourceReservingPartitionedQueue<PipelineEvent<ObjectKey, ObjectLoaderPartFormatter.FormattedPart>>

Queue between file part chunking and file part loading. It holds the actual part bytes, so it must be sized according to the available reserved memory.
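One way to read "sized based on the available reserved memory" is that the queue's capacity is the number of whole parts that fit in the reservation. The sketch below shows only that arithmetic; `maxQueuedParts` and the example sizes are assumptions made for illustration, not values or functions from this class.

```kotlin
// Hypothetical helper: how many whole parts of a given size fit in a memory
// reservation. Integer division rounds down, so a partial part never counts.
fun maxQueuedParts(reservedBytes: Long, partSizeBytes: Long): Int {
    require(reservedBytes >= 0) { "reservedBytes must be non-negative" }
    require(partSizeBytes > 0) { "partSizeBytes must be positive" }
    return (reservedBytes / partSizeBytes).coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
}

fun main() {
    val reserved = 512L * 1024 * 1024 // assumed: 512 MiB reserved for queued parts
    val partSize = 10L * 1024 * 1024  // assumed: 10 MiB per part
    println(maxQueuedParts(reserved, partSize)) // prints 51
}
```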

@Singleton
@Named(value = "fileQueue")
@Requires(condition = IsFileTransferCondition::class)
fun fileQueue(@Named(value = "numInputPartitions") numInputPartitions: Int): PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>

A queue of records that carry file references, used for file uploads.

@Singleton
@Named(value = "objectLoaderClampedPartSizeBytes")
@Requires(bean = ObjectLoader::class)
fun objectLoaderClampedPartSizeBytes(@Named(value = "objectLoaderPartQueue") queue: ResourceReservingPartitionedQueue<*>, @Named(value = "dataChannelMedium") dataChannelMedium: DataChannelMedium, @Named(value = "dataChannelSocketPaths") dataChannelSocketPaths: List<String>): Long

If we naively accept the part size and concurrency settings, we might end up with a connector that passes CI but can't run in a resource-limited production environment, because there isn't enough memory even for the workers to hold parts in flight.
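The concern above can be sketched as a clamp: cap the configured part size at whatever the memory budget allows once every in-flight part is accounted for. This is an illustration under stated assumptions, not the function's actual logic; `clampPartSizeBytes`, its parameters, and the example numbers are all hypothetical.

```kotlin
// Hypothetical clamp: never use a part size larger than an even share of the
// available memory across the parts that may be held at once. No lower bound
// is enforced, so a very small budget yields a very small part size.
fun clampPartSizeBytes(
    configuredPartSizeBytes: Long,
    availableBytes: Long,
    partsInFlight: Int,
): Long {
    require(configuredPartSizeBytes > 0) { "configuredPartSizeBytes must be positive" }
    require(availableBytes > 0) { "availableBytes must be positive" }
    require(partsInFlight > 0) { "partsInFlight must be positive" }
    val maxAffordable = availableBytes / partsInFlight
    return minOf(configuredPartSizeBytes, maxAffordable)
}

fun main() {
    val mib = 1024L * 1024
    // Roomy environment: the configured 50 MiB part size is kept.
    println(clampPartSizeBytes(50 * mib, 2048 * mib, 8) / mib) // prints 50
    // Constrained environment: 200 MiB across 8 parts allows only 25 MiB each.
    println(clampPartSizeBytes(50 * mib, 200 * mib, 8) / mib)  // prints 25
}
```

With a clamp of this kind, the same configuration behaves identically where memory is plentiful and degrades to smaller parts where it is not, instead of failing to start.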

@Singleton
@Named(value = "objectLoaderLoadedPartQueue")
@Requires(bean = ObjectLoader::class)
fun <T : RemoteObject<*>> objectLoaderLoadedPartQueue(): PartitionedQueue<PipelineEvent<ObjectKey, ObjectLoaderPartLoader.PartResult<T>>>

Queue between step 2 (upload parts) and step 3 (finish the upload). It holds only the fact that an upload completed, so in theory it could be Channel.UNLIMITED; to be safe, it is limited to 10,000 queued completions.

@Singleton
@Named(value = "objectLoaderPartQueue")
@Requires(bean = ObjectLoader::class)
fun recordObjectLoaderPartQueue(@Named(value = "globalMemoryManager") globalMemoryManager: ReservationManager): ResourceReservingPartitionedQueue<PipelineEvent<ObjectKey, ObjectLoaderPartFormatter.FormattedPart>>

Queue between step 1 (format parts) and step 2 (load them). It holds the actual part bytes, so it must be sized according to the available reserved memory.

@Singleton
@Named(value = "recordQueue")
fun recordQueue(@Named(value = "numInputPartitions") numInputPartitions: Int): PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>

A queue of records to be uploaded.