OLAP, Hadoop, Spark and Cassandra
sly...@...
Hi,
I'm trying to get JanusGraph 0.4.0 with a Cassandra (CQL) backend set up and running for OLAP while still keeping OLTP active so I can do graph updates. I've been searching high and low for some guidance, but so far without any luck. Hopefully someone here can tune in and help?
Here's where I'm at currently:
- local Hadoop running according to https://old-docs.janusgraph.org/0.4.0/hadoop-tp3.html
- gremlin server started as /bin/gremlin-server.sh conf/gremlin-server/gremlin-server-configuration.yaml
- gremlin-server-configuration.yaml points to an init.groovy script that does the traversal mappings for OLTP and OLAP (a sketch of the relevant YAML follows the script below)
// init.groovy - bind an OLTP source ('g') and an OLAP source ('sg') as server-wide globals
def globals = [:]
// OLTP: the named graph opened through the ConfiguredGraphFactory
ve = ConfiguredGraphFactory.open("ve_graph")
// OLAP: a HadoopGraph view read with the CQL input format, traversed with SparkGraphComputer
OLAPGraph = GraphFactory.open('conf/hadoop-graph/read-cql.properties')
globals << [g : ve.traversal(), sg: OLAPGraph.traversal().withComputer(org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer)]
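For reference, the part of gremlin-server-configuration.yaml that points at the script looks roughly like this (a trimmed sketch only - the shipped file also lists the import and server plugins, and scripts/init.groovy stands in for wherever the script actually lives):
scriptEngines: {
  gremlin-groovy: {
    plugins: { org.janusgraph.graphdb.tinkerpop.plugin.JanusGraphGremlinPlugin: {},
               org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin: {files: [scripts/init.groovy]}}}}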
- conf/hadoop-graph/read-cql.properties reads
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.cql.CqlInputFormat
gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
gremlin.spark.persistContext=true
janusgraphmr.ioformat.conf.storage.backend=cql
janusgraphmr.ioformat.conf.storage.hostname=127.0.0.1
janusgraphmr.ioformat.conf.storage.port=9042
janusgraphmr.ioformat.conf.storage.cassandra.keyspace=janusgraph
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator
- Running the gremlin shell I have
\,,,/
(o o)
-----oOOo-(3)-oOOo-----
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/sture/Scripts/janusgraph-0.4.0-hadoop2/lib/slf4j-log4j12-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/sture/Scripts/janusgraph-0.4.0-hadoop2/lib/logback-classic-1.1.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
plugin activated: tinkerpop.server
plugin activated: tinkerpop.tinkergraph
plugin activated: tinkerpop.hadoop
plugin activated: tinkerpop.spark
plugin activated: tinkerpop.utilities
plugin activated: janusgraph.imports
gremlin> :remote connect tinkerpop.server conf/remote.yaml session
==>Configured localhost/127.0.0.1:8182-[655848fc-b46e-40be-8174-f0dc42cdabd4]
gremlin> :remote console
==>All scripts will now be sent to Gremlin Server - [localhost/127.0.0.1:8182]-[655848fc-b46e-40be-8174-f0dc42cdabd4] - type ':remote console' to return to local mode
gremlin> g
==>graphtraversalsource[standardjanusgraph[cql:[127.0.0.1]], standard]
gremlin>
gremlin> sg
==>graphtraversalsource[hadoopgraph[cqlinputformat->gryooutputformat], sparkgraphcomputer]
gremlin> g.V().has('lbl','System').count()
==>68
gremlin> sg.V().has('lbl','System').count()
- The job runs for some time, and as it finishes gremlin-server.log reads
253856 [Executor task launch worker for task 768] INFO org.apache.spark.executor.Executor - Finished task 768.0 in stage 0.0 (TID 768). 2388 bytes result sent to driver
253858 [task-result-getter-1] INFO org.apache.spark.scheduler.TaskSetManager - Finished task 768.0 in stage 0.0 (TID 768) in 6809 ms on localhost (executor driver) (769/769)
253861 [dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - ResultStage 0 (fold at SparkStarBarrierInterceptor.java:101) finished in 161.427 s
253861 [task-result-getter-1] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 0.0, whose tasks have all completed, from pool
253876 [SparkGraphComputer-boss] INFO org.apache.spark.scheduler.DAGScheduler - Job 0 finished: fold at SparkStarBarrierInterceptor.java:101, took 161.598267 s
253888 [SparkGraphComputer-boss] INFO org.apache.spark.rdd.MapPartitionsRDD - Removing RDD 1 from persistence list
253901 [block-manager-slave-async-thread-pool-0] INFO org.apache.spark.storage.BlockManager - Removing RDD 1
- However, the count (==>) for the sg traversal reads 0. I've most likely missed some crucial point here, but I'm not able to spot it. Please help.
sly...@...
Answering my own question - it turned out I had a mix-up of the keyspaces used between the two instances.
By default conf/hadoop-graph/read-cql.properties reads
janusgraphmr.ioformat.conf.storage.cassandra.keyspace
while for CQL it should read
janusgraphmr.ioformat.conf.storage.cql.keyspace
Also, since I made a 'named' graph (ve_graph), I had to point to that keyspace rather than the janusgraph keyspace.
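A minimal sketch of the corrected line (assuming the named graph was created with a keyspace also called ve_graph - the actual keyspace name depends on how the graph was configured):
janusgraphmr.ioformat.conf.storage.cql.keyspace=ve_graph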
Problem 1 solved. Now to the next one: how can I lower the number of 'partitions' Spark is using (769 here, as in '... on localhost (executor driver) (769/769)')?
Mladen Marović <mladen...@...>
I know I'm quite late to the party, but for future reference - the number of input partitions in Spark depends on the partitioning of the source. In the case of cassandra, partitioning is determined by the number of tokens each node gets (as configured by `num_tokens` in `cassandra.yaml`), which is set to 256 by default. So, if you have a 3-node cassandra cluster, by default each node should get 256 tokens, which would result in 3*256 = 768 tokens total. Since Spark reads directly from cassandra (if you're using `org.janusgraph.hadoop.formats.cql.CqlInputFormat`), that translates to 768 partitions in the input Spark RDD, or 768 tasks during processing. Add to that one task that collects results, or something similar, and you end up at 769. At least that was my experience.
The default value of 256 for `num_tokens` made sense in older versions, but in cassandra 3.x a new token allocation algorithm was implemented to improve performance for operations requiring token-range scans, which is precisely what Spark does. I experimented a bit with smaller values (e.g. 16) and managed to drastically reduce the number of tasks when scanning the entire graph. For further reading, I recommend this article.
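For reference, the setting in question (a sketch with the example value mentioned above; note that num_tokens cannot simply be changed on a node that already holds data - it generally means re-provisioning the node):
# cassandra.yaml
num_tokens: 16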
HadoopMarc <bi...@...>
Hi Mladen,
Interesting read! Spark is not very sensitive to the number of tasks. I believe that for OLAP on HadoopGraph the optimum is partitions of about 256 MB. Larger partitions are difficult to hold in memory for reasonably sized executors; smaller ones give too much overhead. OLAP with janusgraph-hbase is much harder, because the partition size is determined by the HBase regions, which need to be large (10 GB). Also note that the entire graph needs to fit into the total memory of all executors, because graph traversing is shuffle-heavy and spilling to disk takes forever.
Best wishes, Marc
Mladen Marović <mladen...@...>
A slight correction and clarification of my previous post - the total number of partitions/splits is exactly equal to total_number_of_tokens + 1. In a 3-node cassandra cluster where each node has 256 tokens (if set to default), this would result in a total of 769 partitions, in a single-node cluster this would be 257, etc. There is no "1 task that collects results, or something similar".
This makes sense when you consider that Cassandra partitions data using 64-bit row key hashes, that the total range of 64-bit integer hash values is equal to [-2^63, 2^63 - 1], and that tokens are simply 64-bit integer values used to determine what data partitions a node gets. Splitting that range with n different tokens always gives n + 1 subsets. A log excerpt from a 1-node cassandra cluster with 16 tokens confirms this:
18720 [Executor task launch worker for task 0] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((-4815577940669380240, '-2942172956248108515] @[master])
18720 [Executor task launch worker for task 1] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((7326109958794842850, '7391123213565411179] @[master])
18721 [Executor task launch worker for task 3] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((-2942172956248108515, '-2847854446434006096] @[master])
18740 [Executor task launch worker for task 2] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((-9223372036854775808, '-8839354777455528291] @[master])
28369 [Executor task launch worker for task 4] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((4104296217363716109, '7326109958794842850] @[master])
28651 [Executor task launch worker for task 5] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((8156279557766590813, '-9223372036854775808] @[master])
34467 [Executor task launch worker for task 6] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((-6978843450179888845, '-5467974851507832526] @[master])
54235 [Executor task launch worker for task 7] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((2164465249293820494, '3738744141825711063] @[master])
56122 [Executor task launch worker for task 8] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((-2847854446434006096, '180444324727144184] @[master])
60564 [Executor task launch worker for task 9] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((180444324727144184, '720824306927062455] @[master])
74783 [Executor task launch worker for task 10] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((-8839354777455528291, '-7732322859452179159] @[master])
78171 [Executor task launch worker for task 11] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((-7732322859452179159, '-6978843450179888845] @[master])
79362 [Executor task launch worker for task 12] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((3738744141825711063, '4104296217363716109] @[master])
91036 [Executor task launch worker for task 13] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((-5467974851507832526, '-4815577940669380240] @[master])
92250 [Executor task launch worker for task 14] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((1437322944493769078, '2164465249293820494] @[master])
92363 [Executor task launch worker for task 15] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((720824306927062455, '1437322944493769078] @[master])
94339 [Executor task launch worker for task 16] INFO org.apache.spark.rdd.NewHadoopRDD - Input split: ColumnFamilySplit((7391123213565411179, '8156279557766590813] @[master])
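To illustrate the arithmetic, a small Groovy sketch with hypothetical token values (not taken from the log above):
// n sorted token values cut the linear Murmur3 range [-2^63, 2^63 - 1] into n + 1 scan ranges
def tokens = [-3000L, 0L, 4000L]                          // hypothetical tokens, n = 3
def bounds = [Long.MIN_VALUE] + tokens.sort() + [Long.MAX_VALUE]
def ranges = (0..<bounds.size() - 1).collect { [bounds[it], bounds[it + 1]] }
assert ranges.size() == tokens.size() + 1                 // 4 ranges for 3 tokens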
Best regards,
Mladen