Monday, 25 January 2016

CPython Scripting in Pentaho Data Integration

Using the approach developed for integrating Python into Weka, Pentaho Data Integration (PDI) now has a new step that can be used to leverage the Python programming language (and its extensive package-based support for scientific computing) as part of a data integration pipeline. The step has been released to the community from Pentaho Labs and can be installed directly from PDI via the marketplace.


Python is becoming a serious contender to R when it comes to programming language choice for data scientists. In fact, many folks are leveraging the strengths of both languages when developing solutions. With that in mind, it is clear that data scientists and predictive application developers can boost productivity by leveraging the PDI + Python combo. As we all know, data preparation consumes the bulk of time in a typical predictive project. That data prep can typically be achieved more quickly in PDI, compared to developing code from scratch, thanks to its intuitive graphical development environment and extensive library of connectors and processing steps. Instead of having to write (and rewrite) code to connect to source systems (such as relational databases, NoSQL databases, Hadoop filesystems and so forth), and to join/filter/blend data etc., PDI allows the developer to focus their coding efforts on the cool data science-oriented algorithms.

CPython Script Executor

As the name suggests, the new step uses the C implementation of the Python programming language. While there are JVM-based solutions available - such as Jython - that allow a more tightly integrated experience when executing in the JVM, these do not facilitate the use of many high-powered Python libraries for scientific computing, because such libraries include highly optimised components that are written in C or Fortran. In order to gain access to such libraries, the PDI step launches, and communicates with, a micro-service running in the CPython environment. Communication is done over plain sockets and messages are encoded as JSON structures. Datasets are transmitted as CSV, and the very fast routines for reading and writing CSV from the pandas Python package are leveraged.
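To make the data path a little more concrete, here is a minimal sketch of a dataset travelling as CSV text inside a JSON message and being rebuilt as a pandas DataFrame on the other side. The step's actual wire format is not described in this post, so the message layout and the field names ("command", "variable", "csv") are purely hypothetical, and the socket transport is left out entirely.

```python
import io
import json

import pandas as pd


def encode_dataset_message(frame, variable_name):
    """Wrap a DataFrame as CSV text inside a JSON message (hypothetical layout)."""
    message = {
        "command": "put_dataset",   # hypothetical command name, not the step's real protocol
        "variable": variable_name,  # name the frame should receive on the Python side
        "csv": frame.to_csv(index=False),
    }
    return json.dumps(message).encode("utf-8")


def decode_dataset_message(payload):
    """Reverse of encode_dataset_message: return (variable_name, DataFrame)."""
    message = json.loads(payload.decode("utf-8"))
    frame = pd.read_csv(io.StringIO(message["csv"]))
    return message["variable"], frame


# Round trip a tiny illustrative frame; "py_data" is just an example variable name.
original = pd.DataFrame({"sepal_length": [5.1, 4.9], "species": ["setosa", "setosa"]})
name, restored = decode_dataset_message(encode_dataset_message(original, "py_data"))
print(name)
print(restored)
```

The point of the sketch is only that pandas handles both directions of the CSV conversion, so the JSON envelope can stay small and carry just control information.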

The step itself offers maximum flexibility when it comes to dealing with data. It can act as a start point/data source in PDI (thus allowing the developer the freedom to source data directly via their Python code if so desired), or it can accept data from an upstream step and push it into the Python environment. In the latter case, the user can opt to send all incoming rows to Python in one hit, send fixed-size batches of rows, or send rows one at a time. In each of these cases the data sent is treated as a dataset and stored in a user-specified variable in Python, and then the user's Python script is invoked. In the "all data" case, there is also the option to apply reservoir sampling to down-sample to a fixed size before sending the data to Python. The pandas DataFrame is used as the data structure for datasets transferred into Python.
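For readers who have not met reservoir sampling before, the following is a minimal sketch of the classic "Algorithm R" variant, which keeps a uniform random sample of k rows from a stream of unknown length in a single pass. The post does not say which variant the step implements, so treat this as an illustration of the idea rather than the step's own code; the function name and parameters are made up for the example.

```python
import random


def reservoir_sample(rows, k, seed=None):
    """Return up to k rows drawn uniformly at random from an iterable of rows."""
    rng = random.Random(seed)
    reservoir = []
    for i, row in enumerate(rows):
        if i < k:
            # Fill the reservoir with the first k rows.
            reservoir.append(row)
        else:
            # Pick a slot in [0, i]; the new row replaces an old one
            # only if the slot falls inside the reservoir.
            j = rng.randint(0, i)
            if j < k:
                reservoir[j] = row
    return reservoir


# Down-sample 150 row indices to a fixed size of 10.
sample = reservoir_sample(range(150), 10, seed=0)
print(sample)
```

Because only k rows are held at any time, memory use stays fixed no matter how many rows arrive from the upstream step.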



A Python script can be specified via the built-in editor, or loaded from a file dynamically at runtime. There are two scenarios for getting output from the Python environment to pass on to downstream PDI steps for further processing. The first (primary) scenario is when there is a single variable to retrieve from Python and it is a pandas DataFrame. In this case, the columns of the data frame become output fields from the step. In the second scenario, one or more non-data-frame variables may be specified. In this case, their values are assumed to be textual (or representable as text) or to contain image data (in which case they are retrieved from Python as binary PNG data). Each variable is output in a separate PDI field.
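As a rough illustration of the two output scenarios, the sketch below shows the kind of script that might be typed into the editor. The variable names (py_data, result, summary) are assumptions chosen for the example, and the first assignment merely stands in for the frame that the step would normally place in the user-specified input variable.

```python
import pandas as pd

# Stand-in for the incoming dataset; inside the step this frame would
# already exist under the user-specified variable name.
py_data = pd.DataFrame({
    "petal_length": [1.4, 4.7, 6.0],
    "petal_width": [0.2, 1.4, 2.5],
})

# Scenario 1: a single DataFrame to retrieve. Each of its columns
# would become an output field of the step.
result = py_data.assign(petal_area=py_data["petal_length"] * py_data["petal_width"])

# Scenario 2: a non-data-frame variable that can be represented as text.
# It would be output in its own PDI field.
summary = "rows=%d, columns=%d" % result.shape

print(result)
print(summary)
```

Which variables are pulled back out is configured in the step itself, so the script only needs to make sure they exist by the time it finishes.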


Requirements

The CPython Script Executor step will work with PDI >= 5.0. Of course, it requires Python to be installed and the python executable to be in your PATH environment variable. The step has been tested with Python 2.7 and 3.x and, at a minimum, needs the pandas library to be installed. For Windows users in particular, I'd recommend installing the excellent Anaconda Python distribution. This includes the entire SciPy stack (including pandas and scikit-learn) along with lots of other libraries.
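A quick way to sanity-check these requirements is a short script along the following lines. It is only a sketch: the helper names are made up for the example, it needs Python 3.4 or later to run, and it inspects the interpreter that runs the snippet, which is not necessarily the one PDI will launch.

```python
import importlib.util
import shutil


def find_python_executable(candidates=("python",)):
    """Return the full path of the first candidate found on PATH, or None."""
    for name in candidates:
        path = shutil.which(name)
        if path:
            return path
    return None


def missing_modules(names=("pandas",)):
    """Return the modules from `names` that cannot be located for import."""
    return [name for name in names if importlib.util.find_spec(name) is None]


print("python on PATH:", find_python_executable())
print("missing modules:", missing_modules())
```

If the first line prints None or the second list is not empty, the step is unlikely to start its Python side correctly.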

Example

The example transformation shown in the following screenshot can be obtained from here.

The example uses Fisher's classic iris data. The first Python step (at the top) simply computes some quartiles for the numeric columns in the iris data. This is output from the step as a pandas DataFrame, where each row corresponds to one of the quartiles computed (25th, 50th and 75th), and each column holds the value for one of the numeric fields in the iris data. The second Python step from the top uses scikit-learn's decomposition routines to run a principal components analysis on the iris data and then transforms the iris data into the PCA space, which is then the output of the step. The third Python step from the top uses the matplotlib library and plotting routines from the pandas library to produce some visualisations of the iris data (scatter plot matrix, Andrews curves, parallel coordinates and RadViz). These are then extracted as binary PNG data from the Python environment and saved to files in the directory that the transformation was loaded from. The two Python steps at the bottom of the transformation learn a decision tree model and then use that model to score the iris data, respectively. The model is saved (from the Python environment) to the directory that the transformation was loaded from.
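The quartile computation in the first step can be sketched in a few lines of pandas. This is not the script from the example transformation, just an assumed equivalent; the frame below holds a handful of illustrative rows rather than the real iris data, and the column names are chosen for the example.

```python
import pandas as pd

# A few illustrative rows standing in for the iris dataset.
iris = pd.DataFrame({
    "sepal_length": [5.1, 4.9, 6.3, 5.8, 7.1],
    "petal_length": [1.4, 1.4, 6.0, 5.1, 5.9],
    "species": ["setosa", "setosa", "virginica", "virginica", "virginica"],
})

# Keep only the numeric columns, then compute the three quartiles.
# The result has one row per quartile and one column per numeric field.
numeric = iris.select_dtypes(include="number")
quartiles = numeric.quantile([0.25, 0.5, 0.75])

print(quartiles)
```

The resulting frame has exactly the shape described above: rows indexed by 0.25, 0.5 and 0.75, and one column for each numeric field.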


Conclusion

The new PDI CPython Script Executor step opens up the power of Python to the PDI developer and data scientist. It joins the R Script Executor and Weka machine learning steps in PDI as part of an expanding array of advanced statistical and predictive tools that can be leveraged within data integration processes.

9 comments:

  1. Great stuff. Thanks for sharing this!
    Question: Is it possible to pass a JSON string instead of a CSV?

  2. Kettle rows are transferred as CSV and materialised as a pandas data frame on the Python side. If your JSON strings are stored as quoted Kettle fields (escaping is probably necessary for quotes in the JSON itself) then I guess it should be possible to just pull the values out of the rows of the frame on the Python side.

    We could, in a future release, add an option to the row-by-row mode to simply assign the value of each field in the Kettle row to a separate variable in Python.

    Cheers,
    Mark.

  3. I keep getting an error at org.pentaho.di.trans.steps.cpythonscriptexecutor.CPythonScriptExecutor.executeScript(CPythonScriptExecutor.java:446)

    Any suggestions ?

    I am using Anaconda python.

  4. I assume that you have the Anaconda bin directory in your PATH? Is Anaconda installed system-wide, or in your home directory? Some people have reported problems with a system-wide installation - something to do with file permissions preventing Anaconda from writing data.

    Is there a stack trace available on the console or in the PDI logs?

    Cheers,
    Mark.

  5. Hi Mark!
    Thanks a lot for the wonderful job.
    It is an amazing opportunity to incorporate pure Python scripts inside PDI.
    I use PDI 7.0 Community Edition on OS X and Ubuntu.
    But sometimes I get an error after saving and trying to open the CPythonExecutor step.
    "Unable to open dialog for this step"
    Argument cannot be null
    java.lang.IllegalArgumentException: Argument cannot be null
    at org.eclipse.swt.SWT.error(Unknown Source)
    at org.eclipse.swt.SWT.error(Unknown Source)
    at org.eclipse.swt.SWT.error(Unknown Source)
    at org.eclipse.swt.widgets.Widget.error(Unknown Source)
    at org.eclipse.swt.widgets.Text.setText(Unknown Source)
    at org.pentaho.di.ui.core.widget.TextVar.setText(TextVar.java:210)
    at org.pentaho.di.ui.trans.steps.cpythonscriptexecutor.CPythonScriptExecutorDialog.getData(CPythonScriptExecutorDialog.java:886)
    at org.pentaho.di.ui.trans.steps.cpythonscriptexecutor.CPythonScriptExecutorDialog.open(CPythonScriptExecutorDialog.java:249)
    at org.pentaho.di.ui.spoon.delegates.SpoonStepsDelegate.editStep(SpoonStepsDelegate.java:127)
    at org.pentaho.di.ui.spoon.Spoon.editStep(Spoon.java:8789)
    at org.pentaho.di.ui.spoon.trans.TransGraph.editStep(TransGraph.java:3179)
    at org.pentaho.di.ui.spoon.trans.TransGraph.mouseDoubleClick(TransGraph.java:775)
    at org.eclipse.swt.widgets.TypedListener.handleEvent(Unknown Source)
    at org.eclipse.swt.widgets.EventTable.sendEvent(Unknown Source)
    at org.eclipse.swt.widgets.Display.sendEvent(Unknown Source)
    at org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source)
    at org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source)
    at org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source)
    at org.eclipse.swt.widgets.Widget.notifyListeners(Unknown Source)
    at org.eclipse.swt.widgets.Display.runDeferredEvents(Unknown Source)
    at org.eclipse.swt.widgets.Display.readAndDispatch(Unknown Source)
    at org.pentaho.di.ui.spoon.Spoon.readAndDispatch(Spoon.java:1359)
    at org.pentaho.di.ui.spoon.Spoon.waitForDispose(Spoon.java:7990)
    at org.pentaho.di.ui.spoon.Spoon.start(Spoon.java:9290)
    at org.pentaho.di.ui.spoon.Spoon.main(Spoon.java:685)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.pentaho.commons.launcher.Launcher.main(Launcher.java:92)

  7. Hi
    Thanks for making this available. I'm using the CPython Scripting tab in the Explorer interface. I have loaded a csv file into Explorer and this is transferred as a py_data dataframe into the CPython tab and I can inspect the data. However, if I make changes to the dataframe, adding another column for instance, how do I update the data loaded into Explorer? I have tried to write out a csv file so I can load it in again, but this fails with an error. I wonder if you have an example?

    Thanks again,
    Mark.

  8. Hi Mark,

    I have a script which decodes a URL; I just need to feed the input URL to it. Can you help me please?
    Below is the script:
    #!python

    import sys
    import re
    import html
    import urllib.parse

    def main():
        # The rewritten URL is taken from the first command-line argument
        rewrittenurl = sys.argv[1]
        match = re.search(r'https://urldefense.proofpoint.com/(v[0-9])/', rewrittenurl)
        if match:
            if match.group(1) == 'v1':
                decodev1(rewrittenurl)
            elif match.group(1) == 'v2':
                decodev2(rewrittenurl)
            else:
                print('Unrecognized version in: ', rewrittenurl)
        else:
            print('No valid URL found in input: ', rewrittenurl)

    def decodev1(rewrittenurl):
        match = re.search(r'u=(.+?)&k=', rewrittenurl)
        if match:
            urlencodedurl = match.group(1)
            htmlencodedurl = urllib.parse.unquote(urlencodedurl)
            # html.unescape replaces HTMLParser().unescape, which was removed in Python 3.9
            url = html.unescape(htmlencodedurl)
            print(url)
        else:
            print('Error parsing URL')

    def decodev2(rewrittenurl):
        match = re.search(r'u=(.+?)&[dc]=', rewrittenurl)
        if match:
            specialencodedurl = match.group(1)
            # v2 encoding uses '-' in place of '%' and '_' in place of '/'
            trans = str.maketrans('-_', '%/')
            urlencodedurl = specialencodedurl.translate(trans)
            htmlencodedurl = urllib.parse.unquote(urlencodedurl)
            url = html.unescape(htmlencodedurl)
            print(url)
        else:
            print('Error parsing URL')

    if __name__ == '__main__':
        main()
    I need to give the rewrittenurl variable the output of my previous step in Pentaho (which comes from a Select values step with the field name S3link).
    Thanks
