7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
class VenueMatcher:
    """
    Manages venue-side matching logic, including attribute checks,
    spatial expansion, and selection strategies.

    The matcher reads its settings from ``distributor.config`` and delegates
    spatial queries, distance maths, capacity lookups and value normalisation
    to the distributor it is constructed with.

    Venues are keyed by ``id(venue)`` in every index built here, so the index
    is only valid for the exact venue objects passed to
    ``build_attribute_index``.
    """

    # Sentinels stored in the int32 constraint arrays meaning "no bound".
    _NO_MIN = np.iinfo(np.int32).min
    _NO_MAX = np.iinfo(np.int32).max
    # Upper bound on the pool size requested during spatial expansion.
    _MAX_EXPANSION_POOL = 100
    # Below this many venues distances are computed one by one; at or above
    # it the distributor's vectorised implementation is used.
    _VECTORIZE_THRESHOLD = 50

    def __init__(self, distributor):
        """Bind the matcher to ``distributor`` and create empty indices."""
        self.distributor = distributor
        self.config = distributor.config
        self.verbose = distributor.verbose
        # Attribute caches and indices
        self.venue_attribute_cache = {}
        self.categorical_index = {}
        self.num_constraints = {}
        # Created here (not only in build_attribute_index) so that
        # venue_accepts_person can fall back to the slow path instead of
        # raising AttributeError when no index has been built yet.
        self.venue_id_to_idx = {}
        self.numerical_match_rules = []
        self.categorical_match_rules = []
        self.attribute_index_built = False

    @staticmethod
    def _parse_bound(raw) -> Optional[int]:
        """
        Convert a raw venue bound to ``int``; return ``None`` when it is missing.

        ``None``, the empty string and NaN count as "no bound". Any other
        value goes through ``int(float(raw))`` (truncating towards zero), so
        a malformed value still raises ``ValueError``.
        """
        if raw is None or raw == '':
            return None
        number = float(raw)
        if number != number:  # NaN is the only value unequal to itself
            return None
        return int(number)

    def build_attribute_index(self, venues: List):
        """
        Pre-process venue attributes for fast filtering.

        Reads ``config['eligibility']['attributes']`` and, for every rule that
        has a venue-side component, fills:

        * ``num_constraints[name]['min'/'max']`` - int32 arrays indexed by the
          venue's position in ``venues``; ``_NO_MIN``/``_NO_MAX`` mean that
          the venue has no bound;
        * ``categorical_index[(name, person_value)]`` - set of ``id(venue)``
          for venues whose matching rules allow that person value.

        ``venue_id_to_idx`` is rebuilt on every call, including the early
        exits taken when there are no usable rules.
        """
        eligibility = self.config.get('eligibility', {})
        attributes = eligibility.get('attributes', [])

        # Reset and initialize basic indices
        self.venue_attribute_cache = {}
        self.categorical_index = {}
        self.num_constraints = {}
        self.numerical_match_rules = []
        self.categorical_match_rules = []
        self.venue_id_to_idx = {id(v): i for i, v in enumerate(venues)}

        if not attributes:
            self.attribute_index_built = True
            return

        venue_count = len(venues)

        # Pre-filter rules to those that actually have venue components
        active_rules = []
        for rule in attributes:
            attr_name = rule.get('name')
            attr_type = rule.get('type')
            if attr_type == 'numerical' and rule.get('venue_constraints'):
                v_con = rule.get('venue_constraints')
                active_rules.append({
                    'name': attr_name,
                    'type': 'numerical',
                    'min_col': v_con.get('min_column'),
                    'max_col': v_con.get('max_column'),
                })
                # Initialize arrays with int32 min/max as "no constraint" sentinels
                self.num_constraints[attr_name] = {
                    'min': np.full(venue_count, self._NO_MIN, dtype=np.int32),
                    'max': np.full(venue_count, self._NO_MAX, dtype=np.int32),
                }
            elif attr_type == 'categorical' and rule.get('venue_column'):
                active_rules.append({
                    'name': attr_name,
                    'type': 'categorical',
                    'col': rule.get('venue_column'),
                    'assume': rule.get('assume_if_missing', 'Mixed'),
                    'case_sensitive': rule.get('case_sensitive', False),
                    'rules': rule.get('matching_rules', {}),
                })

        if not active_rules:
            self.attribute_index_built = True
            return

        # Lower-case the matching tables once instead of once per venue.
        for rule in active_rules:
            if rule['type'] == 'categorical' and not rule['case_sensitive']:
                rule['rules'] = {
                    str(k).lower(): [str(v).lower() for v in vals]
                    for k, vals in rule['rules'].items()
                }

        normalize = self.distributor._normalize_value

        # Single pass over venues
        for i, venue in enumerate(venues):
            v_props = venue.properties
            v_id = id(venue)
            for rule in active_rules:
                attr_name = rule['name']
                if rule['type'] == 'numerical':
                    bounds = self.num_constraints[attr_name]
                    if rule['min_col']:
                        low = self._parse_bound(v_props.get(rule['min_col']))
                        if low is not None:
                            bounds['min'][i] = low
                    if rule['max_col']:
                        high = self._parse_bound(v_props.get(rule['max_col']))
                        if high is not None:
                            bounds['max'][i] = high
                else:  # categorical
                    v_val = v_props.get(rule['col'])
                    if v_val is None or v_val == '':
                        v_val = rule['assume']
                    if not rule['case_sensitive']:
                        v_val = str(v_val).lower() if v_val else ''
                    # NOTE(review): a venue whose value has no entry in the
                    # matching rules is added to no index key, so the
                    # pre-filter rejects it for every person that has a value
                    # for this attribute. The slow path accepts such venues -
                    # confirm which behaviour is intended.
                    allowed = rule['rules'].get(v_val)
                    if allowed:
                        for p_val in allowed:
                            index_key = (attr_name, normalize(p_val))
                            self.categorical_index.setdefault(index_key, set()).add(v_id)

        self.numerical_match_rules = [r for r in active_rules if r['type'] == 'numerical']
        # Pre-processed rules store the venue column under 'col'. Only rules
        # with a venue column reach active_rules, so this list is empty; it is
        # kept because venue_accepts_person iterates over it.
        self.categorical_match_rules = [
            r for r in active_rules
            if r['type'] == 'categorical' and not r.get('col')
        ]
        self.attribute_index_built = True

        if self.verbose:
            logger.info(f"Built attribute index for {len(venues)} venues with {len(attributes)} attributes")
            if self.categorical_index:
                logger.info(f"Built categorical index: {len(self.categorical_index)} unique value combinations")

    def filter_venues_with_expansion(self, person, venues: List, initial_pool: List,
                                     location: Tuple[float, float], search_limits: List[int],
                                     person_attrs: Optional[Dict] = None) -> List:
        """
        Filter venues for a person. Restores 'strict' behavior from baseline.

        ``initial_pool`` is tried first. If nothing in it is eligible and
        ``venue_selection.consider_by`` is ``'count'``, progressively larger
        pools of closest venues are requested from the distributor, capped at
        ``_MAX_EXPANSION_POOL``. In every other case an empty list is
        returned. The ``venues`` argument is not read by this method.
        """
        # Tier 1: Try the initial pool (Fast path - matches old strict behavior)
        eligible = self.filter_venues_by_person(person, initial_pool, person_attrs=person_attrs)
        if eligible:
            return eligible

        # Expansion Tiers: Only triggered if explicitly configured and initial failed
        # Performance Trade-off: We prefer skipping people over massive spatial searches.
        if self.config.get('venue_selection', {}).get('consider_by') != 'count':
            return []

        # Strict mode: If only one search limit or no limits, don't expand
        if len(search_limits) <= 1:
            return []

        cap = self._MAX_EXPANSION_POOL
        for search_count in search_limits:
            # Skip if already tried this many or fewer
            if search_count <= len(initial_pool):
                continue

            # RESTRAIN EXPANSION: large pools are too slow to check per person.
            limit = min(search_count, cap)

            if self.verbose:
                logger.debug(f"Expanding search for person {person.id} to k={limit}")

            expanded_pool = self.distributor._find_closest_venues(
                location, self.distributor.venue_type, limit,
                allowed_venue_ids=getattr(self.distributor, 'venue_ids', None)
            )
            eligible = self.filter_venues_by_person(person, expanded_pool, person_attrs=person_attrs)
            if eligible:
                return eligible

            # If we've reached the safety cap, stop expanding
            if limit >= cap:
                break

        return []

    def filter_venues_by_person(self, person, venues: List, person_attrs: Optional[Dict] = None) -> List:
        """
        Filter venues based on person's attributes (age, gender, etc.).

        ``person_attrs`` is a per-person cache of attribute values. Missing
        entries are resolved here and written into the dict, so a dict passed
        by the caller is updated in place.
        """
        match_attrs = getattr(self.distributor, '_pre_processed_match_attrs', [])

        # Pre-fetch attributes for this person to avoid repeated slow lookups
        if person_attrs is None:
            person_attrs = {}

        nested_lookup = self.distributor._get_nested_value_with_dict_support if match_attrs else None
        for rule in match_attrs:
            attr = rule['attribute']
            if attr in person_attrs:
                continue
            if rule.get('is_residence'):
                res = person.residence
                val = nested_lookup(res, rule['residence_parts']) if res else None
            elif rule.get('is_nested'):
                val = nested_lookup(person, rule['path_parts'])
            else:
                # Direct attribute
                val = getattr(person, attr, None)
            person_attrs[attr] = val

        # Step 1: Pre-filter using categorical index
        venues = self.prefilter_venues_by_categorical(person, venues, person_attrs=person_attrs)
        if not venues:
            return []

        # Step 2: Filter remaining venues by other attributes
        return [
            venue for venue in venues
            if self.venue_accepts_person(person, venue, match_attrs, person_attrs=person_attrs)
        ]

    def venue_accepts_person(self, person, venue, attribute_rules: List[Dict],
                             person_attrs: Optional[Dict] = None) -> bool:
        """
        Check if venue accepts person based on attribute rules using pre-computed arrays.

        Venues that were not part of the last ``build_attribute_index`` call
        are delegated to ``venue_accepts_person_slow``. A person with no value
        for a numerically constrained attribute is rejected.
        """
        v_idx = self.venue_id_to_idx.get(id(venue))
        if v_idx is None:
            return self.venue_accepts_person_slow(person, venue, attribute_rules)

        no_min = self._NO_MIN
        no_max = self._NO_MAX
        for rule in self.numerical_match_rules:
            attr_name = rule['name']
            constraints = self.num_constraints.get(attr_name)
            if constraints is None:
                continue
            person_value = self._get_person_attr(person, attr_name, person_attrs)
            if person_value is None:
                return False
            v_min = constraints['min'][v_idx]
            if v_min != no_min and person_value < v_min:
                return False
            v_max = constraints['max'][v_idx]
            if v_max != no_max and person_value > v_max:
                return False

        # Categorical rules with venue_column are handled via prefilter_venues_by_categorical
        # We only check those without a venue_column (rare but possible)
        for rule in self.categorical_match_rules:
            # Implement categorical check here if needed (not common in hot path)
            pass

        return True

    def venue_accepts_person_slow(self, person, venue, attribute_rules: List[Dict]) -> bool:
        """
        Fallback for venues without cache.

        Every rule in ``attribute_rules`` is evaluated directly against
        ``venue.properties``. A person lacking a value for any rule's
        attribute is rejected.
        """
        for rule in attribute_rules:
            attr_name = rule.get('name')
            person_value = self.distributor._get_person_attribute(attr_name, person)
            if person_value is None:
                return False
            rule_type = rule.get('type')
            if rule_type == 'numerical':
                if not self._check_numerical_constraint(person_value, venue, rule):
                    return False
            elif rule_type == 'categorical':
                if not self._check_categorical_constraint(person_value, venue, rule):
                    return False
        return True

    def prefilter_venues_by_categorical(self, person, venues: List,
                                        person_attrs: Optional[Dict] = None) -> List:
        """
        Pre-filter venues using categorical index for massive speedup.

        Returns ``venues`` unchanged when no categorical index exists or the
        person has no value for any indexed attribute. Otherwise only venues
        present in the index for every one of the person's values are kept.
        """
        if not self.categorical_index:
            return venues

        eligibility = self.config.get('eligibility', {})
        attributes = eligibility.get('attributes', [])
        normalize = self.distributor._normalize_value

        categorical_filters = []
        for rule in attributes:
            if rule.get('type') != 'categorical' or not rule.get('venue_column'):
                continue
            attr_name = rule.get('name')
            val = self._get_person_attr(person, attr_name, person_attrs)
            if val is None:
                continue
            norm_val = normalize(val)
            # Use this rule's own case setting - the same one the index was
            # built with - instead of re-searching the rule list by name.
            if not rule.get('case_sensitive', False):
                norm_val = norm_val.lower()
            categorical_filters.append((attr_name, norm_val))

        if not categorical_filters:
            return venues

        first_key, *other_keys = categorical_filters
        filtered_ids = self.categorical_index.get(first_key, set())
        if other_keys:
            # Copy before intersecting so the stored index set is not mutated.
            filtered_ids = filtered_ids.copy()
            for key in other_keys:
                filtered_ids &= self.categorical_index.get(key, set())

        return [v for v in venues if id(v) in filtered_ids]

    def _distances_from(self, origin: Tuple[float, float], venues: List) -> np.ndarray:
        """Return the distance from ``origin`` to each venue as an array."""
        if len(venues) < self._VECTORIZE_THRESHOLD:
            # Use scalar math for small sets, vectorized for large sets
            scalar_distance = self.distributor._haversine_distance
            return np.array([scalar_distance(origin, v.coordinates) for v in venues])
        coords = np.array([v.coordinates for v in venues])
        return np.asarray(self.distributor._haversine_distance_vectorized(origin, coords))

    def select_venue(self, person, venues: List, person_location: Tuple[float, float]) -> Optional[Any]:
        """
        Select final venue from eligible list based on strategy.

        The strategy comes from ``config['allocation']['strategy']`` (default
        ``'random'``). Supported values are ``'random'``, ``'closest'``,
        ``'proportional'`` (weight ``1 / (distance + 0.1)``),
        ``'closest_balanced'`` (distance weight multiplied by remaining
        capacity) and ``'largest_capacity'``. An unknown strategy returns the
        first venue; an empty ``venues`` returns ``None``.
        """
        if not venues:
            return None

        strategy = self.config.get('allocation', {}).get('strategy', 'random')

        if strategy == 'random':
            return np.random.choice(venues)

        if strategy == 'largest_capacity':
            return max(venues, key=lambda v: self.distributor._get_venue_capacity(v))

        if strategy not in ('closest', 'proportional', 'closest_balanced'):
            return venues[0]

        # Every remaining strategy is distance based and needs coordinates.
        valid = [v for v in venues if v.coordinates]
        if not valid:
            if strategy == 'closest_balanced':
                return np.random.choice(venues)
            return venues[0]

        dists = self._distances_from(person_location, valid)

        if strategy == 'closest':
            return valid[np.argmin(dists)]

        dist_weights = 1.0 / (dists + 0.1)

        if strategy == 'proportional':
            return np.random.choice(valid, p=dist_weights / dist_weights.sum())

        # closest_balanced: factor in remaining capacity
        remaining_caps = np.array(
            [self.distributor._get_remaining_capacity(v) for v in valid],
            dtype=np.float64,
        )
        # An over-filled venue reports a negative remainder; clamp it to zero
        # so np.random.choice is never given a negative probability.
        remaining_caps = np.maximum(remaining_caps, 0.0)
        # Constant scale taken from the first venue; it cancels out when the
        # weights are normalised below.
        total_cap = max(self.distributor._get_venue_capacity(valid[0]), 1)
        weights = dist_weights * (remaining_caps / total_cap)
        weight_sum = weights.sum()
        if weight_sum <= 0:
            # All venues full - fall back to closest
            return valid[np.argmin(dists)]
        return np.random.choice(valid, p=weights / weight_sum)

    def find_eligible_venues_for_location(self, location: Tuple[float, float], venues: List) -> List:
        """
        Find candidate venues based on distance/count config.

        With ``consider_by == 'count'`` (the default) the distributor is asked
        for the closest ``count`` venues; with ``criteria ==
        'largest_capacity'`` a larger pool is fetched and the highest-capacity
        ``count`` of them are returned. With ``consider_by == 'distance'`` the
        venues in ``venues`` within ``max_distance`` are returned (converted
        to km from ``'miles'`` or ``'meters'`` when so configured). Any other
        value returns ``venues`` unchanged.
        """
        selection = self.config.get('venue_selection', {})
        consider_by = selection.get('consider_by', 'count')
        by_capacity = selection.get('criteria') == 'largest_capacity'

        if consider_by == 'count':
            count = selection.get('count', 5)
            allowed_ids = getattr(self.distributor, 'venue_ids', None)
            if by_capacity:
                # Query a larger pool first to find large ones nearby
                closest_pool = self.distributor._find_closest_venues(
                    location, self.distributor.venue_type, max(count * 5, 20),
                    allowed_venue_ids=allowed_ids,
                )
                ranked = sorted(
                    closest_pool,
                    key=lambda v: self.distributor._get_venue_capacity(v),
                    reverse=True,
                )
                return ranked[:count]
            return self.distributor._find_closest_venues(
                location, self.distributor.venue_type, count,
                allowed_venue_ids=allowed_ids,
            )

        if consider_by == 'distance':
            max_dist = selection.get('max_distance', 10)
            unit = selection.get('max_distance_unit', 'km')
            if unit == 'miles':
                max_dist *= 1.60934
            elif unit == 'meters':
                max_dist /= 1000

            eligible = []
            valid = [v for v in venues if v.coordinates]
            if valid:
                coords = np.array([v.coordinates for v in valid])
                dists = self.distributor._haversine_distance_vectorized(location, coords)
                eligible = [v for v, d in zip(valid, dists) if d <= max_dist]

            if by_capacity:
                eligible.sort(key=lambda v: self.distributor._get_venue_capacity(v), reverse=True)
            return eligible

        return venues

    def _get_person_attr(self, person, attr_name: str, person_attrs: Optional[Dict]) -> Any:
        """
        Get person attribute using the distributor's lookup method.

        A value cached in ``person_attrs`` takes precedence; otherwise the
        distributor's ``_get_person_attribute`` is called.
        """
        if person_attrs and attr_name in person_attrs:
            return person_attrs[attr_name]
        return self.distributor._get_person_attribute(attr_name, person)

    def _check_numerical_constraint(self, val, venue, rule: Dict) -> bool:
        """
        Check ``val`` against the venue's min/max columns named by ``rule``.

        Bounds are parsed the same way as in ``build_attribute_index``
        (missing, empty or NaN means unbounded; otherwise ``int(float(x))``)
        so that the slow and indexed paths agree.
        """
        constraints = rule.get('venue_constraints', {})
        props = venue.properties
        min_col = constraints.get('min_column')
        max_col = constraints.get('max_column')
        min_v = self._parse_bound(props.get(min_col)) if min_col else None
        max_v = self._parse_bound(props.get(max_col)) if max_col else None
        if min_v is not None and val < min_v:
            return False
        if max_v is not None and val > max_v:
            return False
        return True

    def _check_categorical_constraint(self, val, venue, rule: Dict) -> bool:
        """
        Check ``val`` against the venue's categorical column named by ``rule``.

        A missing, ``None`` or empty venue value is replaced by the rule's
        ``assume_if_missing`` (default ``'Mixed'``). A rule without a venue
        column, or a venue value with no entry in ``matching_rules``, accepts
        everyone.
        """
        col = rule.get('venue_column')
        if not col:
            return True

        v_val = venue.properties.get(col)
        if v_val is None or v_val == '':
            v_val = rule.get('assume_if_missing', 'Mixed')

        normalize = self.distributor._normalize_value
        matching = rule.get('matching_rules', {})
        if not rule.get('case_sensitive', False):
            v_val = normalize(v_val).lower()
            val = normalize(val).lower()
            matching = {
                k.lower(): [normalize(v).lower() for v in vals]
                for k, vals in matching.items()
            }
        else:
            v_val = normalize(v_val)
            val = normalize(val)
            matching = {k: [normalize(v) for v in vals] for k, vals in matching.items()}

        if v_val not in matching:
            return True
        return val in matching[v_val]