File size: 4,117 Bytes
7feac49
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# Copyright (c) Alibaba, Inc. and its affiliates.
import os
from queue import Queue
from threading import Thread
from typing import Any, Dict, List, Literal, Union

import json
import requests
import torch.distributed as dist
from accelerate.utils import gather_object
from modelscope.hub.api import ModelScopeConfig
from tqdm import tqdm

from .env import is_master
from .logger import get_logger
from .utils import check_json_format

logger = get_logger()


def download_ms_file(url: str, local_path: str, cookies=None) -> None:
    if cookies is None:
        cookies = ModelScopeConfig.get_cookies()
    resp = requests.get(url, cookies=cookies, stream=True)
    with open(local_path, 'wb') as f:
        for data in tqdm(resp.iter_lines()):
            f.write(data)


def read_from_jsonl(fpath: str, encoding: str = 'utf-8') -> List[Any]:
    res: List[Any] = []
    with open(fpath, 'r', encoding=encoding) as f:
        for line in f:
            res.append(json.loads(line))
    return res


def write_to_jsonl(fpath: str, obj_list: List[Any], encoding: str = 'utf-8') -> None:
    res: List[str] = []
    for obj in obj_list:
        res.append(json.dumps(obj, ensure_ascii=False))
    with open(fpath, 'w', encoding=encoding) as f:
        text = '\n'.join(res)
        f.write(f'{text}\n')


class JsonlWriter:

    def __init__(self, fpath: str, *, encoding: str = 'utf-8', strict: bool = True, enable_async: bool = False):
        self.fpath = os.path.abspath(os.path.expanduser(fpath)) if is_master() else None
        self.encoding = encoding
        self.strict = strict
        self.enable_async = enable_async
        self._queue = Queue()
        self._thread = None

    def _append_worker(self):
        while True:
            item = self._queue.get()
            self._append(**item)

    def _append(self, obj: Union[Dict, List[Dict]], gather_obj: bool = False):
        if isinstance(obj, (list, tuple)) and all(isinstance(item, dict) for item in obj):
            obj_list = obj
        else:
            obj_list = [obj]
        if gather_obj and dist.is_initialized():
            obj_list = gather_object(obj_list)
        if not is_master():
            return
        obj_list = check_json_format(obj_list)
        for i, _obj in enumerate(obj_list):
            obj_list[i] = json.dumps(_obj, ensure_ascii=False) + '\n'
        self._write_buffer(''.join(obj_list))

    def append(self, obj: Union[Dict, List[Dict]], gather_obj: bool = False):
        if self.enable_async:
            if self._thread is None:
                self._thread = Thread(target=self._append_worker, daemon=True)
                self._thread.start()
            self._queue.put({'obj': obj, 'gather_obj': gather_obj})
        else:
            self._append(obj, gather_obj=gather_obj)

    def _write_buffer(self, text: str):
        if not text:
            return
        assert is_master(), f'is_master(): {is_master()}'
        try:
            os.makedirs(os.path.dirname(self.fpath), exist_ok=True)
            with open(self.fpath, 'a', encoding=self.encoding) as f:
                f.write(text)
        except Exception:
            if self.strict:
                raise
            logger.error(f'Cannot write content to jsonl file. text: {text}')


def append_to_jsonl(fpath: str, obj: Union[Dict, List[Dict]], *, encoding: str = 'utf-8', strict: bool = True) -> None:
    jsonl_writer = JsonlWriter(fpath, encoding=encoding, strict=strict)
    jsonl_writer.append(obj)


def get_file_mm_type(file_name: str) -> Literal['image', 'video', 'audio']:
    video_extensions = {'.mp4', '.mkv', '.mov', '.avi', '.wmv', '.flv', '.webm'}
    audio_extensions = {'.mp3', '.wav', '.aac', '.flac', '.ogg', '.m4a'}
    image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}

    _, ext = os.path.splitext(file_name)

    if ext.lower() in video_extensions:
        return 'video'
    elif ext.lower() in audio_extensions:
        return 'audio'
    elif ext.lower() in image_extensions:
        return 'image'
    else:
        raise ValueError(f'file_name: {file_name}, ext: {ext}')