Skip to content

Async CRM

AsyncCRMNamespace for the crm.lead model (leads and opportunities), accessed as client.crm on AsyncOdooClient.

crm

Async CRM lead/opportunity operations for Vodoo.

AsyncCRMNamespace

AsyncCRMNamespace(client: AsyncOdooClient)

Bases: _CRMAttrs, AsyncDomainNamespace

Async CRM leads/opportunities namespace.

Source code in src/vodoo/aio/_domain.py
def __init__(self, client: AsyncOdooClient) -> None:
    self._client = client

create async

create(name: str, *, partner_id: int | None = None, expected_revenue: float | None = None, stage_id: int | None = None, user_id: int | None = None, team_id: int | None = None, tag_ids: list[int] | None = None, lead_type: str = 'opportunity', **extra_fields: Any) -> int

Create a CRM lead or opportunity.

Source code in src/vodoo/aio/crm.py
async def create(
    self,
    name: str,
    *,
    partner_id: int | None = None,
    expected_revenue: float | None = None,
    stage_id: int | None = None,
    user_id: int | None = None,
    team_id: int | None = None,
    tag_ids: list[int] | None = None,
    lead_type: str = "opportunity",
    **extra_fields: Any,
) -> int:
    """Create a CRM lead or opportunity."""
    values: dict[str, Any] = {"name": name, "type": lead_type, **extra_fields}
    if partner_id is not None:
        values["partner_id"] = partner_id
    if expected_revenue is not None:
        values["expected_revenue"] = expected_revenue
    if stage_id is not None:
        values["stage_id"] = stage_id
    if user_id is not None:
        values["user_id"] = user_id
    if team_id is not None:
        values["team_id"] = team_id
    if tag_ids is not None:
        values["tag_ids"] = [(6, 0, tag_ids)]
    return await self._client.create(self._model, values)

stages async

stages(*, team_id: int | None = None) -> list[dict[str, Any]]

List CRM pipeline stages.

Source code in src/vodoo/aio/crm.py
async def stages(self, *, team_id: int | None = None) -> list[dict[str, Any]]:
    """List CRM pipeline stages."""
    domain: list[Any] = []
    if team_id is not None:
        domain.append(("team_id", "=", team_id))
    return await self._client.search_read(
        "crm.stage",
        domain=domain,
        fields=STAGE_FIELDS,
        order="sequence",
    )

pipeline async

pipeline(*, team: str | None = None, user: str | None = None) -> dict[str, Any]

Fetch pipeline data and return aggregated summary.

Source code in src/vodoo/aio/crm.py
async def pipeline(
    self,
    *,
    team: str | None = None,
    user: str | None = None,
) -> dict[str, Any]:
    """Fetch pipeline data and return aggregated summary."""
    domain: list[Any] = [("type", "=", "opportunity")]
    if team:
        domain.append(("team_id.name", "ilike", team))
    if user:
        domain.append(("user_id.name", "ilike", user))

    deals = await self._client.search_read(
        self._model,
        domain=domain,
        fields=_PIPELINE_FIELDS,
        limit=0,
    )

    stages = await self._client.search_read(
        "crm.stage",
        domain=[],
        fields=STAGE_FIELDS,
        order="sequence",
    )

    return build_pipeline_summary(deals, stages, team=team)