import os
import glob
import json
import shutil
import xml.etree.ElementTree as ET
from collections import defaultdict, Counter
from tqdm import tqdm
START_BOUNDING_BOX_ID = 1
class VOC2COCOConverter:
    def __init__(self, xml_dir, json_dir, classes, img_dir, proportions=[8, 1, 1],
                 copy_images=False, min_samples_per_class=20):
        self.xml_dir = xml_dir
        self.json_dir = json_dir
        self.img_dir = img_dir
        self.classes = classes
        self.proportions = proportions
        self.copy_images = copy_images
        self.min_samples_per_class = min_samples_per_class
        self.pre_define_categories = {}
        for i, cls in enumerate(self.classes):
            self.pre_define_categories[cls] = i + 1

    def convert(self):
        xml_files_by_class = self._get_sorted_xml_files_by_class()

        dataset_size = len(self.proportions)
        xml_files_by_dataset = [defaultdict(list) for _ in range(dataset_size)]
        xml_files_count_by_dataset = [0] * dataset_size

        for cls, xml_files in xml_files_by_class.items():
            total_files = len(xml_files)
            datasets_limits = [int(total_files * p / sum(self.proportions)) for p in self.proportions]
            datasets_limits[-1] = total_files - sum(datasets_limits[:-1])
            start = 0
            for i, limit in enumerate(datasets_limits):
                xml_files_by_dataset[i][cls] = xml_files[start:start + limit]
                xml_files_count_by_dataset[i] += limit
                start += limit
        for idx, xml_files_dict in enumerate(xml_files_by_dataset):
            dataset_dir = ''
            if self.copy_images:
                dataset_dir = os.path.join(self.json_dir, f'dataset_{idx + 1}')
                os.makedirs(dataset_dir, exist_ok=True)

            json_file_name = f'dataset_{idx + 1}.json'
            xml_files = sum(xml_files_dict.values(), [])
            self._convert_annotation(tqdm(xml_files), os.path.join(self.json_dir, json_file_name))
            if dataset_dir:
                self._copy_images(tqdm(xml_files), dataset_dir)
print(f"\n在数据集{idx+1}中,各个类型的样本数量分别为:") for cls, files in xml_files_dict.items(): print(f"类型 {cls} 的样本数量是: {len(files)}")
print("\n各个数据集中相同类型样本的数量比值是:") for cls in self.classes: print("\n类型 {}:".format(cls)) for i in range(len(self.proportions) - 1): if len(xml_files_by_dataset[i + 1].get(cls, [])) != 0 : print("数据集 {} 和 数据集 {} 的样本数量比是: {}".format( i + 1, i + 2, len(xml_files_by_dataset[i].get(cls, [])) / len(xml_files_by_dataset[i + 1].get(cls, [])) ))
    def _get_sorted_xml_files_by_class(self):
        xml_files_by_class = defaultdict(list)
        for xml_file in glob.glob(os.path.join(self.xml_dir, "*.xml")):
            tree = ET.parse(xml_file)
            root = tree.getroot()
            # Record each file once per class it contains, so a file with several
            # objects of the same class is not duplicated in the split lists.
            # A file containing several different classes is still listed under each of them.
            classes_in_file = {obj.find('name').text for obj in root.findall('object')}
            for class_name in classes_in_file:
                if class_name in self.classes:
                    xml_files_by_class[class_name].append(xml_file)
        if self.min_samples_per_class is not None:
            xml_files_by_class = {
                cls: files for cls, files in xml_files_by_class.items()
                if len(files) > self.min_samples_per_class
            }
        xml_files_by_class = dict(
            sorted(xml_files_by_class.items(), key=lambda item: len(item[1]), reverse=True))

        return xml_files_by_class
    def _copy_images(self, xml_files, dataset_dir):
        for xml_file in xml_files:
            img_file = os.path.join(self.img_dir, os.path.basename(xml_file).replace('.xml', '.jpg'))
            if os.path.exists(img_file):
                shutil.copy(img_file, dataset_dir)
    def _get_files_by_majority_class(self):
        # Alternative grouping (currently unused by convert()): assign each XML
        # file to the single class that appears most often in it.
        xml_files_by_class = defaultdict(list)
        for xml_file in glob.glob(os.path.join(self.xml_dir, "*.xml")):
            tree = ET.parse(xml_file)
            root = tree.getroot()
            class_counts = defaultdict(int)
            for obj in root.findall('object'):
                class_name = obj.find('name').text
                if class_name in self.classes:
                    class_counts[class_name] += 1
            # Skip files that contain none of the tracked classes; otherwise
            # max() would raise ValueError on an empty mapping.
            if class_counts:
                majority_class = max(class_counts, key=class_counts.get)
                xml_files_by_class[majority_class].append(xml_file)

        return dict(sorted(xml_files_by_class.items(), key=lambda item: len(item[1]), reverse=True))
    def _convert_annotation(self, xml_list, json_file):
        json_dict = {"info": ['none'], "license": ['none'],
                     "images": [], "annotations": [], "categories": []}
        categories = self.pre_define_categories.copy()
        bnd_id = START_BOUNDING_BOX_ID
        all_categories = {}
        for index, line in enumerate(xml_list):
            xml_f = line
            tree = ET.parse(xml_f)
            root = tree.getroot()

            filename = os.path.basename(xml_f)[:-4] + ".jpg"
            # Derive a numeric image id from the last (up to) nine characters of the
            # file stem; this assumes the VOC file names end in digits.
            image_id = int(filename.split('.')[0][-9:])
            size = self._get_and_check(root, 'size', 1)
            width = int(self._get_and_check(size, 'width', 1).text)
            height = int(self._get_and_check(size, 'height', 1).text)
            image = {'file_name': filename, 'height': height, 'width': width, 'id': image_id}
            json_dict['images'].append(image)
            for obj in self._get(root, 'object'):
                category = self._get_and_check(obj, 'name', 1).text
                if category in all_categories:
                    all_categories[category] += 1
                else:
                    all_categories[category] = 1
                if category not in categories:
                    new_id = len(categories) + 1
                    print(filename)
                    print("[warning] class '{}' is not in 'pre_define_categories' ({}); "
                          "a new id will be created automatically: {}".format(
                              category, self.pre_define_categories, new_id))
                    categories[category] = new_id
                category_id = categories[category]
                bndbox = self._get_and_check(obj, 'bndbox', 1)
                xmin = int(float(self._get_and_check(bndbox, 'xmin', 1).text))
                ymin = int(float(self._get_and_check(bndbox, 'ymin', 1).text))
                xmax = int(float(self._get_and_check(bndbox, 'xmax', 1).text))
                ymax = int(float(self._get_and_check(bndbox, 'ymax', 1).text))
                o_width = abs(xmax - xmin)
                o_height = abs(ymax - ymin)
                # COCO bounding boxes are stored as [x, y, width, height] in absolute pixels.
                ann = {'area': o_width * o_height, 'iscrowd': 0, 'image_id': image_id,
                       'bbox': [xmin, ymin, o_width, o_height], 'category_id': category_id,
                       'id': bnd_id, 'ignore': 0, 'segmentation': []}
                json_dict['annotations'].append(ann)
                bnd_id = bnd_id + 1
        for cate, cid in categories.items():
            cat = {'supercategory': 'none', 'id': cid, 'name': cate}
            json_dict['categories'].append(cat)

        with open(json_file, 'w') as json_fp:
            json_fp.write(json.dumps(json_dict))

        print("------------ Finished creating {} --------------".format(json_file))
        print("Found {} classes: {} -->>> your predefined {} classes: {}".format(
            len(all_categories), all_categories.keys(),
            len(self.pre_define_categories), self.pre_define_categories.keys()))
        print("class: id --> {}".format(categories))

    def _get(self, root, name):
        return root.findall(name)
    def _get_and_check(self, root, name, length):
        vars = root.findall(name)
        if len(vars) == 0:
            raise NotImplementedError('Can not find %s in %s.' % (name, root.tag))
        if length > 0 and len(vars) != length:
            raise NotImplementedError('The size of %s is supposed to be %d, but is %d.' % (name, length, len(vars)))
        if length == 1:
            vars = vars[0]
        return vars
if __name__ == '__main__':
    xml_dir = 'path/to/xml/directory'
    json_dir = 'path/to/json/directory'
    classes = ['cat', 'dog', 'person']
    img_dir = 'path/to/image/directory'
    proportions = [80, 10, 10]

    converter = VOC2COCOConverter(xml_dir, json_dir, classes, img_dir, proportions, copy_images=True)
    converter.convert()
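
    # Optional sanity check (a minimal sketch): reload one of the generated COCO
    # files and report per-class annotation counts. It assumes convert() wrote
    # 'dataset_1.json' into json_dir, which matches the naming scheme used by
    # convert() above; adjust the file name for the other splits as needed.
    coco_path = os.path.join(json_dir, 'dataset_1.json')
    if os.path.exists(coco_path):
        with open(coco_path) as fp:
            coco = json.load(fp)
        id_to_name = {c['id']: c['name'] for c in coco['categories']}
        per_class = Counter(id_to_name[a['category_id']] for a in coco['annotations'])
        print(f"{coco_path}: {len(coco['images'])} images, "
              f"{len(coco['annotations'])} annotations, per class: {dict(per_class)}")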