DestinationTaskLauncher
@Singleton
Governs the task workflow for the entire destination life-cycle.
Its responsibility is to decide what to do next, given the reported result of each individual task.
The workflow is as follows:
Start the destination setup task.
Start the spill-to-disk task for each stream.
When setup completes, start the open stream task for each stream.
When each new spilled file is ready, start the process records task.
(This task will wait if open stream is not yet complete for that stream.)
When each batch is ready:
- update the batch state in the stream manager
- if the batch is not complete, start the process batch task
- if the batch is complete and all batches are complete, start the close stream task
When the stream is closed:
- mark the stream as closed in the stream manager
- start the teardown task
(The teardown task will only run once, and only after all streams are closed.)
When the teardown task is complete, stop the task launcher.
// TODO: Capture failures, retry, and call into close(failure=true) if can't recover.
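The workflow above can be read as a small event-driven dispatcher: each reported task result is an event, and the launcher reacts by starting the next task. The following is a minimal sketch of that idea only, not the actual implementation. All names in it (Event, SketchLauncher, demo, and the task-name strings) are hypothetical. It records task names in a list instead of launching real tasks, omits the stream manager updates, and folds the "teardown runs once, after all streams are closed" guard into the launcher for simplicity, whereas the description above places that guard in the teardown task itself.

```kotlin
// Hypothetical sketch: none of these names come from the real class.
// Each event stands for "a task reported this result".
sealed interface Event
object Started : Event
object SetupComplete : Event
data class FileSpilled(val stream: String) : Event
data class BatchReady(
    val stream: String,
    val batchComplete: Boolean,
    val allBatchesComplete: Boolean,
) : Event
data class StreamClosed(val stream: String) : Event
object TeardownComplete : Event

class SketchLauncher(private val streams: List<String>) {
    // Names of the tasks that would have been started, in order.
    val launched = mutableListOf<String>()
    private val closedStreams = mutableSetOf<String>()
    private var teardownStarted = false
    var stopped = false
        private set

    fun handle(event: Event) {
        when (event) {
            Started -> {
                launched.add("setup")
                streams.forEach { launched.add("spill-to-disk:$it") }
            }
            SetupComplete -> {
                streams.forEach { launched.add("open-stream:$it") }
            }
            is FileSpilled -> {
                launched.add("process-records:${event.stream}")
            }
            is BatchReady -> {
                // A real launcher would also update the batch state here.
                if (!event.batchComplete) {
                    launched.add("process-batch:${event.stream}")
                } else if (event.allBatchesComplete) {
                    launched.add("close-stream:${event.stream}")
                }
            }
            is StreamClosed -> {
                closedStreams.add(event.stream)
                // Simplification: the run-once guard lives in the launcher here.
                if (!teardownStarted && closedStreams.containsAll(streams)) {
                    teardownStarted = true
                    launched.add("teardown")
                }
            }
            TeardownComplete -> {
                stopped = true
            }
        }
    }
}

// Walks one stream through the whole life-cycle and returns the task order.
fun demo(): List<String> {
    val launcher = SketchLauncher(listOf("users"))
    listOf(
        Started,
        SetupComplete,
        FileSpilled("users"),
        BatchReady("users", batchComplete = false, allBatchesComplete = false),
        BatchReady("users", batchComplete = true, allBatchesComplete = true),
        StreamClosed("users"),
        TeardownComplete,
    ).forEach(launcher::handle)
    return launcher.launched
}
```

Calling demo() from a main function and printing the result shows the order in which the sketch would start tasks for a single stream. Modelling results as a sealed interface lets the compiler check that every kind of result has a branch in the when statement.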
Constructors
constructor(
    taskScopeProvider: TaskScopeProvider,
    catalog: DestinationCatalog,
    config: DestinationConfiguration,
    syncManager: SyncManager,
    inputConsumerTask: InputConsumerTask? = null,
    heartbeatTask: HeartbeatTask? = null,
    updateBatchTask: UpdateBatchStateTaskFactory,
    statsEmitter: StatsEmitter? = null,
    setupTaskFactory: SetupTaskFactory,
    openStreamTask: OpenStreamTask,
    closeStreamTaskFactory: CloseStreamTaskFactory,
    teardownTaskFactory: TeardownTaskFactory,
    loadPipeline: LoadPipeline?,
    updateCheckpointsTask: UpdateCheckpointsTask,
    failStreamTaskFactory: FailStreamTaskFactory,
    failSyncTaskFactory: FailSyncTaskFactory,
    @Named(value = "openStreamQueue") openStreamQueue: MessageQueue<DestinationStream>,
    @Named(value = "defaultDestinationTaskLauncherHasThrown") hasThrown: AtomicBoolean
)