Spaces:
Runtime error
Runtime error
| from ..TikTokApi import TikTokApi | |
| from ..TikTokApi.api.user import User | |
| from ..TikTokApi.api.video import Video | |
| import asyncio | |
| import os | |
| import json | |
| from datetime import datetime | |
| import math | |
| import random | |
| from tqdm import tqdm | |
| # get your own ms_token from your cookies on tiktok.com | |
| ms_token = os.environ.get("ms_token", None) | |
| def debug(debug: bool = False): | |
| if debug: | |
| os.environ["DEBUG"] = "True" | |
| else: | |
| os.environ["DEBUG"] = "False" | |
| def openJson(path): | |
| try: | |
| with open(path, "r") as f: | |
| return json.loads(f.read()) | |
| except: | |
| raise Exception("Error opening json file") | |
| def saveJson(path, data): | |
| if not os.path.exists(os.path.dirname(path)): | |
| os.makedirs(os.path.dirname(path)) | |
| with open(path, "w") as f: | |
| f.write(json.dumps(data)) | |
| def openTxt(path): | |
| try: | |
| with open(path, "r") as f: | |
| return f.read().splitlines() | |
| except: | |
| raise Exception("Error opening txt file") | |
| def saveTxt(path, data): | |
| if not os.path.exists(os.path.dirname(path)): | |
| os.makedirs(os.path.dirname(path)) | |
| with open(path, "w") as f: | |
| f.write("\n".join(data)) | |
| def saveUserInfoInJson(username, data, hashtag="default"): | |
| saveJson(f"Data/JSON/Users/{hashtag}/{username}.json", data) | |
| def openUserInfoInJson(username, hashtag="default"): | |
| try: | |
| return openJson(f"Data/JSON/Users/{hashtag}/{username}.json") | |
| except: | |
| return None | |
| def compareUserDataViewsAndSaveWithMore(user1, user2): | |
| try: | |
| if user1["total_views"] > user2["total_views"]: | |
| return False | |
| else: | |
| return True | |
| except: | |
| print(f"Error comparing user data ") | |
| return True | |
| def debugPrint(text): | |
| #print(f"{datetime.now().strftime('%H:%M:%S.%f')}\t{text}") | |
| pass | |
| async def users_videos_with_hashtag(usernameList, hashtag, blackList: dict[list] = None, ms_token: str = None): | |
| ''' | |
| Asynchronous function that retrieves TikTok videos with a specific hashtag for a list of usernames, and saves the user's total views and total videos with the hashtag to a JSON file. | |
| Parameters: | |
| - `usernameList`: List of TikTok usernames to retrieve videos for. | |
| - `hashtag`: Hashtag to search for in the user's videos. | |
| - `blackList`: (Optional) Dictionary containing lists of usernames and video IDs to skip. | |
| - `ms_token`: (Optional) TikTok API access token. | |
| ''' | |
| async with TikTokApi() as api: | |
| debugPrint("Creating sessions") | |
| try: | |
| cookieFormLast: list = [openJson("Data/JSON/cookies.json")] | |
| except: | |
| print("No cookies found, creating new sessions") | |
| cookieFormLast = None | |
| await api.create_sessions(ms_tokens=[ms_token], | |
| num_sessions=1, | |
| sleep_after=20, | |
| headless=False, | |
| # executable_path="C:/Program Files/Google/Chrome/Application/chrome.exe", | |
| # browser="firefox", | |
| override_browser_args=[ | |
| "--disable-blink-features=AutomationControlled"], | |
| cookies=cookieFormLast, | |
| starting_url="https://www.tiktok.com/@tiltocacto0o" | |
| ) | |
| debugPrint("Sessions created") | |
| print(blackList.get("usernames", "")) | |
| for username in tqdm(usernameList): | |
| if username in blackList.get("usernames", ""): | |
| debugPrint( | |
| f"Skipping user {username} because it is in the blacklist") | |
| continue | |
| debugPrint(f"Getting user {username}") | |
| debugPrint(f"username = {username}") | |
| try: | |
| user: User = api.user(username=username) | |
| user_data = await user.info() | |
| except: | |
| print(f"Error getting user {username}") | |
| continue | |
| videosLen = user_data["userInfo"]["stats"]["videoCount"] | |
| debugPrint(f"videosLen = {videosLen} ") | |
| total_views = 0 | |
| total_videos_with_tag = 0 | |
| try: | |
| async for video in user.videos(count=videosLen): | |
| if video.id in blackList.get("videos", []): | |
| continue | |
| video: Video | |
| play_count = int(video.stats.get("playCount", 0)) | |
| if any(str(h.name).lower() == hashtag for h in video.hashtags): | |
| total_views += play_count | |
| total_videos_with_tag += 1 | |
| debugPrint(f"save {username} {total_views}") | |
| openUserInfoInJson(username=username, hashtag=hashtag) | |
| if compareUserDataViewsAndSaveWithMore( | |
| openUserInfoInJson(username=username, | |
| hashtag=hashtag), | |
| {"username": username, | |
| "total_views": total_views, | |
| "total_videos_with_tag": total_videos_with_tag} | |
| ): | |
| saveUserInfoInJson(username=username, | |
| data={ | |
| "username": username, "total_views": total_views, "total_videos_with_tag": total_videos_with_tag}, | |
| hashtag=hashtag) | |
| else: | |
| print(f"skip {username} {total_views}") | |
| except Exception as e: | |
| print(f"Error getting videos for user {username}") | |
| print(e) | |
| continue | |
| await asyncio.sleep(random.uniform(0.5, 1.5)) | |
| debugPrint("Closing sessions") | |
| cookietosave = await api.get_session_cookies(api.sessions[0]) | |
| saveJson("Data/JSON/cookies.json", cookietosave) | |
| await api.close_sessions() | |
| await api.stop_playwright() | |
| if __name__ == "__main__": | |
| os.environ["DEBUG"] = "True" | |
| # print(os.environ.pop("DEBUG", False)) | |
| usernameList = openTxt("Data/TXT/cacto0o.txt") | |
| hashtag = "костиккакто" | |
| blackList = openJson("Data/JSON/blackList.json") | |
| asyncio.run(users_videos_with_hashtag( | |
| usernameList=usernameList, hashtag=hashtag, blackList=blackList)) | |