Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| from seleniumwire import webdriver | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.chrome.service import Service | |
| import time | |
| import shutil | |
| chrome_options = webdriver.ChromeOptions() | |
| chrome_options.add_argument('--headless') | |
| chrome_options.add_argument('--disable-gpu') | |
| chrome_options.add_argument('--no-sandbox') | |
| chrome_options.add_argument('--ignore-certificate-errors-spki-list') | |
| chrome_options.add_argument('--ignore-ssl-errors') | |
| chrome_options.add_argument('log-level=3') | |
| # CHROMEDRIVER_PATH = 'chromedriver.exe' | |
| def get_chromedriver_path(): | |
| return shutil.which('chromedriver') | |
| service = Service(executable_path=get_chromedriver_path()) | |
| driver = webdriver.Chrome(service=service, options=chrome_options) | |
| def get_address(search_term): | |
| lat, lon = [], [] | |
| try: | |
| print('HIT.......') | |
| driver.get("https://www.google.com/maps") | |
| time.sleep(3) | |
| driver.find_element(By.ID, "searchboxinput").send_keys(search_term) | |
| driver.find_element(By.ID, "searchbox-searchbutton").click() | |
| time.sleep(4) | |
| url = driver.current_url | |
| if '!3d' in url: | |
| print('--------------------------coor-----------------------------------') | |
| url = url.split('!3d')[1] | |
| url = url.split('!4d') | |
| lat.append(url[0]) | |
| lon.append(url[1].split('!')[0]) | |
| return lat[0], lon[0] | |
| else: | |
| time.sleep(0.25) | |
| f2 = driver.find_elements(By.XPATH, '//a')[1:] | |
| if f2: | |
| for k in f2: | |
| if k.get_attribute('aria-label') is None: | |
| continue | |
| if search_term in k.get_attribute('aria-label'): | |
| print('-------------------jump------------------------------') | |
| k.click() | |
| time.sleep(2.5) | |
| url = driver.current_url | |
| if '!3d' in url: | |
| url = url.split('!3d')[1] | |
| url = url.split('!4d') | |
| lat.append(url[0]) | |
| lon.append(url[1].split('!')[0]) | |
| break | |
| else: | |
| print('------------------------none--------------------------') | |
| lat.append('0') | |
| lon.append('0') | |
| break | |
| return lat[0], lon[0] | |
| else: | |
| print('------------------------fatal none--------------------------') | |
| lat.append('0') | |
| lon.append('0') | |
| time.sleep(0.6) | |
| return lat[0], lon[0] | |
| except Exception as e: | |
| # print("5 error") | |
| return e | |
| def main(): | |
| st.title("Address Finder") | |
| uploaded_file = st.file_uploader("Choose a CSV file", type=["csv"]) | |
| if uploaded_file is not None: | |
| df = pd.read_csv(uploaded_file) | |
| st.write("Process Started wait till progress Bar Start ! Note: 0 is Mutliple Location Error" ) | |
| latitudes, longitudes = [], [] | |
| add = df['address'].to_list() | |
| # Create progress bar | |
| progress_bar = st.progress(0) | |
| progress_text = st.empty() | |
| processed_data = [] | |
| for row in range(len(add)): | |
| try: | |
| result = get_address(add[row]) | |
| print('HIT Over....') | |
| if type(result) == str: | |
| st.error(f'{result}') | |
| break | |
| latitudes.append(float(result[0])) | |
| longitudes.append(float(result[1])) | |
| # Update progress bar | |
| progress_value = (row + 1) / len(df['address']) | |
| progress_bar.progress(progress_value) | |
| progress_text.text(f"Progress: {int(progress_value * 100)}%") | |
| # Add to processed_data | |
| processed_data.append({'Address': add[row], 'latitudes': result[0], 'Longitudes': result[1]}) | |
| # Display df2 on the side | |
| st.sidebar.title("Processed Data (df2)") | |
| st.sidebar.write(pd.DataFrame(processed_data)) | |
| except Exception as e: | |
| st.error(f'Error processing row {row + 1}: {str(e)}') | |
| break | |
| # Create df2 DataFrame | |
| df2 = pd.DataFrame(processed_data) | |
| st.success("Processing completed!") | |
| st.write("Final Processed Data:") | |
| st.write(df2) | |
| if __name__ == '__main__': | |
| main() | |