Skip to content

Subscribe to Kafka for metadata ingestion

Mark Woodhall requested to merge feature/subscribe-to-kafka into develop

The REST API can be run in a profile that will cause it to ingest metadata from Kafka. An example of this can be seen below.

AWS_PROFILE=crossref-staging AWS_REGION=eu-west-1 KAFKA_HOST=kafka1:9092 lein run :nrepl :api :kafka-ingest-xml :kafka-ingest-citation

When run with kafka-ingest-xml or kafka-ingest-citation the rest API will start tasks that will consume data from two Kafka topics.

  • metadata_s3_update_xml
  • metadata_s3_update_citation

These Kafka topics can be configured through environment variables KAFKA_INPUT_TOPIC_XML and KAFKA_INPUT_TOPIC_CITATION respectively.

Data is produced to the above topics by the Kafka Pusher, you can read more about the message format here but the TLDR is that the Kafka Pusher pushes s3_key and s3_bucket wrapped in a message, this will happen whenever new metadata is added to the metadata bucket.

The REST API handles the above messages by downloading the s3_key from the s3_bucket and ingesting the metadata.

Bulk ingestion

The REST API can also be run in a profile that will cause it to push metadata keys to Kafka, just like the Kafka Pusher does. An example of this can be seen below.

AWS_PROFILE=crossref-staging AWS_REGION=eu-west-1 KAFKA_HOST=kafka1:9092 lein run :nrepl :api :kafka-ingest-s3-xml

When run with kafka-ingest-s3-xml or kafka-ingest-s3-citation the rest API will start tasks that will fetch all keys from the metadata bucket.

Each of the keys fetched from the metadata bucket will be pushed to Kafka in the form {"s3_key": "the-key", "s3_bucket": "Name of bucket"}. These messages will be pushed to the KAFKA_INPUT_TOPIC_XML and KAFKA_INPUT_TOPIC_CITATION topics respectively, after which they will be handled by the incremental ingestion mechanism.

Edited by Mark Woodhall

Merge request reports