Tuesday, 15 October 2013

Weka and Hadoop Part 1

How to handle large datasets with Weka is a question that crops up frequently on the Weka mailing list and forums. This post is the first of three outlining what's available, in terms of distributed processing functionality, in several new packages for Weka 3.7. The series continues in part 2 and part 3.

The first new package is called distributedWekaBase. It provides base "map" and "reduce" tasks that are not tied to any specific distributed platform. The second, called distributedWekaHadoop, provides Hadoop-specific wrappers and jobs for these base tasks. In the future there could be other wrappers - one based on the Spark platform would be cool.

Base map and reduce tasks

distributedWekaBase version 1.0 provides tasks for:

  1. Determining a unified ARFF header from separate data chunks in CSV format. This is particularly important because, as Weka users know, Weka is quite particular about metadata - especially when it comes to nominal attributes. At the same time this task computes some handy summary statistics (that are stored as additional "meta attributes" in the header), such as count, sum, sum squared, min, max, num missing, mean, standard deviation and frequency counts for nominal values.  These summary statistics come in useful for some of the other tasks listed below.
  2. Computing a correlation or covariance matrix. Once the ARFF header job has been run, computing a correlation matrix requires just one pass over the data, thanks to the handy summary statistics. The matrix produced by this job can be read by Weka's Matrix class. Map tasks compute a partial matrix of covariance sums; the reduce tasks aggregate individual rows of the matrix in order to produce the final matrix. This means that parallelism can be exploited in the reduce phase by using as many reducers as there are rows in the matrix.
  3. Training a Weka classifier (or regressor). The map portion of this task can train any Weka classifier (batch or incremental) on a given data chunk, and the reduce portion then aggregates the individual models in various ways, depending on the type of classifier. Recently, a number of classifiers in Weka 3.7 have become Aggregateable: such classifiers allow one final model, of the same type, to be produced from several separate models. Examples include naive Bayes, naive Bayes multinomial, various linear regression models (learned by SGD) and Bagging. Other, non-Aggregateable, classifiers can be combined by forming a voted ensemble using Weka's Vote meta classifier (see the sketch after this list). The classifier task also has various handy options: reservoir sampling can be used with batch learners (so that a maximum number of instances processed by the learning algorithm in a given map can be enforced); normal Weka filters can be used for pre-processing in each map (the task takes care of using various special subclasses of FilteredClassifier to wrap the base classifier and filters, depending on whether the base learner is Aggregateable and/or incremental); batch learning can be forced for incremental learners (if desired); and a special "pre-constructed" filter can be used (see below).
  4. Evaluating a classifier or regressor. This task handles evaluating a classifier using either the training data, a separate test set or cross-validation. Because Weka's Evaluation module is Aggregateable, and computes statistics incrementally, this is fairly straightforward. The process uses the classifier training task to learn an aggregated classifier in one pass over the data, and then evaluation proceeds in a second pass. For cross-validation, the classifiers for all folds are learned in one go (i.e. one aggregated classifier per fold) and then evaluated; the learning phase can make use of up to k reducers (one per fold). For batch learning, the normal process of creating folds (using Instances.train/testCV()) is used, with the order of the instances in each map randomised first. For incremental learning, instances are processed in a streaming fashion and a modulus operation is used to pull out the training/test instances corresponding to a given fold of the cross-validation.
  5. Scoring using a trained classifier or regressor. This is fairly simple and just uses a trained model to make predictions. No reducer is needed in this case. The task outputs the input instances with predicted probability distributions appended; the user can specify which of the input attribute values to output along with the predictions. It also builds a mapping between the attributes in the incoming instances and those that the model is expecting, with missing attributes or type mismatches replaced with missing values.
  6. PreconstructedPCA. This is not a distributed task as such; instead it is a filter that can accept a correlation or covariance matrix (as produced by the correlation matrix task) and perform a principal components analysis. The filter produces the same textual analysis output as Weka's standard PCA (in the attribute selection package) and also encapsulates the transformation for data filtering purposes. Once constructed, it can be used with the classifier building task.
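
To make the model aggregation in task 3 a little more concrete, here is a minimal sketch of the idea using the plain Weka API rather than distributedWekaBase's actual map and reduce classes. It assumes that NaiveBayes in Weka 3.7 exposes the Aggregateable methods aggregate() and finalizeAggregation(); the chunk file names are placeholders.

import java.util.ArrayList;
import java.util.List;

import weka.classifiers.bayes.NaiveBayes;
import weka.core.Instances;
import weka.core.converters.ConverterUtils.DataSource;

public class AggregateNaiveBayesSketch {

  public static void main(String[] args) throws Exception {
    // "Map" phase: train one partial model per data chunk
    // (each args[i] is the path to one ARFF/CSV chunk - placeholders)
    List<NaiveBayes> partialModels = new ArrayList<NaiveBayes>();
    for (String chunk : args) {
      Instances data = new DataSource(chunk).getDataSet();
      data.setClassIndex(data.numAttributes() - 1);
      NaiveBayes nb = new NaiveBayes();
      nb.buildClassifier(data);
      partialModels.add(nb);
    }

    // "Reduce" phase: fold the partial models into a single final model
    NaiveBayes finalModel = partialModels.get(0);
    for (int i = 1; i < partialModels.size(); i++) {
      // assumption: Aggregateable API as added for distributed Weka in 3.7
      finalModel.aggregate(partialModels.get(i));
    }
    finalModel.finalizeAggregation();

    System.out.println(finalModel);
  }
}

For a base learner that is not Aggregateable, the same "reduce" step would instead collect the per-chunk models into an array and pass them to weka.classifiers.meta.Vote via setClassifiers(), producing a voted ensemble as described above.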

Hadoop wrappers and jobs

distributedWekaHadoop version 1.0 provides a number of utilities for configuration/HDFS, mappers and reducers that wrap the base tasks, and jobs to orchestrate everything against Apache Hadoop 1.x (in particular, it has been developed and tested against Hadoop 1.1.2 and 1.2.1).

Getting datasets in and out of HDFS

The first thing this package provides is a "Loader" and "Saver" for HDFS. These can batch transfer or stream data in and out of HDFS using any base Loader or Saver, so any data format that Weka already supports can be read from or written to HDFS. Because the package uses Hadoop's TextInputFormat for delivering data to mappers, we work solely with CSV files that have no header row. The CSVSaver in Weka 3.7.10 has a new option to omit the header row when writing a CSV file (see the sketch below). The new HDFSSaver and HDFSLoader can be used from the command line or the Knowledge Flow GUI.
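
As a rough illustration of preparing a header-less CSV for these jobs, the following sketch reads a CSV file locally and writes it back out without the header row. It assumes the new "omit header row" option on CSVSaver in 3.7.10 is exposed as setNoHeaderRow(); the file names are placeholders.

import java.io.File;

import weka.core.Instances;
import weka.core.converters.CSVLoader;
import weka.core.converters.CSVSaver;

public class HeaderlessCSVSketch {

  public static void main(String[] args) throws Exception {
    // Load the original CSV (with a header row)
    CSVLoader loader = new CSVLoader();
    loader.setSource(new File("hypothyroid.csv")); // placeholder path
    Instances data = loader.getDataSet();

    // Write it back out with no header row, so that Hadoop's TextInputFormat
    // can split the file across mappers without a stray header line
    CSVSaver saver = new CSVSaver();
    saver.setInstances(data);
    saver.setNoHeaderRow(true); // assumption: name of the 3.7.10 "omit header" option
    saver.setFile(new File("hypothyroid-noheader.csv")); // placeholder path
    saver.writeBatch();
  }
}

The same round trip can be performed against HDFS by pairing the CSVLoader/CSVSaver with the HDFSLoader and HDFSSaver described above.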


ARFF header creation job

The first job that the distributedWekaHadoop package provides is one to create a unified ARFF header + summary statistics from the input data. All Weka Hadoop jobs have an extensive command line interface (to facilitate scripting etc.) and a corresponding step in the Knowledge Flow GUI. The jobs also take care of making sure that all Weka classes (and dependencies) are available to map and reduce tasks executing in Hadoop. They do this by installing the Weka jar file (and other dependencies) in HDFS and then adding them to the distributed cache and classpath for the job.


java weka.Run ArffHeaderHadoopJob \
-hdfs-host palladium.local -hdfs-port 9000 \
-jobtracker-host palladium.local -jobtracker-port 9001 \
-input-paths /users/mhall/input/classification \
-output-path /users/mhall/output \
-names-file $HOME/hypothyroid.names -max-split-size 100000 \
-logging-interval 5 \
-user-prop mapred.child.java.opts=-Xmx500m

The job has options for specifying Hadoop connection details and input/output paths. It also allows control over the number of map tasks that actually get executed via the max-split-size option (this sets dfs.block.size), as Hadoop's default of 64MB may not be appropriate for batch learning tasks, depending on data characteristics. The classifier job, covered in the next instalment of this series, has a pre-processing option to create a set of randomly shuffled input data chunks, which gives greater control over the number and size of the data sets processed by the mappers. The ARFF header job also has a set of options for controlling how the CSV input file gets parsed and processed. It is possible to specify attribute (column) names directly or have them read from a "names" file (one attribute name per line; not to be confused with the C4.5 ".names" file format) stored on the local file system or in HDFS (a short example of such a file is shown below).
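
For reference, a "names" file is just a plain text file with one attribute name per line. A file for the hypothyroid data used in this post might look something like the following (the names shown here are illustrative only):

age
sex
TSH
T3
TT4
...
Class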

Other Weka Hadoop jobs use the ARFF header job internally, but there is no need to repeat it for subsequent jobs that process the same data set: the job can be prevented from executing by providing a path to an existing ARFF header (in or out of HDFS) to use.

The image below shows what the job produces for the UCI hypothyroid dataset. Given the configuration for this job shown above, the header gets stored as /users/mhall/output/arff/hypothyroid.arff in HDFS. It also gets displayed by the TextViewer in the Knowledge Flow. "Class" is the last of the actual data attributes; those that occur after it are the summary "meta attributes" corresponding to each of the nominal or numeric attributes in the data.


This ends the first part of our coverage of the new distributed Weka functionality. In part two I'll cover the remaining Hadoop jobs for learning and evaluating classifiers and performing a correlation analysis.

21 comments:

  1. The latest distributedWekaHadoop doesn't build with mvn clean package. Maven says opencsv is the problem.

    -- Brian

    ReplyDelete
  2. I've just committed a fix to the pom.xml in distributedWekaBase. Thanks for pointing this out.

    Cheers,
    Mark.

    ReplyDelete
  3. Hi Mark,
    Thanks! I still don't get past 'mvn clean package'. Was this tested on a clean machine without access to any local maven repositories? There are dependency resolution problems afoot.

    -- Brian

    ReplyDelete
  4. Thanks for sharing the valuable information

    Hadoop Online Training

    ReplyDelete
  5. Hi - thanks for making the library available.
    Might it be possible to also post an example of a short Java program running a Weka clusterer/classifier as a MapReduce job?
    Thanks.

    ReplyDelete
  6. Congratulations guys, quality information you have given! Big Data and Analytics

    ReplyDelete
  7. Good to see the best information about big data. You can read all the Hadoop Interview Questions here

    ReplyDelete
  8. Hi Mark!
    Thanks for the package. Very interesting.
    One question: how can I tell the ARFF header creation job (and other jobs, especially WekaClassifierHadoopJob) that the source CSV file has a header row? I tried it, but it recognizes the first row as a data row, so I get unnecessary values for every attribute.
    I didn't find this option. Can you help me ?

    With best regards
    Pavel Dvorkin

    ReplyDelete
  9. No, I’m afraid that it is not possible to use a CSV file with a header row. This is because Hadoop will split the file up for processing by multiple mappers and only one mapper will get the chunk that contains the header row.

    You will need to remove the header row in your file before processing by any of the distributed Weka jobs. Note that the CSVSaver in Weka has an option to omit the header row when writing a CSV file, so you could try reading the file (incrementally if it is large) via the HDFSLoader+CSVLoader and then writing it back into HDFS as a new CSV file (minus the header row) via HDFSSaver+CSVSaver.

    Cheers,
    Mark.

    ReplyDelete
  10. Hi Mark!
    Can you give me installation instructions for the distributedWekaHadoop package? I cannot get it to connect to Weka.

    ReplyDelete
  11. Hi Sergey,

    It is designed to work with Weka 3.7. If you have Weka 3.7.10 then you can install distributedWekaHadoop via the built-in package manager (GUIChooser-->Tools).

    Cheers,
    Mark.

    ReplyDelete
    Replies
    1. Hi Mark,

      Thank you very much!
      I installed the package, but got an error about missing JDBC modules. Where do I get them and how do I install them? Help me, please!

      https://www.dropbox.com/s/7ywqrcqqkwhdfdv/Getting%20Started%20%28MacBook-Pro-Nero%27s%20conflicted%20copy%202014-03-22%29.pdf

      Delete
  12. These are just warnings to let you know that there are some missing JDBC drivers. This has no impact on distributed Weka.

    Cheers,
    Mark.

    ReplyDelete
  13. Hi Mark,
    I don't see the "HDFSSaver".
    Help me please !
    Sergey

    https://www.dropbox.com/s/cfvw8flvl6rqkuz/%D0%A1%D0%BD%D0%B8%D0%BC%D0%BE%D0%BA_%D1%8D%D0%BA%D1%80%D0%B0%D0%BD%D0%B0_24_03_14__22_28.jpg

    ReplyDelete
  14. I can't see a "Hadoop" folder on the left-hand-side in your screenshot. Have you installed "distributedWekaHadoop" via the package manager (GUIChooser-->Tools)?

    Once installed correctly, you will find HDFSLoader under "DataSources" and HDFSSaver under "DataSinks".

    Cheers,
    Mark.

    ReplyDelete
  15. Hey, can you please give me a solution: why is Weka not working on a large dataset with the multilayer perceptron classifier, when it works with naive Bayes on the same data?

    ReplyDelete
  16. You might need to expand a bit on "not working". What are you doing exactly and what is happening (i.e. errors, exceptions etc.)?

    Cheers,
    Mark.

    ReplyDelete
  17. hi Mark,
    what about Hadoop 2.2? I tried to use the HDFSSaver and got the following:

    11:52:57: [Saver] HDFSSaver$254396694|-dest / -saver "weka.core.converters.CSVSaver -F , -M ? -decimal 6" -hdfs-host 10.165.140.57 -hdfs-port 8020| problem saving. org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4

    ReplyDelete
  18. It will work on Hadoop 2 (at least it did for me when I ran a few quick tests a while back). You will have to swap out the Hadoop 1.2.x client libraries that come with the distributedWekaHadoop package for the 2.x versions though.

    Also, the UI isn't really set up for Yarn. The job tracker settings don't apply anymore (but you will still need to have something in there so that Weka doesn't complain). In addition you will need to set a few properties in the User properties part of the dialog. In particular, I set:

    yarn.nodemanager.aux-services=mapreduce_shuffle
    mapreduce.framework.name=yarn

    After this was set, I managed to run all Weka job types successfully - on a local single-node pseudo-distributed setup at least.

    Cheers,
    Mark.

    ReplyDelete
    Replies
    1. I just started using your package and want to make it work with my Hadoop 2.x.
      Could you please specify which library files to replace and how to change the user properties?

      Thanks in advance

      Delete