I have created a transaction log with HBase as the storage backend using the following code:
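```java
import org.janusgraph.core.JanusGraphTransaction;
import org.janusgraph.core.JanusGraphVertex;

// Open a transaction whose changes are recorded in the "addedPerson" transaction log
JanusGraphTransaction tx = graph.buildTransaction().logIdentifier("addedPerson").start();
JanusGraphVertex u = tx.addVertex("human");
u.property("name", "proteros");
u.property("age", 36);
JanusGraphVertex u1 = tx.addVertex("human");
u1.property("name", "sandeep"); // properties belong to the second vertex
u1.property("age", 34);
tx.commit();
```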
I am then trying to replay it using the code below:
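```java
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.janusgraph.core.JanusGraphFactory;
import org.janusgraph.core.JanusGraphTransaction;
import org.janusgraph.core.log.*;

AtomicInteger totalHumansAdded = new AtomicInteger(); // counts replayed "human" vertices
LogProcessorFramework logProcessor1 = JanusGraphFactory.openTransactionLog(graph);
logProcessor1.addLogProcessor("addedPerson")
        .setStartTime(Instant.EPOCH) // per the docs, read from the beginning of the log
        .setProcessorIdentifier("addedPersonProcessor")
        .addProcessor(new ChangeProcessor() {
            @Override
            public void process(JanusGraphTransaction janusGraphTransaction, TransactionId transactionId, ChangeState changeState) {
                System.out.println("processing old log");
                for (Vertex v : changeState.getVertices(Change.ADDED)) {
                    System.out.println("vertex " + v.label());
                    if (v.label().equals("human")) totalHumansAdded.incrementAndGet();
                }
            }
        }).build();
```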
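To make explicit what I expected `setStartTime(Instant.EPOCH)` to do, here is a toy model in plain Java. It is only my reading of the documentation, not JanusGraph's implementation; `StartTimeModel`, `LogRecord`, `countHumans` and the sample timestamps are made up for illustration. Every record at or after the start time is replayed, so with `Instant.EPOCH` the vertices committed before the processor was built should be counted as well.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not JanusGraph code) of what setStartTime(...) is
// documented to mean: a processor reads every log record whose timestamp is at
// or after the configured start time.
class StartTimeModel {

    // Hypothetical log record: a commit timestamp plus the vertex labels it added.
    static final class LogRecord {
        final Instant time;
        final List<String> addedLabels;

        LogRecord(Instant time, List<String> addedLabels) {
            this.time = time;
            this.addedLabels = addedLabels;
        }
    }

    // Records a newly built processor would be expected to replay.
    static List<LogRecord> expectedReplay(List<LogRecord> log, Instant startTime) {
        List<LogRecord> replayed = new ArrayList<>();
        for (LogRecord r : log) {
            if (!r.time.isBefore(startTime)) {
                replayed.add(r);
            }
        }
        return replayed;
    }

    // Mirrors the processor in the question: count added vertices labelled "human".
    static int countHumans(List<LogRecord> records) {
        int total = 0;
        for (LogRecord r : records) {
            for (String label : r.addedLabels) {
                if (label.equals("human")) total++;
            }
        }
        return total;
    }

    // Made-up sample: one commit before the processor is built, one after.
    static List<LogRecord> sampleLog() {
        return List.of(
                new LogRecord(Instant.parse("2018-05-01T10:00:00Z"), List.of("human", "human")),
                new LogRecord(Instant.parse("2018-05-01T15:05:00Z"), List.of("human")));
    }

    public static void main(String[] args) {
        // Start time EPOCH: both commits, including the earlier one, are replayed.
        System.out.println(countHumans(expectedReplay(sampleLog(), Instant.EPOCH)));
        // Start time "now": only the commit made after that instant is replayed.
        System.out.println(countHumans(expectedReplay(sampleLog(), Instant.parse("2018-05-01T14:59:19.023Z"))));
    }
}
```

What I actually observe matches the second case, even though I configured the first.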
However, my log processor doesn't process any previously saved vertices; it only detects changes made after the log processor has been built.
I tried setting the start time to `Instant.EPOCH`, which according to the documentation should make log processing start from the beginning of the log, but it made no difference.
The only difference I see in the logs is that the MessagePullers are started from (near) the epoch:
```
22:59:19,039 INFO KCVSLog:744 - Loaded unidentified ReadMarker start time 2018-05-01T14:59:19.023Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@57db2b13
22:59:19,054 INFO KCVSLog:748 - Loaded indentified ReadMarker start time 1970-01-01T02:18:20Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@2f953efd
22:59:19,054 INFO KCVSLog:748 - Loaded indentified ReadMarker start time 1970-01-01T02:18:20Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@14bdbc74
22:59:19,054 INFO KCVSLog:748 - Loaded indentified ReadMarker start time 1970-01-01T02:18:20Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@5a7fe64f
22:59:19,054 INFO KCVSLog:748 - Loaded indentified ReadMarker start time 1970-01-01T02:18:20Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@41330d4f
```
Another point to note: a vertex inserted from one Java instance is detected by log processor instances running in any other Java instance.
This looks like a bug, since it doesn't behave as described in the documentation:
> When a log processor is built against a particular log, such as the addedPerson log in the example above, it will start reading transactional change records from the log immediately upon successful construction and initialization up to the head of the log. The start time specified in the builder marks the time point in the log where the log processor will start reading records. Optionally, one can specify an identifier for the log processor in the builder. The log processor will use the identifier to regularly persist its state of processing, i.e. it will maintain a marker on the last read log record. If the log processor is later restarted with the same identifier, it will continue reading from the last read record. This is particularly useful when the log processor is supposed to run for long periods of time and is therefore likely to fail. In such failure situations, the log processor can simply be restarted with the same identifier. It must be ensured that log processor identifiers are unique in a JanusGraph cluster in order to avoid conflicts on the persisted read markers.
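The two rules in the quoted passage (a read marker persisted per processor identifier, and the builder's start time) can be written down as a small plain-Java model. This is a hypothetical sketch of my reading of the documentation, not JanusGraph's KCVSLog code; `ReadMarkerModel`, `resolveStart` and `persistMarker` are invented names. A processor whose identifier already has a stored marker resumes from that marker; otherwise it begins at the configured start time.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical, not JanusGraph's implementation) of the documented rules:
//   1. a processor with an identifier regularly persists a read marker;
//   2. restarting with the same identifier resumes from that marker,
//      otherwise reading begins at the builder's start time.
class ReadMarkerModel {

    // Hypothetical stand-in for the persisted read markers, keyed by identifier.
    private final Map<String, Instant> persistedMarkers = new HashMap<>();

    // Where a newly built processor would begin reading under this model.
    Instant resolveStart(String processorIdentifier, Instant startTime) {
        if (processorIdentifier != null && persistedMarkers.containsKey(processorIdentifier)) {
            return persistedMarkers.get(processorIdentifier); // rule 2: resume from marker
        }
        return startTime; // no marker yet, or no identifier: use the start time
    }

    // Called as records are processed: advance the marker for this identifier.
    void persistMarker(String processorIdentifier, Instant lastRead) {
        if (processorIdentifier != null) {
            persistedMarkers.put(processorIdentifier, lastRead);
        }
    }

    public static void main(String[] args) {
        ReadMarkerModel store = new ReadMarkerModel();
        // First run: no marker exists, so the configured start time is used.
        System.out.println(store.resolveStart("addedPersonProcessor", Instant.EPOCH));
        store.persistMarker("addedPersonProcessor", Instant.parse("2018-05-01T14:59:19Z"));
        // Restart with the same identifier: the persisted marker is used instead.
        System.out.println(store.resolveStart("addedPersonProcessor", Instant.EPOCH));
        // A processor without an identifier always starts from its start time.
        System.out.println(store.resolveStart(null, Instant.EPOCH));
    }
}
```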
Any comments?
Regards,
Sandeep