Class KafkaCrossDcConsumer
java.lang.Object
org.apache.solr.crossdc.manager.consumer.Consumer.CrossDcConsumer
org.apache.solr.crossdc.manager.consumer.KafkaCrossDcConsumer
- All Implemented Interfaces:
Runnable
KafkaCrossDcConsumer, part of the Apache Solr framework, consumes messages from Kafka and
mirrors them into a Solr instance. It uses a KafkaConsumer to subscribe to one or more topics
and receive ConsumerRecords that contain MirroredSolrRequest objects. The SolrMessageProcessor
handles each MirroredSolrRequest and sends the resulting UpdateRequest to the CloudSolrClient
for indexing. A ThreadPoolExecutor handles the update requests asynchronously.
KafkaCrossDcConsumer also manages offsets: it commits offsets to Kafka and can seek to specific
offsets for error recovery. The class provides methods to start and stop the consumer thread.
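The offset handling described above can be illustrated with a minimal, self-contained sketch. It is not this class's actual implementation: OffsetTrackerSketch and Batch are hypothetical names, and the sketch assumes that batches for a single partition are submitted in offset order but may finish out of order on the worker pool. The only Kafka fact it relies on is that a committed offset is the offset of the next record to consume (last processed offset + 1).

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper (not part of the Solr API): tracks asynchronous batches for one partition. */
final class OffsetTrackerSketch {

    /** One submitted batch; nextOffset is what may be committed once the batch is done. */
    static final class Batch {
        final long nextOffset;
        volatile boolean done; // set by the worker thread when the batch has been indexed

        Batch(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    private final Deque<Batch> inFlight = new ArrayDeque<>();
    private long committable;

    OffsetTrackerSketch(long startOffset) {
        this.committable = startOffset;
    }

    /** Registers a batch whose last record has the given offset. */
    synchronized Batch submit(long lastRecordOffset) {
        Batch batch = new Batch(lastRecordOffset + 1);
        inFlight.addLast(batch);
        return batch;
    }

    /** Advances past every finished batch at the head of the queue and returns the safe offset. */
    synchronized long committableOffset() {
        while (!inFlight.isEmpty() && inFlight.peekFirst().done) {
            committable = inFlight.pollFirst().nextOffset;
        }
        return committable;
    }

    public static void main(String[] args) {
        OffsetTrackerSketch tracker = new OffsetTrackerSketch(0L);
        Batch first = tracker.submit(9L);   // records 0..9
        Batch second = tracker.submit(19L); // records 10..19

        second.done = true; // the later batch finishes first
        System.out.println(tracker.committableOffset()); // prints 0

        first.done = true;
        System.out.println(tracker.committableOffset()); // prints 20
    }
}
```

Holding the commit back until every earlier batch has finished means a restart, or a seek back to the last committed offset, replays unfinished work instead of skipping it.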
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | KafkaCrossDcConsumer.SolrClientSupplier | Supplier for creating and managing a working CloudSolrClient instance.
-
Field Summary
Fields -
Constructor Summary
Constructors -
Method Summary
Modifier and Type | Method | Description
org.apache.kafka.clients.consumer.KafkaConsumer<String, MirroredSolrRequest<?>> | createKafkaConsumer(Properties properties) |
protected KafkaMirroringSink | createKafkaMirroringSink |
protected KafkaCrossDcConsumer.SolrClientSupplier | createSolrClientSupplier |
protected SolrMessageProcessor | createSolrMessageProcessor |
protected void | processResult(MirroredSolrRequest.Type type, IQueueHandler.Result<MirroredSolrRequest<?>> result) |
void | run() | This is where the magic happens.
void | sendBatch(org.apache.solr.client.solrj.SolrRequest<? extends org.apache.solr.client.solrj.SolrResponse> solrReqBatch, MirroredSolrRequest.Type type, org.apache.kafka.clients.consumer.ConsumerRecord<String, MirroredSolrRequest<?>> lastRecord, org.apache.solr.crossdc.manager.consumer.PartitionManager.WorkUnit workUnit) |
final void | shutdown() | Shuts down the Kafka consumer by calling wakeup.
-
Field Details
-
solrClientSupplier
-
-
Constructor Details
-
KafkaCrossDcConsumer
- Parameters:
conf - The Kafka consumer configuration
startLatch - To inform the caller when the Consumer has started
-
-
Method Details
-
createSolrClientSupplier
-
createSolrMessageProcessor
-
createKafkaConsumer
public org.apache.kafka.clients.consumer.KafkaConsumer<String,MirroredSolrRequest<?>> createKafkaConsumer(Properties properties)
-
createKafkaMirroringSink
-
run
public void run()
This is where the magic happens. The consumer loop:
- Polls the queue and gets the packets
- Extracts the MirroredSolrRequest objects
- Sends each request to the MirroredSolrRequestHandler, which contains the processing, retry, and error-handling logic
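The three steps above can be sketched with standard-library stand-ins. This is an illustration under stated assumptions, not the code of run(): PollLoopSketch, Envelope, and runOnce are hypothetical names, a BlockingQueue stands in for the Kafka topic, and a Predicate stands in for the request handler. The sketch also returns once the queue stays empty for one poll timeout, whereas a long-running consumer would keep polling until it is shut down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Hypothetical sketch of a poll / extract / hand-off loop; not the actual run() method. */
final class PollLoopSketch {

    /** Stand-in for a consumed record: an offset plus the mirrored request payload. */
    static final class Envelope {
        final long offset;
        final String request;

        Envelope(long offset, String request) {
            this.offset = offset;
            this.request = request;
        }
    }

    /** Drains the queue and returns the offsets of the requests the handler rejected. */
    static List<Long> runOnce(BlockingQueue<Envelope> queue, Predicate<String> handler) {
        List<Long> failed = new ArrayList<>();
        try {
            Envelope envelope;
            // Step 1: poll with a timeout so the loop never blocks forever.
            while ((envelope = queue.poll(50, TimeUnit.MILLISECONDS)) != null) {
                // Step 2: extract the request from the record.
                String request = envelope.request;
                // Step 3: hand the request to the handler, which owns processing and error handling.
                if (!handler.test(request)) {
                    failed.add(envelope.offset);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return failed;
    }

    public static void main(String[] args) {
        BlockingQueue<Envelope> queue = new LinkedBlockingQueue<>();
        queue.add(new Envelope(0L, "add:doc1"));
        queue.add(new Envelope(1L, "malformed"));
        queue.add(new Envelope(2L, "add:doc2"));

        List<Long> failed = runOnce(queue, request -> request.startsWith("add:"));
        System.out.println(failed); // prints [1]
    }
}
```

Keeping the loop itself free of retry and error-handling decisions mirrors the division of labor in the list above: the loop only moves records, and the handler decides what to do with each one.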
-
sendBatch
public void sendBatch(org.apache.solr.client.solrj.SolrRequest<? extends org.apache.solr.client.solrj.SolrResponse> solrReqBatch, MirroredSolrRequest.Type type, org.apache.kafka.clients.consumer.ConsumerRecord<String, MirroredSolrRequest<?>> lastRecord, org.apache.solr.crossdc.manager.consumer.PartitionManager.WorkUnit workUnit)
-
processResult
protected void processResult(MirroredSolrRequest.Type type, IQueueHandler.Result<MirroredSolrRequest<?>> result) throws MirroringException
- Throws:
MirroringException
-
shutdown
public final void shutdown()
Shuts down the Kafka consumer by calling wakeup.
-