diff --git a/spp_programs/models/managers/deduplication_manager.py b/spp_programs/models/managers/deduplication_manager.py
index cc9cb561..1581ab06 100644
--- a/spp_programs/models/managers/deduplication_manager.py
+++ b/spp_programs/models/managers/deduplication_manager.py
@@ -131,12 +131,15 @@ def _check_duplicate_by_individual_ids(self, beneficiaries):
group_of_duplicates[group_membership.individual.id].append(group_membership.group.id)
_logger.debug("Found %s duplicate group(s)", len(group_of_duplicates))
- for _individual, group_ids in group_of_duplicates.items():
+ for individual_id, group_ids in group_of_duplicates.items():
group_ids_set = set(group_ids)
duplicate_beneficiaries = beneficiaries.filtered(lambda rec, gids=group_ids_set: rec.partner_id.id in gids)
duplicate_beneficiariy_ids = duplicate_beneficiaries.mapped("id")
- self._record_duplicate(self, duplicate_beneficiariy_ids, "Duplicate individuals")
+ individual_rec = self.env["res.partner"].browse(individual_id)
+ group_names = duplicate_beneficiaries.mapped("partner_id.name")
+ reason = f"Shared member: {individual_rec.name} found in {len(group_ids)} groups ({', '.join(group_names)})"
+ self._record_duplicate(self, duplicate_beneficiariy_ids, reason)
duplicated_enrolled = duplicate_beneficiaries.filtered(lambda rec: rec.state == "enrolled")
if len(duplicated_enrolled) == 1:
@@ -158,7 +161,11 @@ class IDDocumentDeduplication(models.Model):
_inherit = ["spp.base.deduplication.manager", "spp.manager.source.mixin"]
_description = "ID Deduplication Manager"
- supported_id_document_type_ids = fields.Many2many("spp.id.type", string="Supported ID Document Types")
+ supported_id_document_type_ids = fields.Many2many(
+ "spp.vocabulary.code",
+ string="Supported ID Document Types",
+ domain=[("vocabulary_id.namespace_uri", "=", "urn:openspp:vocab:id-type")],
+ )
def deduplicate_beneficiaries(self, states):
for rec in self:
@@ -233,7 +240,7 @@ def _check_duplicate_by_group_with_individual(self, beneficiaries):
if x.id_type_id in self.supported_id_document_type_ids and (
(not x.expiry_date) or x.expiry_date > date.today()
):
- id_doc_id_with_id_type_and_value = {x.id: x.id_type_id.name + "-" + x.value}
+ id_doc_id_with_id_type_and_value = {x.id: x.id_type_id.display + "-" + x.value}
individual_id_docs.update(id_doc_id_with_id_type_and_value)
# Check ID Docs of each group
@@ -242,7 +249,7 @@ def _check_duplicate_by_group_with_individual(self, beneficiaries):
if x.id_type_id in self.supported_id_document_type_ids and (
(not x.expiry_date) or x.expiry_date > date.today()
):
- id_doc_id_with_id_type_and_value = {x.id: x.id_type_id.name + "-" + x.value}
+ id_doc_id_with_id_type_and_value = {x.id: x.id_type_id.display + "-" + x.value}
individual_id_docs.update(id_doc_id_with_id_type_and_value)
_logger.debug("Found %s ID document(s) to check", len(individual_id_docs))
@@ -264,6 +271,11 @@ def _check_duplicate_by_group_with_individual(self, beneficiaries):
)
_logger.debug("Found %s group(s) with duplicates", len(group_with_duplicates))
+ # Build mapping: individual_id -> list of duplicate ID doc descriptions
+ individual_dup_docs = {}
+ for doc in duplicated_doc_ids:
+ individual_dup_docs.setdefault(doc.partner_id.id, []).append(f"{doc.id_type_id.display}: {doc.value}")
+
group_of_duplicates = {}
for group_membership in group_with_duplicates:
_logger.debug("Processing group membership duplicate")
@@ -272,12 +284,15 @@ def _check_duplicate_by_group_with_individual(self, beneficiaries):
group_of_duplicates[group_membership.individual.id].append(group_membership.group.id)
_logger.debug("Found %s duplicate group(s)", len(group_of_duplicates))
- for _individual, group_ids in group_of_duplicates.items():
+ for individual_id, group_ids in group_of_duplicates.items():
group_ids_set = set(group_ids)
duplicate_beneficiaries = beneficiaries.filtered(lambda rec, gids=group_ids_set: rec.partner_id.id in gids)
duplicate_beneficiariy_ids = duplicate_beneficiaries.mapped("id")
- self._record_duplicate(self, duplicate_beneficiariy_ids, "Duplicate ID Documents")
+ individual_rec = self.env["res.partner"].browse(individual_id)
+ doc_info = ", ".join(individual_dup_docs.get(individual_id, []))
+ reason = f"Duplicate ID document ({doc_info}) on member: {individual_rec.name}"
+ self._record_duplicate(self, duplicate_beneficiariy_ids, reason)
duplicated_enrolled = duplicate_beneficiaries.filtered(lambda rec: rec.state == "enrolled")
if len(duplicated_enrolled) == 1:
@@ -295,55 +310,65 @@ def _check_duplicate_by_group_with_individual(self, beneficiaries):
def _check_duplicate_by_individual(self, beneficiaries):
"""
- This method is used to check if there are any duplicates
- among the individuals id docs.
- :param beneficiary_ids: The beneficiaries.
- :return:
+ Check for duplicate ID documents among individual beneficiaries.
+ Groups duplicates by shared ID value and applies keep-one-enrolled logic.
"""
_logger.debug("-" * 100)
individual_ids = beneficiaries.mapped("partner_id.id")
individuals = self.env["res.partner"].search([("id", "in", individual_ids)])
_logger.debug("Checking ID Document Duplicates for %s individual(s)", len(individuals))
+ # Map: reg_id.id -> "IDType-Value", also track reg_id -> partner_id
individual_id_docs = {}
- # Check ID Documents of each individual
+ reg_id_to_partner = {}
for i in individuals:
for x in i.reg_ids:
if x.id_type_id in self.supported_id_document_type_ids and (
(not x.expiry_date) or x.expiry_date > date.today()
):
- id_doc_id_with_id_type_and_value = {x.id: x.id_type_id.name + "-" + x.value}
- individual_id_docs.update(id_doc_id_with_id_type_and_value)
+ doc_key = x.id_type_id.display + "-" + x.value
+ individual_id_docs[x.id] = doc_key
+ reg_id_to_partner[x.id] = i.id
_logger.debug("Found %s ID document(s) to check", len(individual_id_docs))
rev_dict = {}
for key, value in individual_id_docs.items():
rev_dict.setdefault(value, set()).add(key)
- duplicate_ids = filter(lambda x: len(x) > 1, rev_dict.values())
- duplicate_ids = list(duplicate_ids)
- duplicate_ids = list(itertools.chain.from_iterable(duplicate_ids))
- _logger.debug("Found %s duplicate ID document(s)", len(duplicate_ids))
+ all_duplicated_memberships = self.env["spp.program.membership"]
- duplicated_doc_ids = self.env["spp.registry.id"].search([("id", "in", duplicate_ids)])
- individual_ids = [x.partner_id.id for x in duplicated_doc_ids]
- individual_ids = list(dict.fromkeys(individual_ids))
- _logger.debug("Found %s individual(s) with duplicated ID documents", len(individual_ids))
- individual_program_membership = self.env["spp.program.membership"].search(
- [
- ("partner_id", "in", individual_ids),
- ("program_id", "=", self.program_id.id),
- ]
- )
+ # Process each group of duplicates sharing the same ID value
+ for doc_key, reg_ids in rev_dict.items():
+ if len(reg_ids) <= 1:
+ continue
- for duplicates in individual_program_membership:
- duplicate_individuals = [duplicates.id]
- self._record_duplicate(self, duplicate_individuals, "Duplicate ID Documents")
+ partner_ids = list({reg_id_to_partner[rid] for rid in reg_ids})
+ dup_memberships = self.env["spp.program.membership"].search(
+ [
+ ("partner_id", "in", partner_ids),
+ ("program_id", "=", self.program_id.id),
+ ]
+ )
+ if not dup_memberships:
+ continue
- if duplicates.state == "enrolled":
- duplicates.write({"state": "duplicated"})
+ names = dup_memberships.mapped("partner_id.name")
+ reason = f"Duplicate ID document ({doc_key}) shared by: {', '.join(names)}"
+ self._record_duplicate(self, dup_memberships.ids, reason)
- return individual_program_membership
+ # Keep-one-enrolled logic
+ duplicated_enrolled = dup_memberships.filtered(lambda rec: rec.state == "enrolled")
+ if len(duplicated_enrolled) == 1:
+ to_mark = dup_memberships.filtered(lambda rec: rec.state != "enrolled")
+ else:
+ to_mark = dup_memberships
+ to_mark.filtered(lambda rec: rec.state not in ["exited", "not_eligible", "duplicated"]).write(
+ {"state": "duplicated"}
+ )
+
+ all_duplicated_memberships |= dup_memberships
+
+ return all_duplicated_memberships
class PhoneNumberDeduplication(models.Model):
@@ -462,6 +487,10 @@ def _check_duplicate_by_group_with_individual(self, beneficiaries):
)
_logger.debug("Found %s group(s) with duplicates", len(group_with_duplicates))
+ # Build mapping: individual_id -> list of duplicate phone numbers
+ individual_dup_phones = {}
+ for phone_rec in duplicate_individuals_ids:
+ individual_dup_phones.setdefault(phone_rec.partner_id.id, []).append(phone_rec.phone_no)
group_of_duplicates = {}
for group_membership in group_with_duplicates:
@@ -471,12 +500,15 @@ def _check_duplicate_by_group_with_individual(self, beneficiaries):
group_of_duplicates[group_membership.individual.id].append(group_membership.group.id)
_logger.debug("Found %s duplicate group(s)", len(group_of_duplicates))
- for _individual, group_ids in group_of_duplicates.items():
+ for individual_id, group_ids in group_of_duplicates.items():
group_ids_set = set(group_ids)
duplicate_beneficiaries = beneficiaries.filtered(lambda rec, gids=group_ids_set: rec.partner_id.id in gids)
duplicate_beneficiariy_ids = duplicate_beneficiaries.mapped("id")
- self._record_duplicate(self, duplicate_beneficiariy_ids, "Duplicate Phone Numbers")
+ individual_rec = self.env["res.partner"].browse(individual_id)
+ phone_info = ", ".join(individual_dup_phones.get(individual_id, []))
+ reason = f"Duplicate phone number ({phone_info}) on member: {individual_rec.name}"
+ self._record_duplicate(self, duplicate_beneficiariy_ids, reason)
duplicated_enrolled = duplicate_beneficiaries.filtered(lambda rec: rec.state == "enrolled")
if len(duplicated_enrolled) == 1:
@@ -524,21 +556,44 @@ def _check_duplicate_by_individual(self, beneficiaries):
individual_ids = [x.partner_id.id for x in duplicated_phone_ids]
individual_ids = list(dict.fromkeys(individual_ids))
_logger.debug("Individual IDS with Duplicated Phone Numbers: %s", individual_ids)
- individual_program_membership = self.env["spp.program.membership"].search(
- [
- ("partner_id", "in", individual_ids),
- ("program_id", "=", self.program_id.id),
- ]
- )
- for duplicates in individual_program_membership:
- duplicate_individuals = [duplicates.id]
- self._record_duplicate(self, duplicate_individuals, "Duplicate Phone Numbers")
+ all_duplicated_memberships = self.env["spp.program.membership"]
+
+ # Build reverse map: phone_sanitized -> list of partner_ids
+ phone_to_partners = {}
+ for phone_rec in duplicated_phone_ids:
+ phone_to_partners.setdefault(phone_rec.phone_sanitized, set()).add(phone_rec.partner_id.id)
+
+ for phone_val, partner_id_set in phone_to_partners.items():
+ if len(partner_id_set) <= 1:
+ continue
+
+ dup_memberships = self.env["spp.program.membership"].search(
+ [
+ ("partner_id", "in", list(partner_id_set)),
+ ("program_id", "=", self.program_id.id),
+ ]
+ )
+ if not dup_memberships:
+ continue
+
+ names = dup_memberships.mapped("partner_id.name")
+ reason = f"Duplicate phone number ({phone_val}) shared by: {', '.join(names)}"
+ self._record_duplicate(self, dup_memberships.ids, reason)
+
+ # Keep-one-enrolled logic
+ duplicated_enrolled = dup_memberships.filtered(lambda rec: rec.state == "enrolled")
+ if len(duplicated_enrolled) == 1:
+ to_mark = dup_memberships.filtered(lambda rec: rec.state != "enrolled")
+ else:
+ to_mark = dup_memberships
+ to_mark.filtered(lambda rec: rec.state not in ["exited", "not_eligible", "duplicated"]).write(
+ {"state": "duplicated"}
+ )
- if duplicates.state == "enrolled":
- duplicates.write({"state": "duplicated"})
+ all_duplicated_memberships |= dup_memberships
- return individual_program_membership
+ return all_duplicated_memberships
class IDPhoneEligibilityManager(models.Model):
diff --git a/spp_programs/models/managers/program_manager.py b/spp_programs/models/managers/program_manager.py
index 125331c2..eccb41a2 100644
--- a/spp_programs/models/managers/program_manager.py
+++ b/spp_programs/models/managers/program_manager.py
@@ -214,8 +214,10 @@ def _enroll_eligible_registrants(self, states, offset=0, limit=None, do_count=Fa
for el in eligibility_managers:
members = el.enroll_eligible_registrants(members)
# enroll the one not already enrolled:
+ # Exclude members that are duplicated or exited — those states
+ # should only be changed through their own workflows.
_logger.debug("members filtered: %s", members)
- not_enrolled = members.filtered(lambda m: m.state != "enrolled")
+ not_enrolled = members.filtered(lambda m: m.state not in ("enrolled", "duplicated", "exited"))
_logger.debug("not_enrolled: %s", not_enrolled)
not_enrolled.write(
{
@@ -226,7 +228,7 @@ def _enroll_eligible_registrants(self, states, offset=0, limit=None, do_count=Fa
# dis-enroll the one not eligible anymore:
enrolled_members_ids = members.ids
members_to_remove = member_before.filtered(
- lambda m: m.state != "not_eligible" and m.id not in enrolled_members_ids
+ lambda m: m.state not in ("not_eligible", "duplicated", "exited") and m.id not in enrolled_members_ids
)
# _logger.debug("members_to_remove: %s", members_to_remove)
members_to_remove.write(
diff --git a/spp_programs/models/program_membership.py b/spp_programs/models/program_membership.py
index ee4132ae..c5ee399c 100644
--- a/spp_programs/models/program_membership.py
+++ b/spp_programs/models/program_membership.py
@@ -52,6 +52,23 @@ class SPPProgramMembership(models.Model):
registrant_id = fields.Integer(string="Registrant ID", related="partner_id.id")
+ duplicate_reason = fields.Char(
+ string="Duplicate Reason",
+ compute="_compute_duplicate_reason",
+ )
+
+ def _compute_duplicate_reason(self):
+ for rec in self:
+ if rec.state == "duplicated":
+ dup_records = self.env["spp.program.membership.duplicate"].search(
+ [("beneficiary_ids", "in", rec.id), ("state", "=", "duplicate")],
+ order="id desc",
+ limit=1,
+ )
+ rec.duplicate_reason = dup_records.reason if dup_records else False
+ else:
+ rec.duplicate_reason = False
+
@api.constrains("partner_id", "program_id")
def _check_unique_partner_per_program(self):
# Prefetch partner_id and program_id to avoid N+1 queries in loop
@@ -212,7 +229,13 @@ def enroll_eligible_registrants(self):
member = em.enroll_eligible_registrants(member)
if len(member) > 0:
- if self.state != "enrolled":
+ if self.state in ("duplicated", "exited"):
+ message = _(
+ "Cannot enroll: beneficiary is currently %s.",
+ dict(self._fields["state"].selection).get(self.state, self.state),
+ )
+ kind = "warning"
+ elif self.state != "enrolled":
self.write(
{
"state": "enrolled",
@@ -221,19 +244,22 @@ def enroll_eligible_registrants(self):
)
message = _("%s Beneficiaries enrolled.", len(member))
kind = "success"
- return {
- "type": "ir.actions.client",
- "tag": "display_notification",
- "params": {
- "title": _("Enrollment"),
- "message": message,
- "sticky": False,
- "type": kind,
- "next": {
- "type": "ir.actions.act_window_close",
- },
+ else:
+ message = _("Beneficiary is already enrolled.")
+ kind = "info"
+ return {
+ "type": "ir.actions.client",
+ "tag": "display_notification",
+ "params": {
+ "title": _("Enrollment"),
+ "message": message,
+ "sticky": False,
+ "type": kind,
+ "next": {
+ "type": "ir.actions.act_window_close",
},
- }
+ },
+ }
else:
self.state = "not_eligible"
diff --git a/spp_programs/models/programs.py b/spp_programs/models/programs.py
index a1ad8016..e086c6d2 100644
--- a/spp_programs/models/programs.py
+++ b/spp_programs/models/programs.py
@@ -380,17 +380,38 @@ def deduplicate_beneficiaries(self):
message = None
kind = "success"
if len(deduplication_managers):
+ # Count already-flagged duplicates before running
+ already_duplicated = self.env["spp.program.membership"].search_count(
+ [("program_id", "=", rec.id), ("state", "=", "duplicated")]
+ )
+
states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
duplicates = 0
for el in deduplication_managers:
duplicates += el.deduplicate_beneficiaries(states)
- if duplicates > 0:
- message = _("%s Instances of Beneficiaries duplicate.", duplicates)
+ # Count total duplicates after running
+ total_duplicated = self.env["spp.program.membership"].search_count(
+ [("program_id", "=", rec.id), ("state", "=", "duplicated")]
+ )
+ new_duplicates = total_duplicated - already_duplicated
+
+ if total_duplicated > 0:
+ parts = []
+ if new_duplicates > 0:
+ parts.append(_("%(new)s new duplicate(s) found", new=new_duplicates))
+ if already_duplicated > 0:
+ parts.append(_("%(existing)s already flagged", existing=already_duplicated))
+ message = ", ".join(parts) + "."
+ kind = "warning"
+ elif duplicates > 0:
+ message = _(
+ "Found %(count)s duplicate beneficiaries.",
+ count=duplicates,
+ )
kind = "warning"
else:
message = _("No duplicates found.")
- kind = "success"
else:
raise UserError(_("No Deduplication Manager defined."))
@@ -399,7 +420,6 @@ def deduplicate_beneficiaries(self):
"type": "ir.actions.client",
"tag": "display_notification",
"params": {
- "title": _("Deduplication"),
"message": message,
"sticky": False,
"type": kind,
@@ -701,7 +721,7 @@ def check_managers_limit(self):
combined_message = ", ".join(error_messages)
raise UserError(
f"Only one manager can be configured under {combined_message}."
- f"Please delete any new manager(s) before saving your changes." # noqa: B950
+ f"Please delete any new manager(s) before saving your changes."
)
def _pre_enrollment_hook(self, partner):
diff --git a/spp_programs/tests/__init__.py b/spp_programs/tests/__init__.py
index c56ebc92..e3e7f202 100644
--- a/spp_programs/tests/__init__.py
+++ b/spp_programs/tests/__init__.py
@@ -1,17 +1,20 @@
+from . import common_compliance
+from . import test_compliance_cel
from . import test_create_program_wiz
+from . import test_create_program_wizard_cash
+from . import test_create_program_wizard_cel
+from . import test_cycle
+from . import test_deduplication
+from . import test_eligibility_cel
+from . import test_eligibility_cel_integration
from . import test_enrollment_wizard
-from . import test_entitlement_report_wiz
from . import test_entitlement
+from . import test_entitlement_manager_cash
+from . import test_entitlement_report_wiz
+from . import test_program_enrollment
+from . import test_program_membership
from . import test_programs
from . import test_registrant
-from . import test_stock_rule
-from . import test_cycle
-from . import common_compliance
from . import test_spp_cycle_compliance
from . import test_spp_program_create_wizard_compliance
-from . import test_entitlement_manager_cash
-from . import test_create_program_wizard_cash
-from . import test_eligibility_cel
-from . import test_eligibility_cel_integration
-from . import test_compliance_cel
-from . import test_create_program_wizard_cel
+from . import test_stock_rule
diff --git a/spp_programs/tests/test_deduplication.py b/spp_programs/tests/test_deduplication.py
new file mode 100644
index 00000000..f7242ebe
--- /dev/null
+++ b/spp_programs/tests/test_deduplication.py
@@ -0,0 +1,477 @@
+# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
+import uuid
+
+from odoo.tests import TransactionCase
+
+
+class TestDeduplicationCommon(TransactionCase):
+ """Common setup for deduplication tests."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.program = cls.env["spp.program"].create(
+ {"name": f"Dedup Program {uuid.uuid4().hex[:8]}", "target_type": "group"}
+ )
+
+ # Get existing ID type vocabulary and use existing codes, or create test ones
+ cls.id_vocab = cls.env["spp.vocabulary"].search([("namespace_uri", "=", "urn:openspp:vocab:id-type")], limit=1)
+ if cls.id_vocab:
+ # Use existing codes from the system vocabulary
+ existing_codes = cls.env["spp.vocabulary.code"].search([("vocabulary_id", "=", cls.id_vocab.id)], limit=2)
+ if len(existing_codes) >= 2:
+ cls.id_type_national = existing_codes[0]
+ cls.id_type_passport = existing_codes[1]
+ else:
+ # Create local extensions
+ cls.id_type_national = cls.env["spp.vocabulary.code"].create(
+ {
+ "vocabulary_id": cls.id_vocab.id,
+ "code": f"NID-{uuid.uuid4().hex[:6]}",
+ "display": "National ID",
+ "is_local": True,
+ }
+ )
+ cls.id_type_passport = cls.env["spp.vocabulary.code"].create(
+ {
+ "vocabulary_id": cls.id_vocab.id,
+ "code": f"PP-{uuid.uuid4().hex[:6]}",
+ "display": "Passport",
+ "is_local": True,
+ }
+ )
+ else:
+ cls.id_vocab = cls.env["spp.vocabulary"].create(
+ {
+ "name": "ID Types",
+ "namespace_uri": "urn:openspp:vocab:id-type",
+ }
+ )
+ cls.id_type_national = cls.env["spp.vocabulary.code"].create(
+ {
+ "vocabulary_id": cls.id_vocab.id,
+ "code": "NID",
+ "display": "National ID",
+ }
+ )
+ cls.id_type_passport = cls.env["spp.vocabulary.code"].create(
+ {
+ "vocabulary_id": cls.id_vocab.id,
+ "code": "PASSPORT",
+ "display": "Passport",
+ }
+ )
+
+ # Create default deduplication manager
+ cls.dedup_default = cls.env["spp.deduplication.manager.default"].create(
+ {"name": "Default Dedup", "program_id": cls.program.id}
+ )
+ cls.dedup_manager = cls.env["spp.deduplication.manager"].create(
+ {
+ "program_id": cls.program.id,
+ "manager_ref_id": f"{cls.dedup_default._name},{cls.dedup_default.id}",
+ }
+ )
+ cls.program.write({"deduplication_manager_ids": [(4, cls.dedup_manager.id)]})
+
+ # Create ID deduplication manager
+ cls.id_dedup = cls.env["spp.deduplication.manager.id_dedup"].create(
+ {
+ "name": "ID Dedup",
+ "program_id": cls.program.id,
+ "supported_id_document_type_ids": [(6, 0, [cls.id_type_national.id, cls.id_type_passport.id])],
+ }
+ )
+
+ # Create phone deduplication manager
+ cls.phone_dedup = cls.env["spp.deduplication.manager.phone_number"].create(
+ {"name": "Phone Dedup", "program_id": cls.program.id}
+ )
+
+ def _create_group(self, name, individuals=None):
+ """Helper to create a group with individuals."""
+ group = self.env["res.partner"].create({"name": name, "is_registrant": True, "is_group": True})
+ if individuals:
+ for individual in individuals:
+ self.env["spp.group.membership"].create({"group": group.id, "individual": individual.id})
+ return group
+
+ def _create_individual(self, name, reg_ids=None, phone_numbers=None):
+ """Helper to create an individual with optional IDs and phone numbers."""
+ individual = self.env["res.partner"].create({"name": name, "is_registrant": True, "is_group": False})
+ if reg_ids:
+ for id_type, value in reg_ids:
+ self.env["spp.registry.id"].create(
+ {
+ "partner_id": individual.id,
+ "id_type_id": id_type.id,
+ "value": value,
+ }
+ )
+ if phone_numbers:
+ for phone in phone_numbers:
+ self.env["spp.phone.number"].create({"partner_id": individual.id, "phone_no": phone})
+ return individual
+
+ def _enroll_in_program(self, partner, state="draft"):
+ """Helper to create a program membership."""
+ return self.env["spp.program.membership"].create(
+ {
+ "partner_id": partner.id,
+ "program_id": self.program.id,
+ "state": state,
+ }
+ )
+
+
+class TestDefaultDeduplication(TestDeduplicationCommon):
+ """Tests for default deduplication (shared individuals across groups)."""
+
+ def test_no_duplicates_found(self):
+ """When groups don't share members, no duplicates are found."""
+ ind1 = self._create_individual("Individual 1")
+ ind2 = self._create_individual("Individual 2")
+ group1 = self._create_group("Group A", [ind1])
+ group2 = self._create_group("Group B", [ind2])
+ self._enroll_in_program(group1, "enrolled")
+ self._enroll_in_program(group2, "enrolled")
+
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ count = self.dedup_default.deduplicate_beneficiaries(states)
+ self.assertEqual(count, 0)
+
+ def test_shared_individual_detected(self):
+ """When two groups share a member, duplicates are detected."""
+ shared = self._create_individual("Shared Individual")
+ ind1 = self._create_individual("Unique 1")
+ ind2 = self._create_individual("Unique 2")
+ group1 = self._create_group("Group A", [shared, ind1])
+ group2 = self._create_group("Group B", [shared, ind2])
+ self._enroll_in_program(group1, "enrolled")
+ self._enroll_in_program(group2, "enrolled")
+
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ count = self.dedup_default.deduplicate_beneficiaries(states)
+ self.assertGreater(count, 0)
+
+ def test_detailed_reason_includes_member_name(self):
+ """Duplicate reason includes the shared member's name."""
+ shared = self._create_individual("John Shared")
+ group1 = self._create_group("Group X", [shared])
+ group2 = self._create_group("Group Y", [shared])
+ self._enroll_in_program(group1, "enrolled")
+ self._enroll_in_program(group2, "enrolled")
+
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ self.dedup_default.deduplicate_beneficiaries(states)
+
+ dup_record = self.env["spp.program.membership.duplicate"].search(
+ [("deduplication_manager_id", "=", self.dedup_default.id)], limit=1
+ )
+ self.assertIn("John Shared", dup_record.reason)
+ self.assertIn("Shared member", dup_record.reason)
+
+ def test_keep_one_enrolled(self):
+ """When one enrolled beneficiary is duplicated, it stays enrolled."""
+ shared = self._create_individual("Shared")
+ group1 = self._create_group("Group Enrolled", [shared])
+ group2 = self._create_group("Group Draft", [shared])
+ m1 = self._enroll_in_program(group1, "enrolled")
+ m2 = self._enroll_in_program(group2, "draft")
+
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ self.dedup_default.deduplicate_beneficiaries(states)
+
+ m1.invalidate_recordset()
+ m2.invalidate_recordset()
+ self.assertEqual(m1.state, "enrolled")
+ self.assertEqual(m2.state, "duplicated")
+
+ def test_exited_not_overwritten(self):
+ """Exited state is preserved during deduplication."""
+ shared = self._create_individual("Shared")
+ group1 = self._create_group("Group A", [shared])
+ group2 = self._create_group("Group B", [shared])
+ self._enroll_in_program(group1, "enrolled")
+ m2 = self._enroll_in_program(group2, "exited")
+
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ self.dedup_default.deduplicate_beneficiaries(states)
+
+ m2.invalidate_recordset()
+ self.assertEqual(m2.state, "exited")
+
+ def test_individual_target_type_skips_dedup(self):
+ """Default dedup only works for group target type."""
+ self.program.target_type = "individual"
+ ind = self._create_individual("Ind")
+ self._enroll_in_program(ind, "enrolled")
+
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ count = self.dedup_default.deduplicate_beneficiaries(states)
+ self.assertEqual(count, 0)
+
+
+class TestIDDocumentDeduplication(TestDeduplicationCommon):
+ """Tests for ID document deduplication."""
+
+ def test_group_duplicate_id_docs(self):
+ """Detect duplicate ID documents across groups."""
+ ind1 = self._create_individual("Person A", reg_ids=[(self.id_type_national, "12345")])
+ ind2 = self._create_individual("Person B", reg_ids=[(self.id_type_national, "12345")])
+ group1 = self._create_group("Group A", [ind1])
+ group2 = self._create_group("Group B", [ind2])
+ self._enroll_in_program(group1, "enrolled")
+ self._enroll_in_program(group2, "enrolled")
+
+ self.program.target_type = "group"
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ count = self.id_dedup.deduplicate_beneficiaries(states)
+ self.assertGreater(count, 0)
+
+ def test_group_duplicate_reason_includes_doc_info(self):
+ """Duplicate reason includes the document type and value."""
+ ind1 = self._create_individual("Alice", reg_ids=[(self.id_type_national, "NID-999")])
+ ind2 = self._create_individual("Bob", reg_ids=[(self.id_type_national, "NID-999")])
+ group1 = self._create_group("Group A", [ind1])
+ group2 = self._create_group("Group B", [ind2])
+ self._enroll_in_program(group1, "enrolled")
+ self._enroll_in_program(group2, "enrolled")
+
+ self.program.target_type = "group"
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ self.id_dedup.deduplicate_beneficiaries(states)
+
+ dup_record = self.env["spp.program.membership.duplicate"].search(
+ [("deduplication_manager_id", "=", self.id_dedup.id)], limit=1
+ )
+ self.assertIn("National ID", dup_record.reason)
+
+ def test_individual_duplicate_id_docs(self):
+ """Detect duplicate ID documents for individual target type."""
+ self.program.target_type = "individual"
+ ind1 = self._create_individual("Person A", reg_ids=[(self.id_type_national, "SAME-ID")])
+ ind2 = self._create_individual("Person B", reg_ids=[(self.id_type_national, "SAME-ID")])
+ self._enroll_in_program(ind1, "enrolled")
+ self._enroll_in_program(ind2, "enrolled")
+
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ result = self.id_dedup.deduplicate_beneficiaries(states)
+ self.assertGreater(result, 0)
+
+ def test_individual_duplicate_reason_includes_names(self):
+ """Individual duplicate reason shows shared names."""
+ self.program.target_type = "individual"
+ ind1 = self._create_individual("Alice Ind", reg_ids=[(self.id_type_passport, "PP-001")])
+ ind2 = self._create_individual("Bob Ind", reg_ids=[(self.id_type_passport, "PP-001")])
+ self._enroll_in_program(ind1, "enrolled")
+ self._enroll_in_program(ind2, "enrolled")
+
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ self.id_dedup.deduplicate_beneficiaries(states)
+
+ dup_record = self.env["spp.program.membership.duplicate"].search(
+ [("deduplication_manager_id", "=", self.id_dedup.id)], limit=1
+ )
+ self.assertIn("Passport", dup_record.reason)
+ self.assertIn("shared by", dup_record.reason)
+
+ def test_individual_keep_one_enrolled(self):
+ """Keep-one-enrolled logic for individual dedup."""
+ self.program.target_type = "individual"
+ ind1 = self._create_individual("Enrolled One", reg_ids=[(self.id_type_national, "DUP-100")])
+ ind2 = self._create_individual("Draft One", reg_ids=[(self.id_type_national, "DUP-100")])
+ m1 = self._enroll_in_program(ind1, "enrolled")
+ m2 = self._enroll_in_program(ind2, "draft")
+
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ self.id_dedup.deduplicate_beneficiaries(states)
+
+ m1.invalidate_recordset()
+ m2.invalidate_recordset()
+ # One should remain enrolled or both marked if both enrolled
+ self.assertIn(m2.state, ["duplicated", "draft"])
+
+ def test_no_duplicate_when_ids_differ(self):
+ """No duplicates when ID values are different."""
+ self.program.target_type = "individual"
+ ind1 = self._create_individual("Person A", reg_ids=[(self.id_type_national, "UNIQUE-1")])
+ ind2 = self._create_individual("Person B", reg_ids=[(self.id_type_national, "UNIQUE-2")])
+ self._enroll_in_program(ind1, "enrolled")
+ self._enroll_in_program(ind2, "enrolled")
+
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ result = self.id_dedup.deduplicate_beneficiaries(states)
+ self.assertEqual(result, 0)
+
+ def test_expired_ids_ignored(self):
+ """Expired ID documents are not considered for deduplication."""
+ self.program.target_type = "individual"
+ ind1 = self._create_individual("Person A")
+ ind2 = self._create_individual("Person B")
+ # Create expired IDs with same value
+ self.env["spp.registry.id"].create(
+ {
+ "partner_id": ind1.id,
+ "id_type_id": self.id_type_national.id,
+ "value": "EXPIRED-ID",
+ "expiry_date": "2020-01-01",
+ }
+ )
+ self.env["spp.registry.id"].create(
+ {
+ "partner_id": ind2.id,
+ "id_type_id": self.id_type_national.id,
+ "value": "EXPIRED-ID",
+ "expiry_date": "2020-01-01",
+ }
+ )
+ self._enroll_in_program(ind1, "enrolled")
+ self._enroll_in_program(ind2, "enrolled")
+
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ result = self.id_dedup.deduplicate_beneficiaries(states)
+ self.assertEqual(result, 0)
+
+ def test_unsupported_id_type_ignored(self):
+ """IDs of unsupported types are ignored."""
+ other_vocab = self.env["spp.vocabulary"].create({"name": "Other", "namespace_uri": "urn:test:other"})
+ other_type = self.env["spp.vocabulary.code"].create(
+ {"vocabulary_id": other_vocab.id, "code": "OTH", "display": "Other ID"}
+ )
+ self.program.target_type = "individual"
+ ind1 = self._create_individual("Person A", reg_ids=[(other_type, "SAME")])
+ ind2 = self._create_individual("Person B", reg_ids=[(other_type, "SAME")])
+ self._enroll_in_program(ind1, "enrolled")
+ self._enroll_in_program(ind2, "enrolled")
+
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ result = self.id_dedup.deduplicate_beneficiaries(states)
+ self.assertEqual(result, 0)
+
+ def test_supported_id_type_field_uses_vocabulary_code(self):
+ """Verify that supported_id_document_type_ids points to spp.vocabulary.code."""
+ field = self.id_dedup._fields["supported_id_document_type_ids"]
+ self.assertEqual(field.comodel_name, "spp.vocabulary.code")
+
+
+class TestPhoneNumberDeduplication(TestDeduplicationCommon):
+ """Tests for phone number deduplication."""
+
+ def test_group_duplicate_phone_numbers(self):
+ """Detect duplicate phone numbers across groups."""
+ ind1 = self._create_individual("Person A", phone_numbers=["+1234567890"])
+ ind2 = self._create_individual("Person B", phone_numbers=["+1234567890"])
+ group1 = self._create_group("Group A", [ind1])
+ group2 = self._create_group("Group B", [ind2])
+ self._enroll_in_program(group1, "enrolled")
+ self._enroll_in_program(group2, "enrolled")
+
+ self.program.target_type = "group"
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ count = self.phone_dedup.deduplicate_beneficiaries(states)
+ self.assertGreater(count, 0)
+
+ def test_group_phone_reason_includes_phone(self):
+ """Phone duplicate reason includes the phone number."""
+ ind1 = self._create_individual("Alice", phone_numbers=["+9876543210"])
+ ind2 = self._create_individual("Bob", phone_numbers=["+9876543210"])
+ group1 = self._create_group("Group A", [ind1])
+ group2 = self._create_group("Group B", [ind2])
+ self._enroll_in_program(group1, "enrolled")
+ self._enroll_in_program(group2, "enrolled")
+
+ self.program.target_type = "group"
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ self.phone_dedup.deduplicate_beneficiaries(states)
+
+ dup_record = self.env["spp.program.membership.duplicate"].search(
+ [("deduplication_manager_id", "=", self.phone_dedup.id)], limit=1
+ )
+ self.assertTrue(dup_record)
+ self.assertIn("Duplicate phone number", dup_record.reason)
+
+ def test_individual_duplicate_phone_numbers(self):
+ """Detect duplicate phone numbers for individual target type."""
+ self.program.target_type = "individual"
+ ind1 = self._create_individual("Person A", phone_numbers=["+1111111111"])
+ ind2 = self._create_individual("Person B", phone_numbers=["+1111111111"])
+ self._enroll_in_program(ind1, "enrolled")
+ self._enroll_in_program(ind2, "enrolled")
+
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ result = self.phone_dedup.deduplicate_beneficiaries(states)
+ self.assertGreater(result, 0)
+
+ def test_no_duplicate_different_phones(self):
+ """No duplicates when phone numbers differ."""
+ self.program.target_type = "individual"
+ ind1 = self._create_individual("Person A", phone_numbers=["+1111111111"])
+ ind2 = self._create_individual("Person B", phone_numbers=["+2222222222"])
+ self._enroll_in_program(ind1, "enrolled")
+ self._enroll_in_program(ind2, "enrolled")
+
+ states = ["draft", "enrolled", "eligible", "paused", "duplicated"]
+ result = self.phone_dedup.deduplicate_beneficiaries(states)
+ self.assertEqual(result, 0)
+
+
+class TestRecordDuplicate(TestDeduplicationCommon):
+ """Tests for the _record_duplicate method."""
+
+ def test_record_creates_duplicate_entry(self):
+ """_record_duplicate creates a duplicate record."""
+ group = self._create_group("Test Group")
+ membership = self._enroll_in_program(group)
+
+ self.dedup_default._record_duplicate(self.dedup_default, [membership.id], "Test reason")
+
+ dup = self.env["spp.program.membership.duplicate"].search(
+ [
+ ("beneficiary_ids", "in", membership.id),
+ ("reason", "=", "Test reason"),
+ ]
+ )
+ self.assertTrue(dup)
+ self.assertEqual(dup.state, "duplicate")
+
+ def test_no_duplicate_entry_for_same_reason(self):
+ """No duplicate entry created if one already exists for same reason."""
+ group = self._create_group("Test Group")
+ membership = self._enroll_in_program(group)
+
+ self.dedup_default._record_duplicate(self.dedup_default, [membership.id], "Same reason")
+ self.dedup_default._record_duplicate(self.dedup_default, [membership.id], "Same reason")
+
+ dups = self.env["spp.program.membership.duplicate"].search(
+ [
+ ("beneficiary_ids", "in", membership.id),
+ ("reason", "=", "Same reason"),
+ ]
+ )
+ self.assertEqual(len(dups), 1)
+
+
+class TestDeduplicationManagerSelection(TestDeduplicationCommon):
+ """Tests for deduplication manager selection/registration."""
+
+ def test_selection_includes_default(self):
+ selection = self.env["spp.deduplication.manager"]._selection_manager_ref_id()
+ names = [s[0] for s in selection]
+ self.assertIn("spp.deduplication.manager.default", names)
+
+ def test_selection_includes_phone(self):
+ selection = self.env["spp.deduplication.manager"]._selection_manager_ref_id()
+ names = [s[0] for s in selection]
+ self.assertIn("spp.deduplication.manager.phone_number", names)
+
+ def test_selection_includes_id_dedup(self):
+ selection = self.env["spp.deduplication.manager"]._selection_manager_ref_id()
+ names = [s[0] for s in selection]
+ self.assertIn("spp.deduplication.manager.id_dedup", names)
+
+ def test_eligibility_selection_includes_id_and_phone(self):
+ selection = self.env["spp.eligibility.manager"]._selection_manager_ref_id()
+ names = [s[0] for s in selection]
+ self.assertIn("spp.program.membership.manager.id_dedup", names)
+ self.assertIn("spp.program.membership.manager.phone_number", names)
diff --git a/spp_programs/tests/test_program_enrollment.py b/spp_programs/tests/test_program_enrollment.py
new file mode 100644
index 00000000..b45a4fc5
--- /dev/null
+++ b/spp_programs/tests/test_program_enrollment.py
@@ -0,0 +1,366 @@
+# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
+import uuid
+
+from odoo.exceptions import UserError
+from odoo.tests import TransactionCase
+
+
+class TestProgramEnrollment(TransactionCase):
+ """Tests for program-level enrollment and deduplication workflows."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.program = cls.env["spp.program"].create(
+ {"name": f"Enrollment Test {uuid.uuid4().hex[:8]}", "target_type": "group"}
+ )
+
+ # Create program manager
+ cls.pm_default = cls.env["spp.program.manager.default"].create(
+ {"name": "Default PM", "program_id": cls.program.id}
+ )
+ pm = cls.env["spp.program.manager"].create(
+ {
+ "program_id": cls.program.id,
+ "manager_ref_id": f"{cls.pm_default._name},{cls.pm_default.id}",
+ }
+ )
+ cls.program.write({"program_manager_ids": [(4, pm.id)]})
+
+ # Create eligibility manager
+ cls.em_default = cls.env["spp.program.membership.manager.default"].create(
+ {"name": "Default EM", "program_id": cls.program.id}
+ )
+ em = cls.env["spp.eligibility.manager"].create(
+ {
+ "program_id": cls.program.id,
+ "manager_ref_id": f"{cls.em_default._name},{cls.em_default.id}",
+ }
+ )
+ cls.program.write({"eligibility_manager_ids": [(4, em.id)]})
+
+ # Create deduplication manager
+ cls.dedup_default = cls.env["spp.deduplication.manager.default"].create(
+ {"name": "Default Dedup", "program_id": cls.program.id}
+ )
+ dm = cls.env["spp.deduplication.manager"].create(
+ {
+ "program_id": cls.program.id,
+ "manager_ref_id": f"{cls.dedup_default._name},{cls.dedup_default.id}",
+ }
+ )
+ cls.program.write({"deduplication_manager_ids": [(4, dm.id)]})
+
+ def _create_group(self, name):
+ return self.env["res.partner"].create({"name": name, "is_registrant": True, "is_group": True})
+
+ def _enroll(self, partner, state="draft"):
+ return self.env["spp.program.membership"].create(
+ {"partner_id": partner.id, "program_id": self.program.id, "state": state}
+ )
+
+ def test_enroll_eligible_registrants_no_beneficiaries_raises(self):
+ """Enrollment raises error when no beneficiaries exist."""
+ program = self.env["spp.program"].create({"name": f"Empty {uuid.uuid4().hex[:8]}"})
+ with self.assertRaises(UserError):
+ program.enroll_eligible_registrants()
+
+ def test_enroll_eligible_registrants_no_manager_raises(self):
+ """Enrollment raises error when no program manager exists."""
+ program = self.env["spp.program"].create({"name": f"No PM {uuid.uuid4().hex[:8]}"})
+ group = self._create_group("Group")
+ self.env["spp.program.membership"].create({"partner_id": group.id, "program_id": program.id})
+ with self.assertRaises(UserError):
+ program.enroll_eligible_registrants()
+
+ def test_enrollment_skips_duplicated(self):
+ """Enrollment does not change duplicated state to enrolled."""
+ group = self._create_group("Duplicated Group")
+ membership = self._enroll(group, "duplicated")
+
+ # Run enrollment
+ self.pm_default._enroll_eligible_registrants(["duplicated"])
+
+ membership.invalidate_recordset()
+ self.assertEqual(membership.state, "duplicated")
+
+ def test_enrollment_skips_exited(self):
+ """Enrollment does not change exited state to enrolled."""
+ group = self._create_group("Exited Group")
+ membership = self._enroll(group, "exited")
+
+ self.pm_default._enroll_eligible_registrants(["exited"])
+
+ membership.invalidate_recordset()
+ self.assertEqual(membership.state, "exited")
+
+ def test_enrollment_enrolls_draft(self):
+ """Enrollment changes draft state to enrolled."""
+ group = self._create_group("Draft Group")
+ membership = self._enroll(group, "draft")
+
+ self.pm_default._enroll_eligible_registrants(["draft"], do_count=True)
+
+ membership.invalidate_recordset()
+ self.assertEqual(membership.state, "enrolled")
+
+ def test_disenrollment_skips_duplicated(self):
+ """Disenrollment does not change duplicated to not_eligible."""
+ group = self._create_group("Dup Group")
+ membership = self._enroll(group, "duplicated")
+
+ self.pm_default._enroll_eligible_registrants(["duplicated"])
+
+ membership.invalidate_recordset()
+ # Should NOT be changed to not_eligible
+ self.assertNotEqual(membership.state, "not_eligible")
+
+ def test_program_deduplicate_no_duplicates(self):
+ """Program-level deduplication shows no duplicates message."""
+ group = self._create_group("Solo Group")
+ self._enroll(group, "enrolled")
+
+ result = self.program.deduplicate_beneficiaries()
+ self.assertEqual(result["params"]["type"], "success")
+ self.assertIn("No duplicates", result["params"]["message"])
+
+ def test_program_deduplicate_with_duplicates(self):
+ """Program-level deduplication shows warning with counts."""
+ ind = self.env["res.partner"].create({"name": "Shared", "is_registrant": True, "is_group": False})
+ group1 = self._create_group("Group 1")
+ group2 = self._create_group("Group 2")
+ self.env["spp.group.membership"].create({"group": group1.id, "individual": ind.id})
+ self.env["spp.group.membership"].create({"group": group2.id, "individual": ind.id})
+ self._enroll(group1, "enrolled")
+ self._enroll(group2, "enrolled")
+
+ result = self.program.deduplicate_beneficiaries()
+ self.assertEqual(result["params"]["type"], "warning")
+
+ def test_program_deduplicate_shows_new_vs_existing(self):
+ """Deduplication message distinguishes new from existing duplicates."""
+ ind = self.env["res.partner"].create({"name": "Shared", "is_registrant": True, "is_group": False})
+ group1 = self._create_group("Group A")
+ group2 = self._create_group("Group B")
+ self.env["spp.group.membership"].create({"group": group1.id, "individual": ind.id})
+ self.env["spp.group.membership"].create({"group": group2.id, "individual": ind.id})
+ m1 = self._enroll(group1, "enrolled")
+ m2 = self._enroll(group2, "enrolled")
+
+ # First run
+ self.program.deduplicate_beneficiaries()
+
+ # Reset states to run again
+ m1.write({"state": "enrolled"})
+ m2.write({"state": "enrolled"})
+
+ # Second run - should show already flagged
+ result = self.program.deduplicate_beneficiaries()
+ self.assertEqual(result["params"]["type"], "warning")
+
+ def test_program_deduplicate_no_manager_raises(self):
+ """Deduplication raises error when no manager defined."""
+ program = self.env["spp.program"].create({"name": f"No Dedup {uuid.uuid4().hex[:8]}"})
+ with self.assertRaises(UserError):
+ program.deduplicate_beneficiaries()
+
+ def test_verify_eligibility_no_manager_raises(self):
+ """verify_eligibility raises error when no program manager."""
+ program = self.env["spp.program"].create({"name": f"No PM V {uuid.uuid4().hex[:8]}"})
+ with self.assertRaises(UserError):
+ program.verify_eligibility()
+
+
+class TestProgramManagerDefault(TransactionCase):
+ """Tests for spp.program.manager.default."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.program = cls.env["spp.program"].create({"name": f"PM Test {uuid.uuid4().hex[:8]}", "target_type": "group"})
+ cls.pm = cls.env["spp.program.manager.default"].create({"name": "Default PM", "program_id": cls.program.id})
+ pm_manager = cls.env["spp.program.manager"].create(
+ {
+ "program_id": cls.program.id,
+ "manager_ref_id": f"{cls.pm._name},{cls.pm.id}",
+ }
+ )
+ cls.program.write({"program_manager_ids": [(4, pm_manager.id)]})
+
+ # Eligibility manager
+ em_default = cls.env["spp.program.membership.manager.default"].create(
+ {"name": "Default EM", "program_id": cls.program.id}
+ )
+ em = cls.env["spp.eligibility.manager"].create(
+ {
+ "program_id": cls.program.id,
+ "manager_ref_id": f"{em_default._name},{em_default.id}",
+ }
+ )
+ cls.program.write({"eligibility_manager_ids": [(4, em.id)]})
+
+ def test_enroll_no_eligibility_manager_raises(self):
+ """Enrollment raises error when no eligibility manager."""
+ program = self.env["spp.program"].create({"name": f"No EM {uuid.uuid4().hex[:8]}"})
+ pm = self.env["spp.program.manager.default"].create({"name": "PM", "program_id": program.id})
+ with self.assertRaises(UserError):
+ pm.enroll_eligible_registrants()
+
+ def test_selection_includes_default(self):
+ """Program manager selection includes default."""
+ selection = self.env["spp.program.manager"]._selection_manager_ref_id()
+ names = [s[0] for s in selection]
+ self.assertIn("spp.program.manager.default", names)
+
+
+class TestProgramModel(TransactionCase):
+ """Tests for spp.program model methods."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.program = cls.env["spp.program"].create(
+ {"name": f"Program Model {uuid.uuid4().hex[:8]}", "target_type": "group"}
+ )
+
+ def test_unique_program_name(self):
+ """Program names must be unique."""
+ with self.assertRaises(UserError):
+ self.env["spp.program"].create({"name": self.program.name})
+
+ def test_compute_has_members(self):
+ """has_members is True when memberships exist."""
+ self.assertFalse(self.program.has_members)
+ group = self.env["res.partner"].create({"name": "Group", "is_registrant": True, "is_group": True})
+ self.env["spp.program.membership"].create({"partner_id": group.id, "program_id": self.program.id})
+ self.program.invalidate_recordset()
+ self.assertTrue(self.program.has_members)
+
+ def test_count_beneficiaries(self):
+ """count_beneficiaries returns correct count."""
+ result = self.program.count_beneficiaries()
+ self.assertEqual(result["value"], 0)
+
+ def test_count_beneficiaries_by_state(self):
+ """count_beneficiaries filters by state."""
+ group = self.env["res.partner"].create({"name": "G", "is_registrant": True, "is_group": True})
+ self.env["spp.program.membership"].create(
+ {
+ "partner_id": group.id,
+ "program_id": self.program.id,
+ "state": "enrolled",
+ }
+ )
+ result = self.program.count_beneficiaries(["enrolled"])
+ self.assertEqual(result["value"], 1)
+ result_draft = self.program.count_beneficiaries(["draft"])
+ self.assertEqual(result_draft["value"], 0)
+
+ def test_get_beneficiaries(self):
+ """get_beneficiaries returns correct recordset."""
+ group = self.env["res.partner"].create({"name": "G", "is_registrant": True, "is_group": True})
+ self.env["spp.program.membership"].create(
+ {
+ "partner_id": group.id,
+ "program_id": self.program.id,
+ "state": "enrolled",
+ }
+ )
+ result = self.program.get_beneficiaries("enrolled")
+ self.assertEqual(len(result), 1)
+
+ def test_get_beneficiaries_count(self):
+ """get_beneficiaries with count=True returns integer."""
+ result = self.program.get_beneficiaries(count=True)
+ self.assertIsInstance(result, int)
+
+ def test_get_beneficiaries_cursor_pagination(self):
+ """get_beneficiaries supports cursor-based pagination."""
+ result = self.program.get_beneficiaries(last_id=0, limit=10)
+ self.assertEqual(len(result), 0)
+
+ def test_open_eligible_beneficiaries_form(self):
+ """open_eligible_beneficiaries_form returns action."""
+ result = self.program.open_eligible_beneficiaries_form()
+ self.assertEqual(result["type"], "ir.actions.act_window")
+
+ def test_open_duplicate_membership_form(self):
+ """open_duplicate_membership_form returns action."""
+ result = self.program.open_duplicate_membership_form()
+ self.assertIn("duplicated", str(result["domain"]))
+
+ def test_open_cycles_form(self):
+ """open_cycles_form returns action."""
+ result = self.program.open_cycles_form()
+ self.assertEqual(result["type"], "ir.actions.act_window")
+
+ def test_open_program_form(self):
+ """open_program_form returns action."""
+ result = self.program.open_program_form()
+ self.assertEqual(result["res_id"], self.program.id)
+
+ def test_refresh_page(self):
+ """refresh_page returns reload action."""
+ result = self.program.refresh_page()
+ self.assertEqual(result["tag"], "reload")
+
+ def test_end_program(self):
+ """end_program changes state to ended."""
+ self.program.with_context(active_ids=[self.program.id]).end_program()
+ self.assertEqual(self.program.state, "ended")
+
+ def test_end_program_already_ended(self):
+ """end_program shows error for already ended program."""
+ self.program.write({"state": "ended"})
+ result = self.program.with_context(active_ids=[self.program.id]).end_program()
+ self.assertEqual(result["params"]["type"], "danger")
+
+ def test_reactivate_program(self):
+ """reactivate_program changes state back to active."""
+ self.program.write({"state": "ended"})
+ self.program.with_context(active_ids=[self.program.id]).reactivate_program()
+ self.assertEqual(self.program.state, "active")
+
+ def test_reactivate_active_program(self):
+ """reactivate_program shows error for active program."""
+ result = self.program.with_context(active_ids=[self.program.id]).reactivate_program()
+ self.assertEqual(result["params"]["type"], "danger")
+
+ def test_check_managers_limit(self):
+ """Cannot have more than one program manager."""
+ pm1 = self.env["spp.program.manager.default"].create({"name": "PM1", "program_id": self.program.id})
+ pm2 = self.env["spp.program.manager.default"].create({"name": "PM2", "program_id": self.program.id})
+ mgr1 = self.env["spp.program.manager"].create(
+ {
+ "program_id": self.program.id,
+ "manager_ref_id": f"{pm1._name},{pm1.id}",
+ }
+ )
+ mgr2 = self.env["spp.program.manager"].create(
+ {
+ "program_id": self.program.id,
+ "manager_ref_id": f"{pm2._name},{pm2.id}",
+ }
+ )
+ with self.assertRaises(UserError):
+ self.program.write({"program_manager_ids": [(6, 0, [mgr1.id, mgr2.id])]})
+
+ def test_unlink_program(self):
+ """Program can be deleted with its managers."""
+ program = self.env["spp.program"].create({"name": f"Delete Me {uuid.uuid4().hex[:8]}"})
+ program.unlink()
+ self.assertFalse(program.exists())
+
+ def test_compute_has_compliance_criteria(self):
+ """has_compliance_criteria reflects manager presence."""
+ self.assertFalse(self.program.has_compliance_criteria)
+
+ def test_get_manager_unsupported_raises(self):
+ """get_manager raises for unsupported manager type."""
+ with self.assertRaises(NotImplementedError):
+ self.program.get_manager("unsupported_type")
+
+ def test_get_managers_unsupported_raises(self):
+ """get_managers raises for unsupported manager type."""
+ with self.assertRaises(NotImplementedError):
+ self.program.get_managers("unsupported_type")
diff --git a/spp_programs/tests/test_program_membership.py b/spp_programs/tests/test_program_membership.py
new file mode 100644
index 00000000..5415cae4
--- /dev/null
+++ b/spp_programs/tests/test_program_membership.py
@@ -0,0 +1,231 @@
+# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
+import uuid
+
+from odoo import fields
+from odoo.exceptions import UserError, ValidationError
+from odoo.tests import TransactionCase
+
+
+class TestProgramMembership(TransactionCase):
+ """Tests for spp.program.membership model."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.program = cls.env["spp.program"].create(
+ {"name": f"Membership Test {uuid.uuid4().hex[:8]}", "target_type": "group"}
+ )
+
+ # Create eligibility manager so enrollment works
+ manager_default = cls.env["spp.program.membership.manager.default"].create(
+ {"name": "Default", "program_id": cls.program.id}
+ )
+ eligibility_manager = cls.env["spp.eligibility.manager"].create(
+ {
+ "program_id": cls.program.id,
+ "manager_ref_id": f"{manager_default._name},{manager_default.id}",
+ }
+ )
+ cls.program.write({"eligibility_manager_ids": [(4, eligibility_manager.id)]})
+
+ cls.group = cls.env["res.partner"].create({"name": "Test Group", "is_registrant": True, "is_group": True})
+
+ def _create_membership(self, partner=None, state="draft"):
+ partner = partner or self.group
+ return self.env["spp.program.membership"].create(
+ {
+ "partner_id": partner.id,
+ "program_id": self.program.id,
+ "state": state,
+ }
+ )
+
+ def test_unique_partner_per_program(self):
+ """Cannot have duplicate memberships for same partner+program."""
+ self._create_membership()
+ with self.assertRaises(ValidationError):
+ self._create_membership()
+
+ def test_enrollment_date_set_on_enrolled(self):
+ """Enrollment date is set when state becomes enrolled."""
+ membership = self._create_membership(state="enrolled")
+ self.assertTrue(membership.enrollment_date)
+
+ def test_duplicate_reason_computed(self):
+ """Duplicate reason shows the reason from the duplicate record."""
+ membership = self._create_membership(state="duplicated")
+ self.env["spp.program.membership.duplicate"].create(
+ {
+ "beneficiary_ids": [(6, 0, [membership.id])],
+ "state": "duplicate",
+ "reason": "Test duplicate reason",
+ "deduplication_manager_id": 1,
+ }
+ )
+ membership.invalidate_recordset()
+ self.assertEqual(membership.duplicate_reason, "Test duplicate reason")
+
+ def test_duplicate_reason_empty_when_not_duplicated(self):
+ """Duplicate reason is False when state is not duplicated."""
+ membership = self._create_membership(state="enrolled")
+ self.assertFalse(membership.duplicate_reason)
+
+ def test_enroll_prevents_duplicated_state(self):
+ """Cannot enroll a beneficiary that is in duplicated state."""
+ membership = self._create_membership(state="duplicated")
+ result = membership.enroll_eligible_registrants()
+ self.assertEqual(result["params"]["type"], "warning")
+ self.assertIn("duplicated", result["params"]["message"].lower())
+ membership.invalidate_recordset()
+ self.assertEqual(membership.state, "duplicated")
+
+ def test_enroll_prevents_exited_state(self):
+ """Cannot enroll a beneficiary that is in exited state."""
+ membership = self._create_membership(state="exited")
+ result = membership.enroll_eligible_registrants()
+ self.assertEqual(result["params"]["type"], "warning")
+ membership.invalidate_recordset()
+ self.assertEqual(membership.state, "exited")
+
+ def test_enroll_already_enrolled_shows_info(self):
+ """Enrolling an already enrolled beneficiary shows info message."""
+ membership = self._create_membership(state="enrolled")
+ result = membership.enroll_eligible_registrants()
+ self.assertEqual(result["params"]["type"], "info")
+
+ def test_verify_eligibility_marks_not_eligible(self):
+ """verify_eligibility marks as not_eligible when no match."""
+ # Create individual (won't match group target_type eligibility)
+ individual = self.env["res.partner"].create(
+ {
+ "name": "Individual Wrong Type",
+ "is_registrant": True,
+ "is_group": False,
+ }
+ )
+ ind_membership = self.env["spp.program.membership"].create(
+ {
+ "partner_id": individual.id,
+ "program_id": self.program.id,
+ }
+ )
+ ind_membership.verify_eligibility()
+ self.assertEqual(ind_membership.state, "not_eligible")
+
+ def test_back_to_draft(self):
+ """Back to draft resets state."""
+ membership = self._create_membership(state="not_eligible")
+ membership.back_to_draft()
+ self.assertEqual(membership.state, "draft")
+
+ def test_action_pause(self):
+ """Pausing an enrolled membership."""
+ membership = self._create_membership(state="enrolled")
+ membership.action_pause()
+ self.assertEqual(membership.state, "paused")
+
+ def test_action_pause_non_enrolled_raises(self):
+ """Cannot pause a non-enrolled membership."""
+ membership = self._create_membership(state="draft")
+ with self.assertRaises(UserError):
+ membership.action_pause()
+
+ def test_action_resume(self):
+ """Resuming a paused membership."""
+ membership = self._create_membership(state="paused")
+ membership.action_resume()
+ self.assertEqual(membership.state, "enrolled")
+
+ def test_action_resume_non_paused_raises(self):
+ """Cannot resume a non-paused membership."""
+ membership = self._create_membership(state="enrolled")
+ with self.assertRaises(UserError):
+ membership.action_resume()
+
+ def test_action_exit(self):
+ """Exiting an enrolled membership."""
+ membership = self._create_membership(state="enrolled")
+ membership.action_exit()
+ self.assertEqual(membership.state, "exited")
+ self.assertEqual(membership.exit_date, fields.Date.today())
+
+ def test_action_exit_non_enrolled_raises(self):
+ """Cannot exit a draft membership."""
+ membership = self._create_membership(state="draft")
+ with self.assertRaises(UserError):
+ membership.action_exit()
+
+ def test_open_beneficiaries_form(self):
+ """open_beneficiaries_form returns an action."""
+ membership = self._create_membership()
+ result = membership.open_beneficiaries_form()
+ self.assertEqual(result["res_model"], "spp.program.membership")
+ self.assertEqual(result["res_id"], membership.id)
+
+ def test_open_registrant_form_group(self):
+ """open_registrant_form returns an action for groups."""
+ membership = self._create_membership()
+ result = membership.open_registrant_form()
+ self.assertEqual(result["res_model"], "res.partner")
+ self.assertEqual(result["res_id"], self.group.id)
+
+ def test_open_registrant_form_individual(self):
+ """open_registrant_form returns an action for individuals."""
+ individual = self.env["res.partner"].create({"name": "Individual", "is_registrant": True, "is_group": False})
+ ind_program = self.env["spp.program"].create(
+ {
+ "name": f"Ind Program {uuid.uuid4().hex[:8]}",
+ "target_type": "individual",
+ }
+ )
+ membership = self.env["spp.program.membership"].create(
+ {"partner_id": individual.id, "program_id": ind_program.id}
+ )
+ result = membership.open_registrant_form()
+ self.assertEqual(result["res_model"], "res.partner")
+ self.assertEqual(result["res_id"], individual.id)
+
+ def test_display_name(self):
+ """display_name is set on membership."""
+ membership = self._create_membership()
+ self.assertTrue(membership.display_name)
+
+ def test_deduplicate_from_membership(self):
+ """Deduplication can be triggered from membership."""
+ membership = self._create_membership()
+
+ # Set up deduplication manager
+ dedup_default = self.env["spp.deduplication.manager.default"].create(
+ {"name": "Default Dedup", "program_id": self.program.id}
+ )
+ dedup_manager = self.env["spp.deduplication.manager"].create(
+ {
+ "program_id": self.program.id,
+ "manager_ref_id": f"{dedup_default._name},{dedup_default.id}",
+ }
+ )
+ self.program.write({"deduplication_manager_ids": [(4, dedup_manager.id)]})
+
+ result = membership.deduplicate_beneficiaries()
+ self.assertEqual(result["tag"], "display_notification")
+
+ def test_deduplicate_no_manager_raises(self):
+ """Deduplication raises error when no manager is set."""
+ program_no_dedup = self.env["spp.program"].create({"name": f"No Dedup {uuid.uuid4().hex[:8]}"})
+ membership = self.env["spp.program.membership"].create(
+ {"partner_id": self.group.id, "program_id": program_no_dedup.id}
+ )
+ with self.assertRaises(UserError):
+ membership.deduplicate_beneficiaries()
+
+ def test_get_view_form_group_context(self):
+ """_get_view applies group domain in context."""
+ Membership = self.env["spp.program.membership"].with_context(target_type="group")
+ arch, view = Membership._get_view(view_type="form")
+ self.assertIn("is_group", arch)
+
+ def test_get_view_form_individual_context(self):
+ """_get_view applies individual domain in context."""
+ Membership = self.env["spp.program.membership"].with_context(target_type="individual")
+ arch, view = Membership._get_view(view_type="form")
+ self.assertIn("is_group", arch)
diff --git a/spp_programs/views/managers/deduplication_manager_view.xml b/spp_programs/views/managers/deduplication_manager_view.xml
index 0e92cf6b..d1220a77 100644
--- a/spp_programs/views/managers/deduplication_manager_view.xml
+++ b/spp_programs/views/managers/deduplication_manager_view.xml
@@ -172,7 +172,8 @@ Part of OpenSPP. See LICENSE file for full copyright and licensing details.