File size: 12,582 Bytes
b585c7f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
"""Load Data from a MediaWiki dump xml."""
import ast
import glob
import pickle
import uuid
from typing import List, Optional
import os
import bz2
import csv
import numpy as np
import pandas as pd
import pytest
from matplotlib import pyplot as plt

from langchain.docstore.document import Document
from langchain.document_loaders import MWDumpLoader

# Base directory under which the downloaded wiki dump, its multistream index,
# and the raw page-view files are looked up by the functions in this module.
# This is a machine-specific location; change it to match the local setup.
root_path = "/data/jon/h2o-llm"


def unescape(x):
    """Best-effort decoding of a quoted or backslash-escaped value.

    The value is first interpreted as a Python literal (so a quoted string
    yields its contents, and a numeric string yields a number).  If that is
    not possible, non-ASCII characters are dropped and backslash escape
    sequences are decoded.  If both attempts fail, the input is returned
    unchanged.

    :param x: value to decode, normally a str
    :return: the decoded value, or ``x`` itself when nothing could be decoded
    """
    try:
        return ast.literal_eval(x)
    except Exception:
        # Not a valid literal; fall through to escape decoding.  Catching
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
        # propagate instead of being silently swallowed.
        pass
    try:
        return x.encode('ascii', 'ignore').decode('unicode_escape')
    except Exception:
        # Deliberate best effort: leave undecodable values as they are.
        return x


def get_views():
    """Build a title -> views lookup from the reduced page-view CSV.

    Reads 'wiki_page_views_more_5000month.csv' from the current working
    directory and uses its 'title' and 'views' columns.  Each title is run
    through ``unescape`` and registered twice: once as stored and once with
    underscores replaced by spaces.

    :return: dict mapping page title to its view count
    """
    # alternative, larger table: 'wiki_page_views_more_1000month.csv'
    table = pd.read_csv('wiki_page_views_more_5000month.csv')
    table.index = table['title']
    raw_counts = table['views'].to_dict()

    by_title = {}
    for raw_title, count in raw_counts.items():
        by_title[str(unescape(str(raw_title)))] = count

    # the view table spells titles with "_" while dump pages use " ",
    # so make both spellings resolvable
    spaced = {title.replace('_', ' '): count for title, count in by_title.items()}
    by_title.update(spaced)
    return by_title


class MWDumpDirectLoader(MWDumpLoader):
    """MediaWiki loader that works on page XML already held in memory.

    Instead of a file path, the XML text itself is handed in and passed to
    ``mwxml.Dump.from_page_xml``.  Pages can optionally be filtered by the
    shared page-view table and by the number of words in the title.
    """

    def __init__(self, data: str, encoding: Optional[str] = "utf8",
                 title_words_limit=None, use_views=True, verbose=True):
        """Store the XML text and the filtering options.

        :param data: page XML to parse
        :param encoding: stored on the instance; not otherwise used by this class
        :param title_words_limit: if not None, revisions of pages whose title
            has more words than this are skipped
        :param use_views: if true, only pages present in the module-level
            ``global_views`` table are kept
        :param verbose: print a line for every kept or skipped page
        """
        self.data = data
        self.encoding = encoding
        self.title_words_limit = title_words_limit
        self.verbose = verbose
        # the shared module-level table is used rather than calling
        # get_views() per loader, which is slower
        self.views = global_views if use_views else None

    def load(self) -> List[Document]:
        """Parse the stored XML and return one Document per kept revision."""
        import mwparserfromhell
        import mwxml

        views = self.views
        word_limit = self.title_words_limit
        verbose = self.verbose
        collected = []

        for page in mwxml.Dump.from_page_xml(self.data).pages:
            title = page.title

            # pages missing from the view table are dropped entirely
            if views is not None and title not in views:
                if verbose:
                    print("Skipped %s low views" % title, flush=True)
                continue

            for revision in page:
                if word_limit is not None:
                    # "_" and " " both count as word separators in a title
                    title_words = title.replace('_', ' ').split(' ')
                    if len(title_words) > word_limit:
                        if verbose:
                            print("Skipped %s" % title, flush=True)
                        continue

                if verbose:
                    if views is None:
                        print("Kept %s" % title, flush=True)
                    else:
                        print("Kept %s views: %s" % (title, views[title]), flush=True)

                wikicode = mwparserfromhell.parse(revision.text)
                plain_text = wikicode.strip_code(
                    normalize=True, collapse=True, keep_template_params=False
                )

                fields = {
                    'title': title,
                    'source': "https://en.wikipedia.org/wiki/" + str(title).replace(' ', '_'),
                    'id': page.id,
                    'redirect': page.redirect,
                    # -1 marks "view filtering disabled"
                    'views': -1 if views is None else views[title],
                }
                # entries without a value are left out of the metadata
                metadata = {name: value for name, value in fields.items() if value is not None}
                collected.append(Document(page_content=plain_text, metadata=metadata))

        return collected


def search_index(search_term, index_filename):
    """Locate the compressed stream that contains a page title.

    Each index line is read as ``offset:page_id:title``.  Only the first two
    colons are treated as separators, so titles that themselves contain ':'
    (or quote characters) are compared intact.  Lines with fewer than three
    fields, such as blank lines, are ignored.

    :param search_term: exact page title to look for
    :param index_filename: path to the decompressed multistream index file
    :return: tuple ``(start_byte, data_length)``.  ``start_byte`` is the
        offset on the first line whose title matches; ``data_length`` is the
        distance to the next different offset that follows it.  Returns
        ``(0, 0)`` when the title is not found, and ``data_length`` is 0 when
        no later offset exists in the index.
    :raises ValueError: if a relevant line has a non-integer offset
    """
    found = False
    start_byte = data_length = 0
    # the context manager closes the file even if parsing raises
    with open(index_filename, 'r', encoding='utf-8') as index_file:
        for raw_line in index_file:
            parts = raw_line.rstrip('\r\n').split(':', 2)
            if len(parts) < 3:
                continue
            offset_text, _page_id, title = parts
            if not found:
                if title == search_term:
                    start_byte = int(offset_text)
                    found = True
            elif int(offset_text) != start_byte:
                # first line belonging to the following stream
                data_length = int(offset_text) - start_byte
                break
    return start_byte, data_length


def get_start_bytes(index_filename):
    """Return the sorted, de-duplicated byte offsets listed in the index.

    Only the text before the first ':' of every line is used.  Lines with
    nothing before the first ':' (such as blank lines) are skipped.

    :param index_filename: path to the decompressed multistream index file
    :return: ascending list of distinct integer offsets
    :raises ValueError: if a line's first field is not an integer
    """
    offsets = set()
    # the context manager closes the file even if parsing raises
    with open(index_filename, 'r', encoding='utf-8') as index_file:
        for raw_line in index_file:
            offset_text = raw_line.partition(':')[0].strip()
            if not offset_text:
                continue
            offsets.add(int(offset_text))
    return sorted(offsets)


def get_wiki_filenames():
    """Return the paths of the enwiki 2023-04-01 multistream index and dump.

    Both files are looked up in the
    'enwiki-20230401-pages-articles-multistream' directory below
    ``root_path``.

    :return: tuple ``(index_filename, wiki_filename)``
    """
    # The index has to be fetched beforehand, e.g.
    # wget http://ftp.acc.umu.se/mirror/wikimedia.org/dumps/enwiki/20230401/enwiki-20230401-pages-articles-multistream-index.txt.bz2
    # (the path returned here is the '.txt' name, without the '.bz2' suffix)
    dump_dir = os.path.join(root_path, 'enwiki-20230401-pages-articles-multistream')
    return (
        os.path.join(dump_dir, 'enwiki-20230401-pages-articles-multistream-index.txt'),
        os.path.join(dump_dir, 'enwiki-20230401-pages-articles-multistream.xml.bz2'),
    )


def get_documents_by_search_term(search_term):
    """Load the documents of the dump stream that holds ``search_term``.

    The index is searched for the title, the matching byte range is read
    from the compressed dump, decompressed, and parsed with
    ``MWDumpDirectLoader`` using its default options.

    :param search_term: exact page title to look up in the index
    :return: list of documents produced by the loader
    """
    index_filename, wiki_filename = get_wiki_filenames()
    offset, length = search_index(search_term, index_filename)

    with open(wiki_filename, 'rb') as dump:
        dump.seek(offset)
        compressed = dump.read(length)

    xml_text = bz2.BZ2Decompressor().decompress(compressed).decode()
    return MWDumpDirectLoader(xml_text).load()


def get_one_chunk(wiki_filename, start_byte, end_byte, return_file=True,
                  title_words_limit=None,
                  use_views=True):
    """Decompress one byte range of the dump and load its documents.

    :param wiki_filename: path to the compressed multistream dump
    :param start_byte: offset at which the stream begins
    :param end_byte: offset at which the stream ends (exclusive)
    :param return_file: if true, pickle the documents into a uniquely named
        file under 'temp_wiki' and return that file name instead of the list
    :param title_words_limit: forwarded to ``MWDumpDirectLoader``
    :param use_views: forwarded to ``MWDumpDirectLoader``
    :return: pickle file name when ``return_file`` is true, else the list of
        documents
    """
    length = end_byte - start_byte
    with open(wiki_filename, 'rb') as dump:
        dump.seek(start_byte)
        compressed = dump.read(length)
    xml_text = bz2.BZ2Decompressor().decompress(compressed).decode()

    chunk_docs = MWDumpDirectLoader(xml_text, title_words_limit=title_words_limit,
                                    use_views=use_views).load()
    if not return_file:
        return chunk_docs

    # hand the result over through a file rather than as a return value
    out_dir = "temp_wiki"
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, str(uuid.uuid4()) + ".tmp.pickle")
    with open(out_path, 'wb') as sink:
        pickle.dump(chunk_docs, sink)
    return out_path


from joblib import Parallel, delayed

# Page-view table loaded once when this module is imported; every
# MWDumpDirectLoader created with use_views=True reads this shared dict
# instead of calling get_views() itself, which is much slower.
global_views = get_views()


def get_all_documents(small_test=2, n_jobs=None, use_views=True):
    """Load documents from the multistream dump in parallel.

    Consecutive offsets from the index are paired into (start, end) byte
    ranges, and each range is processed by ``get_one_chunk`` in a worker
    process.  The range after the last index offset has no known end and is
    therefore not read.

    :param small_test: if truthy, only this many leading ranges are
        processed; if falsy, all ranges are processed
    :param n_jobs: number of worker processes.  Defaults to 5 for a small
        test, otherwise to a quarter of the CPUs (at least 1).
    :param use_views: forwarded to ``get_one_chunk``
    :return: flat list of all loaded documents
    """
    print("DO get all wiki docs: %s" % small_test, flush=True)
    index_filename, wiki_filename = get_wiki_filenames()
    offsets = get_start_bytes(index_filename)
    start_bytes = offsets[:-1]
    end_bytes = offsets[1:]

    if small_test:
        start_bytes = start_bytes[:small_test]
        end_bytes = end_bytes[:small_test]
        if n_jobs is None:
            n_jobs = 5
    elif n_jobs is None:
        # cpu_count() may be None, and "// 4" is 0 on machines with fewer
        # than 4 CPUs; joblib rejects n_jobs=0, so clamp to at least one
        n_jobs = max(1, (os.cpu_count() or 1) // 4)

    # default loky backend leads to name space conflict problems
    return_file = True  # large return from joblib hangs
    results = Parallel(n_jobs=n_jobs, verbose=10, backend='multiprocessing')(
        delayed(get_one_chunk)(wiki_filename, start_byte, end_byte,
                               return_file=return_file, use_views=use_views)
        for start_byte, end_byte in zip(start_bytes, end_bytes))

    if return_file:
        # each result is the name of a pickle file written by a worker;
        # NOTE: pickle is only safe here because these files were produced
        # by get_one_chunk itself, never load pickles from untrusted sources
        documents = []
        for fil in results:
            with open(fil, 'rb') as f:
                documents.extend(pickle.load(f))
            os.remove(fil)
    else:
        # flatten the per-chunk lists (also works when there are no chunks)
        documents = [doc for chunk in results for doc in chunk]
    assert isinstance(documents, list)

    print("DONE get all wiki docs", flush=True)
    return documents


def test_by_search_term():
    """Each sample title must resolve to a stream of exactly 100 documents."""
    for search_term in ('Apollo', 'Abstract (law)', 'Artificial languages'):
        assert len(get_documents_by_search_term(search_term)) == 100


def test_start_bytes():
    """The index must list the expected number of distinct stream offsets."""
    index_filename = get_wiki_filenames()[0]
    offsets = get_start_bytes(index_filename)
    assert len(offsets) == 227850


def test_get_all_documents():
    """Check document counts for the first 20 streams, with and without view filtering."""
    small_test = 20  # 227850
    # cpu_count() may be None and "// 4" is 0 below 4 CPUs; joblib rejects
    # n_jobs=0, so always use at least one worker
    n_jobs = max(1, (os.cpu_count() or 1) // 4)

    # without view filtering, every stream contributes 100 documents
    assert len(get_all_documents(small_test=small_test, n_jobs=n_jobs, use_views=False)) == small_test * 100

    # with view filtering, only pages present in the page-view table remain
    assert len(get_all_documents(small_test=small_test, n_jobs=n_jobs, use_views=True)) == 429


def get_one_pageviews(fil):
    """Extract the 'en' rows of one page-view file into a temporary CSV.

    The input is read as space-separated columns (region, title, views, foo)
    without quote handling.  Rows whose region is 'en' are kept and written,
    indexed by title with a single 'views' column, to a uniquely named file
    under 'temp_wiki_pageviews'.

    :param fil: path of the page-view file to read
    :return: name of the CSV file that was written
    """
    columns = ['region', 'title', 'views', 'foo']
    frame = pd.read_csv(fil, sep=' ', header=None, names=columns, quoting=csv.QUOTE_NONE)
    frame.index = frame['title']

    english = frame[frame['region'] == 'en']
    # the title lives on in the index, so only 'views' stays as a column
    english = english.drop(columns=['region', 'foo', 'title'])

    out_dir = "temp_wiki_pageviews"
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, str(uuid.uuid4()) + ".tmp.csv")
    english.to_csv(out_path, index=True)
    return out_path


def test_agg_pageviews(gen_files=False):
    """Sum per-title views over all page-view files into 'wiki_page_views.csv'.

    :param gen_files: if true, first convert every raw 'pageviews*.gz' file
        below ``root_path`` with ``get_one_pageviews`` in parallel; otherwise
        reuse the CSV files already present in
        ``root_path``/temp_wiki_pageviews
    """
    if gen_files:
        path = os.path.join(root_path, 'wiki_pageviews/dumps.wikimedia.org/other/pageviews/2023/2023-04')
        files = glob.glob(os.path.join(path, 'pageviews*.gz'))
        # files = files[:2]  # test
        # cpu_count() may be None and "// 2" is 0 on a single CPU; joblib
        # rejects n_jobs=0, so always use at least one worker
        n_jobs = max(1, (os.cpu_count() or 1) // 2)
        csv_files = Parallel(n_jobs=n_jobs, verbose=10, backend='multiprocessing')(
            delayed(get_one_pageviews)(fil) for fil in files)
    else:
        # to continue without redoing above
        # NOTE(review): get_one_pageviews writes to 'temp_wiki_pageviews'
        # relative to the working directory, while this reads from below
        # root_path; confirm the files are expected to be moved there
        csv_files = glob.glob(os.path.join(root_path, 'temp_wiki_pageviews/*.csv'))

    df_list = []
    for csv_file in csv_files:
        print(csv_file)
        df_list.append(pd.read_csv(csv_file))
    df = pd.concat(df_list, axis=0)
    # one row per title, with the views of all input files added up
    df = df.groupby('title')['views'].sum().reset_index()
    df.to_csv("wiki_page_views.csv", index=True)


def test_reduce_pageview():
    """Reduce the aggregated page views to titles above 5000 views.

    Reads 'wiki_page_views.csv', drops rows with 1e7 or more views, and
    saves a log-scale histogram next to it.  Rows with more than 5000 views
    are then written to 'wiki_page_views_more_5000month.csv', again with a
    histogram saved alongside.
    """

    def _save_histogram(frame, csv_name):
        # histogram of the 'views' column, stored as <csv_name with .png>
        plt.hist(frame['views'], bins=100, log=True)
        mean_views = np.mean(frame['views'])
        median_views = np.median(frame['views'])
        plt.title("Views avg: %s median: %s" % (mean_views, median_views))
        plt.savefig(csv_name.replace('.csv', '.png'))
        plt.close()

    source_csv = "wiki_page_views.csv"
    frame = pd.read_csv(source_csv)
    frame = frame[frame['views'] < 1e7]
    _save_histogram(frame, source_csv)

    views_limit = 5000
    frame = frame[frame['views'] > views_limit]
    reduced_csv = "wiki_page_views_more_5000month.csv"
    frame.to_csv(reduced_csv, index=True)
    _save_histogram(frame, reduced_csv)


@pytest.mark.skip("Only if doing full processing again, some manual steps")
def test_do_wiki_full_all():
    """Walk through the complete wiki preparation pipeline, step by step.

    Skipped by default because it downloads data and needs manual steps.
    """
    # Prerequisites (manual):
    #   * other requirements for wiki specific conversion:
    #       pip install -r reqs_optional/requirements_optional_wikiprocessing.txt
    #   * the wiki dump itself, e.g. with "Transmission" in Ubuntu via torrent,
    #     see https://meta.wikimedia.org/wiki/Data_dump_torrents
    #     E.g. magnet:?xt=urn:btih:b2c74af2b1531d0b63f1166d2011116f44a8fed0&dn=enwiki-20230401-pages-articles-multistream.xml.bz2&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337
    fetch_index = "wget http://ftp.acc.umu.se/mirror/wikimedia.org/dumps/enwiki/20230401/enwiki-20230401-pages-articles-multistream-index.txt.bz2"
    fetch_pageviews = "wget -b -m -k -o wget.log -e robots=off https://dumps.wikimedia.org/other/pageviews/2023/2023-04/"

    # 1. download the multistream index
    os.system(fetch_index)

    # 2. LangChain can load docs from a subset sampled directly out of the
    #    full wiki through the bzip multistream
    test_get_all_documents()

    # 3. the multistream can be searched by title
    test_by_search_term()

    # 4. all start bytes can be read from the index
    test_start_bytes()

    # 5. download page views, here for the entire month of April 2023
    os.system(fetch_pageviews)

    # 6. aggregate the page views of the many files into a single file
    test_agg_pageviews(gen_files=True)

    # 7. cut the page views down to some limit, so processing of the full
    #    wiki is not too large
    test_reduce_pageview()

    # 8. Start generate.py requesting wiki_full in prep.  This uses the page
    #    views as referenced in get_views.  Loading get_views once into a
    #    module-level global is required to avoid very slow processing.
    #    WARNING: needs a lot of memory, up to 300GB system RAM at peak.
    """
    python generate.py --langchain_mode='wiki_full' --langchain_modes="['wiki_full', 'UserData', 'MyData', 'github h2oGPT', 'DriverlessAI docs']" &> lc_out.log
    """