"""parsing.ipynb |
Automatically generated by Colaboratory. |
Original file is located at |
https://colab.research.google.com/drive/1thvkAz498jADcaVirJG91V-3-XBhdkq1 |
""" |
import requests |
from bs4 import BeautifulSoup |
import re |
import os |
import pandas as pd |
import numpy as np |
from tqdm import tqdm |
def get_transcripts_from_url(url): |
response = requests.get(url) |
soup = BeautifulSoup(response.content, 'html.parser') |
titles = soup.find_all('li') |
transcript_paths = [] |
for title in titles: |
a = title.find('a') |
path = a.get("href") |
transcript_paths.append("https://fangj.github.io/friends/" + path) |
return transcript_paths |
def get_text_from_html(url): |
path = url |
response = requests.get(path) |
html_content = response.text |
soup = BeautifulSoup(html_content, 'html.parser') |
transcript = soup.find_all('p') |
transcript_name = path.split("/")[-1].replace(".html", ".txt") |
with open(os.path.join("friends_raw_scripts", transcript_name), 'w', encoding='utf-8') as file: |
text = soup.get_text(strip=False).lower().replace("'", ""). replace('"', "").replace("\xa0", "") |
file.write(text + "\n") |
return transcript_name |
def clean_and_write_text(transcript_name): |
char = [] |
texts = [] |
flag = None |
pattern = re.compile(r'\b\w+:') |
with open(os.path.join("friends_raw_scripts", transcript_name), 'r', encoding='utf-8') as file: |
final_transcript = file.readlines() |
skip_lines = 10 |
pattern = re.compile(r'\b\w+:') |
scene_words = ["commercial break", "closing credits", "opening credits", "end"] |
for ind in range(1, len(final_transcript) - 1): |
final_list = [] |
pre_line = final_transcript[ind - 1].strip() |
cur_line = final_transcript[ind].strip() |
next_line = final_transcript[ind + 1].strip() |
next_condition = re.sub(r"\([^()]*\)|\[[^\[\]]*\]", '', next_line).strip() |
cur_conditon = re.sub(r"\([^()]*\)|\[[^\[\]]*\]", '', cur_line).strip() |
if sum([bool(pre_line), bool(cur_line), bool(next_line)]) == 1: |
continue |
elif cur_line in scene_words: |
continue |
elif "by:" in cur_line or "note:" in cur_line: |
continue |
elif "[" in cur_line or "]" in cur_line: |
continue |
elif not cur_conditon: |
continue |
elif pattern.search(cur_line) and flag == None: |
name, text = cur_line.split(":", maxsplit=1) |
char.append(name) |
text = re.sub(r'\([^)]*\)', '', text) |
text = text.strip() |
flag = "char" |
if pattern.search(next_line) or not next_condition or next_line in scene_words or "[" in next_line: |
texts.append(text) |
flag = None |
if len(char) != len(texts): |
print(ind) |
print(char[-1], texts[-1]) |
elif cur_line and flag == 'char': |
text += " " + cur_line |
if pattern.search(next_line) or not next_condition or next_line in scene_words or "[" in next_line: |
text = re.sub(r"\([^()]*\)|\[[^\[\]]*\]", '', text).strip() |
texts.append(text) |
flag = None |
if len(char) != len(texts): |
print(ind) |
print(char[-1], texts[-1]) |
new_name = "pre_" + transcript_name |
with open(os.path.join("friends_preprocessed_scripts", new_name), 'w', encoding='utf-8') as file: |
for c, d in zip(char, texts): |
file.write(f"{c}: {d}\n") |
raw_texts_exists = False |
transcript_paths = get_transcripts_from_url("https://fangj.github.io/friends/") |
transcript_paths[:10] |
os.makedirs("friends_preprocessed_scripts", exist_ok=True) |
os.makedirs("friends_raw_scripts", exist_ok=True) |
if not raw_texts_exists: |
print("Parse all scripts from this website https://fangj.github.io/friends/") |
for path in tqdm(transcript_paths, desc='Total'): |
transcript_name = get_text_from_html(path) |
clean_and_write_text(transcript_name) |
dir_list = [file for file in os.listdir("./friends_preprocessed_scripts")] |
def df_scripts(path): |
"""function take preprocessed_script.txt from dir and create dataframes""" |
chars = [] |
texts = [] |
with open(os.path.join("friends_preprocessed_scripts", path), 'r', encoding="utf-8") as file: |
for line in file: |
char, text = line.split(":", 1) |
chars.append(char) |
texts.append(text.strip().lower()) |
df_name = path.replace("prep_SP_", "df_").replace(".txt", ".csv") |
df = pd.DataFrame({'Characters': chars, 'Dialogs': texts}) |
df.to_csv(os.path.join("dataframes", "friends", df_name), index=False) |
os.makedirs("dataframes/friends", exist_ok=True) |
for preprocessed_script in dir_list: |
df_scripts(preprocessed_script) |
def collect_df(threshold=10): |
"""function concatenate dataframes in one single dataframe""" |
dfs = [] |
for file in os.listdir("dataframes/friends"): |
dfs.append(pd.read_csv(os.path.join("dataframes", "friends", file))) |
df = pd.concat(dfs, ignore_index=True).dropna().reset_index(drop=True) |
high_chars = df.Characters.value_counts() |
high_chars_ind = high_chars[high_chars > threshold].index |
df = df[df["Characters"].isin(high_chars_ind)] |
print(f"Number of characters in dataframe {len(df.Characters.value_counts())}") |
return df |
"""### Which most frequent characters we can meet in the movie""" |
def form_df(df, char): |
favorite_character_df = df[df.Characters == char] |
favorite_character_ind = favorite_character_df.index.tolist() |
text_to_favorite_character_ind = (np.array(favorite_character_ind) - 1).tolist() |
favorite_character_dialog = df[df.index.isin(favorite_character_ind)] |
text_to_favorite_character = df[df.index.isin(text_to_favorite_character_ind)] |
text_to_favorite_character = text_to_favorite_character[text_to_favorite_character["Characters"] != char] |
question_to_favorite_character = text_to_favorite_character |
question_to_favorite_character_ind = question_to_favorite_character.index.tolist() |
true_answers_ind = (np.array(question_to_favorite_character_ind) + 1).tolist() |
favorite_character_answer = favorite_character_dialog[favorite_character_dialog.index.isin(true_answers_ind)] |
favorite_character_answer.to_csv("favorite_character_answer.csv") |
question_to_favorite_character = question_to_favorite_character.rename( |
columns={"Characters": "questioner", "Dialogs": "question"}) |
favorite_character_answer = favorite_character_answer.rename(columns={"Characters": "answerer", "Dialogs": "answer"}) |
question_to_favorite_character.reset_index(inplace=True, drop=True) |
favorite_character_answer.reset_index(inplace=True, drop=True) |
df = pd.concat([question_to_favorite_character, favorite_character_answer], axis=1) |
return df |
def form_df_negative(df, df_char, char): |
true_label = pd.DataFrame({"label": np.ones(shape=len(df_char), dtype=np.int8)}) |
df_true_labels = pd.concat([df_char, true_label], axis=1) |
random_character_df = df[df.Characters != char].reset_index(drop=True) |
indices = np.random.choice(np.arange(len(random_character_df)), size=(len(df_true_labels)), replace=False) |
random_character_df = random_character_df[random_character_df.index.isin(indices)] |
df_negative_labels = df_true_labels.drop(columns="label", axis=1) |
df_negative_labels["answer"] = random_character_df["Dialogs"].reset_index(drop=True) |
df_negative_labels = df_negative_labels.rename(columns={"Dialogs": "question"}) |
negative_label = pd.DataFrame({"label": np.zeros(shape=len(df_char), dtype=np.int8)}) |
df_negative_labels = pd.concat([df_negative_labels, negative_label], axis=1) |
final_df = pd.concat([df_negative_labels, df_true_labels], axis=0) |
final_df = final_df.sample(frac=1).reset_index(drop=True) |
return final_df |
"""## Choose your favorite character""" |
df = collect_df(threshold=10) |
df.to_csv("full_trancscripts.csv", index=False) |
characters = ["rachel", "ross", "chandler", "monica", "joey", "phoebe"] |
for char in tqdm(characters): |
df_char = form_df(df, char) |
df_char.to_csv(char + "_friends.csv", index=False) |
df_char_label = form_df_negative(df, df_char, char) |
df_char_label.to_csv(char + "_friends_label.csv", index=False) |
print("script created") |