Stream Source Reference
search
The search
function searches a SolrCloud collection and emits a stream of tuples that match the query. This is very similar to a standard Solr query, and uses many of the same parameters.
This expression allows you to specify a request handler using the qt
parameter. By default, the /select
handler is used. The /select
handler can be used for simple rapid prototyping of expressions. For production, however, you will most likely want to use the /export
handler which is designed to sort
and export
entire result sets. The /export
handler is not used by default because it has stricter requirements than the /select
handler, so it’s not as easy to get started with. To read more about the /export
handler requirements, review the section Exporting Result Sets.
search Parameters
collection
: (Mandatory) The collection being searched.
q
: (Mandatory) The query to perform on the Solr index.
fl
: (Mandatory) The list of fields to return.
sort
: (Mandatory) The sort criteria.
zkHost
: Only needs to be defined if the collection being searched is found in a different zkHost than the local stream handler.
qt
: Specifies the query type, or request handler, to use. Set this to /export to work with large result sets. The default is /select.
rows
: (Mandatory with the /select handler) The rows parameter specifies how many rows to return. This parameter is only needed with the /select handler (which is the default) since the /export handler always returns all rows.
partitionKeys
: Comma delimited list of keys to partition the search results by. To be used with the parallel function for parallelizing operations across worker nodes. See the parallel function for details.
search Syntax
expr=search(collection1,
zkHost="localhost:9983",
qt="/export",
q="*:*",
fl="id,a_s,a_i,a_f",
sort="a_f asc, a_i asc")
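A streaming expression such as the one above is usually submitted to a collection's /stream request handler in the expr parameter. The sketch below only builds the URL and the URL-encoded body for such a request and does not send anything; the base URL http://localhost:8983/solr and the helper name build_stream_request are assumptions made for illustration.

```python
from urllib.parse import urlencode


def build_stream_request(base_url, collection, expr):
    """Return (url, body) for POSTing a streaming expression to /stream.

    build_stream_request is a hypothetical helper, not part of Solr.
    """
    url = f"{base_url.rstrip('/')}/{collection}/stream"
    body = urlencode({"expr": expr})  # URL-encodes quotes, commas, parentheses
    return url, body


expr = (
    'search(collection1, qt="/export", q="*:*", '
    'fl="id,a_s,a_i,a_f", sort="a_f asc, a_i asc")'
)
# The base URL is an assumed local Solr address.
url, body = build_stream_request("http://localhost:8983/solr", "collection1", expr)
print(url)
print(body)
```

Keeping the expression as a single form-encoded parameter avoids hand-escaping the quotes and commas inside it.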
jdbc
The jdbc
function searches a JDBC datasource and emits a stream of tuples representing the JDBC result set. Each row in the result set is translated into a tuple and each tuple contains all the cell values for that row.
jdbc Parameters
connection
: (Mandatory) JDBC formatted connection string to whatever driver you are using.
sql
: (Mandatory) Query to pass off to the JDBC endpoint.
sort
: (Mandatory) The sort criteria indicating how the data coming out of the JDBC stream is sorted.
driver
: The name of the JDBC driver used for the connection. If provided, an attempt is made to load the driver class into the JVM. If not provided, it is assumed that the driver is already loaded into the JVM. Some drivers require explicit loading, so this option is provided.
[driverProperty]
: One or more properties to pass to the JDBC driver during connection. The format is propertyName="propertyValue". You can provide as many of these properties as you’d like and they will all be passed to the connection.
Connections and Drivers
Because some JDBC drivers require explicit loading, the driver
parameter can be used to provide the driver class name. If provided, then during stream construction the driver will be loaded. If the driver cannot be loaded because the class is not found on the classpath, then stream construction will fail.
When the JDBC stream is opened it will validate that a driver can be found for the provided connection string. If a driver cannot be found (because it hasn’t been loaded) then the open will fail.
Datatypes
Due to the inherent differences in datatypes across JDBC sources the following datatypes are supported. The table indicates what Java type will be used for a given JDBC type. Types marked as requiring conversion will go through a conversion for each value of that type. For performance reasons the cell data types are only considered when the stream is opened as this is when the converters are created.
JDBC Type | Java Type | Requires Conversion |
---|---|---|
String | String | No |
Short | Long | Yes |
Integer | Long | Yes |
Long | Long | No |
Float | Double | Yes |
Double | Double | No |
Boolean | Boolean | No |
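The table can be read as a lookup that is consulted once per column when the stream is opened, after which each row only applies the converters already chosen. The Python sketch below illustrates that timing, not the actual Solr implementation: Python's int stands in for Java Long and float for Java Double, the function names are hypothetical, and raising an error for a type missing from the table is an assumption.

```python
# None means "no conversion required" according to the table above.
CONVERTERS = {
    "String": None,
    "Short": int,     # stand-in for Short -> Long
    "Integer": int,   # stand-in for Integer -> Long
    "Long": None,
    "Float": float,   # stand-in for Float -> Double
    "Double": None,
    "Boolean": None,
}


def open_stream(column_types):
    """Choose one converter per column, once, when the stream is opened."""
    unsupported = [t for t in column_types if t not in CONVERTERS]
    if unsupported:
        # Assumption: types outside the table are rejected at open time.
        raise ValueError(f"unsupported JDBC types: {unsupported}")
    return [CONVERTERS[t] for t in column_types]


def row_to_tuple(names, converters, row):
    """Apply the pre-built converters to each cell of one result-set row."""
    return {
        name: (conv(value) if conv is not None and value is not None else value)
        for name, conv, value in zip(names, converters, row)
    }


names = ["NAME", "AGE", "SCORE"]
converters = open_stream(["String", "Integer", "Float"])
print(row_to_tuple(names, converters, ("Ann", 31, 2)))
```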
jdbc Syntax
A basic jdbc
expression:
jdbc(
connection="jdbc:hsqldb:mem:.",
sql="select NAME, ADDRESS, EMAIL, AGE from PEOPLE where AGE > 25 order by AGE, NAME DESC",
sort="AGE asc, NAME desc",
driver="org.hsqldb.jdbcDriver"
)
A jdbc
expression that passes a property to the driver:
// get_column_name is a property to pass to the hsqldb driver
jdbc(
connection="jdbc:hsqldb:mem:.",
sql="select NAME as FIRST_NAME, ADDRESS, EMAIL, AGE from PEOPLE where AGE > 25 order by AGE, NAME DESC",
sort="AGE asc, NAME desc",
driver="org.hsqldb.jdbcDriver",
get_column_name="false"
)
drill
The drill
function is designed to support efficient high cardinality aggregation. The drill
function sends a request to the export
handler in a specific collection which includes a Streaming
Expression that the export
handler applies to the sorted result set. The export
handler then emits the aggregated tuples.
The drill
function reads and emits the aggregated tuples from each shard, maintaining the sort order,
but does not merge the aggregations. Streaming Expression functions can be wrapped around the drill
function to
merge the aggregates.
drill Parameters
collection
: (Mandatory) The collection being searched.
q
: (Mandatory) The query to perform on the Solr index.
fl
: (Mandatory) The list of fields to return.
sort
: (Mandatory) The sort criteria.
expr
: The streaming expression that is sent to the export handler that operates over the sorted result set. The input() function provides the stream of sorted tuples from the export handler (see examples below).
drill Syntax
Example 1: Basic drill syntax
drill(articles,
q="abstract:water",
fl="author",
sort="author asc",
rollup(input(), over="author", count(*)))
Example 2: A rollup
wrapped around the drill
function to sum the counts emitted from each shard.
rollup(drill(articles,
q="abstract:water",
fl="author",
sort="author asc",
rollup(input(), over="author", count(*))),
over="author",
sum(count(*)))
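To see why the outer rollup in Example 2 is needed, the following Python sketch simulates two shards that each emit their own per-author counts, then merges the sorted streams and sums the counts. It is a toy model of the data flow only; the shard contents and the output field name sum(count(*)) are illustrative assumptions.

```python
from heapq import merge
from itertools import groupby


def shard_rollup(authors):
    """Per-shard aggregation: one count per author, in sorted order."""
    return [
        {"author": author, "count(*)": len(list(group))}
        for author, group in groupby(sorted(authors))
    ]


def merge_and_sum(shard_outputs):
    """Merge the sorted shard streams, then sum the counts per author."""
    merged = merge(*shard_outputs, key=lambda t: t["author"])
    return [
        {"author": author, "sum(count(*))": sum(t["count(*)"] for t in group)}
        for author, group in groupby(merged, key=lambda t: t["author"])
    ]


shard1 = shard_rollup(["smith", "jones", "smith"])
shard2 = shard_rollup(["lee", "smith"])
print(shard1 + shard2)                   # unmerged: "smith" appears twice
print(merge_and_sum([shard1, shard2]))   # merged: one tuple per author
```

Because every shard emits its tuples in the same sort order, the merge step never has to buffer more than one tuple per shard.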
echo
The echo
function returns a single Tuple echoing its text parameter. Echo
is the simplest stream source designed to provide text
to a text analyzing stream decorator.
echo Syntax
echo("Hello world")
facet
The facet
function provides aggregations that are rolled up over buckets. Under the covers the facet function pushes down the aggregation into the search engine using Solr’s JSON Facet API. This provides sub-second performance for many use cases. The facet function is appropriate for use with a low to moderate number of distinct values in the bucket fields. To support high cardinality aggregations see the rollup function.
facet Parameters
collection
: (Mandatory) Collection the facets will be aggregated from.
q
: (Mandatory) The query to build the aggregations from.
buckets
: (Mandatory) Comma separated list of fields to roll up over. The comma separated list represents the dimensions in a multi-dimensional rollup.
bucketSorts
: (Mandatory) Comma separated list of sorts to apply to each dimension in the buckets parameter. Sorts can be on the computed metrics or on the bucket values.
rows
: (Default 10) The number of rows to return. '-1' will return all rows.
offset
: (Default 0) The offset in the result set to start from.
overfetch
: (Default 150) Over-fetching is used to provide accurate aggregations over high cardinality fields.
method
: The JSON facet API aggregation method.
bucketSizeLimit
: Sets the absolute number of rows to fetch. This is incompatible with rows, offset and overfetch. This value is applied to each dimension. '-1' will fetch all the buckets.
metrics
: List of metrics to compute for the buckets. Currently supported metrics are sum(col), avg(col), min(col), max(col), count(*), per(col, 50). The per metric calculates a percentile for a numeric column and can be specified multiple times in the same facet function.
facet Syntax
Example 1:
facet(collection1,
q="*:*",
buckets="a_s",
bucketSorts="sum(a_i) desc",
rows=100,
sum(a_i),
sum(a_f),
min(a_i),
min(a_f),
max(a_i),
max(a_f),
avg(a_i),
avg(a_f),
per(a_f, 50),
per(a_f, 75),
count(*))
The example above shows a facet function with rollups over a single bucket, where the buckets are returned in descending order by the calculated value of the sum(a_i)
metric.
Example 2:
facet(collection1,
q="*:*",
buckets="year_i, month_i, day_i",
bucketSorts="year_i desc, month_i desc, day_i desc",
rows=10,
offset=20,
sum(a_i),
sum(a_f),
min(a_i),
min(a_f),
max(a_i),
max(a_f),
avg(a_i),
avg(a_f),
per(a_f, 50),
per(a_f, 75),
count(*))
The example above shows a facet
function with rollups over three buckets, where the buckets are returned in descending order by bucket value.
The rows
parameter returns 10 rows and the offset
parameter skips the first 20 rows, so the results start at the 21st row.
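The interaction of bucketSorts, rows and offset can be pictured as sorting the buckets and then taking a slice. The Python sketch below is a simplified model under that assumption; page_buckets is a hypothetical helper and it ignores overfetch and multi-dimensional sorting.

```python
def page_buckets(buckets, sort_key, rows=10, offset=0, descending=True):
    """Sort the buckets, skip `offset` of them, then return `rows` of them."""
    ordered = sorted(buckets, key=sort_key, reverse=descending)
    if rows == -1:                      # -1 returns all rows
        return ordered[offset:]
    return ordered[offset:offset + rows]


# Forty hypothetical day buckets, sorted descending by bucket value.
buckets = [{"day_i": d} for d in range(1, 41)]
page = page_buckets(buckets, lambda b: b["day_i"], rows=10, offset=20)
print([b["day_i"] for b in page])
```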
features
The features
function extracts the key terms from a text field in a classification training set stored in a SolrCloud collection. It uses an algorithm known as Information Gain to select the important terms from the training set. The features
function was designed to work specifically with the train function, which uses the extracted features to train a text classifier.
The features
function is designed to work with a training set that provides both positive and negative examples of a class. It emits a tuple for each feature term that is extracted along with the inverse document frequency (IDF) for the term in the training set.
The features
function uses a query to select the training set from a collection. The IDF for each selected feature is calculated relative to the training set matching the query. This allows multiple training sets to be stored in the same SolrCloud collection without polluting the IDF across training sets.
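As a rough illustration of the idea behind Information Gain, the sketch below computes the textbook version for a single term and a binary class: the entropy of the class labels minus the entropy that remains after splitting the training set by whether the term is present. This is an assumption about the general technique, not a description of Solr's exact computation, and the counts are made up.

```python
from math import log2


def entropy(pos, neg):
    """Binary entropy in bits of a set with `pos` positive and `neg` negative docs."""
    total = pos + neg
    result = 0.0
    for count in (pos, neg):
        if count:
            p = count / total
            result -= p * log2(p)
    return result


def information_gain(pos_with, neg_with, pos_without, neg_without):
    """Class entropy minus the entropy left after splitting on the term.

    Assumes the training set is not empty.
    """
    n_with = pos_with + neg_with
    n_without = pos_without + neg_without
    total = n_with + n_without
    before = entropy(pos_with + pos_without, neg_with + neg_without)
    after = ((n_with / total) * entropy(pos_with, neg_with)
             + (n_without / total) * entropy(pos_without, neg_without))
    return before - after


# Hypothetical counts: (pos_with, neg_with, pos_without, neg_without).
term_counts = {"solr": (40, 5, 10, 45), "the": (50, 50, 0, 0)}
ranked = sorted(term_counts, key=lambda t: information_gain(*term_counts[t]),
                reverse=True)
print(ranked)
```

A term that appears equally often in positive and negative documents scores near zero, which is why very common words tend to be dropped when only the top numTerms are kept.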
features Parameters
collection
: (Mandatory) The collection that holds the training set.
q
: (Mandatory) The query that defines the training set. The IDF for the features will be generated specific to the result set matching the query.
featureSet
: (Mandatory) The name of the feature set. This can be used to retrieve the features if they are stored in a SolrCloud collection.
field
: (Mandatory) The text field to extract the features from.
outcome
: (Mandatory) The field that defines the class, positive or negative.
numTerms
: (Mandatory) How many feature terms to extract.
positiveLabel
: (defaults to 1) The value in the outcome field that defines a positive outcome.
features Syntax
features(collection1,
q="*:*",
featureSet="features1",
field="body",
outcome="out_i",
numTerms=250)
cat
The cat
function reads the specified files or directories and emits each line in the file(s) as a tuple.
Each emitted tuple contains two fields: file
and line
. file
contains the path to the file being read from relative to the userfiles
chroot (directly under $SOLR_HOME
), and line
contains a line in that file.
cat
is ideally used with the update
stream to index data from the specified documents, or with the analyze
stream to further split the lines into individual tokens for statistical processing or visualization.
cat Parameters
filePaths
: (Mandatory) A comma separated list of filepaths to read lines from. If the specified path is a directory, it will be crawled recursively and all contained files will be read. To prevent malicious users from reading arbitrary files from Solr nodes, filePaths must be a relative path measured from a chroot of $SOLR_HOME/userfiles on the node running the streaming expression.
maxLines
: (defaults to -1) The maximum number of lines to read (and tuples to emit). If a negative value is specified, all lines in the specified files will be emitted as tuples. Files are read in the order that they appear in the comma-separated filePaths argument. If the line-limit is hit, it will be these later files that are partially emitted or not read at all.
cat Examples
The following example emits all lines from a single text file located at $SOLR_HOME/userfiles/authors.txt
:
cat("authors.txt")
This example will read lines from $SOLR_HOME/userfiles/authors.txt
, as well as all files (recursively) found under $SOLR_HOME/userfiles/fiction/scifi
. Only 500 lines will be emitted, meaning that some files may be partially emitted or not read at all:
cat("authors.txt,fiction/scifi/", maxLines=500)
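The effect of maxLines on files listed later can be sketched in Python with in-memory "files". This is a simplified model that assumes plain files only (no recursive directory crawl); the dictionary of file contents and the second file name are hypothetical.

```python
def cat(files, file_paths, max_lines=-1):
    """Yield {"file", "line"} tuples, reading files in the order listed."""
    emitted = 0
    for path in file_paths.split(","):
        for line in files[path]:
            if 0 <= max_lines <= emitted:   # negative max_lines means no limit
                return
            yield {"file": path, "line": line}
            emitted += 1


# Hypothetical contents standing in for files under $SOLR_HOME/userfiles.
files = {
    "authors.txt": ["asimov", "le guin", "butler"],
    "titles.txt": ["foundation", "kindred"],
}
for tup in cat(files, "authors.txt,titles.txt", max_lines=4):
    print(tup)
```

With a limit of 4, the first file is emitted in full and the second file contributes only its first line.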
nodes
The nodes
function provides breadth-first graph traversal. For details, see the section Graph Traversal.
knnSearch
The knnSearch
function returns the k-nearest neighbors for a document based on text similarity. Under the covers the knnSearch
function
uses the More Like This query parser plugin.
knnSearch Parameters
collection
: (Mandatory) The collection to perform the search in.
id
: (Mandatory) The id of the source document to begin the knn search from.
qf
: (Mandatory) The query field used to compare documents.
k
: (Mandatory) The number of nearest neighbors to return.
fl
: (Mandatory) The field list to return.
mindf
: (Optional) The minimum number of occurrences in the corpus to be included in the search.
maxdf
: (Optional) The maximum number of occurrences in the corpus to be included in the search.
minwl
: (Optional) The minimum word length to be included in the search.
maxwl
: (Optional) The maximum word length to be included in the search.
knnSearch Syntax
knnSearch(collection1,
id="doc1",
qf="text_field",
k="10",
fl="id, title",
mintf="3",
maxdf="10000000")
model
The model
function retrieves and caches logistic regression text classification models that are stored in a SolrCloud collection. The model
function is designed to work with models that are created by the train function, but can also be used to retrieve text classification models trained outside of Solr, as long as they conform to the specified format. After the model is retrieved it can be used by the classify function to classify documents.
A single model tuple is fetched and returned based on the id parameter. The model is retrieved by matching the id parameter with a model name in the index. If more than one iteration of the named model is stored in the index, the highest iteration is selected.
Caching with model
The model
function has an internal LRU (least-recently-used) cache so models do not have to be retrieved with each invocation of the model
function. The time to cache for each model ID can be passed as a parameter to the function call. Retrieving a cached model does not reset the time for expiring the model ID in the cache.
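The caching behavior described above (least-recently-used eviction plus a per-model expiry that is not extended by reads) can be sketched as follows. This is an illustrative Python model, not Solr's implementation; the class name, the capacity argument and the injected clock are assumptions.

```python
from collections import OrderedDict


class ModelCache:
    """Toy LRU cache whose entries expire a fixed time after being stored."""

    def __init__(self, capacity, clock):
        self.capacity = capacity
        self.clock = clock            # callable returning current time in ms
        self.entries = OrderedDict()  # model id -> (model, expiry time in ms)

    def get(self, model_id):
        entry = self.entries.get(model_id)
        if entry is None:
            return None
        model, expires_at = entry
        if self.clock() >= expires_at:
            del self.entries[model_id]       # expired: caller must re-fetch
            return None
        self.entries.move_to_end(model_id)   # LRU order changes, expiry does not
        return model

    def put(self, model_id, model, cache_millis):
        self.entries[model_id] = (model, self.clock() + cache_millis)
        self.entries.move_to_end(model_id)
        if len(self.entries) > self.capacity:
            self.entries.popitem(last=False)  # evict the least recently used


now = [0]                                     # fake clock for the demonstration
cache = ModelCache(capacity=2, clock=lambda: now[0])
cache.put("myModel", {"name_s": "myModel"}, cache_millis=1000)
now[0] = 900
print(cache.get("myModel"))   # still cached
now[0] = 1000
print(cache.get("myModel"))   # expired even though it was read at t=900
```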
Model Storage
The storage format of the models in Solr is below. The train
function outputs the format below so you only need to know schema details if you plan to use the model
function with logistic regression models trained outside of Solr.
name_s
(Single value, String, Stored): The name of the model.
iteration_i
(Single value, Integer, Stored): The iteration number of the model. Solr can store all iterations of the models generated by the train function.
terms_ss
(Multi value, String, Stored): The array of terms/features of the model.
weights_ds
(Multi value, double, Stored): The array of term weights. Each weight corresponds by array index to a term.
idfs_ds
(Multi value, double, Stored): The array of term IDFs (inverse document frequencies). Each IDF corresponds by array index to a term.
model Parameters
collection
: (Mandatory) The collection where the model is stored.
id
: (Mandatory) The id/name of the model. The model function always returns one model. If there are multiple iterations of the name, the highest iteration is returned.
cacheMillis
: (Optional) The amount of time to cache the model in the LRU cache.
model Syntax
model(modelCollection,
id="myModel",
cacheMillis="200000")
random
The random
function searches a SolrCloud collection and emits a pseudo-random set of results that match the query. Each invocation of random will return a different pseudo-random result set.
random Parameters
collection
: (Mandatory) The collection to draw the random results from.
q
: (Mandatory) The query that the random results must match.
rows
: (Mandatory) The number of pseudo-random results to return.
fl
: (Mandatory) The field list to return.
fq
: (Optional) Filter query.
random Syntax
random(baskets,
q="productID:productX",
rows="100",
fl="basketID")
In the example above the random
function is searching the baskets collection for all rows where "productID:productX". It will return 100 pseudo-random results. The field list returned is the basketID.
significantTerms
The significantTerms
function queries a SolrCloud collection, but instead of returning documents, it returns significant terms found in documents in the result set. The significantTerms
function scores terms based on how frequently they appear in the result set and how rarely they appear in the entire corpus. The significantTerms
function emits a tuple for each term which contains the term, the score, the foreground count and the background count. The foreground count is the number of documents in the result set that contain the term. The background count is the number of documents in the entire corpus that contain the term. The foreground and background counts are global for the collection.
significantTerms Parameters
collection
: (Mandatory) The collection that the function is run on.
q
: (Mandatory) The query that describes the foreground document set.
field
: (Mandatory) The field to extract the terms from.
limit
: (Optional, Default 20) The max number of terms to return.
minDocFreq
: (Optional, Defaults to 5 documents) The minimum number of documents the term must appear in on a shard. This is a float value. If greater than 1.0, it’s considered the absolute number of documents. If less than 1.0, it’s treated as a percentage of documents.
maxDocFreq
: (Optional, Defaults to 30% of documents) The maximum number of documents the term can appear in on a shard. This is a float value. If greater than 1.0, it’s considered the absolute number of documents. If less than 1.0, it’s treated as a percentage of documents.
minTermLength
: (Optional, Default 4) The minimum length of the term to be considered significant.
significantTerms Syntax
significantTerms(collection1,
q="body:Solr",
field="author",
limit="50",
minDocFreq="10",
maxDocFreq=".20",
minTermLength="5")
In the example above the significantTerms
function is querying collection1
and returning at most 50 significant terms from the author
field that appear in 10 or more documents but not more than 20% of the corpus.
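The dual meaning of minDocFreq and maxDocFreq (absolute count versus fraction of documents) can be made concrete with a small Python sketch. The helper names are hypothetical, num_docs stands for the number of documents on one shard, and treating a value of exactly 1.0 as an absolute count is an assumption, since the parameter descriptions only cover values above and below 1.0.

```python
def doc_freq_bound(setting, num_docs):
    """Translate a minDocFreq/maxDocFreq setting into a document count."""
    if setting < 1.0:
        return setting * num_docs   # fraction of the documents
    return float(setting)           # absolute number of documents (assumed for 1.0)


def within_doc_freq(doc_freq, num_docs, min_doc_freq=5, max_doc_freq=0.3):
    """True if a term's document frequency lies inside both bounds."""
    return (doc_freq_bound(min_doc_freq, num_docs)
            <= doc_freq
            <= doc_freq_bound(max_doc_freq, num_docs))


# With minDocFreq="10" and maxDocFreq=".20" on a shard of 1000 documents:
for df in (5, 150, 250):
    print(df, within_doc_freq(df, 1000, min_doc_freq=10, max_doc_freq=0.20))
```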
shortestPath
The shortestPath
function is an implementation of a shortest path graph traversal. The shortestPath
function performs an iterative breadth-first search through an unweighted graph to find the shortest paths between two nodes in a graph. The shortestPath
function emits a tuple for each path found. Each tuple emitted will contain a path
key which points to a List
of nodeIDs comprising the path.
shortestPath Parameters
collection
: (Mandatory) The collection that the search will be run on.
from
: (Mandatory) The nodeID to start the search from.
to
: (Mandatory) The nodeID to end the search at.
edge
: (Mandatory) Syntax: from_field=to_field. The from_field defines which field to search from. The to_field defines which field to search to. See example below for a detailed explanation.
threads
: (Optional: Default 6) The number of threads used to perform the partitioned join in the traversal.
partitionSize
: (Optional: Default 250) The number of nodes in each partition of the join.
fq
: (Optional) Filter query.
maxDepth
: (Mandatory) Limits the search to a maximum depth in the graph.
shortestPath Syntax
shortestPath(collection,
from="john@company.com",
to="jane@company.com",
edge="from_address=to_address",
threads="6",
partitionSize="300",
fq="limiting query",
maxDepth="4")
The expression above performs a breadth-first search to find the shortest paths in an unweighted, directed graph.
The search starts from the nodeID "john@company.com" in the from_address
field and searches for the nodeID "jane@company.com" in the to_address
field. This search is performed iteratively until the maxDepth
has been reached. Each level in the traversal is implemented as a parallel partitioned nested loop join across the entire collection. The threads
parameter controls the number of threads performing the join at each level, while the partitionSize
parameter controls the number of nodes in each join partition. The maxDepth
parameter controls the number of levels to traverse. fq
is a limiting query applied to each level in the traversal.
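The level-by-level traversal described above can be sketched in Python over an in-memory edge list. This is a single-threaded illustration of breadth-first shortest-path search in an unweighted, directed graph, not the partitioned join Solr performs; the edge data is made up, and interpreting maxDepth as a number of edge hops is an assumption.

```python
def shortest_paths(edges, start, goal, max_depth):
    """Return every shortest path from start to goal, or [] if none is found
    within max_depth levels."""
    if start == goal:
        return [[start]]
    adjacency = {}
    for src, dst in edges:
        adjacency.setdefault(src, set()).add(dst)

    parents = {start: []}        # node -> predecessors on shortest paths
    frontier = {start}
    for _ in range(max_depth):   # one iteration per level of the traversal
        next_level = {}
        for node in frontier:
            for neighbor in adjacency.get(node, ()):
                if neighbor not in parents:          # not reached earlier
                    next_level.setdefault(neighbor, []).append(node)
        if not next_level:
            return []
        parents.update(next_level)
        if goal in next_level:
            return sorted(_unwind(parents, goal))
        frontier = set(next_level)
    return []


def _unwind(parents, node):
    """Rebuild all paths ending at node by following predecessor links."""
    if not parents[node]:
        return [[node]]
    return [path + [node] for p in parents[node] for path in _unwind(parents, p)]


# Hypothetical (from_address, to_address) pairs.
edges = [
    ("john@company.com", "ann@company.com"),
    ("john@company.com", "bob@company.com"),
    ("ann@company.com", "jane@company.com"),
    ("bob@company.com", "jane@company.com"),
]
for path in shortest_paths(edges, "john@company.com", "jane@company.com", 4):
    print({"path": path})
```

Stopping at the first level that contains the target guarantees that every path returned has the minimum number of hops.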
shuffle
The shuffle
expression sorts and exports entire result sets. The shuffle
expression is similar to the search
expression except that
under the covers shuffle
always uses the /export handler. The shuffle
expression is designed to be combined with the relational algebra
decorators that require complete, sorted result sets. Shuffled result sets can be partitioned across worker nodes with the parallel
stream decorator to perform parallel relational algebra. When used in parallel mode the partitionKeys parameter must be provided.
shuffle Parameters
collection
: (Mandatory) The collection being searched.
q
: (Mandatory) The query to perform on the Solr index.
fl
: (Mandatory) The list of fields to return.
sort
: (Mandatory) The sort criteria.
zkHost
: Only needs to be defined if the collection being searched is found in a different zkHost than the local stream handler.
partitionKeys
: Comma delimited list of keys to partition the search results by. To be used with the parallel function for parallelizing operations across worker nodes. See the parallel function for details.
shuffle Syntax
shuffle(collection1,
q="*:*",
fl="id,a_s,a_i,a_f",
sort="a_f asc, a_i asc")
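The purpose of partitionKeys is that all tuples sharing the same key values reach the same worker, so each worker can process its groups independently. The Python sketch below illustrates that property with a CRC32 hash; the hash function, the helper names and the sample tuples are assumptions for illustration and do not reflect how Solr assigns tuples to workers internally.

```python
from zlib import crc32


def worker_for(tup, partition_keys, num_workers):
    """Pick a worker index from the values of the partition key fields."""
    fields = [k.strip() for k in partition_keys.split(",")]
    key = "|".join(str(tup[f]) for f in fields)
    return crc32(key.encode("utf-8")) % num_workers   # illustrative hash only


def partition(tuples, partition_keys, num_workers):
    """Distribute tuples across workers, keeping each worker's input order."""
    workers = [[] for _ in range(num_workers)]
    for tup in tuples:
        workers[worker_for(tup, partition_keys, num_workers)].append(tup)
    return workers


# Hypothetical tuples already sorted by a_s, as a sorted export would deliver them.
tuples = [{"id": i, "a_s": s} for i, s in enumerate(["x", "x", "y", "z", "z", "z"])]
for index, assigned in enumerate(partition(tuples, "a_s", 2)):
    print(index, assigned)
```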
stats
The stats
function gathers simple aggregations for a search result set. The stats function does not support rollups over buckets, so the stats stream always returns a single tuple with the rolled up stats. Under the covers the stats function pushes down the generation of the stats into the search engine using the StatsComponent. The stats function currently supports the following metrics: count(*)
, sum()
, avg()
, min()
, and max()
.
stats Parameters
collection
: (Mandatory) Collection the stats will be aggregated from.
q
: (Mandatory) The query to build the aggregations from.
metrics
: (Mandatory) The metrics to include in the result tuple. Currently supported metrics are sum(col), avg(col), min(col), max(col), count(*), per(col, 50). The per metric calculates a percentile for a numeric column and can be specified multiple times in the same stats function.
stats Syntax
stats(collection1,
q="*:*",
sum(a_i),
sum(a_f),
min(a_i),
min(a_f),
max(a_i),
max(a_f),
avg(a_i),
avg(a_f),
per(a_f, 50),
per(a_f, 75),
count(*))
timeseries
The timeseries
function builds a time series aggregation. Under the covers the timeseries
function uses the
JSON Facet API as its high performance aggregation engine.
timeseries Parameters
collection
: (Mandatory) Collection the stats will be aggregated from.
q
: (Mandatory) The query to build the aggregations from.
field
: (Mandatory) The date field for the time series.
start
: (Mandatory) The start of the time series expressed in Solr date or date math syntax.
end
: (Mandatory) The end of the time series expressed in Solr date or date math syntax.
gap
: (Mandatory) The time gap between time series aggregation points expressed in Solr date math syntax.
format
: (Optional) Date template to format the date field in the output tuples. Formatting is performed by Java’s SimpleDateFormat class.
metrics
: (Mandatory) The metrics to include in the result tuple. Currently supported metrics are sum(col), avg(col), min(col), max(col), count(*), per(col, 50). The per metric calculates a percentile for a numeric column and can be specified multiple times in the same timeseries function.
timeseries Syntax
timeseries(collection1,
q="*:*",
field="rec_dt",
start="NOW-30DAYS",
end="NOW",
gap="+1DAY",
format="yyyy-MM-dd",
sum(a_i),
max(a_i),
max(a_f),
avg(a_i),
avg(a_f),
per(a_f, 50),
per(a_f, 75),
count(*))
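Conceptually, start, end and gap divide the time range into consecutive buckets, and the metrics are computed per bucket. The Python sketch below models that with in-memory documents. It is illustrative only: it uses Python's strftime rather than SimpleDateFormat, treats the end as exclusive, and emits empty buckets, all of which are assumptions rather than documented behavior.

```python
from datetime import datetime, timedelta


def time_series(docs, start, end, gap, date_format="%Y-%m-%d"):
    """Emit one tuple per gap-sized bucket in [start, end)."""
    buckets = []
    bucket_start = start
    while bucket_start < end:
        bucket_end = bucket_start + gap
        values = [d["a_i"] for d in docs
                  if bucket_start <= d["rec_dt"] < bucket_end]
        buckets.append({
            "rec_dt": bucket_start.strftime(date_format),
            "count(*)": len(values),
            "sum(a_i)": sum(values),
        })
        bucket_start = bucket_end
    return buckets


# Hypothetical documents with a date field and a numeric field.
docs = [
    {"rec_dt": datetime(2024, 1, 1, 9), "a_i": 2},
    {"rec_dt": datetime(2024, 1, 1, 17), "a_i": 3},
    {"rec_dt": datetime(2024, 1, 3, 8), "a_i": 5},
]
series = time_series(docs, datetime(2024, 1, 1), datetime(2024, 1, 4),
                     timedelta(days=1))
for tup in series:
    print(tup)
```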
train
The train
function trains a Logistic Regression text classifier on a training set stored in a SolrCloud collection. It uses a parallel iterative, batch Gradient Descent approach to train the model. The training algorithm is embedded inside Solr so with each iteration only the model is streamed across the network.
The train
function wraps a features function which provides the terms and inverse document frequency (IDF) used to train the model. The train
function operates over the same training set as the features
function, which includes both positive and negative examples of the class.
With each iteration the train
function emits a tuple with the model. The model contains the feature terms, weights, and the confusion matrix for the model. The optimized model can then be used to classify documents based on their feature terms.
train Parameters
collection
: (Mandatory) The collection that holds the training set.
q
: (Mandatory) The query that defines the training set. The IDF for the features will be generated on the result set matching the query.
name
: (Mandatory) The name of the model. This can be used to retrieve the model if it is stored in a SolrCloud collection.
field
: (Mandatory) The text field to extract the features from.
outcome
: (Mandatory) The field that defines the class, positive or negative.
maxIterations
: (Mandatory) How many training iterations to perform.
positiveLabel
: (defaults to 1) The value in the outcome field that defines a positive outcome.
train Syntax
train(collection1,
features(collection1, q="*:*", featureSet="first", field="body", outcome="out_i", numTerms=250),
q="*:*",
name="model1",
field="body",
outcome="out_i",
maxIterations=100)
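For readers unfamiliar with the training approach, the following Python sketch shows plain batch gradient descent for logistic regression, emitting one model per iteration in the way the train function emits a tuple per iteration. It is a minimal single-process illustration under stated assumptions: there is no parallelism, no IDF weighting and no confusion matrix, and the learning rate, the feature vectors and the function names are hypothetical. The field names iteration_i and weights_ds are borrowed from the model storage format.

```python
from math import exp


def sigmoid(z):
    return 1.0 / (1.0 + exp(-z))


def train(examples, labels, max_iterations, learning_rate=0.5):
    """Batch gradient descent: every iteration uses the whole training set."""
    weights = [0.0] * len(examples[0])
    for iteration in range(1, max_iterations + 1):
        gradient = [0.0] * len(weights)
        for features, label in zip(examples, labels):
            predicted = sigmoid(sum(w * x for w, x in zip(weights, features)))
            error = predicted - label
            for i, x in enumerate(features):
                gradient[i] += error * x
        # Step against the average gradient of the log loss.
        weights = [w - learning_rate * g / len(examples)
                   for w, g in zip(weights, gradient)]
        yield {"iteration_i": iteration, "weights_ds": list(weights)}


# Each vector is [bias, term_present]; positives contain the term.
examples = [[1.0, 1.0], [1.0, 1.0], [1.0, 0.0], [1.0, 0.0]]
labels = [1, 1, 0, 0]
models = list(train(examples, labels, max_iterations=100))
final = models[-1]["weights_ds"]
print(models[-1]["iteration_i"], final)
```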
topic
The topic
function provides publish/subscribe messaging capabilities built on top of SolrCloud. The topic function allows users to subscribe to a query. The function then provides one-time delivery of new or updated documents that match the topic query. The initial call to the topic function establishes the checkpoints for the specific topic ID. Subsequent calls to the same topic ID will return documents added or updated after the initial checkpoint. Each run of the topic query updates the checkpoints for the topic ID. Setting the initialCheckpoint parameter to 0 will cause the topic to process all documents in the index that match the topic query.
The topic function should be considered in beta until SOLR-8709 is committed and released.
topic Parameters
checkpointCollection
: (Mandatory) The collection where the topic checkpoints are stored.
collection
: (Mandatory) The collection that the topic query will be run on.
id
: (Mandatory) The unique ID for the topic. The checkpoints will be saved under this id.
q
: (Mandatory) The topic query.
fl
: (Mandatory) The field list returned by the topic function.
initialCheckpoint
: (Optional) Sets the initial Solr _version_ number to start reading from the queue. If not set, it defaults to the highest version in the index. Setting to 0 will process all records that match the query in the index.
zkHost
: (Optional) Only needs to be defined if the collection being searched is found in a different zkHost than the local stream handler.
topic Syntax
topic(checkpointCollection,
collection,
id="uniqueId",
q="topic query",
fl="id, name, country")
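The checkpoint behavior of topic can be modeled with a small Python class: each run returns matching documents whose _version_ is newer than the stored checkpoint, then advances the checkpoint. This is a simplified sketch that keeps a single in-memory checkpoint; the class name, the predicate standing in for the topic query and the sample documents are assumptions.

```python
class Topic:
    """Toy model of a topic subscription over an in-memory index."""

    def __init__(self, index, query, initial_checkpoint=None):
        self.index = index    # list of docs, each with a "_version_" number
        self.query = query    # predicate standing in for the topic query
        if initial_checkpoint is None:
            # Default: start from the highest version currently in the index.
            initial_checkpoint = max((d["_version_"] for d in index), default=0)
        self.checkpoint = initial_checkpoint

    def run(self):
        """Return matching docs newer than the checkpoint, then advance it."""
        new_docs = sorted(
            (d for d in self.index
             if d["_version_"] > self.checkpoint and self.query(d)),
            key=lambda d: d["_version_"],
        )
        if new_docs:
            self.checkpoint = new_docs[-1]["_version_"]
        return new_docs


index = [{"id": "1", "country": "US", "_version_": 10}]
topic = Topic(index, lambda d: d["country"] == "US")
print(topic.run())   # nothing yet: the existing document predates the topic
index.append({"id": "2", "country": "US", "_version_": 11})
index.append({"id": "3", "country": "FR", "_version_": 12})
print(topic.run())   # only the new matching document
print(topic.run())   # delivered once, so nothing the second time
```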
tuple
The tuple
function emits a single Tuple with name/value pairs. The values can be set to variables assigned in a let
expression, literals, Stream Evaluators or
Stream Expressions. In the case of Stream Evaluators the tuple will output the return value from the evaluator.
This could be a numeric, list or map. If a value is set to a Stream Expression, the tuple
function will flatten
the tuple stream from the Stream Expression into a list of Tuples.
tuple Parameters
name=value pairs
tuple Syntax
tuple(a=add(1,1),
b=search(collection1, q="cat:a", fl="a, b, c", sort="a desc"))