|
import csv
import functools
import logging
import os
import pickle
import string
import time
from urllib.parse import urlencode

import gradio as gr
import pandas as pd
from docx import Document
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
|
|
|
class LinkedInBot:
    """Selenium-driven scraper that signs in to LinkedIn and exports job postings.

    A Chrome session is started on construction. ``run`` performs the whole
    workflow: authenticate (with stored cookies when available), open the job
    search, walk the result pages and write one CSV row per posting to
    ``data/data.csv``.
    """

    def __init__(self, delay=5):
        """Create the ``data`` directory and start a Chrome WebDriver session.

        Args:
            delay: Default number of seconds ``wait`` uses per attempt.
        """
        # exist_ok replaces the separate exists() check and its race window.
        os.makedirs("data", exist_ok=True)
        self.delay = delay
        self.driver = webdriver.Chrome()

    def login(self, email, password):
        """Open the LinkedIn login page and submit the given credentials."""
        self.driver.maximize_window()
        self.driver.get('https://www.linkedin.com/login')
        self.driver.find_element(By.ID, 'username').send_keys(email)
        self.driver.find_element(By.ID, 'password').send_keys(password)
        self.driver.find_element(By.XPATH, "//button[@type='submit']").click()

    def save_cookie(self, path):
        """Pickle the current browser cookies to *path*."""
        with open(path, 'wb') as filehandler:
            pickle.dump(self.driver.get_cookies(), filehandler)

    def load_cookie(self, path):
        """Load pickled cookies from *path* and add them to the browser session."""
        # SECURITY: unpickling can execute arbitrary code. Only load cookie
        # files that were written by save_cookie on this machine.
        with open(path, 'rb') as cookiesfile:
            cookies = pickle.load(cookiesfile)
        for cookie in cookies:
            self.driver.add_cookie(cookie)

    def search_linkedin(self, keywords, location, date_posted):
        """Open the LinkedIn job-search results page for the given criteria.

        Args:
            keywords: Search terms, sent as the ``keywords`` query parameter.
            location: Sent as the ``location`` query parameter.
            date_posted: Sent as the ``f_TPR`` query parameter.
        """
        self.driver.get("https://www.linkedin.com/jobs/")
        # urlencode escapes spaces, '&', '=' and non-ASCII text that would
        # otherwise corrupt the query string.
        query = urlencode(
            {"keywords": keywords, "location": location, "f_TPR": date_posted}
        )
        self.driver.get(f"https://www.linkedin.com/jobs/search/?{query}")

    def wait(self, by=By.ID, text=None, t_delay=None, max_retries=3):
        """Wait until an element matching ``(by, text)`` is present on the page.

        Args:
            by: Selenium locator strategy.
            text: Locator value. When ``None`` there is nothing to look for,
                so the method only pauses once for the delay.
            t_delay: Seconds per attempt; defaults to ``self.delay``.
            max_retries: Number of attempts before giving up.

        Returns:
            ``True`` if the element was found, ``False`` otherwise.
        """
        delay = self.delay if t_delay is None else t_delay
        if text is None:
            # Without a locator value every attempt would time out, so just
            # give the page a moment to settle instead of polling.
            time.sleep(delay)
            return False

        for attempt in range(1, max_retries + 1):
            try:
                WebDriverWait(self.driver, delay).until(
                    EC.presence_of_element_located((by, text))
                )
                return True
            except TimeoutException:
                logging.warning(f"Element not found, retrying... ({attempt}/{max_retries})")
                time.sleep(delay)
        logging.error("Element not found after retries.")
        return False

    def scroll_to(self, job_list_item):
        """Scroll the given list item into view and click it."""
        self.driver.execute_script("arguments[0].scrollIntoView();", job_list_item)
        job_list_item.click()

    @staticmethod
    def _text_by_class(element, class_name):
        """Return the text of the first descendant with *class_name*, or ``None``."""
        try:
            return element.find_element(By.CLASS_NAME, class_name).text
        except NoSuchElementException:
            return None

    def extract_job_insights(self, job):
        """Read company size, level, salary, job type, industry and skills from the top card.

        This logic used to live in a first ``extract_additional_details``
        definition that a later definition of the same name overrode, so it
        was unreachable. It is kept under its own name. Each field is looked
        up independently, so one missing element no longer discards the rest.

        Returns:
            Tuple ``(company_size, position_level, salary, job_type,
            industry, skills)``; fields that were not found are ``None``.
        """
        company_size = position_level = salary = None
        job_type = industry = skills = None
        empty = (company_size, position_level, salary, job_type, industry, skills)

        try:
            additional_info = job.find_element(
                By.CLASS_NAME, "job-details-jobs-unified-top-card__job-insight"
            )
        except NoSuchElementException:
            return empty

        secondary_spans = additional_info.find_elements(
            By.XPATH,
            ".//span[contains(@class, 'job-details-jobs-unified-top-card__job-insight-view-model-secondary')]",
        )
        if secondary_spans:
            # The first secondary span is treated as the salary.
            salary = secondary_spans[0].text.strip()
        for span in secondary_spans:
            text = span.text.strip()
            if "Hybrid" in text or "Full-time" in text:
                job_type = text
            elif "Mid-Senior level" in text:
                position_level = text
            else:
                industry = text

        first_spans = additional_info.find_elements(By.XPATH, ".//span")
        if first_spans:
            company_info_text = first_spans[0].text.strip()
            if "employees" in company_info_text:
                parts = company_info_text.split(" · ")
                company_size = parts[0]
                # Guard: the size may appear without a trailing industry part.
                if len(parts) > 1:
                    industry = parts[1]
            else:
                industry = company_info_text

        try:
            skills_button = additional_info.find_element(
                By.CLASS_NAME, "job-details-jobs-unified-top-card__job-insight-text-button"
            )
            skills_link = skills_button.find_element(By.TAG_NAME, "a")
        except NoSuchElementException:
            pass
        else:
            skill_parts = skills_link.text.split(": ")
            # Guard: only take the part after the label when a label exists.
            if len(skill_parts) > 1:
                skills = skill_parts[1]

        return company_size, position_level, salary, job_type, industry, skills

    def get_position_data(self, job):
        """Return ``[position, company, location, description]`` for a job card.

        The first three lines of the card's text are taken as position,
        company and location. Cards with fewer than three lines are skipped.

        Returns:
            A four-item list, or ``None`` when the card is incomplete.
        """
        job_info = job.text.split('\n')
        if len(job_info) < 3:
            logging.warning("Incomplete job information, skipping...")
            return None

        position, company, location = job_info[:3]
        description = self.get_job_description(job)
        return [position, company, location, description]

    def extract_additional_details(self, job):
        """Read company size, position level, salary and job type from a job card.

        Returns:
            Tuple ``(company_size, position_level, salary, job_type)``;
            fields that were not found are ``None``.
        """
        size_text = self._text_by_class(job, "job-card-search__company-size")
        # The size text is only accepted when it mentions "employees".
        if size_text is not None and "employees" in size_text:
            company_size = size_text.strip()
        else:
            company_size = None

        position_level = self._text_by_class(job, "job-card-search__badge")
        salary = self._text_by_class(job, "job-card-search__salary")
        job_type = self._text_by_class(job, "job-card-search__job-type")
        return company_size, position_level, salary, job_type

    def get_job_description(self, job):
        """Click the job card and return the text of the description pane.

        Returns:
            The description text, or ``None`` when no ``jobs-description``
            element is present.
        """
        self.scroll_to(job)
        # NOTE(review): the pane is read right after the click; if it loads
        # asynchronously this may still show the previous job - confirm.
        try:
            return self.driver.find_element(By.CLASS_NAME, "jobs-description").text
        except NoSuchElementException:
            return None

    def get_application_link(self, job):
        """Return the ``href`` of the card's apply link, or ``None`` if absent."""
        try:
            container = job.find_element(
                By.CLASS_NAME, "job-card-search__apply-button-container"
            )
            return container.find_element(By.TAG_NAME, "a").get_attribute("href")
        except NoSuchElementException:
            return None

    def run(self, email, password, keywords, location, date_posted):
        """Authenticate, search, scrape every result page and write the CSV.

        Stored cookies in ``data/cookies.txt`` are reused when present;
        otherwise the credentials are submitted and the resulting cookies are
        saved. Rows go to ``data/data.csv``. The browser session is always
        closed, even when scraping fails part-way.
        """
        cookie_path = "data/cookies.txt"
        job_card_class = "occludable-update"
        try:
            if os.path.exists(cookie_path):
                # Open the site first so the cookies are added while a
                # LinkedIn page is loaded, then reload to apply them.
                self.driver.get("https://www.linkedin.com/")
                self.load_cookie(cookie_path)
                self.driver.get("https://www.linkedin.com/")
            else:
                self.login(email=email, password=password)
                self.save_cookie(cookie_path)

            logging.info("Begin LinkedIn keyword search")
            self.search_linkedin(keywords, location, date_posted)
            # Wait for the job cards that are scraped below; waiting with no
            # locator would spend every retry looking for nothing.
            self.wait(By.CLASS_NAME, job_card_class)

            csv_file_path = os.path.join("data", "data.csv")
            with open(csv_file_path, "w", newline="", encoding="utf-8") as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(["Position", "Company", "Location", "Description"])

                page = 1
                while True:
                    for job in self.driver.find_elements(By.CLASS_NAME, job_card_class):
                        job_data = self.get_position_data(job)
                        if job_data:
                            writer.writerow(job_data)

                    next_button = self.driver.find_elements(
                        By.XPATH, f"//button[@aria-label='Page {page + 1}']"
                    )
                    if not next_button:
                        break
                    next_button[0].click()
                    self.wait(By.CLASS_NAME, job_card_class)
                    page += 1

            logging.info("Done scraping.")
        finally:
            self.close_session()

    def close_session(self):
        """End the WebDriver session and close the browser."""
        logging.info("Closing session")
        # quit() ends the whole session; close() only closes the current window.
        self.driver.quit()
|
|
|
|
|
@functools.lru_cache(maxsize=1)
def _english_stopwords():
    """Return the NLTK English stop words as a frozenset, loaded once per process."""
    return frozenset(stopwords.words("english"))


def extract_keywords(text):
    """Return the lowercase tokens of *text* without stop words or punctuation.

    Args:
        text: Free text to tokenize. ``None``, empty or whitespace-only input
            yields an empty list.

    Returns:
        List of tokens in their original order; duplicates are kept.
    """
    if not text or not text.strip():
        return []

    stop_words = _english_stopwords()
    punctuation = set(string.punctuation)
    # A token is dropped when every character is punctuation. The previous
    # ``token not in string.punctuation`` was a substring test, so tokens such
    # as "''" or "--" were kept.
    return [
        token
        for token in word_tokenize(text.lower())
        if token not in stop_words
        and not all(char in punctuation for char in token)
    ]
|
|
|
|
|
def process_resume(uploaded_file):
    """Return the text of an uploaded .docx resume, one paragraph per line.

    Args:
        uploaded_file: Either an object exposing the file path as ``.name``
            or the path itself.
            NOTE(review): which of the two Gradio passes depends on its
            version and the File component configuration - confirm.

    Returns:
        All paragraph texts, each followed by a newline.
    """
    # Fall back to the value itself when it has no ``.name`` (plain path).
    path = getattr(uploaded_file, "name", uploaded_file)
    document = Document(path)
    return "".join(f"{paragraph.text}\n" for paragraph in document.paragraphs)
|
|
|
def keyword_similarity_check(resume_text, df, keywords):
    """Score each job by the share of resume keywords found in its description.

    The score is ``100 * |keywords ∩ description tokens| / |keywords|``,
    computed on distinct keywords so it always lies between 0 and 100.
    The result is stored in ``df["Similarity (%)"]`` and the frame is written
    to ``data/data.csv``.

    Args:
        resume_text: Unused; kept so existing callers keep working.
        df: DataFrame with a ``Description`` column. Modified in place.
        keywords: Iterable of keyword tokens to look for.

    Returns:
        The same DataFrame with the ``Similarity (%)`` column added.
    """
    job_descriptions = df["Description"].fillna("").astype(str)
    # A set gives O(1) membership tests and removes repeated keywords, which
    # previously let the percentage exceed 100.
    keyword_set = set(keywords)

    if keyword_set:
        similarity_scores = [
            len(keyword_set.intersection(extract_keywords(description)))
            / len(keyword_set)
            * 100
            for description in job_descriptions
        ]
    else:
        # No keywords to match: avoid dividing by zero.
        similarity_scores = [0.0] * len(job_descriptions)

    df["Similarity (%)"] = similarity_scores
    os.makedirs("data", exist_ok=True)
    df.to_csv("data/data.csv", index=False)
    return df
|
|
|
def cosine_similarity_check(resume_text, df):
    """Score each job description by TF-IDF cosine similarity to the resume.

    The vectorizer is fitted on the job descriptions only. Scores are stored
    as percentages in ``df["Similarity (%)"]`` and the frame is written to
    ``data/data.csv``. When there is no usable text (no rows, only blank
    descriptions, or an empty resume) every score is 0.

    Args:
        resume_text: Plain text of the resume.
        df: DataFrame with a ``Description`` column. Modified in place.

    Returns:
        The same DataFrame with the ``Similarity (%)`` column set.
    """
    job_descriptions = df["Description"].fillna("").astype(str)
    similarity_scores = [0.0] * len(job_descriptions)

    has_text = bool(job_descriptions.str.strip().ne("").any())
    if has_text and resume_text:
        vectorizer = TfidfVectorizer()
        try:
            tfidf_matrix = vectorizer.fit_transform(job_descriptions)
        except ValueError:
            # Raised when no description yields a single token (empty
            # vocabulary); keep the zero scores instead of crashing.
            logging.warning("No usable text in job descriptions; similarity set to 0.")
        else:
            resume_tfidf = vectorizer.transform([resume_text])
            similarity_scores = cosine_similarity(resume_tfidf, tfidf_matrix)[0] * 100

    df["Similarity (%)"] = similarity_scores
    os.makedirs("data", exist_ok=True)
    df.to_csv("data/data.csv", index=False)
    return df
|
|
|
def main(email, password, keywords, location, date_posted, resume_file):
    """Scrape LinkedIn jobs and, when a resume is given, score them against it.

    Args:
        email: LinkedIn login e-mail.
        password: LinkedIn password.
        keywords: Job-search keywords.
        location: Job-search location.
        date_posted: Value passed through to the bot's date filter.
        resume_file: Uploaded resume, or a falsy value to skip scoring.

    Returns:
        DataFrame read from ``data/data.csv``. With a resume it also carries
        ``Keyword Match (%)`` (keyword overlap) and ``Similarity (%)``
        (TF-IDF cosine similarity).
    """
    bot = LinkedInBot()
    bot.run(email, password, keywords, location, date_posted)

    df = pd.read_csv("data/data.csv")
    # Nothing to score without a resume or without any scraped rows.
    if not resume_file or df.empty:
        return df

    resume_text = process_resume(resume_file)
    # Separate name: the search ``keywords`` argument must not be overwritten.
    resume_keywords = extract_keywords(resume_text)

    if resume_keywords:
        df = keyword_similarity_check(resume_text, df, resume_keywords)
        # Both checks write "Similarity (%)"; rename the keyword score so the
        # cosine score below does not overwrite it.
        df = df.rename(columns={"Similarity (%)": "Keyword Match (%)"})

    df = cosine_similarity_check(resume_text, df)
    return df
|
|
|
# Gradio front end: five text inputs plus a resume upload, mapped positionally
# onto main(email, password, keywords, location, date_posted, resume_file).
iface = gr.Interface(
    fn=main,
    inputs=[
        gr.Textbox(label="LinkedIn email"),
        # Mask the password instead of echoing it in a plain text box.
        gr.Textbox(label="LinkedIn password", type="password"),
        gr.Textbox(label="Keywords"),
        gr.Textbox(label="Location"),
        gr.Textbox(label="Date posted (f_TPR filter value)"),
        gr.File(label="Resume (.docx)"),
    ],
    # main returns a pandas DataFrame; "csv" is not a Gradio component name.
    outputs="dataframe",
    title="LinkedIn Job Analysis",
    description="Enter your LinkedIn credentials and search criteria to scrape job postings. Upload a resume to check for job similarity.",
)

iface.launch()
|
|