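# Parallel image downloader for a Conceptual Captions style TSV (caption <TAB> url).
# The DataFrame is split into fixed-size chunks, each chunk is processed by a
# multiprocessing pool, and every finished chunk is persisted to a shelve file so
# an interrupted run can resume without re-downloading completed parts.
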
import pandas as pd
import numpy as np
import requests
import zlib
import os
import shelve
import magic  # python-magic (libmagic bindings), used for MIME type detection
from multiprocessing import Pool
from tqdm import tqdm


def _df_split_apply(tup_arg):
    # Worker: apply func row-wise to one DataFrame chunk and return it together
    # with its chunk index, so the result can be keyed in the shelve.
    split_ind, subset, func = tup_arg
    r = subset.apply(func, axis=1)
    return (split_ind, r)


def df_multiprocess(df, processes, chunk_size, func, dataset_name):
    print("Generating parts...")
    with shelve.open('%s_%s_%s_results.tmp' % (dataset_name, func.__name__, chunk_size)) as results:

        pbar = tqdm(total=len(df), position=0)

        # Resume: chunks already stored in the shelve are skipped, and the
        # progress bar is advanced by the rows they contain.
        finished_chunks = set([int(k) for k in results.keys()])
        pbar.desc = "Resuming"
        for k in results.keys():
            pbar.update(len(results[str(k)][1]))

        pool_data = ((index, df[i:i + chunk_size], func)
                     for index, i in enumerate(range(0, len(df), chunk_size))
                     if index not in finished_chunks)
        num_parts = -(-len(df) // chunk_size)  # ceiling division
        print(num_parts, "parts.", chunk_size, "per part.", "Using", processes, "processes")

        pbar.desc = "Downloading"
        with Pool(processes) as pool:
            for result in pool.imap_unordered(_df_split_apply, pool_data, 2):
                # result is (chunk index, Series of processed rows); persist it
                # immediately so the run can resume from here if interrupted.
                results[str(result[0])] = result
                pbar.update(len(result[1]))
        pbar.close()

    print("Finished Downloading.")
    return


def _file_name(row):
    # Bucket images into sub-folders of 1000 by DataFrame index; the file name
    # itself is a CRC32 of the URL, so a given URL always maps to the same file.
    row.name = str(int(row.name) // 1000)
    return "%s/%s_%s.jpg" % (row['folder'], row.name, (zlib.crc32(row['url'].encode('utf-8')) & 0xffffffff))


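# For example, with folder "/CC3M/images/validation" and row index 1234, _file_name
# yields ".../validation/1_<crc32>.jpg"; the callers below split on "_" to get the
# sub-directory ".../validation/1" and the final path ".../validation/1/<crc32>.jpg".
# This relies on the folder path itself containing no underscores.
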
def check_mimetype(row):
    # Fill in MIME type and size for files that already exist on disk.
    if os.path.isfile(str(row['file'])):
        row['mimetype'] = magic.from_file(row['file'], mime=True)
        row['size'] = os.stat(row['file']).st_size
    return row


def check_download(row):
    # HEAD-only validation pass: record status code and response headers
    # without downloading the image body.
    fname = _file_name(row)
    sub_dir = fname.split('_')[0]
    os.makedirs(sub_dir, exist_ok=True)  # exist_ok avoids races between workers
    fname = '/'.join(fname.split('_'))
    try:
        response = requests.head(row['url'], stream=False, timeout=5, allow_redirects=True)
        row['status'] = response.status_code
        row['headers'] = dict(response.headers)
    except Exception:
        # Treat any connection error or timeout as a request timeout.
        row['status'] = 408
        return row
    if response.ok:
        row['file'] = fname
    return row


def download_image(row):
    fname = _file_name(row)
    sub_dir = fname.split('_')[0]
    os.makedirs(sub_dir, exist_ok=True)  # exist_ok avoids races between workers
    fname = '/'.join(fname.split('_'))

    # Skip files that have already been downloaded; just record their metadata.
    if os.path.isfile(fname):
        row['status'] = 200
        row['file'] = fname
        row['mimetype'] = magic.from_file(row['file'], mime=True)
        row['size'] = os.stat(row['file']).st_size
        return row

    try:
        response = requests.get(row['url'], stream=False, timeout=10, allow_redirects=True)
        row['status'] = response.status_code
    except Exception:
        # Treat any connection error or timeout as a request timeout.
        row['status'] = 408
        return row

    if response.ok:
        try:
            with open(fname, 'wb') as out_file:
                out_file.write(response.content)
            row['mimetype'] = magic.from_file(fname, mime=True)
            row['size'] = os.stat(fname).st_size
        except Exception:
            # Failed while writing or inspecting the file.
            row['status'] = 408
            return row
        row['file'] = fname
    return row


def open_tsv(fname, folder):
    print("Opening %s Data File..." % fname)
    # Only the url column (column 1) is needed for downloading.
    df = pd.read_csv(fname, sep='\t', names=["caption", "url"], usecols=range(1, 2))
    df['folder'] = folder
    print("Processing", len(df), "Images:")
    return df


def df_from_shelve(chunk_size, func, dataset_name):
    print("Generating Dataframe from results...")
    with shelve.open('%s_%s_%s_results.tmp' % (dataset_name, func.__name__, chunk_size)) as results:
        keylist = sorted([int(k) for k in results.keys()])
        df = pd.concat([results[str(k)][1] for k in keylist], sort=True)
    return df


if __name__ == '__main__':
    # Guard is required because df_multiprocess spawns worker processes.
    num_processes = 32
    images_per_part = 100

    data_name = "/CC3M/images/validation"
    df = open_tsv("/CC3M/validation.tsv", data_name)
    df_multiprocess(df=df, processes=num_processes, chunk_size=images_per_part,
                    func=download_image, dataset_name=data_name)
    df = df_from_shelve(chunk_size=images_per_part, func=download_image,
                        dataset_name=data_name)
    df.to_csv("%s_report.tsv.gz" % data_name, compression='gzip', sep='\t',
              header=False, index=False)
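
# Optional sketch (not part of the original run): the same driver can do a lighter
# HEAD-only validation pass by swapping check_download in as func inside the
# __main__ block above, e.g.:
#
#   df_multiprocess(df=df, processes=num_processes, chunk_size=images_per_part,
#                   func=check_download, dataset_name=data_name)
#   df = df_from_shelve(chunk_size=images_per_part, func=check_download,
#                       dataset_name=data_name)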