hahunavth committed
Commit
edf61c6
1 Parent(s): 2a8b18d
.idea/vcs.xml ADDED
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+</project>
__pycache__/google_sheet.cpython-310.pyc ADDED
Binary file (3.07 kB)
 
__pycache__/kaggle_service.cpython-310.pyc ADDED
Binary file (15.4 kB)
 
__pycache__/logger.cpython-310.pyc ADDED
Binary file (2.17 kB)
 
google_sheet.py CHANGED
@@ -2,6 +2,7 @@ import gspread
 from oauth2client.service_account import ServiceAccountCredentials
 from typing import Dict
 
+
 class SheetCRUDRepository:
     def __init__(self, worksheet):
         self.worksheet = worksheet
kaggle_service.py CHANGED
@@ -35,7 +35,8 @@ class KaggleApiWrapper(KaggleApi):
         config = super().read_config_environment(config_data, quiet)
         config['username'] = self.username
         config['key'] = self.secret
-        config['proxy'] = "http://proxy.server:3128"
+        # only works on PythonAnywhere
+        # config['proxy'] = "http://proxy.server:3128"
 
         return config_data
 
@@ -44,8 +45,24 @@ class KaggleApiWrapper(KaggleApi):
         pass
 
 
+class ValidateException(Exception):
+    def __init__(self, message: str):
+        super(ValidateException, self).__init__(message)
+
+    @staticmethod
+    def from_api_exception(e: ApiException, kernel_slug: str):
+        return ValidateException(f"Error: {e.status} {e.reason} with notebook {kernel_slug}")
+
+    @staticmethod
+    def from_api_exception_list(el: List[ApiException], kernel_slug_list: List[str]):
+        message = f"Error: \n"
+        for e, k in zip(el, kernel_slug_list):
+            message = message + f"\t{e.status} {e.reason} with notebook {k}"
+        return ValidateException(message)
+
+
 class KaggleNotebook:
-    def __init__(self, api: KaggleApi, kernel_slug: str, container_path: str = "./tmp"):
+    def __init__(self, api: KaggleApi, kernel_slug: str, container_path: str = "./tmp", id=None):
         """
         :param api: KaggleApi
         :param kernel_slug: Notebook id, you can find it in the url of the notebook.
@@ -55,6 +72,9 @@ class KaggleNotebook:
         self.api = api
         self.kernel_slug = kernel_slug
         self.container_path = container_path
+        self.id = id
+        if self.id is None:
+            print(f"Warn: {self.__class__.__name__}.id is None")
 
     def status(self) -> str or None:
         """
@@ -68,7 +88,11 @@ class KaggleNotebook:
         res = self.api.kernels_status(self.kernel_slug)
         print(f"Status: {res}")
         if res is None:
+            if self.id is not None:
+                sheet_logger.update_job_status(self.id, notebook_status='None')
             return None
+        if self.id is not None:
+            sheet_logger.update_job_status(self.id, notebook_status=res['status'])
         return res['status']
 
     def _get_local_nb_path(self) -> str:
@@ -114,26 +138,29 @@ class KaggleNotebook:
         return json.loads(open(metadata_path).read())
 
     def check_nb_permission(self) -> Union[tuple[bool], tuple[bool, None]]:
-        try:
-            status = self.status()
-            if status is None:
-                return False, status
-            return True, status
-        except ApiException as e:
-            print(f"Error: {e.status} {e.reason} with notebook {self.kernel_slug}")
-            return False, None
+        status = self.status()  # raise ApiException
+        if status is None:
+            return False, status
+        return True, status
+
 
     def check_datasets_permission(self) -> bool:
         meta = self.get_metadata()
         if meta is None:
             print("Warn: cannot get metadata. Pull and try again?")
         dataset_sources = meta['dataset_sources']
+        ex_list = []
+        slugs = []
         for dataset in dataset_sources:
             try:
                 self.api.dataset_status(dataset)
             except ApiException as e:
                 print(f"Error: {e.status} {e.reason} with dataset {dataset} in notebook {self.kernel_slug}")
-                return False
+                ex_list.append(e)
+                slugs.append(self.kernel_slug)
+                # return False
+        if len(ex_list) > 0:
+            raise ValidateException.from_api_exception_list(ex_list, slugs)
         return True
 
 
@@ -202,7 +229,7 @@ class AccountTransactionManager:
 
 
 class NbJob:
-    def __init__(self, acc_dict: dict, nb_slug: str, rerun_stt: List[str] = None, not_rerun_stt: List[str] = None):
+    def __init__(self, acc_dict: dict, nb_slug: str, rerun_stt: List[str] = None, not_rerun_stt: List[str] = None, id=None):
         """
 
         :param acc_dict:
@@ -222,6 +249,7 @@ class NbJob:
 
         self.acc_dict = acc_dict
         self.nb_slug = nb_slug
+        self.id = id
 
     def get_acc_dict(self):
         return self.acc_dict
@@ -230,17 +258,21 @@ class NbJob:
         return list(self.acc_dict.keys())
 
     def is_valid_with_acc(self, api):
-        notebook = KaggleNotebook(api, self.nb_slug)
+        """
+        :param api:
+        :return:
+        :raise: ValidateException
+        """
+        notebook = KaggleNotebook(api, self.nb_slug, id=self.id)
+
         try:
-            notebook.pull()
+            notebook.pull()  # raise ApiException
+            stt, _ = notebook.check_nb_permission()  # note: raise ApiException
+            stt = notebook.check_datasets_permission()  # raise ValidateException
         except ApiException as e:
-            return False
-        stt, _ = notebook.check_nb_permission()
-        if not stt:
-            return False
-        stt = notebook.check_datasets_permission()
-        if not stt:
-            return False
+            raise ValidateException.from_api_exception(e, self.nb_slug)
+        # if not stt:
+        #     return False
         return True
 
     def is_valid(self):
@@ -260,7 +292,7 @@ class NbJob:
         :raises
             Exception if setup error
         """
-        notebook = KaggleNotebook(api, self.nb_slug, "./tmp")  # todo: change hardcode container_path here
+        notebook = KaggleNotebook(api, self.nb_slug, "./tmp", id=self.id)  # todo: change hardcode container_path here
 
         notebook.pull()
         assert notebook.check_datasets_permission(), f"User {api} does not have permission on datasets of notebook {self.nb_slug}"
@@ -277,7 +309,7 @@ class NbJob:
                 sheet_logger.log(username=api.username, nb=self.nb_slug, log="Try but no effect. Seem account to be out of quota")
                 return False
             if status3 in self.not_rerun_stt:
-                sheet_logger.log(username=api.username, nb=self.nb_slug, log=f"Notebook is in ignore status list {self.not_rerun_stt}, do nothing!")
+                sheet_logger.log(username=api.username, nb=self.nb_slug, log=f"Notebook status {status3} is in ignore status list {self.not_rerun_stt}, do nothing!")
                 return True
             if status3 not in ["queued", "running"]:
                 # return False # todo: check when user is out of quota
@@ -291,8 +323,8 @@ class NbJob:
         return True
 
     @staticmethod
-    def from_dict(obj: dict):
-        return NbJob(acc_dict=obj['accounts'], nb_slug=obj['slug'], rerun_stt=obj.get('rerun_status'), not_rerun_stt=obj.get('not_rerun_stt'))
+    def from_dict(obj: dict, id=None):
+        return NbJob(acc_dict=obj['accounts'], nb_slug=obj['slug'], rerun_stt=obj.get('rerun_status'), not_rerun_stt=obj.get('not_rerun_stt'), id=id)
 
 
 class KernelRerunService:
@@ -334,12 +366,24 @@ class KernelRerunService:
             print(f"Using username: {api.username}")
 
             for job in self.jobs.values():
+                ex_msg_list = []
                 if username in job.get_username_list():
                     print(f"Validate user: {username}, job: {job.nb_slug}")
-                    if not job.is_valid_with_acc(api):
+
+                    try:
+                        job.is_valid_with_acc(api)
+                    except ValidateException as e:
                         print(f"Error: not valid")
                         a = f"Setup error: {username} does not have permission on notebook {job.nb_slug} or related datasets"
-                        raise Exception(a)
+                        if job.id is not None:  # if it has an id, write the log
+                            ex_msg_list.append(f"Account {username}\n" + str(e) + "\n")
+                        else:  # if it has no id, raise
+                            raise Exception(a)
+                if len(ex_msg_list) > 0:
+                    sheet_logger.update_job_status(job.id, validate_status="\n".join(ex_msg_list))
+                else:
+                    sheet_logger.update_job_status(job.id, validate_status="success")
+
             release()
         return True
 
@@ -349,7 +393,7 @@ class KernelRerunService:
             api, release = self.acc_manager.get_unlocked_api_unblocking(job.get_username_list())
             api.authenticate()
             print(f"Using username: {api.username}")
-            notebook = KaggleNotebook(api, job.nb_slug)
+            notebook = KaggleNotebook(api, job.nb_slug, id=job.id)
             print(f"Notebook: {notebook.kernel_slug}")
             print(notebook.status())
             release()
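
For context, a minimal sketch of how the reworked validation above is consumed: is_valid_with_acc now raises ValidateException instead of returning False. The username, API key, notebook slug, and row id below are placeholders, not values from this commit.

# Illustrative sketch only, not part of this commit; credentials, slug, and id are placeholders.
from kaggle_service import KaggleApiWrapper, NbJob, ValidateException

job = NbJob(acc_dict={"some-user": "some-kaggle-key"}, nb_slug="some-user/some-notebook", id=2)
api = KaggleApiWrapper(username="some-user", secret="some-kaggle-key")
api.authenticate()
try:
    job.is_valid_with_acc(api)  # raises ValidateException on permission problems
except ValidateException as e:
    print(e)  # aggregated notebook/dataset permission errors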
logger.py CHANGED
@@ -1,7 +1,6 @@
 import platform,socket,re,uuid,json,psutil,logging
 from datetime import datetime as dt
-from google_sheet import log_repo
-
+from google_sheet import log_repo, conf_repo, SheetCRUDRepository
 
 version="v1.0.0"
 
@@ -24,8 +23,9 @@ def get_sys_info():
 
 
 class SheetLogger:
-    def __init__(self, log_repo):
+    def __init__(self, log_repo: SheetCRUDRepository, config_repo: SheetCRUDRepository):
         self.log_repo = log_repo
+        self.config_repo = config_repo
 
     def log(self, log='', nb='', username=''):
         self.log_repo.create({
@@ -37,4 +37,18 @@ class SheetLogger:
             "version": version
         })
 
-sheet_logger = SheetLogger(log_repo)
+    def update_job_status(self, row, validate_status: str = None, notebook_status: str = None):
+        data = self.config_repo.read(row)
+        data.update({"last_updated": str(dt.now())})
+        if validate_status is not None:
+            data.update({"validate_status": validate_status})
+        if notebook_status is not None:
+            data.update({"notebook_status": notebook_status})
+        self.config_repo.update(row, data)
+        # print(self.config_repo.find({"config": "hahunavth/vlsp-sv-2023-s2pecnet-train"}))
+
+
+sheet_logger = SheetLogger(log_repo, conf_repo)
+
+if __name__ == "__main__":
+    sheet_logger.update_job_status(5, "abc")
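
For context, a minimal sketch of the new update_job_status call added above: it re-reads the given config-sheet row and writes it back with last_updated plus whichever status values are passed. Row 2 below is a placeholder, and the worksheet is assumed to have last_updated, validate_status, and notebook_status columns.

# Illustrative sketch only, not part of this commit; row 2 is a placeholder.
from logger import sheet_logger

sheet_logger.update_job_status(2, validate_status="success")
sheet_logger.update_job_status(2, notebook_status="running")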
main.py CHANGED
@@ -5,27 +5,35 @@ from kaggle_service import KernelRerunService, NbJob
 from logger import sheet_logger
 
 
-# if __name__ == "__main__":
-configs = []
-try:
-    for i in range(2, 1000):
-        rs = conf_repo.read(i)
-        if not rs:
-            break
-        cfg = json.loads(rs['config'])
-        configs.append(cfg)
-        print(cfg)
-except Exception as e:
-    sheet_logger.log(log="Get config failed!!")
+if __name__ == "__main__":
+    configs = []
+    ids = []
+    try:
+        for i in range(2, 256):
+            rs = conf_repo.read(i)
+            if not rs:
+                break
+            try:
+                cfg = json.loads(rs['config'])
+                configs.append(cfg)
+                ids.append(i)
+                print(cfg)
+            except Exception as e:
+                sheet_logger.update_job_status(i, validate_status=str(e))
+    except Exception as e:
+        sheet_logger.log(log="Get config failed!!" + str(e))
 
-service = KernelRerunService()
-for config in configs:
-    service.add_job(NbJob.from_dict(config))
+    service = KernelRerunService()
+    for config, idx in zip(configs, ids):
+        try:
+            service.add_job(NbJob.from_dict(config, id=idx))
+        except Exception as e:
+            sheet_logger.update_job_status(idx, validate_status=str(e))
 
-try:
-    service.validate_all()
-    service.status_all()
-    service.run_all()
-except Exception as e:
-    sheet_logger.log(log=str(e))
+    try:
+        service.validate_all()
+        # service.status_all()
+        service.run_all()
+    except Exception as e:
+        sheet_logger.log(log=str(e))
 
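For context, a sketch of the JSON expected in a config row's 'config' cell, matching the keys that NbJob.from_dict reads. The account name, API key, and notebook slug below are placeholders.

# Illustrative sketch only, not part of this commit; all values are placeholders.
import json

config_cell = json.dumps({
    "accounts": {"some-user": "some-kaggle-key"},  # kaggle username -> API secret
    "slug": "some-user/some-notebook",             # notebook slug
    "rerun_status": ["complete"],                  # optional, defaults handled in NbJob
    "not_rerun_stt": ["queued", "running", "cancelAcknowledged"]  # optional
})
print(config_cell)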
merged_file.py DELETED
@@ -1,516 +0,0 @@
-import gspread
-from oauth2client.service_account import ServiceAccountCredentials
-from typing import Dict
-
-
-class SheetCRUDRepository:
-    def __init__(self, worksheet):
-        self.worksheet = worksheet
-        self.titles = self.worksheet.row_values(1)  # Assuming titles are in the first row
-        assert len(set(self.titles)) == len(self.titles), f"Failed to init {SheetCRUDRepository.__class__}, titles: {self.titles} contain duplicated values!"
-
-    def create(self, data: Dict):
-        values = [data.get(title, '') for title in self.titles]
-        self.worksheet.append_row(values)
-
-    def read(self, row_index: int) -> Dict:
-        """
-        return {} if empty
-        """
-        values = self.worksheet.row_values(row_index)
-        return {title: value for title, value in zip(self.titles, values)}
-
-    def update(self, row_index: int, data: Dict):
-        values = [data.get(title, '') for title in self.titles]
-        self.worksheet.update(f"A{row_index}:Z{row_index}", [values])
-
-    def delete(self, row_index: int):
-        self.worksheet.delete_row(row_index)
-
-    def find(self, search_dict):
-        for col_title, value in search_dict.items():
-            if col_title in self.titles:
-                col_index = self.titles.index(col_title) + 1  # Adding 1 to match gspread indexing
-                cell = self.worksheet.find(value, in_column=col_index)
-                if cell is None:
-                    break
-                row_number = cell.row
-                return row_number, self.read(row_number)
-        return None
-
-def create_repositories():
-    scope = [
-        'https://www.googleapis.com/auth/spreadsheets',
-        'https://www.googleapis.com/auth/drive'
-    ]
-    creds = ServiceAccountCredentials.from_json_keyfile_name('credentials.json', scope)
-    client = gspread.authorize(creds)
-    # sheet_url = "https://docs.google.com/spreadsheets/d/17OxKF0iP_aJJ0HCgJkwFsH762EUrtcEIYcPmyiiKnaM"
-    sheet_url = "https://docs.google.com/spreadsheets/d/1KzUYgWwbvYXGfyehOTyZCCQf0udZiwVXxaxpmkXEe3E/edit?usp=sharing"
-    sheet = client.open_by_url(sheet_url)
-
-    config_repository = SheetCRUDRepository(sheet.get_worksheet(0))
-    log_repository = SheetCRUDRepository(sheet.get_worksheet(1))
-    return config_repository, log_repository
-
-
-conf_repo, log_repo = create_repositories()
-
-
-import platform,socket,re,uuid,json,psutil,logging
-from datetime import datetime as dt
-from google_sheet import log_repo
-
-
-version="v1.0.0"
-
-
-def get_sys_info():
-    try:
-        info={}
-        info['platform']=platform.system()
-        info['platform-release']=platform.release()
-        info['platform-version']=platform.version()
-        info['architecture']=platform.machine()
-        info['hostname']=socket.gethostname()
-        info['ip-address']=socket.gethostbyname(socket.gethostname())
-        info['mac-address']=':'.join(re.findall('..', '%012x' % uuid.getnode()))
-        info['processor']=platform.processor()
-        info['ram']=str(round(psutil.virtual_memory().total / (1024.0 **3)))+" GB"
-        return json.dumps(info)
-    except Exception as e:
-        logging.exception(e)
-
-
-class SheetLogger:
-    def __init__(self, log_repo):
-        self.log_repo = log_repo
-
-    def log(self, log='', nb='', username=''):
-        self.log_repo.create({
-            "time": str(dt.now()),
-            "notebook_name": nb,
-            "kaggle_username": username,
-            "log": log,
-            "device": str(get_sys_info()),
-            "version": version
-        })
-
-sheet_logger = SheetLogger(log_repo)
-
-import json
-import os
-from typing import Callable, List, Union, Dict
-
-# fake default account to use kaggle.api.kaggle_api_extended
-os.environ['KAGGLE_USERNAME']=''
-os.environ['KAGGLE_KEY']=''
-
-from kaggle.api.kaggle_api_extended import KaggleApi
-from kaggle.rest import ApiException
-import shutil
-import time
-import threading
-import copy
-from logger import sheet_logger
-
-
-def get_api():
-    api = KaggleApi()
-    api.authenticate()
-    return api
-
-
-class KaggleApiWrapper(KaggleApi):
-    """
-    Override KaggleApi.read_config_environment to use username and secret without environment variables
-    """
-
-    def __init__(self, username, secret):
-        super().__init__()
-        self.username = username
-        self.secret = secret
-
-    def read_config_environment(self, config_data=None, quiet=False):
-        config = super().read_config_environment(config_data, quiet)
-        config['username'] = self.username
-        config['key'] = self.secret
-
-        return config_data
-
-    def __del__(self):
-        # todo: fix bug when delete api
-        pass
-
-
-class KaggleNotebook:
-    def __init__(self, api: KaggleApi, kernel_slug: str, container_path: str = "./tmp"):
-        """
-        :param api: KaggleApi
-        :param kernel_slug: Notebook id, you can find it in the url of the notebook.
-            For example, `username/notebook-name-123456`
-        :param container_path: Path to the local folder where the notebook will be downloaded
-        """
-        self.api = api
-        self.kernel_slug = kernel_slug
-        self.container_path = container_path
-
-    def status(self) -> str or None:
-        """
-        :return:
-            "running"
-            "cancelAcknowledged"
-            "queued": waiting for run
-            "error": when raise exception in notebook
-            Throw exception when failed
-        """
-        res = self.api.kernels_status(self.kernel_slug)
-        print(f"Status: {res}")
-        if res is None:
-            return None
-        return res['status']
-
-    def _get_local_nb_path(self) -> str:
-        return os.path.join(self.container_path, self.kernel_slug)
-
-    def pull(self, path=None) -> str or None:
-        """
-
-        :param path:
-        :return:
-        :raises: ApiException if notebook not found or not share to user
-        """
-        self._clean()
-        path = path or self._get_local_nb_path()
-        metadata_path = os.path.join(path, "kernel-metadata.json")
-        res = self.api.kernels_pull(self.kernel_slug, path=path, metadata=True, quiet=False)
-        if not os.path.exists(metadata_path):
-            print(f"Warn: Not found {metadata_path}. Clean {path}")
-            self._clean()
-            return None
-        return res
-
-    def push(self, path=None) -> str or None:
-        status = self.status()
-        if status in ['queued', 'running']:
-            print("Warn: Notebook is " + status + ". Skip push notebook!")
-            return None
-
-        self.api.kernels_push(path or self._get_local_nb_path())
-        time.sleep(1)
-        status = self.status()
-        return status
-
-    def _clean(self) -> None:
-        if os.path.exists(self._get_local_nb_path()):
-            shutil.rmtree(self._get_local_nb_path())
-
-    def get_metadata(self, path=None):
-        path = path or self._get_local_nb_path()
-        metadata_path = os.path.join(path, "kernel-metadata.json")
-        if not os.path.exists(metadata_path):
-            return None
-        return json.loads(open(metadata_path).read())
-
-    def check_nb_permission(self) -> Union[tuple[bool], tuple[bool, None]]:
-        try:
-            status = self.status()
-            if status is None:
-                return False, status
-            return True, status
-        except ApiException as e:
-            print(f"Error: {e.status} {e.reason} with notebook {self.kernel_slug}")
-            return False, None
-
-    def check_datasets_permission(self) -> bool:
-        meta = self.get_metadata()
-        if meta is None:
-            print("Warn: cannot get metadata. Pull and try again?")
-        dataset_sources = meta['dataset_sources']
-        for dataset in dataset_sources:
-            try:
-                self.api.dataset_status(dataset)
-            except ApiException as e:
-                print(f"Error: {e.status} {e.reason} with dataset {dataset} in notebook {self.kernel_slug}")
-                return False
-        return True
-
-
-class AccountTransactionManager:
-    def __init__(self, acc_secret_dict: dict=None):
-        """
-        :param acc_secret_dict: {username: secret}
-        """
-        self.acc_secret_dict = acc_secret_dict
-        if self.acc_secret_dict is None:
-            self.acc_secret_dict = {}
-        # self.api_dict = {username: KaggleApiWrapper(username, secret) for username, secret in acc_secret_dict.items()}
-        # lock for each account to avoid concurrent use api
-        self.lock_dict = {username: False for username in self.acc_secret_dict.keys()}
-        self.state_lock = threading.Lock()
-
-    def _get_api(self, username: str) -> KaggleApiWrapper:
-        # return self.api_dict[username]
-        return KaggleApiWrapper(username, self.acc_secret_dict[username])
-
-    def _get_lock(self, username: str) -> bool:
-        return self.lock_dict[username]
-
-    def _set_lock(self, username: str, lock: bool) -> None:
-        self.lock_dict[username] = lock
-
-    def add_account(self, username, secret):
-        if username not in self.acc_secret_dict.keys():
-            self.state_lock.acquire()
-            self.acc_secret_dict[username] = secret
-            self.lock_dict[username] = False
-            self.state_lock.release()
-
-    def remove_account(self, username):
-        if username in self.acc_secret_dict.keys():
-            self.state_lock.acquire()
-            del self.acc_secret_dict[username]
-            del self.lock_dict[username]
-            self.state_lock.release()
-        else:
-            print(f"Warn: try to remove account not in the list: {username}, list: {self.acc_secret_dict.keys()}")
-
-    def get_unlocked_api_unblocking(self, username_list: List[str]) -> tuple[KaggleApiWrapper, Callable[[], None]]:
-        """
-        :param username_list: list of username
-        :return: (api, release) where release is a function to release api
-        """
-        while True:
-            print("get_unlocked_api_unblocking" + str(username_list))
-            for username in username_list:
-                self.state_lock.acquire()
-                if not self._get_lock(username):
-                    self._set_lock(username, True)
-                    api = self._get_api(username)
-
-                    def release():
-                        self.state_lock.acquire()
-                        self._set_lock(username, False)
-                        api.__del__()
-                        self.state_lock.release()
-
-                    self.state_lock.release()
-                    return api, release
-                self.state_lock.release()
-            time.sleep(1)
-
-
-class NbJob:
-    def __init__(self, acc_dict: dict, nb_slug: str, rerun_stt: List[str] = None, not_rerun_stt: List[str] = None):
-        """
-
-        :param acc_dict:
-        :param nb_slug:
-        :param rerun_stt:
-        :param not_rerun_stt: If notebook status in this list, do not rerun it. (Note: do not add "queued", "running")
-        """
-        self.rerun_stt = rerun_stt
-        if self.rerun_stt is None:
-            self.rerun_stt = ['complete']
-        self.not_rerun_stt = not_rerun_stt
-
-        if self.not_rerun_stt is None:
-            self.not_rerun_stt = ['queued', 'running', 'cancelAcknowledged']
-        assert "queued" in self.not_rerun_stt
-        assert "running" in self.not_rerun_stt
-
-        self.acc_dict = acc_dict
-        self.nb_slug = nb_slug
-
-    def get_acc_dict(self):
-        return self.acc_dict
-
-    def get_username_list(self):
-        return list(self.acc_dict.keys())
-
-    def is_valid_with_acc(self, api):
-        notebook = KaggleNotebook(api, self.nb_slug)
-        try:
-            notebook.pull()
-        except ApiException as e:
-            return False
-        stt, _ = notebook.check_nb_permission()
-        if not stt:
-            return False
-        stt = notebook.check_datasets_permission()
-        if not stt:
-            return False
-        return True
-
-    def is_valid(self):
-        for username in self.acc_dict.keys():
-            secrets = self.acc_dict[username]
-            api = KaggleApiWrapper(username=username, secret=secrets)
-            api.authenticate()
-            if not self.is_valid_with_acc(api):
-                return False
-        return True
-
-    def acc_check_and_rerun_if_need(self, api: KaggleApi) -> bool:
-        """
-        :return:
-            True if rerun success or notebook is running
-            False user does not have enough gpu quotas
-        :raises
-            Exception if setup error
-        """
-        notebook = KaggleNotebook(api, self.nb_slug, "./tmp")  # todo: change hardcode container_path here
-
-        notebook.pull()
-        assert notebook.check_datasets_permission(), f"User {api} does not have permission on datasets of notebook {self.nb_slug}"
-        success, status1 = notebook.check_nb_permission()
-        assert success, f"User {api} does not have permission on notebook {self.nb_slug}"  # todo: using api.username
-
-        if status1 in self.rerun_stt:
-            status2 = notebook.push()
-            time.sleep(10)
-            status3 = notebook.status()
-
-            # if 3 times same stt -> acc out of quota
-            if status1 == status2 == status3:
-                sheet_logger.log(username=api.username, nb=self.nb_slug, log="Try but no effect. Seem account to be out of quota")
-                return False
-            if status3 in self.not_rerun_stt:
-                sheet_logger.log(username=api.username, nb=self.nb_slug, log=f"Notebook is in ignore status list {self.not_rerun_stt}, do nothing!")
-                return True
-            if status3 not in ["queued", "running"]:
-                # return False # todo: check when user is out of quota
-                print(f"Error: status is {status3}")
-                raise Exception("Setup exception")
-            sheet_logger.log(username=api.username, nb=self.nb_slug,
-                             log=f"Schedule notebook successfully. Current status is '{status3}'")
-            return True
-
-        sheet_logger.log(username=api.username, nb=self.nb_slug, log=f"Notebook status is '{status1}' is not in {self.rerun_stt}, do nothing!")
-        return True
-
-    @staticmethod
-    def from_dict(obj: dict):
-        return NbJob(acc_dict=obj['accounts'], nb_slug=obj['slug'], rerun_stt=obj.get('rerun_status'), not_rerun_stt=obj.get('not_rerun_stt'))
-
-
-class KernelRerunService:
-    def __init__(self):
-        self.jobs: Dict[str, NbJob] = {}
-        self.acc_manager = AccountTransactionManager()
-        self.username2jobid = {}
-        self.jobid2username = {}
-
-    def add_job(self, nb_job: NbJob):
-        if nb_job.nb_slug in self.jobs.keys():
-            print("Warn: nb_job already in job list")
-            return
-        self.jobs[nb_job.nb_slug] = nb_job
-        self.jobid2username[nb_job.nb_slug] = nb_job.get_username_list()
-        for username in nb_job.get_username_list():
-            if username not in self.username2jobid.keys():
-                self.username2jobid[username] = []
-                self.acc_manager.add_account(username, nb_job.acc_dict[username])
-            self.username2jobid[username].append(nb_job.nb_slug)
-
-    def remove_job(self, nb_job):
-        if nb_job.nb_slug not in self.jobs.keys():
-            print("Warn: try to remove nb_job not in list")
-            return
-        username_list = self.jobid2username[nb_job.nb_slug]
-        username_list = [username for username in username_list if len(self.username2jobid[username]) == 1]
-
-        for username in username_list:
-            del self.username2jobid[username]
-            self.acc_manager.remove_account(username)
-        del self.jobs[nb_job.nb_slug]
-        del self.jobid2username[nb_job.nb_slug]
-
-    def validate_all(self):
-        for username in self.acc_manager.acc_secret_dict.keys():
-            api, release = self.acc_manager.get_unlocked_api_unblocking([username])
-            api.authenticate()
-            print(f"Using username: {api.username}")
-
-            for job in self.jobs.values():
-                if username in job.get_username_list():
-                    print(f"Validate user: {username}, job: {job.nb_slug}")
-                    if not job.is_valid_with_acc(api):
-                        print(f"Error: not valid")
-                        a = f"Setup error: {username} does not have permission on notebook {job.nb_slug} or related datasets"
-                        raise Exception(a)
-            release()
-        return True
-
-    def status_all(self):
-        for job in self.jobs.values():
-            print(f"Job: {job.nb_slug}")
-            api, release = self.acc_manager.get_unlocked_api_unblocking(job.get_username_list())
-            api.authenticate()
-            print(f"Using username: {api.username}")
-            notebook = KaggleNotebook(api, job.nb_slug)
-            print(f"Notebook: {notebook.kernel_slug}")
-            print(notebook.status())
-            release()
-
-    def run(self, nb_job: NbJob):
-        username_list = copy.copy(nb_job.get_username_list())
-        while len(username_list) > 0:
-            api, release = self.acc_manager.get_unlocked_api_unblocking(username_list)
-            api.authenticate()
-            print(f"Using username: {api.username}")
-
-            try:
-                result = nb_job.acc_check_and_rerun_if_need(api)
-                if result:
-                    return True
-            except Exception as e:
-                print(e)
-                release()
-                break
-
-            if api.username in username_list:
-                username_list.remove(api.username)
-                release()
-            else:
-                release()
-                raise Exception("")
-        return False
-
-    def run_all(self):
-        for job in self.jobs.values():
-            success = self.run(job)
-            print(f"Job: {job.nb_slug} {success}")
-
-
-
-import json
-
-from kaggle_service import KernelRerunService, NbJob
-from logger import sheet_logger
-
-
-if __name__ == "__main__":
-    configs = []
-    try:
-        for i in range(2, 1000):
-            rs = conf_repo.read(i)
-            if not rs:
-                break
-            cfg = json.loads(rs['config'])
-            configs.append(cfg)
-            print(cfg)
-    except Exception as e:
-        sheet_logger.log(log="Get config failed!!")
-
-    service = KernelRerunService()
-    for config in configs:
-        service.add_job(NbJob.from_dict(config))
-
-    try:
-        service.validate_all()
-        service.status_all()
-        service.run_all()
-    except Exception as e:
-        sheet_logger.log(log=str(e))
-