Skip to content

Allocation engine

AllocationEngine

Orchestrates the distribution process, including batching and individual allocation.

Source code in may/venue_distributor/allocation_engine.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
class AllocationEngine:
    """
    Orchestrates the distribution process, including batching and individual allocation.
    """

    def __init__(self, distributor):
        self.distributor = distributor
        self.config = distributor.config
        self.verbose = distributor.verbose

        # Pre-process attribute metadata for fast lookups (Global for this distributor)
        eligibility = self.config.get('eligibility', {})
        self.attribute_names = [rule.get('name') for rule in eligibility.get('attributes', [])]

        self.attr_getters = []
        for name in self.attribute_names:
            # Use a closure to capture the current name
            def make_getter(attr_name):
                return lambda p: self.distributor._get_person_attribute(attr_name, p)

            self.attr_getters.append(make_getter(name))

    def allocate_group(self, people: List, venues: List, allow_overflow: bool = False, group_search_limits=None) -> int:
        """Allocate a specific group of people with geo-unit level caching and attribute batching."""
        allocated_count = 0
        total_people = len(people)
        people_processed = 0
        progress_interval = max(1, total_people // 10)

        selection_config = self.config.get('venue_selection', {})
        target_count = selection_config.get('count', 5)

        # STRICT LIMITS: Follow baseline behavior. Default to target_count * 4 if no limits provided.
        # This avoid expensive None (all venues) searches.
        search_limits = group_search_limits if group_search_limits is not None else selection_config.get('search_limits', [target_count * 4])
        if not search_limits: search_limits = [target_count * 4]

        # Clean search limits and remove None/unlimited cases
        clean_limits = []
        for l in search_limits:
            if l is None: continue
            clean_limits.append(min(l, 100)) # Absolute cap at 100 venues
        if not clean_limits: clean_limits = [20]

        search_attempts = sorted(set(clean_limits))

        people_by_geo = defaultdict(list)
        for person in people:
            geo = self.distributor._get_geo_unit_at_level(person, self.distributor.world, target_level=self.distributor.batch_geo_level)
            if geo: people_by_geo[geo].append(person)

        for geo_unit, geo_people in people_by_geo.items():
            if not (geo_unit.coordinates and len(geo_unit.coordinates) == 2): continue
            lat, lon = geo_unit.coordinates

            # Find nearby venues once per geo unit
            geo_nearby = self.distributor._find_closest_venues((lat, lon), self.distributor.venue_type, search_attempts[0], allowed_venue_ids=getattr(self.distributor, 'venue_ids', None))

            if not allow_overflow and not self.distributor._filter_venues_by_capacity(geo_nearby):
                people_processed += len(geo_people)
                self._log_progress(people_processed, total_people, progress_interval, allocated_count)
                continue

            people_by_attrs = defaultdict(list)
            for person in geo_people:
                vals = tuple(getter(person) for getter in self.attr_getters)
                people_by_attrs[vals].append(person)

            for attr_vals, people_group in people_by_attrs.items():
                person_attrs = dict(zip(self.attribute_names, attr_vals))
                eligible_venues = self.distributor.matcher.filter_venues_with_expansion(
                    person=people_group[0], venues=venues, initial_pool=geo_nearby,
                    location=(lat, lon), search_limits=search_attempts, person_attrs=person_attrs
                )

                if eligible_venues:
                    for person in people_group:
                        venue = None
                        with_cap = self.distributor._filter_venues_by_capacity(eligible_venues)
                        if with_cap:
                            # Selection strategy (e.g., closest) will pick from available venues
                            venue = self.distributor.matcher.select_venue(person, with_cap, (lat, lon))
                        elif allow_overflow:
                            venue = self.distributor.matcher.select_venue(person, eligible_venues, (lat, lon))

                        if venue:
                            venue.add_to_subset(person, subset_key=self.distributor.subset_key, 
                                               activity_name=self.distributor.activity_map_key, activity_type=self.distributor.activity_type)
                            self.distributor._increment_venue_count(venue)
                            allocated_count += 1

                        people_processed += 1
                        self._log_progress(people_processed, total_people, progress_interval, allocated_count)
                else:
                    people_processed += len(people_group)
                    self._log_progress(people_processed, total_people, progress_interval, allocated_count)

        return allocated_count

    def allocate_by_geo_unit(self, people: List, venues: List) -> List:
        """Batch allocation by geo_unit for performance."""
        people_by_geo = defaultdict(list)
        for person in people:
            geo = self.distributor._get_geo_unit_at_level(person, self.distributor.world, target_level=self.distributor.batch_geo_level)
            if geo: people_by_geo[geo].append(person)

        venues_by_geo = defaultdict(list)
        v_level = self.distributor.venue_geo_level
        for v in venues:
            if not v.geographical_unit:
                continue

            # Use the ancestor at the correct venue_geo_level for matching
            target_unit = v.geographical_unit
            if target_unit.level != v_level:
                target_unit = target_unit.get_ancestor_by_level(v_level)

            if target_unit:
                venues_by_geo[target_unit.name].append(v)

        total = len(people)
        processed = 0
        interval = max(1, total // 10)
        allocated = 0
        unallocated = []

        # Cache for venues by (search_unit, attribute_values) to avoid repeated attribute filtering
        # This keeps it correct even if different people in the same SGU have different eligibility
        pool_cache = {}

        # Determine strategy once
        strategy = self.config.get('allocation', {}).get('strategy', 'random')
        respect_capacity = self.config.get('venue_selection', {}).get('respect_capacity', True)

        for geo_unit, geo_people in people_by_geo.items():
            venue_search_unit = geo_unit if self.distributor.batch_geo_level == self.distributor.venue_geo_level else geo_unit.get_ancestor_by_level(self.distributor.venue_geo_level)
            if not venue_search_unit:
                unallocated.extend(geo_people)
                processed += len(geo_people)
                self._log_progress(processed, total, interval, allocated, prefix="  ")
                continue

            lat, lon = geo_unit.coordinates if (geo_unit.coordinates and len(geo_unit.coordinates) == 2) else (None, None)

            # Group by attributes within the SGU to ensure correctness for cases like schools/multisectors
            people_by_attrs = defaultdict(list)
            for person in geo_people:
                vals = tuple(getter(person) for getter in self.attr_getters)
                people_by_attrs[vals].append(person)

            for attr_vals, group in people_by_attrs.items():
                cache_key = (venue_search_unit.name, attr_vals)

                # 1. Get/Cache the pool of venues for this (LGU, attributes) combination
                if cache_key not in pool_cache:
                    eligible_pool = venues_by_geo.get(venue_search_unit.name, []) if self.config.get('venue_selection', {}).get('consider_by') == 'geo_unit' else self.distributor.matcher.find_eligible_venues_for_location((lat, lon), venues)

                    if eligible_pool:
                        p_attrs = dict(zip(self.attribute_names, attr_vals))
                        pool_cache[cache_key] = self.distributor.matcher.filter_venues_by_person(
                            group[0], eligible_pool, person_attrs=p_attrs
                        )
                    else:
                        pool_cache[cache_key] = []

                p_venues = pool_cache[cache_key]

                if not p_venues:
                    unallocated.extend(group)
                    processed += len(group)
                    self._log_progress(processed, total, interval, allocated, prefix="  ")
                    continue

                # 2. Efficient capacity iteration

                # Local available pool for this attribute group in this SGU
                if respect_capacity:
                    available_venues = [v for v in p_venues if self.distributor._get_remaining_capacity(v) > 0]
                else:
                    available_venues = list(p_venues)

                if not available_venues:
                    unallocated.extend(group)
                    processed += len(group)
                    self._log_progress(processed, total, interval, allocated, prefix="  ")
                    continue

                if strategy == 'random' and lat is not None:
                    np.random.shuffle(available_venues)
                    venue_ptr = 0

                    for person in group:
                        assigned = False
                        while venue_ptr < len(available_venues):
                            v = available_venues[venue_ptr]
                            if not respect_capacity or self.distributor._get_remaining_capacity(v) > 0:
                                v.add_to_subset(person, subset_key=self.distributor.subset_key, 
                                              activity_name=self.distributor.activity_map_key, activity_type=self.distributor.activity_type)
                                self.distributor._increment_venue_count(v)
                                allocated += 1
                                assigned = True
                                if respect_capacity and self.distributor._get_remaining_capacity(v) <= 0:
                                    venue_ptr += 1
                                break
                            else:
                                venue_ptr += 1

                        if not assigned:
                            unallocated.append(person)

                elif strategy == 'closest' and lat is not None:
                    available_venues.sort(key=lambda v: self.distributor._haversine_distance((lat, lon), self.distributor._get_venue_location(v)))
                    venue_ptr = 0

                    for person in group:
                        assigned = False
                        while venue_ptr < len(available_venues):
                            v = available_venues[venue_ptr]
                            if not respect_capacity or self.distributor._get_remaining_capacity(v) > 0:
                                v.add_to_subset(person, subset_key=self.distributor.subset_key, 
                                              activity_name=self.distributor.activity_map_key, activity_type=self.distributor.activity_type)
                                self.distributor._increment_venue_count(v)
                                allocated += 1
                                assigned = True
                                if respect_capacity and self.distributor._get_remaining_capacity(v) <= 0:
                                    venue_ptr += 1
                                break
                            else:
                                venue_ptr += 1
                        if not assigned:
                            unallocated.append(person)

                elif strategy == 'closest_balanced' and lat is not None:
                    # Weighted allocation: balance distance preference with remaining capacity
                    # Pre-compute distances once for all venues in this pool
                    venue_dists = np.array([
                        self.distributor._haversine_distance((lat, lon), self.distributor._get_venue_location(v))
                        for v in available_venues
                    ])

                    if self.verbose and not hasattr(self, '_cb_logged_geo'):
                        self._cb_logged_geo = set()

                    geo_name = geo_unit.name if hasattr(geo_unit, 'name') else str(geo_unit)
                    group_allocated = 0

                    # Pre-compute remaining capacity ONCE (not per person!)
                    remaining_caps = np.array([
                        self.distributor._get_remaining_capacity(v) if respect_capacity else 1
                        for v in available_venues
                    ], dtype=np.float64)

                    total_cap = self.distributor._get_venue_capacity(available_venues[0]) if available_venues else 1

                    # Pre-compute distance weights (constant for this SGU batch)
                    dist_weights = 1.0 / (venue_dists + 0.1)

                    for person in group:
                        # Filter to only venues with remaining capacity
                        valid_mask = remaining_caps > 0
                        if not valid_mask.any():
                            unallocated.append(person)
                            continue

                        valid_indices = np.where(valid_mask)[0]
                        valid_caps = remaining_caps[valid_indices]

                        # Capacity weight: venues with more remaining capacity are preferred
                        cap_weights = valid_caps / max(total_cap, 1)

                        # Combined weight = distance * capacity
                        weights = dist_weights[valid_indices] * cap_weights
                        weight_sum = weights.sum()

                        if weight_sum <= 0:
                            unallocated.append(person)
                            continue

                        probs = weights / weight_sum
                        chosen_idx = valid_indices[np.random.choice(len(valid_indices), p=probs)]
                        v = available_venues[chosen_idx]

                        v.add_to_subset(person, subset_key=self.distributor.subset_key,
                                       activity_name=self.distributor.activity_map_key, activity_type=self.distributor.activity_type)
                        self.distributor._increment_venue_count(v)
                        allocated += 1
                        group_allocated += 1

                        # Update capacity array incrementally (avoid rebuilding)
                        if respect_capacity:
                            remaining_caps[chosen_idx] -= 1

                    # Log per-SGU summary for this group
                    if self.verbose and geo_name not in self._cb_logged_geo:
                        self._cb_logged_geo.add(geo_name)
                        venue_counts = [self.distributor.venue_capacity_tracker.get(id(v), 0) for v in available_venues]
                        used = sum(1 for c in venue_counts if c > 0)
                        logger.debug(f"  [closest_balanced] {geo_name}: {len(group)} people -> "
                                    f"{group_allocated} allocated across {used}/{len(available_venues)} venues, "
                                    f"counts={sorted(venue_counts, reverse=True)[:8]}")

                else:
                    # Fallback for complex strategies or missing coordinates
                    for person in group:
                        with_cap = [v for v in available_venues if not respect_capacity or self.distributor._get_remaining_capacity(v) > 0]
                        if with_cap:
                            venue = self.distributor.matcher.select_venue(person, with_cap, (lat, lon) if lat is not None else None)
                            if venue:
                                venue.add_to_subset(person, subset_key=self.distributor.subset_key, 
                                                  activity_name=self.distributor.activity_map_key, activity_type=self.distributor.activity_type)
                                self.distributor._increment_venue_count(venue)
                                allocated += 1
                                continue
                        unallocated.append(person)

                processed += len(group)
                self._log_progress(processed, total, interval, allocated, prefix="  ")

        self.distributor.allocated_this_run += allocated
        return unallocated

    def allocate_individual(self, people: List, venues: List) -> List:
        """Allocate people individually."""
        allocated = 0
        unallocated = []
        total = len(people)
        interval = max(1, total // 10)

        for i, person in enumerate(people, 1):
            loc = self.distributor._get_person_location(person)
            if not loc:
                unallocated.append(person)
                continue

            pool = self.distributor.matcher.find_eligible_venues_for_location(loc, venues)
            # STRICT DEFAULT: Only use first limit or small pool
            search_limits = self.config.get('venue_selection', {}).get('search_limits', [20])
            p_venues = self.distributor.matcher.filter_venues_with_expansion(person, venues, pool, loc, search_limits)

            if p_venues:
                with_cap = self.distributor._filter_venues_by_capacity(p_venues)
                if with_cap:
                    venue = self.distributor.matcher.select_venue(person, with_cap, loc)
                    if venue:
                        venue.add_to_subset(person, subset_key=self.distributor.subset_key, 
                                          activity_name=self.distributor.activity_map_key, activity_type=self.distributor.activity_type)
                        self.distributor._increment_venue_count(venue)
                        allocated += 1
                        continue
            unallocated.append(person)
            self._log_progress(i, total, interval, allocated, prefix="  ")

        self.distributor.allocated_this_run += allocated
        return unallocated

    def _log_progress(self, current, total, interval, count, prefix="    "):
        if interval <= 0:
            return

        # Calculate which interval threshold was reached/crossed
        prev_threshold = (current - 1) // interval if current > 0 else -1
        curr_threshold = current // interval

        if curr_threshold > prev_threshold or current >= total:
            logger.info(f"{prefix}Progress: {current}/{total} people processed ({min(100, current/total*100):.1f}%) - {count} allocated")

allocate_by_geo_unit(people, venues)

Batch allocation by geo_unit for performance.

Source code in may/venue_distributor/allocation_engine.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def allocate_by_geo_unit(self, people: List, venues: List) -> List:
    """Allocate people to venues in batches keyed by geo unit and attributes.

    People are grouped by their geo unit at ``distributor.batch_geo_level``
    and, within each unit, by the tuple of values produced by
    ``self.attr_getters``. Every group shares one candidate venue pool and is
    assigned according to the configured ``allocation.strategy``.

    Args:
        people: People to allocate.
        venues: Candidate venues.

    Returns:
        The people that could not be allocated, including those whose geo
        unit could not be resolved.

    Side effects:
        Adds each allocated person to a venue via ``add_to_subset``, calls
        ``distributor._increment_venue_count`` for it, and increases
        ``distributor.allocated_this_run`` by the number of allocations.
    """
    distributor = self.distributor
    batch_level = distributor.batch_geo_level
    v_level = distributor.venue_geo_level

    total = len(people)
    interval = max(1, total // 10)
    processed = 0
    allocated = 0
    unallocated = []

    people_by_geo = defaultdict(list)
    for person in people:
        geo = distributor._get_geo_unit_at_level(person, distributor.world, target_level=batch_level)
        if geo:
            people_by_geo[geo].append(person)
        else:
            # Without a geo unit no venue pool can be chosen; report the
            # person as unallocated instead of silently dropping them.
            unallocated.append(person)
    if unallocated:
        processed = len(unallocated)
        self._log_progress(processed, total, interval, allocated, prefix="  ")

    # Index venues by the name of their unit (or ancestor) at venue_geo_level.
    venues_by_geo = defaultdict(list)
    for v in venues:
        target_unit = v.geographical_unit
        if not target_unit:
            continue
        if target_unit.level != v_level:
            target_unit = target_unit.get_ancestor_by_level(v_level)
        if target_unit:
            venues_by_geo[target_unit.name].append(v)

    # Read the configuration once.
    selection_config = self.config.get('venue_selection', {})
    strategy = self.config.get('allocation', {}).get('strategy', 'random')
    respect_capacity = selection_config.get('respect_capacity', True)
    pool_by_geo_unit = selection_config.get('consider_by') == 'geo_unit'

    # Attribute-filtered venue pools, cached to avoid repeated filtering.
    pool_cache = {}

    def has_room(venue):
        return not respect_capacity or distributor._get_remaining_capacity(venue) > 0

    def assign(person, venue):
        venue.add_to_subset(person, subset_key=distributor.subset_key,
                            activity_name=distributor.activity_map_key,
                            activity_type=distributor.activity_type)
        distributor._increment_venue_count(venue)

    def distance(location, venue):
        return distributor._haversine_distance(location, distributor._get_venue_location(venue))

    def fill_in_order(group, ordered_venues):
        """Place each person in the first venue (in order) with room; return the count placed."""
        placed = 0
        ptr = 0
        for person in group:
            # Venues before ``ptr`` were found full and are not revisited.
            while ptr < len(ordered_venues) and not has_room(ordered_venues[ptr]):
                ptr += 1
            if ptr == len(ordered_venues):
                unallocated.append(person)
                continue
            assign(person, ordered_venues[ptr])
            placed += 1
        return placed

    for geo_unit, geo_people in people_by_geo.items():
        if batch_level == v_level:
            venue_search_unit = geo_unit
        else:
            venue_search_unit = geo_unit.get_ancestor_by_level(v_level)
        if not venue_search_unit:
            unallocated.extend(geo_people)
            processed += len(geo_people)
            self._log_progress(processed, total, interval, allocated, prefix="  ")
            continue

        coords = geo_unit.coordinates
        lat, lon = coords if (coords and len(coords) == 2) else (None, None)
        has_coords = lat is not None

        # Group by attribute values so people with different eligibility
        # never share a filtered pool.
        people_by_attrs = defaultdict(list)
        for person in geo_people:
            vals = tuple(getter(person) for getter in self.attr_getters)
            people_by_attrs[vals].append(person)

        for attr_vals, group in people_by_attrs.items():
            # The cache key must identify everything the pool depends on:
            # the venue search unit in geo_unit mode, the coordinates
            # otherwise. Keying location-based pools by the search unit
            # would reuse a pool built for another unit's coordinates.
            if pool_by_geo_unit:
                cache_key = (venue_search_unit.name, attr_vals)
            else:
                cache_key = ((lat, lon), attr_vals)

            if cache_key not in pool_cache:
                if pool_by_geo_unit:
                    eligible_pool = venues_by_geo.get(venue_search_unit.name, [])
                else:
                    eligible_pool = distributor.matcher.find_eligible_venues_for_location((lat, lon), venues)

                if eligible_pool:
                    p_attrs = dict(zip(self.attribute_names, attr_vals))
                    pool_cache[cache_key] = distributor.matcher.filter_venues_by_person(
                        group[0], eligible_pool, person_attrs=p_attrs
                    )
                else:
                    pool_cache[cache_key] = []

            # Fresh list per group: it is shuffled/sorted in place below.
            available_venues = [v for v in (pool_cache[cache_key] or []) if has_room(v)]

            if not available_venues:
                unallocated.extend(group)

            elif has_coords and strategy == 'random':
                np.random.shuffle(available_venues)
                allocated += fill_in_order(group, available_venues)

            elif has_coords and strategy == 'closest':
                available_venues.sort(key=lambda v: distance((lat, lon), v))
                allocated += fill_in_order(group, available_venues)

            elif has_coords and strategy == 'closest_balanced':
                # Weighted random choice: weight = inverse distance * remaining
                # capacity. Distances are constant for the group; capacities
                # are read once and then tracked locally.
                venue_dists = np.array([distance((lat, lon), v) for v in available_venues])
                dist_weights = 1.0 / (venue_dists + 0.1)
                remaining_caps = np.array([
                    distributor._get_remaining_capacity(v) if respect_capacity else 1
                    for v in available_venues
                ], dtype=np.float64)

                group_allocated = 0
                for person in group:
                    valid_indices = np.where(remaining_caps > 0)[0]
                    if len(valid_indices) == 0:
                        unallocated.append(person)
                        continue

                    # No extra normalising constant is applied to the
                    # capacities: any constant factor cancels in
                    # ``weights / weight_sum``.
                    weights = dist_weights[valid_indices] * remaining_caps[valid_indices]
                    weight_sum = weights.sum()
                    if weight_sum <= 0:
                        unallocated.append(person)
                        continue

                    probs = weights / weight_sum
                    chosen_idx = valid_indices[np.random.choice(len(valid_indices), p=probs)]
                    assign(person, available_venues[chosen_idx])
                    group_allocated += 1

                    if respect_capacity:
                        remaining_caps[chosen_idx] -= 1

                allocated += group_allocated

                # Log one summary per geo unit name (verbose mode only).
                if self.verbose:
                    logged = getattr(self, '_cb_logged_geo', None)
                    if logged is None:
                        logged = self._cb_logged_geo = set()
                    geo_name = geo_unit.name if hasattr(geo_unit, 'name') else str(geo_unit)
                    if geo_name not in logged:
                        logged.add(geo_name)
                        venue_counts = [distributor.venue_capacity_tracker.get(id(v), 0) for v in available_venues]
                        used = sum(1 for c in venue_counts if c > 0)
                        logger.debug(f"  [closest_balanced] {geo_name}: {len(group)} people -> "
                                     f"{group_allocated} allocated across {used}/{len(available_venues)} venues, "
                                     f"counts={sorted(venue_counts, reverse=True)[:8]}")

            else:
                # Fallback for other strategies or missing coordinates:
                # let the matcher pick a venue per person.
                location = (lat, lon) if has_coords else None
                for person in group:
                    with_cap = [v for v in available_venues if has_room(v)]
                    venue = distributor.matcher.select_venue(person, with_cap, location) if with_cap else None
                    if venue:
                        assign(person, venue)
                        allocated += 1
                    else:
                        unallocated.append(person)

            processed += len(group)
            self._log_progress(processed, total, interval, allocated, prefix="  ")

    distributor.allocated_this_run += allocated
    return unallocated

allocate_group(people, venues, allow_overflow=False, group_search_limits=None)

Allocate a specific group of people with geo-unit level caching and attribute batching.

Source code in may/venue_distributor/allocation_engine.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def allocate_group(self, people: List, venues: List, allow_overflow: bool = False, group_search_limits=None) -> int:
    """Allocate a specific group of people with geo-unit level caching and attribute batching.

    People are bucketed by the geo unit returned by the distributor for its
    ``batch_geo_level``; the nearby-venue lookup is done once per geo unit.
    Within a geo unit, people are bucketed again by their attribute tuple so
    that venue eligibility is evaluated once per attribute combination.

    Args:
        people: People to allocate.
        venues: Full candidate venue list handed to the matcher.
        allow_overflow: When True, a person may still be assigned to an
            eligible venue even if no eligible venue has capacity left.
        group_search_limits: Optional search limits overriding the
            ``venue_selection.search_limits`` configuration entry.

    Returns:
        The number of people allocated by this call. The distributor's
        ``allocated_this_run`` counter is not touched here.

    Note:
        People for whom no geo unit is resolved are left out of the grouping
        and are not counted in the progress total reached.
    """
    distributor = self.distributor
    matcher = distributor.matcher

    allocated_count = 0
    total_people = len(people)
    people_processed = 0
    progress_interval = max(1, total_people // 10)

    selection_config = self.config.get('venue_selection', {})
    target_count = selection_config.get('count', 5)
    default_limit = target_count * 4

    # STRICT LIMITS: Follow baseline behavior. Default to target_count * 4 if no limits provided.
    # This avoids expensive None (all venues) searches.
    if group_search_limits is not None:
        search_limits = group_search_limits
    else:
        search_limits = selection_config.get('search_limits', [default_limit])
    if not search_limits:
        search_limits = [default_limit]

    # Drop None/unlimited entries and apply an absolute cap of 100 venues.
    capped_limits = {min(limit, 100) for limit in search_limits if limit is not None}
    search_attempts = sorted(capped_limits) if capped_limits else [20]

    people_by_geo = defaultdict(list)
    for person in people:
        geo = distributor._get_geo_unit_at_level(person, distributor.world, target_level=distributor.batch_geo_level)
        if geo:
            people_by_geo[geo].append(person)

    allowed_venue_ids = getattr(distributor, 'venue_ids', None)

    for geo_unit, geo_people in people_by_geo.items():
        coordinates = geo_unit.coordinates
        if not (coordinates and len(coordinates) == 2):
            # These people cannot be placed, but they still count as processed so
            # progress reporting stays consistent with the no-capacity skip below.
            people_processed += len(geo_people)
            self._log_progress(people_processed, total_people, progress_interval, allocated_count)
            continue
        lat, lon = coordinates

        # Find nearby venues once per geo unit
        geo_nearby = distributor._find_closest_venues(
            (lat, lon), distributor.venue_type, search_attempts[0], allowed_venue_ids=allowed_venue_ids
        )

        if not allow_overflow and not distributor._filter_venues_by_capacity(geo_nearby):
            people_processed += len(geo_people)
            self._log_progress(people_processed, total_people, progress_interval, allocated_count)
            continue

        people_by_attrs = defaultdict(list)
        for person in geo_people:
            vals = tuple(getter(person) for getter in self.attr_getters)
            people_by_attrs[vals].append(person)

        for attr_vals, people_group in people_by_attrs.items():
            person_attrs = dict(zip(self.attribute_names, attr_vals))
            # Eligibility is computed once per attribute combination, using the
            # first person of the group as the representative.
            eligible_venues = matcher.filter_venues_with_expansion(
                person=people_group[0], venues=venues, initial_pool=geo_nearby,
                location=(lat, lon), search_limits=search_attempts, person_attrs=person_attrs
            )

            if not eligible_venues:
                people_processed += len(people_group)
                self._log_progress(people_processed, total_people, progress_interval, allocated_count)
                continue

            for person in people_group:
                venue = None
                # Re-filter for every person: each allocation may use up capacity.
                with_cap = distributor._filter_venues_by_capacity(eligible_venues)
                if with_cap:
                    # Selection strategy (e.g., closest) will pick from available venues
                    venue = matcher.select_venue(person, with_cap, (lat, lon))
                elif allow_overflow:
                    venue = matcher.select_venue(person, eligible_venues, (lat, lon))

                if venue:
                    venue.add_to_subset(person, subset_key=distributor.subset_key,
                                        activity_name=distributor.activity_map_key, activity_type=distributor.activity_type)
                    distributor._increment_venue_count(venue)
                    allocated_count += 1

                people_processed += 1
                self._log_progress(people_processed, total_people, progress_interval, allocated_count)

    return allocated_count

allocate_individual(people, venues)

Allocate people individually.

Source code in may/venue_distributor/allocation_engine.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
def allocate_individual(self, people: List, venues: List) -> List:
    """Allocate people individually.

    Each person is resolved to a location, matched against the venues
    eligible for that location, and assigned to a venue with remaining
    capacity chosen by the matcher.

    Args:
        people: People to allocate, processed in order.
        venues: Candidate venue list handed to the matcher.

    Returns:
        The people who could not be allocated, in input order. The number
        of successful allocations is added to the distributor's
        ``allocated_this_run`` counter.
    """
    distributor = self.distributor
    matcher = distributor.matcher

    allocated = 0
    unallocated = []
    total = len(people)
    interval = max(1, total // 10)

    # STRICT DEFAULT: Only use first limit or small pool.
    # The configuration lookup is loop-invariant, so it is done once up front.
    search_limits = self.config.get('venue_selection', {}).get('search_limits', [20])

    for i, person in enumerate(people, 1):
        venue = None
        loc = distributor._get_person_location(person)
        if loc:
            pool = matcher.find_eligible_venues_for_location(loc, venues)
            p_venues = matcher.filter_venues_with_expansion(person, venues, pool, loc, search_limits)
            if p_venues:
                with_cap = distributor._filter_venues_by_capacity(p_venues)
                if with_cap:
                    venue = matcher.select_venue(person, with_cap, loc)

        if venue:
            venue.add_to_subset(person, subset_key=distributor.subset_key,
                                activity_name=distributor.activity_map_key, activity_type=distributor.activity_type)
            distributor._increment_venue_count(venue)
            allocated += 1
        else:
            unallocated.append(person)

        # Report progress for every person, whether or not they were allocated;
        # skipping it on success or on a missing location under-reports progress.
        self._log_progress(i, total, interval, allocated, prefix="  ")

    distributor.allocated_this_run += allocated
    return unallocated