Skip to content

Special cases

SpecialCaseManager

Handles special case allocations (e.g., boarding schools). These are processed before normal allocation.

Source code in may/venue_distributor/special_cases.py
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class SpecialCaseManager:
    """
    Handles special case allocations (e.g., boarding schools).
    These are processed before normal allocation.
    """

    def __init__(self, distributor):
        self.distributor = distributor
        self.config = distributor.config
        self.verbose = distributor.verbose

    def handle_special_cases(self, people: List, venues: List, world) -> Tuple[List, List]:
        """
        Handle special case allocations.
        Returns Tuple of (remaining_people, unallocated_special_case_people).
        """
        special_cases = self.config.get('special_cases', [])
        if not special_cases:
            return people, []

        # Build venue index for fast lookup
        venue_index = {}
        for venue in venues:
            if hasattr(venue, 'name') and hasattr(venue, 'geographical_unit') and venue.geographical_unit:
                key = (venue.name, venue.geographical_unit.name)
                venue_index[key] = venue

        if venue_index and self.verbose:
            logger.debug(f"Built special case venue index with {len(venue_index)} entries")

        remaining_people = []
        unallocated_special_case_people = []
        allocated_count = 0

        for person in people:
            matched_any = False
            allocated = False

            for case in special_cases:
                if self.matches_special_case(person, case):
                    matched_any = True
                    if self.allocate_special_case(person, case, venues, venue_index):
                        allocated = True
                        allocated_count += 1
                        break

            if not matched_any:
                remaining_people.append(person)
            elif not allocated:
                unallocated_special_case_people.append(person)

        if allocated_count > 0:
            logger.info(f"Allocated {allocated_count} people via special cases")
            self.distributor.allocated_this_run += allocated_count

        return remaining_people, unallocated_special_case_people

    def matches_special_case(self, person, case: Dict) -> bool:
        """Check if person matches special case condition."""
        condition = case.get('condition', {})

        if 'person_residence_type' in condition:
            required_type = condition['person_residence_type']
            res_venue = person.residence
            if res_venue is None or not hasattr(res_venue, 'type') or res_venue.type != required_type:
                return False

        if 'filters' in condition:
            filters = condition['filters']
            if not self.distributor.filtering.person_matches_filters(person, filters):
                return False

        return True

    def allocate_special_case(self, person, case: Dict, venues: List, venue_index: Dict = None) -> bool:
        """Allocate person according to special case rule."""
        rule = case.get('allocation_rule', {})
        strategy = rule.get('strategy')
        match_by = rule.get('match_by', [])

        selected_venue = None

        if strategy:
            geo_unit = self.distributor._get_geo_unit_at_level(person, self.distributor.world)
            if geo_unit and geo_unit.coordinates:
                loc = geo_unit.coordinates
                if strategy == 'closest':
                    min_dist = float('inf')
                    for venue in venues:
                        if venue.coordinates and len(venue.coordinates) == 2:
                            dist = self.distributor._haversine_distance(loc, venue.coordinates)
                            if dist < min_dist:
                                min_dist = dist
                                selected_venue = venue
                elif strategy == 'random' and venues:
                    selected_venue = np.random.choice(venues)

        elif match_by and venue_index:
            lookup_key = self._extract_lookup_key(person, match_by)
            if lookup_key:
                selected_venue = venue_index.get(lookup_key)

            if not selected_venue:
                selected_venue = self._fallback_search(person, venues, match_by)

        elif match_by:
            selected_venue = self._fallback_search(person, venues, match_by)

        if selected_venue:
            selected_venue.add_to_subset(
                person, 
                subset_key=self.distributor.subset_key, 
                activity_name=self.distributor.activity_map_key, 
                activity_type=self.distributor.activity_type
            )
            self.distributor._increment_venue_count(selected_venue)
            return True

        res_name = self.distributor._get_person_attribute('residence.name', person)
        if_no_match = rule.get('if_no_match', 'error')
        if if_no_match == 'error':
            raise ValueError(f"Special case allocation failed for person {person.id} with residence '{res_name}'")
        elif if_no_match == 'warn':
            logger.warning(f"Special case: No match found for person {person.id} with residence '{res_name}'")

        return False

    def _extract_lookup_key(self, person, match_by: List[Dict]) -> Optional[Tuple]:
        try:
            parts = []
            for criterion in match_by:
                source = criterion.get('source', '')
                if source.startswith('person.'):
                    val = self.distributor._get_person_attribute(source.replace('person.', ''), person)
                    if val is None: return None
                    parts.append(val)
            return tuple(parts) if parts else None
        except Exception:
            return None

    def _fallback_search(self, person, venues: List, match_by: List[Dict]):
        for venue in venues:
            if self._venue_matches_criteria(person, venue, match_by):
                return venue
        return None

    def _venue_matches_criteria(self, person, venue, match_by: List[Dict]) -> bool:
        for criterion in match_by:
            source = criterion.get('source')
            target = criterion.get('target')
            match_type = criterion.get('match_type', 'exact')

            src_val = self.distributor._get_person_attribute(source.replace('person.', ''), person)
            tgt_val = self.distributor._get_nested_value(venue, target.replace('venue.', ''))

            if match_type == 'exact' and src_val != tgt_val:
                return False
        return True

allocate_special_case(person, case, venues, venue_index=None)

Allocate person according to special case rule.

Source code in may/venue_distributor/special_cases.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def allocate_special_case(self, person, case: Dict, venues: List, venue_index: Dict = None) -> bool:
    """Allocate person according to special case rule."""
    rule = case.get('allocation_rule', {})
    strategy = rule.get('strategy')
    match_by = rule.get('match_by', [])

    selected_venue = None

    if strategy:
        geo_unit = self.distributor._get_geo_unit_at_level(person, self.distributor.world)
        if geo_unit and geo_unit.coordinates:
            loc = geo_unit.coordinates
            if strategy == 'closest':
                min_dist = float('inf')
                for venue in venues:
                    if venue.coordinates and len(venue.coordinates) == 2:
                        dist = self.distributor._haversine_distance(loc, venue.coordinates)
                        if dist < min_dist:
                            min_dist = dist
                            selected_venue = venue
            elif strategy == 'random' and venues:
                selected_venue = np.random.choice(venues)

    elif match_by and venue_index:
        lookup_key = self._extract_lookup_key(person, match_by)
        if lookup_key:
            selected_venue = venue_index.get(lookup_key)

        if not selected_venue:
            selected_venue = self._fallback_search(person, venues, match_by)

    elif match_by:
        selected_venue = self._fallback_search(person, venues, match_by)

    if selected_venue:
        selected_venue.add_to_subset(
            person, 
            subset_key=self.distributor.subset_key, 
            activity_name=self.distributor.activity_map_key, 
            activity_type=self.distributor.activity_type
        )
        self.distributor._increment_venue_count(selected_venue)
        return True

    res_name = self.distributor._get_person_attribute('residence.name', person)
    if_no_match = rule.get('if_no_match', 'error')
    if if_no_match == 'error':
        raise ValueError(f"Special case allocation failed for person {person.id} with residence '{res_name}'")
    elif if_no_match == 'warn':
        logger.warning(f"Special case: No match found for person {person.id} with residence '{res_name}'")

    return False

handle_special_cases(people, venues, world)

Handle special case allocations. Returns Tuple of (remaining_people, unallocated_special_case_people).

Source code in may/venue_distributor/special_cases.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def handle_special_cases(self, people: List, venues: List, world) -> Tuple[List, List]:
    """
    Handle special case allocations.
    Returns Tuple of (remaining_people, unallocated_special_case_people).
    """
    special_cases = self.config.get('special_cases', [])
    if not special_cases:
        return people, []

    # Build venue index for fast lookup
    venue_index = {}
    for venue in venues:
        if hasattr(venue, 'name') and hasattr(venue, 'geographical_unit') and venue.geographical_unit:
            key = (venue.name, venue.geographical_unit.name)
            venue_index[key] = venue

    if venue_index and self.verbose:
        logger.debug(f"Built special case venue index with {len(venue_index)} entries")

    remaining_people = []
    unallocated_special_case_people = []
    allocated_count = 0

    for person in people:
        matched_any = False
        allocated = False

        for case in special_cases:
            if self.matches_special_case(person, case):
                matched_any = True
                if self.allocate_special_case(person, case, venues, venue_index):
                    allocated = True
                    allocated_count += 1
                    break

        if not matched_any:
            remaining_people.append(person)
        elif not allocated:
            unallocated_special_case_people.append(person)

    if allocated_count > 0:
        logger.info(f"Allocated {allocated_count} people via special cases")
        self.distributor.allocated_this_run += allocated_count

    return remaining_people, unallocated_special_case_people

matches_special_case(person, case)

Check if person matches special case condition.

Source code in may/venue_distributor/special_cases.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def matches_special_case(self, person, case: Dict) -> bool:
    """Check if person matches special case condition."""
    condition = case.get('condition', {})

    if 'person_residence_type' in condition:
        required_type = condition['person_residence_type']
        res_venue = person.residence
        if res_venue is None or not hasattr(res_venue, 'type') or res_venue.type != required_type:
            return False

    if 'filters' in condition:
        filters = condition['filters']
        if not self.distributor.filtering.person_matches_filters(person, filters):
            return False

    return True