Class KafkaCrossDcConsumer
- java.lang.Object
-
- org.apache.solr.crossdc.manager.consumer.Consumer.CrossDcConsumer
-
- org.apache.solr.crossdc.manager.consumer.KafkaCrossDcConsumer
-
- All Implemented Interfaces:
Runnable
public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer
KafkaCrossDcConsumer is part of the Apache Solr framework. It consumes messages from Kafka and mirrors them into a Solr instance. It uses a KafkaConsumer to subscribe to one or more topics and receive ConsumerRecords that contain MirroredSolrRequest objects. The SolrMessageProcessor handles each MirroredSolrRequest and sends the resulting UpdateRequest to the CloudSolrClient for indexing. A ThreadPoolExecutor handles the update requests asynchronously. The class also manages offsets: it commits offsets to Kafka and can seek to specific offsets for error recovery. It provides methods to start and stop the consumer thread.
-
-
Nested Class Summary
Nested Classes
Modifier and Type Class Description
static class
KafkaCrossDcConsumer.SolrClientSupplier
Supplier for creating and managing a working CloudSolrClient instance.
-
Field Summary
Fields
Modifier and Type Field Description
protected KafkaCrossDcConsumer.SolrClientSupplier
solrClientSupplier
-
Constructor Summary
Constructors
Constructor Description
KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch)
-
Method Summary
Modifier and Type Method Description
org.apache.kafka.clients.consumer.KafkaConsumer<String,MirroredSolrRequest<?>>
createKafkaConsumer(Properties properties)
protected KafkaMirroringSink
createKafkaMirroringSink(KafkaCrossDcConf conf)
protected KafkaCrossDcConsumer.SolrClientSupplier
createSolrClientSupplier(KafkaCrossDcConf conf)
protected SolrMessageProcessor
createSolrMessageProcessor()
protected void
processResult(MirroredSolrRequest.Type type, IQueueHandler.Result<MirroredSolrRequest<?>> result)
void
run()
This is where the magic happens.
void
sendBatch(org.apache.solr.client.solrj.SolrRequest<?> solrReqBatch, MirroredSolrRequest.Type type, org.apache.kafka.clients.consumer.ConsumerRecord<String,MirroredSolrRequest<?>> lastRecord, org.apache.solr.crossdc.manager.consumer.PartitionManager.WorkUnit workUnit)
void
shutdown()
Shuts down the Kafka consumer by calling wakeup.
-
-
-
Field Detail
-
solrClientSupplier
protected KafkaCrossDcConsumer.SolrClientSupplier solrClientSupplier
-
-
Constructor Detail
-
KafkaCrossDcConsumer
public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch)
- Parameters:
conf
- The Kafka consumer configuration
startLatch
- To inform the caller when the Consumer has started
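The startLatch handshake can be illustrated with a minimal, standard-library-only sketch. It assumes the consumer counts the latch down once it is running; the exact point at which KafkaCrossDcConsumer does so is not stated here. DemoConsumer and startAndAwait are hypothetical names used for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class StartLatchSketch {
    /** Hypothetical stand-in for a consumer; not the real KafkaCrossDcConsumer. */
    static class DemoConsumer implements Runnable {
        private final CountDownLatch startLatch;

        DemoConsumer(CountDownLatch startLatch) {
            this.startLatch = startLatch;
        }

        @Override
        public void run() {
            // Any setup work would happen here (assumption).
            startLatch.countDown(); // tell the caller that the consumer has started
        }
    }

    /** Starts the demo consumer and waits up to timeoutMs for its start signal. */
    static boolean startAndAwait(long timeoutMs) {
        CountDownLatch startLatch = new CountDownLatch(1);
        Thread worker = new Thread(new DemoConsumer(startLatch), "demo-consumer");
        worker.start();
        try {
            boolean started = startLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
            worker.join();
            return started;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("started = " + startAndAwait(1000));
    }
}
```

Waiting with a timeout rather than an unbounded await lets the caller report a failed start instead of hanging.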
-
-
Method Detail
-
createSolrClientSupplier
protected KafkaCrossDcConsumer.SolrClientSupplier createSolrClientSupplier(KafkaCrossDcConf conf)
-
createSolrMessageProcessor
protected SolrMessageProcessor createSolrMessageProcessor()
-
createKafkaConsumer
public org.apache.kafka.clients.consumer.KafkaConsumer<String,MirroredSolrRequest<?>> createKafkaConsumer(Properties properties)
-
createKafkaMirroringSink
protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf)
-
run
public void run()
This is where the magic happens.
- Polls and gets the packets from the queue
- Extracts the MirroredSolrRequest objects
- Sends each request to the MirroredSolrRequestHandler, which holds the processing, retry, and error-handling logic.
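The three steps above can be sketched with the standard library alone. This is not the class's actual implementation: a BlockingQueue stands in for Kafka, a String stands in for a MirroredSolrRequest, and DemoRecord, drain, and the handler are hypothetical names. The returned value follows the Kafka convention that a committed offset names the next record to read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

class PollLoopSketch {
    /** Hypothetical record: an offset plus a payload standing in for a MirroredSolrRequest. */
    static final class DemoRecord {
        final long offset;
        final String request;

        DemoRecord(long offset, String request) {
            this.offset = offset;
            this.request = request;
        }
    }

    /**
     * Drains the queue, passes each request to the handler, and returns the
     * offset to commit next (last processed offset + 1), or -1 if the queue was empty.
     */
    static long drain(BlockingQueue<DemoRecord> queue, Consumer<String> handler) {
        long nextOffset = -1;
        DemoRecord rec;
        while ((rec = queue.poll()) != null) { // step 1: poll the queue
            String request = rec.request;      // step 2: extract the request
            handler.accept(request);           // step 3: hand it to the processing logic
            nextOffset = rec.offset + 1;       // remember where to resume
        }
        return nextOffset;
    }

    public static void main(String[] args) {
        BlockingQueue<DemoRecord> queue = new LinkedBlockingQueue<>();
        queue.add(new DemoRecord(5, "add doc 1"));
        queue.add(new DemoRecord(6, "add doc 2"));
        List<String> handled = new ArrayList<>();
        long next = drain(queue, handled::add);
        System.out.println("handled=" + handled + " nextOffset=" + next);
    }
}
```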
-
sendBatch
public void sendBatch(org.apache.solr.client.solrj.SolrRequest<?> solrReqBatch, MirroredSolrRequest.Type type, org.apache.kafka.clients.consumer.ConsumerRecord<String,MirroredSolrRequest<?>> lastRecord, org.apache.solr.crossdc.manager.consumer.PartitionManager.WorkUnit workUnit)
-
processResult
protected void processResult(MirroredSolrRequest.Type type, IQueueHandler.Result<MirroredSolrRequest<?>> result) throws MirroringException
- Throws:
MirroringException
-
shutdown
public final void shutdown()
Shuts down the Kafka consumer by calling wakeup.
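In the Kafka client, KafkaConsumer.wakeup() makes a blocked poll throw WakeupException, which lets the polling thread leave its loop. The sketch below is a standard-library analogue of that pattern, not this class's code: thread interruption stands in for wakeup, a BlockingQueue stands in for the consumer, and all names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class WakeupShutdownSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    private volatile boolean exited = false;
    private final Thread worker = new Thread(this::loop, "demo-consumer");

    void start() {
        worker.start();
    }

    private void loop() {
        try {
            while (running) {
                // Blocks much like a consumer poll with a timeout.
                String msg = queue.poll(1, TimeUnit.SECONDS);
                if (msg != null) {
                    // Processing would happen here (omitted in this sketch).
                }
            }
        } catch (InterruptedException e) {
            // Expected during shutdown: the blocked poll was woken up.
        } finally {
            exited = true; // a real consumer would release its resources here
        }
    }

    /** Requests shutdown by waking the worker out of its blocking poll. */
    void shutdown() {
        running = false;
        worker.interrupt();
    }

    /** Starts the worker, shuts it down, and reports whether it exited. */
    static boolean demo() {
        WakeupShutdownSketch sketch = new WakeupShutdownSketch();
        sketch.start();
        sketch.shutdown();
        try {
            sketch.worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return sketch.exited && !sketch.worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("clean exit = " + demo());
    }
}
```

Setting the flag before interrupting covers the case where the worker is between polls when shutdown is requested.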
-
-