
Hi,

I am currently experimenting with the OLAP functionality of Gremlin (on JanusGraph). While I get simple queries to run like a charm, I am encountering issues with a slightly more complex case. I have the feeling that the devil hides somewhere in the details, but I am currently unable to pinpoint it, and am hoping to get some help here.
Setup
- Managed Spark (Dataproc, Google Cloud Platform)
- 1 master, up to 5 worker nodes (autoscaling enabled; each node has roughly 40 GB of memory available for Spark jobs, but I am pretty flexible here and can easily configure more RAM, more CPUs, more cores)
- (embedded) JanusGraph 0.5.2 backed by ScyllaDB
- TinkerPop / Spark Gremlin 3.4.7
- Spark job is a Java application
- Current graph has roughly 22 million vertices and 24 million edges (~65 GB storage on ScyllaDB)
Use Case
I want to calculate shortcut edges between two "company" vertices. Two companies are linked via 4 edges (see image below; the red link is the shortcut edge I want to calculate).
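In Gremlin terms, the 4-edge path I want to collapse into one shortcut edge looks roughly like this (just a sketch, using the same edge labels as in the queries below):

g.V().hasLabel("company")                             // company A
    .in("has_company")                                // its products
    .bothE("has_propertyA", "has_propertyB").otherV() // hop 1, to a shared property
    .bothE("has_propertyA", "has_propertyB").otherV() // hop 2, to another product sharing it
    .out("has_company")                               // company B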
Issue
Some simple queries (like g.V().count() or g.E().count()) work like a charm, so my setup is working in general. However, my shortcut-edge calculation is not.
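For reference, these sanity checks complete without problems on the same setup:

g.V().count().next() // ~22 million
g.E().count().next() // ~24 million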
The (simplified) query is as follows (it is supposed to return the top 10 related companies):
g = GraphFactory.open(configuration).traversal().withComputer(SparkGraphComputer.class)
g.V().has("company", "companyName", "company")
    .as("start")
    .in("has_company")
    .has("deleted", false)
    .repeat(
        bothE()
            .filter(hasLabel("has_propertyA", "has_propertyB"))
            .otherV()
            .simplePath())
    .times(2)
    .has("product", "site", "de")
    .has("deleted", false)
    .dedup()
    .out("has_company")
    .where(P.neq("start"))
    .groupCount().by()
    .order(Scope.local).by(select(Column.values)).limit(Scope.local, 100)
    .unfold()
    .project("company", "tf", "tf-idf")
        .by(select(Column.keys).values("companyName"))
        .by(select(Column.values)) // tf
        .by(project("tf", "df") // idf
                .by(select(Column.values))
                .by(select(Column.keys).in("has_company").count())
            .math("tf * log10(2000000/df)")) // TODO: fix the hardcoded 2M
    .fold()
    .order(Scope.local).by(select("tf-idf"), Order.desc).limit(Scope.local, 10)
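Regarding the TODO: my plan is to compute the corpus size once up front instead of hardcoding 2,000,000. A rough sketch, assuming the corpus is all "product" vertices with site "de":

// compute the document count once, instead of hardcoding 2,000,000
long docCount = g.V().has("product", "site", "de").count().next();
// ...and interpolate it into the math() step:
// .math("tf * log10(" + docCount + "/df)")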
This runs into timeouts, memory issues, or other weird kinds of failures, even if I give an insanely high amount of memory to the executors (or the driver), or it simply stops after some time because some executors failed 10 times.
To pinpoint the issue, I executed the following, much more limited query, first on our "normal" OLTP JanusGraph server and then using the SparkGraphComputer.
While the OLTP version gives me a result within seconds, the SparkGraphComputer version does not. I don't expect a response within seconds there, because bringing up the whole Spark context takes its time, but I wouldn't expect hours either.
What is wrong? I assume I am using some construct somewhere that should not be used in this mode, but I don't see which one.
g.V().has("company", "companyName", "company")
.as("start")
.in("has_company")
.has("deleted", false)
.repeat(
bothE()
.filter(hasLabel("has_propertyA
", "has_propertyB
"))
.otherV()
.simplePath())
.times(2)
.has("jobOffer", "site", "de")
.has("deleted", false)
.limit(1000)
.out("has_company")
.where(P.neq("start"))
.groupCount().by()
.order(Scope.local).by(select(Column.values)).limit(Scope.local, 100)
.unfold()
.project("company", "tf", "tf-idf")
.by(select(Column.keys).values("companyName"))
.by(select(Column.values)) // tf
.by(project("tf", "df") // idf
.by(select(Column.values))
.by(select(Column.keys).in("has_company").count())
.math("tf * log10(2000000/df)")) // todo fix the 2m
.fold()
.order(Scope.local).by(select("tf-idf"), Order.desc).limit(Scope.local, 10)
.toList();
Janusgraph.conf
For the sake of completeness, here is my complete configuration:
# Hadoop Graph Configuration
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.cql.CqlInputFormat
gremlin.hadoop.graphWriter=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
gremlin.spark.persistContext=true
gremlin.spark.graphStorageLevel=MEMORY_AND_DISK
# Scylla
janusgraphmr.ioformat.conf.storage.backend=cql
janusgraphmr.ioformat.conf.storage.hostname=<ip>
janusgraphmr.ioformat.conf.storage.port=<port>
janusgraphmr.ioformat.conf.index.search.backend=lucene
janusgraphmr.ioformat.conf.index.search.directory=/tmp/
janusgraphmr.ioformat.conf.index.search.hostname=127.0.0.1
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
cassandra.input.widerows=true
# Spark
spark.master=yarn
spark.submit.deployMode=client
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator
### I tried different memory settings here, up to 50g per executor
spark.executor.memory=16g
spark.executor.memoryOverhead=4g
spark.driver.memory=16g
spark.driver.memoryOverhead=4g
spark.eventLog.enabled=true
spark.network.timeout=300s
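
For reference, this is roughly how the Java application opens the graph and creates the OLAP traversal source (a minimal sketch; the file name read-cql.properties is just an example):

import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

public class OlapSetup {
    public static GraphTraversalSource openOlap() throws Exception {
        // load the HadoopGraph properties shown above
        Configuration configuration = new PropertiesConfiguration("read-cql.properties");
        Graph graph = GraphFactory.open(configuration);
        // bind the traversal source to SparkGraphComputer for OLAP execution
        return graph.traversal().withComputer(SparkGraphComputer.class);
    }
}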
Any hints, thoughts, comments, or feedback are greatly appreciated.
Cheers,
Claire