Subscribe to Kafka for metadata ingestion
The REST API can be run in a profile that causes it to ingest metadata from Kafka. For example:

```
AWS_PROFILE=crossref-staging AWS_REGION=eu-west-1 KAFKA_HOST=kafka1:9092 lein run :nrepl :api :kafka-ingest-xml :kafka-ingest-citation
```
When run with `kafka-ingest-xml` or `kafka-ingest-citation`, the REST API starts tasks that consume data from two Kafka topics:

- `metadata_s3_update_xml`
- `metadata_s3_update_citation`

These topics can be configured through the environment variables `KAFKA_INPUT_TOPIC_XML` and `KAFKA_INPUT_TOPIC_CITATION` respectively.
Data is produced to the above topics by the Kafka Pusher. You can read more about the message format here, but in short: whenever new metadata is added to the metadata bucket, the Kafka Pusher publishes a message wrapping the `s3_key` and `s3_bucket` of the new object.
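For reference, such a message carries just those two fields. The values below are placeholders, matching the form shown under Bulk ingestion:

```json
{"s3_key": "the-key", "s3_bucket": "Name of bucket"}
```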
The REST API handles these messages by downloading the `s3_key` from the `s3_bucket` and ingesting the metadata.
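To make the incremental flow concrete, here is a minimal sketch in Python. The real service is a Clojure application, so the `kafka-python`/`boto3` usage, the topic defaults, and the `ingest_metadata` helper are illustrative assumptions rather than the project's code:

```python
import json
import os

import boto3
from kafka import KafkaConsumer

# Topic names default to the ones listed above but can be overridden via env vars.
topics = [
    os.environ.get("KAFKA_INPUT_TOPIC_XML", "metadata_s3_update_xml"),
    os.environ.get("KAFKA_INPUT_TOPIC_CITATION", "metadata_s3_update_citation"),
]

consumer = KafkaConsumer(
    *topics,
    bootstrap_servers=os.environ.get("KAFKA_HOST", "kafka1:9092"),
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)
s3 = boto3.client("s3")


def ingest_metadata(body: bytes) -> None:
    """Placeholder for the actual metadata ingestion step."""
    ...


for message in consumer:
    payload = message.value  # {"s3_key": ..., "s3_bucket": ...}
    obj = s3.get_object(Bucket=payload["s3_bucket"], Key=payload["s3_key"])
    ingest_metadata(obj["Body"].read())
```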
Bulk ingestion
The REST API can also be run in a profile that causes it to push metadata keys to Kafka, just as the Kafka Pusher does. For example:

```
AWS_PROFILE=crossref-staging AWS_REGION=eu-west-1 KAFKA_HOST=kafka1:9092 lein run :nrepl :api :kafka-ingest-s3-xml
```
When run with `kafka-ingest-s3-xml` or `kafka-ingest-s3-citation`, the REST API starts tasks that fetch all keys from the metadata bucket. Each key fetched from the metadata bucket is pushed to Kafka in the form `{"s3_key": "the-key", "s3_bucket": "Name of bucket"}`. These messages are pushed to the `KAFKA_INPUT_TOPIC_XML` and `KAFKA_INPUT_TOPIC_CITATION` topics respectively, after which they are handled by the incremental ingestion mechanism described above.
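For illustration, the bulk path can be sketched along these lines, again in Python. The bucket name, topic choice, and client libraries are assumptions, not the actual Clojure implementation:

```python
import json
import os

import boto3
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=os.environ.get("KAFKA_HOST", "kafka1:9092"),
    value_serializer=lambda value: json.dumps(value).encode("utf-8"),
)
topic = os.environ.get("KAFKA_INPUT_TOPIC_XML", "metadata_s3_update_xml")
bucket = "example-metadata-bucket"  # placeholder; the real bucket name is configured elsewhere

# List every key in the metadata bucket and publish one message per key.
s3 = boto3.client("s3")
for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket):
    for obj in page.get("Contents", []):
        producer.send(topic, {"s3_key": obj["Key"], "s3_bucket": bucket})

producer.flush()
```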