emvecchi's picture
Update app.py
5e721ae verified
raw
history blame
No virus
11.1 kB
import json
import os
from dataclasses import dataclass
from typing import List, Optional
from PIL import Image
import pandas as pd
import streamlit as st
from huggingface_hub import HfFileSystem
@dataclass
class Field:
type: str
title: str
name: str = None
help: Optional[str] = None
children: List['Field'] = None
# Function to get user ID from URL
def get_user_id_from_url():
user_id = st.query_params.get("user_id", "")
return user_id
HF_TOKEN = os.environ.get("HF_TOKEN_WRITE")
print("is none?", HF_TOKEN is None)
hf_fs = HfFileSystem(token=HF_TOKEN)
input_repo_path = 'datasets/emvecchi/annotate-pilot'
output_repo_path = 'datasets/emvecchi/annotate-pilot'
to_annotate_file_name = 'to_annotate.csv' # CSV file to annotate
COLS_TO_SAVE = ['comment_id']
fields: List[Field] = [
Field(name="topic", type="input_col", title="**Topic:**"),
Field(name="parent_comment", type="input_col", title="**Preceeding Comment:**"),
Field(name="comment", type="input_col", title="**Comment:**"),
Field(name="image_name", type="input_col", title="**Visualization of high contributing properties:**"),
Field(type="container", title="", children=[
Field(name="to_moderate", type="radio",
title="**Moderator Intervention**: Do feel this comment/discussion would benefit from moderator intervention?"),
Field(name="actions_clear", type="slider",
title="**Priority**: With what level of priority would you need to interact with this comment?"),
]),
#Field(type="expander",
# title="Expand and fill-out this section if you see **issues in the original comment**",
# children=[
# Field(name="issues_in_comment", type="slider",
# title="Do **you see some issues** in the original comment?"),
# Field(name="moderator_spotted", type="slider",
# title="Based on the reply, has the **moderator spotted the issues** in the original comment?"),
# Field(name="reply_addresses_issues", type="slider",
# title="How well does the reply **address those issues**?")
# ]),
#Field(type="container", title="**Score the following properties of the moderator comment?**", children=[
# Field(name="neutrality", type="slider", title="Neutrality",
# help='Remain Neutral on the topic and on the Comment Substance and Commenter’s Viewpoint. The reply shouldn’t give away the opinion of the moderator on the topic or comment. '),
# # FieldDict(name="attitude", type="slider", title="Attitude", help=''),
# Field(name="clarity", type="slider", title="Clarity",
# help="Plain language, simple, clear, avoid overwhelming the user e.g. too many questions"),
# Field(name="curiosity", type="slider", title="Curiosity",
# help="Moderators should model a spirit of inquiry and a desire to learn from and understand commenter’s experience and views. Try to be interested in the bases upon which each commenter stakes his or her claims and the lines of reasoning that has led each commenter to those particular conclusions."),
# # TODO
# Field(name="bias", type="slider", title="Bias",
# help="Does the reply show some biases towards the commenter? Are there stereotypes or prejudices?"),
# Field(name="encouraging", type="slider", title="Encouraging",
# help="Welcoming, encouraging and acknowledging. Avoid Evaluative and/or Condescending Responses"),
#]),
#
Field(name="other_comments", type="text", title="Further comments: free text"),
]
INPUT_FIELD_DEFAULT_VALUES = {'slider': 4,
'text': None,
'textarea': None,
'checkbox': False,
'radio': None}
SHOW_HELP_ICON = False
def read_data(_path):
with hf_fs.open(input_repo_path + '/' + _path) as f:
return pd.read_csv(f)
def read_saved_data():
_path = get_path()
if hf_fs.exists(output_repo_path + '/' + _path):
with hf_fs.open(output_repo_path + '/' + _path) as f:
try:
return json.load(f)
except json.JSONDecodeError as e:
print(e)
return None
# Write a remote file
def save_data(data):
hf_fs.mkdir(f"{output_repo_path}/{data['user_id']}")
with hf_fs.open(f"{output_repo_path}/{get_path()}", "w") as f:
f.write(json.dumps(data))
def get_path():
return f"{st.session_state.user_id}/{st.session_state.current_index}.json"
def display_image(image_path):
with hf_fs.open(image_path) as f:
img = Image.open(f)
st.image(img, caption='10 most contributing properties', use_column_width=True)
#################################### Streamlit App ####################################
# Function to navigate rows
def navigate(index_change):
st.session_state.current_index += index_change
print(st.session_state.current_index)
# https://discuss.streamlit.io/t/click-twice-on-button-for-changing-state/45633/2
st.rerun()
def show_field(f: Field, index: int):
if f.type not in INPUT_FIELD_DEFAULT_VALUES.keys():
match f.type:
case 'input_col':
st.write(f.title)
if f.name == 'image_name':
st.write(f.title)
image_name = st.session_state.data.iloc[index][f.name]
if image_name: # Ensure the image name is not empty
image_path = os.path.join(input_repo_path, 'images', image_name)
display_image(image_path)
else:
st.write(st.session_state.data.iloc[index][f.name])
case 'markdown':
st.markdown(f.title)
case 'expander' | 'container':
with (st.expander(f.title) if f.type == 'expander' else st.container(border=True)):
if f.type == 'container':
st.markdown(f.title)
for child in f.children:
show_field(child, index)
else:
key = f.name + str(index)
value = st.session_state.default_values[f.name] = data_collected[f.name] if data_collected else \
INPUT_FIELD_DEFAULT_VALUES[f.type]
if not SHOW_HELP_ICON:
f.title = f'**{f.title}**\n\n{f.help}' if f.help else f.title
f.help = None
match f.type:
case 'checkbox':
st.session_state.data_inputs[f.name] = st.checkbox(f.title,
key=key,
value=value, help=f.help)
case 'radio':
st.session_state.data_inputs[f.name] = st.radio(f.title,
["yes","no","other"],
key=key,
value=value, help=f.help)
case 'slider':
st.session_state.data_inputs[f.name] = st.slider(f.title,
min_value=1, max_value=7, step=1,
key=key,
value=value, help=f.help)
case 'text':
st.session_state.data_inputs[f.name] = st.text_input(f.title, key=key, value=value)
case 'textarea':
st.session_state.data_inputs[f.name] = st.text_area(f.title, key=key, value=value)
# st.set_page_config(layout='wide')
# Title of the app
st.title("Moderation Prediction")
st.markdown(
"""<style>
div[data-testid="stMarkdownContainer"] > p {
font-size: 1rem;
}
</style>
""", unsafe_allow_html=True)
st.markdown(
"""<details open>
<summary>Annotation Guidelines</summary>
some guidelines here
</details>
""", unsafe_allow_html=True)
# Load the data to annotate
if 'data' not in st.session_state:
st.session_state.data = read_data(to_annotate_file_name)
# Initialize the current index
if 'current_index' not in st.session_state:
st.session_state.current_index = -1
if st.session_state.current_index == -1:
user_id_from_url = get_user_id_from_url()
if user_id_from_url:
st.session_state.user_id = user_id_from_url
navigate(1)
else:
st.session_state.user_id = st.text_input('Please enter your user ID to proceed', value=user_id_from_url)
if st.button("Next"):
navigate(1)
elif st.session_state.current_index < len(st.session_state.data):
st.write(f"username is {st.session_state.user_id}")
# Creating the form
with st.form("feedback_form"):
index = st.session_state.current_index
data_collected = read_saved_data()
st.session_state.default_values = {}
st.session_state.data_inputs = {}
for field in fields:
if field.name not in st.session_state.data.columns:
# Field doesn't exist in input dataframe, add it with a default value
st.session_state.data_inputs[field.name] = None
show_field(field, index)
submitted = st.form_submit_button("Submit")
if submitted:
with st.spinner(text="saving"):
save_data({
'user_id': st.session_state.user_id,
'index': st.session_state.current_index,
**st.session_state.data.iloc[index][COLS_TO_SAVE].to_dict(),
**st.session_state.data_inputs
})
st.success("Feedback submitted successfully!")
navigate(1)
else:
st.write("Finished all data points!")
# Navigation buttons
if st.session_state.current_index > 0:
if st.button("Previous"):
with st.spinner(text="in progress"):
navigate(-1)
if 0 <= st.session_state.current_index < len(st.session_state.data):
st.write(f"Page {st.session_state.current_index + 1} out of {len(st.session_state.data)}")
# disable text input enter to submit
# https://discuss.streamlit.io/t/text-input-how-to-disable-press-enter-to-apply/14457/6
import streamlit.components.v1 as components
components.html(
"""
<script>
const inputs = window.parent.document.querySelectorAll('input');
inputs.forEach(input => {
input.addEventListener('keydown', function(event) {
if (event.key === 'Enter') {
event.preventDefault();
}
});
});
</script>
""",
height=0
)
st.markdown(
"""<style>
div[data-testid="InputInstructions"] {
visibility: hidden;
}
</style>""", unsafe_allow_html=True
)