Verified Commit d8b5564d authored by Jean-Gabriel Doyon PTO until 2024-04-17's avatar Jean-Gabriel Doyon PTO until 2024-04-17 Committed by GitLab
Browse files

fix(indexer): skip watermark update when no rows are indexed

parent 7e6ca3d1
Loading
Loading
Loading
Loading
+121 −34
Original line number Diff line number Diff line
@@ -107,11 +107,17 @@ impl Handler for GlobalHandler {

        let mut errors = Vec::new();
        let mut successful_pipelines = 0;
        let mut total_rows_indexed: u64 = 0;
        for pipeline in &self.pipelines {
            if let Err(error) = pipeline
            match pipeline
                .process(params.to_json(), context.destination.as_ref(), "global")
                .await
            {
                Ok(rows) => {
                    successful_pipelines += 1;
                    total_rows_indexed += rows;
                }
                Err(error) => {
                    error!(entity = pipeline.entity_name(), %error, "pipeline processing failed");
                    self.metrics.pipeline_errors.add(
                        1,
@@ -122,15 +128,14 @@ impl Handler for GlobalHandler {
                        ],
                    );
                    errors.push((pipeline.entity_name().to_string(), error));
            } else {
                successful_pipelines += 1;
                }
            }
        }

        let elapsed = started_at.elapsed();
        let handler_labels = [KeyValue::new("handler", "global-handler")];

        if errors.is_empty() {
        if errors.is_empty() && total_rows_indexed > 0 {
            self.watermark_store
                .set_global_watermark(&payload.watermark)
                .await
@@ -155,7 +160,9 @@ impl Handler for GlobalHandler {
                watermark = %payload.watermark.format(TIMESTAMP_FORMAT),
                "global watermark updated"
            );
        }

        if errors.is_empty() {
            if let Err(error) = context.lock_service.release(global_lock_key()).await {
                error!(%error, "failed to release global lock, will expire via TTL");
            }
@@ -197,11 +204,13 @@ impl Handler for GlobalHandler {
#[cfg(test)]
mod tests {
    use super::*;
    use crate::modules::sdlc::datalake::{DatalakeError, DatalakeQuery, RecordBatchStream};
    use crate::modules::sdlc::test_fixtures::{
        EmptyDatalake, MockWatermarkStore, NonEmptyDatalake,
    };
    use crate::testkit::{MockDestination, MockLockService, MockNatsServices, TestEnvelopeFactory};
    use futures::stream;
    use ontology::{DataType, EtlConfig, EtlScope, Field, NodeEntity, Ontology};
    use std::collections::BTreeMap;
    use std::sync::Mutex;

    fn test_metrics() -> SdlcMetrics {
        let provider = opentelemetry::global::meter_provider();
@@ -209,15 +218,33 @@ mod tests {
        SdlcMetrics::with_meter(&meter)
    }

    struct MockWatermarkStore;
    /// Test double that remembers the last global watermark written to it,
    /// so tests can assert whether (and with what value) the handler called
    /// `set_global_watermark`.
    struct RecordingGlobalWatermarkStore {
        // Interior mutability: the handler takes `&self`, so the recorded
        // value lives behind a Mutex.
        watermark: Mutex<Option<DateTime<Utc>>>,
    }

    impl RecordingGlobalWatermarkStore {
        /// Creates a store with no watermark recorded yet.
        fn new() -> Self {
            RecordingGlobalWatermarkStore {
                watermark: Mutex::default(),
            }
        }

        /// Returns the most recently stored watermark, or `None` if the
        /// handler never wrote one.
        fn stored_watermark(&self) -> Option<DateTime<Utc>> {
            self.watermark.lock().unwrap().as_ref().copied()
        }
    }

    #[async_trait]
    impl WatermarkStore for MockWatermarkStore {
    impl WatermarkStore for RecordingGlobalWatermarkStore {
        async fn get_global_watermark(&self) -> Result<DateTime<Utc>, WatermarkError> {
            Ok(DateTime::<Utc>::UNIX_EPOCH)
        }

        async fn set_global_watermark(&self, _: &DateTime<Utc>) -> Result<(), WatermarkError> {
        async fn set_global_watermark(
            &self,
            watermark: &DateTime<Utc>,
        ) -> Result<(), WatermarkError> {
            *self.watermark.lock().unwrap() = Some(*watermark);
            Ok(())
        }

@@ -239,19 +266,6 @@ mod tests {
        }
    }

    // Test datalake whose queries always succeed with an empty stream, so
    // pipelines run end-to-end without indexing any rows.
    // NOTE(review): this fixture is being removed in this diff in favor of the
    // shared `EmptyDatalake` in `test_fixtures`.
    struct MockDatalake;

    #[async_trait]
    impl DatalakeQuery for MockDatalake {
        // Ignores the SQL and parameters and returns a stream with no
        // record batches.
        async fn query_arrow(
            &self,
            _sql: &str,
            _params: serde_json::Value,
        ) -> Result<RecordBatchStream<'_>, DatalakeError> {
            Ok(Box::pin(stream::empty()))
        }
    }

    fn create_test_node(name: &str, destination_table: &str, source: &str) -> NodeEntity {
        NodeEntity {
            name: name.to_string(),
@@ -282,7 +296,7 @@ mod tests {

    #[tokio::test]
    async fn handle_processes_pipelines() {
        let datalake = Arc::new(MockDatalake);
        let datalake = Arc::new(EmptyDatalake);
        let ontology = Ontology::new();
        let user_node = create_test_node("User", "gl_user", "siphon_users");
        let project_node = create_test_node("Project", "gl_project", "siphon_projects");
@@ -321,7 +335,7 @@ mod tests {

    #[tokio::test]
    async fn handler_releases_lock_on_success() {
        let datalake = Arc::new(MockDatalake);
        let datalake = Arc::new(EmptyDatalake);
        let ontology = Ontology::new();
        let user_node = create_test_node("User", "gl_user", "siphon_users");

@@ -356,4 +370,77 @@ mod tests {
            "global lock should be released after successful processing"
        );
    }

    #[tokio::test]
    async fn watermark_updated_when_rows_indexed() {
        // NonEmptyDatalake yields one row, so the handler should advance the
        // global watermark after a successful run.
        let datalake = Arc::new(NonEmptyDatalake);
        let ontology = Ontology::new();
        let user_node = create_test_node("User", "gl_user", "siphon_users");

        let pipeline =
            OntologyEntityPipeline::from_node(&user_node, &ontology, datalake, test_metrics())
                .unwrap();

        let watermark_store = Arc::new(RecordingGlobalWatermarkStore::new());
        let handler = GlobalHandler::new(watermark_store.clone(), vec![pipeline], test_metrics());

        let envelope = TestEnvelopeFactory::simple(
            &serde_json::json!({
                "watermark": "2024-06-15T12:00:00Z"
            })
            .to_string(),
        );

        let context = HandlerContext::new(
            Arc::new(MockDestination::new()),
            Arc::new(MockNatsServices::new()),
            Arc::new(MockLockService::new()),
        );

        handler.handle(context, envelope).await.unwrap();

        let expected = "2024-06-15T12:00:00Z".parse::<DateTime<Utc>>().unwrap();
        assert_eq!(
            watermark_store.stored_watermark(),
            Some(expected),
            "global watermark should be updated when rows were indexed"
        );
    }

    #[tokio::test]
    async fn watermark_not_updated_when_no_rows_indexed() {
        // EmptyDatalake yields zero rows: the pipeline succeeds, but the
        // handler must still skip the global watermark write.
        let datalake = Arc::new(EmptyDatalake);
        let ontology = Ontology::new();
        let user_node = create_test_node("User", "gl_user", "siphon_users");

        let pipeline =
            OntologyEntityPipeline::from_node(&user_node, &ontology, datalake, test_metrics())
                .unwrap();

        let watermark_store = Arc::new(RecordingGlobalWatermarkStore::new());
        let handler = GlobalHandler::new(watermark_store.clone(), vec![pipeline], test_metrics());

        let envelope = TestEnvelopeFactory::simple(
            &serde_json::json!({
                "watermark": "2024-06-15T12:00:00Z"
            })
            .to_string(),
        );

        let context = HandlerContext::new(
            Arc::new(MockDestination::new()),
            Arc::new(MockNatsServices::new()),
            Arc::new(MockLockService::new()),
        );

        handler.handle(context, envelope).await.unwrap();

        assert_eq!(
            watermark_store.stored_watermark(),
            None,
            "global watermark should not be updated when no rows were indexed"
        );
    }
}
+94 −42
Original line number Diff line number Diff line
@@ -174,66 +174,75 @@ impl Module for SdlcModule {
}

#[cfg(test)]
mod tests {
    use super::*;
pub(crate) mod test_fixtures {
    use std::sync::Arc;

    #[test]
    fn create_global_pipelines_returns_global_entities() {
        let ontology = Ontology::load_embedded().expect("should load ontology");
        let module = SdlcModule::with_ontology(
            Arc::new(MockDatalake),
            Arc::new(MockWatermarkStore),
            ontology,
        );
    use arrow::array::{BooleanArray, Int64Array};
    use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema};
    use arrow::record_batch::RecordBatch;
    use async_trait::async_trait;
    use chrono::{DateTime, Utc};
    use futures::stream;

        let pipelines = module.create_global_pipelines();
    use super::datalake::{DatalakeError, DatalakeQuery, RecordBatchStream};
    use super::watermark_store::{WatermarkError, WatermarkStore};

        let entity_names: Vec<_> = pipelines.iter().map(|p| p.entity_name()).collect();
        assert!(entity_names.contains(&"User"), "should include User entity");
    /// Shared test fixture: a datalake whose queries always succeed but
    /// produce no record batches, so pipelines report zero rows indexed.
    pub(crate) struct EmptyDatalake;

    #[async_trait]
    impl DatalakeQuery for EmptyDatalake {
        // The SQL and parameters are irrelevant; every query resolves to an
        // empty stream.
        async fn query_arrow(
            &self,
            _sql: &str,
            _params: serde_json::Value,
        ) -> Result<RecordBatchStream<'_>, DatalakeError> {
            let no_batches = stream::empty();
            Ok(Box::pin(no_batches))
        }
    }

    #[test]
    fn create_namespace_pipelines_returns_namespaced_entities() {
        let ontology = Ontology::load_embedded().expect("should load ontology");
        let module = SdlcModule::with_ontology(
            Arc::new(MockDatalake),
            Arc::new(MockWatermarkStore),
            ontology,
        );
    pub(crate) struct NonEmptyDatalake;

        let pipelines = module.create_namespace_pipelines();
    #[async_trait]
    impl DatalakeQuery for NonEmptyDatalake {
        async fn query_arrow(
            &self,
            _sql: &str,
            _params: serde_json::Value,
        ) -> Result<RecordBatchStream<'_>, DatalakeError> {
            let schema = Arc::new(Schema::new(vec![
                ArrowField::new("id", ArrowDataType::Int64, false),
                ArrowField::new("_version", ArrowDataType::Int64, false),
                ArrowField::new("_deleted", ArrowDataType::Boolean, false),
            ]));

            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(Int64Array::from(vec![1])),
                    Arc::new(Int64Array::from(vec![1])),
                    Arc::new(BooleanArray::from(vec![false])),
                ],
            )
            .unwrap();

        let entity_names: Vec<_> = pipelines.iter().map(|p| p.entity_name()).collect();
        assert!(
            entity_names.contains(&"Group"),
            "should include Group entity"
        );
        assert!(
            entity_names.contains(&"Project"),
            "should include Project entity"
        );
            Ok(Box::pin(stream::once(async { Ok(batch) })))
        }
    }

    use async_trait::async_trait;
    use chrono::{DateTime, Utc};
    use datalake::{DatalakeError, RecordBatchStream};
    use futures::stream;
    use watermark_store::WatermarkError;

    struct MockDatalake;
    pub(crate) struct FailingDatalake;

    #[async_trait]
    impl DatalakeQuery for MockDatalake {
    impl DatalakeQuery for FailingDatalake {
        async fn query_arrow(
            &self,
            _sql: &str,
            _params: serde_json::Value,
        ) -> Result<RecordBatchStream<'_>, DatalakeError> {
            Ok(Box::pin(stream::empty()))
            Err(DatalakeError::Query("simulated failure".to_string()))
        }
    }

    struct MockWatermarkStore;
    pub(crate) struct MockWatermarkStore;

    #[async_trait]
    impl WatermarkStore for MockWatermarkStore {
@@ -263,3 +272,46 @@ mod tests {
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use test_fixtures::{EmptyDatalake, MockWatermarkStore};

    /// Builds an `SdlcModule` over the embedded ontology with inert
    /// datalake and watermark-store fixtures.
    fn module_with_embedded_ontology() -> SdlcModule {
        let ontology = Ontology::load_embedded().expect("should load ontology");
        SdlcModule::with_ontology(
            Arc::new(EmptyDatalake),
            Arc::new(MockWatermarkStore),
            ontology,
        )
    }

    #[test]
    fn create_global_pipelines_returns_global_entities() {
        let module = module_with_embedded_ontology();

        let pipelines = module.create_global_pipelines();
        let names: Vec<_> = pipelines.iter().map(|p| p.entity_name()).collect();

        assert!(names.contains(&"User"), "should include User entity");
    }

    #[test]
    fn create_namespace_pipelines_returns_namespaced_entities() {
        let module = module_with_embedded_ontology();

        let pipelines = module.create_namespace_pipelines();
        let names: Vec<_> = pipelines.iter().map(|p| p.entity_name()).collect();

        assert!(
            names.contains(&"Group"),
            "should include Group entity"
        );
        assert!(
            names.contains(&"Project"),
            "should include Project entity"
        );
    }
}
+138 −119
Original line number Diff line number Diff line
@@ -71,6 +71,38 @@ impl NamespaceHandler {
        }
    }

    /// Persists the per-entity namespace watermark and records the
    /// watermark-lag metric for it.
    ///
    /// A store failure is logged and surfaced to the caller as
    /// `HandlerError::Processing`, identifying the entity in the message.
    async fn advance_namespace_watermark(
        &self,
        namespace_id: i64,
        entity: &str,
        watermark: &DateTime<Utc>,
    ) -> Result<(), HandlerError> {
        if let Err(error) = self
            .watermark_store
            .set_namespace_watermark(namespace_id, entity, watermark)
            .await
        {
            error!(namespace_id, entity, %error, "failed to update namespace watermark");
            return Err(HandlerError::Processing(format!(
                "failed to update namespace watermark for {entity}: {error}"
            )));
        }

        // Lag between "now" and the watermark, clamped at zero so clock skew
        // never produces a negative reading; reported in seconds.
        let lag_millis = Utc::now()
            .signed_duration_since(*watermark)
            .num_milliseconds()
            .max(0);
        self.metrics.watermark_lag.record(
            lag_millis as f64 / 1000.0,
            &[
                KeyValue::new("entity", entity.to_owned()),
                KeyValue::new("scope", "namespace"),
            ],
        );

        Ok(())
    }

    async fn resolve_namespace_watermark(&self, namespace_id: i64, entity: &str) -> DateTime<Utc> {
        match self
            .watermark_store
@@ -144,10 +176,12 @@ impl Handler for NamespaceHandler {
                &payload.watermark,
            );

            if let Err(error) = pipeline
            let rows_indexed = match pipeline
                .process(params.to_json(), context.destination.as_ref(), "namespace")
                .await
            {
                Ok(rows) => rows,
                Err(error) => {
                    error!(namespace_id = payload.namespace, entity, %error, "entity pipeline failed");
                    self.metrics.pipeline_errors.add(
                        1,
@@ -160,27 +194,12 @@ impl Handler for NamespaceHandler {
                    errors.push((entity.to_string(), error));
                    continue;
                }
            };

            self.watermark_store
                .set_namespace_watermark(payload.namespace, entity, &payload.watermark)
                .await
                .map_err(|error| {
                    error!(namespace_id = payload.namespace, entity, %error, "failed to update namespace watermark");
                    HandlerError::Processing(format!("failed to update namespace watermark for {entity}: {error}"))
                })?;

            let lag = Utc::now()
                .signed_duration_since(payload.watermark)
                .num_milliseconds()
                .max(0) as f64
                / 1000.0;
            self.metrics.watermark_lag.record(
                lag,
                &[
                    KeyValue::new("entity", entity.to_owned()),
                    KeyValue::new("scope", "namespace"),
                ],
            );
            if rows_indexed > 0 {
                self.advance_namespace_watermark(payload.namespace, entity, &payload.watermark)
                    .await?;
            }

            successful_entity_pipelines += 1;
        }
@@ -198,10 +217,12 @@ impl Handler for NamespaceHandler {
                &payload.watermark,
            );

            if let Err(error) = edge_pipeline
            let rows_indexed = match edge_pipeline
                .process(params.to_json(), context.destination.as_ref(), "namespace")
                .await
            {
                Ok(rows) => rows,
                Err(error) => {
                    error!(namespace_id = payload.namespace, edge = entity, %error, "edge pipeline failed");
                    self.metrics.pipeline_errors.add(
                        1,
@@ -214,27 +235,12 @@ impl Handler for NamespaceHandler {
                    errors.push((entity.to_string(), error));
                    continue;
                }
            };

            self.watermark_store
                .set_namespace_watermark(payload.namespace, entity, &payload.watermark)
                .await
                .map_err(|error| {
                    error!(namespace_id = payload.namespace, edge = entity, %error, "failed to update namespace edge watermark");
                    HandlerError::Processing(format!("failed to update namespace watermark for {entity}: {error}"))
                })?;

            let lag = Utc::now()
                .signed_duration_since(payload.watermark)
                .num_milliseconds()
                .max(0) as f64
                / 1000.0;
            self.metrics.watermark_lag.record(
                lag,
                &[
                    KeyValue::new("entity", entity.to_owned()),
                    KeyValue::new("scope", "namespace"),
                ],
            );
            if rows_indexed > 0 {
                self.advance_namespace_watermark(payload.namespace, entity, &payload.watermark)
                    .await?;
            }

            successful_edge_pipelines += 1;
        }
@@ -294,9 +300,9 @@ impl Handler for NamespaceHandler {
#[cfg(test)]
mod tests {
    use super::*;
    use crate::modules::sdlc::datalake::{DatalakeError, DatalakeQuery, RecordBatchStream};
    use crate::modules::sdlc::datalake::DatalakeQuery;
    use crate::modules::sdlc::test_fixtures::{EmptyDatalake, FailingDatalake, NonEmptyDatalake};
    use crate::testkit::{MockDestination, MockLockService, MockNatsServices, TestEnvelopeFactory};
    use futures::stream;
    use ontology::{DataType, EtlConfig, EtlScope, Field, NodeEntity, Ontology};
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Mutex;
@@ -398,32 +404,6 @@ mod tests {
        }
    }

    struct MockDatalake;

    #[async_trait]
    impl DatalakeQuery for MockDatalake {
        async fn query_arrow(
            &self,
            _sql: &str,
            _params: serde_json::Value,
        ) -> Result<RecordBatchStream<'_>, DatalakeError> {
            Ok(Box::pin(stream::empty()))
        }
    }

    struct FailingDatalake;

    #[async_trait]
    impl DatalakeQuery for FailingDatalake {
        async fn query_arrow(
            &self,
            _sql: &str,
            _params: serde_json::Value,
        ) -> Result<RecordBatchStream<'_>, DatalakeError> {
            Err(DatalakeError::Query("simulated failure".to_string()))
        }
    }

    fn create_test_node(name: &str, destination_table: &str, source_table: &str) -> NodeEntity {
        NodeEntity {
            name: name.to_string(),
@@ -454,7 +434,7 @@ mod tests {

    #[tokio::test]
    async fn handle_processes_pipelines() {
        let datalake = Arc::new(MockDatalake);
        let datalake = Arc::new(EmptyDatalake);
        let ontology = Ontology::new();
        let group_node = create_test_node("Group", "gl_group", "groups");
        let issue_node = create_test_node("Issue", "gl_issue", "issues");
@@ -500,7 +480,7 @@ mod tests {

    #[tokio::test]
    async fn handler_releases_lock_on_success() {
        let datalake = Arc::new(MockDatalake);
        let datalake = Arc::new(EmptyDatalake);
        let ontology = Ontology::new();
        let group_node = create_test_node("Group", "gl_group", "groups");

@@ -545,9 +525,55 @@ mod tests {
        );
    }

    #[tokio::test]
    async fn watermark_not_updated_when_no_rows_indexed() {
        // Both entity pipelines succeed but index zero rows, so no
        // per-entity watermark should be written.
        let datalake = Arc::new(EmptyDatalake);
        let ontology = Ontology::new();
        let group_node = create_test_node("Group", "gl_group", "groups");
        let issue_node = create_test_node("Issue", "gl_issue", "issues");

        let group_pipeline = OntologyEntityPipeline::from_node(
            &group_node,
            &ontology,
            datalake.clone(),
            test_metrics(),
        )
        .unwrap();
        let issue_pipeline =
            OntologyEntityPipeline::from_node(&issue_node, &ontology, datalake, test_metrics())
                .unwrap();

        let store = Arc::new(RecordingWatermarkStore::new());
        let handler = NamespaceHandler::new(
            store.clone(),
            vec![group_pipeline, issue_pipeline],
            vec![],
            test_metrics(),
        );

        let envelope = TestEnvelopeFactory::simple(
            &serde_json::json!({
                "organization": 1,
                "namespace": 100,
                "watermark": "2024-06-15T12:00:00Z"
            })
            .to_string(),
        );

        let context = HandlerContext::new(
            Arc::new(MockDestination::new()),
            Arc::new(MockNatsServices::new()),
            Arc::new(MockLockService::new()),
        );

        handler.handle(context, envelope).await.unwrap();

        assert!(
            store.stored_watermarks().is_empty(),
            "watermarks should not be updated when no rows were indexed"
        );
    }

    #[tokio::test]
    async fn watermark_updated_per_entity_on_success() {
        let datalake = Arc::new(MockDatalake);
        let datalake = Arc::new(NonEmptyDatalake);
        let ontology = Ontology::new();
        let group_node = create_test_node("Group", "gl_group", "groups");
        let issue_node = create_test_node("Issue", "gl_issue", "issues");
@@ -610,7 +636,7 @@ mod tests {

    #[tokio::test]
    async fn failed_pipeline_does_not_update_its_watermark() {
        let ok_datalake = Arc::new(MockDatalake);
        let ok_datalake = Arc::new(EmptyDatalake);
        let failing_datalake: Arc<dyn DatalakeQuery> = Arc::new(FailingDatalake);
        let ontology = Ontology::new();

@@ -671,8 +697,8 @@ mod tests {
        };

        assert!(
            stored.contains_key(&group_key),
            "Group watermark should be stored since it succeeded"
            !stored.contains_key(&group_key),
            "Group watermark should not be stored since no rows were indexed"
        );
        assert!(
            !stored.contains_key(&issue_key),
@@ -682,7 +708,7 @@ mod tests {

    #[tokio::test]
    async fn processing_continues_after_earlier_pipeline_fails() {
        let ok_datalake = Arc::new(MockDatalake);
        let ok_datalake = Arc::new(EmptyDatalake);
        let failing_datalake: Arc<dyn DatalakeQuery> = Arc::new(FailingDatalake);
        let ontology = Ontology::new();

@@ -743,14 +769,14 @@ mod tests {
            "Group watermark should not be stored since it failed"
        );
        assert!(
            stored.contains_key(&issue_key),
            "Issue watermark should be stored even though an earlier pipeline failed"
            !stored.contains_key(&issue_key),
            "Issue watermark should not be stored since no rows were indexed"
        );
    }

    #[tokio::test]
    async fn each_entity_resolves_its_own_watermark() {
        let datalake = Arc::new(MockDatalake);
        let datalake = Arc::new(EmptyDatalake);
        let ontology = Ontology::new();
        let group_node = create_test_node("Group", "gl_group", "groups");
        let issue_node = create_test_node("Issue", "gl_issue", "issues");
@@ -794,20 +820,19 @@ mod tests {
        );

        // The handler should resolve different watermarks per entity.
        // Both succeed, so both get the new watermark stored.
        // Both succeed but return zero rows, so no watermarks are stored.
        handler.handle(context, envelope).await.unwrap();

        let stored = store.stored_watermarks();
        assert_eq!(
            stored.len(),
            2,
            "both entities should have updated watermarks"
        assert!(
            stored.is_empty(),
            "no watermarks should be updated when no rows were indexed"
        );
    }

    #[tokio::test]
    async fn watermark_set_failure_returns_handler_error() {
        let datalake = Arc::new(MockDatalake);
    async fn zero_rows_skips_watermark_set_even_if_store_would_fail() {
        let datalake = Arc::new(EmptyDatalake);
        let ontology = Ontology::new();
        let group_node = create_test_node("Group", "gl_group", "groups");

@@ -836,14 +861,8 @@ mod tests {

        let result = handler.handle(context, envelope).await;
        assert!(
            result.is_err(),
            "should propagate watermark write failure as HandlerError"
        );

        let error_message = result.unwrap_err().to_string();
        assert!(
            error_message.contains("failed to update namespace watermark for Group"),
            "error should identify the entity: {error_message}"
            result.is_ok(),
            "should succeed because watermark set is skipped when no rows were indexed"
        );
    }

+4 −4
Original line number Diff line number Diff line
@@ -61,7 +61,7 @@ impl OntologyEntityPipeline {
        params: Value,
        destination: &dyn Destination,
        scope: &str,
    ) -> Result<(), HandlerError> {
    ) -> Result<u64, HandlerError> {
        let started_at = Instant::now();
        let labels = [
            KeyValue::new("entity", self.entity_name.clone()),
@@ -166,7 +166,7 @@ impl OntologyEntityPipeline {
            );
        }

        Ok(())
        Ok(total_rows)
    }

    async fn transform_and_write_batch(
@@ -306,7 +306,7 @@ impl OntologyEdgePipeline {
        params: Value,
        destination: &dyn Destination,
        scope: &str,
    ) -> Result<(), HandlerError> {
    ) -> Result<u64, HandlerError> {
        let started_at = Instant::now();
        let labels = [
            KeyValue::new("entity", self.relationship_kind.clone()),
@@ -403,7 +403,7 @@ impl OntologyEdgePipeline {
            );
        }

        Ok(())
        Ok(total_rows)
    }

    async fn transform_and_write_batch(