import json
import os
import random
import re
import threading

import gradio as gr
from datasets import load_dataset, Dataset, Features, Value, concatenate_datasets
from huggingface_hub import login
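
# Gradio annotation Space: shows COCO images and collects Egyptian Arabic
# ("Masri") captions, storing each annotation as a row of a Hugging Face
# dataset (image_id, caption, annotation_count, split).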
# reset = False  # Set to True to wipe the dataset (see the commented-out block below)

def check_word_count(caption):
    # Enable the Submit button only when the caption has 3 or more words
    return gr.update(interactive=len(caption.split()) >= 3)

# Authenticate with Hugging Face
token = os.getenv("HUGGINGFACE_TOKEN")
if token:
    login(token=token)
else:
    print("HUGGINGFACE_TOKEN environment variable not set.")
dataset_name = "GeorgeIbrahim/EGYCOCO"  # Replace with your dataset name

with open('nearest_neighbors_with_captions.json', 'r') as f:
    results = json.load(f)
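
# Sketch of the assumed shape of nearest_neighbors_with_captions.json,
# inferred from the lookups below; keys are zero-stripped numeric COCO ids,
# and the values shown here are illustrative placeholders, not real data:
# {
#     "111367": {
#         "caption": "reference caption for this dev image",
#         "nearest_neighbors": [
#             {"image_id": "...", "caption": "caption of a similar train image"}
#         ]
#     }
# }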
# Load or create the dataset
try:
    dataset = load_dataset(dataset_name, split="train")
    dataset = dataset.filter(lambda example: example["image_id"] != "COCO_val2014_000000111367.jpg")
    print("Loaded existing dataset:", dataset)
    print("Dataset features:", dataset.features)  # Check if 'split' is part of features
    # Check if the 'split' column exists; if not, add it
    if 'split' not in dataset.column_names:
        # Define the 'split' values based on `image_id`: normalize each
        # filename to its zero-stripped numeric id (the same normalization
        # used elsewhere in this file) before checking the dev-set keys
        split_values = []
        for example in dataset:
            id_match = re.search(r'_(\d+)\.', example["image_id"])
            numeric_id = id_match.group(1).lstrip('0') if id_match else ""
            split_values.append("dev" if numeric_id in results else "train")
        # Add 'split' column to the dataset
        dataset = dataset.add_column("split", split_values)
        print("Added 'split' column to dataset.")
    else:
        print("'split' column already exists.")

    # Create a dictionary to keep track of the highest annotation count for each image
    annotation_counts = {}
    for example in dataset:
        image_id = example["image_id"]
        count = example["annotation_count"]
        if image_id not in annotation_counts or count > annotation_counts[image_id]:
            annotation_counts[image_id] = count
    print("Annotation counts:", annotation_counts)
except Exception as e:
    print(f"Error loading dataset: {e}")
    # Create an empty dataset if it doesn't exist
    features = Features({
        'image_id': Value(dtype='string'),
        'caption': Value(dtype='string'),
        'annotation_count': Value(dtype='int32'),
        'split': Value(dtype='string')
    })
    dataset = Dataset.from_dict({'image_id': [], 'caption': [], 'annotation_count': [], 'split': []}, features=features)
    annotation_counts = {}
    dataset.push_to_hub(dataset_name)  # Push the empty dataset to Hugging Face

# Initialize or reset data as needed based on the `reset` flag
# if reset:
#     # Clear the annotation counts
#     annotation_counts = {}
#     shown_counts = {}  # If you are tracking shown counts separately for images
#     # Optionally, clear or reinitialize the dataset
#     features = Features({
#         'image_id': Value(dtype='string'),
#         'caption': Value(dtype='string'),
#         'annotation_count': Value(dtype='int32'),
#         'split': Value(dtype='string')
#     })
#     dataset = Dataset.from_dict({
#         'image_id': [],
#         'caption': [],
#         'annotation_count': [],
#         'split': []
#     }, features=features)
#     # Push the reset dataset to Hugging Face or perform other necessary actions
#     dataset.push_to_hub(dataset_name)
#     print("Data has been reset.")
image_folder = "images"
image_files = [f for f in os.listdir(image_folder) if f.endswith(('.png', '.jpg', '.jpeg'))]
len_files = len(image_files)
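
# One process-wide lock serializes access to the shared annotation_counts
# dict and the global dataset across concurrent annotator sessions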
lock = threading.Lock()

def get_caption_for_image_id(image_path):
    """
    Retrieve the caption for a given image_id from the JSON data.
    """
    # Extract the numeric part of the image ID
    match = re.search(r'_(\d+)\.', image_path)
    if match:
        image_id = match.group(1).lstrip('0')  # Remove leading zeros
        print("Searching for image_id:", image_id)  # Debugging line
        # Check if image_id is a dev (test) image
        if image_id in results:
            print("Found caption in results:", results[image_id]["caption"])  # Debugging line
            return results[image_id]["caption"]
        # If image_id is not a dev image, search in nearest neighbors
        for test_image_data in results.values():
            for neighbor in test_image_data["nearest_neighbors"]:
                if neighbor["image_id"] == image_id:
                    print("Found caption in nearest neighbors:", neighbor["caption"])  # Debugging line
                    return neighbor["caption"]
    # Return None if the image_id is not found; printing image_path (always
    # defined) avoids a NameError when the filename doesn't match the pattern
    print("Caption not found for image_id:", image_path)  # Debugging line
    return None
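
# Hypothetical usage, with the filename borrowed from the filter above:
#   get_caption_for_image_id("COCO_val2014_000000111367.jpg")
#   -> the dev caption for id "111367", a nearest-neighbor caption, or None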

# Function to get a random image that hasn't been fully annotated
def get_next_image(session_data):
    with lock:
        # Collect the images that still need annotations
        available_images = []
        for img in image_files:
            # Match and extract the image_id from the filename
            match = re.search(r'_(\d+)\.', img)
            if match:
                image_id_2 = match.group(1).lstrip('0')  # Remove leading zeros
                # Keep images that are unannotated, dev images with fewer
                # than 2 annotations, or train images with none
                if (img not in annotation_counts or
                        (image_id_2 in results and annotation_counts.get(img, 0) < 2) or
                        (image_id_2 not in results and annotation_counts.get(img, 0) == 0)):
                    available_images.append(img)
        print("Available images:", available_images)  # Debugging line
        print("Remaining images:", len(available_images))
        # random.shuffle(available_images)
        # Assign a new random image if the user doesn't already have one
        if session_data["current_image"] is None and available_images:
            session_data["current_image"] = random.choice(available_images)
            # print("Current image_id:", session_data["current_image"])  # Debugging line
        return os.path.join(image_folder, session_data["current_image"]) if session_data["current_image"] else None

# Function to save the annotation to the Hugging Face dataset and fetch the next image
def save_annotation(caption, session_data):
    global dataset, annotation_counts  # Shared across all sessions
    if session_data["current_image"] is None:
        return gr.update(visible=False), gr.update(value="All images have been annotated!"), gr.update(value="")
    with lock:
        image_id = session_data["current_image"]
        match = re.search(r'_(\d+)\.', image_id)
        image_2 = match.group(1).lstrip('0') if match else ""
        split = "dev" if image_2 in results else "train"
        # Save caption or "skipped" based on user input
        if caption.strip().lower() == "skip":
            caption = "skipped"
        # Get current annotation count
        annotation_count = annotation_counts.get(image_id, 0)
        # Add the new annotation as a new row to the dataset
        new_data = Dataset.from_dict({
            "image_id": [image_id],
            "caption": [caption],
            "annotation_count": [annotation_count + 1],
            "split": [split]
        }, features=Features({
            'image_id': Value(dtype='string'),
            'caption': Value(dtype='string'),
            'annotation_count': Value(dtype='int32'),
            'split': Value(dtype='string')
        }))
        # Update the annotation count in the dictionary
        annotation_counts[image_id] = annotation_count + 1
        # Concatenate with the existing dataset, drop empty captions, and push to Hugging Face
        dataset = concatenate_datasets([dataset, new_data])
        dataset = dataset.filter(lambda example: example['caption'].strip() != "")
        dataset.push_to_hub(dataset_name)
        print("Pushed updated dataset")
        # # Clear user's current image only if the validation image has been annotated twice
        # if (split == "train" and annotation_count > 1) or (split == "dev" and annotation_count > 2):
        session_data["current_image"] = None
    # Fetch the next image outside the `with lock:` block: get_next_image
    # acquires the same non-reentrant lock, so calling it while still holding
    # the lock would deadlock
    next_image = get_next_image(session_data)
    if next_image:
        next_caption = get_caption_for_image_id(os.path.basename(next_image))  # Retrieve the caption for the new image
        print("Next image_id:", os.path.basename(next_image))  # Debugging line
        return gr.update(value=next_image), gr.update(value=""), gr.update(value=next_caption or "")
    else:
        return gr.update(visible=False), gr.update(value="All images have been annotated!"), gr.update(value="")
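
# Design note: push_to_hub re-uploads the dataset on every submission, which
# keeps annotations durable across Space restarts at the cost of slower saves.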

def initialize_interface(session_data):
    next_image = get_next_image(session_data)
    if next_image:
        next_caption = get_caption_for_image_id(os.path.basename(next_image))  # Retrieve caption for the initial image
        print("Initial image_id:", os.path.basename(next_image))  # Debugging line
        return gr.update(value=next_image), gr.update(value=next_caption or "")
    else:
        return gr.update(visible=False), gr.update(value="All images have been annotated!")

# Build the Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# Image Captioning Tool")
    gr.Markdown("Please provide your caption in Egyptian Arabic 'Masri'")
    session_data = gr.State({"current_image": None})  # Session-specific state
    with gr.Row():
        image = gr.Image()
        caption = gr.Textbox(placeholder="Enter caption here...")
        existing_caption = gr.Textbox(label="Existing Caption", interactive=False)  # Display existing caption
    submit = gr.Button("Submit", interactive=False)  # Initially disabled
    # Enable/disable the submit button based on word count
    caption.change(fn=check_word_count, inputs=caption, outputs=submit)
    # Define actions for buttons
    submit.click(fn=save_annotation, inputs=[caption, session_data], outputs=[image, caption, existing_caption])
    # Load initial image
    demo.load(fn=initialize_interface, inputs=session_data, outputs=[image, existing_caption])

demo.launch(share=True)
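
# Minimal local run, assuming this file is saved as app.py (the filename is
# an assumption, not given in the source):
#   HUGGINGFACE_TOKEN=<write-token> python app.py
# share=True additionally exposes a temporary public gradio.live link.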