-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDatabase.py
executable file
·165 lines (135 loc) · 5.81 KB
/
Database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from __future__ import unicode_literals
from PIL import Image
from tqdm import tqdm
import numpy as np
import os
from collections import OrderedDict
import json
from matplotlib.image import _rgb_to_rgba
class Database:
def __init__(self, _seqs, path, preload_images=True):
self.points = OrderedDict()
self.path = path
self.seqs = _seqs
self.image_cache = {}
if preload_images:
print("Preloading images")
images = []
for iii in self.seqs:
images = images + iii
images = set(images)
for image in tqdm(images):
self.get_image(image)
self.gcp_reprojections = {}
def go_to_image_path(self):
img_folder = self.path + "/images/"
return img_folder
def get_path(self):
return self.path
def get_seqs(self):
return self.seqs
def init_points(self, points):
for point in points:
point_id, observations = point['id'], point['observations']
# for observation in observations:
# h, w = self.get_image_size(observation["shot_id"])
# observation["projection"] = features.denormalized_image_coordinates(
# np.array([observation["projection"]]), w, h)[0]
self.points[point_id] = observations
def get_points(self):
return self.points
def get_image(self, img_name):
if img_name not in self.image_cache:
rgb = Image.open(self.go_to_image_path() + img_name)
# Reduce to some reasonable maximum size
scale = max(rgb.size) / 8000
if scale > 1:
new_w = int(round(rgb.size[0] / scale))
new_h = int(round(rgb.size[1] / scale))
rgb = rgb.resize((new_w, new_h), resample=Image.NEAREST)
# Matplotlib will transform to rgba when plotting
self.image_cache[img_name] = _rgb_to_rgba(np.asarray(rgb))
return self.image_cache[img_name]
def get_image_size(self, img_name):
img = self.get_image(img_name)
return img.shape[:2]
def get_visible_points_coords(self, main_image):
visible_points_coords = OrderedDict()
for point_id, observations in self.points.items():
pair_images = [obs["shot_id"] for obs in observations]
for observation in observations:
# if observation["shot_id"] == main_image and pair_image in pair_images or len(pair_images) == 1:
if observation["shot_id"] == main_image:
visible_points_coords[point_id] = observation["projection"]
return visible_points_coords
def point_exists(self, point_id):
return point_id in self.points
def point_sees(self, point, image):
for obs in self.points[point]:
if image == obs['shot_id']:
return True
return False
def add_point(self, point_id):
if self.point_exists(point_id):
print('ERROR: trying to add an existing point')
return
self.points[point_id] = []
def add_point_observation(self, point_id, shot_id, projection):
if not self.point_exists(point_id):
print('ERROR: trying to modify a non-existing point')
return
self.points[point_id].append({
"shot_id": shot_id,
"projection": projection,
})
def write_to_file(self, filename):
data = {"points": []}
for point_id, observations in self.points.items():
point = {"id": point_id, "observations": []}
for observation in observations:
# h, w = self.get_image_size(observation["shot_id"])
# scaled_projection = features.normalized_image_coordinates(
# np.array([observation["projection"]]), w, h)[0].tolist()
# point["observations"].append({
# "shot_id": observation["shot_id"],
# "projection": scaled_projection,
# })
point["observations"].append({
"shot_id": observation["shot_id"],
"projection": observation["projection"],
})
data["points"].append(point)
with open(filename, 'wt') as fp:
json.dump(data, fp, indent=4, sort_keys=True)
def get_worst_gcp(self):
worst_gcp_error = 0
for gcp_id in self.gcp_reprojections:
for shot_id in self.gcp_reprojections[gcp_id]:
err = self.gcp_reprojections[gcp_id][shot_id]['error']
if err > worst_gcp_error:
worst_gcp_error = err
shot_worst_gcp = shot_id
worst_gcp = gcp_id
return worst_gcp, shot_worst_gcp, worst_gcp_error
def remove_gcp(self, point_id):
if self.point_exists(point_id):
del self.points[point_id]
def remove_point_observation(self, point_id, shot_id):
if not self.point_exists(point_id):
print('ERROR: trying to modify a non-existing point')
return
self.points[point_id] = [obs for obs in self.points[point_id] if obs["shot_id"] != shot_id]
if point_id in self.gcp_reprojections:
if shot_id in self.gcp_reprojections[point_id]:
self.gcp_reprojections[point_id][shot_id]['error'] = 0
def bring_next_image(self, image, image_idx):
seq = self.seqs[image_idx]
next_idx = seq.index(image) + 1
return seq[min(next_idx, len(seq) - 1)]
def bring_previous_image(self, image, image_idx):
seq = self.seqs[image_idx]
previous_idx = seq.index(image) - 1
return seq[max(previous_idx, 0)]