Wednesday 4 March 2015

Weka and Spark

Spark is smokin' at the moment. Every being and his/her/its four-legged canine-like companion seems to be scrambling to provide some kind of support for running their data processing platform under Spark. Hadoop/YARN is the old big data dinosaur, whereas Spark is like Zaphod Beeblebrox - so hip it has trouble seeing over its own pelvis; so cool you could keep a side of beef in it for a month :-) Certainly, Spark has advantages over first generation map-reduce when it comes to machine learning. Having data (or as much as possible) in memory can't be beat for iterative algorithms. Perhaps less so for incremental, single pass methods.

Anyhow, the coolness factor alone was sufficient to get me to have a go at seeing whether Weka's general purpose distributed processing package - distributedWekaBase - could be leveraged inside of Spark. To that end, there is now a distributedWekaSpark package which, short of a few small modifications to distributedWekaBase (mainly to support some retrieval of outputs in-memory rather than from files), proved fairly straightforward to produce. In fact, because develop/test cycles seemed so much faster in Spark than Hadoop, I prototyped Weka's distributed k-means|| implementation in Spark before coding it for Hadoop.

Internals

Internally, distributedWekaSpark handles CSV files only at present. In the future we'll look at supporting other data formats, such as Parquet, Avro and sequence files. CSV handling is the same as for distributedWekaHadoop: because the source data is split/partitioned over multiple nodes/workers it can't have a header row, and attribute names can be supplied via a "names" file or manually as a parameter to the jobs. The CSV data is loaded and parsed just once, resulting in an RDD<weka.core.Instance> distributed data structure. Thanks to Spark's mapPartitions() processing framework, processing proceeds in much the same way as it does in distributedWekaHadoop, with the exception that the overhead of re-parsing the data on each iteration when running iterative algorithms is eliminated. Processing an entire partition at a time also avoids object creation overhead when making use of distributedWekaBase's classes.

Reduce operations are pairwise associative and commutative in Spark, and there isn't quite the analogue of a Hadoop reduce (where a single reducer instance iterates over a list of all elements with the same key value). Because of this, and to avoid lots of object creations again, many "reduce" operations were implemented via sorting and repartitioning followed by a map partitions phase. In most cases this approach works just fine. In the case of the job that randomly shuffles and stratifies the input data, the result (when there are class labels that occur less frequently than the number of requested folds) is slightly different to the Hadoop implementation. The Spark implementation results in these minority classes getting oversampled slightly.
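
As a rough illustration of that constraint, here is a minimal sketch in plain Java (no Spark involved; the class and method names are made up for this example and are not part of distributedWekaSpark). Each partition is summarised in a single pass, and the partial results are then combined two at a time. Because the merge is associative and commutative, the order in which partitions get combined does not change the result:

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: per-partition summaries that are merged pairwise. */
class PairwiseReduceSketch {

  /** Running summary of a numeric column: count, sum, min and max. */
  static final class Stats {
    final long count;
    final double sum;
    final double min;
    final double max;

    Stats(long count, double sum, double min, double max) {
      this.count = count;
      this.sum = sum;
      this.min = min;
      this.max = max;
    }

    /** Summarise one whole partition in a single pass (the "map partitions" phase). */
    static Stats ofPartition(List<Double> partition) {
      long n = 0;
      double s = 0;
      double lo = Double.POSITIVE_INFINITY;
      double hi = Double.NEGATIVE_INFINITY;
      for (double v : partition) {
        n++;
        s += v;
        lo = Math.min(lo, v);
        hi = Math.max(hi, v);
      }
      return new Stats(n, s, lo, hi);
    }

    /** Pairwise merge of two partial summaries. */
    Stats merge(Stats other) {
      return new Stats(count + other.count, sum + other.sum,
          Math.min(min, other.min), Math.max(max, other.max));
    }

    double mean() {
      return sum / count;
    }
  }

  /** Summarise every partition, then fold the partial results together two at a time. */
  static Stats summarise(List<List<Double>> partitions) {
    return partitions.stream()
        .map(Stats::ofPartition)
        .reduce(Stats::merge)
        .orElseThrow(() -> new IllegalArgumentException("no partitions"));
  }

  public static void main(String[] args) {
    List<List<Double>> partitions = Arrays.asList(
        Arrays.asList(1.0, 2.0, 3.0),
        Arrays.asList(4.0, 5.0),
        Arrays.asList(6.0));
    Stats s = summarise(partitions);
    System.out.println("count=" + s.count + " mean=" + s.mean());
  }
}
```

With the three partitions in main(), the merged summary has a count of 6 and a mean of 3.5 whichever way the partial results are paired up.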

distributedWekaSpark is not the only Weka on Spark effort available. Kyriakos-Aris Koliopoulos has been developing at the same time as myself, and released a proof-of-concept he developed for his Master's thesis about five months ago:

https://github.com/ariskk/distributedWekaSpark

I've borrowed his nifty cache heuristic that uses the source file size and object overhead settings to automatically determine a Spark storage level to use for RDDs.
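
To give a feel for what a heuristic of this kind might look like, here is a minimal sketch in plain Java. It is not the code from either package: the thresholds, the method name and the assumption that serialized data is roughly the size of the raw file are all mine. The only things taken from elsewhere are the names of Spark's storage levels and the idea of an object overhead factor and a memory fraction (settings with those names show up in the job option log in the comments):

```java
/** Hypothetical sketch of a size-based storage level heuristic (not the actual implementation). */
class CacheHeuristicSketch {

  /**
   * Picks the name of a Spark StorageLevel constant.
   *
   * @param inputBytes size of the source CSV data on disk
   * @param overhead assumed blow-up factor of parsed objects relative to the raw file
   * @param clusterMemBytes total memory available to the workers
   * @param memFraction fraction of that memory assumed usable for caching RDDs
   */
  static String chooseStorageLevel(long inputBytes, double overhead,
      long clusterMemBytes, double memFraction) {
    double available = clusterMemBytes * memFraction;
    double deserializedSize = inputBytes * overhead;

    if (deserializedSize <= available) {
      // Parsed objects should fit in memory as-is.
      return "MEMORY_ONLY";
    }
    if (inputBytes <= available) {
      // Assumption: serialized objects take about as much room as the raw file.
      return "MEMORY_ONLY_SER";
    }
    // Too big for memory alone, so let partitions spill to disk.
    return "MEMORY_AND_DISK";
  }

  public static void main(String[] args) {
    long eightGb = 8_000_000_000L;
    System.out.println(chooseStorageLevel(1_000_000_000L, 3.0, eightGb, 0.6));
    System.out.println(chooseStorageLevel(3_000_000_000L, 3.0, eightGb, 0.6));
    System.out.println(chooseStorageLevel(10_000_000_000L, 3.0, eightGb, 0.6));
  }
}
```

The point is only that a couple of cheap numbers (file size, an overhead estimate, available memory) are enough to choose between keeping deserialized objects in memory, keeping them serialized, or allowing spill to disk.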

Having RDDs referenceable for the duration that the Spark context is alive makes it possible to have a tighter coupling between Spark job steps in the Knowledge Flow. The success and failure connection types introduced in distributedWekaHadoop can now be used to carry data, such as the context and references to various RDD datasets that are in play. This allows Spark steps downstream from the start point in the flow to present a simpler version of their configuration UI, i.e. they can hide connection and CSV parsing details (as this is only required to be specified once, at the start step).

What's in the package?

The distributedWekaSpark package comes bundled with core Spark classes that are sufficient for running out-of-the-box in Spark's local mode and sourcing data from the local filesystem. This mode can take advantage of all the cores on your desktop machine by launching workers in separate threads. All the bundled template examples for Spark available from the Knowledge Flow's template menu use this mode of operation. It should also be sufficient to run against a standalone Spark cluster using a shared filesystem such as NFS.

To run against HDFS (and/or on YARN), it is necessary to delete all the Spark jar files in ${user.home}/wekafiles/packages/distributedWekaSpark/lib and copy in the spark-assembly-X.Y.Z-hadoopA.B.C.jar from your Spark distribution, as this will have been compiled against the version of Hadoop/HDFS that you're using.

All the same jobs that are available in distributedWekaHadoop have been implemented in distributedWekaSpark. As in the Hadoop case, there is a full-featured command line interface available. All jobs can stand alone - i.e. they will invoke other jobs (such as ARFF header creation and data shuffling) internally if necessary. As mentioned above, when running in the Knowledge Flow, individual job steps become aware of the Spark context and what datasets already exist in memory on the cluster. This means that connection details and CSV parsing options only have to be specified once, and downstream job steps can simplify their UI accordingly.
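
Since every job is driven by command line style options, a small helper for assembling an option array can be handy when scripting runs. The sketch below is plain Java and the helper class is hypothetical (it is not part of the package). The flag names are copied from the option string that ArffHeaderSparkJob prints to the Weka log, as quoted in the comments on this post; I'm assuming nothing about flags beyond those:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for building a job option array; not part of distributedWekaSpark. */
class JobOptionsSketch {

  /** Builds a command line style option array for an ARFF header job. */
  static String[] arffHeaderOptions(String master, int port, String inputFile,
      String outputDir, String namesFile, String headerFileName) {
    List<String> opts = new ArrayList<>();
    // Flag names as seen in the logged option string; values come from the caller.
    opts.add("-master");
    opts.add(master);
    opts.add("-port");
    opts.add(Integer.toString(port));
    opts.add("-input-file");
    opts.add(inputFile);
    opts.add("-output-dir");
    opts.add(outputDir);
    opts.add("-names-file");
    opts.add(namesFile);
    opts.add("-header-file-name");
    opts.add(headerFileName);
    return opts.toArray(new String[0]);
  }

  public static void main(String[] args) {
    String[] options = arffHeaderOptions(
        "spark://quickstart.cloudera", 7077,
        "hdfs://quickstart.cloudera:8020/input/hypothyroid.csv",
        "output",
        "${user.home}/wekafiles/packages/distributedWekaSpark/sample_data/hypothyroid.names",
        "hypo.arff");
    System.out.println(String.join(" ", options));
  }
}
```

An array built this way is the kind of thing that would be handed to a job's setOptions() method before calling runJob().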





When referencing inputs or outputs in HDFS, the Weka Spark code handles hdfs:// URLs in a somewhat non-standard way. Just as the hadoop fs/hdfs command operates relative to the current user's directory in HDFS (unless an absolute path is specified), hdfs:// URLs are interpreted relative to the current user's directory. So a URL like

hdfs://host:port/output/experiment1

refers to the output/experiment1 directory in the current user's home directory. To force an absolute path an extra / is needed - e.g.

hdfs://host:port//user/mhall/output/experiment1
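
The rule can be sketched in a few lines of plain Java. This is only an illustration of the behaviour described above, not the code the package actually uses; the class and method names are hypothetical, and it assumes the usual /user/<name> layout for HDFS home directories:

```java
/** Hypothetical sketch of the relative/absolute hdfs:// path rule; not the package's code. */
class HdfsPathSketch {

  /**
   * Returns the HDFS path that an hdfs:// URL refers to for the given user.
   * A single slash after host:port means "relative to the user's home directory";
   * a double slash means "absolute".
   */
  static String resolve(String url, String user) {
    String prefix = "hdfs://";
    if (!url.startsWith(prefix)) {
      throw new IllegalArgumentException("Not an hdfs:// URL: " + url);
    }
    String home = "/user/" + user;

    // Everything from the first slash after host:port onwards is the path part.
    int slash = url.indexOf('/', prefix.length());
    if (slash < 0) {
      return home; // no path at all
    }
    String path = url.substring(slash);

    if (path.startsWith("//")) {
      return path.substring(1); // extra slash: treat as absolute
    }
    return home + path; // otherwise relative to the home directory
  }

  public static void main(String[] args) {
    System.out.println(resolve("hdfs://host:8020/output/experiment1", "mhall"));
    System.out.println(resolve("hdfs://host:8020//user/mhall/output/experiment1", "mhall"));
  }
}
```

Both calls in main() resolve to /user/mhall/output/experiment1.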

Limitations

Aside from only handling CSV files at present, another limitation stems from the fact that only one Spark context can be active within a single JVM. This imposes some constraints on the structure of Knowledge Flow processes using Spark steps. There can only be one Spark-related start point in a given Knowledge Flow graph. This is because a start point is where the Spark context is first created, so having more than one will lead to grief.

When running on a Spark cluster managed by YARN only the "yarn-client" mode is supported. This is the mode where the driver program executes on the local machine and the YARN resource manager is simply used to provision worker nodes in the cluster for Spark to use. The reason for this is that Weka's implementation configures everything programmatically via SparkConf/JavaSparkContext, including all Weka and supporting jar files that are needed to run. This works fine for standalone Spark clusters, Mesos clusters and yarn-client mode, but not for yarn-cluster mode (where the driver program itself runs in an application master process on the cluster). In yarn-cluster mode some funky jiggery pokery (as done by the spark-submit shell script) is necessary to get everything configured for the driver to work on the cluster. It seems pretty ugly that this can't be handled by Spark seamlessly behind the scenes via the same SparkConf/SparkContext configuration as the other modes. Hopefully this will get rectified in a future release of Spark. Some discussion of this issue can be seen in the email thread at:

http://markmail.org/message/cmxswp2ffqjrkxix#query:+page:1+mid:42fug7it6f2zoxga+state:results

Another YARN-related issue is that it is necessary to have your Hadoop cluster's conf dir in the CLASSPATH. This is because Spark picks up the resource manager's address and other bits and pieces from the configuration files. Unfortunately, it is not possible to set this information programmatically. You can only get at the Hadoop Configuration object being used by Spark internally after the SparkContext has been created - by this time it's too late, as Spark is already trying to talk to the resource manager.

Anyhow, distributedWekaSpark is available from Weka's package manager today. So give it a go and pass on any feedback you might have.

72 comments:

  1. Mark,
    have you got a Java example of how to use distributedWekaSpark from the code ?
    Or some unit/integration tests from which the usage could be derived.
    tnx in advance.

  2. Hi Alex,

    I don't have any specific code examples yet. However, all the individual jobs are OptionHandlers and have main() methods. So you can look at the main method to see how a single job is used. To see how a bunch of different jobs are used within one JVM (sharing a context and RDD datasets) you can look at the code for the Knowledge Flow steps (which invoke the main Weka spark job classes) - in particular weka.gui.beans.AbstractSparkJob (most of the logic is in this class, and subclasses of it are pretty much just extracting different types of results from their corresponding weka.distributed.spark.* job object).

    Cheers,
    Mark.

  3. This is great Mark!
    I was searching for Weka with hadoop and found this which is even better.
    I have a CDH cloudera cluster running but, I don't know how to connect the Weka to it.
    It would be great if a tutorial similar to "Weka and Hadoop Part 1, 2 & 3" can be posted.

    Thank you Mark!

  4. Hi Amr,

    I've verified that it works against a CDH 5.3.0 sandbox VM at least. What you have to do is delete all the jars in wekafiles/packages/distributedWekaSpark/lib and then copy everything from /usr/lib/hadoop/client/ into wekafiles/packages/distributedWekaSpark/lib and the CDH spark assembly jar too (/usr/lib/spark/lib).

    The sample flows included in the Knowledge Flow will then run against the CDH Spark master if you copy the hypothyroid.csv file into HDFS somewhere, set the inputFile setting of the ArffHeaderSparkJob to point to it in HDFS, and then set the masterHost setting appropriately. E.g. for my quickstart VM I used:

    inputFile: hdfs://quickstart.cloudera:8020/input/hypothyroid.csv
    masterHost: spark://quickstart.cloudera

    Optionally you can set the outputDir to point to a directory in HDFS too, rather than saving the results to your client machine's home directory.

    Note that this was running against standalone Spark on CDH. I haven't tried it against YARN on CDH yet.

    Cheers,
    Mark.

  5. Thanks Mark!
    I'll try doing that soon and see what happens.

  6. Hi Mark,
    I have gone through the steps and built up the Knowledge Flow but, I get "12:48:50: Could not parse Master URL: ':null'" when I try to run the flow.

    In ArffHeaderSparkJob I have:
    inputfile:hdfs://192.168.56.102:8020/user/input/hypothyroid.csv
    masterhost: spark://192.168.56.102
    masterport: 18080
    outputDir: C:\Users\Amr\wekafiles\OutDir
    pathtoWekaJar: C:\Program Files (x86)\Weka-3-7\weka.jar

    Does that seem alright?

    Thank you,
    Amr

  7. Is there a stack trace in the console or in the Weka log (${user.home}/wekafiles/weka.log)? I assume that port 18080 is correct for your cluster (the default in Spark is 7077 for the master).

    Also, did you start with the template Knowledge Flow file for producing just an ARFF header? This can be accessed from the templates menu (third button from the right in the Knowledge Flow's toolbar).

    Cheers,
    Mark.

  8. I missed starting with the file for producing the ARFF header. Thanks for pointing that out. Now I get "14:34:30: WARN - Could not connect to akka.tcp://sparkMaster@192.168.56.101:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.56.101:7077]"

    I have tried 18080 as well.

    This is the log file:
    ....
    INFO: Monday, 30 March 2015
    WARNING: loading a newer version (3.7.13-SNAPSHOT > 3.7.12)!
    Combined: -master spark://192.168.56.101 -port 7077 -weka-jar "C:\\Program Files (x86)\\Weka-3-7\\weka.jar" -input-file hdfs://192.168.56.101:8020/user/input/hypothyroid.csv -min-slices 4 -output-dir "C:\\Users\\Amr Munshi\\wekafiles\\OutDir" -cluster-mem -1.0 -overhead 3.0 -mem-fraction 0.6 -names-file ${user.home}/wekafiles/packages/distributedWekaSpark/sample_data/hypothyroid.names -header-file-name hypo.arff -M ? -E ' -F , -compression 50.0
    2015-03-30 14:34:03 weka.gui.beans.LogPanel logMessage
    INFO: [FlowRunner] launching flow start points in parallel...
    2015-03-30 14:34:03 weka.gui.beans.LogPanel logMessage
    INFO: [FlowRunner] Launching flow 1...
    2015-03-30 14:34:03 weka.gui.beans.LogPanel logMessage
    INFO: Setting job name to: WekaKF:ARFF instances header job
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/C:/Users/Amr%20Munshi/wekafiles/packages/distributedWekaSpark/lib/slf4j-log4j12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/C:/Users/Amr%20Munshi/wekafiles/packages/distributedWekaSpark/lib/spark-assembly-1.3.0-hadoop2.0.0-mr1-cdh4.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    2015-03-30 14:34:04 weka.gui.beans.LogPanel logMessage
    INFO: INFO - Changing view acls to: Amr Munshi


  9. This comment has been removed by the author.

  10. 2015-03-30 14:34:46 weka.gui.beans.LogPanel logMessage
    INFO: WARN - Could not connect to akka.tcp://sparkMaster@192.168.56.101:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.56.101:7077]

    2015-03-30 14:34:47 weka.gui.beans.LogPanel logMessage
    INFO: WARN - Could not connect to akka.tcp://sparkMaster@192.168.56.101:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.56.101:7077]

    2015-03-30 14:34:48 weka.gui.beans.LogPanel logMessage
    INFO: WARN - Could not connect to akka.tcp://sparkMaster@192.168.56.101:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.56.101:7077]

    2015-03-30 14:34:49 weka.gui.beans.LogPanel logMessage
    INFO: WARN - Could not connect to akka.tcp://sparkMaster@192.168.56.101:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.56.101:7077]

    2015-03-30 14:35:05 weka.gui.beans.LogPanel logMessage
    INFO: ERROR - Application has been killed. Reason: All masters are unresponsive! Giving up.

    2015-03-30 14:35:05 weka.gui.beans.LogPanel logMessage
    INFO: WARN - Application ID is not initialized yet.

    2015-03-30 14:35:05 weka.gui.beans.LogPanel logMessage
    INFO: ERROR - Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up.

  11. Hi Amr,

    Is 192.168.56.102 the address for the master that is displayed on the Spark cluster web UI? I've found that you need to use exactly the address that is shown there. Beyond this, it is almost certainly a networking issue. Spark uses a bunch of ports and web servers for communication between the driver, master and workers. I'm assuming that your Windows machine is not on the same network as the cluster. You'll need to check that all the necessary ports are accessible from your machine.

    Another thing to check is that your machine (the client) is accessible from the master on the cluster. Under *nix at least, the output of the 'hostname' command or first non-local host interface of the client machine is picked up and passed to the master. If this is not reachable by the master then you'll have problems. There are a couple of properties (that can be set in the Weka Spark step's property field) that can be used to explicitly set this if necessary: spark.local.ip and spark.driver.host.

    If possible, I would first suggest running Weka on one of the cluster nodes. If this works, then the problems are definitely network related with respect to your Windows machine.

    Cheers,
    Mark.

  12. This comment has been removed by the author.

  13. The ip is dynamic so I checked it before thats why I have 192.168.56.101 and sometimes 192.168.56.102. And I tried pinging from the master's cmd and it worked.
    I think as you suggested that I have some issues with Spark configurations on the client.

    Thanks for helping

  14. Hi Mark,
    I solved the issue, it was something to do with the IPs of the master and client machine. I gave static IPs to all and I got them connected.

    Now, I have in the 'inputFile'=hdfs://192.168.56.105:8020/user/input/hypothyroid.csv

    But, I get: ArffHeaderSparkJob$189847010|ERROR: Input path does not exist: hdfs://192.168.56.105:8020/user/Amr/user/input/hypothyroid.csv. It adds "/user/Amr/" to the input file.

  15. Excellent. The code allows for both relative (to your home directory in HDFS) and absolute paths. Use either:

    hdfs://192.168.56.105:8020/input/hypothyroid.csv
    or
    hdfs://192.168.56.105:8020//user/Amr/user/input/hypothyroid.csv

    Note the // after the port number in the second URL.

    Cheers,
    Mark.

  16. Thanks a lot Mark!
    Appreciate your help and patience.

  17. Hi Mark

    Can you please give us an example of how to use this library from a Java program? I know how to use Weka from Java, and also Spark, but I can't understand how to use this in Java.

    Thank you

  18. Each job is basically self-contained and can be invoked from a Java program with just a few lines of code. Outputs are written to the file system and, in many cases, can be obtained from the job object programmatically. For example, using the classifier building job:

    import weka.classifiers.Classifier;
    import weka.core.Instances;
    import weka.distributed.spark.WekaClassifierSparkJob;

    String[] options = ... // an array of command line options for the job
    WekaClassifierSparkJob wcsj = new WekaClassifierSparkJob();
    wcsj.setOptions(options);
    wcsj.runJob(); // writes outputs to the output directory given in the options
    Classifier finalModel = wcsj.getClassifier();
    Instances modelHeader = wcsj.getTrainingHeader();

    Cheers,
    Mark.

  19. i get this error while using HDFS problem:

    INFO: ArffHeaderSparkJob$950316213|ERROR: Server IPC version 9 cannot communicate with client version 4
    org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4

    any solution ?

  20. Replace all the jar files in ~/wekafiles/packages/distributedWekaSpark/lib with the spark assembly jar that comes with your spark distribution.

    Cheers,
    Mark.

  21. Hi Mark,
    This is an interesting implementation. Any Spark wrapper for LinearRegression ?
    Thanks.
    Jason

  22. Hi Jason,

    You can use the WekaClassifierSparkJob to train (via multiple iterations over the data) weka.classifiers.functions.SGD. Setting the loss function in SGD to squared loss will give you linear regression (albeit without the attribute selection performed by weka.classifiers.functions.LinearRegression).

    Cheers,
    Mark.

  23. Hi Mark,
    I would like to implement distributedWekaSpark jobs on yarn-cluster but I'm really stuck with even the basics like how to setup the data through because I'm just starting out with spark as well. Can you kindly share how i can use the functions/class Dataset or which class I should use to set up the data.

  24. It's not really necessary to dig that deep into the supporting classes and API if you're wanting to use the distributedWeka jobs programmatically. Each job class has a fully functional command line option parsing mechanism, so you just need to construct an instance of a particular job (e.g. WekaClassifierSparkJob) and then call setOptions() to fully configure it. Then you just call the runJob() method to execute it. If you want to get any outputs produced, then all the jobs write to the specified output directory. Alternatively, most of them have methods to retrieve outputs programmatically once the job completes (e.g. the WekaClassifierSparkJob has a getClassifier() method and a getTrainingHeader() method to get the trained classifier and header-only Instances object of the training data respectively).

    Cheers,
    Mark.

    Replies
    1. Hi Mark,
      It's actually much easier than I thought to set up a job, thanks for all the work. However, I'm still having a little problem, see my code below:

      //set sparkContext
      JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));

      //Load Header as Instances object
      String toyDataHeader = "/Users/dmatekenya/Documents/BestDrSc/LocationPrediction2.0/data/toyData/toyDataHeader.arff";

      DataSource source_l = new DataSource(toyDataHeader);
      Instances header = source_l.getDataSet();

      //File containing csv data
      String toyDataCSV = "/Users/dmatekenya/Documents/BestDrSc/LocationPrediction2.0/data/toyData/toyDataCSV.csv";

      //Now create WekaClassifierSparkJob and configure it
      WekaClassifierSparkJob cls = new WekaClassifierSparkJob();

      JavaRDD data_RDD = cls.loadCSVFile(toyDataCSV,header,"", sc,"",-1,false);

      Dataset distData = new Dataset("toyData",data_RDD);

      cls.setDataset("toyData",distData);

      cls.setNumIterations(50);

      cls.setClassAttribute(4);

      cls.runJob();

      The problem is on the line where I try to load CSV file. It gives the compilation error shown below which I'm not sure how to resolve:

      "The type weka.gui.beans.ClassifierProducer cannot be resolved. It is indirectly referenced from required .class files"

      Thanks in advance for your help.
      Dunstan

    2. Can you provide your example to my mail id vsdrshn@gmail.com it would be very helpful thanks in advance also wanted to know if we can use voting mechanism in spark programming

  25. Mark,
    Thanks for the quick response. I guess I'm thinking I have to follow the same pipeline as in regular weka. Let me have a go at it again. I will post if I need further help.

    Best,
    Dunstan

  26. I installed Weka 3.7.13 x64 for Windows and all the distributed packages. I then loaded the "Spark:train and save two classifiers" knowledge flow and ran it. The ArffHeaderSparkJob "Executing" takes forever (close to two hours already) and there are no error messages. What is going on? It is said to "run out of the box", so is it not an example? How do I know Spark works properly in Weka?

    Thanks, Chen

  27. I would recommend that you just install distributedWekaSpark (which will also install distributedWekaBase automatically). Weka's package system uses a shared classpath. This makes having package dependencies easy to implement but can lead to library clashes. Installing all the distributed Weka for Hadoop packages as well as for Spark will probably lead to issues.

    To see if there are exceptions/problems you should use the Start menu option for launching Weka with a console. You can also check the weka.log file - ${user.home}/wekafiles/weka.log.

    I have run the template examples for distributedWekaSpark under x64 Windows 7 and x64 Windows 8 without issues.

    Cheers,
    Mark.

    Replies
    1. Hi, Mark:

      Thanks for your prompt reply! That's the problem. When Hadoop packages are removed, the problem is gone.

      Then I simplified the KnowledgeFlow to just WekaClassifierSparkJob (Naive Bayes) and try to scale up the data size. The data I used is KDD99. I know my data files are OK since I ran PySpark classifiers on them successfully. I wish to see Weka running it. The classifier ran OK up to 10% of the KDD99 but raised error at the full set. The ERROR lines in the console (twice after screen top overflow) are below:

      at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      16/01/26 14:48:39 ERROR scheduler.TaskSchedulerImpl: Exception in statusUpdate
      java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$3@67d1a07f rejected from java.util.concurrent.ThreadPoolExecutor@c45a52c[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]

  28. Hi,

    Is there any way to handle GRIB files with Weka or Spark?

    Thank you

  29. I don't think there is a way to read GRIB files with Weka regardless of whether it's running on your desktop or in Spark :-) You will need to convert your GRIB files to CSV before you can process them using Weka on Spark.

    Cheers,
    Mark.

  30. Thank you very much Mark, I hope there will be examples of how to use your Spark package, because I want to do some comparison of clustering algorithms. Anyway, thank you again :)

  31. Hello Mark, is there any memory constraint if I use larger datasets with the Spark templates? Basic Weka has a memory constraint of 1.2GB.

  32. That will depend on the configuration of your Spark cluster, size of the dataset, number of partitions in the RDD that backs the dataset and choice of algorithm. In general, you will want to make sure that there are enough partitions in the data to enable a given partition to be processed in the heap space available to a Spark worker process.

    Cheers,
    Mark.

  33. Hello Mark, can you please provide an example of how I can run the J48 classifier on my cluster? I also wanted to know if I can create and save the model once and then apply it to the test set to generate the class. Thanks in advance.

  34. Hi, I have installed distributedWekaSpark on my machine (OS: Windows 8, Weka 3.8, Spark 2.0). The package works fine when I run it in standalone mode. But when I replace the jar files with those from the Spark distribution (Spark standalone mode on the same Windows 8 machine, which is also working) and execute the ARFF header flow, it throws the error below. Can you please let me know if it can work with Spark on the same machine?
    Error logs from weka.log as below -

    java.lang.AbstractMethodError: weka.distributed.spark.ArffHeaderSparkJob$1.call(Ljava/lang/Object;)Ljava/util/Iterator;
    org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:152)
    org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:152)
    org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
    org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
    org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
    org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
    org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919)
    org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910)
    org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
    org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
    org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    org.apache.spark.scheduler.Task.run(Task.scala:85)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    java.lang.Thread.run(Unknown Source)

  35. There are API breaking changes in Spark 2.0. Distributed Weka for Spark has not been changed to work against Spark 2.0, and I don't anticipate adding 2.0 support for a while yet.

    Cheers,
    Mark.

  36. Hello Mark
    I am trying to execute the example, but i get this error:

    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Expected 3887 fields, but got 1 for row
    weka.distributed.CSVToARFFHeaderMapTask.processRowValues(CSVToARFFHeaderMapTask.java:1121)
    weka.distributed.CSVToARFFHeaderMapTask.processRow(CSVToARFFHeaderMapTask.java:1296)
    weka.distributed.spark.ArffHeaderSparkJob$1.call(ArffHeaderSparkJob.java:779)
    weka.distributed.spark.ArffHeaderSparkJob$1.call(ArffHeaderSparkJob.java:754)
    org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:138)

    I have only ArffHeader and WekaClassifier steps, with two text viewers.

    Replies
    1. This comment has been removed by the author.

  37. Hello Mark,

    really nice stuff! I can run it locally and I am now trying to connect it to my Hortonworks 2.5 installation. I found the spark-assembly files as described in the package description and copied them. However, my cluster has Kerberos authentication and I am looking for a way to use properties in the Spark job configuration in Weka to pass on these parameters. Is this at all possible at the moment? And if so, how?

    Thanks in advance.

    best regards
    Andreas

  39. Hello Mark,

    I continuously get the following error. I appreciate any help

    ...
    12:47:50: INFO - Added rdd_1_0 in memory on 10.50.2.203:42650 (size: 10.7 KB, free: 366.3 MB)
    12:47:50: WARN - Lost task 0.0 in stage 0.0 (TID 0, 10.50.2.203): java.lang.AbstractMethodError: weka.distributed.spark.ArffHeaderSparkJob$1.call(Ljava/lang/Object;)Ljava/util/Iterator;
    ...
    12:47:50: INFO - Starting task 0.1 in stage 0.0 (TID 1, 10.50.2.203, partition 0, ANY, 6090 bytes)
    12:47:50: INFO - Launching task 1 on executor id: 0 hostname: 10.50.2.203.
    12:47:50: INFO - Lost task 0.1 in stage 0.0 (TID 1) on executor 10.50.2.203: java.lang.AbstractMethodError (weka.distributed.spark.ArffHeaderSparkJob$1.call(Ljava/lang/Object;)Ljava/util/Iterator;) [duplicate 1]
    12:47:50: INFO - Starting task 0.2 in stage 0.0 (TID 2, 10.50.2.203, partition 0, ANY, 6090 bytes)
    12:47:50: INFO - Launching task 2 on executor id: 0 hostname: 10.50.2.203.
    12:47:51: INFO - Lost task 0.2 in stage 0.0 (TID 2) on executor 10.50.2.203: java.lang.AbstractMethodError (weka.distributed.spark.ArffHeaderSparkJob$1.call(Ljava/lang/Object;)Ljava/util/Iterator;) [duplicate 2]
    12:47:51: INFO - Starting task 0.3 in stage 0.0 (TID 3, 10.50.2.203, partition 0, ANY, 6090 bytes)
    12:47:51: INFO - Launching task 3 on executor id: 0 hostname: 10.50.2.203.
    12:47:51: INFO - Lost task 0.3 in stage 0.0 (TID 3) on executor 10.50.2.203: java.lang.AbstractMethodError (weka.distributed.spark.ArffHeaderSparkJob$1.call(Ljava/lang/Object;)Ljava/util/Iterator;) [duplicate 3]
    12:47:51: ERROR - Task 0 in stage 0.0 failed 4 times; aborting job
    12:47:51: INFO - Removed TaskSet 0.0, whose tasks have all completed, from pool
    12:47:51: INFO - Cancelling stage 0
    12:47:51: INFO - ResultStage 0 (collect at ArffHeaderSparkJob.java:840) failed in 3.034 s
    12:47:51: INFO - Job 0 failed: collect at ArffHeaderSparkJob.java:840, took 3.287746 s
    12:47:51: [Basic] WekaClassifierSparkJob$2078201622|Last step - shutting down Spark context
    12:47:51: INFO - Stopped Spark web UI at http://10.50.2.203:4040
    12:47:51: INFO - Shutting down all executors
    12:47:51: INFO - Asking each executor to shut down
    12:47:51: INFO - MapOutputTrackerMasterEndpoint stopped!
    12:47:51: INFO - MemoryStore cleared
    12:47:51: INFO - BlockManager stopped
    12:47:51: INFO - BlockManagerMaster stopped
    12:47:51: INFO - OutputCommitCoordinator stopped!
    12:47:51: INFO - Successfully stopped SparkContext
    12:47:51: [ERROR] WekaClassifierSparkJob$2078201622|Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.50.2.203): java.lang.AbstractMethodError: weka.distributed.spark.ArffHeaderSparkJob$1.call(Ljava/lang/Object;)Ljava/util/Iterator;
    ...

  40. Have you replaced the Spark libraries in ~/wekafiles/packages/distributedWekaSpark/lib with the spark assembly jar that comes with the Spark distribution being used to run your cluster? Also make sure that you are using an Oracle JVM to run both Weka and Spark.

    Cheers,
    Mark.

  41. Yes, I replaced the libraries and I'm using an Oracle JVM. I guess the problem is that I'm using Spark 2.1; I recently saw that it works with older versions. Thanks for the reply, though. I'll try with an older version.
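
One way to test the Spark 2.1 guess: the AbstractMethodError above complains that ArffHeaderSparkJob$1 has no call(Object) method returning java.util.Iterator. In Spark 2.x the Java FlatMapFunction.call() returns an Iterator, whereas in 1.x it returned an Iterable, so a class compiled against 1.x would fail in exactly this way. The sketch below is only a diagnostic under the assumption that the anonymous class implements org.apache.spark.api.java.function.FlatMapFunction; the class CallSignatureCheck and its method are hypothetical helpers, not part of Weka or Spark.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class CallSignatureCheck {

    /**
     * Returns the (erased) return type names of every public method called
     * methodName on the given class or interface. The list is empty when the
     * class cannot be loaded from the current classpath.
     */
    public static List<String> returnTypesOf(String className, String methodName) {
        List<String> types = new ArrayList<>();
        try {
            Class<?> cls = Class.forName(className);
            for (Method m : cls.getMethods()) {
                if (m.getName().equals(methodName)) {
                    types.add(m.getReturnType().getName());
                }
            }
        } catch (ClassNotFoundException | LinkageError e) {
            // Class is not on the classpath (or could not be linked): report nothing.
        }
        return types;
    }

    public static void main(String[] args) {
        // Assumption: this is the interface behind ArffHeaderSparkJob$1.
        String iface = "org.apache.spark.api.java.function.FlatMapFunction";
        List<String> types = returnTypesOf(iface, "call");
        if (types.isEmpty()) {
            System.out.println(iface + " was not found on the classpath");
        } else {
            System.out.println(iface + ".call returns " + types);
        }
    }
}
```

Run it with the same Spark jar on the classpath that the cluster uses. If it reports java.util.Iterator, the installed package was most likely compiled against an older Spark API.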

  42. Hello Mark,

    I am trying to use the distributedWekaSpark package. Everything runs fine when the process uses files on the local file system, but when I try to use a file from a Kerberos-enabled HDFS, I get the following errors. Is there a way to connect to a Kerberized cluster?

    ArffHeaderSparkJob$266157602|SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]
    weka.core.WekaException: SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]
    at weka.knowledgeflow.steps.AbstractSparkJob.runJob(AbstractSparkJob.java:294)
    at weka.knowledgeflow.steps.AbstractSparkJob.start(AbstractSparkJob.java:221)
    at weka.knowledgeflow.StepManagerImpl.startStep(StepManagerImpl.java:1020)
    at weka.knowledgeflow.BaseExecutionEnvironment$3.run(BaseExecutionEnvironment.java:440)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

    Thanks

    Replies
    1. Arshaq,

      Did you resolve this? I'm getting the same error.

      Thanks

  44. Hi Fray,

    Packages are installed in ${user.home}/wekafiles/packages. Inside this directory you should find distributedWekaBase and distributedWekaSpark (or distributedWekaSparkDev, depending on which one you installed). Each directory contains a src folder.
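
As a minimal sketch of where to look, the following resolves the src folder for each package name mentioned above. The class WekaPackageDirs and its method are hypothetical helpers for illustration, and the sketch assumes the default wekafiles location under the user's home directory.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class WekaPackageDirs {

    /** Builds <userHome>/wekafiles/packages/<packageName>/src. */
    public static Path sourceDir(String userHome, String packageName) {
        return Paths.get(userHome, "wekafiles", "packages", packageName, "src");
    }

    public static void main(String[] args) {
        String home = System.getProperty("user.home");
        String[] packages = {
            "distributedWekaBase", "distributedWekaSpark", "distributedWekaSparkDev"
        };
        for (String pkg : packages) {
            Path src = sourceDir(home, pkg);
            // Only one of the two Spark packages is normally installed.
            String note = Files.isDirectory(src) ? "" : " (not found)";
            System.out.println(src + note);
        }
    }
}
```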

    Cheers,
    Mark.

    Replies
    1. Thank you very much. Another question, please: I ran distributed k-means in local mode and got a file with a model extension. How can I view the file?

  45. You can open the Weka Explorer and load any dataset file to enable the other tabs. Then go to the Cluster tab and right-click in the result list area, which will show an option to load a model object file. From there you can locate the model file and view it in the clusterer output window.

  46. Mark,

    Any thoughts on Arshaq's question above?

    Replies
    1. There is nothing built into distributed Weka to handle Kerberos, I'm afraid. However, the general approach (not specific to Pentaho data integration) at:

      https://help.pentaho.com/Documentation/5.2/0P0/0W0/030/040

      might be usable.

      Cheers,
      Mark.

  48. Dear Mark,
    I am using DistributedWekaSpark via Java.
    I am trying to run a WekaClassifierSparkJob and a WekaClassifierEvaluationSparkJob for Linear Regression with the code below:

    WekaClassifierSparkJob job = new WekaClassifierSparkJob();
    job.setClassAttribute("last");
    job.setDebug(false);
    job.setModelFileName(classifier.getName()+".model");
    job.setNumIterations(1);
    job.setRandomizeAndStratify(false);
    job.setWriteRandomlyShuffledSplitsToOutput(false);
    job.setClassifierMapTaskOptions("LinearRegression -S 0 -R 1.0E-8 -num-decimal-places 4");
    job.setDataset(previousJob.getDatasets().next().getKey(), previousJob.getDataset(previousJob.getDatasets().next().getKey()));
    job.setCachingStrategy(previousJob.getCachingStrategy());
    job.getSparkJobConfig().setOutputDir(Constants.SPARK_OUTPUT);
    job.runJobWithContext(previousJob.getSparkContext());
    WekaClassifierEvaluationSparkJob job2 = new WekaClassifierEvaluationSparkJob();
    job2.setDebug(false);
    job2.setOutputSubdir(classifier.getName());
    job2.setDataset(previousJob.getDatasets().next().getKey(), previousJob.getDataset(previousJob.getDatasets().next().getKey()));
    job2.setCachingStrategy(previousJob.getCachingStrategy());
    job2.getSparkJobConfig().setOutputDir(Constants.SPARK_OUTPUT);
    job2.runJobWithContext(job.getSparkContext());
    if (job2.getJobStatus().equals(DistributedJob.JobStatus.FINISHED)) {
        System.out.println(classifier.getName() + ": " + job2.getText());
    }
    Although this seems to work, if I run the same job for Gaussian Processes I get the same results.
    So, I guess I am not configuring the jobs correctly.
    How can I configure them to get the same results as when I run the jobs from Weka KnowledgeFlow?

    Thanks in advance,
    Marios

  49. The key to this is to take a look at the command line options for the job (or the listOptions() method in WekaClassifierMapTask). If you run:

    java weka.Run .WekaClassifierSparkJob -h

    One of the options is:

    -W
    The fully qualified base classifier to use. Classifier options
    can be supplied after a '--'

    So, your setClassifierMapTaskOptions() call needs to actually take the following string:

    "-W weka.classifiers.functions.LinearRegression -- -S 0 -R 1.0E-8 -num-decimal-places 4"

    A similar string will allow you to specify Gaussian processes.
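
To make the pattern concrete, here is a minimal sketch of a helper that assembles such a string for any base classifier. ClassifierMapTaskOptions and build() are hypothetical names used only for illustration; they are not part of distributed Weka. The result is what would be passed to setClassifierMapTaskOptions().

```java
public class ClassifierMapTaskOptions {

    /**
     * Builds "-W <classifier class>" and, if any classifier options are given,
     * appends them after a "--" separator.
     */
    public static String build(String classifierClass, String... classifierOptions) {
        StringBuilder sb = new StringBuilder("-W ").append(classifierClass);
        if (classifierOptions.length > 0) {
            sb.append(" --");
            for (String opt : classifierOptions) {
                sb.append(' ').append(opt);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Reproduces the LinearRegression string shown above.
        System.out.println(build("weka.classifiers.functions.LinearRegression",
            "-S", "0", "-R", "1.0E-8", "-num-decimal-places", "4"));
        // Gaussian processes with default classifier options.
        System.out.println(build("weka.classifiers.functions.GaussianProcesses"));
    }
}
```

With no classifier options the helper emits just the -W part, which leaves the base classifier at its defaults.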

    Cheers,
    Mark.

  58. I am getting this error on my Windows machine for the 4th workflow: java.lang.NoSuchMethodError: 'sun.misc.Cleaner sun.nio.ch.DirectBuffer.cleaner()'
