Class KafkaCrossDcConsumer
java.lang.Object
  org.apache.solr.crossdc.manager.consumer.Consumer.CrossDcConsumer
    org.apache.solr.crossdc.manager.consumer.KafkaCrossDcConsumer
All Implemented Interfaces: Runnable
public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer
KafkaCrossDcConsumer is part of Apache Solr. It consumes messages from Kafka and mirrors them into a Solr instance. It uses a KafkaConsumer to subscribe to one or more topics and receive ConsumerRecords that contain MirroredSolrRequest objects. The SolrMessageProcessor handles each MirroredSolrRequest and sends the resulting UpdateRequest to the CloudSolrClient for indexing. A ThreadPoolExecutor handles the update requests asynchronously. The class also manages offsets: it commits offsets to Kafka and can seek to a specific offset for error recovery. It provides methods to start and stop the consumer thread.
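When records are processed asynchronously, they can finish out of order, so offsets have to be committed with some care. The sketch below shows one common approach: only advance the committable offset across a contiguous run of completed records. It is an assumption about the general technique, not a description of how KafkaCrossDcConsumer or its PartitionManager tracks offsets. The class OffsetTrackerSketch and its methods are hypothetical and use only the JDK. The returned value follows the Kafka convention that a committed offset is the offset of the next record to read.

```java
import java.util.TreeSet;

class OffsetTrackerSketch {
    // Lowest offset that is not yet known to be complete; also the value to commit.
    private long nextToCommit;
    // Offsets that finished ahead of nextToCommit (out-of-order completions).
    private final TreeSet<Long> completed = new TreeSet<>();

    OffsetTrackerSketch(long startOffset) {
        this.nextToCommit = startOffset;
    }

    /** Marks one offset as processed and returns the offset that is now safe to commit. */
    synchronized long complete(long offset) {
        if (offset >= nextToCommit) {
            completed.add(offset);
        }
        // Advance over every contiguous completed offset.
        while (completed.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit;
    }

    /** Returns the current committable offset without changing any state. */
    synchronized long committable() {
        return nextToCommit;
    }
}
```

With a tracker started at offset 0, completing offsets 1 and 2 leaves the committable offset at 0; it only moves to 3 once offset 0 completes as well. After a failure, a consumer could seek back to the committable offset and reprocess from there.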
-
-
Field Summary
Fields (Modifier and Type, Field, Description):
protected org.apache.solr.client.solrj.impl.CloudSolrClient
solrClient
-
Constructor Summary
Constructors (Constructor, Description):
KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch)
-
Method Summary
Methods (Modifier and Type, Method, Description):
org.apache.kafka.clients.consumer.KafkaConsumer<String,MirroredSolrRequest<?>>
createKafkaConsumer(Properties properties)
protected KafkaMirroringSink
createKafkaMirroringSink(KafkaCrossDcConf conf)
protected org.apache.solr.client.solrj.impl.CloudSolrClient
createSolrClient(KafkaCrossDcConf conf)
protected SolrMessageProcessor
createSolrMessageProcessor()
protected void
processResult(MirroredSolrRequest.Type type, IQueueHandler.Result<MirroredSolrRequest<?>> result)
void
run()
This is where the magic happens.
void
sendBatch(org.apache.solr.client.solrj.SolrRequest<?> solrReqBatch, MirroredSolrRequest.Type type, org.apache.kafka.clients.consumer.ConsumerRecord<String,MirroredSolrRequest<?>> lastRecord, org.apache.solr.crossdc.manager.consumer.PartitionManager.WorkUnit workUnit)
void
shutdown()
Shut down the Kafka consumer by calling wakeup.
-
-
-
Constructor Detail
-
KafkaCrossDcConsumer
public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch)
- Parameters:
conf - The Kafka consumer configuration
startLatch - To inform the caller when the Consumer has started
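The startLatch parameter suggests the usual CountDownLatch handshake, in which the caller blocks until the consumer thread signals that it is running. The following is a minimal JDK-only sketch of that pattern. StartLatchSketch, worker, and awaitStart are hypothetical names, and the worker body is a stand-in rather than the actual KafkaCrossDcConsumer code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class StartLatchSketch {
    /** Hypothetical worker: counts the latch down once its run() has begun. */
    static Runnable worker(CountDownLatch startLatch) {
        return () -> {
            // A real consumer would presumably subscribe to its topics first (assumption).
            startLatch.countDown();
        };
    }

    /** Starts the worker on a new thread and waits up to timeoutMs for the start signal. */
    static boolean awaitStart(long timeoutMs) {
        CountDownLatch startLatch = new CountDownLatch(1);
        Thread thread = new Thread(worker(startLatch), "consumer-sketch");
        thread.start();
        try {
            boolean started = startLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
            thread.join();
            return started;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that the start was not observed.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Using a timed await rather than an unbounded one lets the caller fail fast if the consumer never starts.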
-
-
Method Detail
-
createSolrMessageProcessor
protected SolrMessageProcessor createSolrMessageProcessor()
-
createKafkaConsumer
public org.apache.kafka.clients.consumer.KafkaConsumer<String,MirroredSolrRequest<?>> createKafkaConsumer(Properties properties)
-
run
public void run()
This is where the magic happens.
- Polls the queue for packets.
- Extracts the MirroredSolrRequest objects.
- Sends each request to the MirroredSolrRequestHandler, which contains the processing, retry, and error-handling logic.
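The three steps above can be sketched as a simple loop. This is a JDK-only illustration under stated assumptions: a BlockingQueue stands in for the Kafka topic, the hypothetical Envelope class stands in for a ConsumerRecord, a plain Consumer<String> stands in for the request handler, and a boolean flag replaces the wakeup-based shutdown. It does not reproduce the real run() method.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class PollLoopSketch {
    /** Hypothetical stand-in for a ConsumerRecord that carries one request. */
    static final class Envelope {
        final long offset;
        final String request;

        Envelope(long offset, String request) {
            this.offset = offset;
            this.request = request;
        }
    }

    private final BlockingQueue<Envelope> queue;   // stand-in for the Kafka topic
    private final Consumer<String> handler;        // stand-in for the request handler
    private final AtomicBoolean running = new AtomicBoolean(true);

    PollLoopSketch(BlockingQueue<Envelope> queue, Consumer<String> handler) {
        this.queue = queue;
        this.handler = handler;
    }

    /** Asks the loop to stop after the current iteration. */
    void shutdown() {
        running.set(false);
    }

    /** Runs until shutdown() is called; returns the last offset handed off, or -1 if none. */
    long run() {
        long lastOffset = -1L;
        while (running.get()) {
            Envelope polled;
            try {
                // Step 1: poll the queue, waiting briefly so the flag is rechecked.
                polled = queue.poll(50, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (polled == null) {
                continue; // nothing arrived within the timeout
            }
            // Step 2: extract the request from its envelope.
            String request = polled.request;
            // Step 3: hand the request to the handler, which owns retries and errors.
            handler.accept(request);
            lastOffset = polled.offset;
        }
        return lastOffset;
    }
}
```

Polling with a short timeout keeps the loop responsive to shutdown requests even when no messages arrive.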
-
sendBatch
public void sendBatch(org.apache.solr.client.solrj.SolrRequest<?> solrReqBatch, MirroredSolrRequest.Type type, org.apache.kafka.clients.consumer.ConsumerRecord<String,MirroredSolrRequest<?>> lastRecord, org.apache.solr.crossdc.manager.consumer.PartitionManager.WorkUnit workUnit)
-
processResult
protected void processResult(MirroredSolrRequest.Type type, IQueueHandler.Result<MirroredSolrRequest<?>> result) throws MirroringException
- Throws:
MirroringException
-
shutdown
public final void shutdown()
Shut down the Kafka consumer by calling wakeup.
-
createSolrClient
protected org.apache.solr.client.solrj.impl.CloudSolrClient createSolrClient(KafkaCrossDcConf conf)
-
createKafkaMirroringSink
protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf)
-
-