BenjiELCA's picture
correction of bug with dataAssociation
9134c9f
raw
history blame
18.2 kB
import xml.etree.ElementTree as ET
from modules.utils import class_dict
def rescale(scale, boxes):
    """Scale the first four coordinates of every box by ``scale``, in place.

    Each entry of ``boxes`` is replaced by a new four-element list holding
    the scaled values; the (mutated) ``boxes`` container is also returned.
    """
    for position, box in enumerate(boxes):
        boxes[position] = [box[axis] * scale for axis in range(4)]
    return boxes
def create_BPMN_id(data):
    """Give every recognised element a numbered BPMN id such as ``task_3``.

    The class name of each element is looked up through ``class_dict`` from
    ``data['labels']``.  Elements of class ``'event'`` become ``end_event``
    when only the first entry of their ``data['links']`` pair is set and
    ``start_event`` when only the second entry is set.  Every other class is
    translated through a fixed class-name -> id-prefix table.

    Elements that cannot be resolved (an event with both or neither link
    entry set, or a class missing from the table) are left untouched in
    ``data['BPMN_id']``.

    Args:
        data: dict with at least ``'labels'``, ``'links'`` and ``'BPMN_id'``;
            ``data['BPMN_id']`` is updated in place.

    Returns:
        The same ``data`` object.
    """
    # Next free suffix for each id prefix.
    counters = {
        'end_event': 1,
        'start_event': 1,
        'task': 1,
        'sequenceFlow': 1,
        'messageFlow': 1,
        'message_event': 1,
        'exclusiveGateway': 1,
        'parallelGateway': 1,
        'dataAssociation': 1,
        'pool': 1,
        'dataObject': 1,
        'timerEvent': 1,
    }
    # Class name -> id prefix for every non-event class that receives an id.
    # Built once instead of on every loop iteration.
    prefix_by_class = {
        'task': 'task',
        'dataObject': 'dataObject',
        'sequenceFlow': 'sequenceFlow',
        'messageFlow': 'messageFlow',
        'messageEvent': 'message_event',
        'exclusiveGateway': 'exclusiveGateway',
        'parallelGateway': 'parallelGateway',
        'dataAssociation': 'dataAssociation',
        'pool': 'pool',
        'timerEvent': 'timerEvent',
    }

    class_names = [class_dict[label] for label in data['labels']]

    for idx, class_name in enumerate(class_names):
        # Reset for every element: otherwise an unresolved event would reuse
        # the prefix chosen for the previous element (or hit an unbound name
        # when it is the very first element).
        key = None
        if class_name == 'event':
            link = data['links'][idx]
            if link[0] is not None and link[1] is None:
                key = 'end_event'
            elif link[0] is None and link[1] is not None:
                key = 'start_event'
        else:
            key = prefix_by_class.get(class_name)

        if key:
            data['BPMN_id'][idx] = f'{key}_{counters[key]}'
            counters[key] += 1
    return data
def add_diagram_elements(parent, element_id, x, y, width, height):
    """Append a ``bpmndi:BPMNShape`` with a ``dc:Bounds`` child to ``parent``.

    The shape references ``element_id`` and gets the id ``<element_id>_di``;
    the bounds carry the stringified ``x``, ``y``, ``width`` and ``height``.
    """
    shape_attrs = {'bpmnElement': element_id, 'id': element_id + '_di'}
    shape = ET.SubElement(parent, 'bpmndi:BPMNShape', attrib=shape_attrs)

    geometry = (('x', x), ('y', y), ('width', width), ('height', height))
    bounds_attrs = {name: str(value) for name, value in geometry}
    ET.SubElement(shape, 'dc:Bounds', attrib=bounds_attrs)
def add_diagram_edge(parent, element_id, waypoints):
    """Append a ``bpmndi:BPMNEdge`` for ``element_id`` to ``parent``.

    The edge gets the id ``<element_id>_di`` and one ``di:waypoint`` child
    per ``(x, y)`` pair in ``waypoints``, in the given order.
    """
    edge = ET.SubElement(
        parent,
        'bpmndi:BPMNEdge',
        bpmnElement=element_id,
        id=element_id + '_di',
    )
    for point in waypoints:
        point_x, point_y = point
        ET.SubElement(edge, 'di:waypoint', x=str(point_x), y=str(point_y))
def check_status(link, keep_elements):
    """Classify a ``(first, second)`` link pair as 'start', 'middle' or 'end'.

    * 'start'  - first entry is None and the second is in ``keep_elements``
    * 'end'    - first entry is in ``keep_elements`` and the second is None
    * 'middle' - both entries are kept, or none of the patterns above apply
    """
    first = link[0]
    if first in keep_elements:
        second = link[1]
        if second in keep_elements:
            return 'middle'
        return 'end' if second is None else 'middle'
    if first is None and link[1] in keep_elements:
        return 'start'
    return 'middle'
def check_data_association(i, links, labels, keep_elements):
    """Find the data-association links attached to element ``i``.

    A link at position ``j`` counts as a data association when ``labels[j]``
    equals the position of ``'dataAssociation'`` among ``class_dict``'s
    values.  Such a link is reported as 'output' when its first entry is
    ``i`` and as 'input' when its second entry is ``i``.

    Args:
        i: index of the element being inspected.
        links: sequence of ``(first, second)`` pairs, one per element.
        labels: class label per element, parallel to ``links``.
        keep_elements: unused; kept for interface compatibility.

    Returns:
        ``(status, links_idx)``: two parallel lists with the direction
        ('input'/'output') and the index of every matching link.
    """
    # Resolved once: the original rebuilt this list for every link.
    association_label = list(class_dict.values()).index('dataAssociation')

    status, links_idx = [], []
    for j, (first, second) in enumerate(links):
        if labels[j] != association_label:
            continue
        if first == i:
            status.append('output')
            links_idx.append(j)
        elif second == i:
            status.append('input')
            links_idx.append(j)
    return status, links_idx
def create_data_Association(bpmn, data, size, element_id, current_idx, source_id, target_id):
    """Add the diagram edge of one association to ``bpmn``.

    The waypoints come from ``calculate_waypoints`` for the link at
    ``current_idx`` between ``source_id`` and ``target_id``; the edge is
    registered under ``element_id`` via ``add_diagram_edge``.
    """
    add_diagram_edge(
        bpmn,
        element_id,
        calculate_waypoints(data, size, current_idx, source_id, target_id),
    )
def check_eventBasedGateway(i, links, labels):
    """Find the sequence-flow links attached to element ``i``.

    A link at position ``j`` counts as a sequence flow when ``labels[j]``
    equals the position of ``'sequenceFlow'`` among ``class_dict``'s values.
    Such a link is reported as 'output' when its first entry is ``i`` and as
    'input' when its second entry is ``i``.

    Args:
        i: index of the element being inspected.
        links: sequence of ``(first, second)`` pairs, one per element.
        labels: class label per element, parallel to ``links``.

    Returns:
        ``(status, links_idx)``: two parallel lists with the direction
        ('input'/'output') and the index of every matching link.
    """
    # Resolved once: the original rebuilt this list for every link.
    sequence_flow_label = list(class_dict.values()).index('sequenceFlow')

    status, links_idx = [], []
    for j, (first, second) in enumerate(links):
        if labels[j] != sequence_flow_label:
            continue
        if first == i:
            status.append('output')
            links_idx.append(j)
        elif second == i:
            status.append('input')
            links_idx.append(j)
    return status, links_idx
# Function to dynamically create and layout BPMN elements
def _add_data_associations(element, bpmnplane, data, size, element_idx, element_id, keep_elements):
    """Attach the data input/output associations of one flow node and draw their edges."""
    elements = data['BPMN_id']
    links = data['links']
    status, association_indices = check_data_association(
        element_idx, links, data['labels'], keep_elements)

    for state, association_idx in zip(status, association_indices):
        if state == 'input':
            # The data object is the first entry of the link and feeds the node.
            data_object_name = elements[links[association_idx][0]]
            tag, ref_tag = 'bpmn:dataInputAssociation', 'bpmn:sourceRef'
            id_prefix = 'dataInputAssociation'
            source_id, target_id = data_object_name, element_id
        elif state == 'output':
            # The data object is the second entry of the link and is written by the node.
            data_object_name = elements[links[association_idx][1]]
            tag, ref_tag = 'bpmn:dataOutputAssociation', 'bpmn:targetRef'
            id_prefix = 'dataOutputAssociation'
            source_id, target_id = element_id, data_object_name
        else:
            continue

        data_object_number = data_object_name.split('_')[1]
        data_object_ref = f'DataObjectReference_{data_object_number}'
        # NOTE(review): the id depends only on the data object's number, so two
        # associations of the same direction on one data object share an id.
        association = ET.SubElement(element, tag, id=f'{id_prefix}_{data_object_number}')
        ET.SubElement(association, ref_tag).text = data_object_ref
        create_data_Association(bpmnplane, data, size, association.attrib['id'],
                                association_idx, source_id, target_id)


def create_bpmn_object(process, bpmnplane, text_mapping, definitions, size, data, keep_elements):
    """Create the BPMN process elements and their diagram shapes.

    For every index in ``keep_elements`` whose ``data['BPMN_id']`` entry is
    not None, the element type is taken from the id prefix (the text before
    the first underscore) and the matching ``bpmn:*`` element is appended to
    ``process``; its shape is appended to ``bpmnplane`` at the first two
    values of the element's box, using the dimensions in ``size``.
    Ids whose prefix is not handled below are skipped silently.

    Args:
        process: XML element receiving the ``bpmn:*`` elements.
        bpmnplane: XML element receiving the diagram shapes and edges.
        text_mapping: mapping from BPMN id to the element's display name.
        definitions: unused; kept for interface compatibility.
        size: mapping from element type to a ``(width, height)`` pair.
        data: dict with ``'BPMN_id'``, ``'boxes'``, ``'links'`` and ``'labels'``.
        keep_elements: iterable of element indices to emit.
    """
    elements = data['BPMN_id']
    positions = data['boxes']
    links = data['links']

    # Tag used for a message event depending on where it sits in the flow.
    message_event_tags = {
        'start': 'bpmn:startEvent',
        'middle': 'bpmn:intermediateCatchEvent',
        'end': 'bpmn:endEvent',
    }

    for i in keep_elements:
        element_id = elements[i]
        if element_id is None:
            continue

        element_type = element_id.split('_')[0]
        x, y = positions[i][:2]

        # Start Event
        if element_type == 'start':
            ET.SubElement(process, 'bpmn:startEvent', id=element_id, name=text_mapping[element_id])
            add_diagram_elements(bpmnplane, element_id, x, y, size['start'][0], size['start'][1])

        # Task
        elif element_type == 'task':
            element = ET.SubElement(process, 'bpmn:task', id=element_id, name=text_mapping[element_id])
            _add_data_associations(element, bpmnplane, data, size, i, element_id, keep_elements)
            add_diagram_elements(bpmnplane, element_id, x, y, size['task'][0], size['task'][1])

        # Message Events (Start, Intermediate, End)
        elif element_type == 'message':
            status = check_status(links[i], keep_elements)
            element = ET.SubElement(process, message_event_tags[status],
                                    id=element_id, name=text_mapping[element_id])
            # Associations are added before the event definition to keep the
            # emitted child order unchanged.
            _add_data_associations(element, bpmnplane, data, size, i, element_id, keep_elements)
            ET.SubElement(element, 'bpmn:messageEventDefinition', id=f'MessageEventDefinition_{i+1}')
            add_diagram_elements(bpmnplane, element_id, x, y, size['message'][0], size['message'][1])

        # End Event
        elif element_type == 'end':
            ET.SubElement(process, 'bpmn:endEvent', id=element_id, name=text_mapping[element_id])
            add_diagram_elements(bpmnplane, element_id, x, y, size['end'][0], size['end'][1])

        # Gateways (Exclusive, Parallel): the XML tag equals the id prefix.
        elif element_type in ['exclusiveGateway', 'parallelGateway']:
            ET.SubElement(process, f'bpmn:{element_type}', id=element_id)
            add_diagram_elements(bpmnplane, element_id, x, y, size[element_type][0], size[element_type][1])

        # Event-based gateway
        elif element_type == 'eventBasedGateway':
            element = ET.SubElement(process, 'bpmn:eventBasedGateway', id=element_id)
            status, links_idx = check_eventBasedGateway(i, data['links'], data['labels'])
            # NOTE(review): this branch nests bpmn:eventBasedGateway children in
            # the gateway and passes the gateway index ``i`` (not ``link_idx``)
            # as the link index for the waypoints - behaviour kept as is,
            # please confirm the intent.
            for state, link_idx in zip(status, links_idx):
                if state == 'input':
                    gateway_idx = links[link_idx][0]
                    gateway_name = elements[gateway_idx]
                    sub_element = ET.SubElement(element, 'bpmn:eventBasedGateway', id=f'eventBasedGateway{gateway_name.split("_")[1]}')
                    create_data_Association(bpmnplane, data, size, sub_element.attrib['id'], i, gateway_name, element_id)
                elif state == 'output':
                    gateway_idx = links[link_idx][1]
                    gateway_name = elements[gateway_idx]
                    sub_element = ET.SubElement(element, 'bpmn:eventBasedGateway', id=f'eventBasedGateway{gateway_name.split("_")[1]}')
                    create_data_Association(bpmnplane, data, size, sub_element.attrib['id'], i, element_id, gateway_name)
            add_diagram_elements(bpmnplane, element_id, x, y, size['eventBasedGateway'][0], size['eventBasedGateway'][1])

        # Data Object / Data Store: a reference element plus the object itself;
        # the shape is attached to the reference.
        elif element_type == 'dataObject' or element_type == 'dataStore':
            dataObject_idx = element_id.split('_')[1]
            dataObject_ref = f'DataObjectReference_{dataObject_idx}'
            ET.SubElement(process, 'bpmn:dataObjectReference', id=dataObject_ref, dataObjectRef=element_id, name=text_mapping[element_id])
            ET.SubElement(process, f'bpmn:{element_type}', id=element_id)
            add_diagram_elements(bpmnplane, dataObject_ref, x, y, size[element_type][0], size[element_type][1])

        # Timer Event
        elif element_type == 'timerEvent':
            element = ET.SubElement(process, 'bpmn:intermediateCatchEvent', id=element_id, name=text_mapping[element_id])
            ET.SubElement(element, 'bpmn:timerEventDefinition', id=f'TimerEventDefinition_{i+1}')
            add_diagram_elements(bpmnplane, element_id, x, y, size['timerEvent'][0], size['timerEvent'][1])
def calculate_pool_bounds(data, keep_elements, size):
    """Return the bounding rectangle of the kept elements.

    Every index in ``keep_elements`` contributes the rectangle that starts at
    the first two values of its box and spans ``size[<element type>]``, where
    the element type is the id prefix before the first underscore.  Indices
    past the end of ``data['BPMN_id']`` are reported and skipped; elements
    without an id, or whose label is 7, 13, 14 or 15, are ignored
    (presumably pool and connection classes - confirm against the label map).

    Args:
        data: dict with ``'BPMN_id'``, ``'labels'`` and ``'boxes'``.
        keep_elements: iterable of element indices to consider.
        size: mapping from element type to a ``(width, height)`` pair.

    Returns:
        ``(min_x, min_y, max_x, max_y)``.  When no element qualifies the
        historical default ``(10000.0, 10000.0, 0.0, 0.0)`` is returned.
    """
    ignored_labels = (7, 13, 14, 15)

    # Start from +/-inf rather than 10000 / 0 so that coordinates above 10000
    # or below 0 are not clamped by the initial values.
    min_x = min_y = float('inf')
    max_x = max_y = float('-inf')
    found = False

    for i in keep_elements:
        if i >= len(data['BPMN_id']):
            print("Problem with the index")
            continue
        element = data['BPMN_id'][i]
        if element is None or data['labels'][i] in ignored_labels:
            continue

        element_type = element.split('_')[0]
        x, y = data['boxes'][i][:2]
        element_width, element_height = size[element_type]

        min_x = min(min_x, x)
        min_y = min(min_y, y)
        max_x = max(max_x, x + element_width)
        max_y = max(max_y, y + element_height)
        found = True

    if not found:
        # Keep the value callers received before for an empty selection.
        return float('10000'), float('10000'), float('0'), float('0')
    return min_x, min_y, max_x, max_y
def calculate_pool_waypoints(idx, data, size, source_idx, target_idx, source_element, target_element):
    """Return the two waypoints of a connection that has a pool at one end.

    When both ends are pools, both waypoints are the centre of the source
    box.  Otherwise the line is vertical, at the horizontal centre of the
    non-pool element (whose extent is its box origin plus ``size[<type>]``),
    and runs between that element's top or bottom edge and a point 50 units
    above the pool box's lower or upper coordinate.

    ``idx`` is not used.
    """
    boxes = data['boxes']
    src_box, tgt_box = boxes[source_idx], boxes[target_idx]
    src_centre = ((src_box[0] + src_box[2]) / 2, (src_box[1] + src_box[3]) / 2)

    pool_is_source = source_element == 'pool'
    if pool_is_source and target_element == 'pool':
        return [src_centre, src_centre]

    # Work out which end is the pool and which one is the ordinary element.
    if pool_is_source:
        pool_box, other_box, other_kind = src_box, tgt_box, target_element
    else:
        pool_box, other_box, other_kind = tgt_box, src_box, source_element

    dims = size[other_kind]
    left, top = other_box[0], other_box[1]
    right, bottom = left + dims[0], top + dims[1]
    mid_x = (left + right) / 2

    if not pool_box[3] < top:
        # Pool is not above the element: same point order for both directions.
        # NOTE(review): for a pool source this runs element -> pool - confirm.
        return [(mid_x, bottom), (mid_x, pool_box[1] - 50)]

    # Pool is above the element: order the points from source to target.
    pool_point = (mid_x, pool_box[3] - 50)
    element_point = (mid_x, top)
    if pool_is_source:
        return [pool_point, element_point]
    return [element_point, pool_point]
def calculate_waypoints(data, size, current_idx, source_id, target_id):
    """Return the ``[(source_x, source_y), (target_x, target_y)]`` waypoints.

    ``data['best_points'][current_idx]`` names the side ('left', 'right',
    'top' or 'bottom') used on the source and on the target.  Each point
    starts at the first two values of the element's box and is moved to the
    middle of the named side using ``size[<id prefix>]``; any other side
    value leaves the point at the box origin.
    """
    def anchor(x, y, side, kind):
        # ``size`` is only consulted for a recognised side.
        if side == 'left':
            y += size[kind][1] / 2
        elif side == 'right':
            x += size[kind][0]
            y += size[kind][1] / 2
        elif side == 'top':
            x += size[kind][0] / 2
        elif side == 'bottom':
            x += size[kind][0] / 2
            y += size[kind][1]
        return x, y

    sides = data['best_points'][current_idx]
    source_side, target_side = sides[0], sides[1]

    ids = data['BPMN_id']
    source_pos = ids.index(source_id)
    target_pos = ids.index(target_id)

    source_kind = source_id.split('_')[0]
    target_kind = target_id.split('_')[0]

    sx, sy = data['boxes'][source_pos][:2]
    tx, ty = data['boxes'][target_pos][:2]

    start = anchor(sx, sy, source_side, source_kind)
    end = anchor(tx, ty, target_side, target_kind)
    return [start, end]
def create_flow_element(bpmn, text_mapping, idx, size, data, parent, message=False):
    """Create a sequence or message flow element and its diagram edge.

    The link at ``data['links'][idx]`` gives the indices of the source and
    target elements.  A ``bpmn:sequenceFlow`` (default) or
    ``bpmn:messageFlow`` (``message=True``) is appended to ``parent`` and the
    matching edge is appended to ``bpmn``.

    For message flows, an endpoint whose id prefix is ``pool`` is referenced
    as ``participant_<n>`` and the waypoints come from
    ``calculate_pool_waypoints``; otherwise ``calculate_waypoints`` is used.

    Args:
        bpmn: XML element receiving the diagram edge.
        text_mapping: mapping from BPMN id to display name; looked up with
            the id of the flow itself (``data['BPMN_id'][idx]``).
        idx: index of the flow in ``data``.
        size: mapping from element type to a ``(width, height)`` pair.
        data: dict with ``'links'``, ``'BPMN_id'`` and the layout entries
            read by the waypoint helpers.
        parent: XML element receiving the flow element.
        message: create a message flow instead of a sequence flow.
    """
    source_idx, target_idx = data['links'][idx]
    source_id, target_id = data['BPMN_id'][source_idx], data['BPMN_id'][target_idx]

    if not message:
        element_id = f'sequenceflow_{source_id}_{target_id}'
        waypoints = calculate_waypoints(data, size, idx, source_id, target_id)
        ET.SubElement(parent, 'bpmn:sequenceFlow', id=element_id, sourceRef=source_id,
                      targetRef=target_id, name=text_mapping[data['BPMN_id'][idx]])
    else:
        element_id = f'messageflow_{source_id}_{target_id}'
        source_kind = source_id.split('_')[0]
        target_kind = target_id.split('_')[0]

        if source_kind == 'pool' or target_kind == 'pool':
            waypoints = calculate_pool_waypoints(idx, data, size, source_idx, target_idx,
                                                 source_kind, target_kind)
            # Resolve each end on its own.  The previous two chained ``if``
            # blocks reset the source to the raw pool id whenever the target
            # was a pool as well, breaking pool-to-pool message flows.
            if source_kind == 'pool':
                XML_source_id = f"participant_{source_id.split('_')[1]}"
            else:
                XML_source_id = source_id
            if target_kind == 'pool':
                XML_target_id = f"participant_{target_id.split('_')[1]}"
            else:
                XML_target_id = target_id
        else:
            waypoints = calculate_waypoints(data, size, idx, source_id, target_id)
            XML_source_id, XML_target_id = source_id, target_id

        ET.SubElement(parent, 'bpmn:messageFlow', id=element_id, sourceRef=XML_source_id,
                      targetRef=XML_target_id, name=text_mapping[data['BPMN_id'][idx]])

    add_diagram_edge(bpmn, element_id, waypoints)