OLAP issues - Gremlin query optimization


bobo...@...
 

Hi, 

Thanks again for all your input. It put me on the right track; I played around some more and now have a much better understanding of how this all works. I should be able to solve my problem using the Shortest Path step/vertex program.

One thing I stumbled upon, which I cannot yet explain, is weird results when using a "toList()" terminal step. Any hints here?

What I observed is that the results were not consistent, and the size of the result list was much smaller than I would have expected. I am not sure whether this is due to Spark, with toList() only returning part of the result, or what the root cause is.
Do I rather need to use the RDD for further processing of the result in my (Java) program?

Thanks
Claire

On Thursday, July 30, 2020 at 9:50:24 AM UTC+2, Abhay Pandit wrote:
Hi,

My understanding is that you won't be able to go more than one level deep using SparkGraphComputer.
But yes, you can do it using a VertexProgram.
You can try to write your own custom vertex program, or you can use ShortestPathVertexProgram if it suits you.

https://github.com/apache/tinkerpop/blob/6a705098b5176086c667b94fa9e13aa2f122bb59/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/search/path/ShortestPathVertexProgram.java
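For reference, an untested sketch of driving this vertex program through the shortestPath() step that wraps it in TinkerPop 3.4. The edge labels come from this thread; the target filter, the company names, and maxDistance(4) are placeholder assumptions:

```java
// Untested sketch: TinkerPop 3.4's shortestPath() step is backed by
// ShortestPathVertexProgram. Edge labels are from this thread; the
// target filter and maxDistance(4) are placeholder assumptions.
GraphTraversalSource g = GraphFactory.open(configuration)
        .traversal().withComputer(SparkGraphComputer.class);

List<Path> paths = g.V().has("company", "companyName", "companyA")
        .shortestPath()
            .with(ShortestPath.target, __.has("company", "companyName", "companyB"))
            .with(ShortestPath.edges, __.bothE("has_propertyA", "has_propertyB"))
            .with(ShortestPath.maxDistance, 4)
        .toList();
```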

Thanks,
Abhay

On Thu, 30 Jul 2020 at 13:11, <bo...@...> wrote:
Hi Evgeniy,

Thanks a lot for your answer.

The depth-first hint is a good one; I had indeed completely forgotten about that.
I was hoping to get around writing a custom VertexProgram, but maybe that is something I need to start considering.

I will also try to further isolate the query and see if I can improve it. I will keep you (and others) posted here.

Regards
Claire

On Thursday, July 30, 2020 at 8:01:20 AM UTC+2, Evgeniy Ignatiev wrote:

Hello,

The Gremlin traversal machine is depth-first; GraphComputer is Pregel, i.e. essentially breadth-first, and this alone will produce significant differences.
Also see https://tinkerpop.apache.org/docs/current/reference/#traversalvertexprogram - personally, I don't remember traversal-based path computation ever working well for our use case either (we had something around shortest path counting/grouping). Performance was atrocious compared with Neo4j (when the graph fit in its machine's RAM; TinkerPop never went OOM and was actually incredibly light on resources, but its performance was also significantly lower).

Possibly it makes sense to test the first half of the query in isolation? Or the first 30%, then 60%, etc. I have suspicions about the efficiency of path traversal in GraphComputer, e.g.:

                .repeat(bothE()
                    .filter(hasLabel("has_propertyA", "has_propertyB"))
                    .otherV()
                    .simplePath())
                .times(2)
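A rough back-of-the-envelope illustration (plain Java, independent of TinkerPop; the degree and vertex counts are hypothetical) of why breadth-first path enumeration is expensive: with average degree d, two repeat rounds materialize on the order of d^2 paths per start vertex, and because of simplePath() each traverser must also carry its full path history.

```java
public class FrontierGrowth {
    // Upper bound on paths of length `hops` from one start vertex in a
    // graph with average degree d, ignoring simplePath() pruning:
    // d * (d - 1)^(hops - 1).
    static long pathUpperBound(long avgDegree, int hops) {
        long paths = avgDegree;
        for (int i = 1; i < hops; i++) {
            paths *= (avgDegree - 1);
        }
        return paths;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: average degree 50, 2 hops, 1000 start vertices.
        long perVertex = pathUpperBound(50, 2); // 50 * 49 = 2450
        System.out.println("paths per start vertex: " + perVertex);
        System.out.println("paths for 1000 starts:  " + 1000L * perVertex);
    }
}
```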

Maybe consider custom VertexProgram with the same logic?

Best regards,
Evgenii Ignatev.

On 7/30/2020 8:13 AM, bo...@... wrote:

[attached image: links.png]

Hi

I am currently experimenting with the OLAP functionality of Gremlin (on JanusGraph). While simple queries run like a charm, I am encountering issues with a slightly more complex case. I have the feeling the devil is hiding somewhere in the details, but I am currently unable to pinpoint it, and am hoping to get some help here ...

Setup
- Managed Spark (Dataproc, Google Cloud Platform)
- 1 master, up to 5 worker nodes (autoscaling enabled; each node has roughly 40 GB of memory available for Spark jobs, but I am pretty flexible here and can easily configure more RAM, more CPUs, more cores)
- (embedded) JanusGraph 0.5.2 backed by ScyllaDB
- TinkerPop / Spark Gremlin 3.4.7
- Spark job is a Java application
- Current graph has roughly 22 million vertices and 24 million edges, ~65 GB storage on ScyllaDB


Use Case

I want to calculate shortcut edges between two "company" vertices. Two companies are linked via 4 edges (see image below; the red link is the shortcut edge I want to calculate).




Issue

Some simple queries (like g.V().count() or g.E().count()) work like a charm, so my setup is working in general. However, my shortcut edge calculation is not.
The (simplified) query is as follows (it is supposed to return the top 10 related companies):


g = GraphFactory.open(configuration).traversal().withComputer(SparkGraphComputer.class)

g.V().has("company", "companyName", "company")
    .as("start")
    .in("has_company")
    .has("deleted", false)
    .repeat(bothE()
        .filter(hasLabel("has_propertyA", "has_propertyB"))
        .otherV()
        .simplePath())
    .times(2)
    .has("product", "site", "de")
    .has("deleted", false)
    .dedup()
    .out("has_company")
    .where(P.neq("start"))
    .groupCount().by()
    .order(Scope.local).by(select(Column.values)).limit(Scope.local, 100)
    .unfold()
    .project("company", "tf", "tf-idf")
    .by(select(Column.keys).values("companyName"))
    .by(select(Column.values)) // tf
    .by(project("tf", "df") // idf
        .by(select(Column.values))
        .by(select(Column.keys).in("has_company").count())
        .math("tf * log10(2000000/df)")) // TODO: fix the hardcoded 2M
    .fold()
    .order(Scope.local).by(select("tf-idf"), Order.desc).limit(Scope.local, 10)
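The scoring at the tail of the query is plain arithmetic, so it can be sanity-checked outside Spark. A minimal sketch in plain Java mirroring the math("tf * log10(2000000/df)") step and the final ordering; the company names and counts are made up, and 2,000,000 is the same hardcoded corpus size as in the query:

```java
import java.util.*;

public class TfIdfCheck {
    // Mirrors the query's math step: tf * log10(corpusSize / df).
    static double tfIdf(long tf, long df, long corpusSize) {
        return tf * Math.log10((double) corpusSize / df);
    }

    public static void main(String[] args) {
        // Hypothetical groupCount() result: company name -> {tf, df}.
        Map<String, long[]> counts = new LinkedHashMap<>();
        counts.put("acme",   new long[]{10, 2_000});
        counts.put("globex", new long[]{80, 1_000_000});

        // Rank companies by tf-idf descending, like the final order().limit(10).
        counts.entrySet().stream()
              .sorted(Comparator.comparingDouble(
                          (Map.Entry<String, long[]> e) ->
                              tfIdf(e.getValue()[0], e.getValue()[1], 2_000_000L))
                      .reversed())
              .limit(10)
              .forEach(e -> System.out.println(e.getKey()));
    }
}
```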



This runs into timeouts, memory issues, or other strange failures, even if I give an insanely high amount of memory to the executors (or driver), or it simply stops after some time because some executors failed 10 times.

To try to pinpoint the issue, I executed the following much more limited query, first on our "normal" OLTP JanusGraph server, and then using the SparkGraphComputer.

While the OLTP version returns a result within seconds, the SparkGraphComputer version does not. I don't expect a response within seconds, because bringing up the whole Spark context takes its time, but I wouldn't expect hours either.
What is wrong? I assume I am using some construct that should not be used in this mode, but I don't see which one.

g.V().has("company", "companyName", "company")
    .as("start")
    .in("has_company")
    .has("deleted", false)
    .repeat(bothE()
        .filter(hasLabel("has_propertyA", "has_propertyB"))
        .otherV()
        .simplePath())
    .times(2)
    .has("jobOffer", "site", "de")
    .has("deleted", false)
    .limit(1000)
    .out("has_company")
    .where(P.neq("start"))
    .groupCount().by()
    .order(Scope.local).by(select(Column.values)).limit(Scope.local, 100)
    .unfold()
    .project("company", "tf", "tf-idf")
    .by(select(Column.keys).values("companyName"))
    .by(select(Column.values)) // tf
    .by(project("tf", "df") // idf
        .by(select(Column.values))
        .by(select(Column.keys).in("has_company").count())
        .math("tf * log10(2000000/df)")) // TODO: fix the 2M
    .fold()
    .order(Scope.local).by(select("tf-idf"), Order.desc).limit(Scope.local, 10)
    .toList();




Janusgraph.conf

For the sake of completeness, here is my full configuration.


# Hadoop Graph Configuration
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.cql.CqlInputFormat
gremlin.hadoop.graphWriter=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
gremlin.spark.persistContext=true
gremlin.spark.graphStorageLevel=MEMORY_AND_DISK
# Scylla
janusgraphmr.ioformat.conf.storage.backend=cql
janusgraphmr.ioformat.conf.storage.hostname=<ip>
janusgraphmr.ioformat.conf.storage.port=<port>
janusgraphmr.ioformat.conf.index.search.backend=lucene
janusgraphmr.ioformat.conf.index.search.directory=/tmp/
janusgraphmr.ioformat.conf.index.search.hostname=127.0.0.1
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
cassandra.input.widerows=true
# Spark
spark.master=yarn
spark.submit.deployMode=client
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator
### I tried different memory settings here, up to 50 G per executor
spark.executor.memory=16g
spark.executor.memoryOverhead=4g
spark.driver.memory=16g
spark.driver.memoryOverhead=4g
spark.eventLog.enabled=true
spark.network.timeout=300s




Any hints, thoughts, comments, or feedback are greatly appreciated.

Cheers,
Claire
--
You received this message because you are subscribed to the Google Groups "JanusGraph users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to janu...@....
To view this discussion on the web visit https://groups.google.com/d/msgid/janusgraph-users/c450aed2-9a7a-45fa-9dab-5b5da2e746a5o%40googlegroups.com.
-- 
Best regards,
Evgeniy Ignatiev.



.submit.deployMode=client
spark
.serializer=org.apache.spark.serializer.KryoSerializer
spark
.kryo.registrator=org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator
### I tried with different memory settings here, up to 50G more executor
spark
.executor.memory=16g
spark
.executor.memoryOverhead=4g
spark
.driver.memory=16g
spark
.driver.memoryOverhead=4g
spark
.eventLog.enabled=true
spark
.network.timeout=300s




Any hints, thoughts, comments, feedback are greatly appreciated

Cheers,
Claire
--
You received this message because you are subscribed to the Google Groups "JanusGraph users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to janusgra...@....
To view this discussion on the web visit https://groups.google.com/d/msgid/janusgraph-users/c450aed2-9a7a-45fa-9dab-5b5da2e746a5o%40googlegroups.com.
-- 
Best regards,
Evgeniy Ignatiev.


bobo...@...
 

[Attachment: links.png — diagram of the two company vertices, the 4-edge path between them, and the red shortcut edge]

Hi

I am currently experimenting with the OLAP functionality of Gremlin (on JanusGraph). While I can get simple queries to run like a charm, I am encountering issues with a slightly more complex case. I have the feeling that the devil hides somewhere in the details, but I am currently unable to pinpoint it, and am hoping to get some help here ...

Setup
- Managed Spark (Dataproc, Google Cloud Platform)
- 1 master, up to 5 worker nodes (auto-scaling enabled; each node has roughly 40 GB of memory available for Spark jobs, but I am pretty flexible here and can easily configure more RAM, more CPUs, more cores)
- (embedded) JanusGraph 0.5.2 backed by ScyllaDB
- TinkerPop / Spark Gremlin 3.4.7
- Spark job is a Java application
- Current graph has roughly 22 million vertices and 24 million edges (~65 GB of storage on ScyllaDB)


Use Case

I want to calculate shortcut edges between two "company" vertices. Two companies are linked via 4 edges (see image below; the red link is the shortcut edge I want to calculate).




Issue

Some simple queries (like g.V().count() or g.E().count()) work like a charm, so my setup is working in general. My shortcut-edge calculation, however, is not.
The (simplified) query is as follows (it is supposed to return the top 10 related companies):


g = GraphFactory.open(configuration).traversal().withComputer(SparkGraphComputer.class)

g.V().has("company", "companyName", "company")
    .as("start")
    .in("has_company")
    .has("deleted", false)
    .repeat(bothE()
        .filter(hasLabel("has_propertyA", "has_propertyB"))
        .otherV()
        .simplePath())
    .times(2)
    .has("product", "site", "de")
    .has("deleted", false)
    .dedup()
    .out("has_company")
    .where(P.neq("start"))
    .groupCount().by()
    .order(Scope.local).by(select(Column.values)).limit(Scope.local, 100)
    .unfold()
    .project("company", "tf", "tf-idf")
        .by(select(Column.keys).values("companyName"))
        .by(select(Column.values)) // tf
        .by(project("tf", "df") // idf
            .by(select(Column.values))
            .by(select(Column.keys).in("has_company").count())
            .math("tf * log10(2000000/df)")) // todo fix the hardcoded 2m
    .fold()
    .order(Scope.local).by(select("tf-idf"), Order.desc).limit(Scope.local, 10)
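To make the math() step at the end of the query concrete: it computes a tf-idf weight per candidate company, with the corpus size hardcoded to 2,000,000. Here is a minimal plain-Java sketch of that formula — the class and method names are mine, for illustration only, not part of the query:

```java
// Sketch of the weight the query's math("tf * log10(2000000/df)") step computes.
public class TfIdfSketch {

    static final double CORPUS_SIZE = 2_000_000.0; // the hardcoded 2M from the query

    // tf: how often the candidate company co-occurs with the start company;
    // df: how many documents link to that company overall.
    static double tfIdf(long tf, long df) {
        // Divide in floating point: 2_000_000 / df on longs would truncate.
        return tf * Math.log10(CORPUS_SIZE / df);
    }

    public static void main(String[] args) {
        // A company seen 3 times locally but linked from 2,000 documents overall:
        System.out.println(tfIdf(3, 2_000)); // log10(1000) = 3, so ~9.0
    }
}
```

Note that the floating-point division only matters in plain Java; Gremlin's math() step already evaluates its expression on doubles, so the query itself is fine in that respect.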



This runs into timeouts, memory problems, or other strange failures, even if I give an insanely high amount of memory to the executors (or the driver), or it simply stops after a while because some executors failed 10 times.

To pinpoint the issue, I executed the following, much more limited query, first on our "normal" OLTP JanusGraph server, and then using the SparkGraphComputer.

While the OLTP version returns a result within seconds, the SparkGraphComputer version does not. I don't expect a response within seconds there, because bringing up the whole Spark context takes its time, but I wouldn't expect hours either.
What is wrong? I assume I am using some construct somewhere that should not be used in that mode, but I can't see which one.

g.V().has("company", "companyName", "company")
    .as("start")
    .in("has_company")
    .has("deleted", false)
    .repeat(bothE()
        .filter(hasLabel("has_propertyA", "has_propertyB"))
        .otherV()
        .simplePath())
    .times(2)
    .has("jobOffer", "site", "de")
    .has("deleted", false)
    .limit(1000)
    .out("has_company")
    .where(P.neq("start"))
    .groupCount().by()
    .order(Scope.local).by(select(Column.values)).limit(Scope.local, 100)
    .unfold()
    .project("company", "tf", "tf-idf")
        .by(select(Column.keys).values("companyName"))
        .by(select(Column.values)) // tf
        .by(project("tf", "df") // idf
            .by(select(Column.values))
            .by(select(Column.keys).in("has_company").count())
            .math("tf * log10(2000000/df)")) // todo fix the 2m
    .fold()
    .order(Scope.local).by(select("tf-idf"), Order.desc).limit(Scope.local, 10)
    .toList();




Janusgraph.conf

For the sake of completeness, here is my complete configuration:


# Hadoop Graph Configuration
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.cql.CqlInputFormat
gremlin.hadoop.graphWriter=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
gremlin.spark.persistContext=true
gremlin.spark.graphStorageLevel=MEMORY_AND_DISK
# Scylla
janusgraphmr.ioformat.conf.storage.backend=cql
janusgraphmr.ioformat.conf.storage.hostname=<ip>
janusgraphmr.ioformat.conf.storage.port=<port>
janusgraphmr.ioformat.conf.index.search.backend=lucene
janusgraphmr.ioformat.conf.index.search.directory=/tmp/
janusgraphmr.ioformat.conf.index.search.hostname=127.0.0.1
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
cassandra.input.widerows=true
# Spark
spark.master=yarn
spark.submit.deployMode=client
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator
### I tried different memory settings here, up to 50g per executor
spark.executor.memory=16g
spark.executor.memoryOverhead=4g
spark.driver.memory=16g
spark.driver.memoryOverhead=4g
spark.eventLog.enabled=true
spark.network.timeout=300s




Any hints, thoughts, comments, feedback are greatly appreciated

Cheers,
Claire