43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
class AttributeAssigner:
    """
    Main orchestrator for attribute assignment.

    Uses structure-based assignment with straightforward role logic. Depending
    on ``config.assignment_level`` the attribute is assigned either person by
    person ("person") or residence by residence ("person_by_residence").
    Assigned values are written to ``person.properties[attribute_name]`` and
    running counters are kept in ``self.stats``.
    """

    def __init__(self, config: "AttributeAssignmentConfig", data_manager: "DataSourceManager"):
        """
        Initialize attribute assigner.

        Args:
            config: Attribute assignment configuration
            data_manager: Data source manager with loaded data
        """
        self.config = config
        self.data_manager = data_manager
        self.attribute_name = config.attribute_name

        # Logging settings ("or {}" tolerates a section that is present but None)
        self.verbose = (config.settings.get('logging') or {}).get('detailed_assignment_logging', False)

        # Cache strategy objects to avoid repeated creation.
        # _strategy_cache maps id(config dict) -> strategy instance;
        # _strategy_cache_configs pins the config object behind each key so its
        # id() cannot be recycled for a different dict while the entry exists.
        self._strategy_cache = {}
        self._strategy_cache_configs = {}

        # Pre-compute filter configuration
        self._has_filters = bool(getattr(config, 'filters', None))
        self._optimized_filters = []
        # Filters that inspect the person's activity_map (assigned venues/subsets)
        # rather than a scalar attribute. Populated below.
        self._activity_venue_filters = []

        if self._has_filters:
            for name, cfg in config.filters.items():
                # The 'activities' entry is handled separately further down.
                if name == 'activities':
                    continue

                # activity_venue: require an assigned venue of a given type
                # (optionally in a given subset) under some activity.
                if cfg.get('type') == 'activity_venue':
                    self._activity_venue_filters.append({
                        'activity': cfg.get('activity', 'primary_activity'),
                        'venue_types': set(cfg.get('venue_types', [])),
                        'subset_names': set(cfg.get('subset_names', [])),
                    })
                    continue

                attr = cfg.get('attribute')
                ftype = cfg.get('type')
                num = cfg.get('numerical') or {}
                # Categorical "include" values: accept either nested
                # (categorical: {values: [...]}) or flat (values: [...]) form.
                cat_values = (cfg.get('categorical') or {}).get('values', cfg.get('values'))
                self._optimized_filters.append({
                    'attr': attr,
                    'type': ftype,
                    'min': num.get('min'),
                    'max': num.get('max'),
                    'values': cat_values,
                    'is_age': attr == 'age',
                    'is_sex': attr == 'sex',
                })

        self._activity_filters = (config.filters.get('activities') or {}) if self._has_filters else {}
        self._include_activities = self._activity_filters.get('include') or []
        self._exclude_activities = self._activity_filters.get('exclude') or []
        self._required_attrs = list(config.required_attributes.items()) if config.required_attributes else []

        # Statistics
        self.stats = {
            'total_people': 0,
            'people_in_households': 0,
            'people_in_other_residences': 0,
            'households_processed': 0,
            'other_residences_processed': 0,
            'assignments_by_venue_type': defaultdict(int),
            'assignments_by_rule': defaultdict(int),
            'assignments_by_role': defaultdict(int),
            'assignments_by_strategy': defaultdict(int),
            'attribute_distribution': defaultdict(int),
            'household_structure_counts': defaultdict(int),
            'fallbacks_by_reason': defaultdict(int),
            'unassigned_people': 0,
            'filtered_people': 0,  # People filtered out by age/activity filters
            'assigned_people': 0,  # People successfully assigned
        }

    @staticmethod
    def _distribution_key(value):
        """
        Return a key usable in the ``attribute_distribution`` counter.

        Hashable values are returned unchanged; unhashable ones (lists, dicts,
        sets, ...) are replaced by their ``str()`` form.
        """
        try:
            hash(value)
        except TypeError:
            return str(value)
        return value

    @staticmethod
    def _sorted_stat_items(counter):
        """
        Return ``counter.items()`` sorted by key.

        Falls back to ordering by ``str(key)`` when the keys are not mutually
        comparable (e.g. a mix of ``int`` and ``str`` values).
        """
        items = list(counter.items())
        try:
            return sorted(items)
        except TypeError:
            return sorted(items, key=lambda kv: str(kv[0]))

    def _get_or_create_strategy(self, assignment_config):
        """
        Get cached strategy or create new one.

        Args:
            assignment_config: Strategy configuration dict (from AttributeAssignmentRule)

        Returns:
            Strategy instance
        """
        # The object id is the cache key. id() is only unique while the object
        # is alive, so the config is pinned next to the strategy and identity is
        # re-checked before a cached entry is trusted.
        config_key = id(assignment_config)
        is_cached = (
            config_key in self._strategy_cache
            and self._strategy_cache_configs.get(config_key) is assignment_config
        )
        if not is_cached:
            self._strategy_cache[config_key] = StrategyFactory.create_strategy(
                assignment_config, self.data_manager
            )
            self._strategy_cache_configs[config_key] = assignment_config
        return self._strategy_cache[config_key]

    def assign_all(self, venue_manager) -> Dict[str, Any]:
        """
        Assign attribute based on assignment level (household or person).

        Args:
            venue_manager: VenueManager with households and people

        Returns:
            Dictionary with assignment statistics

        Raises:
            ValueError: If ``config.assignment_level`` is neither "person" nor
                "person_by_residence".
        """
        logger.info(f"Starting attribute assignment for '{self.attribute_name}'...")
        logger.info(f"Assignment level: {self.config.assignment_level}")
        logger.info("=" * 80)

        # Branch based on assignment level
        level = self.config.assignment_level
        if level == "person":
            self._assign_all_people(venue_manager)
        elif level == "person_by_residence":
            self._assign_all_residences(venue_manager)
        else:
            raise ValueError(f"Unknown assignment_level: '{self.config.assignment_level}'. "
                             f"Expected 'person' or 'person_by_residence'.")

        # Report statistics
        self._report_statistics()
        return self.stats

    def _assign_all_residences(self, venue_manager):
        """Assign attributes at residence level (households and communal establishments)."""
        # Get all venues
        all_venues = venue_manager.get_all_venues_list()
        logger.info(f"Found {len(all_venues)} total venues")

        # Count total people across ALL venues for accurate statistics
        total_people_in_simulation = sum(venue.size() for venue in all_venues)

        # Get the residence venue types assigned by household structure
        # (defaults to ["household"]); other residences use venue_assignment_rules.
        residence_venue_types = self.config.residence_venue_types or ["household"]

        # Separate structure-assigned residences from the rest based on config
        households = [v for v in all_venues if v.type in residence_venue_types]
        other_residences = [v for v in all_venues if v.type not in residence_venue_types]
        people_in_other_residences = sum(venue.size() for venue in other_residences)

        logger.info(f" Households: {len(households)}")
        if other_residences:
            logger.info(f" Other residences: {len(other_residences)} (containing {people_in_other_residences} people)")
        logger.info("")

        # Process households with structure-based logic
        logger.info("Processing households...")
        total = len(households)
        progress_interval = max(1, total // 20)  # Report every 5%
        for i, household in enumerate(households):
            self._assign_household(household)
            # Log progress
            if (i + 1) % progress_interval == 0 or (i + 1) == total:
                progress = ((i + 1) / total) * 100
                logger.info(f" Progress: {i+1:,}/{total:,} ({progress:.1f}%)")
        logger.info(f"✓ Processed {self.stats['households_processed']} households")
        logger.info("")

        # Process other residences (care homes, dorms, etc.) with simpler logic
        people_assigned_in_other_residences = 0
        if other_residences:
            logger.info(f"Processing {len(other_residences)} other residences (care homes, dorms, etc.)...")
            people_assigned_in_other_residences = self._assign_other_residences(other_residences)
            logger.info(f"✓ Assigned {people_assigned_in_other_residences} people in other residences")
            logger.info("")

        # Set total_people from the authoritative venue count (this overrides
        # the per-household increments made in _assign_household).
        self.stats['total_people'] = total_people_in_simulation

    def _assign_other_residences(self, venues):
        """
        Assign attributes to people in non-household residences (care homes, dorms, etc.).

        Uses rules defined in venue_assignment_rules section of config. The
        first rule whose 'venue_types' contains the venue's type is used.
        People who already carry the attribute are skipped. A failure for one
        person is logged and counted as unassigned; the remaining members of
        the venue are still processed.

        Args:
            venues: List of non-household residence venues

        Returns:
            Number of people assigned
        """
        people_assigned = 0
        venues_processed = 0
        rules = self.config.venue_assignment_rules or []

        for venue in venues:
            members = venue.get_all_members()
            if not members:
                continue

            # Find the assignment rule for this venue type
            venue_rule = next(
                (rule for rule in rules if venue.type in rule.get('venue_types', [])),
                None,
            )
            if not venue_rule:
                logger.warning(f"No assignment rule found for venue type '{venue.type}', skipping")
                continue

            # Get the assignment strategy from the rule
            assignment_config = venue_rule.get('assignment', {})
            try:
                strategy = self._get_or_create_strategy(assignment_config)
            except Exception as e:
                logger.error(f"Error assigning attributes to venue {venue.id} ({venue.type}): {e}")
                continue

            venue_assigned = 0
            # Assign each person
            for person in members:
                # Skip if already assigned
                if self.attribute_name in person.properties:
                    continue

                # Create context for strategy
                context = {
                    'attribute_name': self.attribute_name,
                    'venue_type': venue.type,
                }
                try:
                    value = strategy.assign(person, venue, context)
                    if value is not None:
                        person.properties[self.attribute_name] = value
                        # Values may be unhashable (e.g. lists); normalise the key.
                        self.stats['attribute_distribution'][self._distribution_key(value)] += 1
                        self.stats['assignments_by_strategy'][f'venue_{venue.type}'] += 1
                        self.stats['assignments_by_venue_type'][venue.type] += 1
                        self.stats['assigned_people'] += 1
                        people_assigned += 1
                        venue_assigned += 1
                    else:
                        self.stats['unassigned_people'] += 1
                        logger.warning(f"Failed to assign {self.attribute_name} to person {person.id} in {venue.type}")
                except Exception as e:
                    logger.error(
                        f"Error assigning {self.attribute_name} to person {person.id} "
                        f"in venue {venue.id} ({venue.type}): {e}"
                    )
                    self.stats['unassigned_people'] += 1

            if venue_assigned > 0:
                venues_processed += 1

        # Update stats
        self.stats['people_in_other_residences'] = people_assigned
        self.stats['other_residences_processed'] = venues_processed
        return people_assigned

    def _has_required_attributes(self, person):
        """
        Check the configured required attributes for one person.

        Returns False only when an attribute is flagged both 'required' and
        'error_if_missing' and is absent from ``person.properties``.
        """
        for attr_name, attr_config in self._required_attrs:
            if not attr_config.get('required', False):
                continue
            if attr_name in person.properties:
                continue
            if attr_config.get('error_if_missing', False):
                return False
        return True

    def _passes_filters(self, person):
        """
        Check if person passes all configured filters.

        Checks run in this order: scalar attribute filters, activity
        include/exclude filters, activity-venue filters, required attributes.
        A person whose value for a filtered attribute is None is not rejected
        by that filter.

        Args:
            person: Person object

        Returns:
            bool: True if person passes all filters
        """
        if not self._has_filters:
            # Check required attributes even if no filters
            return self._has_required_attributes(person)

        # 1. Attribute filters
        for f in self._optimized_filters:
            # Use direct attribute access for age/sex (significant speedup)
            if f['is_age']:
                person_value = person.age
            elif f['is_sex']:
                person_value = person.sex
            else:
                # Check properties first, then fall back to a plain attribute
                person_value = person.properties.get(f['attr'])
                if person_value is None:
                    person_value = getattr(person, f['attr'], None)

            if person_value is None:
                continue

            if f['type'] == 'numerical':
                vmin = f['min']
                if vmin is not None and person_value < vmin:
                    return False
                vmax = f['max']
                if vmax is not None and person_value > vmax:
                    return False
            elif f['type'] == 'categorical':
                values = f['values']
                if values is not None and person_value not in values:
                    return False

        # 2. Activity filters
        if self._include_activities or self._exclude_activities:
            person_activities = person.activities

            if self._include_activities:
                # At least one included activity must be present.
                has_activity = False
                for a in self._include_activities:
                    if a in person_activities:
                        has_activity = True
                        break
                if not has_activity:
                    return False

            if self._exclude_activities:
                # No excluded activity may be present.
                for a in self._exclude_activities:
                    if a in person_activities:
                        return False

        # 2b. Activity-venue filters: require an actually-assigned venue of a
        # given type (optionally in a given subset) under some activity. Used to
        # gate commute on people who genuinely got a workplace venue.
        for f in self._activity_venue_filters:
            venue_map = person.activity_map.get(f['activity'], {})
            venue_types = f['venue_types']
            subset_names = f['subset_names']
            matched = False
            for vt, subsets in venue_map.items():
                if venue_types and vt not in venue_types:
                    continue
                if subset_names:
                    if any(getattr(s, 'subset_name', None) in subset_names for s in subsets):
                        matched = True
                        break
                elif subsets:
                    matched = True
                    break
            if not matched:
                return False

        # 3. Required attributes
        return self._has_required_attributes(person)

    def _assign_all_people(self, venue_manager):
        """
        Assign attributes at person level.

        Collects the members of every venue, drops people rejected by
        ``_passes_filters`` and hands the rest to the person-level strategy,
        in batch mode when the strategy exposes a callable ``assign_batch``.

        Args:
            venue_manager: VenueManager with households and people
        """
        # Get all people from venue manager
        all_people = []
        for venue in venue_manager.get_all_venues_list():
            all_people.extend(venue.get_all_members())
        logger.info(f"Found {len(all_people)} total people")
        logger.info("")

        # Check required attributes
        self._check_required_attributes(all_people)

        # Pre-filter all people once
        logger.info("Pre-filtering people by age/activity filters...")
        eligible_people = []
        for person in all_people:
            if self._passes_filters(person):
                eligible_people.append(person)
            else:
                self.stats['filtered_people'] += 1
        self.stats['total_people'] = len(all_people)
        logger.info(f" ✓ Eligible for assignment: {len(eligible_people)} / {len(all_people)} people")
        logger.info(f" ✓ Filtered out: {self.stats['filtered_people']} people")
        logger.info("")

        # Get assignment rule and strategy
        rule = self.config.get_person_assignment_rule()
        if not rule:
            logger.warning(f"No assignment rule for person-level attribute '{self.attribute_name}'")
            self.stats['unassigned_people'] = len(eligible_people)
            logger.info("")
            return

        strategy = self._get_or_create_strategy(rule.assignment)

        # Check if strategy supports batch assignment
        if callable(getattr(strategy, 'assign_batch', None)):
            logger.info("Using batch assignment...")
            self._assign_all_people_batch(eligible_people, strategy)
        else:
            logger.info("Using standard assignment...")
            self._assign_all_people_sequential(eligible_people, strategy)

        # Fallbacks are tracked per reason; the total is their sum.
        fallback_total = sum(self.stats['fallbacks_by_reason'].values())
        logger.info(f"✓ Processed {len(all_people)} people")
        logger.info(f"✓ Filtered {self.stats['filtered_people']} people (age/activity filters)")
        logger.info(f"✓ Assigned {self.stats['assigned_people']} people")
        logger.info(f"✓ Unassigned {self.stats['unassigned_people']} people (failed assignment)")
        logger.info(f"✓ Fallback used: {fallback_total} times")
        logger.info("")

    def _assign_all_people_batch(self, eligible_people, strategy):
        """
        Batch assignment mode.

        Uses strategy's assign_batch method to process all people together.
        A dict result is treated as several attributes for that person; any
        other non-None result is stored under ``self.attribute_name``.

        Args:
            eligible_people: List of pre-filtered people
            strategy: Assignment strategy with assign_batch method
        """
        logger.info("Processing people in batch mode...")
        total = len(eligible_people)

        # Progress tracking
        progress_interval = max(1, total // 20)  # Report every 5%

        # Prepare batch data
        logger.info(f" Preparing batch data for {total:,} people...")
        households = [self._get_person_residence_venue(p) for p in eligible_people]
        contexts = [{'attribute_name': self.attribute_name} for _ in eligible_people]

        # Call batch assignment
        logger.info(f" Running batch assignment...")
        results = strategy.assign_batch(eligible_people, households, contexts)

        # Assign results to people
        logger.info(f" Applying results to people...")
        applied = 0
        for i, (person, value, context) in enumerate(zip(eligible_people, results, contexts)):
            applied = i + 1
            if value is not None:
                # Handle single value or multiple values (dict)
                if isinstance(value, dict):
                    # Multiple attributes returned
                    for attr_name, attr_value in value.items():
                        person.properties[attr_name] = attr_value
                        if attr_name == self.attribute_name:
                            self.stats['attribute_distribution'][str(attr_value)] += 1
                else:
                    # Single attribute
                    person.properties[self.attribute_name] = value
                    self.stats['attribute_distribution'][str(value)] += 1
                self.stats['assignments_by_strategy'][strategy.strategy_type] += 1
                self.stats['assigned_people'] += 1
                # Record fallback reason if the strategy left one in the context
                # (mirrors the sequential path).
                if 'fallback_reason' in context:
                    self.stats['fallbacks_by_reason'][context.pop('fallback_reason')] += 1
            else:
                self.stats['unassigned_people'] += 1

            # Log progress
            if (i + 1) % progress_interval == 0 or (i + 1) == total:
                progress = ((i + 1) / total) * 100
                logger.info(f" Progress: {i+1:,}/{total:,} ({progress:.1f}%)")

        # zip() stops at the shorter input: people without a result would
        # otherwise vanish from the statistics.
        if applied < total:
            missing = total - applied
            logger.warning(f"Batch strategy returned {applied:,} results for {total:,} people; "
                           f"{missing:,} people left unassigned")
            self.stats['unassigned_people'] += missing

        logger.info(f"✓ Batch processed {total:,} people")

    def _assign_all_people_sequential(self, eligible_people, strategy):
        """
        Standard sequential assignment mode.

        Args:
            eligible_people: List of pre-filtered people
            strategy: Assignment strategy (already created, reuse it!)
        """
        logger.info("Processing people sequentially...")
        total = len(eligible_people)

        # Progress tracking
        progress_interval = max(1, total // 20)  # Report every 5%

        # Sample tracking for debugging: up to 10 randomly chosen people get
        # detailed debug output. NOTE: this draws from the global NumPy RNG.
        sample_size = min(10, total) if total > 0 else 0
        sample_indices = set(np.random.choice(total, sample_size, replace=False)) if total > 0 else set()
        samples_logged = []

        # Process each person with the pre-created strategy
        for i, person in enumerate(eligible_people):
            # Track if this is a sample person
            is_sample = i in sample_indices
            if is_sample:
                logger.debug(f"\n [SAMPLE {len(samples_logged)+1}] Person {person.id}:")
                logger.debug(f" Age: {person.age}, Sex: {person.sex}")
                logger.debug(f" Geo Unit: {person.geographical_unit.name if person.geographical_unit else 'None'}")
                logger.debug(f" Existing attributes: {list(person.properties.keys())}")

            household = self._get_person_residence_venue(person)
            context = {'attribute_name': self.attribute_name, 'debug': is_sample}

            try:
                value = strategy.assign(person, household, context)
                if value is not None:
                    # Handle single value or multiple values (dict)
                    if isinstance(value, dict):
                        for attr_name, attr_value in value.items():
                            person.properties[attr_name] = attr_value
                            if attr_name == self.attribute_name:
                                self.stats['attribute_distribution'][str(attr_value)] += 1
                    else:
                        person.properties[self.attribute_name] = value
                        self.stats['attribute_distribution'][str(value)] += 1
                    self.stats['assignments_by_strategy'][strategy.strategy_type] += 1
                    self.stats['assigned_people'] += 1
                    # Record fallback reason
                    if 'fallback_reason' in context:
                        self.stats['fallbacks_by_reason'][context['fallback_reason']] += 1
                        del context['fallback_reason']
                else:
                    self.stats['unassigned_people'] += 1
            except Exception as e:
                logger.error(f"Exception assigning {self.attribute_name} to person {person.id}: {e}")
                self.stats['unassigned_people'] += 1

            if is_sample:
                result = person.properties.get(self.attribute_name, "NOT_ASSIGNED")
                logger.debug(f" Result: {self.attribute_name} = {result}")
                samples_logged.append((person.id, result))

            # Log progress
            if (i + 1) % progress_interval == 0 or (i + 1) == total:
                progress = ((i + 1) / total) * 100
                logger.info(f" Progress: {i+1:,}/{total:,} ({progress:.1f}%)")

    def _assign_household(self, household):
        """
        Assign attribute to all people in a household.

        Main assignment flow:
        1. Classify household structure
        2. Order people so that role dependencies are satisfied
        3. For each person:
           a. Determine role based on subset + already assigned roles
           b. Get assignment rule for (structure, role)
           c. Execute strategy
           d. Track assigned roles

        The same ``context`` dict is shared by all members of the household so
        that strategies can look up already-processed people via the
        ``"<role>_person"`` keys.

        Args:
            household: Venue object (type="household")
        """
        # Get all members
        members = household.get_all_members()
        if not members:
            return

        if self.verbose:
            logger.debug(f"\n{'=' * 80}")
            logger.debug(f"Processing Household {household.id} "
                         f"(geo_unit={household.geographical_unit.name if household.geographical_unit else 'None'})")
            logger.debug(f" Members: {len(members)}")
            logger.debug(f" Original pattern: {household.properties.get('original_pattern', 'N/A')}")
            logger.debug(f" Actual pattern: {household.properties.get('actual_pattern', 'N/A')}")

        # Pre-calculate person categories (subsets) to avoid repeated lookups
        person_categories = {
            person.id: self._get_person_category(person) for person in members
        }

        # 1. Classify household structure
        structure = self.config.get_household_structure(household, verbose=self.verbose)
        if not structure:
            if self.verbose:
                logger.debug(f" Could not classify household {household.id}, skipping")
            else:
                logger.warning(f"Could not classify household {household.id}, skipping")
            self.stats['unassigned_people'] += len(members)
            return

        # Store structure in household properties
        household.properties['_structure'] = structure
        self.stats['household_structure_counts'][structure] += 1
        if not self.verbose:
            logger.debug(f"Household {household.id}: structure={structure}, members={len(members)}")

        # Initialize assignment context
        context = {
            'attribute_name': self.attribute_name,
            'household_structure': structure,
        }

        # Track assigned roles (as a list to maintain order and count)
        assigned_roles: List[str] = []

        # 2. Get dependency-aware assignment order
        # This ensures that roles with 'inherit_from' are processed after their dependencies
        sorted_members = self._get_dependency_aware_order(
            members, structure, person_categories
        )

        # 3. Assign each person in order
        for person in sorted_members:
            category = person_categories.get(person.id, "unknown")
            if self.verbose:
                logger.debug(f"\n Assigning {person} (category={category}):")

            # 3a. Determine role (pass pre-calculated category)
            role = self.config.get_person_role(
                person, structure, assigned_roles, verbose=self.verbose,
                person_category=category
            )
            if not role:
                if self.verbose:
                    logger.debug(f" Could not determine role, skipping")
                else:
                    logger.warning(f" Could not determine role for {person} in {household.id}")
                self.stats['unassigned_people'] += 1
                continue

            # Track assigned roles
            assigned_roles.append(role)

            # Store person by role in context (for strategies to reference)
            person_key = f"{role}_person"
            context[person_key] = person

            # 3b. Get assignment rule
            rule = self.config.get_assignment_rule(structure, role, verbose=self.verbose)
            if not rule:
                if self.verbose:
                    logger.debug(f" No rule found for role '{role}', skipping")
                else:
                    logger.warning(f" No rule for role '{role}' in structure '{structure}' for {person}")
                self.stats['unassigned_people'] += 1
                continue

            # 3c. Create and execute strategy
            try:
                strategy = self._get_or_create_strategy(rule.assignment)
                value = strategy.assign(person, household, context)

                # The context is shared across household members: always remove
                # the fallback marker so it cannot leak to the next person.
                had_fallback = 'fallback_reason' in context
                fallback_reason = context.pop('fallback_reason', None)

                if value is not None:
                    # Assign attribute to person's properties dict
                    person.properties[self.attribute_name] = value

                    # Update statistics
                    self.stats['assignments_by_role'][role] += 1
                    self.stats['assignments_by_strategy'][strategy.strategy_type] += 1
                    self.stats['assigned_people'] += 1
                    # Track distribution — use str() for unhashable types (e.g. lists)
                    self.stats['attribute_distribution'][self._distribution_key(value)] += 1

                    # Record fallback reason
                    if had_fallback:
                        self.stats['fallbacks_by_reason'][fallback_reason] += 1

                    if self.verbose:
                        logger.debug(f" ✓ Assigned: {self.attribute_name}={value} "
                                     f"(role={role}, strategy={strategy.strategy_type})")
                    else:
                        logger.debug(f" {person}: {self.attribute_name}={value} (role={role})")
                else:
                    logger.warning(f" Strategy returned None for {person} (role={role})")
                    self.stats['unassigned_people'] += 1
            except Exception as e:
                context.pop('fallback_reason', None)
                logger.error(f" Error assigning to {person}: {e}")
                self.stats['unassigned_people'] += 1

        if self.verbose:
            logger.debug(f"{'=' * 80}\n")

        self.stats['households_processed'] += 1
        self.stats['people_in_households'] += len(members)
        self.stats['total_people'] += len(members)

    def _get_dependency_aware_order(self, members, structure: str,
                                    person_categories: Dict[int, str] = None) -> List:
        """
        Get person assignment order that satisfies role dependencies.

        Uses a topological sort so inheritors are processed after the roles they
        inherit from. Members are ordered by id as the stable base/tie-breaker —
        which role a person takes (primary vs secondary, child, elder) depends on
        their subset and this order, not on any configurable category priority.

        Args:
            members: People of one household
            structure: Household structure name used to look up rules
            person_categories: Optional mapping person id -> category; people
                missing from it (or all people, if None) count as "unknown"

        Returns:
            List containing every member exactly once. Members that cannot be
            ordered (dependency cycle) are appended at the end in id order.
        """
        # 1. Determine roles for everyone first (predictive)
        # We need to know who is who to build the dependency graph
        temp_assigned_roles = []
        person_to_role = {}

        # Stable base order (by id) used to predict roles, since role depends on order.
        base_sorted = sorted(members, key=lambda p: p.id)
        for person in base_sorted:
            category = person_categories.get(person.id, "unknown") if person_categories else "unknown"
            role = self.config.get_person_role(
                person, structure, temp_assigned_roles, verbose=False,
                person_category=category
            )
            if role:
                person_to_role[person.id] = role
                temp_assigned_roles.append(role)

        # 2. Build dependency graph
        # adj[A] = [B, C] means A must come before B and C
        adj = defaultdict(list)
        in_degree = defaultdict(int)

        # Map roles to people
        role_to_people = defaultdict(list)
        for pid, role in person_to_role.items():
            role_to_people[role].append(pid)

        # Add edges based on assignment rule dependencies
        for pid, role in person_to_role.items():
            rule = self.config.get_assignment_rule(structure, role)
            if rule and rule.dependencies:
                for dep_role in rule.dependencies:
                    # If dep_role is in the household, add edges from all people with that role
                    if dep_role in role_to_people:
                        for dep_pid in role_to_people[dep_role]:
                            if dep_pid != pid:  # Avoid self-dependency
                                adj[dep_pid].append(pid)
                                in_degree[pid] += 1

        # Lookup tables built once instead of rescanning lists inside the loop.
        # setdefault keeps the first occurrence should two people share an id.
        position = {}
        for idx, p in enumerate(base_sorted):
            position.setdefault(p.id, idx)
        person_by_id = {}
        for p in members:
            person_by_id.setdefault(p.id, p)

        # 3. Topological Sort (Kahn's Algorithm), tie-broken by base (id) order.
        # Initial nodes with no dependencies
        queue = [p for p in base_sorted if in_degree[p.id] == 0]
        result = []
        while queue:
            # Among equal dependency levels, keep the stable base (id) order.
            queue.sort(key=lambda p: position[p.id])
            curr = queue.pop(0)
            result.append(curr)
            for neighbor_id in adj[curr.id]:
                in_degree[neighbor_id] -= 1
                if in_degree[neighbor_id] == 0:
                    queue.append(person_by_id[neighbor_id])

        # 4. Handle remaining people (cycle or isolated)
        if len(result) < len(members):
            # If there's a cycle or missing dependencies, just append remaining in base order
            processed_ids = {p.id for p in result}
            for p in base_sorted:
                if p.id not in processed_ids:
                    result.append(p)

        return result

    def _get_person_category(self, person) -> str:
        """
        Get person's category (subset name) from their household activity.

        Args:
            person: Person object

        Returns:
            Category name or "unknown"
        """
        # UNIFIED STRUCTURE: activity_map['residence']['household'] = [subsets]
        activity_map = person.activity_map
        if "residence" in activity_map and "household" in activity_map["residence"]:
            res_subsets = activity_map["residence"]["household"]
            if res_subsets:
                return res_subsets[0].subset_name
        return "unknown"

    def _get_person_residence_venue(self, person):
        """
        Get residence venue for a person (e.g., household, pub, care home).

        Returns the venue of the first subset of the first residence venue type
        that has one, or None when the person has no residence venue.
        """
        # UNIFIED STRUCTURE: activity_map['residence'][venue_type] = [subsets]
        if "residence" in person.activity_map:
            for venue_type, subsets in person.activity_map["residence"].items():
                if subsets:
                    venue = subsets[0].venue
                    if venue is not None:
                        logger.debug(f" Person {person.id} residence found: {venue_type} (ID={venue.id})")
                        return venue
        return None

    def _check_required_attributes(self, people):
        """
        Check and log required attribute availability.

        Purely informational: logs, for every attribute flagged 'required',
        how many of ``people`` carry it. Nobody is filtered here.
        """
        if not self.config.required_attributes:
            return

        logger.info("Checking required attributes...")
        for attr_name, attr_config in self.config.required_attributes.items():
            if not attr_config.get('required', False):
                continue
            missing_count = sum(1 for p in people if attr_name not in p.properties)
            total_count = len(people)
            present_count = total_count - missing_count
            logger.info(f" '{attr_name}': {present_count}/{total_count} people have this attribute")
            if missing_count > 0:
                logger.warning(f" {missing_count} people missing required attribute '{attr_name}'")
        logger.info("")

    def _report_statistics(self):
        """Report assignment statistics (logs the contents of ``self.stats``)."""
        stats = self.stats
        logger.info("=" * 80)
        logger.info("ASSIGNMENT STATISTICS")
        logger.info("=" * 80)
        logger.info(f"Total people: {stats['total_people']}")

        # Show filtered/assigned/unassigned breakdown
        if stats.get('filtered_people', 0) > 0:
            logger.info(f"Filtered people (age/activity): {stats['filtered_people']}")
        if stats.get('assigned_people', 0) > 0:
            logger.info(f"Assigned people: {stats['assigned_people']}")
        if stats['unassigned_people'] > 0:
            logger.info(f"Unassigned people (failures): {stats['unassigned_people']}")

        # Household-specific stats
        if stats['people_in_households'] > 0:
            logger.info(f" In households: {stats['people_in_households']}")
        if stats['people_in_other_residences'] > 0:
            logger.info(f" In other residences: {stats['people_in_other_residences']}")
        if stats['households_processed'] > 0:
            logger.info(f"Households processed: {stats['households_processed']}")
        if stats['other_residences_processed'] > 0:
            logger.info(f"Other residences processed: {stats['other_residences_processed']}")
        logger.info("")

        # Show breakdown by venue type if applicable
        if stats['assignments_by_venue_type']:
            logger.info("Assignments by venue type:")
            for venue_type, count in self._sorted_stat_items(stats['assignments_by_venue_type']):
                logger.info(f" {venue_type}: {count}")

        # Show fallback diagnostics
        if stats['fallbacks_by_reason']:
            total_fallbacks = sum(stats['fallbacks_by_reason'].values())
            logger.info("")
            logger.info(f"FALLBACK DIAGNOSTICS (Total fallbacks: {total_fallbacks})")
            for reason, count in self._sorted_stat_items(stats['fallbacks_by_reason']):
                logger.info(f" {reason}: {count}")
            logger.info("")

        # Household structure distribution (only if household-level)
        if stats['household_structure_counts']:
            logger.info("Household structures:")
            for structure, count in self._sorted_stat_items(stats['household_structure_counts']):
                logger.info(f" {structure}: {count}")
            logger.info("")

        # Role distribution (only if household-level)
        if stats['assignments_by_role']:
            logger.info("Assignments by role:")
            for role, count in self._sorted_stat_items(stats['assignments_by_role']):
                logger.info(f" {role}: {count}")
            logger.info("")

        # Strategy distribution
        if stats['assignments_by_strategy']:
            logger.info("Assignments by strategy:")
            for strategy, count in self._sorted_stat_items(stats['assignments_by_strategy']):
                logger.info(f" {strategy}: {count}")
            logger.info("")

        # Attribute distribution (can be disabled via settings)
        show_distribution = (self.config.settings.get('logging') or {}).get('show_attribute_distribution', True)
        total_assigned = sum(stats['attribute_distribution'].values())
        if show_distribution:
            logger.info(f"{self.attribute_name.capitalize()} distribution:")
            # Keys can be of mixed types (raw values and str() forms), hence
            # the type-tolerant ordering helper.
            for value, count in self._sorted_stat_items(stats['attribute_distribution']):
                percentage = (count / total_assigned * 100) if total_assigned > 0 else 0
                logger.info(f" {value}: {count:6d} ({percentage:5.2f}%)")
            logger.info("")
        else:
            # Still show summary count even when distribution is hidden
            unique_values = len(stats['attribute_distribution'])
            logger.info(f"{self.attribute_name.capitalize()} distribution: {unique_values} unique values, {total_assigned} total assignments")
            logger.info("")

        logger.info("=" * 80)