How to load large amounts of data into JanusGraph?


Jerry <zhiyi.j...@...>
 

I have the following script for loading data into JanusGraph, but it is too slow. It seems that the loading script runs in a single thread.

How can I make the loading faster or parallel? The sample raw data file has 2,738,871 lines (202 MB), and it takes several minutes to load.

I did not create an index or a schema before loading.

I am using a machine with 250 GB of memory, so the bottleneck is on the JanusGraph side.

graph = JanusGraphFactory.open('conf/janusgraph-cassandra-es.properties')
g = graph.traversal()

commentsID = [:]

new File('comments_0_0.csv').eachLine { line, count ->
  // Skip the header line
  if (count > 1) {
    columns = line.split('\\|', -1)

    def types = 'Comments'
    def id_str, creationDate, locationIP, browserUsed, content, length
    (id_str, creationDate, locationIP, browserUsed, content, length) = columns
    long id = Long.parseLong(id_str)

    node = g.addV('Message')
      .property('id', id)
      .property('creationDate', creationDate)
      .property('locationIP', locationIP)
      .property('browserUsed', browserUsed)
      .property('content', content)
      .property('length', length)
      .property('types', types)
      .next()
    nodeId = node.longId()

    commentsID[id_str] = nodeId
  }
}
graph.tx().commit()


Oleksandr Porunov <alexand...@...>
 

I would suggest parallelizing the data loading. I don't know Python, so I can't suggest a technique for it, but in Java you would split your file into batches, create an ExecutorService with, say, 10-20 threads, and push tasks into it (each task could, for example, load 10,000 vertices into the graph and commit on its own). Don't forget to create a separate transaction for each task: `taskTransaction = graph.tx().createThreadedTx()`. Also, set the `storage.batch-loading=true` property to enable batch loading.
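A rough Java sketch of this approach (the `BatchLoader` name and the batch/thread counts are illustrative, and the JanusGraph-specific calls are left as comments so the partitioning logic stands on its own):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class BatchLoader {
    static final int BATCH_SIZE = 10_000;
    static final int THREADS = 10;

    // Splits the input lines into fixed-size batches and loads each batch
    // in its own task; returns the number of lines processed.
    public static long loadAll(List<String> lines) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(THREADS);
        AtomicLong loaded = new AtomicLong();
        List<Future<?>> futures = new ArrayList<>();
        for (int start = 0; start < lines.size(); start += BATCH_SIZE) {
            List<String> batch =
                lines.subList(start, Math.min(start + BATCH_SIZE, lines.size()));
            futures.add(pool.submit(() -> {
                // Each task would open its own threaded transaction here:
                //   Graph tx = graph.tx().createThreadedTx();
                for (String line : batch) {
                    // Parse the line and add a vertex via tx, e.g.
                    //   tx.addVertex("Message") with its properties.
                    loaded.incrementAndGet();
                }
                // Commit once per batch:
                //   tx.tx().commit();
            }));
        }
        for (Future<?> f : futures) {
            f.get(); // propagate any task failure
        }
        pool.shutdown();
        return loaded.get();
    }
}
```

Committing once per batch rather than once per vertex keeps transactions small enough to retry on failure while still amortizing the commit cost.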


On Friday, March 29, 2019 at 11:09:01 PM UTC+2, Jerry wrote:


Jerry <zhiyi.j...@...>
 

Is there a full example of how to split the file, use multiple threads, and create separate transactions?


On Friday, March 29, 2019 at 2:46:29 PM UTC-7, Oleksandr Porunov wrote:


Oleksandr Porunov <alexand...@...>
 

Splitting the file and uploading in parallel isn't really a question about the database; you can search for different approaches.
Here is the first link I found: https://www.blopig.com/blog/2016/08/processing-large-files-using-python/
Here is a second link: https://medium.com/@ageitgey/quick-tip-speed-up-your-python-data-processing-scripts-with-process-pools-cf275350163a
And here is a link that talks about parallel processing in Python: https://stackabuse.com/parallel-processing-in-python/

All you need to do is create a pool with, say, 10 threads and push all the jobs into it. For example, each job could process 10,000 lines of the file; with those parameters you would have 274 jobs in the pool, processed in parallel by the 10 threads. Finished jobs are removed from the pool.
Each job should open a separate transaction at the beginning. As I mentioned, to create a separate transaction you should call `graph.tx().createThreadedTx()`.
Please read the documentation about multi-threaded transactions: https://docs.janusgraph.org/latest/tx.html#multi-thread-tx
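The batching arithmetic above (2,738,871 lines at 10,000 lines per job gives 274 jobs, the last one partial) can be sketched with a small chunking helper; the class name is illustrative:

```java
import java.util.ArrayList;
import java.util.List;

public class Chunker {
    // Splits a list of items into fixed-size jobs; the last job may be
    // smaller if the total does not divide evenly.
    public static <T> List<List<T>> chunk(List<T> items, int size) {
        List<List<T>> jobs = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            jobs.add(items.subList(i, Math.min(i + size, items.size())));
        }
        return jobs;
    }
}
```

Each element of the returned list would then be submitted to the thread pool as one job with its own threaded transaction.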

On Friday, March 29, 2019 at 11:56:14 PM UTC+2, Jerry wrote:


Алексей Петренко <alex19...@...>
 

Hello, everyone!
Could anybody provide an example of code that inserts data into JanusGraph in parallel? I understand the main concepts of how to implement it, but I face a different exception each time.
I am attaching my code to this post, so everybody can take a look and leave comments.

val service = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors() * 2)
val threadedGraph = g.tx().createThreadedTx<Graph>()
val tt = threadedGraph.traversal()

val vertexTasks = dataGraph
    .vertices
    .toList()
    .chunked(1000)
    .map {
        VertexTask(
            vertices = it,
            g = tt,
            existingVertices = existingVertices
        )
    }
val addedVertices = service
    .invokeAll(vertexTasks)
    .stream()
    .map { it.get() }
    .toList()
    .flatten()
    .toMap()

val edgeTasks = dataGraph
    .edges
    .toList()
    .chunked(1000)
    .map {
        EdgeTask(
            edges = it,
            g = tt,
            addedVertices = addedVertices
        )
    }
val addedEdges = service
    .invokeAll(edgeTasks)
    .stream()
    .map { it.get() }
    .toList()
    .flatten()
    .toMap()
service.shutdown()

private class VertexTask(
    private val vertices: List<VertexData>,
    private val g: GraphTraversalSource,
    private val existingVertices: Map<String, String>
) : Callable<List<Pair<VertexData, Vertex>>> {

    override fun call(): List<Pair<VertexData, Vertex>> = vertices
        .map {
            it to (if (g.V(existingVertices[it.id]).hasNext()) g.V(existingVertices[it.id]).next() else g.addVertex(it))
        }.also {
            g.tx().commit()
        }
}

private class EdgeTask(
    private val edges: List<EdgeData>,
    private val g: GraphTraversalSource,
    private val addedVertices: Map<VertexData, Vertex>
) : Callable<List<Pair<EdgeData, Edge>>> {

    override fun call(): List<Pair<EdgeData, Edge>> = edges
        .filter {
            it.hasSourceAndTarget()
        }.map {
            it to g.addEdge(it, addedVertices.getValue(it.source!!), addedVertices.getValue(it.target!!))
        }.also {
            g.tx().commit()
        }
}
On Saturday, March 30, 2019 at 11:54:02 AM UTC+3, alex...@... wrote:
