// Groovy script (run in the Gremlin console) that bulk-loads AML simulation
// output (transactions + alerts CSV) into JanusGraph.
// NOTE(review): JanusGraphFactory and Vertex are presumably pre-imported by the
// Gremlin console environment — confirm before running standalone.
import java.util.concurrent.atomic.AtomicLong

// Input locations: working dir comes from a system property, defaulting to ".".
USER_DIR = System.getProperty('user.working_dir','.') + '/'
DATA_DIR = USER_DIR + "outputs/"
TX_CSV = DATA_DIR + "tx_list.csv"
ALERT_CSV = DATA_DIR + "alerts.csv"
PROP_FILE = "janusgraph.properties"

println "Start loading transactions from " + TX_CSV

// counter drives periodic commits; one graph transaction is committed every
// batchSize vertex/edge insertions (see mutate below).
counter = new AtomicLong()
batchSize = 100000
// In-memory map of vid -> vertex to avoid duplicate inserts.
// NOTE(review): unbounded — grows with the number of distinct vids loaded.
cache = [:]
graph = JanusGraphFactory.open(PROP_FILE)

// create schema
mgmt = graph.openManagement()
// vertex schema
mgmt.makePropertyKey('vid').dataType(String.class).make()
mgmt.makePropertyKey('vtype').dataType(String.class).make()
mgmt.makePropertyKey('category').dataType(String.class).make()
mgmt.makePropertyKey('case').dataType(Boolean.class).make()
// edge schema
mgmt.makeEdgeLabel('account').make()
mgmt.makeEdgeLabel('transaction').make()
mgmt.makePropertyKey('tkey').dataType(String.class).make()
// Composite index on 'vid' for exact-match vertex lookups.
// NOTE(review): this script-level 'vid' variable is later shadowed by the
// addVertex closure parameter of the same name — confusing but harmless.
vid = mgmt.getPropertyKey('vid')
mgmt.buildIndex('vertexID',Vertex.class).addKey(vid).buildCompositeIndex()
mgmt.commit()

// Commit the open transaction once every batchSize mutations.
// Called after each element insertion; increments counter as a side effect.
mutate = { ->
    if (0 == counter.incrementAndGet() % batchSize) {
        graph.tx().commit()
    }
}

// Idempotent vertex insert: returns the cached vertex when vid was already
// seen, otherwise creates it with the given properties and caches it.
addVertex = { def vid, def vtype, def category, def caseFlag ->
    if(!cache.containsKey(vid)){
        v = graph.addVertex("vid", vid, "vtype", vtype, "category", category, "case", caseFlag)
        mutate()
        cache[vid] = v
        return v
    }
    return cache[vid]
}

// Sentinel for "column not found" while resolving CSV header indices.
DEFAULT_INDEX = -1
case_set = new HashSet()
line_counter = new AtomicLong()

/*
 * Load Alert CSV File
 */
println "START LOAD ALERT FILE " + ALERT_CSV
// Expected header columns:
// ALERT_KEY,ALERT_TEXT,ACCOUNT_ID,CUSTOMER_ID,EVENT_DATE,CHECK_NAME,Organization_Type,Escalated_To_Case_Investigation
alert_id = "ALERT_KEY"
account_id = "ACCOUNT_ID"
customer_id = "CUSTOMER_ID"
escalated = "Escalated_To_Case_Investigation"
// Column indices, resolved from the header row below.
alert_idx = DEFAULT_INDEX
acct_idx = DEFAULT_INDEX
cust_idx = DEFAULT_INDEX
escalated_idx = DEFAULT_INDEX
// NOTE(review): FileReader uses the platform default charset and the reader is
// never closed in the visible portion of this script — consider
// new BufferedReader(new InputStreamReader(new FileInputStream(ALERT_CSV), "UTF-8"))
// inside a try/finally.
reader = new BufferedReader(new FileReader(ALERT_CSV))
line = reader.readLine()
// split with limit -1 keeps trailing empty columns.
fields = line.split(',', -1)
for(int i=0; i
// NOTE(review): SOURCE is truncated here, mid-way through the header-parsing
// loop condition. The remainder of the script (index resolution, alert row
// loading, and the transaction-CSV load promised by the banner above) is not
// visible and has been left exactly as found.