Running Python on WSO2 Analytics Servers (DAS)

In the previous post we discussed how to connect a Jupyter notebook to PySpark. In this post I will discuss how to run Python scripts, analyze data, and build machine learning models on top of data stored in WSO2 Data Analytics Server. You may use a vanilla Data Analytics Server (DAS) or any other analytics server, such as ESB Analytics, APM Analytics, or IS Analytics, for this purpose.


Prerequisites


  • Install Jupyter
  • Download WSO2 Data Analytics Server (DAS) 3.1.0
  • Download and uncompress the Spark 1.6.2 binary
  • Download pyrolite-4.13.jar


Configure the Analytics Server

In this scenario, the Analytics Server acts as the external Spark cluster as well as the data source. Hence the Analytics Server must be started in cluster mode. To do that, open <DAS_HOME>/repository/conf/axis2/axis2.xml and enable clustering as follows:
<clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent"
                enable="true">
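
A quick way to confirm the change before starting the server is to read the attribute back. The following is a minimal sketch and not part of the DAS tooling: the helper name is_clustering_enabled is hypothetical, and it assumes the file parses as well-formed XML containing a clustering element like the one shown above.

```python
import xml.etree.ElementTree as ET


def is_clustering_enabled(axis2_xml_text):
    """Return True if any <clustering> element has enable="true".

    Hypothetical helper: it only inspects the 'enable' attribute and
    does not validate the rest of the configuration.
    """
    root = ET.fromstring(axis2_xml_text)
    for element in root.iter("clustering"):
        if element.get("enable", "").strip().lower() == "true":
            return True
    return False


# Trimmed-down stand-in for axis2.xml, used only for illustration.
sample = """
<axisconfig>
    <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent"
                enable="true">
    </clustering>
</axisconfig>
"""
print(is_clustering_enabled(sample))
```

For the real file, ET.parse(path).getroot() can take the place of ET.fromstring.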

When the Analytics Server starts in cluster mode, it creates a Spark cluster as well (or, if it is pointed to an external Spark cluster, it joins that cluster). The Analytics Server also creates a Spark application (CarbonAnalytics), which takes all the available cores in the cluster. When Python/PySpark then connects to the same cluster, it also creates a Spark application, but since no cores are left for it, it stays in the "waiting" state and never runs. To avoid that, we need to limit the resources allocated to the CarbonAnalytics Spark application. To do so, open the <DAS_HOME>/repository/conf/analytics/spark/spark-defaults.conf file and set the following parameters:
carbon.spark.master.count  1

# Worker
spark.worker.cores 4
spark.worker.memory 4g

spark.executor.memory 2g

spark.cores.max 2
Note that "spark.worker.cores" (4) is the total number of cores allocated to the Spark worker, and "spark.cores.max" (2) is the maximum number of cores each Spark application may use.
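
The core budget can be checked with a little arithmetic. The sketch below is illustrative only: the function names are hypothetical, it assumes a single worker, and it assumes each line of the file is a key and a value separated by whitespace.

```python
def parse_spark_defaults(text):
    """Parse 'key value' lines, skipping blank lines and '#' comments."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        if len(parts) == 2:
            conf[parts[0]] = parts[1].strip()
    return conf


def cores_left_for_other_apps(conf, running_apps=1):
    """Cores still free on a single worker once `running_apps`
    applications have each taken up to spark.cores.max cores."""
    worker_cores = int(conf["spark.worker.cores"])
    cores_per_app = int(conf["spark.cores.max"])
    used = min(worker_cores, cores_per_app * running_apps)
    return worker_cores - used


settings = parse_spark_defaults("""
carbon.spark.master.count  1

# Worker
spark.worker.cores 4
spark.worker.memory 4g

spark.executor.memory 2g

spark.cores.max 2
""")
print(cores_left_for_other_apps(settings))  # 4 - 2 = 2 cores left
```

With these values, CarbonAnalytics takes 2 of the 4 worker cores and leaves 2 for the PySpark application.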

Since we are not running a minimum HA cluster of DAS/Analytics Servers, we also need to set the following property in the <DAS_HOME>/repository/conf/etc/tasks-config.xml file:
<taskServerCount>1</taskServerCount>

Now start the server by navigating to <DAS_HOME> and executing:
./bin/wso2server.sh

Once the server is up, check whether the Spark cluster is configured correctly by navigating to the Spark master web UI at http://localhost:8081/. It should list the worker and the running applications.



Note that the worker has 4 cores allocated (2 used), and the CarbonAnalytics application has 2 cores allocated. The Spark master URL is shown in the top-left corner (spark://10.100.5.116:7077). PySpark and other clients use this URL to connect and submit jobs to this Spark cluster.

Now, to run a Python script on top of this Analytics Server, we have two options:
  • Connect an IPython/Jupyter notebook and execute the Python script from the UI.
  • Execute the raw Python script using spark-submit.


Connect Jupyter Notebook (Option I)

Open ~/.bashrc and add the following entries:
export PYSPARK_DRIVER_PYTHON=ipython
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
export PYSPARK_PYTHON=/home/supun/Supun/Softwares/anaconda3/bin/python
export SPARK_HOME="/home/supun/Supun/Softwares/spark-1.6.2-bin-hadoop2.6"
export PATH="/home/supun/Supun/Softwares/spark-1.6.2-bin-hadoop2.6/bin:$PATH"
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib:$PYTHONPATH
export SPARK_CLASSPATH=/home/supun/Downloads/pyrolite-4.13.jar

When we run a Python script on top of Spark, PySpark submits it as a job to the Spark cluster (the Analytics Server, in this case). Therefore we need to add all the external jars to the Spark classpath, so that the Spark executors know where to find the classes at runtime. Thus, we need to add the absolute paths of the jars located in the <DAS_HOME>/repository/libs directory as well as the <DAS_HOME>/repository/components/plugins directory to the Spark classpath, separated by colons (:), as below. Since a later export of SPARK_CLASSPATH replaces an earlier one, keep the pyrolite jar in the same colon-separated list.
export SPARK_CLASSPATH=/home/supun/Downloads/wso2das-3.1.0/repository/components/plugins/abdera_1.0.0.wso2v3.jar:/home/supun/Downloads/wso2das-3.1.0/repository/components/plugins/ajaxtags_1.3.0.beta-rc7-wso2v1.jar.......
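
Typing out every jar by hand is tedious and error-prone, so the list can be generated instead. The following is a minimal sketch and not an official WSO2 script: build_spark_classpath is a hypothetical helper, and it assumes the jars sit directly inside the two directories named above.

```python
import glob
import os


def build_spark_classpath(das_home, extra_jars=()):
    """Join extra jars and every jar under the DAS directories with ':'."""
    jar_dirs = [
        os.path.join(das_home, "repository", "libs"),
        os.path.join(das_home, "repository", "components", "plugins"),
    ]
    # Extra jars (for example pyrolite) go first, as absolute paths.
    jars = [os.path.abspath(jar) for jar in extra_jars]
    for jar_dir in jar_dirs:
        pattern = os.path.join(os.path.abspath(jar_dir), "*.jar")
        # sorted() keeps the output stable between runs.
        jars.extend(sorted(glob.glob(pattern)))
    return ":".join(jars)


print(build_spark_classpath(
    "/home/supun/Downloads/wso2das-3.1.0",
    ["/home/supun/Downloads/pyrolite-4.13.jar"],
))
```

The printed value can be pasted after export SPARK_CLASSPATH= in ~/.bashrc.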

To make the changes take effect, run:
source ~/.bashrc 

Create a new directory to be used as the Python workspace (say "python-workspace"). This directory will store the scripts we create in the notebook. Navigate to that directory and start the notebook, specifying the master URL of the remote Spark cluster, as below.
pyspark --master spark://10.100.5.116:7077 --conf "spark.driver.extraJavaOptions=-Dwso2_custom_conf_dir=/home/supun/Downloads/wso2das-3.1.0/repository/conf"

Finally, navigate to http://localhost:8888/ to access the notebook, and create a new Python script via New --> Python 3.
Then check the Spark master UI (http://localhost:8081) again. You should see that a second application named "PySparkShell" has started and is using the remaining 2 cores.




Retrieve Data


In the new Python script we created in Jupyter, we can use any Spark Python API. To run Spark operations with Python, we need a SparkContext and a SQLContext. When we start Jupyter with PySpark, it creates a SparkContext by default, which can be accessed through the object 'sc'.
We can also create our own SparkContext with additional configurations. But to create a new one, we need to stop the existing SparkContext first.
from pyspark import SparkContext, SparkConf, SQLContext

# Set the additional properties.
sparkConf = (SparkConf()
             .set(key="spark.driver.allowMultipleContexts", value="true")
             .set(key="spark.executor.extraJavaOptions", value="-Dwso2_custom_conf_dir=/home/supun/Downloads/wso2das-3.1.0/repository/conf"))

# Stop the default SparkContext created by pyspark, then create a new SparkContext using the above SparkConf.
sc.stop()
sparkCtx = SparkContext(conf=sparkConf)

# Check spark master.
print(sparkConf.get("spark.master"))

# Create a SQL context.
sqlCtx = SQLContext(sparkCtx)

df = sqlCtx.sql("SELECT * FROM table1")
df.show()

'df' is a Spark DataFrame. You can now run any Spark operation on top of it. You can also use the spark-mllib and spark-ml packages to build machine learning models. Refer to [1] for a sample that trains a Random Forest classification model on top of data stored in WSO2 DAS.


Running a Python Script without Jupyter Notebook (Option II)

Besides running Python scripts in the notebook, we can also run a raw Python script directly on top of Spark, using spark-submit. For that we can use the same Python script as above, with a slight modification. In the above scenario, the notebook creates a default SparkContext ("sc"). In this case there is no such default SparkContext, so we do not need the sc.stop() call (leaving it in causes an error). Once we remove that line, we can save the script with a .py extension and run it as below:
<SPARK_HOME>/bin/spark-submit --master spark://10.100.5.116:7077 --conf "spark.driver.extraJavaOptions=-Dwso2_custom_conf_dir=/home/supun/Downloads/wso2das-3.1.0/repository/conf" PySpark-Sample.py
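
When the job is launched from another Python program, the same command can be assembled as an argument list. This is a minimal sketch under stated assumptions: build_spark_submit_command is a hypothetical helper, and it passes only the --master and --conf options already used above.

```python
import os


def build_spark_submit_command(spark_home, master_url, das_conf_dir, script_path):
    """Return the spark-submit invocation as a list suitable for subprocess.run."""
    return [
        os.path.join(spark_home, "bin", "spark-submit"),
        "--master", master_url,
        "--conf", "spark.driver.extraJavaOptions=-Dwso2_custom_conf_dir=" + das_conf_dir,
        script_path,
    ]


command = build_spark_submit_command(
    spark_home="/home/supun/Supun/Softwares/spark-1.6.2-bin-hadoop2.6",
    master_url="spark://10.100.5.116:7077",
    das_conf_dir="/home/supun/Downloads/wso2das-3.1.0/repository/conf",
    script_path="PySpark-Sample.py",
)
print(" ".join(command))
```

Passing command to subprocess.run(command, check=True) would launch the job. Because it is a list, no shell quoting is needed around the --conf value.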

You can refer to [2] for a Python script that does the same as the one discussed earlier.
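
If you prefer to keep a single script for both options, the stop call can be made conditional. This is a sketch and is not taken from the referenced samples: stop_default_context is a hypothetical helper, and it assumes the notebook exposes the default context as the global name sc, as described earlier.

```python
def stop_default_context(namespace):
    """Stop the pre-created SparkContext if `sc` exists in `namespace`.

    Returns True when a context was found and stopped, False otherwise.
    """
    default_ctx = namespace.get("sc")
    if default_ctx is None:
        return False
    default_ctx.stop()
    return True


# In the notebook this stops the default context; under spark-submit,
# where no `sc` is defined, it does nothing.
stopped = stop_default_context(globals())
```

Calling this in place of the bare sc.stop() line lets the rest of the script stay identical in both modes.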



References


[1] https://github.com/SupunS/play-ground/blob/master/python/pyspark/PySpark-Sample.ipynb
[2] https://github.com/SupunS/play-ground/blob/master/python/pyspark/PySpark-Sample.py
