Toolkit for real-time scoring using the Apache Spark MLlib library.
This toolkit implements the NLS feature. Use the guidelines for the message bundle that are described in Messages and National Language Support for toolkits.
To create applications that use the SparkMLLib Toolkit, you must configure either Streams Studio or the SPL compiler to be aware of the location of the toolkit.
- Install IBM InfoSphere Streams. Configure the product environment variables by entering the following command: source /bin/streamsprofile.sh
- Generate a Spark model as described in the next section and save it to the local filesystem or HDFS.
- The worker nodes that execute the toolkit code do not require a separate Apache Spark installation.
- This toolkit provides a number of operators that can load a stored Spark MLlib model and use it to perform real-time scoring on incoming tuple data.
- To generate the model files, point the Java classpath either to an installed Apache Spark version (for example, $SPARK_HOME/jars) or to the download directory of the streamsx.sparkmllib toolkit (for example, $STREAMS_SPLPATH/com.ibm.streamsx.sparkmllib/opt/downloaded).
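The classpath choice described in the last item can be scripted. The following is a minimal shell sketch, not part of the toolkit: the helper name pick_spark_jars_dir and the variable SPARK_CLASSPATH are hypothetical, and the sketch assumes that $STREAMS_SPLPATH names a single directory, as in the example path above. It prefers $SPARK_HOME/jars when that directory exists and otherwise falls back to the toolkit download directory.

```shell
#!/bin/sh
# Hypothetical helper (not part of the toolkit): print the directory whose jars
# should be placed on the Java classpath when generating model files.
pick_spark_jars_dir() {
    if [ -n "${SPARK_HOME:-}" ] && [ -d "$SPARK_HOME/jars" ]; then
        # An installed Apache Spark version is available.
        printf '%s\n' "$SPARK_HOME/jars"
    else
        # Fall back to the jars in the toolkit download directory.
        printf '%s\n' "${STREAMS_SPLPATH:-}/com.ibm.streamsx.sparkmllib/opt/downloaded"
    fi
}

# A trailing /* is the JVM classpath wildcard for every jar in a directory.
SPARK_CLASSPATH="$(pick_spark_jars_dir)/*"
echo "Classpath for model generation: $SPARK_CLASSPATH"
```

The resulting value can then be passed to the -cp option of the java command that runs your model-generation program.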
For example, the SparkCollaborativeFilteringALS operator can load a Spark collaborative filtering model (of type MatrixFactorizationModel in the Spark API). For the operator to use this model within Streams, the Spark program that created the model must first save it. The following Scala code shows how the model can be saved to HDFS:
import org.apache.spark.mllib.recommendation.ALS

// Generate a MatrixFactorizationModel by training against the training data
val model = ALS.train(training, rank, numIter, lambda)
// Save the generated model to the filesystem
model.save(sparkContext, "hdfs://some/path/my_model")
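When the model is saved to a local filesystem rather than HDFS, a quick check of the saved directory can catch a wrong path before it is handed to an operator. The sketch below is illustrative only: the helper name looks_like_saved_model and the variable MODEL_DIR are hypothetical, and it assumes that the saved model directory contains "metadata" and "data" subdirectories, which is an assumption about how Spark MLlib lays out a saved model and may differ between Spark versions.

```shell
#!/bin/sh
# Hypothetical helper: succeed when the given directory contains the
# "metadata" and "data" subdirectories (assumed layout of a saved MLlib
# model on a local filesystem).
looks_like_saved_model() {
    [ -d "$1/metadata" ] && [ -d "$1/data" ]
}

# Placeholder path; set MODEL_DIR to the directory passed to model.save.
MODEL_DIR="${MODEL_DIR:-/some/path/my_model}"
if looks_like_saved_model "$MODEL_DIR"; then
    echo "Found a saved model in $MODEL_DIR"
else
    echo "No saved model found in $MODEL_DIR"
fi
```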
Once the model has been persisted, the path to the persisted model is passed as a parameter to the SparkCollaborativeFilteringALS operator. The following code shows how this is done in an SPL program:
(stream<int32 user, int32 counter, list<int32> analysisResult> SparkCollaborativeFilteringALSOut) as
    SparkCollaborativeFilteringALSOp1 =
    SparkCollaborativeFilteringALS(Beacon_1_out0)
{
    param
        analysisType : RecommendProducts ;
        attr1 : Beacon_1_out0.user ;
        attr2 : Beacon_1_out0.counter ;
        modelPath : "hdfs://some/path/my_model" ;
}
On initialization, the operator loads the model. Each incoming tuple is scored against the model, and the score is passed in the 'analysisResult' attribute of the output schema.
After the location of the toolkit is communicated to the compiler, the SPL artifacts that are specified in the toolkit can be used by an application. The application can include a use directive to bring the necessary namespaces into scope. Alternatively, you can fully qualify the operators that are provided by the toolkit with their namespaces as prefixes.
- Make sure that a trained Spark model has been saved to the local file system or to HDFS. Alternatively, you can bundle the model files into the application bundle (.sab file); see the sample.
- Configure the SPL compiler to find the toolkit root directory. Use one of the following methods:
  - Set the STREAMS_SPLPATH environment variable to the root directory of a toolkit or of multiple toolkits (with : as a separator). For example: export STREAMS_SPLPATH=$STREAMS_INSTALL/toolkits/com.ibm.streamsx.sparkmllib
  - Specify the -t or --spl-path command parameter when you run the sc command. For example: sc -t $STREAMS_INSTALL/toolkits/com.ibm.streamsx.sparkmllib -M MyMain where MyMain is the name of the SPL main composite. Note: These command parameters override the STREAMS_SPLPATH environment variable.
  - Add the toolkit location in InfoSphere Streams Studio.
- Develop your application.
- Build your application. You can use the sc command or Streams Studio.
- Start the InfoSphere Streams instance.
- Run the application. You can submit the application as a job by using the streamtool submitjob command or by using Streams Studio.
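The command-line variant of the steps above can be sketched as a short shell script. This is an illustration under stated assumptions, not a definitive procedure: join_toolkit_paths is a hypothetical helper, $HOME/toolkits/my.other.toolkit is a placeholder for a second toolkit, and the bundle path output/MyMain.sab assumes that sc writes its bundle to a directory named output, which may differ in your setup.

```shell
#!/bin/sh
# Hypothetical helper: join toolkit root directories with ':' so that the
# result can be used as the value of STREAMS_SPLPATH.
join_toolkit_paths() {
    joined=""
    for dir in "$@"; do
        if [ -z "$joined" ]; then
            joined="$dir"
        else
            joined="$joined:$dir"
        fi
    done
    printf '%s\n' "$joined"
}

# The second path is a placeholder for any additional toolkit you use.
STREAMS_SPLPATH="$(join_toolkit_paths \
    "${STREAMS_INSTALL:-}/toolkits/com.ibm.streamsx.sparkmllib" \
    "$HOME/toolkits/my.other.toolkit")"
export STREAMS_SPLPATH
echo "STREAMS_SPLPATH=$STREAMS_SPLPATH"

# With the variable exported, the build and submit steps would look like:
#   sc -M MyMain
#   streamtool submitjob output/MyMain.sab   # bundle path is an assumption
```

The build and submit commands are left as comments because they require a configured Streams environment and a running instance.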