File size: 27,797 Bytes
ffb3860
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
import contextlib
import functools
import hashlib
import inspect
import os
import gc
import pathlib
import random
import shutil
import subprocess
import sys
import threading
import time
import traceback
import zipfile
from datetime import datetime

import filelock
import requests, uuid
from typing import Tuple, Callable, Dict
from tqdm.auto import tqdm
from joblib import Parallel
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import pandas as pd


def set_seed(seed: int):
    """
    Sets the seed of the entire notebook so results are the same every time we run.
    This is for REPRODUCIBILITY.
    """
    import torch
    np.random.seed(seed)
    random_state = np.random.RandomState(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    os.environ['PYTHONHASHSEED'] = str(seed)
    return random_state


def flatten_list(lis):
    """Given a list, possibly nested to any level, return it flattened."""
    new_lis = []
    for item in lis:
        if type(item) == type([]):
            new_lis.extend(flatten_list(item))
        else:
            new_lis.append(item)
    return new_lis


def clear_torch_cache():
    import torch
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
        gc.collect()


def ping():
    try:
        print('Ping: %s' % str(datetime.now()), flush=True)
    except AttributeError:
        # some programs wrap print and will fail with flush passed
        pass


def get_torch_allocated():
    import torch
    return torch.cuda.memory_allocated()


def get_device():
    import torch
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"

    return device


def system_info():
    import psutil

    system = {}
    # https://stackoverflow.com/questions/48951136/plot-multiple-graphs-in-one-plot-using-tensorboard
    # https://arshren.medium.com/monitoring-your-devices-in-python-5191d672f749
    temps = psutil.sensors_temperatures(fahrenheit=False)
    if 'coretemp' in temps:
        coretemp = temps['coretemp']
        temp_dict = {k.label: k.current for k in coretemp}
        for k, v in temp_dict.items():
            system['CPU_C/%s' % k] = v

    # https://github.com/gpuopenanalytics/pynvml/blob/master/help_query_gpu.txt
    from pynvml.smi import nvidia_smi
    nvsmi = nvidia_smi.getInstance()

    gpu_power_dict = {'W_gpu%d' % i: x['power_readings']['power_draw'] for i, x in
                      enumerate(nvsmi.DeviceQuery('power.draw')['gpu'])}
    for k, v in gpu_power_dict.items():
        system['GPU_W/%s' % k] = v

    gpu_temp_dict = {'C_gpu%d' % i: x['temperature']['gpu_temp'] for i, x in
                     enumerate(nvsmi.DeviceQuery('temperature.gpu')['gpu'])}
    for k, v in gpu_temp_dict.items():
        system['GPU_C/%s' % k] = v

    gpu_memory_free_dict = {'MiB_gpu%d' % i: x['fb_memory_usage']['free'] for i, x in
                            enumerate(nvsmi.DeviceQuery('memory.free')['gpu'])}
    gpu_memory_total_dict = {'MiB_gpu%d' % i: x['fb_memory_usage']['total'] for i, x in
                             enumerate(nvsmi.DeviceQuery('memory.total')['gpu'])}
    gpu_memory_frac_dict = {k: gpu_memory_free_dict[k] / gpu_memory_total_dict[k] for k in gpu_memory_total_dict}
    for k, v in gpu_memory_frac_dict.items():
        system[f'GPU_M/%s' % k] = v

    system['hash'] = get_githash()

    return system


def system_info_print():
    try:
        df = pd.DataFrame.from_dict(system_info(), orient='index')
        # avoid slamming GPUs
        time.sleep(1)
        return df.to_markdown()
    except Exception as e:
        return "Error: %s" % str(e)


def zip_data(root_dirs=None, zip_file=None, base_dir='./', fail_any_exception=False):
    try:
        return _zip_data(zip_file=zip_file, base_dir=base_dir, root_dirs=root_dirs)
    except Exception as e:
        traceback.print_exc()
        print('Exception in zipping: %s' % str(e))
        if not fail_any_exception:
            raise


def _zip_data(root_dirs=None, zip_file=None, base_dir='./'):
    if isinstance(root_dirs, str):
        root_dirs = [root_dirs]
    if zip_file is None:
        datetime_str = str(datetime.now()).replace(" ", "_").replace(":", "_")
        host_name = os.getenv('HF_HOSTNAME', 'emptyhost')
        zip_file = "data_%s_%s.zip" % (datetime_str, host_name)
    assert root_dirs is not None
    if not os.path.isdir(os.path.dirname(zip_file)) and os.path.dirname(zip_file):
        os.makedirs(os.path.dirname(zip_file), exist_ok=True)
    with zipfile.ZipFile(zip_file, "w") as expt_zip:
        for root_dir in root_dirs:
            if root_dir is None:
                continue
            for root, d, files in os.walk(root_dir):
                for file in files:
                    file_to_archive = os.path.join(root, file)
                    assert os.path.exists(file_to_archive)
                    path_to_archive = os.path.relpath(file_to_archive, base_dir)
                    expt_zip.write(filename=file_to_archive, arcname=path_to_archive)
    return zip_file, zip_file


def save_generate_output(output=None, base_model=None, save_dir=None):
    try:
        return _save_generate_output(output=output, base_model=base_model, save_dir=save_dir)
    except Exception as e:
        traceback.print_exc()
        print('Exception in saving: %s' % str(e))


def _save_generate_output(output=None, base_model=None, save_dir=None):
    """
    Save conversation to .json, row by row.
    json_file_path is path to final JSON file. If not in ., then will attempt to make directories.
    Appends if file exists
    """
    assert save_dir, "save_dir must be provided"
    if os.path.exists(save_dir) and not os.path.isdir(save_dir):
        raise RuntimeError("save_dir already exists and is not a directory!")
    os.makedirs(save_dir, exist_ok=True)
    import json
    if output[-10:] == '\n\n<human>:':
        # remove trailing <human>:
        output = output[:-10]
    with filelock.FileLock("save_dir.lock"):
        # lock logging in case have concurrency
        with open(os.path.join(save_dir, "history.json"), "a") as f:
            # just add [ at start, and ] at end, and have proper JSON dataset
            f.write(
                "  " + json.dumps(
                    dict(text=output, time=time.ctime(), base_model=base_model)
                ) + ",\n"
            )


def s3up(filename):
    try:
        return _s3up(filename)
    except Exception as e:
        traceback.print_exc()
        print('Exception for file %s in s3up: %s' % (filename, str(e)))
        return "Failed to upload %s: Error: %s" % (filename, str(e))


def _s3up(filename):
    import boto3

    aws_access_key_id = os.getenv('AWS_SERVER_PUBLIC_KEY')
    aws_secret_access_key = os.getenv('AWS_SERVER_SECRET_KEY')
    bucket = os.getenv('AWS_BUCKET')
    assert aws_access_key_id, "Set AWS key"
    assert aws_secret_access_key, "Set AWS secret"
    assert bucket, "Set AWS Bucket"

    s3 = boto3.client('s3',
                      aws_access_key_id=os.getenv('AWS_SERVER_PUBLIC_KEY'),
                      aws_secret_access_key=os.getenv('AWS_SERVER_SECRET_KEY'),
                      )
    ret = s3.upload_file(
        Filename=filename,
        Bucket=os.getenv('AWS_BUCKET'),
        Key=filename,
    )
    if ret in [None, '']:
        return "Successfully uploaded %s" % filename


def get_githash():
    try:
        githash = subprocess.run(['git', 'rev-parse', 'HEAD'], stdout=subprocess.PIPE).stdout.decode('utf-8')[0:-1]
    except:
        githash = ''
    return githash


def copy_code(run_id):
    """
    copy code to track changes
    :param run_id:
    :return:
    """
    rnd_num = str(random.randint(0, 2 ** 31))
    run_id = 'run_' + str(run_id)
    os.makedirs(run_id, exist_ok=True)
    me_full = os.path.join(pathlib.Path(__file__).parent.resolve(), __file__)
    me_file = os.path.basename(__file__)
    new_me = os.path.join(run_id, me_file + '_' + get_githash())
    if os.path.isfile(new_me):
        new_me = os.path.join(run_id, me_file + '_' + get_githash() + '_' + rnd_num)
        shutil.copy(me_full, new_me)
    else:
        shutil.copy(me_full, new_me)


class NullContext(threading.local):
    """No-op context manager, executes block without doing any additional processing.

    Used as a stand-in if a particular block of code is only sometimes
    used with a normal context manager:
    """

    def __init__(self, *args, **kwargs):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.finally_act()

    def finally_act(self):
        pass


def wrapped_partial(func, *args, **kwargs):
    """
    Give partial properties of normal function, like __name__ attribute etc.
    :param func:
    :param args:
    :param kwargs:
    :return:
    """
    partial_func = functools.partial(func, *args, **kwargs)
    functools.update_wrapper(partial_func, func)
    return partial_func


class ThreadException(Exception):
    pass


class EThread(threading.Thread):
    # Function that raises the custom exception
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None, streamer=None, bucket=None):
        self.bucket = bucket
        self.streamer = streamer
        self.exc = None
        self._return = None
        super().__init__(group=group, target=target, name=name, args=args, kwargs=kwargs, daemon=daemon)

    def run(self):
        # Variable that stores the exception, if raised by someFunction
        try:
            if self._target is not None:
                self._return = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            print("thread exception: %s" % str(sys.exc_info()))
            self.bucket.put(sys.exc_info())
            self.exc = e
            if self.streamer:
                print("make stop: %s" % str(sys.exc_info()), flush=True)
                self.streamer.do_stop = True
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs

    def join(self, timeout=None):
        threading.Thread.join(self)
        # Since join() returns in caller thread
        # we re-raise the caught exception
        # if any was caught
        if self.exc:
            raise self.exc
        return self._return


def import_matplotlib():
    import matplotlib
    matplotlib.use('agg')
    # KEEP THESE HERE! START
    import matplotlib.pyplot as plt
    import pandas as pd
    # to avoid dlopen deadlock in fork
    import pandas.core.computation.expressions as pd_expressions
    import pandas._libs.groupby as pd_libgroupby
    import pandas._libs.reduction as pd_libreduction
    import pandas.core.algorithms as pd_algorithms
    import pandas.core.common as pd_com
    import numpy as np
    # KEEP THESE HERE! END


def get_sha(value):
    return hashlib.md5(str(value).encode('utf-8')).hexdigest()


def sanitize_filename(name):
    """
    Sanitize file *base* names.
    :param name: name to sanitize
    :return:
    """
    bad_chars = ['[', ']', ',', '/', '\\', '\\w', '\\s', '-', '+', '\"', '\'', '>', '<', ' ', '=', ')', '(', ':', '^']
    for char in bad_chars:
        name = name.replace(char, "_")

    length = len(name)
    file_length_limit = 250  # bit smaller than 256 for safety
    sha_length = 32
    real_length_limit = file_length_limit - (sha_length + 2)
    if length > file_length_limit:
        sha = get_sha(name)
        half_real_length_limit = max(1, int(real_length_limit / 2))
        name = name[0:half_real_length_limit] + "_" + sha + "_" + name[length - half_real_length_limit:length]

    return name


def shutil_rmtree(*args, **kwargs):
    return shutil.rmtree(*args, **kwargs)


def remove(path: str):
    try:
        if path is not None and os.path.exists(path):
            if os.path.isdir(path):
                shutil_rmtree(path, ignore_errors=True)
            else:
                with contextlib.suppress(FileNotFoundError):
                    os.remove(path)
    except:
        pass


def makedirs(path, exist_ok=True):
    """
    Avoid some inefficiency in os.makedirs()
    :param path:
    :param exist_ok:
    :return:
    """
    if os.path.isdir(path) and os.path.exists(path):
        assert exist_ok, "Path already exists"
        return path
    os.makedirs(path, exist_ok=exist_ok)


def atomic_move_simple(src, dst):
    try:
        shutil.move(src, dst)
    except (shutil.Error, FileExistsError):
        pass
    remove(src)


def download_simple(url, dest=None, print_func=None):
    if print_func is not None:
        print_func("BEGIN get url %s" % str(url))
    if url.startswith("file://"):
        from requests_file import FileAdapter
        s = requests.Session()
        s.mount('file://', FileAdapter())
        url_data = s.get(url, stream=True)
    else:
        url_data = requests.get(url, stream=True)
    if dest is None:
        dest = os.path.basename(url)
    if url_data.status_code != requests.codes.ok:
        msg = "Cannot get url %s, code: %s, reason: %s" % (
            str(url),
            str(url_data.status_code),
            str(url_data.reason),
        )
        raise requests.exceptions.RequestException(msg)
    url_data.raw.decode_content = True
    makedirs(os.path.dirname(dest), exist_ok=True)
    uuid_tmp = str(uuid.uuid4())[:6]
    dest_tmp = dest + "_dl_" + uuid_tmp + ".tmp"
    with open(dest_tmp, "wb") as f:
        shutil.copyfileobj(url_data.raw, f)
    atomic_move_simple(dest_tmp, dest)
    if print_func is not None:
        print_func("END get url %s" % str(url))


def download(url, dest=None, dest_path=None):
    if dest_path is not None:
        dest = os.path.join(dest_path, os.path.basename(url))
        if os.path.isfile(dest):
            print("already downloaded %s -> %s" % (url, dest))
            return dest
    elif dest is not None:
        if os.path.exists(dest):
            print("already downloaded %s -> %s" % (url, dest))
            return dest
    else:
        uuid_tmp = "dl2_" + str(uuid.uuid4())[:6]
        dest = uuid_tmp + os.path.basename(url)

    print("downloading %s to %s" % (url, dest))

    if url.startswith("file://"):
        from requests_file import FileAdapter
        s = requests.Session()
        s.mount('file://', FileAdapter())
        url_data = s.get(url, stream=True)
    else:
        url_data = requests.get(url, stream=True)

    if url_data.status_code != requests.codes.ok:
        msg = "Cannot get url %s, code: %s, reason: %s" % (
            str(url), str(url_data.status_code), str(url_data.reason))
        raise requests.exceptions.RequestException(msg)
    url_data.raw.decode_content = True
    dirname = os.path.dirname(dest)
    if dirname != "" and not os.path.isdir(dirname):
        makedirs(os.path.dirname(dest), exist_ok=True)
    uuid_tmp = "dl3_" + str(uuid.uuid4())[:6]
    dest_tmp = dest + "_" + uuid_tmp + ".tmp"
    with open(dest_tmp, 'wb') as f:
        shutil.copyfileobj(url_data.raw, f)
    try:
        shutil.move(dest_tmp, dest)
    except FileExistsError:
        pass
    remove(dest_tmp)
    return dest


def get_url(x, from_str=False, short_name=False):
    if not from_str:
        source = x.metadata['source']
    else:
        source = x
    if short_name:
        source_name = get_short_name(source)
    else:
        source_name = source
    if source.startswith('http://') or source.startswith('https://'):
        return """<a href="%s" target="_blank"  rel="noopener noreferrer">%s</a>""" % (
            source, source_name)
    else:
        return """<a href="file/%s" target="_blank"  rel="noopener noreferrer">%s</a>""" % (
            source, source_name)


def get_short_name(name, maxl=50):
    if name is None:
        return ''
    length = len(name)
    if length > maxl:
        allow_length = maxl - 3
        half_allowed = max(1, int(allow_length / 2))
        name = name[0:half_allowed] + "..." + name[length - half_allowed:length]
    return name


def cuda_vis_check(total_gpus):
    """Helper function to count GPUs by environment variable
    Stolen from Jon's h2o4gpu utils
    """
    cudavis = os.getenv("CUDA_VISIBLE_DEVICES")
    which_gpus = []
    if cudavis is not None:
        # prune away white-space, non-numerics,
        # except commas for simple checking
        cudavis = "".join(cudavis.split())
        import re
        cudavis = re.sub("[^0-9,]", "", cudavis)

        lencudavis = len(cudavis)
        if lencudavis == 0:
            total_gpus = 0
        else:
            total_gpus = min(
                total_gpus,
                os.getenv("CUDA_VISIBLE_DEVICES").count(",") + 1)
            which_gpus = os.getenv("CUDA_VISIBLE_DEVICES").split(",")
            which_gpus = [int(x) for x in which_gpus]
    else:
        which_gpus = list(range(0, total_gpus))

    return total_gpus, which_gpus


def get_ngpus_vis(raise_if_exception=True):
    ngpus_vis1 = 0

    shell = False
    if shell:
        cmd = "nvidia-smi -L 2> /dev/null"
    else:
        cmd = ["nvidia-smi", "-L"]

    try:
        timeout = 5 * 3
        o = subprocess.check_output(cmd, shell=shell, timeout=timeout)
        lines = o.decode("utf-8").splitlines()
        ngpus_vis1 = 0
        for line in lines:
            if 'Failed to initialize NVML' not in line:
                ngpus_vis1 += 1
    except (FileNotFoundError, subprocess.CalledProcessError, OSError):
        # GPU systems might not have nvidia-smi, so can't fail
        pass
    except subprocess.TimeoutExpired as e:
        print('Failed get_ngpus_vis: %s' % str(e))
        if raise_if_exception:
            raise

    ngpus_vis1, which_gpus = cuda_vis_check(ngpus_vis1)
    return ngpus_vis1


def get_mem_gpus(raise_if_exception=True, ngpus=None):
    totalmem_gpus1 = 0
    usedmem_gpus1 = 0
    freemem_gpus1 = 0

    if ngpus == 0:
        return totalmem_gpus1, usedmem_gpus1, freemem_gpus1

    try:
        cmd = "nvidia-smi -q 2> /dev/null | grep -A 3 'FB Memory Usage'"
        o = subprocess.check_output(cmd, shell=True, timeout=15)
        lines = o.decode("utf-8").splitlines()
        for line in lines:
            if 'Total' in line:
                totalmem_gpus1 += int(line.split()[2]) * 1024 ** 2
            if 'Used' in line:
                usedmem_gpus1 += int(line.split()[2]) * 1024 ** 2
            if 'Free' in line:
                freemem_gpus1 += int(line.split()[2]) * 1024 ** 2
    except (FileNotFoundError, subprocess.CalledProcessError, OSError):
        # GPU systems might not have nvidia-smi, so can't fail
        pass
    except subprocess.TimeoutExpired as e:
        print('Failed get_mem_gpus: %s' % str(e))
        if raise_if_exception:
            raise

    return totalmem_gpus1, usedmem_gpus1, freemem_gpus1


class ForkContext(threading.local):
    """
        Set context for forking
        Ensures state is returned once done
    """

    def __init__(self, args=None, kwargs=None, forkdata_capable=True):
        """
        :param args:
        :param kwargs:
        :param forkdata_capable: whether fork is forkdata capable and will use copy-on-write forking of args/kwargs
        """
        self.forkdata_capable = forkdata_capable
        if self.forkdata_capable:
            self.has_args = args is not None
            self.has_kwargs = kwargs is not None
            forkdatacontext.args = args
            forkdatacontext.kwargs = kwargs
        else:
            self.has_args = False
            self.has_kwargs = False

    def __enter__(self):
        try:
            # flush all outputs so doesn't happen during fork -- don't print/log inside ForkContext contexts!
            sys.stdout.flush()
            sys.stderr.flush()
        except BaseException as e:
            # exit not called if exception, and don't want to leave forkdatacontext filled in that case
            print("ForkContext failure on enter: %s" % str(e))
            self.finally_act()
            raise
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.finally_act()

    def finally_act(self):
        """
            Done when exception hit or exit is reached in context
            first reset forkdatacontext as crucial to have reset even if later 2 calls fail
        :return: None
        """
        if self.forkdata_capable and (self.has_args or self.has_kwargs):
            forkdatacontext._reset()


class _ForkDataContext(threading.local):
    def __init__(
            self,
            args=None,
            kwargs=None,
    ):
        """
        Global context for fork to carry data to subprocess instead of relying upon copy/pickle/serialization

        :param args: args
        :param kwargs: kwargs
        """
        assert isinstance(args, (tuple, type(None)))
        assert isinstance(kwargs, (dict, type(None)))
        self.__args = args
        self.__kwargs = kwargs

    @property
    def args(self) -> Tuple:
        """returns args"""
        return self.__args

    @args.setter
    def args(self, args):
        if self.__args is not None:
            raise AttributeError(
                "args cannot be overwritten: %s %s" % (str(self.__args), str(self.__kwargs))
            )

        self.__args = args

    @property
    def kwargs(self) -> Dict:
        """returns kwargs"""
        return self.__kwargs

    @kwargs.setter
    def kwargs(self, kwargs):
        if self.__kwargs is not None:
            raise AttributeError(
                "kwargs cannot be overwritten: %s %s" % (str(self.__args), str(self.__kwargs))
            )

        self.__kwargs = kwargs

    def _reset(self):
        """Reset fork arg-kwarg context to default values"""
        self.__args = None
        self.__kwargs = None

    def get_args_kwargs(self, func, args, kwargs) -> Tuple[Callable, Tuple, Dict]:
        if self.__args:
            args = self.__args[1:]
            if not func:
                assert len(self.__args) > 0, "if have no func, must have in args"
                func = self.__args[0]  # should always be there
        if self.__kwargs:
            kwargs = self.__kwargs
        try:
            return func, args, kwargs
        finally:
            forkdatacontext._reset()

    @staticmethod
    def get_args_kwargs_for_traced_func(func, args, kwargs):
        """
        Return args/kwargs out of forkdatacontext when using copy-on-write way of passing args/kwargs
        :param func: actual function ran by _traced_func, which itself is directly what mppool treats as function
        :param args:
        :param kwargs:
        :return: func, args, kwargs from forkdatacontext if used, else originals
        """
        # first 3 lines are debug
        func_was_None = func is None
        args_was_None_or_empty = args is None or len(args) == 0
        kwargs_was_None_or_empty = kwargs is None or len(kwargs) == 0

        forkdatacontext_args_was_None = forkdatacontext.args is None
        forkdatacontext_kwargs_was_None = forkdatacontext.kwargs is None
        func, args, kwargs = forkdatacontext.get_args_kwargs(func, args, kwargs)
        using_forkdatacontext = func_was_None and func is not None  # pulled func out of forkdatacontext.__args[0]
        assert forkdatacontext.args is None, "forkdatacontext.args should be None after get_args_kwargs"
        assert forkdatacontext.kwargs is None, "forkdatacontext.kwargs should be None after get_args_kwargs"

        proc_type = kwargs.get('proc_type', 'SUBPROCESS')
        if using_forkdatacontext:
            assert proc_type == "SUBPROCESS" or proc_type == "SUBPROCESS"
        if proc_type == "NORMAL":
            assert forkdatacontext_args_was_None, "if no fork, expect forkdatacontext.args None entering _traced_func"
            assert forkdatacontext_kwargs_was_None, "if no fork, expect forkdatacontext.kwargs None entering _traced_func"
        assert func is not None, "function should not be None, indicates original args[0] was None or args was None"

        return func, args, kwargs


forkdatacontext = _ForkDataContext()


def _traced_func(func, *args, **kwargs):
    func, args, kwargs = forkdatacontext.get_args_kwargs_for_traced_func(func, args, kwargs)
    return func(*args, **kwargs)


def call_subprocess_onetask(func, args=None, kwargs=None):
    if isinstance(args, list):
        args = tuple(args)
    if args is None:
        args = ()
    if kwargs is None:
        kwargs = {}
    args = list(args)
    args = [func] + args
    args = tuple(args)
    with ForkContext(args=args, kwargs=kwargs):
        args = (None,)
        kwargs = {}
        with ProcessPoolExecutor(max_workers=1) as executor:
            future = executor.submit(_traced_func, *args, **kwargs)
            return future.result()


class ProgressParallel(Parallel):
    def __init__(self, use_tqdm=True, total=None, *args, **kwargs):
        self._use_tqdm = use_tqdm
        self._total = total
        super().__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        with tqdm(disable=not self._use_tqdm, total=self._total) as self._pbar:
            return Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        if self._total is None:
            self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()


def get_kwargs(func, exclude_names=None, **kwargs):
    func_names = list(inspect.signature(func).parameters)
    missing_kwargs = [x for x in func_names if x not in kwargs]
    if exclude_names:
        for k in exclude_names:
            if k in missing_kwargs:
                missing_kwargs.remove(k)
            if k in func_names:
                func_names.remove(k)
    assert not missing_kwargs, "Missing %s" % missing_kwargs
    kwargs = {k: v for k, v in kwargs.items() if k in func_names}
    return kwargs


import pkg_resources
have_faiss = False

try:
    assert pkg_resources.get_distribution('faiss') is not None
    have_faiss = True
except (pkg_resources.DistributionNotFound, AssertionError):
    pass
try:
    assert pkg_resources.get_distribution('faiss_gpu') is not None
    have_faiss = True
except (pkg_resources.DistributionNotFound, AssertionError):
    pass
try:
    assert pkg_resources.get_distribution('faiss_cpu') is not None
    have_faiss = True
except (pkg_resources.DistributionNotFound, AssertionError):
    pass


def hash_file(file):
    try:
        import hashlib

        # BUF_SIZE is totally arbitrary, change for your app!
        BUF_SIZE = 65536  # lets read stuff in 64kb chunks!

        md5 = hashlib.md5()
        #sha1 = hashlib.sha1()

        with open(file, 'rb') as f:
            while True:
                data = f.read(BUF_SIZE)
                if not data:
                    break
                md5.update(data)
                #sha1.update(data)
    except BaseException as e:
        print("Cannot hash %s due to %s" % (file, str(e)))
        traceback.print_exc()
        md5 = None
    return md5.hexdigest()


def start_faulthandler():
    # If hit server or any subprocess with signal SIGUSR1, it'll print out all threads stack trace, but wont't quit or coredump
    # If more than one fork tries to write at same time, then looks corrupted.
    import faulthandler
    import signal

    # SIGUSR1 in h2oai/__init__.py as well
    faulthandler.enable()
    faulthandler.register(signal.SIGUSR1)