DestinationTaskLauncher

@Singleton
class DestinationTaskLauncher(taskScopeProvider: TaskScopeProvider, catalog: DestinationCatalog, config: DestinationConfiguration, syncManager: SyncManager, inputConsumerTask: InputConsumerTask? = null, heartbeatTask: HeartbeatTask? = null, updateBatchTask: UpdateBatchStateTaskFactory, statsEmitter: StatsEmitter? = null, setupTaskFactory: SetupTaskFactory, openStreamTask: OpenStreamTask, closeStreamTaskFactory: CloseStreamTaskFactory, teardownTaskFactory: TeardownTaskFactory, loadPipeline: LoadPipeline?, updateCheckpointsTask: UpdateCheckpointsTask, failStreamTaskFactory: FailStreamTaskFactory, failSyncTaskFactory: FailSyncTaskFactory, @Named(value = "openStreamQueue") openStreamQueue: MessageQueue<DestinationStream>, @Named(value = "defaultDestinationTaskLauncherHasThrown") hasThrown: AtomicBoolean)

Governs the task workflow for the entire destination life-cycle.

The domain is "decide what to do next given the reported results of the individual task."

The workflow is as follows:

  1. Start the destination setup task.

  2. Start the spill-to-disk task for each stream

  3. When setup completes, start the open stream task for each stream

  4. When each new spilled file is ready, start the process records task

    (This task will wait if open stream is not yet complete for that stream.)
  1. When each batch is ready

    - update the batch state in the stream manager
- if the batch is not complete, start the process batch task
- if the batch is complete and all batches are complete, start the close stream task
  1. When the stream is closed

    - mark the stream as closed in the stream manager
- start the teardown task
(The teardown task will only run once, and only after all streams are closed.)
  1. When the teardown task is complete, stop the task launcher

// TODO: Capture failures, retry, and call into close(failure=true) if can't recover.

Constructors

Link copied to clipboard
constructor(taskScopeProvider: TaskScopeProvider, catalog: DestinationCatalog, config: DestinationConfiguration, syncManager: SyncManager, inputConsumerTask: InputConsumerTask? = null, heartbeatTask: HeartbeatTask? = null, updateBatchTask: UpdateBatchStateTaskFactory, statsEmitter: StatsEmitter? = null, setupTaskFactory: SetupTaskFactory, openStreamTask: OpenStreamTask, closeStreamTaskFactory: CloseStreamTaskFactory, teardownTaskFactory: TeardownTaskFactory, loadPipeline: LoadPipeline?, updateCheckpointsTask: UpdateCheckpointsTask, failStreamTaskFactory: FailStreamTaskFactory, failSyncTaskFactory: FailSyncTaskFactory, @Named(value = "openStreamQueue") openStreamQueue: MessageQueue<DestinationStream>, @Named(value = "defaultDestinationTaskLauncherHasThrown") hasThrown: AtomicBoolean)

Types

Link copied to clipboard
inner class WrappedTask(innerTask: Task) : Task

Functions

Link copied to clipboard
suspend fun handleException(e: Exception)
Link copied to clipboard
Link copied to clipboard
suspend fun handleSetupComplete()
Link copied to clipboard
suspend fun handleStreamClosed()

Called when a stream is closed.

Link copied to clipboard
Link copied to clipboard
suspend fun handleTeardownComplete(success: Boolean = true)

Called exactly once when all streams are closed.

Link copied to clipboard
suspend fun run()