Hi
I am currently experimenting with the OLAP functionality of
Gremlin (on JanusGraph). While I get simple queries to run like
a charm, I am encountering issues with a slightly more complex
case. I have the feeling that the devil hides somewhere in the
details, but I am currently unable to pinpoint it, and I am
hoping to get some help here ...
Setup
- Managed Spark (Dataproc, Google Cloud Platform)
- 1 master, up to 5 worker nodes (auto-scaling enabled; each
node has roughly 40 GB of memory available for Spark jobs, but I
am pretty flexible here and can easily configure more RAM, more
CPUs, more cores)
- (embedded) JanusGraph 0.5.2 backed by ScyllaDB
- TinkerPop / Spark Gremlin 3.4.7
- Spark job is a Java application
- Current graph has roughly 22 million vertices and 24 million
edges (~65 GB storage on ScyllaDB)
Use Case
I want to calculate shortcut edges between two "company"
vertices. Two companies are linked via 4 edges (see the image
below; the red link is the shortcut edge I want to calculate).
Issue
Some simple queries (like g.V().count() or g.E().count()) work
like a charm, so my setup is working in general. However, my
shortcut-edge calculation is not.
The (simplified) query is as follows (it is supposed to return
the top 10 related companies):
g = GraphFactory.open(configuration).traversal().withComputer(SparkGraphComputer.class)
g.V().has("company", "companyName", "company")
.as("start")
.in("has_company")
.has("deleted", false)
.repeat(
bothE()
.filter(hasLabel("has_propertyA", "has_propertyB"))
.otherV()
.simplePath())
.times(2)
.has("product", "site", "de")
.has("deleted", false)
.dedup()
.out("has_company")
.where(P.neq("start"))
.groupCount().by()
.order(Scope.local).by(select(Column.values)).limit(Scope.local, 100)
.unfold()
.project("company", "tf", "tf-idf")
.by(select(Column.keys).values("companyName"))
.by(select(Column.values)) // tf
.by(project("tf", "df") // idf
.by(select(Column.values))
.by(select(Column.keys).in("has_company").count())
.math("tf *
log10(2000000/df)")) // todo
fix the hardcoded 2m
.fold()
.order(Scope.local).by(select("tf-idf"), Order.desc).limit(Scope.local, 10)
This runs into timeouts, memory issues, or other weird kinds of
failures, even if I give an insanely high amount of memory to
the executors (or the driver), or it simply stops after some
time because some executors failed 10 times.
To pinpoint the issue, I executed the following, much more
limited query, first on our "normal" OLTP JanusGraph server, and
then using the SparkGraphComputer.
While I get a result within seconds with the OLTP version, I
don't when using the SparkGraphComputer. I don't expect the
response within seconds, because bringing up the whole Spark
context takes its time, but I wouldn't expect hours either.
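For context, this is roughly how the two traversal sources are created (the OLTP properties file name is a placeholder for our real configuration):
// OLTP: embedded JanusGraph, the query below returns within seconds
GraphTraversalSource gOltp = JanusGraphFactory.open("janusgraph-oltp.properties").traversal();
// OLAP: HadoopGraph + SparkGraphComputer, the same query does not come back
GraphTraversalSource gOlap = GraphFactory.open(configuration).traversal()
        .withComputer(SparkGraphComputer.class);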
What is wrong? I assume I am using some construct somewhere that
should not be used in that mode, but I don't see what.
g.V().has("company", "companyName", "company")
.as("start")
.in("has_company")
.has("deleted", false)
.repeat(
bothE()
.filter(hasLabel("has_propertyA
", "has_propertyB
"))
.otherV()
.simplePath())
.times(2)
.has("jobOffer", "site", "de")
.has("deleted", false)
.limit(1000)
.out("has_company")
.where(P.neq("start"))
.groupCount().by()
.order(Scope.local).by(select(Column.values)).limit(Scope.local, 100)
.unfold()
.project("company", "tf", "tf-idf")
.by(select(Column.keys).values("companyName"))
.by(select(Column.values)) // tf
.by(project("tf", "df") // idf
.by(select(Column.values))
.by(select(Column.keys).in("has_company").count())
.math("tf *
log10(2000000/df)")) // todo
fix the 2m
.fold()
.order(Scope.local).by(select("tf-idf"), Order.desc).limit(Scope.local, 10)
.toList();
Janusgraph.conf
For the sake of completeness, here is my complete configuration:
# Hadoop Graph Configuration
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.cql.CqlInputFormat
gremlin.hadoop.graphWriter=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
gremlin.spark.persistContext=true
gremlin.spark.graphStorageLevel=MEMORY_AND_DISK
# Scylla
janusgraphmr.ioformat.conf.storage.backend=cql
janusgraphmr.ioformat.conf.storage.hostname=<ip>
janusgraphmr.ioformat.conf.storage.port=<port>
janusgraphmr.ioformat.conf.index.search.backend=lucene
janusgraphmr.ioformat.conf.index.search.directory=/tmp/
janusgraphmr.ioformat.conf.index.search.hostname=127.0.0.1
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
cassandra.input.widerows=true
# Spark
spark.master=yarn
spark.submit.deployMode=client
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator
### I tried different memory settings here, up to 50 GB per executor
spark.executor.memory=16g
spark.executor.memoryOverhead=4g
spark.driver.memory=16g
spark.driver.memoryOverhead=4g
spark.eventLog.enabled=true
spark.network.timeout=300s
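This configuration is loaded in the Java application and passed to GraphFactory, roughly like this (the properties file name is a placeholder for the file shown above):
Graph hadoopGraph = GraphFactory.open("read-cql.properties");
GraphTraversalSource g = hadoopGraph.traversal().withComputer(SparkGraphComputer.class);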
Any hints, thoughts, comments, or feedback are greatly appreciated!
Cheers,
Claire