After the jars are built, let’s populate a SolrCloud index with tweets (be sure to update the command shown below with your Twitter API credentials):
Start Solr running in Cloud mode and create a collection named “socialdata” partitioned into two shards:
bin/solr -c && bin/solr create -c socialdata -shards 2
Note: The remaining sections in this document assume Solr is running in cloud mode on port 8983 with embedded ZooKeeper listening on localhost:9983.
Also, to ensure you can see tweets as they are indexed in near real-time, you should enable auto soft-commits using Solr’s Config API. Specifically, for this exercise, we’ll commit tweets every 2 seconds.
curl -X POST http://localhost:8983/solr/socialdata/config \
-d '{"set-property":{"updateHandler.autoSoftCommit.maxTime":"2000"}}'
Now, let’s populate Solr with tweets using Spark streaming:
$SPARK_HOME/bin/spark-submit --master $SPARK_MASTER \
--conf "spark.executor.extraJavaOptions=-Dtwitter4j.oauth.consumerKey=? -Dtwitter4j.oauth.consumerSecret=? -Dtwitter4j.oauth.accessToken=? -Dtwitter4j.oauth.accessTokenSecret=?" \
--class com.lucidworks.spark.SparkApp \
./target/spark-solr-1.0-SNAPSHOT-shaded.jar \
twitter-to-solr -zkHost localhost:9983 -collection socialdata
Replace $SPARK_MASTER with the URL of your Spark master server. If you don’t have access to a Spark cluster, you can run the Spark job in local mode by passing:
--master local[2]
However, when running in local mode, there is no separate executor process, so you’ll need to pass the Twitter credentials in the spark.driver.extraJavaOptions parameter instead of spark.executor.extraJavaOptions.
Tweets will start flowing into Solr; let the streaming job run for a few minutes to build up a few thousand tweets in your socialdata collection. You can stop the job by pressing Ctrl-C.
Note: The sample command above includes properties for Twitter API credentials, which you need to provide. If you don’t already have Twitter API credentials, you will need to set up a Twitter app (https://apps.twitter.com) and get your consumer key and secret and your access token and secret. This link is a good Spark-related walkthrough of the process: https://databricks-training.s3.amazonaws.com/realtime-processing-with-spark-streaming.html#twitter-credential-setup.
Let’s start up the Spark Scala REPL shell to do some interactive data exploration with our indexed tweets:
cd $SPARK_HOME
bin/spark-shell --jars $PROJECT_HOME/target/spark-solr-${VERSION}-shaded.jar
$PROJECT_HOME is the location where you cloned the spark-solr project. You should (but might not, depending on logging config) see a message like this from Spark during shell initialization:
15/05/27 10:07:53 INFO SparkContext: Added JAR file:/spark-solr/target/spark-solr-1.0-SNAPSHOT-shaded.jar at http://192.168.1.3:57936/jars/spark-solr-1.0-SNAPSHOT-shaded.jar with timestamp 1432742873044
Let’s load the socialdata collection into Spark by executing the following Scala code in the shell:
val tweets = spark.read.format("solr").options(
Map("zkHost" -> "localhost:9983", "collection" -> "socialdata")
).load
.filter("provider_s='twitter'")
On line 1, we use the SparkSession object (spark) that the shell creates automatically to load a DataSource named “solr”. Behind the scenes, Spark locates the solr.DefaultSource class in the project JAR file we added to the shell using the --jars parameter.
On line 2, we pass the configuration parameters the Solr DataSource needs to connect to Solr using a Scala Map. At a minimum, we need to pass the ZooKeeper connection string (zkHost) and collection name (collection). By default, the DataSource matches all documents in the collection, but you can restrict the documents it sees by passing a Solr query in the optional query parameter.
On line 3, it may look as though load materializes the data into a set of objects, but evaluation is still lazy at this point: what we get back is a DataFrame (read on).
On line 4, we use a filter to select only documents that come from Twitter (provider_s='twitter').
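As a minimal sketch of the optional query parameter described above, the snippet below builds the option map with and without a Solr query. The helper name solrOptions and the query type_s:post are illustrative assumptions; only the zkHost, collection, and query option names come from the text.

```scala
// Hypothetical helper (not part of spark-solr): builds the option map for
// the "solr" DataSource, adding the optional query only when one is given.
def solrOptions(zkHost: String,
                collection: String,
                query: Option[String] = None): Map[String, String] = {
  val required = Map("zkHost" -> zkHost, "collection" -> collection)
  query.fold(required)(q => required + ("query" -> q))
}

// Restrict the DataSource to original posts (illustrative query).
val postOptions = solrOptions("localhost:9983", "socialdata", Some("type_s:post"))

// In spark-shell, with the shaded spark-solr JAR added via --jars:
// val posts = spark.read.format("solr").options(postOptions).load
```

Keeping the restriction in the query option means it applies to every read of that DataFrame, whereas a later filter can be added or dropped per analysis.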
At this point, we have a Spark SQL DataFrame object that can read tweets from Solr. In Spark, a DataFrame is a distributed collection of data organized into named columns (see: https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html). Conceptually, DataFrames are similar to tables in a relational database except they are partitioned across multiple nodes in a Spark cluster.
It’s important to understand that Spark does not actually load the socialdata collection into memory at this point. We’re only setting up to perform some analysis on that data; the actual data isn’t loaded into Spark until it is needed to perform some calculation later in the job. This allows Spark to perform the necessary column and partition pruning operations to optimize data access into Solr.
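To make the lazy behavior concrete, the sketch below separates building a query from running it. The helper name countPosts is hypothetical; it assumes a DataFrame with a type_s column, such as the tweets DataFrame defined above.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper illustrating lazy evaluation on any DataFrame
// that has a type_s column.
def countPosts(df: DataFrame): Long = {
  // Transformations only extend the logical plan; no rows are read yet.
  val posts = df.filter("type_s = 'post'").select("type_s")

  // explain() prints the physical plan Spark intends to run, without running the job.
  posts.explain()

  // count() is an action: this is the point where rows are actually read.
  posts.count()
}
```

In the shell, countPosts(tweets) would print the plan first and only then read from Solr.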
Every DataFrame has a schema. You can use the printSchema() function to get information about the fields available for the tweets DataFrame:
tweets.printSchema()
Behind the scenes, our DataSource implementation uses Solr’s Schema API to determine the fields and field types for the collection automatically.
scala> tweets.printSchema()
root
|-- accessLevel_i: integer (nullable = true)
|-- author_s: string (nullable = true)
|-- createdAt_tdt: timestamp (nullable = true)
|-- currentUserRetweetId_l: long (nullable = true)
|-- favorited_b: boolean (nullable = true)
|-- id: string (nullable = false)
|-- id_l: long (nullable = true)
...
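The same schema information is also available programmatically, which can be handy when you want to pick columns by type. The following is a small sketch; the helper name fieldTypes is an assumption for illustration.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: returns (field name, type name) pairs for a DataFrame,
// using the same type names that printSchema() displays.
def fieldTypes(df: DataFrame): Seq[(String, String)] =
  df.schema.fields.toSeq.map(field => (field.name, field.dataType.typeName))
```

For example, fieldTypes(tweets).filter(_._2 == "string") would list only the string fields.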
Next, let’s register the tweets DataFrame as a temp table so that we can use it in SQL queries:
tweets.registerTempTable("tweets")
For example, we can count the number of retweets by doing:
spark.sql("SELECT COUNT(type_s) FROM tweets WHERE type_s='echo'").show()
If you check your Solr log, you’ll see the following query was generated by the Solr DataSource to process the SQL statement (note I added the newlines between parameters to make it easier to read the query):
q=*:*&
fq=provider_s:twitter&
fq=type_s:echo&
distrib=false&
fl=type_s,provider_s&
cursorMark=*&
start=0&
sort=id+asc&
collection=socialdata&
rows=1000
There are a couple of interesting aspects of this query.
First, notice that the provider_s field filter we used when we declared the DataFrame translated into a Solr filter query parameter (fq=provider_s:twitter). Solr will cache an efficient data structure for this filter that can be reused across queries, which improves performance when reading data from Solr to Spark.
In addition, the SQL statement included a WHERE clause that also translated into an additional filter query (fq=type_s:echo). Our DataSource implementation handles the translation of SQL clauses to Solr-specific query constructs. On the backend, Spark handles the distribution and optimization of the logical plan to execute a job that accesses data sources.
Even though there are many fields available for each tweet in our collection, Spark ensures that only the fields needed to satisfy the query are retrieved from the data source, which in this case is only type_s and provider_s. In general, it’s a good idea to only request the specific fields you need access to when reading data in Spark.
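One way to follow that advice with the DataFrame API is to select the columns you need before aggregating. The sketch below assumes a DataFrame with a type_s column; the helper name typeCounts is hypothetical.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: counts rows per type while referencing only type_s,
// so the data source does not need to return any other field for this step.
def typeCounts(df: DataFrame): DataFrame =
  df.select("type_s").groupBy("type_s").count()
```

Calling typeCounts(tweets).explain() in the shell is a quick way to check which columns the scan requests before running the job.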
The query also uses deep-paging cursors to efficiently read documents deep into the result set. If you’re curious how deep paging cursors work in Solr, please read: https://lucidworks.com/blog/coming-soon-to-solr-efficient-cursor-based-iteration-of-large-result-sets/. Also, matching documents are streamed back from Solr, which improves performance because the client side (Spark task) does not have to wait for a full page of documents (1000) to be constructed on the Solr side before receiving data. In other words, documents are streamed back from Solr as soon as the first hit is identified.
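The idea behind cursor-based paging can be sketched with an in-memory model. This is not Solr’s implementation or the DataSource’s code; the names Page, fetchPage, and readAll are hypothetical, and the model assumes unique ids sorted ascending, mirroring sort=id+asc in the query above.

```scala
// In-memory model of cursor-style paging over ids sorted ascending.
final case class Page(ids: Vector[String], nextCursor: Option[String])

// Returns up to `rows` ids that sort strictly after the cursor.
// A cursor of None means "start from the beginning", like cursorMark=*.
def fetchPage(sortedIds: Vector[String], cursor: Option[String], rows: Int): Page = {
  val remaining = cursor.fold(sortedIds)(c => sortedIds.dropWhile(_ <= c))
  val ids = remaining.take(rows)
  // The next cursor is the last id returned; it stays unchanged on an empty page.
  Page(ids, ids.lastOption.orElse(cursor))
}

// Keeps requesting pages until the cursor stops advancing.
def readAll(sortedIds: Vector[String], rows: Int): Vector[String] = {
  @annotation.tailrec
  def loop(cursor: Option[String], acc: Vector[String]): Vector[String] = {
    val page = fetchPage(sortedIds, cursor, rows)
    if (page.nextCursor == cursor) acc
    else loop(page.nextCursor, acc ++ page.ids)
  }
  loop(None, Vector.empty)
}
```

Because each request resumes from the last id seen rather than from a numeric offset, the cost of a page in this model does not grow with how many pages came before it.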
The last interesting aspect of this query is the distrib=false parameter. Behind the scenes, the Solr DataSource reads data from all shards in a collection in parallel from different Spark tasks. In other words, if you have a collection with ten shards, the Solr DataSource implementation will use ten Spark tasks, one per shard, to read from all shards in parallel. The distrib=false parameter ensures that each shard executes the query locally instead of distributing it to other shards.
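If you want to see how many read tasks Spark plans for a DataFrame, its partition count is a reasonable proxy. The helper name readParallelism below is hypothetical, and whether the count matches the number of shards exactly depends on how the DataSource is configured, so treat this as a quick check rather than a guarantee.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: number of partitions, and therefore tasks, that Spark
// would use to read this DataFrame.
def readParallelism(df: DataFrame): Int = df.rdd.getNumPartitions
```

In the shell, readParallelism(tweets) can be compared against the shard count used when the collection was created.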
However, reading from all shards in parallel does not work for Top N type use cases where you need to read documents from Solr in ranked order across all shards. You can disable the parallelization feature by setting the parallel_shards parameter to false. When set to false, the Solr DataSource will execute a standard distributed query instead of reading each shard from its own Spark task. Consequently, you should use caution when disabling this feature, especially when reading very large result sets from Solr.
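A toy model shows why independent per-shard reads do not produce a global ranking. The Hit class, shard contents, and scores below are made up for illustration and do not reflect how Solr or the DataSource represent results.

```scala
// Toy model: each shard returns its own hits, already ranked by score.
final case class Hit(id: String, score: Double)

val shard1 = Vector(Hit("a", 0.9), Hit("b", 0.4))
val shard2 = Vector(Hit("c", 0.8), Hit("d", 0.7))

// Reading shards independently yields one ranked run per shard; taking the
// first two rows of the combined data does not give the overall top two.
val perShardRead = shard1 ++ shard2
val naiveTop2 = perShardRead.take(2).map(_.id)

// A ranked read has to order hits across all shards, which is the work a
// distributed query does on the reader's behalf.
val globalTop2 = perShardRead.sortBy(hit => -hit.score).take(2).map(_.id)
```

Here the naive read picks a and b, while the true top two are a and c, which is the case where setting parallel_shards to false is worth its cost.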
Beyond SQL, the Spark API exposes a number of functional operations you can perform on a DataFrame. For example, if we wanted to determine the top authors based on the number of posts, we could use the following SQL:
spark.sql("select author_s, COUNT(author_s) num_posts from tweets where type_s='post' group by author_s order by num_posts desc limit 10").show()
The same result can be expressed with the DataFrame API:
tweets.filter("type_s='post'").groupBy("author_s").count().orderBy(desc("count")).limit(10).show()
Another subtle aspect of working with DataFrames is that you as a developer need to decide when to cache a DataFrame based on how expensive it was to create. For instance, if you load tens of millions of rows from Solr and then perform some costly transformation that trims your DataFrame down to 10,000 rows, then it would be wise to cache the smaller DataFrame so that you won’t have to re-read millions of rows from Solr again. On the other hand, caching the original millions of rows pulled from Solr is probably not very useful, as that will consume too much memory. The general advice I follow is to cache DataFrames when you need to reuse them for additional computation and they require some computation to generate.
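As a sketch of that advice, the helper below caches the small aggregated result rather than the raw rows. The name cachedPostCounts is hypothetical, and it assumes a DataFrame with type_s and author_s columns like the tweets DataFrame.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: reduce the raw rows to one row per author, then mark
// that much smaller result for caching so later queries can reuse it.
def cachedPostCounts(tweets: DataFrame): DataFrame =
  tweets
    .filter("type_s = 'post'")
    .groupBy("author_s")
    .count()
    .cache() // lazy: the cache is filled by the first action on the result
```

The first action on the returned DataFrame (for example count()) reads from Solr and fills the cache; later queries against it should reuse the cached rows. Call unpersist() on it once you no longer need it to release the memory.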