Skip to content

Assignment config

Configuration system for attribute assignment.

Simplified attribute assignment configuration:

- Roles are mapped to household subsets (Kids, Young Adults, Adults, Old Adults).
- Household structures use flexible pattern matching with actual/original conditions.
- Assignment rules are organized by household structure type.
- The configuration format is cleaner and more user-friendly.

AssignmentRule dataclass

Simplified assignment rule.

Source code in may/attribute_assignment/assignment_config.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
@dataclass
class AssignmentRule:
    """
    A single assignment rule: the role(s) it targets and how to assign.

    ``role`` is annotated as a string, but ``applies_to_role`` also accepts
    a list of role names stored in that field.
    """
    # Role name targeted by the rule; may also hold a list of role names.
    role: str
    # Numeric priority of the rule.
    priority: int
    # Free-text description of the rule.
    description: str
    # Assignment settings for the rule.
    assignment: Dict[str, Any]
    # Roles this rule depends on.
    dependencies: List[str] = field(default_factory=list)

    def applies_to_role(self, role_name: str) -> bool:
        """Return whether ``role_name`` is targeted by this rule."""
        targets = self.role
        if not isinstance(targets, list):
            return role_name == targets
        return role_name in targets

applies_to_role(role_name)

Check if this rule applies to a role.

Source code in may/attribute_assignment/assignment_config.py
300
301
302
303
304
def applies_to_role(self, role_name: str) -> bool:
    """Return whether ``role_name`` is targeted by this rule.

    ``self.role`` may be a single role name or a list of role names.
    """
    targets = self.role
    if not isinstance(targets, list):
        return role_name == targets
    return role_name in targets

AttributeAssignmentConfig

Configuration loader for attribute assignment.

Simplified configuration system:

- Roles map to subsets.
- Structures use matching_rules.
- Assignment rules are organized by structure.

Source code in may/attribute_assignment/assignment_config.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
class AttributeAssignmentConfig:
    """
    Configuration loader for attribute assignment.

    Simplified configuration system:
    - Roles map to subsets
    - Structures use matching_rules
    - Assignment rules organized by structure
    """

    def __init__(self, config_path: Path):
        """
        Load and parse an attribute-assignment configuration from YAML.

        Args:
            config_path: Path to the YAML file; it is passed through
                ``pr.resolve`` before being opened.

        Raises:
            ValueError: If the top level of the YAML document is not a mapping.
        """
        self.config_path = Path(pr.resolve(str(config_path)))

        # Open in binary mode so the YAML loader handles the encoding itself
        # instead of relying on the platform's default text encoding.
        with open(self.config_path, 'rb') as f:
            loaded = yaml.safe_load(f)

        # An empty document loads as None; treat it as an empty configuration
        # so every parser below can call .get() on it.
        if loaded is None:
            loaded = {}
        if not isinstance(loaded, dict):
            raise ValueError(
                f"Configuration file {self.config_path} must contain a YAML "
                f"mapping at the top level, got {type(loaded).__name__}"
            )
        self.raw_config = loaded

        # Parse sections
        self.attribute_name = self._parse_attribute()
        self.assignment_level = self._parse_assignment_level()
        self.residence_venue_types = self._parse_residence_venue_types()
        self.filters = self._parse_filters()
        self.required_attributes = self._parse_required_attributes()
        self.region_mapping = self.raw_config.get('region_mapping', {})
        self.categories = self._parse_categories()
        self.roles = self._parse_roles()
        self.household_structures = self._parse_household_structures()
        self.data_sources = self._parse_data_sources()
        self.assignment_rules = self._parse_assignment_rules()
        self.venue_assignment_rules = self._parse_venue_assignment_rules()
        self.settings = self._parse_settings()

        # Cache valid roles per structure (filled lazily by get_person_role)
        self._valid_roles_cache = {}

        # Cache category lookups; the grouped structures need self.categories,
        # so they are built only after the sections above are parsed.
        self._category_lookup_cache = {}
        self._build_category_lookup_structures()

        logger.info(f"Loaded config for '{self.attribute_name}' from {self.config_path}")
        logger.info(f"  Assignment level: {self.assignment_level}")
        if self.required_attributes:
            logger.info(f"  Required attributes: {list(self.required_attributes.keys())}")
        if self.categories:
            logger.info(f"  Categories: {len(self.categories)}")
        logger.info(f"  Roles: {len(self.roles)}")
        logger.info(f"  Household structures: {len(self.household_structures)}")
        logger.info(f"  Assignment rules: {len(self.assignment_rules)}")

    def _parse_attribute(self) -> str:
        """Parse attribute name."""
        return self.raw_config.get('attribute', {}).get('name', 'unknown')

    def _parse_assignment_level(self) -> str:
        """Parse assignment level: 'person' or 'person_by_residence'."""
        return self.raw_config.get('attribute', {}).get('assignment_level', 'person_by_residence')

    def _parse_residence_venue_types(self) -> List[str]:
        """Residence venue types assigned by household structure (default ['household']).

        Other residence types fall through to venue_assignment_rules.
        """
        return self.raw_config.get('attribute', {}).get('residence_venue_types', ['household'])

    def _parse_filters(self) -> Dict[str, Any]:
        """Parse filters (e.g., activity-based filtering)."""
        return self.raw_config.get('filters', {})

    def _parse_required_attributes(self) -> Dict[str, Any]:
        """Parse required attributes (dependencies).

        Supports two formats:
        1. Dict format: {attr_name: {description: "...", required: true, ...}}
        2. List format: [{name: "attr_name", description: "...", required: true, ...}]

        Returns a dict format for backward compatibility.
        """
        raw_attrs = self.raw_config.get('required_attributes', {})

        # If it's already a dict, return it
        if isinstance(raw_attrs, dict):
            return raw_attrs

        # If it's a list, convert to dict using 'name' field as key
        if isinstance(raw_attrs, list):
            result = {}
            for attr in raw_attrs:
                if 'name' not in attr:
                    raise ValueError(f"Required attribute entry missing 'name' field: {attr}")
                name = attr['name']
                # Copy all fields except 'name' into the config
                config = {k: v for k, v in attr.items() if k != 'name'}
                result[name] = config
            return result

        return {}

    def _parse_categories(self) -> List[Dict[str, Any]]:
        """Parse categories (e.g., age bands)."""
        return self.raw_config.get('categories', [])

    def _parse_roles(self) -> Dict[str, Role]:
        """Parse role definitions."""
        roles = {}
        roles_config = self.raw_config.get('roles', {})

        for role_name, role_data in roles_config.items():
            if not isinstance(role_data, dict):
                continue

            # Use explicit 'type' if provided, otherwise infer from name prefix.
            # The config builder writes the type AS the name prefix
            # (primary_/secondary_/extra_) and omits the explicit key, but
            # hand-authored configs may still use arbitrary names + explicit type.
            role_type = role_data.get('type')
            if not role_type:
                if role_name.startswith('primary_'):
                    role_type = 'primary'
                elif role_name.startswith('secondary_'):
                    role_type = 'secondary'
                elif role_name.startswith('extra_'):
                    role_type = 'extra'
                else:
                    role_type = 'general'

            roles[role_name] = Role(
                name=role_name,
                description=role_data.get('description', ''),
                subsets=role_data.get('subsets', []),
                role_type=role_type
            )

        return roles

    def _parse_household_structures(self) -> Dict[str, HouseholdStructure]:
        """Parse household structure definitions."""
        structures = {}
        structures_config = self.raw_config.get('household_structures', {})

        for struct_name, struct_data in structures_config.items():
            if not isinstance(struct_data, dict):
                continue

            # Parse matching rules
            matching_rules = []
            for rule_data in struct_data.get('matching_rules', []):
                matching_rules.append(MatchingRule(
                    actual_patterns=rule_data.get('actual', []),
                    original_patterns=rule_data.get('original', []),
                    description=rule_data.get('description', '')
                ))

            structures[struct_name] = HouseholdStructure(
                name=struct_name,
                description=struct_data.get('description', ''),
                inheritance=struct_data.get('inheritance', False),
                matching_rules=matching_rules
            )

        return structures

    def _parse_data_sources(self) -> Dict[str, DataSourceConfig]:
        """Parse data sources (wrapped in DataSourceConfig for compatibility with v1)."""
        sources = {}
        sources_config = self.raw_config.get('data_sources', {})

        for source_name, source_data in sources_config.items():
            sources[source_name] = DataSourceConfig(
                name=source_name,
                type=source_data.get('type', 'csv_lookup'),
                description=source_data.get('description', ''),
                files=source_data.get('files', []),
                fallbacks=[source_data.get('fallback', {})],  # YAML uses 'fallback' key
                config=source_data
            )

        return sources

    def _parse_assignment_rules(self) -> Dict[str, StructureAssignmentRules]:
        """Parse structure-based assignment rules."""
        structure_rules = {}
        rules_config = self.raw_config.get('assignment_rules', {})

        for structure_name, struct_rules_data in rules_config.items():
            if not isinstance(struct_rules_data, dict):
                continue

            rules = []
            for i, rule_data in enumerate(struct_rules_data.get('rules', [])):
                assignment_data = rule_data.get('assignment', {})
                # Fail loudly on keys no strategy reads (dead config / typos).
                validate_assignment_config(
                    assignment_data,
                    where=f"{self.config_path.name}: assignment_rules."
                          f"{structure_name}.rules[{i}]",
                )

                # Extract dependencies from inheritance strategies
                dependencies = []
                inherit_from = assignment_data.get('inherit_from', {})
                if inherit_from:
                    # Forward inheritance uses 'roles' (list)
                    if 'roles' in inherit_from:
                        dependencies.extend(inherit_from['roles'])
                    # Reverse inheritance uses 'role' (string)
                    elif 'role' in inherit_from:
                        dependencies.append(inherit_from['role'])

                rules.append(AssignmentRule(
                    role=rule_data.get('role'),  # Can be string or list
                    priority=rule_data.get('priority', 999),
                    description=rule_data.get('description', ''),
                    assignment=assignment_data,
                    dependencies=list(set(dependencies)) # Unique dependencies
                ))

            # Sort rules by priority
            rules.sort(key=lambda r: r.priority)

            structure_rules[structure_name] = StructureAssignmentRules(
                structure_name=structure_name,
                description=struct_rules_data.get('description', ''),
                rules=rules
            )

        return structure_rules

    def _parse_venue_assignment_rules(self) -> List[Dict[str, Any]]:
        """Parse venue assignment rules."""
        rules = self.raw_config.get('venue_assignment_rules', [])
        for i, rule in enumerate(rules):
            validate_assignment_config(
                (rule or {}).get('assignment', {}),
                where=f"{self.config_path.name}: venue_assignment_rules[{i}]",
            )
        return rules

    def _parse_settings(self) -> Dict[str, Any]:
        """Parse settings."""
        return self.raw_config.get('settings', {})

    def get_household_structure(self, household, verbose: bool = False) -> Optional[str]:
        """
        Classify household structure.
        Returns first matching structure.
        """
        # Check if structure is already cached (only when not verbose)
        if not verbose:
            cached_structure = household.properties.get('_cached_household_structure')
            if cached_structure is not None:
                return cached_structure

        if verbose:
            logger.debug(f"  Classifying household {household.id}:")

        for struct_name, structure in self.household_structures.items():
            if structure.matches(household, verbose=verbose):
                # Cache the result (only when not verbose to avoid caching debug runs)
                if not verbose:
                    household.properties['_cached_household_structure'] = struct_name
                return struct_name

        if verbose:
            logger.debug(f"  ✗ No structure matched")

        # Cache None result as well
        if not verbose:
            household.properties['_cached_household_structure'] = None

        return None

    def get_person_role(self, person, household_structure: str,
                       assigned_roles: List[str], verbose: bool = False,
                       person_category: Optional[str] = None) -> Optional[str]:
        """
        Determine person's role based on their subset and household structure.

        Roles are tested in ``self.roles`` iteration order; only roles that
        appear in some assignment rule for ``household_structure`` are
        considered. The first role that both matches the person and passes
        its role-type gate is returned:

        - 'primary': this role name is not yet in ``assigned_roles``.
        - 'secondary': not yet in ``assigned_roles`` AND an already assigned
          primary role shares at least one subset with it.
        - 'extra': an already assigned primary role and an already assigned
          secondary role each share at least one subset with it.
        - 'general': no further condition.
        - any other role type is never returned.

        Args:
            person: Person object
            household_structure: Name of household structure
            assigned_roles: List of roles already assigned in this household
            verbose: If True, log matching details
            person_category: Optional pre-calculated person category (subset name).
                When given and not "unknown", a role matches only if the
                category is in the role's subsets; ``role.matches`` is not
                called in that case.

        Returns:
            Role name or None
        """
        if verbose:
            logger.debug(f"    Determining role for {person}:")

        # Get assignment rules for this structure
        if household_structure not in self.assignment_rules:
            if verbose:
                logger.debug(f"      No assignment rules for structure '{household_structure}'")
            return None

        struct_rules = self.assignment_rules[household_structure]

        # Get valid roles for this structure (cached to avoid rebuilding for every person)
        if household_structure not in self._valid_roles_cache:
            valid_roles_for_structure = set()
            for rule in struct_rules.rules:
                # A rule's role may be a single name or a list of names.
                if isinstance(rule.role, list):
                    valid_roles_for_structure.update(rule.role)
                else:
                    valid_roles_for_structure.add(rule.role)
            self._valid_roles_cache[household_structure] = valid_roles_for_structure
        else:
            valid_roles_for_structure = self._valid_roles_cache[household_structure]

        # Try each role in order until we find a matching one
        for role_name, role in self.roles.items():
            # Skip roles that don't have rules for this structure
            if role_name not in valid_roles_for_structure:
                continue

            if verbose:
                logger.debug(f"      Testing role '{role_name}':")

            # Check if person's subset matches this role
            # Use pre-calculated category if valid
            matched = False
            if person_category and person_category != "unknown":
                if person_category in role.subsets:
                    matched = True
                elif verbose:
                    logger.debug(f"        ✗ Category '{person_category}' not in role subsets {role.subsets}")
            else:
                # Fallback to internal lookup only if needed
                if role.matches(person, verbose=verbose):
                    matched = True

            if not matched:
                continue

            # Check if this role has been assigned already
            role_count = assigned_roles.count(role_name)

            # Determine if we should assign this role based on count and explicit type.
            # A branch that does not return falls through to the next role.
            if role.role_type == 'primary' and role_count == 0:
                if verbose:
                    logger.debug(f"      ✓ Assigned role '{role_name}' (primary)")
                return role_name

            elif role.role_type == 'secondary' and role_count == 0:
                # Check if a primary role for the same subset was already assigned
                has_primary = False
                for assigned_name in assigned_roles:
                    # Assigned names that are not configured roles are ignored.
                    assigned_role = self.roles.get(assigned_name)
                    if assigned_role and assigned_role.role_type == 'primary':
                        # Check if subsets overlap (e.g., both apply to 'Adults')
                        if set(assigned_role.subsets) & set(role.subsets):
                            has_primary = True
                            break

                if has_primary:
                    if verbose:
                        logger.debug(f"      ✓ Assigned role '{role_name}' (secondary)")
                    return role_name

            elif role.role_type == 'extra':
                # Check if both primary and secondary for the same subset were assigned.
                # Unlike primary/secondary, there is no role_count condition here.
                has_primary = False
                has_secondary = False
                for assigned_name in assigned_roles:
                    assigned_role = self.roles.get(assigned_name)
                    if assigned_role:
                        if assigned_role.role_type == 'primary' and (set(assigned_role.subsets) & set(role.subsets)):
                            has_primary = True
                        if assigned_role.role_type == 'secondary' and (set(assigned_role.subsets) & set(role.subsets)):
                            has_secondary = True

                if has_primary and has_secondary:
                    if verbose:
                        logger.debug(f"      ✓ Assigned role '{role_name}' (extra)")
                    return role_name

            elif role.role_type == 'general':
                # General roles have no count or prerequisite condition.
                if verbose:
                    logger.debug(f"      ✓ Assigned role '{role_name}'")
                return role_name

        if verbose:
            logger.debug(f"      ✗ No role matched")
        return None

    def get_assignment_rule(self, household_structure: str, role: str,
                           verbose: bool = False) -> Optional[AssignmentRule]:
        """
        Get assignment rule for a role within a structure.
        """
        if household_structure not in self.assignment_rules:
            return None

        struct_rules = self.assignment_rules[household_structure]

        for rule in struct_rules.rules:
            if rule.applies_to_role(role):
                if verbose:
                    logger.debug(f"    ✓ Found rule for role '{role}'")
                return rule

        if verbose:
            logger.debug(f"    ✗ No rule for role '{role}'")
        return None

    def _build_category_lookup_structures(self):
        """
        Build lookup structures for categories.
        Called once during __init__ to avoid repeated iterations.
        """
        # Group categories by attribute name for faster filtering
        categories_by_attr = {}
        for category in self.categories:
            attr = category.get('attribute')
            if attr not in categories_by_attr:
                categories_by_attr[attr] = []
            categories_by_attr[attr].append(category)

        # For numerical categories (like age), sort by min value for binary search
        for attr, cats in list(categories_by_attr.items()):
            numerical_cats = [c for c in cats if c.get('type') == 'numerical']
            if numerical_cats:
                # Sort by min value
                numerical_cats.sort(key=lambda c: c['numerical']['min'])
                categories_by_attr[attr + '_numerical'] = numerical_cats

        self._categories_by_attr = categories_by_attr

    def get_category_for_value(self, value: Any, attribute_name: str = "age") -> Optional[Dict[str, Any]]:
        """
        Find which category a value falls into.

        Args:
            value: The value to categorize (e.g., age=25)
            attribute_name: The attribute name to match against (default: "age")

        Returns:
            Category dict with 'csv_value' or None if no match
        """
        # Check cache first (87% hit rate based on profiling patterns)
        cache_key = (attribute_name, value)
        if cache_key in self._category_lookup_cache:
            return self._category_lookup_cache[cache_key]

        result = None

        # Use pre-filtered categories instead of iterating all
        numerical_cats = self._categories_by_attr.get(attribute_name + '_numerical', [])
        if numerical_cats and isinstance(value, (int, float)):
            # For numerical, iterate through sorted categories (typically just 4-5)
            for category in numerical_cats:
                min_val = category['numerical']['min']
                max_val = category['numerical'].get('max')

                if max_val is None:
                    # No upper limit
                    if value >= min_val:
                        result = category
                        break
                elif min_val <= value <= max_val:
                    result = category
                    break
        else:
            # For categorical or fallback, check all categories for this attribute
            cats = self._categories_by_attr.get(attribute_name, [])
            for category in cats:
                if category.get('type') == 'categorical':
                    allowed = category.get('categorical', {}).get('allowed_values', [])
                    if value in allowed:
                        result = category
                        break

        # Cache the result
        self._category_lookup_cache[cache_key] = result
        return result

    def get_person_assignment_rule(self) -> Optional[AssignmentRule]:
        """
        Get assignment rule for person-level assignment.

        Returns:
            First assignment rule from 'person' structure or None
        """
        if 'person' not in self.assignment_rules:
            return None

        person_rules = self.assignment_rules['person']
        if not person_rules.rules:
            return None

        return person_rules.rules[0]

    def get_required_attribute_mapping(self, attr_name: str) -> Dict[str, str]:
        """
        Get mapping for a required attribute.

        Args:
            attr_name: Name of required attribute

        Returns:
            Mapping dict or empty dict
        """
        if attr_name in self.required_attributes:
            return self.required_attributes[attr_name].get('mapping', {})
        return {}

    @classmethod
    def from_yaml(cls, config_path: Path) -> 'AttributeAssignmentConfig':
        """Load configuration from YAML file."""
        return cls(config_path)

__init__(config_path)

Load configuration from YAML.

Source code in may/attribute_assignment/assignment_config.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
def __init__(self, config_path: Path):
    """
    Load and parse an attribute-assignment configuration from YAML.

    Args:
        config_path: Path to the YAML file; it is passed through
            ``pr.resolve`` before being opened.

    Raises:
        ValueError: If the top level of the YAML document is not a mapping.
    """
    self.config_path = Path(pr.resolve(str(config_path)))

    # Open in binary mode so the YAML loader handles the encoding itself
    # instead of relying on the platform's default text encoding.
    with open(self.config_path, 'rb') as f:
        loaded = yaml.safe_load(f)

    # An empty document loads as None; treat it as an empty configuration
    # so every parser below can call .get() on it.
    if loaded is None:
        loaded = {}
    if not isinstance(loaded, dict):
        raise ValueError(
            f"Configuration file {self.config_path} must contain a YAML "
            f"mapping at the top level, got {type(loaded).__name__}"
        )
    self.raw_config = loaded

    # Parse sections
    self.attribute_name = self._parse_attribute()
    self.assignment_level = self._parse_assignment_level()
    self.residence_venue_types = self._parse_residence_venue_types()
    self.filters = self._parse_filters()
    self.required_attributes = self._parse_required_attributes()
    self.region_mapping = self.raw_config.get('region_mapping', {})
    self.categories = self._parse_categories()
    self.roles = self._parse_roles()
    self.household_structures = self._parse_household_structures()
    self.data_sources = self._parse_data_sources()
    self.assignment_rules = self._parse_assignment_rules()
    self.venue_assignment_rules = self._parse_venue_assignment_rules()
    self.settings = self._parse_settings()

    # Cache valid roles per structure (filled lazily by get_person_role)
    self._valid_roles_cache = {}

    # Cache category lookups; the grouped structures need self.categories,
    # so they are built only after the sections above are parsed.
    self._category_lookup_cache = {}
    self._build_category_lookup_structures()

    logger.info(f"Loaded config for '{self.attribute_name}' from {self.config_path}")
    logger.info(f"  Assignment level: {self.assignment_level}")
    if self.required_attributes:
        logger.info(f"  Required attributes: {list(self.required_attributes.keys())}")
    if self.categories:
        logger.info(f"  Categories: {len(self.categories)}")
    logger.info(f"  Roles: {len(self.roles)}")
    logger.info(f"  Household structures: {len(self.household_structures)}")
    logger.info(f"  Assignment rules: {len(self.assignment_rules)}")

from_yaml(config_path) classmethod

Load configuration from YAML file.

Source code in may/attribute_assignment/assignment_config.py
824
825
826
827
@classmethod
def from_yaml(cls, config_path: Path) -> 'AttributeAssignmentConfig':
    """Alternate constructor: build the configuration from a YAML file path."""
    loaded_config = cls(config_path)
    return loaded_config

get_assignment_rule(household_structure, role, verbose=False)

Get assignment rule for a role within a structure.

Source code in may/attribute_assignment/assignment_config.py
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
def get_assignment_rule(self, household_structure: str, role: str,
                       verbose: bool = False) -> Optional[AssignmentRule]:
    """
    Look up the rule that applies to ``role`` in a household structure.

    Returns the first rule (in stored order) whose ``applies_to_role``
    accepts ``role``, or None when the structure has no rules or none
    of them apply. With ``verbose`` set, the outcome is logged.
    """
    if household_structure in self.assignment_rules:
        candidates = self.assignment_rules[household_structure].rules
        found = next(
            (candidate for candidate in candidates if candidate.applies_to_role(role)),
            None,
        )

        if found is not None:
            if verbose:
                logger.debug(f"    ✓ Found rule for role '{role}'")
            return found

        if verbose:
            logger.debug(f"    ✗ No rule for role '{role}'")

    return None

get_category_for_value(value, attribute_name='age')

Find which category a value falls into.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `value` | `Any` | The value to categorize (e.g., age=25) | required |
| `attribute_name` | `str` | The attribute name to match against (default: "age") | `'age'` |

Returns:

| Type | Description |
| --- | --- |
| `Optional[Dict[str, Any]]` | Category dict with 'csv_value' or None if no match |

Source code in may/attribute_assignment/assignment_config.py
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
def get_category_for_value(self, value: Any, attribute_name: str = "age") -> Optional[Dict[str, Any]]:
    """
    Return the configured category that ``value`` belongs to.

    For int/float values of an attribute that has numerical categories,
    the first band (in ascending-minimum order) containing the value is
    returned; a band without 'max' has no upper limit. Otherwise the
    attribute's categorical categories are searched for one whose
    'allowed_values' contains the value. Results, including None, are
    memoised per (attribute_name, value).

    Args:
        value: The value to categorize (e.g., age=25)
        attribute_name: The attribute name to match against (default: "age")

    Returns:
        Category dict with 'csv_value' or None if no match
    """
    memo = self._category_lookup_cache
    memo_key = (attribute_name, value)
    if memo_key in memo:
        return memo[memo_key]

    found = None

    bands = self._categories_by_attr.get(attribute_name + '_numerical', [])
    if bands and isinstance(value, (int, float)):
        # Linear scan over the pre-sorted numerical bands.
        for band in bands:
            lower = band['numerical']['min']
            upper = band['numerical'].get('max')
            if upper is None:
                # Open-ended band: only the lower bound applies.
                inside = value >= lower
            else:
                inside = lower <= value <= upper
            if inside:
                found = band
                break
    else:
        # Categorical lookup (also reached for non-numeric values).
        for candidate in self._categories_by_attr.get(attribute_name, []):
            if candidate.get('type') != 'categorical':
                continue
            if value in candidate.get('categorical', {}).get('allowed_values', []):
                found = candidate
                break

    memo[memo_key] = found
    return found

get_household_structure(household, verbose=False)

Classify household structure. Returns first matching structure.

Source code in may/attribute_assignment/assignment_config.py
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
def get_household_structure(self, household, verbose: bool = False) -> Optional[str]:
    """
    Classify household structure.

    Returns the name of the first structure in ``self.household_structures``
    whose ``matches(household)`` is true, or None when none match.

    Unless ``verbose`` is set, the result (including a None "no match"
    result) is stored in ``household.properties`` and reused on later
    calls. Verbose calls neither read nor write that cache.

    Args:
        household: Object exposing ``properties`` (dict-like) and ``id``.
        verbose: If True, log matching details and bypass the cache.
    """
    # NOTE(review): the cache key is not specific to this config instance;
    # confirm households are never classified by configs with different
    # structure definitions.
    cache_key = '_cached_household_structure'

    # Check if structure is already cached (only when not verbose).
    # A sentinel separates "cached as None" from "not cached yet", so
    # unmatched households are not re-classified on every call.
    if not verbose:
        not_cached = object()
        cached_structure = household.properties.get(cache_key, not_cached)
        if cached_structure is not not_cached:
            return cached_structure

    if verbose:
        logger.debug(f"  Classifying household {household.id}:")

    for struct_name, structure in self.household_structures.items():
        if structure.matches(household, verbose=verbose):
            # Cache the result (only when not verbose to avoid caching debug runs)
            if not verbose:
                household.properties[cache_key] = struct_name
            return struct_name

    if verbose:
        logger.debug("  ✗ No structure matched")

    # Cache None result as well
    if not verbose:
        household.properties[cache_key] = None

    return None

get_person_assignment_rule()

Get assignment rule for person-level assignment.

Returns:

| Type | Description |
| --- | --- |
| `Optional[AssignmentRule]` | First assignment rule from 'person' structure or None |

Source code in may/attribute_assignment/assignment_config.py
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
def get_person_assignment_rule(self) -> Optional[AssignmentRule]:
    """
    Look up the rule used for person-level assignment.

    Returns:
        The first rule listed under the 'person' entry of
        ``self.assignment_rules``, or None when that entry is absent
        or has no rules
    """
    rules_by_structure = self.assignment_rules
    if 'person' in rules_by_structure:
        candidates = rules_by_structure['person'].rules
        if candidates:
            return candidates[0]
    return None

get_person_role(person, household_structure, assigned_roles, verbose=False, person_category=None)

Determine person's role based on their subset and household structure.

Parameters:

Name Type Description Default
person

Person object

required
household_structure str

Name of household structure

required
assigned_roles List[str]

List of roles already assigned in this household

required
verbose bool

If True, log matching details

False
person_category str

Optional pre-calculated person category (subset name)

None

Returns:

Type Description
Optional[str]

Role name or None

Source code in may/attribute_assignment/assignment_config.py
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
def get_person_role(self, person, household_structure: str,
                   assigned_roles: List[str], verbose: bool = False,
                   person_category: str = None) -> Optional[str]:
    """
    Pick the role a person takes within a household of the given structure.

    Roles are tried in the iteration order of ``self.roles``; the first one
    that fits the person's subset and whose role type is still available is
    returned.

    Args:
        person: Person object
        household_structure: Name of household structure
        assigned_roles: List of roles already assigned in this household
        verbose: If True, log matching details
        person_category: Optional pre-calculated person category (subset name);
            when empty or "unknown" the role's own ``matches`` is used instead

    Returns:
        Role name or None
    """
    if verbose:
        logger.debug(f"    Determining role for {person}:")

    # Nothing to choose from when the structure has no rules
    if household_structure not in self.assignment_rules:
        if verbose:
            logger.debug(f"      No assignment rules for structure '{household_structure}'")
        return None

    # Role names referenced by this structure's rules, built once per structure
    if household_structure in self._valid_roles_cache:
        allowed_roles = self._valid_roles_cache[household_structure]
    else:
        allowed_roles = set()
        for rule in self.assignment_rules[household_structure].rules:
            if isinstance(rule.role, list):
                allowed_roles.update(rule.role)
            else:
                allowed_roles.add(rule.role)
        self._valid_roles_cache[household_structure] = allowed_roles

    def _assigned_with_overlap(kind, subsets) -> bool:
        """True if an already-assigned role of type ``kind`` shares a subset."""
        for assigned_name in assigned_roles:
            assigned_role = self.roles.get(assigned_name)
            if (assigned_role
                    and assigned_role.role_type == kind
                    and set(assigned_role.subsets) & set(subsets)):
                return True
        return False

    # A usable pre-calculated category replaces the per-role lookup
    trust_category = bool(person_category) and person_category != "unknown"

    for role_name, role in self.roles.items():
        # Only roles that have rules for this structure are candidates
        if role_name not in allowed_roles:
            continue

        if verbose:
            logger.debug(f"      Testing role '{role_name}':")

        if trust_category:
            matched = person_category in role.subsets
            if not matched and verbose:
                logger.debug(f"        ✗ Category '{person_category}' not in role subsets {role.subsets}")
        else:
            matched = role.matches(person, verbose=verbose)

        if not matched:
            continue

        times_assigned = assigned_roles.count(role_name)
        kind = role.role_type

        if kind == 'primary':
            # Primary roles are handed out once
            if times_assigned == 0:
                if verbose:
                    logger.debug(f"      ✓ Assigned role '{role_name}' (primary)")
                return role_name

        elif kind == 'secondary':
            # Handed out once, and only after a primary role on an overlapping subset
            if times_assigned == 0 and _assigned_with_overlap('primary', role.subsets):
                if verbose:
                    logger.debug(f"      ✓ Assigned role '{role_name}' (secondary)")
                return role_name

        elif kind == 'extra':
            # Needs both a primary and a secondary role on an overlapping subset
            primary_seen = _assigned_with_overlap('primary', role.subsets)
            secondary_seen = _assigned_with_overlap('secondary', role.subsets)
            if primary_seen and secondary_seen:
                if verbose:
                    logger.debug(f"      ✓ Assigned role '{role_name}' (extra)")
                return role_name

        elif kind == 'general':
            if verbose:
                logger.debug(f"      ✓ Assigned role '{role_name}'")
            return role_name

    if verbose:
        logger.debug(f"      ✗ No role matched")
    return None

get_required_attribute_mapping(attr_name)

Get mapping for a required attribute.

Parameters:

Name Type Description Default
attr_name str

Name of required attribute

required

Returns:

Type Description
Dict[str, str]

Mapping dict or empty dict

Source code in may/attribute_assignment/assignment_config.py
810
811
812
813
814
815
816
817
818
819
820
821
822
def get_required_attribute_mapping(self, attr_name: str) -> Dict[str, str]:
    """
    Fetch the 'mapping' entry configured for a required attribute.

    Args:
        attr_name: Name of required attribute

    Returns:
        The attribute's 'mapping' value, or an empty dict when the attribute
        is not configured or has no 'mapping' entry
    """
    known_attributes = self.required_attributes
    if attr_name not in known_attributes:
        return {}
    return known_attributes[attr_name].get('mapping', {})

DataSourceConfig dataclass

Configuration for a data source.

Data sources provide probability distributions for attribute values based on context (e.g., geographical unit code, first person's ethnicity, etc.).

Source code in may/attribute_assignment/assignment_config.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@dataclass
class DataSourceConfig:
    """
    Configuration for a data source.

    Data sources provide probability distributions for attribute values
    based on context (e.g., geographical unit code, first person's ethnicity, etc.).

    ``name``, ``type`` and ``description`` are required; the remaining fields
    default to a fresh empty container per instance.
    """
    name: str  # Identifier of the data source
    type: str  # Kind of data source -- NOTE(review): accepted values are not visible in this block
    description: str  # Free-text description
    files: List[Dict[str, Any]] = field(default_factory=list)  # One dict per file entry; key schema not visible here -- TODO confirm
    fallbacks: List[Dict[str, Any]] = field(default_factory=list)  # One dict per fallback entry; key schema not visible here -- TODO confirm
    config: Dict[str, Any] = field(default_factory=dict)  # Additional settings; keys not visible here -- TODO confirm

HouseholdStructure dataclass

Household structure with flexible matching rules.

Source code in may/attribute_assignment/assignment_config.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
@dataclass
class HouseholdStructure:
    """
    Named household structure described by a list of alternative matching rules.
    """
    name: str
    description: str
    inheritance: bool  # True when this structure uses inheritance
    matching_rules: List[MatchingRule] = field(default_factory=list)

    def matches(self, household, verbose: bool = False) -> bool:
        """
        Tell whether the household satisfies at least one matching rule.

        Rules are tried in list order and evaluation stops at the first hit;
        a structure without rules never matches.
        """
        if verbose:
            logger.debug(f"    Testing structure '{self.name}':")
            logger.debug(f"      {len(self.matching_rules)} matching rule(s)")

        found = any(
            candidate.matches(household, verbose=verbose)
            for candidate in self.matching_rules
        )

        if verbose:
            if found:
                logger.debug(f"    ✓ MATCHED structure '{self.name}'")
            else:
                logger.debug(f"    ✗ No match for structure '{self.name}'")
        return found

matches(household, verbose=False)

Check if household matches this structure. Returns True if ANY matching rule matches.

Source code in may/attribute_assignment/assignment_config.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def matches(self, household, verbose: bool = False) -> bool:
    """
    Tell whether the household satisfies at least one matching rule.

    Rules are tried in list order and evaluation stops at the first hit;
    a structure without rules never matches.
    """
    if verbose:
        logger.debug(f"    Testing structure '{self.name}':")
        logger.debug(f"      {len(self.matching_rules)} matching rule(s)")

    found = any(
        candidate.matches(household, verbose=verbose)
        for candidate in self.matching_rules
    )

    if verbose:
        if found:
            logger.debug(f"    ✓ MATCHED structure '{self.name}'")
        else:
            logger.debug(f"    ✗ No match for structure '{self.name}'")
    return found

MatchingRule dataclass

A rule for matching household patterns.

Can match based on: (1) the actual pattern only; (2) the original pattern only; or (3) both the actual AND the original pattern (conditional matching).

Source code in may/attribute_assignment/assignment_config.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
@dataclass
class MatchingRule:
    """
    Rule that tests a household against composition patterns.

    Which checks run depends on which pattern lists are non-empty:
    - actual patterns only
    - original patterns only
    - both lists: the actual AND the original pattern must match
    A rule with no patterns at all matches every household.
    """
    actual_patterns: List[str] = field(default_factory=list)
    original_patterns: List[str] = field(default_factory=list)
    description: str = ""

    def matches(self, household, verbose: bool = False) -> bool:
        """
        Decide whether a household satisfies this rule.

        Args:
            household: Venue object; 'original_pattern' is read from its
                properties and the actual pattern is computed from its members
            verbose: If True, log matching details

        Returns:
            True if household matches this rule
        """
        original_pattern = household.properties.get('original_pattern', '')

        # The actual pattern is derived (and cached) even if only the original is tested
        actual_pattern = self._compute_actual_pattern(household)

        if verbose:
            logger.debug(f"      Testing matching rule:")
            logger.debug(f"        Description: {self.description}")
            logger.debug(f"        Household: original='{original_pattern}', actual='{actual_pattern}'")

        check_actual = bool(self.actual_patterns)
        check_original = bool(self.original_patterns)

        # Start from "matches" so that a rule without patterns accepts everything
        outcome = True
        if check_actual:
            actual_match = self._matches_any_pattern(actual_pattern, self.actual_patterns)
            outcome = actual_match
        if check_original:
            # Original patterns are compared literally, without template operators
            original_match = original_pattern in self.original_patterns
            outcome = outcome and original_match

        if verbose:
            if check_actual:
                logger.debug(f"        Actual match: {actual_match}")
            if check_original:
                logger.debug(f"        Original match: {original_match}")

        return outcome

    def _matches_any_pattern(self, actual_pattern: str, template_patterns: List[str]) -> bool:
        """
        Tell whether the actual pattern satisfies at least one template.
        Each comparison is delegated to ``_pattern_matches``.
        """
        return any(
            self._pattern_matches(actual_pattern, template)
            for template in template_patterns
        )

    def _compute_actual_pattern(self, household) -> str:
        """
        Derive the composition pattern from the household's current members.

        Args:
            household: Venue object with members

        Returns:
            Pattern string like "2 0 2 0" (counts per category), or '' when the
            household carries no '_age_categories' property
        """
        # Reuse a pattern computed earlier for this household
        cached_pattern = household.properties.get('_cached_actual_pattern')
        if cached_pattern is not None:
            return cached_pattern

        age_categories = household.properties.get('_age_categories', [])
        if not age_categories:
            # Without categories there is nothing to count; this result is not cached
            return ''

        # Position of each category name within the pattern
        slot_of = {}
        for position, category in enumerate(age_categories):
            slot_of[category.name] = position

        tally = [0] * len(age_categories)

        for person in household.get_all_members():
            # UNIFIED STRUCTURE: activity_map['residence']['household'] = [subsets]
            activity_map = person.activity_map
            if "residence" not in activity_map:
                continue
            residence = activity_map["residence"]
            if "household" not in residence:
                continue
            household_subsets = residence["household"]
            if not household_subsets:
                continue

            # Only the first household subset decides the person's category
            subset_name = household_subsets[0].subset_name
            if subset_name in slot_of:
                tally[slot_of[subset_name]] += 1

        pattern = ' '.join(map(str, tally))

        # Remember the pattern on the household for later lookups
        household.properties['_cached_actual_pattern'] = pattern

        return pattern

    def _pattern_matches(self, actual: str, template: str) -> bool:
        """
        Compare one actual pattern with one template pattern.

        Examples:
            actual="2 0 2 0", template=">=1 >=0 2 0" -> True
            actual="0 1 2 0", template=">=1 >=0 2 0" -> False
            actual="2 1 2 0", template="0 >=1 1 <=2" -> False (has kids)
            actual="0 1 1 1", template="0 >=1 1 <=2" -> True
        """
        # The comparison itself lives in the module-level cached helper
        return _pattern_matches_cached(actual, template)

matches(household, verbose=False)

Check if a household matches this rule.

Parameters:

Name Type Description Default
household

Venue object whose properties hold original_pattern; the actual pattern is computed from the household's members (and cached in its properties)

required
verbose bool

If True, log matching details

False

Returns:

Type Description
bool

True if household matches this rule

Source code in may/attribute_assignment/assignment_config.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def matches(self, household, verbose: bool = False) -> bool:
    """
    Decide whether a household satisfies this rule.

    Args:
        household: Venue object; 'original_pattern' is read from its
            properties and the actual pattern is computed from its members
        verbose: If True, log matching details

    Returns:
        True if household matches this rule
    """
    original_pattern = household.properties.get('original_pattern', '')

    # The actual pattern is derived even if only the original is tested
    actual_pattern = self._compute_actual_pattern(household)

    if verbose:
        logger.debug(f"      Testing matching rule:")
        logger.debug(f"        Description: {self.description}")
        logger.debug(f"        Household: original='{original_pattern}', actual='{actual_pattern}'")

    check_actual = bool(self.actual_patterns)
    check_original = bool(self.original_patterns)

    # Start from "matches" so that a rule without patterns accepts everything
    outcome = True
    if check_actual:
        actual_match = self._matches_any_pattern(actual_pattern, self.actual_patterns)
        outcome = actual_match
    if check_original:
        # Original patterns are compared literally, without template operators
        original_match = original_pattern in self.original_patterns
        outcome = outcome and original_match

    if verbose:
        if check_actual:
            logger.debug(f"        Actual match: {actual_match}")
        if check_original:
            logger.debug(f"        Original match: {original_match}")

    return outcome

Role dataclass

Role definition - maps to household subsets instead of conditions.

Source code in may/attribute_assignment/assignment_config.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
@dataclass
class Role:
    """
    Role definition expressed as the household subsets it applies to.
    """
    name: str
    description: str
    subsets: List[str]  # Names of the subsets this role applies to
    role_type: str = "general" # primary, secondary, extra, or general

    def matches(self, person, verbose: bool = False) -> bool:
        """
        Tell whether the person's residence subset is one of this role's subsets.

        Returns False when the person has no usable residence subset.
        """
        # UNIFIED STRUCTURE: activity_map['residence'][venue_type] = [subsets]
        activity_map = person.activity_map
        if "residence" not in activity_map or not activity_map["residence"]:
            return False

        # First non-empty subset list of any residence type (household, care_home, etc.)
        residence_subset = next(
            (
                subsets[0]
                for subsets in activity_map["residence"].values()
                if subsets and isinstance(subsets, list) and len(subsets) > 0
            ),
            None,
        )

        # Compare against None rather than relying on truthiness (Subset has __len__)
        if residence_subset is None:
            return False

        person_subset = residence_subset.subset_name

        if verbose:
            logger.debug(f"        Testing role '{self.name}': person_subset='{person_subset}', role_subsets={self.subsets}")

        return person_subset in self.subsets

matches(person, verbose=False)

Check if person's subset matches this role.

Source code in may/attribute_assignment/assignment_config.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def matches(self, person, verbose: bool = False) -> bool:
    """
    Tell whether the person's residence subset is one of this role's subsets.

    Returns False when the person has no usable residence subset.
    """
    # UNIFIED STRUCTURE: activity_map['residence'][venue_type] = [subsets]
    activity_map = person.activity_map
    if "residence" not in activity_map or not activity_map["residence"]:
        return False

    # First non-empty subset list of any residence type (household, care_home, etc.)
    residence_subset = next(
        (
            subsets[0]
            for subsets in activity_map["residence"].values()
            if subsets and isinstance(subsets, list) and len(subsets) > 0
        ),
        None,
    )

    # Compare against None rather than relying on truthiness (Subset has __len__)
    if residence_subset is None:
        return False

    person_subset = residence_subset.subset_name

    if verbose:
        logger.debug(f"        Testing role '{self.name}': person_subset='{person_subset}', role_subsets={self.subsets}")

    return person_subset in self.subsets

StructureAssignmentRules dataclass

Assignment rules for a specific household structure.

Source code in may/attribute_assignment/assignment_config.py
307
308
309
310
311
312
313
314
@dataclass
class StructureAssignmentRules:
    """
    Assignment rules for a specific household structure.

    ``rules`` defaults to a fresh empty list per instance.
    """
    structure_name: str  # Name of the household structure these rules belong to
    description: str  # Free-text description
    rules: List[AssignmentRule] = field(default_factory=list)  # Rules for this structure; list order is kept as given