"""Load data from a MediaWiki dump XML file."""
import ast
import glob
import pickle
import uuid
from typing import List, Optional
import os
import bz2
import csv

import numpy as np
import pandas as pd
import pytest
from matplotlib import pyplot as plt

from langchain.docstore.document import Document
from langchain.document_loaders import MWDumpLoader

# path where downloaded wiki files exist, to be processed
root_path = "/data/jon/h2o-llm"

def unescape(x):
    """Best-effort unescaping of a title string; returns the input unchanged on failure."""
    try:
        x = ast.literal_eval(x)
    except Exception:
        try:
            x = x.encode('ascii', 'ignore').decode('unicode_escape')
        except Exception:
            pass
    return x

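# Illustrative behavior of unescape() (example values, not taken from a real CSV):
#   unescape("'Apollo'")  # -> 'Apollo' via ast.literal_eval
#   unescape("Apollo")    # -> 'Apollo' via the unicode_escape fallback (literal_eval fails)
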
def get_views():
    """Return a dict mapping page title to monthly view count from the filtered page-views CSV."""
    # views = pd.read_csv('wiki_page_views_more_1000month.csv')
    views = pd.read_csv('wiki_page_views_more_5000month.csv')
    views.index = views['title']
    views = views['views']
    views = views.to_dict()
    views = {str(unescape(str(k))): v for k, v in views.items()}
    views2 = {k.replace('_', ' '): v for k, v in views.items()}
    # views uses "_" but page titles use " ", so keep both forms as keys
    views.update(views2)
    return views

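# Minimal usage sketch (assumes wiki_page_views_more_5000month.csv, produced by
# test_reduce_pageview() below, is in the working directory):
#   views = get_views()
#   views.get('Apollo', 0)  # monthly view count, 0 if the page did not pass the filter
# Titles are keyed both with underscores and with spaces.
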
class MWDumpDirectLoader(MWDumpLoader):
    def __init__(self, data: str, encoding: Optional[str] = "utf8",
                 title_words_limit=None, use_views=True, verbose=True):
        """Initialize with the XML data of one multistream chunk (a string, not a file path)."""
        self.data = data
        self.encoding = encoding
        self.title_words_limit = title_words_limit
        self.verbose = verbose
        if use_views:
            # self.views = get_views()
            # faster to use global shared values
            self.views = global_views
        else:
            self.views = None

    def load(self) -> List[Document]:
        """Parse the XML data and return one Document per kept page."""
        import mwparserfromhell
        import mwxml

        dump = mwxml.Dump.from_page_xml(self.data)
        docs = []
        for page in dump.pages:
            # skip pages that did not pass the monthly-views filter
            if self.views is not None and page.title not in self.views:
                if self.verbose:
                    print("Skipped %s low views" % page.title, flush=True)
                continue
            for revision in page:
                # optionally skip pages whose titles have too many words
                if self.title_words_limit is not None:
                    num_words = len(' '.join(page.title.split('_')).split(' '))
                    if num_words > self.title_words_limit:
                        if self.verbose:
                            print("Skipped %s" % page.title, flush=True)
                        continue
                if self.verbose:
                    if self.views is not None:
                        print("Kept %s views: %s" % (page.title, self.views[page.title]), flush=True)
                    else:
                        print("Kept %s" % page.title, flush=True)
                code = mwparserfromhell.parse(revision.text)
                text = code.strip_code(
                    normalize=True, collapse=True, keep_template_params=False
                )
                title_url = str(page.title).replace(' ', '_')
                metadata = dict(title=page.title,
                                source="https://en.wikipedia.org/wiki/" + title_url,
                                id=page.id,
                                redirect=page.redirect,
                                views=self.views[page.title] if self.views is not None else -1,
                                )
                metadata = {k: v for k, v in metadata.items() if v is not None}
                docs.append(Document(page_content=text, metadata=metadata))
        return docs

def search_index(search_term, index_filename):
    """Find the byte offset and compressed length of the multistream block containing search_term."""
    # each index line has the form "start_byte:page_id:title"
    # note: titles containing ':' get split across extra fields and will not match here
    byte_flag = False
    data_length = start_byte = 0
    index_file = open(index_filename, 'r')
    csv_reader = csv.reader(index_file, delimiter=':')
    for line in csv_reader:
        if not byte_flag and search_term == line[2]:
            start_byte = int(line[0])
            byte_flag = True
        elif byte_flag and int(line[0]) != start_byte:
            data_length = int(line[0]) - start_byte
            break
    index_file.close()
    return start_byte, data_length

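# Minimal sketch of using the returned offsets to extract one ~100-page block
# (this is what get_documents_by_search_term() below does):
#   index_filename, wiki_filename = get_wiki_filenames()
#   start_byte, data_length = search_index('Apollo', index_filename)
#   with open(wiki_filename, 'rb') as f:
#       f.seek(start_byte)
#       xml_block = bz2.BZ2Decompressor().decompress(f.read(data_length))
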
def get_start_bytes(index_filename):
    index_file = open(index_filename, 'r')
    csv_reader = csv.reader(index_file, delimiter=':')
    start_bytes = set()
    for line in csv_reader:
        start_bytes.add(int(line[0]))
    index_file.close()
    return sorted(start_bytes)

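# Consecutive start bytes delimit the individual bz2 streams: get_all_documents() below
# zips start_bytes with start_bytes[1:] to form (start_byte, end_byte) chunk ranges.
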
def get_wiki_filenames():
    # requires:
    # wget http://ftp.acc.umu.se/mirror/wikimedia.org/dumps/enwiki/20230401/enwiki-20230401-pages-articles-multistream-index.txt.bz2
    base_path = os.path.join(root_path, 'enwiki-20230401-pages-articles-multistream')
    index_file = 'enwiki-20230401-pages-articles-multistream-index.txt'
    index_filename = os.path.join(base_path, index_file)
    wiki_filename = os.path.join(base_path, 'enwiki-20230401-pages-articles-multistream.xml.bz2')
    return index_filename, wiki_filename

def get_documents_by_search_term(search_term):
    index_filename, wiki_filename = get_wiki_filenames()
    start_byte, data_length = search_index(search_term, index_filename)
    with open(wiki_filename, 'rb') as wiki_file:
        wiki_file.seek(start_byte)
        data = bz2.BZ2Decompressor().decompress(wiki_file.read(data_length))
    loader = MWDumpDirectLoader(data.decode())
    documents = loader.load()
    return documents

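# Example (assumes the 20230401 dump and index from get_wiki_filenames() are present):
#   docs = get_documents_by_search_term('Apollo')
#   docs[0].metadata['source']  # an https://en.wikipedia.org/wiki/... URL
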
def get_one_chunk(wiki_filename, start_byte, end_byte, return_file=True,
                  title_words_limit=None,
                  use_views=True):
    data_length = end_byte - start_byte
    with open(wiki_filename, 'rb') as wiki_file:
        wiki_file.seek(start_byte)
        data = bz2.BZ2Decompressor().decompress(wiki_file.read(data_length))
    loader = MWDumpDirectLoader(data.decode(), title_words_limit=title_words_limit,
                                use_views=use_views)
    documents1 = loader.load()
    if return_file:
        base_tmp = "temp_wiki"
        if not os.path.isdir(base_tmp):
            os.makedirs(base_tmp, exist_ok=True)
        filename = os.path.join(base_tmp, str(uuid.uuid4()) + ".tmp.pickle")
        with open(filename, 'wb') as f:
            pickle.dump(documents1, f)
        return filename
    return documents1

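# Sketch of processing a single chunk by hand (get_all_documents() below does this in
# parallel over all consecutive start-byte pairs):
#   index_filename, wiki_filename = get_wiki_filenames()
#   start_bytes = get_start_bytes(index_filename)
#   pickle_file = get_one_chunk(wiki_filename, start_bytes[0], start_bytes[1])
#   with open(pickle_file, 'rb') as f:
#       docs = pickle.load(f)
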
from joblib import Parallel, delayed

# computed once at module level so joblib workers share the views dict
# instead of re-reading the CSV for every chunk (see note in test_do_wiki_full_all)
global_views = get_views()

def get_all_documents(small_test=2, n_jobs=None, use_views=True):
    print("DO get all wiki docs: %s" % small_test, flush=True)
    index_filename, wiki_filename = get_wiki_filenames()
    start_bytes = get_start_bytes(index_filename)
    end_bytes = start_bytes[1:]
    start_bytes = start_bytes[:-1]
    if small_test:
        start_bytes = start_bytes[:small_test]
        end_bytes = end_bytes[:small_test]
        if n_jobs is None:
            n_jobs = 5
    else:
        if n_jobs is None:
            n_jobs = os.cpu_count() // 4
    # default loky backend leads to name space conflict problems
    return_file = True  # large return from joblib hangs
    documents = Parallel(n_jobs=n_jobs, verbose=10, backend='multiprocessing')(
        delayed(get_one_chunk)(wiki_filename, start_byte, end_byte,
                               return_file=return_file, use_views=use_views) for start_byte, end_byte in
        zip(start_bytes, end_bytes))
    if return_file:
        # then documents really are files
        files = documents.copy()
        documents = []
        for fil in files:
            with open(fil, 'rb') as f:
                documents.extend(pickle.load(f))
            os.remove(fil)
    else:
        from functools import reduce
        from operator import concat
        documents = reduce(concat, documents)
    assert isinstance(documents, list)
    print("DONE get all wiki docs", flush=True)
    return documents

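# Example from test_get_all_documents() below: process only the first 20 chunks of the
# dump without the page-views filter, yielding 20 * 100 documents:
#   docs = get_all_documents(small_test=20, n_jobs=os.cpu_count() // 4, use_views=False)
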
def test_by_search_term():
    search_term = 'Apollo'
    assert len(get_documents_by_search_term(search_term)) == 100
    search_term = 'Abstract (law)'
    assert len(get_documents_by_search_term(search_term)) == 100
    search_term = 'Artificial languages'
    assert len(get_documents_by_search_term(search_term)) == 100

def test_start_bytes():
    index_filename, wiki_filename = get_wiki_filenames()
    assert len(get_start_bytes(index_filename)) == 227850

def test_get_all_documents():
    small_test = 20  # vs. 227850 start bytes in the full index (see test_start_bytes)
    n_jobs = os.cpu_count() // 4
    assert len(get_all_documents(small_test=small_test, n_jobs=n_jobs, use_views=False)) == small_test * 100
    assert len(get_all_documents(small_test=small_test, n_jobs=n_jobs, use_views=True)) == 429

def get_one_pageviews(fil):
    df1 = pd.read_csv(fil, sep=' ', header=None, names=['region', 'title', 'views', 'foo'], quoting=csv.QUOTE_NONE)
    df1.index = df1['title']
    df1 = df1[df1['region'] == 'en']
    df1 = df1.drop('region', axis=1)
    df1 = df1.drop('foo', axis=1)
    df1 = df1.drop('title', axis=1)  # already index
    base_tmp = "temp_wiki_pageviews"
    if not os.path.isdir(base_tmp):
        os.makedirs(base_tmp, exist_ok=True)
    filename = os.path.join(base_tmp, str(uuid.uuid4()) + ".tmp.csv")
    df1.to_csv(filename, index=True)
    return filename

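# The raw per-hour pageviews files are space-separated lines of roughly
# "domain_code page_title view_count <unused>"; an illustrative (not real) line:
#   en Apollo 123 0
# Only English Wikipedia rows (domain_code == 'en') are kept above.
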
def test_agg_pageviews(gen_files=False):
    if gen_files:
        path = os.path.join(root_path, 'wiki_pageviews/dumps.wikimedia.org/other/pageviews/2023/2023-04')
        files = glob.glob(os.path.join(path, 'pageviews*.gz'))
        # files = files[:2]  # test
        n_jobs = os.cpu_count() // 2
        csv_files = Parallel(n_jobs=n_jobs, verbose=10, backend='multiprocessing')(
            delayed(get_one_pageviews)(fil) for fil in files)
    else:
        # to continue without redoing the above
        csv_files = glob.glob(os.path.join(root_path, 'temp_wiki_pageviews/*.csv'))
    df_list = []
    for csv_file in csv_files:
        print(csv_file)
        df1 = pd.read_csv(csv_file)
        df_list.append(df1)
    df = pd.concat(df_list, axis=0)
    df = df.groupby('title')['views'].sum().reset_index()
    df.to_csv("wiki_page_views.csv", index=True)

def test_reduce_pageview():
    filename = "wiki_page_views.csv"
    df = pd.read_csv(filename)
    df = df[df['views'] < 1e7]
    # histogram of views after dropping extreme outliers (>= 1e7)
    plt.hist(df['views'], bins=100, log=True)
    views_avg = np.mean(df['views'])
    views_median = np.median(df['views'])
    plt.title("Views avg: %s median: %s" % (views_avg, views_median))
    plt.savefig(filename.replace('.csv', '.png'))
    plt.close()
    # keep only pages above the monthly views threshold
    views_limit = 5000
    df = df[df['views'] > views_limit]
    filename = "wiki_page_views_more_5000month.csv"
    df.to_csv(filename, index=True)
    # histogram of the remaining views
    plt.hist(df['views'], bins=100, log=True)
    views_avg = np.mean(df['views'])
    views_median = np.median(df['views'])
    plt.title("Views avg: %s median: %s" % (views_avg, views_median))
    plt.savefig(filename.replace('.csv', '.png'))
    plt.close()

def test_do_wiki_full_all():
    # Install the extra requirements for wiki-specific conversion:
    #   pip install -r reqs_optional/requirements_optional_wikiprocessing.txt
    # Use "Transmission" in Ubuntu to fetch the wiki dump via torrent:
    # See: https://meta.wikimedia.org/wiki/Data_dump_torrents
    # E.g. magnet:?xt=urn:btih:b2c74af2b1531d0b63f1166d2011116f44a8fed0&dn=enwiki-20230401-pages-articles-multistream.xml.bz2&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337

    # Get the index
    os.system("wget http://ftp.acc.umu.se/mirror/wikimedia.org/dumps/enwiki/20230401/enwiki-20230401-pages-articles-multistream-index.txt.bz2")

    # Test that LangChain can build docs from a subset of wiki, sampled directly out of the full dump via the bzip2 multistream
    test_get_all_documents()
    # Check that the wiki multistream can be searched
    test_by_search_term()
    # Test that all start bytes can be read from the index
    test_start_bytes()

    # Get page views, e.g. for the entire month of April 2023
    os.system("wget -b -m -k -o wget.log -e robots=off https://dumps.wikimedia.org/other/pageviews/2023/2023-04/")
    # Aggregate page views from many files into a single file
    test_agg_pageviews(gen_files=True)
    # Reduce page views to some limit, so processing of the full wiki is not too large
    test_reduce_pageview()

    # Start generate.py requesting wiki_full as preparation; this uses the page views referenced in get_views().
    # Note: get_views() must run once at module level (global_views) to avoid very slow processing.
    # WARNING: requires a lot of memory; used up to 300GB of system RAM at peak.
    """
    python generate.py --langchain_mode='wiki_full' --langchain_modes="['wiki_full', 'UserData', 'MyData', 'github h2oGPT', 'DriverlessAI docs']" &> lc_out.log
    """