|
"""
|
|
Copyright (c) 2022, salesforce.com, inc.
|
|
All rights reserved.
|
|
SPDX-License-Identifier: BSD-3-Clause
|
|
For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
|
|
"""
|
|
|
|
import time
|
|
from PIL import Image
|
|
from lavis.common.utils import get_abs_path, get_cache_path
|
|
from multiprocessing import Pool
|
|
from omegaconf import OmegaConf
|
|
from pathlib import Path
|
|
from torchvision.transforms import functional as TF
|
|
from tqdm import tqdm
|
|
import glob
|
|
import io
|
|
import json
|
|
import magic
|
|
import numpy as np
|
|
import os
|
|
import pandas as pd
|
|
import requests
|
|
import shelve
|
|
import zlib
|
|
|
|
# HTTP headers attached to every request (both HEAD checks and GET
# downloads).  The client identifies itself as Google's image crawler,
# which many hosts whitelist.
# NOTE(review): presumably the X-Forwarded-For address is meant to look
# like a Google crawler IP so the User-Agent appears plausible — confirm
# before changing either value.
headers = {
    "User-Agent": "Googlebot-Image/1.0",
    "X-Forwarded-For": "64.18.15.200",
}
|
|
|
|
|
|
def _df_split_apply(tup_arg):
|
|
split_ind, subset, func = tup_arg
|
|
r = subset.apply(func, axis=1)
|
|
return (split_ind, r)
|
|
|
|
|
|
def df_multiprocess(df, processes, chunk_size, func, dataset_name):
    """Apply ``func`` row-wise over ``df`` in parallel, with resume support.

    The frame is split into chunks of ``chunk_size`` rows; each chunk is
    processed by a worker pool via ``_df_split_apply`` and the result is
    persisted in a shelve file named
    ``<dataset_name>_<func name>_<chunk_size>_results.tmp``.  Chunks whose
    index is already present in the shelve are skipped, so an interrupted
    run can be restarted and picks up where it left off.
    """
    print("Generating parts...")
    with shelve.open(
        "%s_%s_%s_results.tmp" % (dataset_name, func.__name__, chunk_size)
    ) as results:

        pbar = tqdm(total=len(df), position=0)

        # Chunk indices already completed by a previous run; fast-forward
        # the progress bar by the number of rows in each stored result.
        finished_chunks = set([int(k) for k in results.keys()])
        pbar.desc = "Resuming"
        for k in results.keys():
            pbar.update(len(results[str(k)][1]))

        # Lazy generator of (chunk_index, chunk_frame, func) triples,
        # omitting chunks that are already in the shelve.
        pool_data = (
            (index, df[i : i + chunk_size], func)
            for index, i in enumerate(range(0, len(df), chunk_size))
            if index not in finished_chunks
        )
        # NOTE(review): int() truncates, so the printed part count is one
        # low when len(df) is not a multiple of chunk_size (display only).
        print(
            int(len(df) / chunk_size),
            "parts.",
            chunk_size,
            "per part.",
            "Using",
            processes,
            "processes",
        )

        pbar.desc = "Downloading"
        with Pool(processes) as pool:
            # imap_unordered yields results as workers finish (any order);
            # each is persisted immediately under its chunk index so a
            # crash loses at most the in-flight chunks.
            for i, result in enumerate(
                pool.imap_unordered(_df_split_apply, pool_data, 2)
            ):
                results[str(result[0])] = result
                pbar.update(len(result[1]))
        pbar.close()

    print("Finished Downloading.")
    return
|
|
|
|
|
|
|
|
def _file_name(row):
|
|
name = (
|
|
"%s/%s_%s"
|
|
% (
|
|
|
|
storage_dir,
|
|
row.name,
|
|
(zlib.crc32(row["url"].encode("utf-8")) & 0xFFFFFFFF),
|
|
)
|
|
+ ".jpg"
|
|
)
|
|
return name
|
|
|
|
|
|
|
|
def check_mimetype(row):
    """Annotate a row with the MIME type and byte size of its local file.

    Only rows whose ``file`` path exists on disk are touched; any other
    row is returned unchanged.
    """
    local_path = row["file"]
    if not os.path.isfile(str(local_path)):
        return row
    row["mimetype"] = magic.from_file(local_path, mime=True)
    row["size"] = os.stat(local_path).st_size
    return row
|
|
|
|
|
|
|
|
|
|
def check_download(row):
    """HEAD-check a row's URL without downloading the body.

    Records the HTTP status code and response headers on the row.  Any
    network-level failure (timeout, DNS, connection error, ...) is
    recorded as status 408.  When the server responds OK, the row also
    gets the deterministic local filename under ``file``.

    Always returns the (mutated) row — never ``None`` — so pandas'
    ``apply`` produces a complete result frame.
    """
    fname = _file_name(row)
    try:
        response = requests.head(
            row["url"], stream=False, timeout=5, allow_redirects=True, headers=headers
        )
        row["status"] = response.status_code
        row["headers"] = dict(response.headers)
    except requests.RequestException:
        # Narrowed from a bare ``except:`` so programmer errors and
        # KeyboardInterrupt are no longer silently recorded as timeouts.
        row["status"] = 408
        return row
    if response.ok:
        row["file"] = fname
    # BUG FIX: the original fell off the end for non-ok responses,
    # implicitly returning None; the row must be returned in every case.
    return row
|
|
|
|
|
|
def resize_img(req):
    """Decode an image from ``req`` (path or file-like object), force it
    to RGB, and resize it using the module-level ``resize_size``."""
    decoded = Image.open(req)
    rgb = decoded.convert("RGB")
    return TF.resize(rgb, size=resize_size)
|
|
|
|
|
|
def download_image(row):
    """Fetch row['url'], resize it, and save it to its deterministic path.

    Behavior:
      * If the target file already exists (resumed run), skip the network
        entirely and just record status 200 plus file metadata.
      * Otherwise GET the URL; any network-level failure is recorded as
        status 408.
      * On an OK response, decode/resize/save the image and record its
        MIME type and size; a decode or save failure is recorded as 408.

    Always returns the (mutated) row so pandas' ``apply`` never sees None.
    """
    fname = _file_name(row)

    # Resume support: a previously downloaded file is trusted as-is.
    if os.path.isfile(fname):
        row["status"] = 200
        row["file"] = fname
        row["mimetype"] = magic.from_file(row["file"], mime=True)
        row["size"] = os.stat(row["file"]).st_size
        return row

    try:
        response = requests.get(
            row["url"], stream=False, timeout=5, allow_redirects=True, headers=headers
        )
        row["status"] = response.status_code
    except requests.RequestException:
        # Narrowed from ``except Exception`` so only genuine network
        # failures are reported as timeouts.
        row["status"] = 408
        return row

    if response.ok:
        try:
            # Removed dead ``response.raw.decode_content = True``: with
            # stream=False the body is already fully decoded in .content.
            img = resize_img(io.BytesIO(response.content))
            img.save(fname)
            row["mimetype"] = magic.from_file(fname, mime=True)
            row["size"] = os.stat(fname).st_size
        except Exception:
            # Decode/resize/save can fail in many library-specific ways
            # (truncated data, non-image payload, disk error).
            row["status"] = 408
        row["file"] = fname
    return row
|
|
|
|
|
|
def open_tsv(fname, folder):
    """Load a headerless caption TSV (``url<TAB>caption`` per line) and
    tag every row with the destination ``folder`` name."""
    print("Opening %s Data File..." % fname)
    frame = pd.read_csv(fname, sep="\t", names=["url", "caption"])
    frame["folder"] = folder
    print("Processing", len(frame), " Images:")
    return frame
|
|
|
|
|
|
def df_from_shelve(chunk_size, func, dataset_name):
    """Reassemble per-chunk results from the shelve produced by
    ``df_multiprocess`` into a single DataFrame, in chunk order."""
    print("Generating Dataframe from results...")
    shelf_name = "%s_%s_%s_results.tmp" % (dataset_name, func.__name__, chunk_size)
    with shelve.open(shelf_name) as results:
        ordered_keys = sorted(int(k) for k in results.keys())
        frames = [results[str(k)][1] for k in ordered_keys]
        combined = pd.concat(frames, sort=True)
    return combined
|
|
|
|
|
|
# Target size (pixels) handed to TF.resize in resize_img; images are
# resized before saving to keep the on-disk dataset small.
resize_size = 384

# Dataset config shipped with LAVIS; its build_info points at the image
# storage location.
config_path = get_abs_path("configs/datasets/conceptual_caption/defaults_12m.yaml")

# Resolve the (possibly relative) storage path from the YAML config into
# an absolute directory under the LAVIS cache, and make sure it exists.
storage_dir = OmegaConf.load(
    config_path
).datasets.conceptual_caption_12m.build_info.images.storage
storage_dir = Path(get_cache_path(storage_dir))

os.makedirs(storage_dir, exist_ok=True)

# Number of worker processes in the download pool.
num_processes = 96

# Rows per chunk handed to each worker (also the shelve resume unit).
images_per_part = 100

# Dataset tag used in the shelve filename and the final report name.
data_name = "cc12m"
|
|
|
|
|
|
# Guard the driver so that multiprocessing start methods which re-import
# this module (spawn, the default on Windows/macOS) do not recursively
# re-launch the whole download from every worker process.
if __name__ == "__main__":
    # Read the url/caption TSV, download every image in parallel (with
    # resume support via the shelve file), then write a gzipped report
    # TSV describing the outcome of every row.
    df = open_tsv("cc12m.tsv", data_name)
    df_multiprocess(
        df=df,
        processes=num_processes,
        chunk_size=images_per_part,
        func=download_image,
        dataset_name=data_name,
    )
    df = df_from_shelve(
        chunk_size=images_per_part, func=download_image, dataset_name=data_name
    )
    df.to_csv(
        "downloaded_%s_report.tsv.gz" % data_name,
        compression="gzip",
        sep="\t",
        header=False,
        index=False,
    )
    print("Saved.")
|
|
|