DefaultPipelineFlushStrategy

@Singleton
class DefaultPipelineFlushStrategy(@Value(value = "${airbyte.destination.core.record-batch-size-override:null}") microBatchOverride: Long? = null, config: DestinationConfiguration) : PipelineFlushStrategy

This composes the two built-in flush strategies:

  • if the record-batch-size-override is set, flush after every record (the actual value is ignored; this exists only to make some of our unit tests work)

  • if the data has been held in flight for the configured max data age, flush

NOTE: If you override this strategy, your implementation must still provide the microbatch behavior, or tests will break. TODO: make the pipeline inject the microbatching no matter what, so no dev has to think about it.

Microbatching means finishing work for every record. We do this in integration tests to guarantee that state gets flushed, so multi-sync tests that test recovery can wait for a state ack before killing the connector. (Otherwise they might hang forever.) We can probably get rid of this if we can better guarantee that all in-flight work is flushed during exception handling: https://github.com/airbytehq/airbyte-internal-issues/issues/12310
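The composition described above can be sketched roughly as follows. This is a minimal illustration, not the CDK's actual implementation: `FlushStrategySketch`, `ComposedFlushStrategySketch`, `CountOrMicroBatchStrategySketch`, `maxDataAgeMs`, and `maxRecords` are hypothetical names, and the `>=` comparisons are assumptions. The second class shows one way a custom strategy might keep the microbatch behavior when replacing the default.

```kotlin
// Hypothetical stand-in for PipelineFlushStrategy; only the method shape is taken from the docs.
interface FlushStrategySketch {
    fun shouldFlush(inputCount: Long, dataAgeMs: Long): Boolean
}

// Sketch of the default composition: microbatch override OR max data age.
class ComposedFlushStrategySketch(
    // Only presence matters; the value itself is ignored.
    private val microBatchOverride: Long?,
    // Assumed name for the configured max data age, in milliseconds.
    private val maxDataAgeMs: Long,
) : FlushStrategySketch {
    override fun shouldFlush(inputCount: Long, dataAgeMs: Long): Boolean =
        // Assumption: ">=" is used here; the real comparison may differ.
        microBatchOverride != null || dataAgeMs >= maxDataAgeMs
}

// Sketch of a custom strategy that still honors the microbatch override.
class CountOrMicroBatchStrategySketch(
    private val microBatchOverride: Long?,
    // Hypothetical record-count threshold chosen by the custom strategy.
    private val maxRecords: Long,
) : FlushStrategySketch {
    override fun shouldFlush(inputCount: Long, dataAgeMs: Long): Boolean =
        microBatchOverride != null || inputCount >= maxRecords
}
```

With the override set, both sketches flush after every record regardless of count or age, which is the behavior the integration tests rely on.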

Constructors

constructor(@Value(value = "${airbyte.destination.core.record-batch-size-override:null}") microBatchOverride: Long? = null, config: DestinationConfiguration)

Functions

open override fun shouldFlush(inputCount: Long, dataAgeMs: Long): Boolean