import torch
from torch.utils.data import Dataset
import pandas as pd
import numpy as np
import tqdm
import random
from .vocab import Vocab
import pickle
import copy
import os
class TokenizerDataset(Dataset):
    """
    Tokenizes each step sequence in the dataset file and, when the companion
    label/info files are available, attaches the class label and a handcrafted
    feature vector (feature length: 17).
    """

    def __init__(self, dataset_path, label_path, vocab, seq_len=30):
        self.dataset_path = dataset_path
        self.label_path = label_path
        self.vocab = vocab  # Vocab object
        self.seq_len = seq_len

        # Contents of the input dataset file and its companion label/info files.
        self.lines = []
        self.labels = []
        self.feats = []

        if self.label_path:
            with open(self.label_path, "r") as label_file:
                for line in label_file:
                    line = line.strip()
                    if not line:
                        continue
                    self.labels.append(int(line))

            # Comment this section out if you are not using the feat attribute.
            try:
                j = 0
                with open(self.label_path.replace("label", "info"), "r") as dataset_info_file:
                    for line in dataset_info_file:
                        line = line.strip()
                        if not line:
                            continue
                        # # highGRschool_w_prior
                        # feat_vec = [float(i) for i in line.split(",")[-3].split("\t")]
                        # highGRschool_w_prior_w_diffskill_wo_fa: two tab-separated
                        # float vectors nested in the comma-separated info line.
                        feat_vec = [float(i) for i in line.split(",")[-3].split("\t")]
                        feat2 = [float(i) for i in line.split(",")[-2].split("\t")]
                        feat_vec.extend(feat2[1:])
                        if j == 0:
                            print("Feature vector length:", len(feat_vec))
                        j += 1
                        self.feats.append(feat_vec)
            except Exception as e:
                print(e)

        with open(self.dataset_path, "r") as dataset_file:
            for line in dataset_file:
                line = line.strip()
                if line:
                    self.lines.append(line)

        self.len = len(self.lines)
        print("Sequence length set at", self.seq_len,
              "| lines:", len(self.lines),
              "| labels:", len(self.labels) if self.label_path else 0)

    def __len__(self):
        return self.len

    def __getitem__(self, item):
        org_line = self.lines[item].split("\t")
        dup_line = []
        opt = False
        # Once an optional-task step appears, replace later FinalAnswer-* steps
        # with [UNK].
        for l in org_line:
            if l in ["OptionalTask_1", "EquationAnswer", "NumeratorFactor",
                     "DenominatorFactor", "OptionalTask_2", "FirstRow1:1",
                     "FirstRow1:2", "FirstRow2:1", "FirstRow2:2", "SecondRow",
                     "ThirdRow"]:
                opt = True
            if opt and 'FinalAnswer-' in l:
                dup_line.append('[UNK]')
            else:
                dup_line.append(l)
        dup_line = "\t".join(dup_line)

        # Tokenizer-like: converts the steps to ids and adds [CLS] and [SEP].
        s1 = self.vocab.to_seq(dup_line, self.seq_len)
        s1_label = self.labels[item] if self.label_path else 0
        segment_label = [1 for _ in range(len(s1))]
        s1_feat = self.feats[item] if len(self.feats) > 0 else 0

        # Pad the token ids and the segment labels up to seq_len with [PAD].
        padding = [self.vocab.vocab['[PAD]'] for _ in range(self.seq_len - len(s1))]
        s1.extend(padding)
        segment_label.extend(padding)

        output = {'input': s1,
                  'label': s1_label,
                  'feat': s1_feat,
                  'segment_label': segment_label}
        return {key: torch.tensor(value) for key, value in output.items()}
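
# A minimal usage sketch (not part of the original module): wraps the dataset in
# a DataLoader and documents the shape of each batch entry. The file paths are
# illustrative placeholders, and `vocab` is assumed to be an already-constructed
# Vocab object from .vocab, since its constructor is not shown here.
def _example_tokenizer_dataset_usage(vocab,
                                     dataset_path="train_data.txt",
                                     label_path="train_label.txt"):
    from torch.utils.data import DataLoader

    dataset = TokenizerDataset(dataset_path, label_path, vocab, seq_len=30)
    loader = DataLoader(dataset, batch_size=32, shuffle=True)
    for batch in loader:
        # batch['input']:         (batch, seq_len) token ids, [PAD]-padded
        # batch['segment_label']: (batch, seq_len) 1 for real tokens, [PAD] id for padding
        # batch['label']:         (batch,) integer class labels (0 if no label file)
        # batch['feat']:          (batch, 17) handcrafted features when the info file parsed
        break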
class TokenizerwSkillsDataset(Dataset):
    """
    Same tokenization as TokenizerDataset, with the skill feature vector parsed
    from the companion info file (feature length: 17).
    """

    def __init__(self, dataset_path, label_path, vocab, seq_len=30):
        print(f"dataset_path: {dataset_path}")
        print(f"label_path: {label_path}")
        self.dataset_path = dataset_path
        self.label_path = label_path
        self.vocab = vocab  # Vocab object
        self.seq_len = seq_len

        # Contents of the input dataset file and its companion label/info files.
        self.lines = []
        self.labels = []
        self.feats = []
        # Indices of info lines that parsed successfully; labels and sequences
        # are filtered to these indices so the three lists stay aligned.
        selected_lines = []

        print("TokenizerwSkillsDataset...............................")
        if self.label_path:
            # Comment this section out if you are not using the feat attribute.
            with open(self.label_path.replace("label", "info"), "r") as f:
                dataset_info_file = f.readlines()
            print("Info lines:", len(dataset_info_file))
            j = 0
            for idex, line in enumerate(dataset_info_file):
                try:
                    line = line.strip()
                    if not line:
                        continue
                    # Two tab-separated float vectors nested in the comma-separated
                    # info line; the first entry of the second vector is dropped.
                    feat_vec = [float(i) for i in line.split(",")[-9].split("\t")]
                    feat2 = [float(i) for i in line.split(",")[-8].split("\t")]
                    feat_vec.extend(feat2[1:])
                    if j == 0:
                        print("Feature vector length:", len(feat_vec), feat_vec)
                    j += 1
                    self.feats.append(feat_vec)
                    selected_lines.append(idex)
                except Exception as e:
                    print(e)
                    print("Error at index:", idex)

            with open(self.label_path, "r") as label_file:
                for idex, line in enumerate(label_file):
                    line = line.strip()
                    if not line:
                        continue
                    if idex in selected_lines:
                        self.labels.append(int(line))

        with open(self.dataset_path, "r") as dataset_file:
            for idex, line in enumerate(dataset_file):
                line = line.strip()
                # Only sequences whose info line produced features are kept.
                if line and idex in selected_lines:
                    self.lines.append(line)

        self.len = len(self.lines)
        print("Sequence length set at", self.seq_len,
              "| lines:", len(self.lines),
              "| labels:", len(self.labels) if self.label_path else 0)

    def __len__(self):
        return self.len

    def __getitem__(self, item):
        org_line = self.lines[item].split("\t")
        dup_line = []
        opt = False
        # Once an optional-task step appears, replace later FinalAnswer-* steps
        # with [UNK].
        for l in org_line:
            if l in ["OptionalTask_1", "EquationAnswer", "NumeratorFactor",
                     "DenominatorFactor", "OptionalTask_2", "FirstRow1:1",
                     "FirstRow1:2", "FirstRow2:1", "FirstRow2:2", "SecondRow",
                     "ThirdRow"]:
                opt = True
            if opt and 'FinalAnswer-' in l:
                dup_line.append('[UNK]')
            else:
                dup_line.append(l)
        dup_line = "\t".join(dup_line)

        # Tokenizer-like: converts the steps to ids and adds [CLS] and [SEP].
        s1 = self.vocab.to_seq(dup_line, self.seq_len)
        s1_label = self.labels[item] if self.label_path else 0
        segment_label = [1 for _ in range(len(s1))]
        s1_feat = self.feats[item] if len(self.feats) > 0 else 0

        # Pad the token ids and the segment labels up to seq_len with [PAD].
        padding = [self.vocab.vocab['[PAD]'] for _ in range(self.seq_len - len(s1))]
        s1.extend(padding)
        segment_label.extend(padding)

        output = {'input': s1,
                  'label': s1_label,
                  'feat': s1_feat,
                  'segment_label': segment_label}
        return {key: torch.tensor(value) for key, value in output.items()}
class TokenizerwTimeDataset(Dataset):
    """
    Same tokenization as TokenizerDataset, with per-problem time features looked
    up from a precomputed DataFrame (feature length: 4).
    """

    def __init__(self, dataset_path, label_path, vocab, seq_len=30):
        print(f"dataset_path: {dataset_path}")
        print(f"label_path: {label_path}")
        self.dataset_path = dataset_path
        self.label_path = label_path
        self.vocab = vocab  # Vocab object
        self.seq_len = seq_len

        # Contents of the input dataset file and its companion label/info files.
        self.lines = []
        self.labels = []
        self.feats = []
        # Indices of info lines whose time features were found; labels and
        # sequences are filtered to these indices so the lists stay aligned.
        selected_lines = []

        print("TokenizerwTimeDataset...............................")
        with open("ratio_proportion_change3_2223/sch_largest_100-coded/time_info/full_data_normalized_time.pkl", "rb") as f:
            time_df = pickle.load(f)
        print("time_df shape:", time_df.shape)

        if self.label_path:
            # Comment this section out if you are not using the feat attribute.
            with open(self.label_path.replace("label", "info"), "r") as f:
                dataset_info_file = f.readlines()
            print("Info lines:", len(dataset_info_file))
            j = 0
            for idex, line in enumerate(dataset_info_file):
                try:
                    line = line.strip()
                    if not line:
                        continue
                    # Normalized time features for this
                    # (school, student, progress, problem) row.
                    sch = line.split(",")[0]
                    stu = line.split(",")[2]
                    progress = line.split(",")[3]
                    prob_id = line.split(",")[4]
                    total_time = time_df.loc[(sch, stu, progress, prob_id)]['total_time'].item()
                    faopt_time = time_df.loc[(sch, stu, progress, prob_id)]['faopt_time'].item()
                    opt_time = time_df.loc[(sch, stu, progress, prob_id)]['opt_time'].item()
                    nonopt_time = time_df.loc[(sch, stu, progress, prob_id)]['nonopt_time'].item()
                    feat_vec = [faopt_time, total_time, opt_time, nonopt_time]
                    if j == 0:
                        print("Feature vector length:", len(feat_vec), feat_vec)
                    j += 1
                    self.feats.append(feat_vec)
                    selected_lines.append(idex)
                except Exception as e:
                    print(e)
                    print("Error at index:", idex)

            with open(self.label_path, "r") as label_file:
                for idex, line in enumerate(label_file):
                    line = line.strip()
                    if not line:
                        continue
                    if idex in selected_lines:
                        self.labels.append(int(line))

        with open(self.dataset_path, "r") as dataset_file:
            for idex, line in enumerate(dataset_file):
                line = line.strip()
                # Only sequences whose time features were found are kept.
                if line and idex in selected_lines:
                    self.lines.append(line)

        self.len = len(self.lines)
        print("Sequence length set at", self.seq_len,
              "| lines:", len(self.lines),
              "| labels:", len(self.labels) if self.label_path else 0)

    def __len__(self):
        return self.len

    def __getitem__(self, item):
        org_line = self.lines[item].split("\t")
        dup_line = []
        opt = False
        # Once an optional-task step appears, replace later FinalAnswer-* steps
        # with [UNK].
        for l in org_line:
            if l in ["OptionalTask_1", "EquationAnswer", "NumeratorFactor",
                     "DenominatorFactor", "OptionalTask_2", "FirstRow1:1",
                     "FirstRow1:2", "FirstRow2:1", "FirstRow2:2", "SecondRow",
                     "ThirdRow"]:
                opt = True
            if opt and 'FinalAnswer-' in l:
                dup_line.append('[UNK]')
            else:
                dup_line.append(l)
        dup_line = "\t".join(dup_line)

        # Tokenizer-like: converts the steps to ids and adds [CLS] and [SEP].
        s1 = self.vocab.to_seq(dup_line, self.seq_len)
        s1_label = self.labels[item] if self.label_path else 0
        segment_label = [1 for _ in range(len(s1))]
        s1_feat = self.feats[item] if len(self.feats) > 0 else 0

        # Pad the token ids and the segment labels up to seq_len with [PAD].
        padding = [self.vocab.vocab['[PAD]'] for _ in range(self.seq_len - len(s1))]
        s1.extend(padding)
        segment_label.extend(padding)

        output = {'input': s1,
                  'label': s1_label,
                  'feat': s1_feat,
                  'segment_label': segment_label}
        return {key: torch.tensor(value) for key, value in output.items()}
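
# A minimal sketch (not part of the original module) of the time DataFrame layout
# the two time-based datasets assume: a pandas DataFrame indexed by the
# (school, student, progress, problem_id) tuple, with one normalized time column
# per feature. All values and index names below are illustrative placeholders.
def _example_time_df_lookup():
    index = pd.MultiIndex.from_tuples(
        [("sch_01", "stu_42", "prog_1", "prob_7")],
        names=["school", "student", "progress", "problem_id"],
    )
    time_df = pd.DataFrame(
        {"total_time": [0.82], "faopt_time": [0.10],
         "opt_time": [0.25], "nonopt_time": [0.47]},
        index=index,
    )
    # The loaders fetch one row by the full index tuple, then read each column.
    row = time_df.loc[("sch_01", "stu_42", "prog_1", "prob_7")]
    return [row["faopt_time"], row["total_time"], row["opt_time"], row["nonopt_time"]]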
class TokenizerwSkillsTimeDataset(Dataset):
    """
    Combines the skill features from the info file with the per-problem time
    features (feature length: 17 + 4 = 21).
    """

    def __init__(self, dataset_path, label_path, vocab, seq_len=30):
        print(f"dataset_path: {dataset_path}")
        print(f"label_path: {label_path}")
        self.dataset_path = dataset_path
        self.label_path = label_path
        self.vocab = vocab  # Vocab object
        self.seq_len = seq_len

        # Contents of the input dataset file and its companion label/info files.
        self.lines = []
        self.labels = []
        self.feats = []
        # Indices of info lines that parsed successfully; labels and sequences
        # are filtered to these indices so the three lists stay aligned.
        selected_lines = []

        print("TokenizerwSkillsTimeDataset...............................")
        with open("ratio_proportion_change3_2223/sch_largest_100-coded/time_info/full_data_normalized_time.pkl", "rb") as f:
            time_df = pickle.load(f)
        print("time_df shape:", time_df.shape)

        if self.label_path:
            # Comment this section out if you are not using the feat attribute.
            with open(self.label_path.replace("label", "info"), "r") as f:
                dataset_info_file = f.readlines()
            print("Info lines:", len(dataset_info_file))
            j = 0
            for idex, line in enumerate(dataset_info_file):
                try:
                    line = line.strip()
                    if not line:
                        continue
                    # Skill features: two tab-separated float vectors nested in
                    # the comma-separated info line.
                    feat_vec = [float(i) for i in line.split(",")[-9].split("\t")]
                    feat2 = [float(i) for i in line.split(",")[-8].split("\t")]
                    feat_vec.extend(feat2[1:])
                    # Time features for this (school, student, progress, problem) row.
                    sch = line.split(",")[0]
                    stu = line.split(",")[2]
                    progress = line.split(",")[3]
                    prob_id = line.split(",")[4]
                    total_time = time_df.loc[(sch, stu, progress, prob_id)]['total_time'].item()
                    faopt_time = time_df.loc[(sch, stu, progress, prob_id)]['faopt_time'].item()
                    opt_time = time_df.loc[(sch, stu, progress, prob_id)]['opt_time'].item()
                    nonopt_time = time_df.loc[(sch, stu, progress, prob_id)]['nonopt_time'].item()
                    feat_vec.extend([faopt_time, total_time, opt_time, nonopt_time])
                    if j == 0:
                        print("Feature vector length:", len(feat_vec), feat_vec)
                    j += 1
                    self.feats.append(feat_vec)
                    selected_lines.append(idex)
                except Exception as e:
                    print(e)
                    print("Error at index:", idex)

            with open(self.label_path, "r") as label_file:
                for idex, line in enumerate(label_file):
                    line = line.strip()
                    if not line:
                        continue
                    if idex in selected_lines:
                        self.labels.append(int(line))

        with open(self.dataset_path, "r") as dataset_file:
            for idex, line in enumerate(dataset_file):
                line = line.strip()
                # Only sequences whose info line produced features are kept.
                if line and idex in selected_lines:
                    self.lines.append(line)

        self.len = len(self.lines)
        print("Sequence length set at", self.seq_len,
              "| lines:", len(self.lines),
              "| labels:", len(self.labels) if self.label_path else 0)

    def __len__(self):
        return self.len

    def __getitem__(self, item):
        org_line = self.lines[item].split("\t")
        dup_line = []
        opt = False
        # Once an optional-task step appears, replace later FinalAnswer-* steps
        # with [UNK].
        for l in org_line:
            if l in ["OptionalTask_1", "EquationAnswer", "NumeratorFactor",
                     "DenominatorFactor", "OptionalTask_2", "FirstRow1:1",
                     "FirstRow1:2", "FirstRow2:1", "FirstRow2:2", "SecondRow",
                     "ThirdRow"]:
                opt = True
            if opt and 'FinalAnswer-' in l:
                dup_line.append('[UNK]')
            else:
                dup_line.append(l)
        dup_line = "\t".join(dup_line)

        # Tokenizer-like: converts the steps to ids and adds [CLS] and [SEP].
        s1 = self.vocab.to_seq(dup_line, self.seq_len)
        s1_label = self.labels[item] if self.label_path else 0
        segment_label = [1 for _ in range(len(s1))]
        s1_feat = self.feats[item] if len(self.feats) > 0 else 0

        # Pad the token ids and the segment labels up to seq_len with [PAD].
        padding = [self.vocab.vocab['[PAD]'] for _ in range(self.seq_len - len(s1))]
        s1.extend(padding)
        segment_label.extend(padding)

        output = {'input': s1,
                  'label': s1_label,
                  'feat': s1_feat,
                  'segment_label': segment_label}
        return {key: torch.tensor(value) for key, value in output.items()}
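
# A minimal sketch (not part of the original module) mapping each dataset variant
# to the handcrafted feature length documented in its docstring. The variant keys
# are illustrative, and `vocab` is assumed to be an already-constructed Vocab
# object; the time-based variants additionally require the pickled time DataFrame
# at the hard-coded path used in their __init__.
DATASET_VARIANTS = {
    "base": (TokenizerDataset, 17),
    "skills": (TokenizerwSkillsDataset, 17),
    "time": (TokenizerwTimeDataset, 4),
    "skills_time": (TokenizerwSkillsTimeDataset, 21),
}

def _example_build_dataset(variant, dataset_path, label_path, vocab, seq_len=30):
    dataset_cls, feat_dim = DATASET_VARIANTS[variant]
    # Returns the dataset plus the feature dimension a downstream model would
    # need for its feature-projection layer.
    return dataset_cls(dataset_path, label_path, vocab, seq_len=seq_len), feat_dim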