Source code for rofunc.simulator.utils.ycb_downloader
# Copyright 2023, Junjia LIU, jjliu@mae.cuhk.edu.hk
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import multiprocessing
import os
from functools import partial
from urllib.request import Request, urlopen
import rofunc as rf
from rofunc.utils.oslab.path import get_rofunc_path
[docs]def download_ycb_objects(objects_to_download="all", files_to_download=['google_16k'],
extract=True, core_num=20):
"""
Download YCB objects from the official website
:param objects_to_download: List of objects to download. If "all", will download all objects. http://ycb-benchmarks.s3-website-us-east-1.amazonaws.com/
:param files_to_download: List of files to download for each object.
'berkeley_rgbd' contains all of the depth maps and images from the Carmines.
'berkeley_rgb_highres' contains all of the high-res images from the Canon cameras.
'berkeley_processed' contains all of the segmented point clouds and textured meshes.
'google_16k' contains google meshes with 16k vertices.
'google_64k' contains google meshes with 64k vertices.
'google_512k' contains google meshes with 512k vertices.
:param extract: Extract all files from the downloaded .tgz, and remove .tgz files. If false, will just download all .tgz files to output_directory
:param core_num: Number of cores to use for parallel downloading
:return:
"""
base_url = "http://ycb-benchmarks.s3-website-us-east-1.amazonaws.com/data/"
objects_url = "https://ycb-benchmarks.s3.amazonaws.com/data/objects.json"
# Define an output folder
output_directory = os.path.join(get_rofunc_path(), "simulator/assets/urdf/ycb")
objects = fetch_objects(objects_url)
pool = multiprocessing.Pool(core_num)
parallel_1 = partial(parallel, objects_to_download=objects_to_download, files_to_download=files_to_download,
extract=extract, base_url=base_url, output_directory=output_directory)
pool.map(parallel_1, objects)
rf.logger.beauty_print("Downloaded all YCB objects to %s" % output_directory)
[docs]def parallel(object, objects_to_download, files_to_download, extract, base_url, output_directory):
if objects_to_download == "all" or object in objects_to_download:
for file_type in files_to_download:
url = tgz_url(base_url, object, file_type)
if not check_url(url):
continue
filename = "{path}/{object}_{file_type}.tgz".format(
path=output_directory,
object=object,
file_type=file_type)
download_file(url, filename)
if extract:
extract_tgz(filename, output_directory)
[docs]def fetch_objects(url):
""" Fetches the object information before download """
response = urlopen(url)
html = response.read()
objects = json.loads(html)
return objects["objects"]
[docs]def download_file(url, filename, pbar=None):
""" Downloads files from a given URL """
u = urlopen(url)
f = open(filename, "wb")
file_size = int(u.getheader("Content-Length"))
if pbar is not None:
pbar.set_postfix_str("%s (%.2f MB)" % (os.path.basename(filename), file_size / 1000000.0))
file_size_dl = 0
block_sz = 65536
while True:
buffer = u.read(block_sz)
if not buffer:
break
file_size_dl += len(buffer)
f.write(buffer)
# status = r"%10d [%3.2f%%]" % (file_size_dl / 1000000.0, file_size_dl * 100. / file_size)
# status = status + chr(8) * (len(status) + 1)
# print(status)
f.close()
rf.logger.beauty_print("Downloaded %s" % filename)
[docs]def tgz_url(base_url, object, type):
""" Get the TGZ file URL for a particular object and dataset type """
if type in ["berkeley_rgbd", "berkeley_rgb_highres"]:
return base_url + "berkeley/{object}/{object}_{type}.tgz".format(object=object, type=type)
elif type in ["berkeley_processed"]:
return base_url + "berkeley/{object}/{object}_berkeley_meshes.tgz".format(object=object, type=type)
else:
return base_url + "google/{object}_{type}.tgz".format(object=object, type=type)
[docs]def check_url(url):
""" Check the validity of a URL """
try:
request = Request(url)
request.get_method = lambda: 'HEAD'
response = urlopen(request)
return True
except Exception as e:
return False
if __name__ == '__main__':
download_ycb_objects()