Tuesday, 15 October 2013

Weka and Hadoop Part 3

This is the third of three posts covering some new functionality for distributed processing in Weka. The first and second installments covered base functionality and some of the Hadoop-specific wrappers. In this post we'll look at the remaining functionality in version 1.0 of the distributedWekaHadoop package.

Weka classifier evaluation job

This job builds on the classifier training job covered in installment 2 and provides map-reduce tasks to evaluate classifiers on the training data, on a separate test set or via cross-validation. Apart from ARFF header creation and the optional randomisation/stratification phase (both of which are re-usable once run initially), the evaluation job involves two passes over the data: the first builds the model and the second performs the evaluation.

In the case of a k-fold cross-validation, each mapper for the model building job divides its dataset up into k folds and builds k models in one hit. The reduce phase for the model building job can use up to k reducers, with a reduce operation aggregating all the models for one fold of the cross-validation. The input to the evaluation pass over the data is then the aggregated model (k aggregated models in the case of cross-validation), pushed out to the nodes via the distributed cache, and either the input data (in the case of test on training or cross-validation) or a separate test set. In the case where the models are batch trained, the data at each map is randomly shuffled and then divided into stratified folds. In the case where the models are incrementally trained, the cross-validation folds are created and processed in a streaming fashion by extracting the instances for a given fold using a modulus operation. The same random seed is used in both the model building and evaluation job in order to keep the folds consistent.
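As a rough illustration of how folds can stay consistent across the two passes, the following plain-Java sketch assumes that, in the streaming case, an instance's fold is simply its position in the map's input modulo k, and that the batch case shuffles with a java.util.Random seeded identically in both jobs. Stratification is omitted, and FoldSketch and its methods are hypothetical names, not part of the distributedWeka API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch (not distributedWeka code): consistent k-fold extraction inside one map task.
final class FoldSketch {

  // Streaming case: the i-th instance seen by the map belongs to test fold i % k.
  static boolean inTestFold(long index, int fold, int k) {
    return index % k == fold;
  }

  // Collects either the test portion (wantTest = true) or the training portion of a fold.
  static <T> List<T> select(List<T> data, int fold, int k, boolean wantTest) {
    List<T> out = new ArrayList<>();
    for (int i = 0; i < data.size(); i++) {
      if (inTestFold(i, fold, k) == wantTest) {
        out.add(data.get(i));
      }
    }
    return out;
  }

  // Batch case: shuffling with the same seed in both jobs gives the same order, hence the same folds.
  static <T> List<T> shuffled(List<T> data, long seed) {
    List<T> copy = new ArrayList<>(data);
    Collections.shuffle(copy, new Random(seed));
    return copy;
  }

  public static void main(String[] args) {
    List<Integer> data = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      data.add(i);
    }
    int k = 3;
    for (int fold = 0; fold < k; fold++) {
      System.out.println("fold " + fold + ": train=" + select(data, fold, k, false)
          + " test=" + select(data, fold, k, true));
    }
    // The "model building" and "evaluation" passes see identical orderings.
    System.out.println(shuffled(data, 1L).equals(shuffled(data, 1L)));
  }
}
```

Each instance lands in exactly one test fold, and the training data for that fold is everything else the map holds, which is what lets both jobs agree on the folds without exchanging any state beyond the seed.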


The evaluation job adds only a few options over and above those in the classifier job. You can specify the number of nodes in your cluster so that the job can use up to k reducers for a cross-validation. Weka's evaluation module computes just about all of its metrics incrementally in an additive fashion (perfectly suited to aggregation). The only exceptions are area under the ROC curve and area under the precision-recall curve, which require predictions to be retained. By default, the evaluation job does not compute these two statistics. They can be computed by providing a value for the "sampleFractionForAUC" option, which specifies the fraction of the total number of predictions generated that should be retained (via uniform random sampling) for computing these two statistics. In the screenshot above, we've set this to 0.5, i.e. 50% of all the predictions generated in all the map tasks will be retained.
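The additive nature of the metrics, and the sampling used for the AUC statistics, can be sketched as follows. This is a simplified binary-class illustration, not Weka's Evaluation class: PartialEval is hypothetical, the squared-error term is a simplification rather than Weka's exact definition, and keeping each prediction with probability equal to the sample fraction is one simple way to retain approximately that fraction uniformly at random.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch: per-map evaluation statistics that a reducer can combine by addition.
final class PartialEval {
  double numCorrect;
  double numTotal;
  double sumSquaredError;
  // Retained {probability of positive class, actual class} pairs for computing AUC later.
  final List<double[]> sampledPredictions = new ArrayList<>();

  // Records one binary prediction; it is kept for AUC with probability sampleFraction.
  void add(double probPositive, int actual, int predicted, double sampleFraction, Random rng) {
    numTotal++;
    if (actual == predicted) {
      numCorrect++;
    }
    double err = probPositive - actual;
    sumSquaredError += err * err;
    if (sampleFraction > 0 && rng.nextDouble() < sampleFraction) {
      sampledPredictions.add(new double[] {probPositive, actual});
    }
  }

  // Combining partial results is plain addition, so arrival order does not matter.
  void aggregate(PartialEval other) {
    numCorrect += other.numCorrect;
    numTotal += other.numTotal;
    sumSquaredError += other.sumSquaredError;
    sampledPredictions.addAll(other.sampledPredictions);
  }

  double accuracy() {
    return numCorrect / numTotal;
  }

  double rootMeanSquaredError() {
    return Math.sqrt(sumSquaredError / numTotal);
  }

  public static void main(String[] args) {
    Random rng = new Random(1);
    PartialEval mapA = new PartialEval();
    mapA.add(0.9, 1, 1, 0.5, rng);
    mapA.add(0.4, 1, 0, 0.5, rng);
    PartialEval mapB = new PartialEval();
    mapB.add(0.2, 0, 0, 0.5, rng);
    mapB.add(0.7, 0, 1, 0.5, rng);
    mapA.aggregate(mapB); // what a reducer would do with the two partial results
    System.out.println("accuracy=" + mapA.accuracy() + " rmse=" + mapA.rootMeanSquaredError()
        + " retained=" + mapA.sampledPredictions.size());
  }
}
```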

In the earlier discussion of the classifier training job we used it to build a model on all the data. It can also be used to train a model on a specific fold of a cross-validation by setting the "foldNumber" and "totalNumFolds" options. When the evaluation job uses the classifier job to perform cross-validation it sets the "foldNumber" option automatically in order to learn models for each of the folds. All we have to do when configuring the evaluation job is to set the "totalNumFolds" parameter.

The output of the evaluation job is the standard Weka evaluation results text (as produced when the Explorer or command-line interface to Weka is used normally), along with the metrics stored as a single line in both a CSV file and an ARFF file. All of these files are written to the "eval" subdirectory of the job's output directory in HDFS.


Scoring job

The last Hadoop job in the version 1.0 release of the package is one to perform scoring (prediction) using a trained model. This job actually handles scoring using clusterers as well as classifiers, even though there aren't any clustering tasks/jobs in version 1.0 (stuff to do for version 1.1...).


The job doesn't require a reduce phase, so there will be as many output files in the output directory as there are map tasks run for the dataset being scored. Again the distributed cache is used to place the model on the local file system of each node. The model to be used can be initially on the local file system or in HDFS - the job looks in both places.

The map tasks build a mapping between the incoming data fields and what the model is expecting. Missing data fields, nominal values that haven't been seen during training and type mismatches between what the model is expecting and what is in the current input row are replaced with missing values. During the setup phase, when the mapping is being built, the job will fail if fewer than 50% of the attributes that the model expects are present in the incoming data.
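A minimal sketch of the mapping step is shown below. It assumes attributes are matched by name and that a null string stands for a missing value; FieldMapping is a hypothetical class rather than the package's implementation, and type checking is only hinted at through the nominal-domain check.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: map incoming CSV fields onto the attributes a trained model expects.
final class FieldMapping {
  // For each model attribute, its column in the incoming data, or -1 if absent.
  final int[] modelToIncoming;

  FieldMapping(List<String> modelAttributes, List<String> incomingFields) {
    Map<String, Integer> position = new HashMap<>();
    for (int i = 0; i < incomingFields.size(); i++) {
      position.put(incomingFields.get(i), i);
    }
    modelToIncoming = new int[modelAttributes.size()];
    int present = 0;
    for (int a = 0; a < modelAttributes.size(); a++) {
      Integer idx = position.get(modelAttributes.get(a));
      modelToIncoming[a] = idx == null ? -1 : idx;
      if (idx != null) {
        present++;
      }
    }
    // Fail during setup if fewer than half of the expected attributes are present.
    if (present < 0.5 * modelAttributes.size()) {
      throw new IllegalStateException("Only " + present + " of " + modelAttributes.size()
          + " model attributes found in the incoming data");
    }
  }

  // Reorders a row into the model's attribute order; absent fields become null (missing).
  String[] remap(String[] incomingRow) {
    String[] out = new String[modelToIncoming.length];
    for (int a = 0; a < out.length; a++) {
      out[a] = modelToIncoming[a] < 0 ? null : incomingRow[modelToIncoming[a]];
    }
    return out;
  }

  // Nominal values not seen during training are also replaced with missing (null).
  static String nominalOrMissing(String value, Set<String> trainingDomain) {
    return value != null && trainingDomain.contains(value) ? value : null;
  }

  public static void main(String[] args) {
    FieldMapping mapping = new FieldMapping(
        Arrays.asList("petallength", "petalwidth", "sepallength"),
        Arrays.asList("sepallength", "petallength", "id"));
    System.out.println(Arrays.toString(mapping.remap(new String[] {"5.1", "1.4", "17"})));
  }
}
```

Here two of the three expected attributes are present, so setup succeeds and the row prints as [1.4, null, 5.1]; had only one been present, the constructor would throw.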

The map tasks output CSV data in the same format as the input data but with the predicted probability distribution (comma-separated label:probability pairs) appended to the end of each row. The user can opt to output fewer than all the input columns by setting the "columnsToOutputInScoredData" option.
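The shape of a scored row can be sketched as below. It assumes zero-based column indices for the subset of input columns to keep (the real "columnsToOutputInScoredData" option may use a different syntax) and Java's default formatting for the probabilities; ScoredRow is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: append a predicted distribution to a CSV row as label:probability pairs.
final class ScoredRow {
  // columnsToOutput holds zero-based input column indices to keep; null keeps every column.
  static String format(String[] inputRow, int[] columnsToOutput, String[] classLabels, double[] distribution) {
    List<String> parts = new ArrayList<>();
    if (columnsToOutput == null) {
      for (String value : inputRow) {
        parts.add(value);
      }
    } else {
      for (int c : columnsToOutput) {
        parts.add(inputRow[c]);
      }
    }
    for (int i = 0; i < classLabels.length; i++) {
      parts.add(classLabels[i] + ":" + distribution[i]);
    }
    return String.join(",", parts);
  }

  public static void main(String[] args) {
    String[] row = {"5.1", "3.5", "1.4", "0.2"};
    String[] labels = {"yes", "no"};
    double[] dist = {0.75, 0.25};
    System.out.println(format(row, null, labels, dist));
    System.out.println(format(row, new int[] {0, 2}, labels, dist));
  }
}
```

The two calls print 5.1,3.5,1.4,0.2,yes:0.75,no:0.25 and 5.1,1.4,yes:0.75,no:0.25 respectively.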

Orchestrating jobs

The Hadoop jobs can be chained together using the sequential execution facility in the Knowledge Flow and/or new "success" and "failure" event types. The following screenshot shows a flow that:
  1. Transfers the hypothyroid data into HDFS
  2. Runs the correlation matrix + PCA job (which also executes the ARFF header creation job first)
  3. Re-uses the ARFF header and PCA filter created in step 2 to learn a filtered bagging model
  4. Extracts the learned model from HDFS and saves it to the local file system


As mentioned in the first installment of this series, all the jobs have an extensive command-line interface to facilitate scripting.

A note for Windows users

If you are running the Weka jobs from Windows and your Hadoop cluster is running on *nix machines, then you will run into an issue with the classpath for the map and reduce tasks on the *nix side of things. It turns out that setting the classpath for a Hadoop job programmatically uses the path separator character of the client system (naturally, I guess). So under Windows the ";" character is used to separate entries in the classpath that is set in the Configuration object for the job. This will result in ClassNotFoundException errors when the job is actually executed on the *nix cluster. To get around this, the Weka jobs will post-process the classpath entry in the Configuration to replace ";"s with ":"s, but only if you tell them that you're running a Windows client against a *nix Hadoop cluster. To do this you just need to set the environment variable HADOOP_ON_LINUX=true. This is pretty hacky, and if anyone knows of a more elegant solution please let me know.
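The workaround amounts to a string substitution guarded by the environment variable. The sketch below is an illustration under assumptions rather than the actual Weka code: ClasspathFix is hypothetical, the jar paths are made up, and it assumes no classpath entry legitimately contains a ";". Java itself reports the client's separator through java.io.File.pathSeparator.

```java
import java.io.File;

// Hypothetical sketch of rewriting a Windows-built classpath for a *nix Hadoop cluster.
final class ClasspathFix {
  // Replaces ';' separators with ':' only when explicitly requested.
  static String fixClasspath(String classpath, boolean windowsClientNixCluster) {
    if (!windowsClientNixCluster || classpath == null) {
      return classpath;
    }
    return classpath.replace(';', ':');
  }

  public static void main(String[] args) {
    // Mirrors the opt-in described above: only act when HADOOP_ON_LINUX=true.
    boolean flag = "true".equalsIgnoreCase(System.getenv("HADOOP_ON_LINUX"));
    // Hypothetical HDFS jar paths joined with ';', as a Windows client would produce.
    String cp = "/user/example/lib/weka.jar;/user/example/lib/distributedWekaBase.jar";
    System.out.println("client path separator: " + File.pathSeparator);
    System.out.println(fixClasspath(cp, flag));
  }
}
```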

Benchmarking on the KDD99 data

I ran a quick test on the KDD99 data set (just under 5 million instances, 42 attributes and 23 classes) on Waikato's Symphony torque cluster (quad-core i7 processors at 2793 MHz). I set up a 10-node Hadoop cluster and ran a 10-fold cross-validation of a random forest consisting of 200 trees. The job involved creating the ARFF header, creating 15 randomly shuffled input chunks and then the evaluation itself. This took just under 5 minutes to run. Subsequent runs of 10-fold cross-validation using the already created input chunks took about 3 and a half minutes.


java weka.distributed.hadoop.WekaClassifierEvaluationHadoopJob \
-hdfs-host 192.168.22.240 -hdfs-port 9000 \
-jobtracker-host 192.168.22.240 -jobtracker-port 9001 \
-input-paths /users/mhall/input/kdd99 \
-output-path /users/mhall/output \
-header-file-name kdd99.arff -max-split-size 50000000 \
-randomized-chunks -num-chunks 15 \
-W weka.classifiers.meta.Bagging -total-folds 10 \
-num-nodes 10 -logging-interval 5 \
-user-prop mapred.child.java.opts=-Xmx1200m \
-- -W weka.classifiers.trees.RandomTree -I 200 \
-- -depth 3 -K 3




Next I doubled the size of the input data (just by duplicating the KDD99 data), to give just under 10 million instances, and launched a 15-node Hadoop cluster. I ran the same job as before but increased the number of randomly shuffled data chunks from 15 to 30 (in order to keep the amount of data entering each map the same as before). This time the job ran in 4 minutes and 23 seconds (the average over several repetitions was about 4 minutes). Although each map is processing the same amount of data, the faster run time is explained by greater parallelism: each map in the model building process now only has to build half as many trees as it did in the first job in order to generate a forest of 200 trees.
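The arithmetic behind that explanation is easy to check. The sketch assumes, as the paragraph implies, that the 200 trees are spread as evenly as possible over the map tasks; TreesPerMap is a hypothetical helper, not part of the package.

```java
// Hypothetical helper: spread a fixed ensemble size as evenly as possible over map tasks.
final class TreesPerMap {
  static int[] split(int totalTrees, int numMaps) {
    int[] perMap = new int[numMaps];
    int base = totalTrees / numMaps;
    int remainder = totalTrees % numMaps;
    for (int m = 0; m < numMaps; m++) {
      // The first 'remainder' maps build one extra tree so the total is exact.
      perMap[m] = base + (m < remainder ? 1 : 0);
    }
    return perMap;
  }

  public static void main(String[] args) {
    for (int maps : new int[] {15, 30}) {
      int[] perMap = split(200, maps);
      System.out.println(maps + " maps: " + perMap[maps - 1] + " to " + perMap[0] + " trees each");
    }
  }
}
```

Run as is, it prints "15 maps: 13 to 14 trees each" and "30 maps: 6 to 7 trees each", i.e. roughly half the per-map work.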


Future stuff

There is a bunch of stuff that could go into future releases of the distributed packages. Some things I'd like to add for the next release include:
  1. Clustering. k-means first probably.
  2. More text mining stuff. SGDText and NaiveBayesMultinomialText can already be used in version 1.0 of the distributed packages. Weka's StringToWordVector filter really needs an option to allow a dictionary to be supplied by the user. Once this is done, we could have a job to create a dictionary (and IDF counts) - basically just a modification of the classic word count MR job - and then use the StringToWordVector filter as normal.
  3. The SubstringLabeler and SubstringReplacer Knowledge Flow steps need to become filters so that they can be used for pre-processing in the classifier training job. This would allow the Twitter sentiment analysis example (which involves automatic creation of labelled training data) to be implemented as a map-reduce job.
  4. Allow ensembles of heterogeneous classifiers to be learned with the classifier job. At present, only a voted ensemble of classifiers of the same type can be learned. The job could be extended to allow the user to specify a set of base classifiers and then the map tasks could use their task number as a basis for choosing which classifier to build from the set.
  5. Oversampling in the randomly shuffled chunk creation task. This job already makes sure that minority classes have at least one instance in all data chunks but it could be extended to bias the final distribution of classes in each chunk towards a uniform distribution.
  6. Possibly the execution of a Knowledge Flow process in a map or reduce task.
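Item 2's dictionary job can be sketched in plain Java to show why it is close to word count: the map side emits each distinct term once per document, so the summed counts are document frequencies, from which IDF values follow. This is a single-process illustration, not Hadoop code; the tokenisation, lower-casing and the log(N/df) IDF formula (one common variant) are assumptions, and DictionarySketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical single-process sketch of a dictionary + IDF job modelled on word count.
final class DictionarySketch {

  // "Map": emit each distinct term of a document once, so counts become document frequencies.
  static Set<String> mapDocument(String document) {
    Set<String> terms = new HashSet<>();
    for (String token : document.toLowerCase().split("\\W+")) {
      if (!token.isEmpty()) {
        terms.add(token);
      }
    }
    return terms;
  }

  // "Reduce": sum the emitted 1s per term across all documents.
  static Map<String, Integer> documentFrequencies(List<String> documents) {
    Map<String, Integer> df = new TreeMap<>();
    for (String doc : documents) {
      for (String term : mapDocument(doc)) {
        df.merge(term, 1, Integer::sum);
      }
    }
    return df;
  }

  // One common IDF variant: log(N / df).
  static Map<String, Double> idf(Map<String, Integer> df, int numDocs) {
    Map<String, Double> out = new TreeMap<>();
    for (Map.Entry<String, Integer> e : df.entrySet()) {
      out.put(e.getKey(), Math.log((double) numDocs / e.getValue()));
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> docs = Arrays.asList("The cat sat", "the dog sat", "the cat ran");
    Map<String, Integer> df = documentFrequencies(docs);
    System.out.println(df);
    System.out.println(idf(df, docs.size()));
  }
}
```

With such a dictionary supplied up front, each map could vectorise its documents independently, which is the point of the StringToWordVector change mentioned in item 2.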

33 comments:

  1. Hi, great article series. You should take a look at Stratosphere (stratosphere.eu). It is similar to Hadoop (e.g. it can also process big data and has a map and reduce operator) but it supports more operators, complex dataflows and iterative algorithms (which are often required for ML).
    Stratosphere could probably help Weka a lot speeding up the ML tasks.
    Please ask me if you have any questions.

  2. Dear Dr. Hall,
    I am writing my master thesis on data mining analytics based on MapReduce. As such , I have come across many map reduce data mining algorithms for clustering, association rule discovery but not really for decision tree induction. I will like to know if distributed weka also implements the legacy decision tree induction without boosting or bagging?
    I will be very grateful for your feedback. It will certainly drive the direction of my master albeit.
    thanks
    Ranjan Sunny
    Infotech
    Universität Stuttgart
    Deutschland

  3. A distributed decision tree algorithm is not yet implemented in distributed Weka I'm afraid. Take a look at MLlib in Spark version 1.0 - they appear to have a simple (pre-pruning only) distributed decision tree implemented.

    Cheers,
    Mark.

  4. Hi Mark,
    When I use EvaluationHadoopJob I get this error:

    Job JOBID="job_201407040859_0048" FINISH_TIME="1404471488813" JOB_STATUS="FAILED" FINISHED_MAPS="0" FINISHED_REDUCES="0" FAIL_REASON="# of failed Map Tasks exceeded allowed limit\. FailedCount: 1\. LastFailedTask: task_201407040859_0048_m_000000"

    Do you know the reason?

    Cheers,
    Yari.

    You will have to take a look in the Hadoop logs for the map and reduce tasks for this job, in particular the stderr and stdout user logs, in order to see what has happened.

    One thing to be aware of is that distributedWekaBase (version 1.0.5) and distributedWekaHadoop (version 1.0.8) are not 100% compatible with Weka 3.7.11 (due to some improvements/cleanups made in this release). Version 1.0.6 and 1.0.9 of distributedWekaBase and distributedWekaHadoop respectively are compatible and require Weka 3.7.11.

    Cheers,
    Mark.

  6. Hi,
    This has been a great help for running Weka with Hadoop.
    I am running a HadoopScoringJob, for which only the mappers run, so the output is stored in multiple files. I want to store the output as a consolidated CSV/ARFF file and can't seem to find a way to do so. Can you guide me through it?

  7. There isn't any functionality in distributed Weka at present to consolidate the output files. You will have to implement that yourself. The getmerge shell command can produce a single local file from a directory of files:

    http://hadoop.apache.org/docs/r0.18.3/hdfs_shell.html#getmerge

    Cheers,
    Mark.

  8. Hi,

    I am trying to create a Java application that will call the distributed services of Weka, but when I run any job the Hadoop configuration is always host: localhost and port: 8021. Even if I set the HDFSConfig to point to my cluster parameters, it still gives me this message: "Retrying connect to server: localhost/127.0.0.1:8021."

    It would be a great help if you could tell me how to set the HDFSConfig params every time I run a job.

  9. Try something like:

    WekaClassifierHadoopJob myJob = new WekaClassifierHadoopJob();
    MapReduceJobConfig jobConfiguration = myJob.getMapReduceJobConfig();
    jobConfiguration.setHDFSHost("myhost.mydomain");
    jobConfiguration.setHDFSPort("9000");
    jobConfiguration.set...
    etc.
    boolean success = myJob.runJob();

    Cheers,
    Mark.

  10. Hi Mark,

    I am facing a similar issue to the one above. This is what I have written:

    WekaClassifierHadoopJob wekaClassifierHadoopJob = new WekaClassifierHadoopJob();
    MapReduceJobConfig configuration = wekaClassifierHadoopJob.getMapReduceJobConfig();
    configuration.setHDFSHost("10.101.3.15");
    configuration.setHDFSPort("9000");
    configuration.setJobTrackerHost("10.101.3.15");
    configuration.setJobTrackerPort("9001");
    configuration.setInputPaths("/home/bigdata/hadoop_ecosystem/apache_hadoop/hadoop-1.2.1/Weka/scheduler.csv");
    configuration.setOutputPath("/home/bigdata/hadoop_ecosystem/apache_hadoop/hadoop-1.2.1/Weka/Scheduler_ARFF");
    wekaClassifierHadoopJob.setMapReduceJobConfig(configuration);
    boolean status = wekaClassifierHadoopJob.runJob();

    but I still get "Retrying connect to server: localhost/127.0.0.1:8021."

  11. OK, I think I know what the issue is. Configuration of the internal ARFF and randomly shuffled input data jobs in WekaClassifierHadoopJob is done in the setOptions() method. All the jobs are like this and I coded it this way to simplify the process of configuring jobs from the Knowledge Flow. So you will need to create an options string (just like you would supply to the job on the command line):

    String opts = "-hdfs-host 10.101.3.15 -hdfs-port 9000 -jobtracker-host 10.101.3.15 -jobtracker-port 9001 -input-paths -output-path -weka-jar ...";

    and then do:

    myJob.setOptions(weka.core.Utils.splitOptions(opts));

    You can then delete all the code relating to getting and configuring the MapReduceJobConfig as setOptions() will do this for the primary job and all sub-jobs.

    Take a look at the full list of command line options as you will need to add more to the string than I've shown above (i.e. for CSV parsing etc.). You can see the options for a particular job via:

    java weka.Run .WekaClassifierHadoopJob -h

    Cheers,
    Mark.

    1. Oops, the blog has interpreted some of the angle brackets I used in the options string above as HTML. There should be some paths following the "-input-paths", "-output-path" and "-weka-jar" options.

  12. Hadoop Developer online training| Hadoop Developer ...
    http://www.21cssindia.com/courses/hadoop-online-training-182.html
    hadoop developer online training, hadoop developer training, hadoop developer course contents, hadoop developer, hadoop developer enquiry, hadoop ...
    course contents, biztalk admin enquiry, ...Courses at 21st Century Software Solutions
    Talend Online Training -Hyperion Online Training - IBM Unica Online Training -
    Siteminder Online Training - SharePoint Online Training - Informatica Online Training
    SalesForce Online Training - Many more… | Call Us +917386622889
    Visit: http://www.21cssindia.com/courses.html

  13. Hi Mark, the post is very helpful! Thanks!

    I have a question.
    I used the WekaClassifierHadoopJob and chose the J48 tree.

    When the execution finished, the results showed that 28 trees were generated.
    What does that mean? Why 28 and not another number? Can I change that?

    If I use the scoringClassifierJob, how does it use the 28 trees to classify new instances?

    Thanks in advance, cheers!

    There must have been a total of 28 maps run by your job. Distributed Weka builds one base model per map. If a given model type can be aggregated or averaged in a straightforward way, then Weka will give you one final model (e.g. the class-conditional frequency counts from multiple naive Bayes models can just be summed to form one final model). In the case of decision trees there is no simple way to do this. For these cases, distributed Weka forms a voted ensemble of all the models learned in the map tasks. This is materialised as a weka.classifiers.meta.Vote classifier.

    Cheers,
    Mark.
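The difference between the two cases can be sketched as follows. The count-summing function stands in for aggregatable models such as naive Bayes, and the averaging function assumes an average-of-probabilities combination rule, which is one of the rules Weka's Vote meta classifier offers; AggregationSketch and its methods are hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch contrasting aggregatable models with a voted ensemble.
final class AggregationSketch {

  // Aggregatable models: per-map class-conditional counts are summed element-wise.
  static double[][] sumCounts(double[][] a, double[][] b) {
    double[][] out = new double[a.length][];
    for (int i = 0; i < a.length; i++) {
      out[i] = new double[a[i].length];
      for (int j = 0; j < a[i].length; j++) {
        out[i][j] = a[i][j] + b[i][j];
      }
    }
    return out;
  }

  // Non-aggregatable models (e.g. trees): keep every base model and average their
  // class probability distributions at prediction time.
  static double[] averageDistributions(double[][] baseModelDistributions) {
    int numClasses = baseModelDistributions[0].length;
    double[] avg = new double[numClasses];
    for (double[] dist : baseModelDistributions) {
      for (int c = 0; c < numClasses; c++) {
        avg[c] += dist[c] / baseModelDistributions.length;
      }
    }
    return avg;
  }

  public static void main(String[] args) {
    // Two maps' count tables (rows: classes, columns: attribute values).
    double[][] mapA = {{3, 1}, {0, 2}};
    double[][] mapB = {{1, 1}, {2, 4}};
    System.out.println(Arrays.deepToString(sumCounts(mapA, mapB)));
    // Three trees' predicted distributions for one test instance.
    double[][] treeOutputs = {{0.9, 0.1}, {0.6, 0.4}, {0.3, 0.7}};
    System.out.println(Arrays.toString(averageDistributions(treeOutputs)));
  }
}
```

The summed table is a single model no bigger than one map's model, whereas the ensemble keeps all 28 trees and consults each of them for every prediction.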

  15. Hi Mark

    In this Example:
    java weka.Run .WekaClassifierEvaluationHadoopJob \
    -hdfs-host hadoopmaster -hdfs-port 9000 \
    -jobtracker-host hadoopmaster -jobtracker-port 9001 \
    -existing-header /user/resultsA/exemplo/output/arff/iris.header.arff \
    -class last -input-paths /user/resultsA/exemplo/iris.csv \
    -output-path /user/resultsA/exemplo/outputnovo \
    -model-file-name J48_dist.model \
    -preserve-order -num-chunks 10 -num-folds 10
    -max-split-size 100000 -M ? -E ' -F ,
    -W weka.classifiers.trees.J48 -fold-number -1 -total-folds 10 -seed 1 -- -C 0.25 -M 2

    Which line performs the cross-validation?
    this line: -preserve-order -num-chunks 10 -num-folds 10? *(With this options: -num-chunks 10 -num-folds 10)
    or this line: -W weka.classifiers.trees.J48 -fold-number -1 -total-folds 10 -seed 1 -- -C 0.25 -M 2? *(With this options: -fold-number -1 -total-folds 10)

    I looked at the documentation for the cross-validation parameters, but I found only -R -N 10 and another with -x 10. The documentation does not mention the options -fold-number, -total-folds, -num-chunks and -num-folds.
    I am confused. Please help me.

    Thanks

    1. Hi Rodrigo,

      I don't think that there is a -preserve-order option to the Hadoop jobs. The -num-chunks and -num-folds options are the ones that define the cross-validation. The WekaClassifierEvaluationHadoopJob uses the WekaClassifierHadoop job internally, so it actually sets the -fold-number and -total-folds options on the WekaClassifierHadoop job programmatically (i.e. it increments the -fold-number option over the cross-validation). I admit that this is a little confusing, but it's in the name of re-usability :-) You can use the WekaClassifierHadoop job and have it extract, and learn from, a particular training fold from the block of data processed by each mapper simply by setting the -fold-number option (the -1 default specifies that no fold is to be extracted - i.e. all the data is used for training).

      Cheers,
      Mark.

    2. Hi Mark

      This problem started when I ran the WekaHadoop job to train and test on the iris dataset, using cross-validation and the J48 algorithm. The results were different from those obtained using the interface on a standalone computer (with the same parameters). So I think that the problem was the randomized chunks: because of the randomization, the values were different.

      So, I was looking at the distributedWekaBase1.0.13/doc/index.html documentation and I found this -preserve-order command inside the crossValidateModel method. I found:
      -preserve-order: Preserves the order in the percentage split instead of randomizing the data first with the seed value

      Now, with this option, the difference between the results obtained in standalone Weka and with HadoopWeka on the cluster is very small, for example 0.002 in kappa.

      Is there another way to turn-off this randomization in the cross-validation process?
      Do you have any idea why this difference between the results obtained using the GUI interface at a standalone computer, compared with the CLI Command, using the cluster occurs? Is it normal?

      Thanks

    I can't find any reference to a preserve-order option in the documentation for distributedWekaBase (I grepped for "preserve" in the HTML). There is a preserve-order option when running standard Weka classifiers from the command line (it is part of the Evaluation module and applies only to a percentage split). However, there is no way that I can see from the code that this option gets passed through when using distributedWeka.

    Due to how distributedWeka implements cross-validation, you can't expect the same results as when running in desktop Weka. The folds will be different - this is a function of Hadoop splitting data into blocks and the fact that Weka treats each block as containing part of every cross-validation fold. The effect will be most apparent with unstable learners like decision trees and rules.

    Cheers,
    Mark.

    1. Hi Mark

      After many tests, I observed these 2 situations (summing up):
      A) If I use the command below (based on the XML example: http://www.cs.waikato.ac.nz/ml/weka/xml/examples/hadoop_iris.kfml and using -preserve-order), the differences in the results are very small, for example 0.002 for kappa and 0.01% for root relative squared error. However, the execution time on the cluster is practically the same as on the standalone computer (same performance).
      >>>>> Evaluate the classifier using cross-validation in Hadoop
      java weka.Run .WekaClassifierEvaluationHadoopJob
      -hdfs-host hadoopmaster -hdfs-port 9000 \
      -jobtracker-host hadoopmaster -jobtracker-port 54311 \
      -existing-header /user/resultsA/testesdivergencia/arff/mydb.header.arff \
      -class last -input-paths /user/resultsA/testesdivergencia/mydb.csv \
      -output-path /user/resultsA/test/newtest1 \
      -model-file-name J48_dist.model
      -preserve-order -num-chunks 10 -num-folds 10
      -W weka.classifiers.trees.J48 -fold-number -1 -total-folds 10 -seed 1 -- -C 0.5 -B -M 2
      ** Maybe "-preserve-order" and the options "-fold-number -1 -total-folds 10 -seed 1" turn off the distributed processing in some way.

      B) If I use the command below (based on the WEKA_Ecosystem_Instructions file), I get very different results, for example a difference of 0.12 in kappa (with the cluster result being worse). However, the execution time on the cluster is very fast. (With these options, the cluster performs better than the standalone computer, as I expected.)
      >>>>> Evaluate the classifier using cross-validation in Hadoop
      java weka.Run .WekaClassifierEvaluationHadoopJob
      -hdfs-host hadoopmaster -hdfs-port 9000 \
      -jobtracker-host hadoopmaster -jobtracker-port 54311 \
      -existing-header /user/resultsA/testesdivergencia/arff/mydb.header.arff \
      -class last -input-paths /user/resultsA/testesdivergencia/mydb.csv \
      -output-path /user/resultsA/test/newtest2 \
      -model-file-name J48_dist.model
      -randomized-chunks -num-chunks 10 -num-folds 10
      -W weka.classifiers.trees.J48 -- -C 0.5 -B -M 2

      So, by your explanation, using distributed Weka I can't expect the same results as when running in desktop Weka. So situation B uses the correct options, right?
      This is very important for me to understand, because I need to explain this perfectly in my work. I was expecting to have the same results, using the cluster and a standalone computer, but with better performance in the cluster. Because, how can I prove that the WekaHadoop is working properly for the real problem, if I can not have the same results, using the Cluster and the Standalone computer, with a small problem? This is an important question that I need to find an answer. Can you help me please?

      Thanks

  17. This comment has been removed by the author.

  18. Hi Mark

    I would like to summarize the doubts that I asked before:
    A) About the documentation: I did not find the weka.Run class in the distributedWekaBase and distributedWekaHadoop documentation. I would like to find more information about the options that I can use with this class. For example, I now know that the options -num-chunks and -num-folds are the ones that define the cross-validation, but I would like to know about other options. Where can I find this information?

    B) About the cross-validation: does WekaHadoop run the cross-validation process in each chunk and then perform a final step, like a reducer, that combines the partial results into the final result? How does it work?

    C) How can I prove that the WekaHadoop is working properly for the real problem, if I can not have the same results, using the Cluster and the Standalone computer, with a small problem?

    Thanks

  19. weka.Run is part of core Weka, not distributed Weka. It allows Weka's dynamic class discovery to load Weka packages and run algorithms therein. Supplying the -h option to a class to run via weka.Run will print command line options and their descriptions.

    The cross-validation treats each chunk (as defined by Hadoop's block size) of a dataset as containing part of each cross-validation fold for the whole dataset. So, consider the case where there are 3 chunks in the dataset, each processed by one map task, and we want to do a 3-fold cross-validation. In this case each chunk contains part of folds 1, 2 and 3 (exactly 1/3 of each fold, to be precise). The map task running on each chunk will build a partial model for each test fold (using 2/3 of the data in the chunk as training data). The reducer combines the models for each partial training fold, thus creating an overall model for the full fold. In the second pass, the aggregated models for each training fold are tested against the partial test folds in each chunk, with the results aggregated into one overall evaluation result for the whole dataset.

    Unfortunately, due to how Hadoop splits data into chunks, there is no easy way to ensure that cross-validation folds created in Hadoop are comparable to those created in stand-alone Weka. Another issue is that, for many classifiers, distributedWekaHadoop creates a voted ensemble from the partial models built in each map task/chunk of the data. Thus the distributedWekaHadoop models are not directly comparable to those created in stand-alone Weka. One thing you can do is use a sufficiently small dataset such that only one mapper runs in Hadoop. Setting the number of folds to -1 results in testing on the training data. This should give results that are the same as stand-alone Weka.

    Cheers,
    Mark.
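The two-pass scheme described above can be simulated in a few lines. To keep the partial models additive, this hypothetical sketch uses a trivial majority-class "model" whose state is just class counts, and it assumes that an instance's fold is its position within its chunk modulo k. Shuffling and stratification are left out, so it illustrates the data flow rather than reproducing distributedWeka's actual folds.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation (not distributedWeka code) of the two-pass chunked cross-validation.
final class ChunkedCrossValidation {

  // Returns {correct, total} for a k-fold CV over chunks of class labels in [0, numClasses).
  static int[] run(List<int[]> chunks, int k, int numClasses) {
    // Pass 1, map side: in each chunk, count classes in the training portion of every fold.
    // Pass 1, reduce side: sum those partial counts per fold across chunks.
    int[][] foldModels = new int[k][numClasses];
    for (int[] chunk : chunks) {
      for (int fold = 0; fold < k; fold++) {
        for (int i = 0; i < chunk.length; i++) {
          if (i % k != fold) {
            foldModels[fold][chunk[i]]++;
          }
        }
      }
    }
    // Pass 2: each chunk tests its partial test folds against the aggregated fold models,
    // and the per-chunk tallies are summed into one overall result.
    int correct = 0;
    int total = 0;
    for (int[] chunk : chunks) {
      for (int i = 0; i < chunk.length; i++) {
        int fold = i % k;
        if (chunk[i] == majorityClass(foldModels[fold])) {
          correct++;
        }
        total++;
      }
    }
    return new int[] {correct, total};
  }

  // The trivial "model": predict the most frequent class in the aggregated training counts.
  static int majorityClass(int[] counts) {
    int best = 0;
    for (int c = 1; c < counts.length; c++) {
      if (counts[c] > counts[best]) {
        best = c;
      }
    }
    return best;
  }

  public static void main(String[] args) {
    // Three chunks (one per map task) of class labels, 3-fold cross-validation, 2 classes.
    List<int[]> chunks = Arrays.asList(new int[] {0, 0, 1}, new int[] {0, 1, 1}, new int[] {0, 0, 0});
    int[] result = run(chunks, 3, 2);
    System.out.println(result[0] + " of " + result[1] + " test instances correct");
  }
}
```

Every instance is tested exactly once, against a model that never saw it, so the example prints "6 of 9 test instances correct". Because the folds depend on how the data is chunked, a different chunking would generally give different folds, which is why results differ from desktop Weka.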

    1. Hi Mark

      Great, now I got it!
      Yes, I did this test and in this case, I have the same results. Perfect!

      Thanks

  20. Hi Mark

    I would like to know how I can check whether my datanodes are actually being used when I run distributed Weka. I already checked the datanodes with the jps command and all of them are running.
    I checked at "hadoopmaster:8088/cluster/nodes" and when I opened each "Node HTTP Address", there is nothing on the list of application running at the NodeManager. However, the node state at the "Nodes of the cluster" page, is showing: RUNNING. But there is no information in "cluster metrics" and there is no data available at the entries table.
    The problem was observed when I collected the results using four nodes and then using 8 nodes. When I compared the performance, the execution time was practically the same, which seems strange. If I am using twice as many nodes, why am I getting practically the same performance? Do you have any idea?

    Thanks

  21. Hi Mark

    Another situation that I forgot to say is that when I use the command "yarn application -appStates RUNNING -list", with a distributed Weka process running in the background, it shows: "Total number of applications (application-types: [] and states: [RUNNING]):0". It seems that the WekaHadoop is not running, although no error was displayed, the masternode and datanodes are running, and I am getting the results, as explained before. Please, how can I solve this problem?

    Thanks

  22. Hi Mark

    Sorry for asking again, but I am still trying to solve this problem:
    I ran distributed Weka with 4 nodes and got better performance than on the standalone computer. The problem was observed when I collected the results using 4 nodes and then using 8 nodes: when I compared the performance between 4 and 8 nodes, the execution time was practically the same. This seems strange. If I am using twice as many nodes, why am I getting practically the same performance? *(I ran the BayesNet algorithm with different search algorithms, like K2, HillClimber, NaiveBayes, Genetic Search, TAN and TabuSearch, on the standalone computer, 4 nodes and 8 nodes.) Do you have any idea why this is happening?

    Thanks

    Are you sure that all 8 nodes are being used? The level of parallelism is determined by the number of map tasks that run, which in turn is determined by the dataset size and the block size used by HDFS. The max-split-size option to the jobs controls this (via the dfs.block.size Hadoop setting).

    Cheers,
    Mark.
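A back-of-envelope way to reason about this: the number of map tasks is roughly the input size divided by the split size, and tasks beyond the number of concurrently available map slots run in successive waves. MapEstimate is a hypothetical helper rather than Hadoop's actual split computation (which has additional rules), and the sizes and slot counts used are illustrative.

```java
// Hypothetical back-of-envelope helper; not Hadoop's actual split computation.
final class MapEstimate {
  // Roughly one map task per input split (ceiling division).
  static long estimatedMapTasks(long inputBytes, long splitBytes) {
    return (inputBytes + splitBytes - 1) / splitBytes;
  }

  // Map tasks beyond the number of concurrent map slots run in successive waves.
  static long waves(long mapTasks, long concurrentSlots) {
    return (mapTasks + concurrentSlots - 1) / concurrentSlots;
  }

  public static void main(String[] args) {
    long maps = estimatedMapTasks(150_000_000L, 64_000_000L);
    for (long slots : new long[] {4, 8}) {
      System.out.println(maps + " map tasks on " + slots + " slots: " + waves(maps, slots) + " wave(s)");
    }
  }
}
```

For instance, a 150,000,000-byte file with 64,000,000-byte splits gives about 3 map tasks, which fit in a single wave whether 4 or 8 slots are available, so doubling the nodes would not be expected to shorten that phase.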

    1. Hi Mark

      This is one of the commands that I am running (my test file database.csv is 150 MB):
      java -classpath CLASSPATH:weka.jar weka.Run .WekaClassifierEvaluationHadoopJob -hdfs-host 192.168.10.120 -hdfs-port 9000 -jobtracker-host 192.168.10.120 -jobtracker-port 54311 -existing-header /user/arff/header.arff -class last -input-paths /user/database.csv -output-path /user/result01 -max-split-size 1000 -randomized-chunks -num-chunks 2 -num-folds 10 -W weka.classifiers.bayes.BayesNet -num-nodes 8 -- -D -Q weka.classifiers.bayes.net.search.local.K2 -- -P 4 -S BAYES -E weka.classifiers.bayes.net.estimate.SimpleEstimator -- -A 0.1

      Is there something wrong in this code?

      My datanodes situation is this: *(After the command "bin/hdfs dfsadmin -report")
      Configured Capacity: 1698771673088 (1.55 TB)
      Present Capacity: 1548614254592 (1.41 TB)
      DFS Remaining: 1531361861632 (1.39 TB)
      DFS Used: 17252392960 (16.07 GB)
      DFS Used%: 1.11%
      Under replicated blocks: 0
      Blocks with corrupt replicas: 0
      Missing blocks: 0

      -------------------------------------------------
      Live datanodes (7):

      Hostname: hadoopslave7
      Decommission Status : Normal
      Configured Capacity: 242513305600 (225.86 GB)
      DFS Used: 2631356416 (2.45 GB)
      Non DFS Used: 24263778304 (22.60 GB)
      DFS Remaining: 215618170880 (200.81 GB)
      DFS Used%: 1.09%
      DFS Remaining%: 88.91%
      Configured Cache Capacity: 0 (0 B)
      Cache Used: 0 (0 B)
      Cache Remaining: 0 (0 B)
      Cache Used%: 100.00%
      Cache Remaining%: 0.00%
      Xceivers: 1
      Last contact: Wed Jul 27 12:54:03 BRT 2016

      Hostname: hadoopslave2
      Decommission Status : Normal
      Configured Capacity: 242807939072 (226.13 GB)
      DFS Used: 2275500032 (2.12 GB)
      Non DFS Used: 20342992896 (18.95 GB)
      DFS Remaining: 220189446144 (205.07 GB)
      DFS Used%: 0.94%
      DFS Remaining%: 90.68%
      Configured Cache Capacity: 0 (0 B)
      Cache Used: 0 (0 B)
      Cache Remaining: 0 (0 B)
      Cache Used%: 100.00%
      Cache Remaining%: 0.00%
      Xceivers: 1
      Last contact: Wed Jul 27 12:54:03 BRT 2016

      Hostname: hadoopslave4
      Decommission Status : Normal
      Configured Capacity: 242513305600 (225.86 GB)
      DFS Used: 2633531392 (2.45 GB)
      Non DFS Used: 23533953024 (21.92 GB)
      DFS Remaining: 216345821184 (201.49 GB)
      DFS Used%: 1.09%
      DFS Remaining%: 89.21%
      Configured Cache Capacity: 0 (0 B)
      Cache Used: 0 (0 B)
      Cache Remaining: 0 (0 B)
      Cache Used%: 100.00%
      Cache Remaining%: 0.00%
      Xceivers: 1
      Last contact: Wed Jul 27 12:54:02 BRT 2016

      Hostname: hadoopslave1
      Decommission Status : Normal
      Configured Capacity: 242807939072 (226.13 GB)
      DFS Used: 2343448576 (2.18 GB)
      Non DFS Used: 19365261312 (18.04 GB)
      DFS Remaining: 221099229184 (205.91 GB)
      DFS Used%: 0.97%
      DFS Remaining%: 91.06%
      Configured Cache Capacity: 0 (0 B)
      Cache Used: 0 (0 B)
      Cache Remaining: 0 (0 B)
      Cache Used%: 100.00%
      Cache Remaining%: 0.00%
      Xceivers: 1
      Last contact: Wed Jul 27 12:54:03 BRT 2016

      Hostname: hadoopslave6
      Decommission Status : Normal
      Configured Capacity: 242807939072 (226.13 GB)
      DFS Used: 2370150400 (2.21 GB)
      Non DFS Used: 20122038272 (18.74 GB)
      DFS Remaining: 220315750400 (205.19 GB)
      DFS Used%: 0.98%
      DFS Remaining%: 90.74%
      Configured Cache Capacity: 0 (0 B)
      Cache Used: 0 (0 B)
      Cache Remaining: 0 (0 B)
      Cache Used%: 100.00%
      Cache Remaining%: 0.00%
      Xceivers: 1
      Last contact: Wed Jul 27 12:54:04 BRT 2016

      Hostname: hadoopslave5
      Decommission Status : Normal
      Configured Capacity: 242513305600 (225.86 GB)
      DFS Used: 2767527936 (2.58 GB)
      Non DFS Used: 21849722880 (20.35 GB)
      DFS Remaining: 217896054784 (202.93 GB)
      DFS Used%: 1.14%
      DFS Remaining%: 89.85%
      Configured Cache Capacity: 0 (0 B)
      Cache Used: 0 (0 B)
      Cache Remaining: 0 (0 B)
      Cache Used%: 100.00%
      Cache Remaining%: 0.00%
      Xceivers: 1
      Last contact: Wed Jul 27 12:54:03 BRT 2016

      Hostname: hadoopslave3
      Decommission Status : Normal
      Configured Capacity: 242807939072 (226.13 GB)
      DFS Used: 2230878208 (2.08 GB)
      Non DFS Used: 20679671808 (19.26 GB)
      DFS Remaining: 219897389056 (204.80 GB)
      DFS Used%: 0.92%
      DFS Remaining%: 90.56%
      Configured Cache Capacity: 0 (0 B)
      Cache Used: 0 (0 B)
      Cache Remaining: 0 (0 B)
      Cache Used%: 100.00%
      Cache Remaining%: 0.00%
      Xceivers: 1
      Last contact: Wed Jul 27 12:54:02 BRT 2016

      Thanks a lot!

  24. Hi Mark

    Please help me understand the level of parallelism in more detail. Suppose I run this command:
    java -classpath CLASSPATH:weka.jar weka.Run .WekaClassifierEvaluationHadoopJob -hdfs-host 192.168.10.120 -hdfs-port 9000 -jobtracker-host 192.168.10.120 -jobtracker-port 54311 -existing-header /user/arff/header.arff -class last -input-paths /user/database.csv -output-path /user/result05 -max-split-size 5000000 -randomized-chunks -num-chunks 5 -num-folds 10 -W weka.classifiers.bayes.BayesNet -num-nodes 8 -- -D -Q weka.classifiers.bayes.net.search.local.K2 -- -P 4 -S BAYES -E weka.classifiers.bayes.net.estimate.SimpleEstimator -- -A 0.1

    - My test file database.csv is 150,000,000 bytes (dataset size)
    - Number of chunks: 5 (-num-chunks 5)
    - Max split size: 5,000,000 bytes (-max-split-size 5000000)
    - Expected size of each chunk: 30,000,000 bytes (150,000,000 / 5 chunks)
    - Expected number of map tasks per chunk: 6 (30,000,000 / 5,000,000 max split size)
    - Expected total number of map tasks: 30 (6 * 5 chunks)
    So distributed Weka will divide these 30 map tasks among the 8 available nodes. Is this logic right?
    Is there anything wrong with the command above?

    Thanks

  25. The randomised data chunk job will run first to create 5 chunks (separate output files in HDFS). Given that your input file is 150MB and the max split size is 5MB, this should take approximately 30 map tasks. The classifier building and evaluation phase will also use 30 map tasks (i.e. 6 per chunk file). If you want fewer or more map tasks for classifier building/evaluation, then I'd run the randomized data chunk job first (as a separate job) and then run the classifier evaluation job with a different -max-split-size setting. As long as you keep the -randomized-chunks option for the evaluation job, it will detect the pre-existing chunk files in the output directory in HDFS and use them.
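    A back-of-the-envelope Java sketch of the two phases described above: the chunking job reads the original file once, while the building/evaluation job computes its splits separately for each chunk file. The class and method names are hypothetical; it assumes equal-sized chunks and one map task per ceil(fileBytes / splitBytes), ignoring Hadoop's slop on the last split.

```java
// Hypothetical estimate of map task counts for the chunking job and for the
// classifier building/evaluation job that reads the resulting chunk files.
public class ChunkedMapEstimate {

    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    // Chunking job: one pass over the original input file.
    public static long chunkJobMaps(long datasetBytes, long splitBytes) {
        return ceilDiv(datasetBytes, splitBytes);
    }

    // Building/evaluation job: splits are computed per chunk file, then summed.
    // Assumes the chunks are (roughly) equal in size.
    public static long evalJobMaps(long datasetBytes, int numChunks, long splitBytes) {
        long chunkBytes = ceilDiv(datasetBytes, numChunks);
        return numChunks * ceilDiv(chunkBytes, splitBytes);
    }

    public static void main(String[] args) {
        long dataset = 150_000_000L;
        long split = 5_000_000L;
        System.out.println(chunkJobMaps(dataset, split));              // 30
        System.out.println(evalJobMaps(dataset, 5, split));            // 30 (6 per chunk)
        System.out.println(evalJobMaps(dataset, 5, 10_000_000L));      // 15 (3 per chunk)
    }
}
```

    The last line illustrates the suggestion above: once the chunk files exist, re-running only the evaluation job with a larger -max-split-size (10,000,000 here) would roughly halve the number of map tasks.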

    In general, if you have the resources available, then the most efficient execution involves a single wave of map tasks. E.g. if you have 8 nodes in your cluster and each has two "slots" for map tasks, then the fastest results will typically come from arranging for exactly 16 map tasks to run. Of course, the algorithms you are using and the available memory may dictate that more map tasks (smaller block/file sizes) be used.
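    A rough Java sketch of the "waves" idea above: with a fixed number of map slots per node, the number of rounds of map tasks is the task count divided by the total slots, rounded up. The class MapWaves is hypothetical and assumes every slot is free and all map tasks take about the same time, which real jobs rarely satisfy exactly.

```java
// Hypothetical estimate of how many waves of map tasks a cluster needs
// when each node offers a fixed number of map slots.
public class MapWaves {

    static long totalSlots(int nodes, int slotsPerNode) {
        long slots = (long) nodes * slotsPerNode;
        if (slots <= 0) {
            throw new IllegalArgumentException("need at least one map slot");
        }
        return slots;
    }

    // Rounds of map tasks needed: ceil(mapTasks / totalSlots).
    public static long waves(long mapTasks, int nodes, int slotsPerNode) {
        long slots = totalSlots(nodes, slotsPerNode);
        return (mapTasks + slots - 1) / slots;
    }

    // Split size that turns one file into at most one map task per slot.
    public static long singleWaveSplitBytes(long fileBytes, int nodes, int slotsPerNode) {
        long slots = totalSlots(nodes, slotsPerNode);
        return (fileBytes + slots - 1) / slots;
    }

    public static void main(String[] args) {
        System.out.println(waves(16, 8, 2));                           // 1
        System.out.println(waves(30, 8, 2));                           // 2
        System.out.println(waves(150_000, 8, 2));                      // 9375
        System.out.println(singleWaveSplitBytes(150_000_000L, 8, 2));  // 9375000
    }
}
```

    A split size of 9,375,000 bytes would turn a single 150,000,000-byte file into 16 map tasks. Because splits are computed per file, applying it to five 30,000,000-byte chunk files would instead give 4 map tasks per file, 20 in total.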

    Cheers,
    Mark.
