DlqPipelineFactoryFactory

A Micronaut Factory that helps initialize the components required for a DeadLetterQueuePipeline.

Constructors

constructor()

Functions

@Named(value = "dlqInputQueue")
@Singleton
fun dlqInputQueue(@Named(value = "globalMemoryManager") globalMemoryManager: ReservationManager): PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>

This queue is the input queue of a "traditional" ObjectStorageLoadPipeline.

@Singleton
fun dlqPipelineFactory(@Named(value = "dlqInputQueue") dlqInputQueue: PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>, @Named(value = "dlqPipelineSteps") dlqPipelineSteps: List<LoadPipelineStep>, pipelineStepTaskFactory: LoadPipelineStepTaskFactory, objectLoader: ObjectLoader): DlqPipelineFactory

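Builds the DlqPipelineFactory, the end goal of this file, from the DLQ input queue, the DLQ pipeline steps, the pipeline step task factory, and the object loader.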

@Singleton
@Requires(bean = ObjectStorageConfig::class, beanProperty = "type", value = "None")
@Named(value = "dlqPipelineSteps")
fun dlqPipelineSteps(): List<LoadPipelineStep>

@Singleton
@Requires(bean = ObjectStorageConfig::class, beanProperty = "type", notEquals = "None")
@Named(value = "dlqPipelineSteps")
fun <K : WithStream, T : RemoteObject<*>> dlqPipelineSteps(@Named(value = "dlqRecordFormatterStep") formatterStep: ObjectLoaderPartFormatterStep, @Named(value = "recordPartLoaderStep") loaderStep: ObjectLoaderPartLoaderStep<T>, @Named(value = "recordUploadCompleterStep") completerStep: ObjectLoaderUploadCompleterStep<K, T>): List<LoadPipelineStep>

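References the traditional ObjectStorage pipeline steps. Two variants share the dlqPipelineSteps name: one applies when the ObjectStorageConfig type is "None" and takes no step dependencies; the other applies for any other type and is built from the formatter, part loader, and upload completer steps.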
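
The choice between the two dlqPipelineSteps variants can be pictured in plain Kotlin, without Micronaut. This is only a sketch: StorageType, Step, and selectDlqSteps are hypothetical stand-ins, and both the empty list for the "None" case and the formatter, loader, completer ordering are assumptions, since the signatures only show which dependencies each variant takes.

```kotlin
// Hypothetical stand-ins; none of these types come from the real library.
enum class StorageType { None, S3 }

data class Step(val name: String)

// Mirrors the two @Requires conditions: "None" selects the variant with no
// step dependencies (assumed here to yield an empty list); any other type
// selects the formatter -> loader -> completer chain (order assumed).
fun selectDlqSteps(
    type: StorageType,
    formatterStep: Step,
    loaderStep: Step,
    completerStep: Step,
): List<Step> =
    if (type == StorageType.None) emptyList()
    else listOf(formatterStep, loaderStep, completerStep)

fun main() {
    val formatter = Step("dlqRecordFormatterStep")
    val loader = Step("recordPartLoaderStep")
    val completer = Step("recordUploadCompleterStep")

    // No object storage configured: the DLQ pipeline gets no steps.
    println(selectDlqSteps(StorageType.None, formatter, loader, completer).size)

    // Object storage configured: the three traditional steps are reused.
    println(selectDlqSteps(StorageType.S3, formatter, loader, completer).map { it.name })
}
```

In the real factory this branching is done by Micronaut at bean-resolution time, so only one of the two beans exists in a given application context.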


Singleton that extracts the ObjectStorageConfig from the DestinationConfiguration.

@Named(value = "dlqRecordFormatterStep")
@Singleton
fun <T : OutputStream> partFormatterStep(@Named(value = "numInputPartitions") numInputPartitions: Int, partFormatter: ObjectLoaderPartFormatter<T>, @Named(value = "dlqInputQueue") inputQueue: PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>, @Named(value = "objectLoaderPartQueue") outputQueue: PartitionedQueue<PipelineEvent<ObjectKey, ObjectLoaderPartFormatter.FormattedPart>>, taskFactory: LoadPipelineStepTaskFactory, flushStrategy: PipelineFlushStrategy): ObjectLoaderPartFormatterStep

This is the start of the "traditional" object storage pipeline. However, for the updated pipeline to work, this recreates the ObjectLoaderPartFormatterStep with the dlqInputQueue as its input instead of the destination's actual input, which is used by the DlqLoader in this case.
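
The rewiring described here amounts to building a second instance of the same step with a different input queue. A minimal plain-Kotlin sketch, assuming hypothetical stand-ins (FormatterStep, Record, and java.util.ArrayDeque in place of PartitionedQueue); it does not reflect the real ObjectLoaderPartFormatterStep API.

```kotlin
import java.util.ArrayDeque
import java.util.Queue

// Hypothetical stand-in for a raw destination record.
data class Record(val payload: String)

// Hypothetical stand-in for the formatter step: it drains whichever input
// queue it was constructed with and pushes formatted parts downstream.
class FormatterStep(
    private val inputQueue: Queue<Record>,
    private val outputQueue: Queue<String>,
) {
    fun drain() {
        while (true) {
            val record = inputQueue.poll() ?: break
            outputQueue.add("part:${record.payload}")
        }
    }
}

fun main() {
    // The destination's actual input; in the DLQ setup another consumer reads it.
    val destinationInput: Queue<Record> = ArrayDeque()
    // The dedicated queue for records routed to the dead letter queue.
    val dlqInput: Queue<Record> = ArrayDeque()
    val partQueue: Queue<String> = ArrayDeque()

    destinationInput.add(Record("regular"))
    dlqInput.add(Record("dead-letter"))

    // Same step logic, but wired to the DLQ queue rather than the destination input.
    FormatterStep(dlqInput, partQueue).drain()

    println(partQueue.toList())
    println(destinationInput.size)
}
```

Because the step only depends on the queue it is handed, the destination's own input is left untouched for its other consumer.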


Singleton that extracts the various object storage related config providers for S3.