Spark connector


Takao Magoori <ma...@...>
 

I have a JanusGraph Server (GitHub master, Gremlin 3.2.5) on top of a Cassandra storage backend, storing users, items and "when, where, who bought what?" relations.
To read data from and modify data in the graph, I use the Python aiogremlin driver mode (i.e. Groovy sessionless eval mode), and it works well so far. Thanks, developers!

Now I need to compute recommendations and forecast item sales.
For data cleaning, data normalization, recommendation and forecasting on a fairly large graph, I want to use higher-level PySpark tools (e.g. DataFrame, ML) and Python machine-learning packages (e.g. scikit-learn). But I cannot find a way to load graph data into Spark. What I want is a "connector" that PySpark can use to load data from JanusGraph, not SparkGraphComputer.

Could someone please tell me how to do this?


- Additional info
It seems OrientDB has some Spark connectors (though I don't know whether they can be used from PySpark), but I want one for JanusGraph:
https://github.com/sbcd90/spark-orientdb
https://github.com/metreta/spark-orientdb-connector


HadoopMarc <bi...@...>
 

Hi Takao,

Just some pointers. If you combine:

http://yaaics.blogspot.nl/ (using CassandraInputFormat in your case)
http://tinkerpop.apache.org/docs/current/reference/#interacting-with-spark

it should be possible to access the PersistedInputRDD (alias the graphRDD) from the Spark object.
I have never done this myself, so I would be interested to hear whether it works! You will probably need to run an OLAP query with SparkGraphComputer anyway (e.g. g.V()) so that the PersistedInputRDD actually gets realized (RDDs are not materialized until a Spark action is run on them).

Cheers,     Marc



Takao Magoori <ma...@...>
 

Hi Marc,

Thank you!
But I don't understand what you mean, sorry.
I feel SparkGraphComputer is "OLAP by Gremlin on top of Spark's distributed power", whereas I want "OLAP by Spark using JanusGraph data".

So I want to run spark-submit, create a PySpark SparkContext and load JanusGraph data into a DataFrame. Then I can use Spark DataFrames, Spark ML and Python machine-learning packages.
The following pseudo-code is what I really want (like https://github.com/sbcd90/spark-orientdb).
If there is no such solution, I guess I have to dump the whole graph into CSV and read it from PySpark.

--------
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark_session = SparkSession.builder.appName('test').enableHiveSupport().getOrCreate()

df_user = spark_session.read.format(
    'org.apache.janusgraph.some_spark_gremlin_connector',
).options(
    url='url',
    query='g.V().hasLabel("user").has("age", gt(29)).valueMap("user_id", "name", "age")',
).load().dropna().join(
    other=some_df,
)


df_item = spark_session.read.format(
    'org.apache.janusgraph.some_spark_gremlin_connector',
).options(
    url='url',
    query='g.V().hasLabel("user").has("age", gt(29)).out("buy").hasLabel("item").valueMap("item_id", "name")',
).load().dropna()


df_sale = spark_session.read.format(
    'org.apache.janusgraph.some_spark_gremlin_connector',
).options(
    url='url',
    query='g.V().hasLabel("user").has("age", gt(29)).outE("buy").valueMap("timestamp")',
).load().select(
    col('timestamp'),
).dropna()
--------




HadoopMarc <bi...@...>
 

Hi Takao,

JanusGraph reads data from distributed backends into Hadoop using its HBaseInputFormat and CassandraInputFormat classes (which are descendants of org.apache.hadoop.mapreduce.InputFormat). Therefore, it seems possible to access graphs in these backends directly from Spark using sc.newAPIHadoopRDD. AFAIK, this particular use of the input formats is nowhere documented or demonstrated, though.
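Untested, but a rough PySpark sketch of that idea might look like the following. The class names are taken from JanusGraph 0.1.x and may differ in your build, the configuration keys are only illustrative, and you would need the janusgraph-hadoop and Cassandra jars on the Spark classpath (e.g. via spark-submit --jars). Note that the values come back as TinkerPop VertexWritable JVM objects, which Python cannot do much with unless you also supply a custom valueConverter:

--------
from pyspark import SparkContext

sc = SparkContext(appName='janusgraph-direct-read')

# Illustrative configuration, loosely based on the usual read-cassandra.properties;
# adjust keys and values for your cluster and JanusGraph version.
conf = {
    'janusgraphmr.ioformat.conf.storage.backend': 'cassandrathrift',
    'janusgraphmr.ioformat.conf.storage.hostname': 'cassandra-host',
    'cassandra.input.partitioner.class': 'org.apache.cassandra.dht.Murmur3Partitioner',
}

graph_rdd = sc.newAPIHadoopRDD(
    inputFormatClass='org.janusgraph.hadoop.formats.cassandra.CassandraInputFormat',
    keyClass='org.apache.hadoop.io.NullWritable',
    valueClass='org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable',
    conf=conf,
)

# The values are VertexWritable objects living in the JVM; without a custom
# valueConverter this mostly proves the InputFormat is reachable from PySpark.
print(graph_rdd.count())
--------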

My earlier answer effectively came down to storing the graph to HDFS using an OutputRDD class for the gremlin.hadoop.graphWriter property and Spark serialization (my earlier suggestion of persisting the graphRDD using PersistedOutputRDD would not work for you, because Python and Gremlin Server would not share the same SparkContext). This may or may not be easier or more efficient than writing your own CSV input/output routines (in combination with the BulkDumperVertexProgram to parallelize the writing).
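If you do go the HDFS route, a variant that is easier to consume from Python than Spark-serialized RDDs is writing GraphSON text output (gremlin.hadoop.graphWriter = GraphSONOutputFormat, one JSON vertex per line) and reading it back as plain JSON. A sketch, where the output path is just an example and the schema depends on the GraphSON version:

--------
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('read-graphson-dump').getOrCreate()

# Example path: SparkGraphComputer writes the graph under <gremlin.hadoop.outputLocation>/~g
vertices = spark.read.json('hdfs:///user/takao/janusgraph-dump/~g')

vertices.printSchema()   # field names depend on the GraphSON version used
print(vertices.count())
--------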

Hope this helps,

Marc





Takao Magoori <ma...@...>
 

Hi Marc,

I finally understood what you mean. It sounds theoretically possible, thanks!
It feels difficult for me since I am not familiar with Scala/Java, but I will try it.
Still, it would be nice if someone had a Spark connector that can be used from Python :(

Takao Magoori
