8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
class AllocationEngine:
    """
    Orchestrates the distribution process, including batching and individual allocation.

    The engine holds no venue or capacity state of its own: every lookup,
    capacity check and assignment is delegated to the ``distributor`` passed to
    the constructor (and to ``distributor.matcher``).
    """

    # Absolute cap applied to every venue search limit used by ``allocate_group``.
    MAX_SEARCH_LIMIT = 100
    # Search limit used by ``allocate_group`` when every configured limit is None.
    FALLBACK_SEARCH_LIMIT = 20

    def __init__(self, distributor):
        """Bind the engine to *distributor* and pre-build the attribute getters.

        ``attribute_names`` holds the ``name`` of every rule listed under
        ``config['eligibility']['attributes']``; ``attr_getters`` holds one
        callable per name, in the same order, that reads that attribute from a
        person via ``distributor._get_person_attribute``.
        """
        self.distributor = distributor
        self.config = distributor.config
        self.verbose = distributor.verbose
        eligibility = self.config.get('eligibility', {})
        self.attribute_names = [rule.get('name') for rule in eligibility.get('attributes', [])]
        self.attr_getters = [self._make_attr_getter(name) for name in self.attribute_names]

    # ------------------------------------------------------------------ helpers

    def _make_attr_getter(self, attr_name):
        """Return a one-argument callable that reads *attr_name* from a person."""
        # A factory (not a lambda written inline in a loop) so that each getter
        # captures its own attr_name instead of the loop variable.
        return lambda person: self.distributor._get_person_attribute(attr_name, person)

    def _group_by_geo_unit(self, people):
        """Split *people* into ``(people_by_geo, without_geo)``.

        ``people_by_geo`` maps each geo unit (resolved at the distributor's
        ``batch_geo_level``) to its people; ``without_geo`` lists the people
        for whom no geo unit was returned.
        """
        dist = self.distributor
        people_by_geo = defaultdict(list)
        without_geo = []
        for person in people:
            geo = dist._get_geo_unit_at_level(person, dist.world, target_level=dist.batch_geo_level)
            if geo:
                people_by_geo[geo].append(person)
            else:
                without_geo.append(person)
        return people_by_geo, without_geo

    def _group_by_attributes(self, people):
        """Group *people* by the tuple of their eligibility attribute values."""
        people_by_attrs = defaultdict(list)
        for person in people:
            vals = tuple(getter(person) for getter in self.attr_getters)
            people_by_attrs[vals].append(person)
        return people_by_attrs

    @staticmethod
    def _has_coordinates(geo_unit):
        """Return True when *geo_unit* carries a two-element ``coordinates`` value."""
        return bool(geo_unit.coordinates and len(geo_unit.coordinates) == 2)

    def _assign(self, person, venue):
        """Add *person* to *venue* and bump the distributor's count for that venue."""
        dist = self.distributor
        venue.add_to_subset(person, subset_key=dist.subset_key,
                            activity_name=dist.activity_map_key, activity_type=dist.activity_type)
        dist._increment_venue_count(venue)

    def _resolve_search_limits(self, group_search_limits):
        """Return the sorted, de-duplicated, capped search limits for ``allocate_group``.

        Explicit *group_search_limits* win over ``venue_selection.search_limits``;
        an empty or missing list falls back to ``[count * 4]``. ``None`` entries
        (unlimited searches) are dropped and every entry is capped at
        ``MAX_SEARCH_LIMIT``; if nothing survives, ``[FALLBACK_SEARCH_LIMIT]``
        is returned.
        """
        selection_config = self.config.get('venue_selection', {})
        default_limits = [selection_config.get('count', 5) * 4]
        if group_search_limits is not None:
            search_limits = group_search_limits
        else:
            search_limits = selection_config.get('search_limits', default_limits)
        if not search_limits:
            search_limits = default_limits
        capped = {min(limit, self.MAX_SEARCH_LIMIT) for limit in search_limits if limit is not None}
        return sorted(capped) if capped else [self.FALLBACK_SEARCH_LIMIT]

    # ----------------------------------------------------------- allocate_group

    def allocate_group(self, people: List, venues: List, allow_overflow: bool = False, group_search_limits=None) -> int:
        """Allocate a specific group of people with geo-unit level caching and attribute batching.

        Nearby venues are looked up once per geo unit (using the smallest search
        limit); people in that unit are then batched by attribute values so the
        eligibility filter runs once per batch.

        Args:
            people: People to allocate.
            venues: Candidate venues, forwarded to the matcher.
            allow_overflow: When True, a person may be placed in an eligible
                venue even if no eligible venue has capacity left.
            group_search_limits: Optional search limits overriding the
                configured ``venue_selection.search_limits``.

        Returns:
            The number of people allocated by this call.
        """
        dist = self.distributor
        total_people = len(people)
        progress_interval = max(1, total_people // 10)
        search_attempts = self._resolve_search_limits(group_search_limits)

        allocated_count = 0
        people_processed = 0
        people_by_geo, without_geo = self._group_by_geo_unit(people)
        if without_geo:
            # These people cannot be placed here; count them so progress can reach 100%.
            people_processed += len(without_geo)
            self._log_progress(people_processed, total_people, progress_interval, allocated_count,
                               step=len(without_geo))

        for geo_unit, geo_people in people_by_geo.items():
            if not self._has_coordinates(geo_unit):
                people_processed += len(geo_people)
                self._log_progress(people_processed, total_people, progress_interval, allocated_count,
                                   step=len(geo_people))
                continue
            lat, lon = geo_unit.coordinates

            # Find nearby venues once per geo unit.
            geo_nearby = dist._find_closest_venues(
                (lat, lon), dist.venue_type, search_attempts[0],
                allowed_venue_ids=getattr(dist, 'venue_ids', None))
            if not allow_overflow and not dist._filter_venues_by_capacity(geo_nearby):
                people_processed += len(geo_people)
                self._log_progress(people_processed, total_people, progress_interval, allocated_count,
                                   step=len(geo_people))
                continue

            for attr_vals, people_group in self._group_by_attributes(geo_people).items():
                person_attrs = dict(zip(self.attribute_names, attr_vals))
                eligible_venues = dist.matcher.filter_venues_with_expansion(
                    person=people_group[0], venues=venues, initial_pool=geo_nearby,
                    location=(lat, lon), search_limits=search_attempts, person_attrs=person_attrs
                )
                if not eligible_venues:
                    people_processed += len(people_group)
                    self._log_progress(people_processed, total_people, progress_interval, allocated_count,
                                       step=len(people_group))
                    continue

                for person in people_group:
                    # Re-filter per person: every assignment may exhaust a venue.
                    with_cap = dist._filter_venues_by_capacity(eligible_venues)
                    venue = None
                    if with_cap:
                        # Selection strategy (e.g., closest) picks from venues with room.
                        venue = dist.matcher.select_venue(person, with_cap, (lat, lon))
                    elif allow_overflow:
                        venue = dist.matcher.select_venue(person, eligible_venues, (lat, lon))
                    if venue:
                        self._assign(person, venue)
                        allocated_count += 1
                    people_processed += 1
                    self._log_progress(people_processed, total_people, progress_interval, allocated_count)
        return allocated_count

    # ----------------------------------------------------- allocate_by_geo_unit

    def _fill_sequentially(self, group, ordered_venues, respect_capacity, unallocated):
        """Assign *group* to *ordered_venues* front to back; return the number assigned.

        The venue pointer only moves forward: a venue is left behind once it has
        no remaining capacity. When *respect_capacity* is False the pointer never
        advances, so everyone lands in the first venue. People left over after
        the list is exhausted are appended to *unallocated*.
        """
        dist = self.distributor
        assigned_count = 0
        venue_ptr = 0
        for person in group:
            assigned = False
            while venue_ptr < len(ordered_venues):
                venue = ordered_venues[venue_ptr]
                if respect_capacity and dist._get_remaining_capacity(venue) <= 0:
                    venue_ptr += 1
                    continue
                self._assign(person, venue)
                assigned_count += 1
                assigned = True
                # Skip past the venue as soon as this assignment filled it.
                if respect_capacity and dist._get_remaining_capacity(venue) <= 0:
                    venue_ptr += 1
                break
            if not assigned:
                unallocated.append(person)
        return assigned_count

    def _fill_closest_balanced(self, group, available_venues, location, respect_capacity, geo_unit, unallocated):
        """Assign *group* by weighted random choice; return the number assigned.

        Each venue's weight is ``1 / (distance + 0.1)`` multiplied by its
        remaining capacity (scaled by the first venue's capacity), so nearer and
        emptier venues are more likely. People that cannot be placed are
        appended to *unallocated*.
        """
        dist = self.distributor
        # Distances are constant for the whole batch: compute them once.
        venue_dists = np.array([
            dist._haversine_distance(location, dist._get_venue_location(v))
            for v in available_venues
        ])
        if self.verbose and not hasattr(self, '_cb_logged_geo'):
            self._cb_logged_geo = set()
        geo_name = geo_unit.name if hasattr(geo_unit, 'name') else str(geo_unit)

        # Remaining capacity is read once, then maintained locally per assignment.
        remaining_caps = np.array([
            dist._get_remaining_capacity(v) if respect_capacity else 1
            for v in available_venues
        ], dtype=np.float64)
        total_cap = dist._get_venue_capacity(available_venues[0]) if available_venues else 1
        dist_weights = 1.0 / (venue_dists + 0.1)

        group_allocated = 0
        for person in group:
            valid_indices = np.where(remaining_caps > 0)[0]
            if valid_indices.size == 0:
                unallocated.append(person)
                continue
            # Capacity weight: venues with more remaining capacity are preferred.
            cap_weights = remaining_caps[valid_indices] / max(total_cap, 1)
            weights = dist_weights[valid_indices] * cap_weights
            weight_sum = weights.sum()
            if weight_sum <= 0:
                unallocated.append(person)
                continue
            probs = weights / weight_sum
            chosen_idx = valid_indices[np.random.choice(len(valid_indices), p=probs)]
            self._assign(person, available_venues[chosen_idx])
            group_allocated += 1
            if respect_capacity:
                remaining_caps[chosen_idx] -= 1

        # One debug summary per geo unit name (first attribute group only).
        if self.verbose and geo_name not in self._cb_logged_geo:
            self._cb_logged_geo.add(geo_name)
            venue_counts = [dist.venue_capacity_tracker.get(id(v), 0) for v in available_venues]
            used = sum(1 for c in venue_counts if c > 0)
            logger.debug(f" [closest_balanced] {geo_name}: {len(group)} people -> "
                         f"{group_allocated} allocated across {used}/{len(available_venues)} venues, "
                         f"counts={sorted(venue_counts, reverse=True)[:8]}")
        return group_allocated

    def _fill_with_matcher(self, group, available_venues, location, respect_capacity, unallocated):
        """Assign *group* one by one through ``matcher.select_venue``; return the number assigned.

        Used for strategies without a dedicated fast path and for geo units
        without coordinates (*location* is then None).
        """
        dist = self.distributor
        assigned_count = 0
        for person in group:
            with_cap = [v for v in available_venues
                        if not respect_capacity or dist._get_remaining_capacity(v) > 0]
            venue = dist.matcher.select_venue(person, with_cap, location) if with_cap else None
            if venue:
                self._assign(person, venue)
                assigned_count += 1
            else:
                unallocated.append(person)
        return assigned_count

    def allocate_by_geo_unit(self, people: List, venues: List) -> List:
        """Batch allocation by geo_unit for performance.

        People are grouped by geo unit and then by attribute values; the
        eligible venue pool is cached per ``(search unit name, attribute
        values)``. The configured ``allocation.strategy`` ('random', 'closest',
        'closest_balanced', anything else -> matcher fallback) decides how each
        group is spread over the venues that still have capacity.

        Adds the number of people placed to ``distributor.allocated_this_run``.

        Returns:
            Everyone who was not allocated, including people for whom no geo
            unit could be resolved.
        """
        dist = self.distributor
        people_by_geo, without_geo = self._group_by_geo_unit(people)

        # Index venues by the name of their geo unit at venue_geo_level.
        venues_by_geo = defaultdict(list)
        v_level = dist.venue_geo_level
        for v in venues:
            target_unit = v.geographical_unit
            if not target_unit:
                continue
            if target_unit.level != v_level:
                target_unit = target_unit.get_ancestor_by_level(v_level)
            if target_unit:
                venues_by_geo[target_unit.name].append(v)

        total = len(people)
        interval = max(1, total // 10)
        processed = 0
        allocated = 0
        # People without a geo unit cannot be batched; hand them back to the
        # caller instead of silently dropping them.
        unallocated = list(without_geo)
        if without_geo:
            processed = len(without_geo)
            self._log_progress(processed, total, interval, allocated, prefix=" ", step=processed)

        # Cache of attribute-filtered venue pools keyed by (search unit name,
        # attribute values), so the eligibility filter runs once per combination.
        # NOTE(review): when 'consider_by' is not 'geo_unit' the pool is built
        # from the first geo unit's coordinates and reused for other geo units
        # sharing the same search unit -- confirm this approximation is intended.
        pool_cache = {}
        strategy = self.config.get('allocation', {}).get('strategy', 'random')
        selection_config = self.config.get('venue_selection', {})
        respect_capacity = selection_config.get('respect_capacity', True)
        consider_by_geo_unit = selection_config.get('consider_by') == 'geo_unit'

        for geo_unit, geo_people in people_by_geo.items():
            if dist.batch_geo_level == dist.venue_geo_level:
                venue_search_unit = geo_unit
            else:
                venue_search_unit = geo_unit.get_ancestor_by_level(dist.venue_geo_level)
            if not venue_search_unit:
                unallocated.extend(geo_people)
                processed += len(geo_people)
                self._log_progress(processed, total, interval, allocated, prefix=" ", step=len(geo_people))
                continue

            lat, lon = geo_unit.coordinates if self._has_coordinates(geo_unit) else (None, None)
            has_location = lat is not None

            for attr_vals, group in self._group_by_attributes(geo_people).items():
                cache_key = (venue_search_unit.name, attr_vals)
                if cache_key not in pool_cache:
                    if consider_by_geo_unit:
                        eligible_pool = venues_by_geo.get(venue_search_unit.name, [])
                    else:
                        eligible_pool = dist.matcher.find_eligible_venues_for_location((lat, lon), venues)
                    if eligible_pool:
                        p_attrs = dict(zip(self.attribute_names, attr_vals))
                        pool_cache[cache_key] = dist.matcher.filter_venues_by_person(
                            group[0], eligible_pool, person_attrs=p_attrs
                        )
                    else:
                        pool_cache[cache_key] = []
                p_venues = pool_cache[cache_key]

                # Work on a private copy: the strategies below reorder it in place.
                if not p_venues:
                    available_venues = []
                elif respect_capacity:
                    available_venues = [v for v in p_venues if dist._get_remaining_capacity(v) > 0]
                else:
                    available_venues = list(p_venues)
                if not available_venues:
                    unallocated.extend(group)
                    processed += len(group)
                    self._log_progress(processed, total, interval, allocated, prefix=" ", step=len(group))
                    continue

                if has_location and strategy == 'random':
                    np.random.shuffle(available_venues)
                    allocated += self._fill_sequentially(group, available_venues, respect_capacity, unallocated)
                elif has_location and strategy == 'closest':
                    available_venues.sort(
                        key=lambda v: dist._haversine_distance((lat, lon), dist._get_venue_location(v)))
                    allocated += self._fill_sequentially(group, available_venues, respect_capacity, unallocated)
                elif has_location and strategy == 'closest_balanced':
                    allocated += self._fill_closest_balanced(
                        group, available_venues, (lat, lon), respect_capacity, geo_unit, unallocated)
                else:
                    # Fallback for complex strategies or missing coordinates.
                    allocated += self._fill_with_matcher(
                        group, available_venues, (lat, lon) if has_location else None,
                        respect_capacity, unallocated)

                processed += len(group)
                self._log_progress(processed, total, interval, allocated, prefix=" ", step=len(group))

        dist.allocated_this_run += allocated
        return unallocated

    # ------------------------------------------------------ allocate_individual

    def _pick_individual_venue(self, person, venues, search_limits):
        """Return the venue selected for *person*, or None when none qualifies."""
        dist = self.distributor
        loc = dist._get_person_location(person)
        if not loc:
            return None
        pool = dist.matcher.find_eligible_venues_for_location(loc, venues)
        p_venues = dist.matcher.filter_venues_with_expansion(person, venues, pool, loc, search_limits)
        if not p_venues:
            return None
        with_cap = dist._filter_venues_by_capacity(p_venues)
        if not with_cap:
            return None
        return dist.matcher.select_venue(person, with_cap, loc)

    def allocate_individual(self, people: List, venues: List) -> List:
        """Allocate people individually.

        Each person is matched on their own location; only venues with
        remaining capacity are considered. Progress is reported for every
        person, whether or not they were placed.

        Adds the number of people placed to ``distributor.allocated_this_run``.

        Returns:
            The people who could not be allocated.
        """
        # Loop-invariant: read the configured limits once, not per person.
        search_limits = self.config.get('venue_selection', {}).get('search_limits', [20])
        total = len(people)
        interval = max(1, total // 10)
        allocated = 0
        unallocated = []
        for i, person in enumerate(people, 1):
            venue = self._pick_individual_venue(person, venues, search_limits)
            if venue:
                self._assign(person, venue)
                allocated += 1
            else:
                unallocated.append(person)
            self._log_progress(i, total, interval, allocated, prefix=" ")
        self.distributor.allocated_this_run += allocated
        return unallocated

    # ---------------------------------------------------------------- progress

    def _log_progress(self, current, total, interval, count, prefix=" ", step=1):
        """Log a progress line when *current* crosses a multiple of *interval*.

        Args:
            current: Number of people processed so far.
            total: Total number of people; nothing is logged when it is <= 0.
            interval: Threshold spacing; nothing is logged when it is <= 0.
            count: Number of people allocated so far (reported in the message).
            prefix: Text placed in front of the message.
            step: How much *current* grew since the previous call. Batch callers
                pass the batch size so a threshold crossed in the middle of a
                batch is still reported.
        """
        if interval <= 0 or total <= 0:
            return
        # Compare the threshold bucket before and after this step.
        prev_threshold = (current - max(step, 1)) // interval if current > 0 else -1
        curr_threshold = current // interval
        if curr_threshold > prev_threshold or current >= total:
            logger.info(f"{prefix}Progress: {current}/{total} people processed ({min(100, current/total*100):.1f}%) - {count} allocated")
|