Skip to content

Matcher

VenueMatcher

Manages venue-side matching logic, including attribute checks, spatial expansion, and selection strategies.

Source code in may/venue_distributor/matcher.py
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
class VenueMatcher:
    """
    Manages venue-side matching logic, including attribute checks,
    spatial expansion, and selection strategies.
    """

    def __init__(self, distributor):
        self.distributor = distributor
        self.config = distributor.config
        self.verbose = distributor.verbose

        # Attribute caches and indices
        self.venue_attribute_cache = {}
        self.categorical_index = {}
        self.num_constraints = {}
        self.numerical_match_rules = []
        self.categorical_match_rules = []
        self.attribute_index_built = False

    def build_attribute_index(self, venues: List):
        """
        Pre-process venue attributes for fast filtering.
        """
        eligibility = self.config.get('eligibility', {})
        attributes = eligibility.get('attributes', [])

        # Reset and initialize basic indices
        self.venue_attribute_cache = {}
        self.categorical_index = {}
        self.num_constraints = {}
        self.venue_id_to_idx = {id(v): i for i, v in enumerate(venues)}

        if not attributes:
            self.attribute_index_built = True
            return

        # Pre-filter rules to those that actually have venue components
        active_rules = []
        for rule in attributes:
            attr_name = rule.get('name')
            attr_type = rule.get('type')

            if attr_type == 'numerical' and rule.get('venue_constraints'):
                v_con = rule.get('venue_constraints')
                active_rules.append({
                    'name': attr_name,
                    'type': 'numerical',
                    'min_col': v_con.get('min_column'),
                    'max_col': v_con.get('max_column')
                })
                # Initialize arrays with int32 min/max as "no constraint" sentinels
                _INT32_MIN = np.iinfo(np.int32).min
                _INT32_MAX = np.iinfo(np.int32).max
                self.num_constraints[attr_name] = {
                    'min': np.full(len(venues), _INT32_MIN, dtype=np.int32),
                    'max': np.full(len(venues), _INT32_MAX, dtype=np.int32)
                }
            elif attr_type == 'categorical' and rule.get('venue_column'):
                active_rules.append({
                    'name': attr_name,
                    'type': 'categorical',
                    'col': rule.get('venue_column'),
                    'assume': rule.get('assume_if_missing', 'Mixed'),
                    'case_sensitive': rule.get('case_sensitive', False),
                    'rules': rule.get('matching_rules', {})
                })

        if not active_rules:
            self.attribute_index_built = True
            return

        # Pre-process matching rules for categorical (avoiding repeated work)
        for rule in active_rules:
            if rule['type'] == 'categorical' and not rule['case_sensitive']:
                rule['rules'] = {k.lower(): [v.lower() for v in vals] for k, vals in rule['rules'].items()}

        # Single pass over venues
        for i, venue in enumerate(venues):
            v_props = venue.properties
            v_id = id(venue)

            for rule in active_rules:
                attr_name = rule['name']

                if rule['type'] == 'numerical':
                    min_val = v_props.get(rule['min_col']) if rule['min_col'] else None
                    max_val = v_props.get(rule['max_col']) if rule['max_col'] else None

                    if min_val is not None and min_val != '':
                        self.num_constraints[attr_name]['min'][i] = int(float(min_val))
                    if max_val is not None and max_val != '':
                        self.num_constraints[attr_name]['max'][i] = int(float(max_val))

                else: # categorical
                    v_val = v_props.get(rule['col'])
                    if v_val is None or v_val == '':
                        v_val = rule['assume']

                    if not rule['case_sensitive']:
                        v_val = str(v_val).lower() if v_val else ''

                    allowed = rule['rules'].get(v_val)
                    if allowed:
                        for p_val in allowed:
                            index_key = (attr_name, self.distributor._normalize_value(p_val))
                            if index_key not in self.categorical_index:
                                self.categorical_index[index_key] = set()
                            self.categorical_index[index_key].add(v_id)

        self.numerical_match_rules = [r for r in active_rules if r['type'] == 'numerical']
        self.categorical_match_rules = [r for r in active_rules if r['type'] == 'categorical' and not r.get('venue_column')]

        self.attribute_index_built = True

        if self.verbose:
            logger.info(f"Built attribute index for {len(venues)} venues with {len(attributes)} attributes")
            if self.categorical_index:
                logger.info(f"Built categorical index: {len(self.categorical_index)} unique value combinations")

    def filter_venues_with_expansion(self, person, venues: List, initial_pool: List, 
                                   location: Tuple[float, float], search_limits: List[int], 
                                   person_attrs: Optional[Dict] = None) -> List:
        """
        Filter venues for a person. Restores 'strict' behavior from baseline.
        """
        # Tier 1: Try the initial pool (Fast path - matches old strict behavior)
        eligible = self.filter_venues_by_person(person, initial_pool, person_attrs=person_attrs)
        if eligible:
            return eligible

        # Expansion Tiers: Only triggered if explicitly configured and initial failed
        # Performance Trade-off: We prefer skipping people over massive spatial searches.
        if self.config.get('venue_selection', {}).get('consider_by') == 'count':
            # Strict mode: If only one search limit or no limits, don't expand
            if len(search_limits) <= 1:
                return []

            for search_count in search_limits:
                # Skip if already tried this many or fewer
                if search_count <= len(initial_pool):
                    continue

                # RESTRAIN EXPANSION: Max 100 venues for performance
                # The old code queried max 50-100. Checking 10,000 is too slow.
                limit = min(search_count, 100) 

                if self.verbose:
                    logger.debug(f"Expanding search for person {person.id} to k={limit}")

                expanded_pool = self.distributor._find_closest_venues(
                    location, self.distributor.venue_type, limit, 
                    allowed_venue_ids=getattr(self.distributor, 'venue_ids', None)
                )
                eligible = self.filter_venues_by_person(person, expanded_pool, person_attrs=person_attrs)

                if eligible:
                    return eligible

                # If we've reached the safety cap, stop expanding
                if limit >= 100:
                    break

        return []

    def filter_venues_by_person(self, person, venues: List, person_attrs: Optional[Dict] = None) -> List:
        """Filter venues based on person's attributes (age, gender, etc.)."""
        match_attrs = getattr(self.distributor, '_pre_processed_match_attrs', [])

        # Pre-fetch attributes for this person to avoid repeated slow lookups
        if person_attrs is None:
            person_attrs = {}
            for rule in match_attrs:
                attr = rule['attribute']
                if attr not in person_attrs:
                    if rule.get('is_residence'):
                        res = person.residence
                        val = self.distributor._get_nested_value_with_dict_support(res, rule['residence_parts']) if res else None
                    elif rule.get('is_nested'):
                        val = self.distributor._get_nested_value_with_dict_support(person, rule['path_parts'])
                    else:
                        # Direct attribute
                        val = getattr(person, attr, None)
                    person_attrs[attr] = val

        # Step 1: Pre-filter using categorical index
        venues = self.prefilter_venues_by_categorical(person, venues, person_attrs=person_attrs)
        if not venues:
            return []

        # Step 2: Filter remaining venues by other attributes
        eligible_venues = []
        for venue in venues:
            if self.venue_accepts_person(person, venue, match_attrs, person_attrs=person_attrs):
                eligible_venues.append(venue)

        return eligible_venues

    def venue_accepts_person(self, person, venue, attribute_rules: List[Dict], person_attrs: Optional[Dict] = None) -> bool:
        """Check if venue accepts person based on attribute rules using pre-computed arrays."""
        v_id = id(venue)
        v_idx = self.venue_id_to_idx.get(v_id)

        if v_idx is None:
            return self.venue_accepts_person_slow(person, venue, attribute_rules)

        # Separate loops and pre-defined lists avoid dictionary lookups on 'rule'
        for rule in self.numerical_match_rules:
            attr_name = rule['name']
            if attr_name in self.num_constraints:
                person_value = self._get_person_attr(person, attr_name, person_attrs)
                if person_value is None:
                    return False

                constraints = self.num_constraints[attr_name]
                # Direct array access is much faster than dict lookup
                v_min = constraints['min'][v_idx]
                if v_min != np.iinfo(np.int32).min and person_value < v_min:
                    return False

                v_max = constraints['max'][v_idx]
                if v_max != np.iinfo(np.int32).max and person_value > v_max:
                    return False

        # Categorical rules with venue_column are handled via prefilter_venues_by_categorical
        # We only check those without a venue_column (rare but possible)
        for rule in self.categorical_match_rules:
            # Implement categorical check here if needed (not common in hot path)
            pass

        return True

    def venue_accepts_person_slow(self, person, venue, attribute_rules: List[Dict]) -> bool:
        """Fallback for venues without cache."""
        for rule in attribute_rules:
            attr_name = rule.get('name')
            person_value = self.distributor._get_person_attribute(attr_name, person)
            if person_value is None: return False

            if rule.get('type') == 'numerical':
                if not self._check_numerical_constraint(person_value, venue, rule): return False
            elif rule.get('type') == 'categorical':
                if not self._check_categorical_constraint(person_value, venue, rule): return False
        return True

    def prefilter_venues_by_categorical(self, person, venues: List, person_attrs: Optional[Dict] = None) -> List:
        """Pre-filter venues using categorical index for massive speedup."""
        if not self.categorical_index:
            return venues

        eligibility = self.config.get('eligibility', {})
        attributes = eligibility.get('attributes', [])

        categorical_filters = []
        for rule in attributes:
            if rule.get('type') == 'categorical' and rule.get('venue_column'):
                attr_name = rule.get('name')
                val = self._get_person_attr(person, attr_name, person_attrs)
                if val is not None:
                    # Normalize and handle case sensitivity for pre-filtering
                    norm_val = self.distributor._normalize_value(val)

                    # Search for the rule to check case sensitivity
                    is_case_sensitive = True
                    for rule in attributes:
                        if rule.get('name') == attr_name:
                            is_case_sensitive = rule.get('case_sensitive', False)
                            break

                    if not is_case_sensitive:
                        norm_val = norm_val.lower()

                    categorical_filters.append((attr_name, norm_val))

        if not categorical_filters:
            return venues

        attr_name, val = categorical_filters[0]
        filtered_ids = self.categorical_index.get((attr_name, val), set())

        if len(categorical_filters) > 1:
            filtered_ids = filtered_ids.copy()
            for attr_name, val in categorical_filters[1:]:
                filtered_ids &= self.categorical_index.get((attr_name, val), set())

        return [v for v in venues if id(v) in filtered_ids]

    def select_venue(self, person, venues: List, person_location: Tuple[float, float]) -> Optional[Any]:
        """Select final venue from eligible list based on strategy."""
        if not venues: return None

        strategy = self.config.get('allocation', {}).get('strategy', 'random')
        if strategy == 'random':
            return np.random.choice(venues)
        elif strategy == 'closest':
            valid_venues = [v for v in venues if v.coordinates]
            if not valid_venues: return venues[0]

            # Use scalar math for small sets, vectorized for large sets
            if len(valid_venues) < 50:
                return min(valid_venues, key=lambda v: self.distributor._haversine_distance(person_location, v.coordinates))
            else:
                coords = np.array([v.coordinates for v in valid_venues])
                dists = self.distributor._haversine_distance_vectorized(person_location, coords)
                return valid_venues[np.argmin(dists)]
        elif strategy == 'proportional':
            valid = [v for v in venues if v.coordinates]
            if not valid: return venues[0]

            # Use scalar math for small sets, vectorized for large sets
            if len(valid) < 50:
                dists = [self.distributor._haversine_distance(person_location, v.coordinates) for v in valid]
            else:
                coords = np.array([v.coordinates for v in valid])
                dists = self.distributor._haversine_distance_vectorized(person_location, coords)

            weights = np.array([1.0 / (d + 0.1) for d in dists])
            return np.random.choice(valid, p=weights / weights.sum())
        elif strategy == 'closest_balanced':
            valid = [v for v in venues if v.coordinates]
            if not valid: return np.random.choice(venues)

            if len(valid) < 50:
                dists = np.array([self.distributor._haversine_distance(person_location, v.coordinates) for v in valid])
            else:
                coords = np.array([v.coordinates for v in valid])
                dists = self.distributor._haversine_distance_vectorized(person_location, coords)

            dist_weights = 1.0 / (dists + 0.1)

            # Factor in remaining capacity
            remaining_caps = np.array([
                self.distributor._get_remaining_capacity(v) for v in valid
            ], dtype=np.float64)

            total_cap = max(self.distributor._get_venue_capacity(valid[0]), 1)
            cap_weights = remaining_caps / total_cap

            weights = dist_weights * cap_weights
            weight_sum = weights.sum()

            if weight_sum <= 0:
                # All venues full — fall back to closest
                return valid[np.argmin(dists)]

            probs = weights / weight_sum
            return np.random.choice(valid, p=probs)
        elif strategy == 'largest_capacity':
            return max(venues, key=lambda v: self.distributor._get_venue_capacity(v))

        return venues[0]

    def find_eligible_venues_for_location(self, location: Tuple[float, float], venues: List) -> List:
        """Find candidate venues based on distance/count config."""
        selection = self.config.get('venue_selection', {})
        consider_by = selection.get('consider_by', 'count')

        if consider_by == 'count':
            count = selection.get('count', 5)
            if selection.get('criteria') == 'largest_capacity':
                # Query a larger pool first to find large ones nearby
                closest_pool = self.distributor._find_closest_venues(location, self.distributor.venue_type, max(count * 5, 20), allowed_venue_ids=getattr(self.distributor, 'venue_ids', None))
                return sorted(closest_pool, key=lambda v: self.distributor._get_venue_capacity(v), reverse=True)[:count]
            return self.distributor._find_closest_venues(location, self.distributor.venue_type, count, allowed_venue_ids=getattr(self.distributor, 'venue_ids', None))

        elif consider_by == 'distance':
            max_dist = selection.get('max_distance', 10)
            unit = selection.get('max_distance_unit', 'km')
            if unit == 'miles': max_dist *= 1.60934
            elif unit == 'meters': max_dist /= 1000

            eligible = []
            valid = [v for v in venues if v.coordinates]
            if valid:
                coords = np.array([v.coordinates for v in valid])
                dists = self.distributor._haversine_distance_vectorized(location, coords)
                eligible = [v for v, d in zip(valid, dists) if d <= max_dist]
            else:
                eligible = []

            if selection.get('criteria') == 'largest_capacity':
                eligible.sort(key=lambda v: self.distributor._get_venue_capacity(v), reverse=True)
            return eligible

        return venues

    def _get_person_attr(self, person, attr_name: str, person_attrs: Optional[Dict]) -> Any:
        """
        Get person attribute using the distributor's lookup method.
        """
        if person_attrs and attr_name in person_attrs:
            return person_attrs[attr_name]

        return self.distributor._get_person_attribute(attr_name, person)

    def _check_numerical_constraint(self, val, venue, rule: Dict) -> bool:
        constraints = rule.get('venue_constraints', {})
        min_v = venue.properties.get(constraints.get('min_column')) if constraints.get('min_column') else None
        max_v = venue.properties.get(constraints.get('max_column')) if constraints.get('max_column') else None
        if min_v is not None and val < min_v: return False
        if max_v is not None and val > max_v: return False
        return True

    def _check_categorical_constraint(self, val, venue, rule: Dict) -> bool:
        col = rule.get('venue_column')
        if not col: return True
        v_val = venue.properties.get(col, rule.get('assume_if_missing', 'Mixed'))
        matching = rule.get('matching_rules', {})
        if not rule.get('case_sensitive', False):
            v_val = self.distributor._normalize_value(v_val).lower()
            val = self.distributor._normalize_value(val).lower()
            matching = {k.lower(): [self.distributor._normalize_value(v).lower() for v in vals] for k, vals in matching.items()}
        else:
            v_val = self.distributor._normalize_value(v_val)
            val = self.distributor._normalize_value(val)
            matching = {k: [self.distributor._normalize_value(v) for v in vals] for k, vals in matching.items()}

        return val in matching.get(v_val, []) if v_val in matching else True

build_attribute_index(venues)

Pre-process venue attributes for fast filtering.

Source code in may/venue_distributor/matcher.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def build_attribute_index(self, venues: List):
    """
    Pre-process venue attributes for fast filtering.

    Reads ``config['eligibility']['attributes']`` and rebuilds:

    * ``venue_id_to_idx`` - ``id(venue)`` -> position in ``venues``.
    * ``num_constraints`` - per numerical attribute, int32 ``min``/``max``
      arrays aligned with ``venues``; the int32 extremes act as
      "no constraint" sentinels.
    * ``categorical_index`` - ``(attribute, person value)`` -> set of
      ``id(venue)`` whose matching rules allow that person value.

    Args:
        venues: Venue objects exposing a ``properties`` mapping.

    Raises:
        ValueError: If a numerical bound is present but is not parseable
            as a number.
    """
    eligibility = self.config.get('eligibility', {})
    attributes = eligibility.get('attributes', [])

    # Reset every derived structure up front so the early returns below
    # cannot leave match rules from a previous build in place.
    self.venue_attribute_cache = {}
    self.categorical_index = {}
    self.num_constraints = {}
    self.numerical_match_rules = []
    self.categorical_match_rules = []
    self.venue_id_to_idx = {id(v): i for i, v in enumerate(venues)}

    if not attributes:
        self.attribute_index_built = True
        return

    int32_limits = np.iinfo(np.int32)
    venue_count = len(venues)

    # Keep only the rules that actually have a venue-side component.
    active_rules = []
    for rule in attributes:
        attr_name = rule.get('name')
        attr_type = rule.get('type')

        if attr_type == 'numerical' and rule.get('venue_constraints'):
            v_con = rule.get('venue_constraints')
            active_rules.append({
                'name': attr_name,
                'type': 'numerical',
                'min_col': v_con.get('min_column'),
                'max_col': v_con.get('max_column')
            })
            # int32 min/max mean "no constraint" for that venue.
            self.num_constraints[attr_name] = {
                'min': np.full(venue_count, int32_limits.min, dtype=np.int32),
                'max': np.full(venue_count, int32_limits.max, dtype=np.int32)
            }
        elif attr_type == 'categorical' and rule.get('venue_column'):
            case_sensitive = rule.get('case_sensitive', False)
            matching = rule.get('matching_rules', {})
            if not case_sensitive:
                # Lower-case once here instead of once per venue.
                matching = {k.lower(): [v.lower() for v in vals] for k, vals in matching.items()}
            active_rules.append({
                'name': attr_name,
                'type': 'categorical',
                'col': rule.get('venue_column'),
                'assume': rule.get('assume_if_missing', 'Mixed'),
                'case_sensitive': case_sensitive,
                'rules': matching
            })

    if not active_rules:
        self.attribute_index_built = True
        return

    def parse_bound(raw):
        """Return ``raw`` as an int, or None when it carries no constraint."""
        if raw is None or raw == '':
            return None
        number = float(raw)
        if number != number:
            # NaN cannot be converted to int; treat it as a missing bound.
            return None
        return int(number)

    # Single pass over venues
    for i, venue in enumerate(venues):
        v_props = venue.properties
        v_id = id(venue)

        for rule in active_rules:
            attr_name = rule['name']

            if rule['type'] == 'numerical':
                bounds = self.num_constraints[attr_name]
                for side, column in (('min', rule['min_col']), ('max', rule['max_col'])):
                    if not column:
                        continue
                    bound = parse_bound(v_props.get(column))
                    if bound is not None:
                        bounds[side][i] = bound
                continue

            # categorical
            v_val = v_props.get(rule['col'])
            if v_val is None or v_val == '':
                v_val = rule['assume']

            if not rule['case_sensitive']:
                v_val = str(v_val).lower() if v_val else ''

            allowed = rule['rules'].get(v_val)
            if allowed:
                for p_val in allowed:
                    index_key = (attr_name, self.distributor._normalize_value(p_val))
                    self.categorical_index.setdefault(index_key, set()).add(v_id)

    self.numerical_match_rules = [r for r in active_rules if r['type'] == 'numerical']
    # The normalised rule dicts store the venue column under 'col'. Only
    # categorical rules lacking one need a per-venue check; rules with a
    # column are answered by categorical_index.
    self.categorical_match_rules = [
        r for r in active_rules if r['type'] == 'categorical' and not r.get('col')
    ]

    self.attribute_index_built = True

    if self.verbose:
        logger.info(f"Built attribute index for {len(venues)} venues with {len(attributes)} attributes")
        if self.categorical_index:
            logger.info(f"Built categorical index: {len(self.categorical_index)} unique value combinations")

filter_venues_by_person(person, venues, person_attrs=None)

Filter venues based on person's attributes (age, gender, etc.).

Source code in may/venue_distributor/matcher.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def filter_venues_by_person(self, person, venues: List, person_attrs: Optional[Dict] = None) -> List:
    """
    Return the venues from ``venues`` that accept ``person``.

    Args:
        person: Person whose attributes are matched.
        venues: Candidate venues.
        person_attrs: Optional pre-fetched ``{attribute: value}`` mapping.
            When None it is built from the distributor's
            ``_pre_processed_match_attrs`` rules.

    Returns:
        Venues that survive the categorical pre-filter and the per-venue
        attribute check, in their original order.
    """
    owner = self.distributor
    match_attrs = getattr(owner, '_pre_processed_match_attrs', [])

    # Resolve each attribute once per person rather than once per venue.
    if person_attrs is None:
        person_attrs = {}
        for spec in match_attrs:
            key = spec['attribute']
            if key in person_attrs:
                continue
            if spec.get('is_residence'):
                home = person.residence
                if home:
                    resolved = owner._get_nested_value_with_dict_support(home, spec['residence_parts'])
                else:
                    resolved = None
            elif spec.get('is_nested'):
                resolved = owner._get_nested_value_with_dict_support(person, spec['path_parts'])
            else:
                # Plain attribute on the person object.
                resolved = getattr(person, key, None)
            person_attrs[key] = resolved

    # Step 1: narrow the candidates with the categorical index.
    candidates = self.prefilter_venues_by_categorical(person, venues, person_attrs=person_attrs)
    if not candidates:
        return []

    # Step 2: per-venue check of the remaining attribute rules.
    return [
        candidate for candidate in candidates
        if self.venue_accepts_person(person, candidate, match_attrs, person_attrs=person_attrs)
    ]

filter_venues_with_expansion(person, venues, initial_pool, location, search_limits, person_attrs=None)

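Filter venues for a person. The initial pool is tried first; the nearest-venue search is widened (to at most 100 venues) only when venue selection is count-based and more than one search limit is supplied.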

Source code in may/venue_distributor/matcher.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def filter_venues_with_expansion(self, person, venues: List, initial_pool: List, 
                               location: Tuple[float, float], search_limits: List[int], 
                               person_attrs: Optional[Dict] = None) -> List:
    """
    Filter venues for a person, widening the search only when configured.

    The initial pool is tried first. If nothing in it is eligible and
    ``config['venue_selection']['consider_by']`` is ``'count'`` with more
    than one entry in ``search_limits``, progressively larger
    nearest-venue pools (capped at 100 venues) are queried from the
    distributor until one yields an eligible venue.

    Args:
        person: Person being placed.
        venues: Not used by this function.
        initial_pool: Candidate venues tried first.
        location: Point passed to the distributor's nearest-venue query.
        search_limits: Pool sizes to try, in the order given.
        person_attrs: Optional pre-fetched person attributes.

    Returns:
        The eligible venues from the first pool that yields any, else ``[]``.
    """
    # Tier 1: the initial pool.
    matches = self.filter_venues_by_person(person, initial_pool, person_attrs=person_attrs)
    if matches:
        return matches

    # No expansion unless count-based selection is configured with at
    # least two search limits.
    count_mode = self.config.get('venue_selection', {}).get('consider_by') == 'count'
    if not count_mode or len(search_limits) <= 1:
        return []

    owner = self.distributor
    for search_count in search_limits:
        # A pool this size (or larger) has already been tried.
        if search_count <= len(initial_pool):
            continue

        # Safety cap: never query more than 100 venues.
        limit = min(search_count, 100)

        if self.verbose:
            logger.debug(f"Expanding search for person {person.id} to k={limit}")

        allowed = getattr(owner, 'venue_ids', None)
        expanded_pool = owner._find_closest_venues(
            location, owner.venue_type, limit, allowed_venue_ids=allowed
        )
        matches = self.filter_venues_by_person(person, expanded_pool, person_attrs=person_attrs)
        if matches:
            return matches

        # Once the cap has been hit, larger limits would repeat the query.
        if limit >= 100:
            break

    return []

find_eligible_venues_for_location(location, venues)

Find candidate venues based on distance/count config.

Source code in may/venue_distributor/matcher.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
def find_eligible_venues_for_location(self, location: Tuple[float, float], venues: List) -> List:
    """Build the candidate venue pool for ``location`` from the ``venue_selection`` config.

    Behaviour depends on ``venue_selection.consider_by`` (default ``'count'``):

    * ``'count'``: ask the distributor for the ``count`` (default 5) closest
      venues. With ``criteria == 'largest_capacity'`` a wider pool of
      ``max(count * 5, 20)`` closest venues is fetched and the ``count``
      highest-capacity ones are returned.
    * ``'distance'``: keep the venues in ``venues`` that have coordinates and
      lie within ``max_distance`` (default 10) of ``location``; the result is
      sorted by descending capacity when ``criteria == 'largest_capacity'``.
    * anything else: ``venues`` is returned unchanged.

    Args:
        location: Reference point passed to the distributor's distance helpers.
        venues: Venues to filter; only consulted outside ``'count'`` mode.

    Returns:
        The list of candidate venues.
    """
    settings = self.config.get('venue_selection', {})
    mode = settings.get('consider_by', 'count')
    prefer_large = settings.get('criteria') == 'largest_capacity'

    if mode == 'count':
        wanted = settings.get('count', 5)
        allowed_ids = getattr(self.distributor, 'venue_ids', None)

        if not prefer_large:
            return self.distributor._find_closest_venues(
                location, self.distributor.venue_type, wanted, allowed_venue_ids=allowed_ids
            )

        # Fetch a wider neighbourhood so large venues slightly further away can win.
        nearby = self.distributor._find_closest_venues(
            location, self.distributor.venue_type, max(wanted * 5, 20), allowed_venue_ids=allowed_ids
        )
        by_capacity = sorted(nearby, key=self.distributor._get_venue_capacity, reverse=True)
        return by_capacity[:wanted]

    if mode == 'distance':
        radius = settings.get('max_distance', 10)
        unit = settings.get('max_distance_unit', 'km')
        # Convert the threshold to kilometres.
        # NOTE(review): assumes the vectorized haversine helper returns km -- confirm.
        if unit == 'miles':
            radius = radius * 1.60934
        elif unit == 'meters':
            radius = radius / 1000

        located = [venue for venue in venues if venue.coordinates]
        within = []
        if located:
            points = np.array([venue.coordinates for venue in located])
            distances = self.distributor._haversine_distance_vectorized(location, points)
            within = [venue for venue, dist in zip(located, distances) if dist <= radius]

        if prefer_large:
            within.sort(key=self.distributor._get_venue_capacity, reverse=True)
        return within

    return venues

prefilter_venues_by_categorical(person, venues, person_attrs=None)

Pre-filter venues using categorical index for massive speedup.

Source code in may/venue_distributor/matcher.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def prefilter_venues_by_categorical(self, person, venues: List, person_attrs: Optional[Dict] = None) -> List:
    """Narrow ``venues`` to those whose categorical index entries match the person.

    For every ``eligibility.attributes`` rule of type ``'categorical'`` that
    has a ``venue_column``, the person's value is normalised and looked up in
    ``self.categorical_index`` under the key ``(attribute name, value)``. The
    id sets of all such rules are intersected and ``venues`` is filtered to
    the objects whose ``id()`` is in the intersection.

    Args:
        person: Person whose attributes drive the filter.
        venues: Candidate venues to narrow down.
        person_attrs: Optional attribute mapping forwarded to
            ``self._get_person_attr``.

    Returns:
        ``venues`` unchanged when there is no categorical index or no rule
        yields a filter value; otherwise a new list with the matching venues,
        in their original order.
    """
    if not self.categorical_index:
        return venues

    rules = self.config.get('eligibility', {}).get('attributes', [])

    lookup_keys = []
    for rule in rules:
        if rule.get('type') != 'categorical' or not rule.get('venue_column'):
            continue

        attr_name = rule.get('name')
        raw_value = self._get_person_attr(person, attr_name, person_attrs)
        if raw_value is None:
            continue

        key_value = self.distributor._normalize_value(raw_value)

        # Read the flag from the rule being processed. Re-searching the rule
        # list by name could pick up a different rule that shares the name.
        # Matching is case-insensitive unless the rule opts in.
        # NOTE(review): assumes the index stores lower-cased keys for
        # case-insensitive rules -- confirm against the index builder.
        if not rule.get('case_sensitive', False) and isinstance(key_value, str):
            key_value = key_value.lower()

        lookup_keys.append((attr_name, key_value))

    if not lookup_keys:
        return venues

    # `&` builds a new set each time, so the sets stored in the index are never mutated.
    matching_ids = self.categorical_index.get(lookup_keys[0], set())
    for key in lookup_keys[1:]:
        matching_ids = matching_ids & self.categorical_index.get(key, set())

    return [venue for venue in venues if id(venue) in matching_ids]

select_venue(person, venues, person_location)

Select final venue from eligible list based on strategy.

Source code in may/venue_distributor/matcher.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def select_venue(self, person, venues: List, person_location: Tuple[float, float]) -> Optional[Any]:
    """Pick one venue from ``venues`` according to ``allocation.strategy``.

    Strategies (default ``'random'``):

    * ``'random'``: uniform draw via ``np.random.choice``.
    * ``'closest'``: the venue with coordinates nearest to ``person_location``.
    * ``'proportional'``: random draw weighted by ``1 / (distance + 0.1)``.
    * ``'closest_balanced'``: random draw weighted by inverse distance times
      remaining capacity; falls back to the closest venue when the total
      weight is not positive.
    * ``'largest_capacity'``: the venue with the highest capacity.
    * any other value: the first venue.

    Distance-based strategies ignore venues without coordinates. When none
    have coordinates, ``'closest'`` and ``'proportional'`` return the first
    venue and ``'closest_balanced'`` draws uniformly from ``venues``.

    Args:
        person: Unused by the selection logic in this method.
        venues: Eligible venues to choose from.
        person_location: Reference point for distance computations.

    Returns:
        The chosen venue, or ``None`` when ``venues`` is empty.
    """
    if not venues:
        return None

    strategy = self.config.get('allocation', {}).get('strategy', 'random')

    if strategy == 'random':
        return np.random.choice(venues)

    if strategy == 'largest_capacity':
        return max(venues, key=lambda candidate: self.distributor._get_venue_capacity(candidate))

    if strategy not in ('closest', 'proportional', 'closest_balanced'):
        return venues[0]

    # Everything below is distance-driven and only considers located venues.
    located = [candidate for candidate in venues if candidate.coordinates]
    # Scalar haversine for small pools, the vectorized helper from 50 upwards.
    small_pool = len(located) < 50

    def scalar_distance(candidate):
        return self.distributor._haversine_distance(person_location, candidate.coordinates)

    def vectorized_distances():
        points = np.array([candidate.coordinates for candidate in located])
        return self.distributor._haversine_distance_vectorized(person_location, points)

    if strategy == 'closest':
        if not located:
            return venues[0]
        if small_pool:
            return min(located, key=scalar_distance)
        return located[np.argmin(vectorized_distances())]

    if strategy == 'proportional':
        if not located:
            return venues[0]
        if small_pool:
            distances = [scalar_distance(candidate) for candidate in located]
        else:
            distances = vectorized_distances()
        # The 0.1 offset keeps the weight finite for a zero distance.
        inverse = np.array([1.0 / (d + 0.1) for d in distances])
        return np.random.choice(located, p=inverse / inverse.sum())

    # strategy == 'closest_balanced'
    if not located:
        return np.random.choice(venues)

    if small_pool:
        distances = np.array([scalar_distance(candidate) for candidate in located])
    else:
        distances = vectorized_distances()
    proximity = 1.0 / (distances + 0.1)

    remaining = np.array(
        [self.distributor._get_remaining_capacity(candidate) for candidate in located],
        dtype=np.float64,
    )
    # Every venue is scaled by the capacity of the first located venue (at least 1).
    # This is a constant factor, so it cancels when the weights are normalised.
    reference_capacity = max(self.distributor._get_venue_capacity(located[0]), 1)
    combined = proximity * (remaining / reference_capacity)

    total = combined.sum()
    if total <= 0:
        # No positive weight (e.g. nothing has room left): take the nearest venue.
        return located[np.argmin(distances)]

    return np.random.choice(located, p=combined / total)

venue_accepts_person(person, venue, attribute_rules, person_attrs=None)

Check if venue accepts person based on attribute rules using pre-computed arrays.

Source code in may/venue_distributor/matcher.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
def venue_accepts_person(self, person, venue, attribute_rules: List[Dict], person_attrs: Optional[Dict] = None) -> bool:
    """Decide whether ``venue`` accepts ``person`` using the pre-computed constraint arrays.

    The venue is located in ``self.venue_id_to_idx`` by ``id(venue)``. Venues
    missing from that mapping are delegated to ``venue_accepts_person_slow``.
    Otherwise each rule in ``self.numerical_match_rules`` that has an entry in
    ``self.num_constraints`` is checked against the venue's ``min``/``max``
    values; the int32 minimum and maximum act as "no bound" sentinels.

    Args:
        person: Person being matched.
        venue: Venue being tested.
        attribute_rules: Rules forwarded to the slow path; unused on the fast path.
        person_attrs: Optional attribute mapping forwarded to
            ``self._get_person_attr``.

    Returns:
        ``False`` when a constrained attribute is missing on the person or
        falls outside the venue's bounds, ``True`` otherwise (or whatever the
        slow path returns for uncached venues).
    """
    slot = self.venue_id_to_idx.get(id(venue))
    if slot is None:
        # Venue is not in the pre-computed index: use the rule-by-rule fallback.
        return self.venue_accepts_person_slow(person, venue, attribute_rules)

    # Sentinel values meaning "unbounded" on the respective side.
    int32_limits = np.iinfo(np.int32)
    no_lower_bound = int32_limits.min
    no_upper_bound = int32_limits.max

    for rule in self.numerical_match_rules:
        name = rule['name']
        if name not in self.num_constraints:
            continue

        value = self._get_person_attr(person, name, person_attrs)
        if value is None:
            return False

        bounds = self.num_constraints[name]

        lower = bounds['min'][slot]
        if lower != no_lower_bound and value < lower:
            return False

        upper = bounds['max'][slot]
        if upper != no_upper_bound and value > upper:
            return False

    # Placeholder: no categorical check is performed here. Rules with a
    # venue_column are expected to be handled by prefilter_venues_by_categorical.
    for _rule in self.categorical_match_rules:
        pass

    return True

venue_accepts_person_slow(person, venue, attribute_rules)

Fallback for venues without cache.

Source code in may/venue_distributor/matcher.py
238
239
240
241
242
243
244
245
246
247
248
249
def venue_accepts_person_slow(self, person, venue, attribute_rules: List[Dict]) -> bool:
    """Rule-by-rule acceptance check for venues that have no cached constraints.

    Each rule's attribute is read from the person through the distributor. A
    missing (``None``) value rejects the person immediately. ``'numerical'``
    and ``'categorical'`` rules are delegated to the matching constraint
    checker; rules of any other type only require the attribute to be present.

    Args:
        person: Person being matched.
        venue: Venue being tested.
        attribute_rules: Rule dicts providing at least ``name`` and ``type``.

    Returns:
        ``True`` when every rule passes, ``False`` on the first failure.
    """
    for rule in attribute_rules:
        value = self.distributor._get_person_attribute(rule.get('name'), person)
        if value is None:
            return False

        kind = rule.get('type')
        if kind == 'numerical':
            satisfied = self._check_numerical_constraint(value, venue, rule)
        elif kind == 'categorical':
            satisfied = self._check_categorical_constraint(value, venue, rule)
        else:
            # Unknown rule types impose no constraint beyond presence.
            continue

        if not satisfied:
            return False

    return True