Skip to content

Async Base Operations

Async shared CRUD, messaging, and attachment helpers used by async domain namespaces.

base

Async base operations for Odoo models.

Provides async versions of I/O functions from :mod:vodoo.base. Pure display/formatting functions are re-exported unchanged.

configure_output

configure_output(*, console: Console | None = None, simple: bool = False, json_mode: bool = False, toon_mode: bool = False) -> None

Configure the output console and mode.

Called by the CLI layer. Library users may call this to customise display behaviour, or simply ignore it (sensible defaults apply).

Requires the cli extra (rich) when console is provided or simple is False and display functions are subsequently called.

PARAMETER DESCRIPTION
console

Rich Console instance to use for output.

TYPE: Console | None DEFAULT: None

simple

If True, display functions emit plain TSV instead of rich tables.

TYPE: bool DEFAULT: False

json_mode

If True, display functions emit JSON output.

TYPE: bool DEFAULT: False

toon_mode

If True, display functions emit TOON output.

TYPE: bool DEFAULT: False

Source code in src/vodoo/base.py
def configure_output(
    *,
    console: Console | None = None,
    simple: bool = False,
    json_mode: bool = False,
    toon_mode: bool = False,
) -> None:
    """Configure the output console and mode.

    Called by the CLI layer.  Library users may call this to customise
    display behaviour, or simply ignore it (sensible defaults apply).

    Requires the ``cli`` extra (``rich``) when *console* is provided or
    *simple* is ``False`` and display functions are subsequently called.

    Args:
        console: Rich Console instance to use for output.
        simple: If ``True``, display functions emit plain TSV instead of
            rich tables.
        json_mode: If ``True``, display functions emit JSON output.
        toon_mode: If ``True``, display functions emit TOON output.

    """
    global _output_console, _output_simple, _output_json, _output_toon  # noqa: PLW0603
    if console is not None:
        _output_console = console
    _output_simple = simple
    _output_json = json_mode
    _output_toon = toon_mode

display_attachments

display_attachments(attachments: list[dict[str, Any]]) -> None

Display attachments in a table, TSV, or JSON format.

PARAMETER DESCRIPTION
attachments

List of attachment dictionaries

TYPE: list[dict[str, Any]]

Source code in src/vodoo/base.py
def display_attachments(attachments: list[dict[str, Any]]) -> None:
    """Display attachments in a table, TSV, or JSON format.

    Args:
        attachments: List of attachment dictionaries

    """
    if is_structured_output():
        structured_print(attachments)
        return

    if _is_simple_output():
        print("id\tname\tsize_kb\tmimetype\tcreate_date")
        for att in attachments:
            size = att.get("file_size", 0)
            size_kb = f"{size / 1024:.1f}" if size else ""
            name = att.get("name", "")
            mime = att.get("mimetype", "")
            created = att.get("create_date", "")
            print(f"{att['id']}\t{name}\t{size_kb}\t{mime}\t{created}")
    else:
        from rich.table import Table

        console = _get_console()
        table = Table(title="Attachments")
        table.add_column("ID", style="cyan")
        table.add_column("Name", style="green")
        table.add_column("Size", style="yellow")
        table.add_column("Type", style="blue")
        table.add_column("Created", style="magenta")

        for att in attachments:
            size = att.get("file_size", 0)
            size_str = f"{size / 1024:.1f} KB" if size else "N/A"

            table.add_row(
                str(att["id"]),
                att.get("name", "N/A"),
                size_str,
                att.get("mimetype", "N/A"),
                str(att.get("create_date", "N/A")),
            )

        console.print(table)

display_messages

display_messages(messages: list[dict[str, Any]], show_html: bool = False) -> None

Display messages in a formatted list or simple format.

PARAMETER DESCRIPTION
messages

List of message dictionaries

TYPE: list[dict[str, Any]]

show_html

Whether to show raw HTML body

TYPE: bool DEFAULT: False

Source code in src/vodoo/base.py
def display_messages(messages: list[dict[str, Any]], show_html: bool = False) -> None:  # noqa: PLR0912
    """Display messages in a formatted list or simple format.

    Args:
        messages: List of message dictionaries
        show_html: Whether to show raw HTML body

    """
    from html import unescape
    from html.parser import HTMLParser

    class HTMLToText(HTMLParser):
        """Simple HTML to text converter."""

        def __init__(self) -> None:
            super().__init__()
            self.text: list[str] = []

        def handle_data(self, data: str) -> None:
            self.text.append(data)

        def get_text(self) -> str:
            return "".join(self.text).strip()

    def get_body_text(body: str) -> str:
        if show_html:
            return body
        parser = HTMLToText()
        parser.feed(unescape(body))
        return parser.get_text()

    if is_structured_output():
        structured_print(messages)
        return

    if not messages:
        print("No messages found") if _is_simple_output() else _get_console().print(
            "[yellow]No messages found[/yellow]"
        )
        return

    if _is_simple_output():
        # Simple format: date, author, type, body (one line per message)
        print("date\tauthor\ttype\tbody")
        for msg in messages:
            date = msg.get("date", "")
            author = msg.get("author_id")
            author_name = (
                author[1] if author and isinstance(author, list) else msg.get("email_from", "")
            )
            subtype = msg.get("subtype_id")
            if subtype and isinstance(subtype, list):
                subtype_name = subtype[1]
            else:
                subtype_name = msg.get("message_type", "")
            body = get_body_text(msg.get("body", "")).replace("\t", " ").replace("\n", " ")
            print(f"{date}\t{author_name}\t{subtype_name}\t{body}")
    else:
        console = _get_console()
        console.print(f"\n[bold cyan]Message History ({len(messages)} messages)[/bold cyan]\n")

        for i, msg in enumerate(messages, 1):
            date = msg.get("date", "N/A")
            author = msg.get("author_id")
            if author and isinstance(author, list):
                author_name = author[1]
            else:
                author_name = msg.get("email_from", "Unknown")

            message_type = msg.get("message_type", "comment")
            subtype = msg.get("subtype_id")
            subtype_name = subtype[1] if subtype and isinstance(subtype, list) else message_type

            console.print(f"[bold]Message #{i}[/bold] [dim]({date})[/dim]")
            console.print(f"[cyan]From:[/cyan] {author_name}")
            console.print(f"[cyan]Type:[/cyan] {subtype_name}")

            if msg.get("subject"):
                console.print(f"[cyan]Subject:[/cyan] {msg['subject']}")

            body = msg.get("body", "")
            if body:
                text = get_body_text(body)
                if text:
                    console.print(f"\n{text}\n")

            if i < len(messages):
                console.print("[dim]" + "─" * 80 + "[/dim]\n")

display_record_detail

display_record_detail(record: dict[str, Any], *, show_html: bool = False, record_type: str = 'Record') -> None

Display detailed record information.

PARAMETER DESCRIPTION
record

Record dictionary

TYPE: dict[str, Any]

show_html

If True, show raw HTML description, else convert to markdown

TYPE: bool DEFAULT: False

record_type

Human-readable record type (e.g., "Ticket", "Task")

TYPE: str DEFAULT: 'Record'

Source code in src/vodoo/base.py
def display_record_detail(  # noqa: PLR0912
    record: dict[str, Any],
    *,
    show_html: bool = False,
    record_type: str = "Record",
) -> None:
    """Display detailed record information.

    Args:
        record: Record dictionary
        show_html: If True, show raw HTML description, else convert to markdown
        record_type: Human-readable record type (e.g., "Ticket", "Task")

    """
    if is_structured_output():
        structured_print(record)
        return

    if _is_simple_output():
        # Simple key: value format
        print(f"id: {record['id']}")
        print(f"name: {record['name']}")
        if record.get("partner_id"):
            print(f"partner: {record['partner_id'][1]}")
        if record.get("stage_id"):
            print(f"stage: {record['stage_id'][1]}")
        if record.get("user_id"):
            print(f"assigned_to: {record['user_id'][1]}")
        if record.get("project_id"):
            print(f"project: {record['project_id'][1]}")
        if "priority" in record:
            print(f"priority: {record.get('priority', '0')}")
        if record.get("description"):
            desc = record["description"]
            if not show_html:
                desc = _html_to_markdown(desc)
            print(f"description: {desc}")
        if record.get("tag_ids"):
            print(f"tags: {','.join(map(str, record['tag_ids']))}")
    else:
        console = _get_console()
        console.print(f"\n[bold cyan]{record_type} #{record['id']}[/bold cyan]")
        console.print(f"[bold]Name:[/bold] {record['name']}")

        if record.get("partner_id"):
            console.print(f"[bold]Partner:[/bold] {record['partner_id'][1]}")

        if record.get("stage_id"):
            console.print(f"[bold]Stage:[/bold] {record['stage_id'][1]}")

        if record.get("user_id"):
            console.print(f"[bold]Assigned To:[/bold] {record['user_id'][1]}")

        if record.get("project_id"):
            console.print(f"[bold]Project:[/bold] {record['project_id'][1]}")

        if "priority" in record:
            console.print(f"[bold]Priority:[/bold] {record.get('priority', '0')}")

        if record.get("description"):
            description = record["description"]
            if show_html:
                console.print(f"\n[bold]Description:[/bold]\n{description}")
            else:
                markdown_text = _html_to_markdown(description)
                console.print(f"\n[bold]Description:[/bold]\n{markdown_text}")

        if record.get("tag_ids"):
            console.print(f"\n[bold]Tags:[/bold] {', '.join(map(str, record['tag_ids']))}")

display_records

display_records(records: list[dict[str, Any]], title: str = 'Records') -> None

Display records in a table, TSV, or JSON format.

PARAMETER DESCRIPTION
records

List of record dictionaries

TYPE: list[dict[str, Any]]

title

Table title

TYPE: str DEFAULT: 'Records'

Source code in src/vodoo/base.py
def display_records(records: list[dict[str, Any]], title: str = "Records") -> None:
    """Display records in a table, TSV, or JSON format.

    Args:
        records: List of record dictionaries
        title: Table title

    """
    if is_structured_output():
        structured_print(records)
        return

    if not records:
        if _is_simple_output():
            print("No records found")
        else:
            _get_console().print("[yellow]No records found[/yellow]")
        return

    field_names = list(records[0].keys())

    if _is_simple_output():
        # Simple TSV output for LLMs
        print("\t".join(field_names))
        for record in records:
            row = [_format_field_value(record.get(f)) for f in field_names]
            print("\t".join(row))
    else:
        # Rich table output
        from rich.table import Table

        console = _get_console()
        table = Table(title=title)

        field_styles = {
            "id": "cyan",
            "name": "green",
            "partner_id": "yellow",
            "stage_id": "blue",
            "user_id": "magenta",
            "priority": "red",
            "project_id": "blue",
        }

        for field_name in field_names:
            style = field_styles.get(field_name, "white")
            table.add_column(field_name, style=style)

        for record in records:
            row_values = [_format_field_value(record.get(f)) or "N/A" for f in field_names]
            table.add_row(*row_values)

        console.print(table)

display_tags

display_tags(tags: list[dict[str, Any]], title: str = 'Tags') -> None

Display tags in a table, TSV, or JSON format.

PARAMETER DESCRIPTION
tags

List of tag dictionaries

TYPE: list[dict[str, Any]]

title

Table title

TYPE: str DEFAULT: 'Tags'

Source code in src/vodoo/base.py
def display_tags(tags: list[dict[str, Any]], title: str = "Tags") -> None:
    """Display tags in a table, TSV, or JSON format.

    Args:
        tags: List of tag dictionaries
        title: Table title

    """
    if is_structured_output():
        structured_print(tags)
        return

    if _is_simple_output():
        print("id\tname\tcolor")
        for tag in tags:
            print(f"{tag['id']}\t{tag['name']}\t{tag.get('color', '')}")
    else:
        from rich.table import Table

        console = _get_console()
        table = Table(title=title)
        table.add_column("ID", style="cyan")
        table.add_column("Name", style="green")
        table.add_column("Color", style="yellow")

        for tag in tags:
            table.add_row(
                str(tag["id"]),
                tag["name"],
                str(tag.get("color", "N/A")),
            )

        console.print(table)

get_record_url

get_record_url(client: OdooClient | Any, model: str, record_id: int) -> str

Get the web URL for a record.

Works with both sync OdooClient and async AsyncOdooClient — only client.config.url is accessed.

PARAMETER DESCRIPTION
client

Odoo client (sync or async)

TYPE: OdooClient | Any

model

Model name

TYPE: str

record_id

Record ID

TYPE: int

RETURNS DESCRIPTION
str

URL to view the record in Odoo web interface

Examples:

>>> get_record_url(client, "helpdesk.ticket", 42)
'https://odoo.example.com/web#id=42&model=helpdesk.ticket&view_type=form'
Source code in src/vodoo/base.py
def get_record_url(client: OdooClient | Any, model: str, record_id: int) -> str:
    """Get the web URL for a record.

    Works with both sync ``OdooClient`` and async ``AsyncOdooClient`` —
    only ``client.config.url`` is accessed.

    Args:
        client: Odoo client (sync or async)
        model: Model name
        record_id: Record ID

    Returns:
        URL to view the record in Odoo web interface

    Examples:
        >>> get_record_url(client, "helpdesk.ticket", 42)
        'https://odoo.example.com/web#id=42&model=helpdesk.ticket&view_type=form'

    """
    base_url = client.config.url.rstrip("/")
    return f"{base_url}/web#id={record_id}&model={model}&view_type=form"

list_records async

list_records(client: AsyncOdooClient, model: str, domain: list[Any] | None = None, limit: int | None = 50, fields: list[str] | None = None, order: str = 'create_date desc') -> list[dict[str, Any]]

List records from a model.

PARAMETER DESCRIPTION
client

Async Odoo client

TYPE: AsyncOdooClient

model

Model name

TYPE: str

domain

Search domain filters

TYPE: list[Any] | None DEFAULT: None

limit

Maximum number of records

TYPE: int | None DEFAULT: 50

fields

List of fields to fetch

TYPE: list[str] | None DEFAULT: None

order

Sort order

TYPE: str DEFAULT: 'create_date desc'

RETURNS DESCRIPTION
list[dict[str, Any]]

List of record dictionaries

Source code in src/vodoo/aio/base.py
async def list_records(
    client: AsyncOdooClient,
    model: str,
    domain: list[Any] | None = None,
    limit: int | None = 50,
    fields: list[str] | None = None,
    order: str = "create_date desc",
) -> list[dict[str, Any]]:
    """List records from a model.

    Args:
        client: Async Odoo client
        model: Model name
        domain: Search domain filters
        limit: Maximum number of records
        fields: List of fields to fetch
        order: Sort order

    Returns:
        List of record dictionaries
    """
    return await client.search_read(
        model,
        domain=domain,
        fields=fields,
        limit=limit,
        order=order,
    )

get_record async

get_record(client: AsyncOdooClient, model: str, record_id: int, fields: list[str] | None = None) -> dict[str, Any]

Get detailed record information.

PARAMETER DESCRIPTION
client

Async Odoo client

TYPE: AsyncOdooClient

model

Model name

TYPE: str

record_id

Record ID

TYPE: int

fields

List of field names to read

TYPE: list[str] | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

Record dictionary

RAISES DESCRIPTION
RecordNotFoundError

If record not found

Source code in src/vodoo/aio/base.py
async def get_record(
    client: AsyncOdooClient,
    model: str,
    record_id: int,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """Get detailed record information.

    Args:
        client: Async Odoo client
        model: Model name
        record_id: Record ID
        fields: List of field names to read

    Returns:
        Record dictionary

    Raises:
        RecordNotFoundError: If record not found
    """
    records = await client.read(model, [record_id], fields=fields)
    if not records:
        raise RecordNotFoundError(model, record_id)
    return records[0]

list_fields async

list_fields(client: AsyncOdooClient, model: str) -> dict[str, Any]

Get all available fields for a model.

Source code in src/vodoo/aio/base.py
async def list_fields(client: AsyncOdooClient, model: str) -> dict[str, Any]:
    """Get all available fields for a model."""
    return await client.fields_get(model)

set_record_fields async

set_record_fields(client: AsyncOdooClient, model: str, record_id: int, values: dict[str, Any]) -> bool

Update fields on a record.

Source code in src/vodoo/aio/base.py
async def set_record_fields(
    client: AsyncOdooClient,
    model: str,
    record_id: int,
    values: dict[str, Any],
) -> bool:
    """Update fields on a record."""
    return await client.write(model, [record_id], values)

add_comment async

add_comment(client: AsyncOdooClient, model: str, record_id: int, message: str, user_id: int | None = None, markdown: bool = True) -> bool

Add a comment to a record (visible to customers).

Source code in src/vodoo/aio/base.py
async def add_comment(
    client: AsyncOdooClient,
    model: str,
    record_id: int,
    message: str,
    user_id: int | None = None,
    markdown: bool = True,
) -> bool:
    """Add a comment to a record (visible to customers)."""
    body = _convert_to_html(message, markdown)
    return await message_post_sudo(
        client,
        model,
        record_id,
        body,
        user_id=user_id,
        is_note=False,
    )

add_note async

add_note(client: AsyncOdooClient, model: str, record_id: int, message: str, user_id: int | None = None, markdown: bool = True) -> bool

Add an internal note to a record (not visible to customers).

Source code in src/vodoo/aio/base.py
async def add_note(
    client: AsyncOdooClient,
    model: str,
    record_id: int,
    message: str,
    user_id: int | None = None,
    markdown: bool = True,
) -> bool:
    """Add an internal note to a record (not visible to customers)."""
    body = _convert_to_html(message, markdown)
    return await message_post_sudo(
        client,
        model,
        record_id,
        body,
        user_id=user_id,
        is_note=True,
    )

list_tags async

list_tags(client: AsyncOdooClient, model: str) -> list[dict[str, Any]]

List available tags for a model.

Source code in src/vodoo/aio/base.py
async def list_tags(client: AsyncOdooClient, model: str) -> list[dict[str, Any]]:
    """List available tags for a model."""
    fields = _TAG_FIELDS
    return await client.search_read(model, fields=fields, order="name")

add_tag_to_record async

add_tag_to_record(client: AsyncOdooClient, model: str, record_id: int, tag_id: int) -> bool

Add a tag to a record.

Source code in src/vodoo/aio/base.py
async def add_tag_to_record(
    client: AsyncOdooClient,
    model: str,
    record_id: int,
    tag_id: int,
) -> bool:
    """Add a tag to a record."""
    record = await get_record(client, model, record_id, fields=["tag_ids"])
    current_tags = record.get("tag_ids", [])
    if tag_id not in current_tags:
        current_tags.append(tag_id)
        return await client.write(
            model,
            [record_id],
            {"tag_ids": [(6, 0, current_tags)]},
        )
    return True

list_messages async

list_messages(client: AsyncOdooClient, model: str, record_id: int, limit: int | None = None) -> list[dict[str, Any]]

List messages/chatter for a record.

Source code in src/vodoo/aio/base.py
async def list_messages(
    client: AsyncOdooClient,
    model: str,
    record_id: int,
    limit: int | None = None,
) -> list[dict[str, Any]]:
    """List messages/chatter for a record."""
    domain: list[Any] = [
        ("model", "=", model),
        ("res_id", "=", record_id),
    ]
    fields = _MESSAGE_FIELDS
    return await client.search_read(
        "mail.message",
        domain=domain,
        fields=fields,
        order="date desc",
        limit=limit,
    )

list_attachments async

list_attachments(client: AsyncOdooClient, model: str, record_id: int) -> list[dict[str, Any]]

List attachments for a record.

Source code in src/vodoo/aio/base.py
async def list_attachments(
    client: AsyncOdooClient,
    model: str,
    record_id: int,
) -> list[dict[str, Any]]:
    """List attachments for a record."""
    domain: list[Any] = [
        ("res_model", "=", model),
        ("res_id", "=", record_id),
    ]
    fields = _ATTACHMENT_LIST_FIELDS
    return await client.search_read("ir.attachment", domain=domain, fields=fields)

download_attachment async

download_attachment(client: AsyncOdooClient, attachment_id: int, output_path: Path | None = None) -> Path

Download an attachment.

PARAMETER DESCRIPTION
client

Async Odoo client

TYPE: AsyncOdooClient

attachment_id

Attachment ID

TYPE: int

output_path

Output file path

TYPE: Path | None DEFAULT: None

RETURNS DESCRIPTION
Path

Path to downloaded file

RAISES DESCRIPTION
RecordNotFoundError

If attachment not found

Source code in src/vodoo/aio/base.py
async def download_attachment(
    client: AsyncOdooClient,
    attachment_id: int,
    output_path: Path | None = None,
) -> Path:
    """Download an attachment.

    Args:
        client: Async Odoo client
        attachment_id: Attachment ID
        output_path: Output file path

    Returns:
        Path to downloaded file

    Raises:
        RecordNotFoundError: If attachment not found
    """
    attachments = await client.read("ir.attachment", [attachment_id], _ATTACHMENT_READ_FIELDS)
    if not attachments:
        raise RecordNotFoundError("ir.attachment", attachment_id)

    attachment = attachments[0]
    filename = attachment.get("name", f"attachment_{attachment_id}")

    if output_path is None:
        output_path = Path.cwd() / filename
    elif output_path.is_dir():
        output_path = output_path / filename

    if attachment.get("datas"):
        data = base64.b64decode(attachment["datas"])
        output_path.write_bytes(data)
    else:
        raise RecordNotFoundError("ir.attachment", attachment_id)

    return output_path

download_record_attachments async

download_record_attachments(client: AsyncOdooClient, model: str, record_id: int, output_dir: Path | None = None, extension: str | None = None) -> list[Path]

Download all attachments for a record.

Source code in src/vodoo/aio/base.py
async def download_record_attachments(
    client: AsyncOdooClient,
    model: str,
    record_id: int,
    output_dir: Path | None = None,
    extension: str | None = None,
) -> list[Path]:
    """Download all attachments for a record."""
    if output_dir is None:
        output_dir = Path.cwd()
    elif not output_dir.exists():
        output_dir.mkdir(parents=True, exist_ok=True)

    attachments = await list_attachments(client, model, record_id)

    if extension:
        ext = extension.lower().lstrip(".")
        attachments = [
            att for att in attachments if att.get("name", "").lower().endswith(f".{ext}")
        ]

    downloaded_files: list[Path] = []

    for attachment in attachments:
        filename = attachment.get("name", f"attachment_{attachment['id']}")
        try:
            att_data = await client.read(
                "ir.attachment", [attachment["id"]], _ATTACHMENT_READ_FIELDS
            )
            if not att_data:
                continue

            att = att_data[0]
            filename = att.get("name", f"attachment_{attachment['id']}")
            output_path = output_dir / filename

            if att.get("datas"):
                data = base64.b64decode(att["datas"])
                output_path.write_bytes(data)
                downloaded_files.append(output_path)
        except Exception as e:
            import logging

            logging.getLogger("vodoo").warning("Failed to download %s: %s", filename, e)
            continue

    return downloaded_files

get_attachment_data async

get_attachment_data(client: AsyncOdooClient, attachment_id: int) -> bytes

Read an attachment and return its raw binary content in-memory.

Source code in src/vodoo/aio/base.py
async def get_attachment_data(
    client: AsyncOdooClient,
    attachment_id: int,
) -> bytes:
    """Read an attachment and return its raw binary content in-memory."""
    attachments = await client.read("ir.attachment", [attachment_id], _ATTACHMENT_READ_FIELDS)

    if not attachments:
        raise RecordNotFoundError("ir.attachment", attachment_id)

    return _decode_attachment_data(attachments[0], attachment_id)

get_record_attachment_data async

get_record_attachment_data(client: AsyncOdooClient, model: str, record_id: int) -> list[tuple[int, str, bytes]]

Read all attachments for a record and return their binary content in-memory.

Source code in src/vodoo/aio/base.py
async def get_record_attachment_data(
    client: AsyncOdooClient,
    model: str,
    record_id: int,
) -> list[tuple[int, str, bytes]]:
    """Read all attachments for a record and return their binary content in-memory."""
    attachments = await list_attachments(client, model, record_id)

    result: list[tuple[int, str, bytes]] = []
    for att_meta in attachments:
        att_id = att_meta["id"]
        try:
            att_data = await client.read(
                "ir.attachment", [att_id], ["id", *_ATTACHMENT_READ_FIELDS]
            )
            if not att_data:
                continue
            decoded = _decode_attachment_record(att_data[0], att_id)
            if decoded is not None:
                result.append(decoded)
        except Exception as e:
            import logging

            logging.getLogger("vodoo").warning("Failed to read attachment %s: %s", att_id, e)
            continue

    return result

create_attachment async

create_attachment(client: AsyncOdooClient, model: str, record_id: int, file_path: Path | str | None = None, *, data: bytes | None = None, name: str | None = None) -> int

Create an attachment for a record.

PARAMETER DESCRIPTION
client

Async Odoo client

TYPE: AsyncOdooClient

model

Model name

TYPE: str

record_id

Record ID

TYPE: int

file_path

Path to file to attach (mutually exclusive with data)

TYPE: Path | str | None DEFAULT: None

data

Raw bytes to attach (mutually exclusive with file_path)

TYPE: bytes | None DEFAULT: None

name

Attachment name (defaults to filename; required when using data)

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
int

ID of created attachment

RAISES DESCRIPTION
FileNotFoundError

If file path is invalid

ValueError

If arguments are invalid

Source code in src/vodoo/aio/base.py
async def create_attachment(
    client: AsyncOdooClient,
    model: str,
    record_id: int,
    file_path: Path | str | None = None,
    *,
    data: bytes | None = None,
    name: str | None = None,
) -> int:
    """Create an attachment for a record.

    Args:
        client: Async Odoo client
        model: Model name
        record_id: Record ID
        file_path: Path to file to attach (mutually exclusive with data)
        data: Raw bytes to attach (mutually exclusive with file_path)
        name: Attachment name (defaults to filename; required when using data)

    Returns:
        ID of created attachment

    Raises:
        FileNotFoundError: If file path is invalid
        ValueError: If arguments are invalid
    """
    values = _prepare_attachment_upload(file_path, data, name, model, record_id)
    return await client.create("ir.attachment", values)

parse_field_assignment async

parse_field_assignment(client: AsyncOdooClient, model: str, record_id: int, field_assignment: str, no_markdown: bool = False) -> tuple[str, Any]

Parse a field assignment and return field name and computed value. HTML fields automatically get markdown conversion unless no_markdown=True.

Source code in src/vodoo/aio/base.py
async def parse_field_assignment(
    client: AsyncOdooClient,
    model: str,
    record_id: int,
    field_assignment: str,
    no_markdown: bool = False,
) -> tuple[str, Any]:
    """Parse a field assignment and return field name and computed value.
    HTML fields automatically get markdown conversion unless no_markdown=True.
    """
    field, operator, value = _match_field_assignment(field_assignment)
    parsed_value = _parse_raw_value(field, value)

    # Auto-convert markdown to HTML for HTML fields
    if isinstance(parsed_value, str) and not no_markdown:
        fields_info = await list_fields(client, model)
        if field in fields_info and fields_info[field].get("type") == "html":
            parsed_value = _convert_to_html(parsed_value, use_markdown=True)
    # Handle operators that require current value
    if operator in ("+=", "-=", "*=", "/="):
        record = await get_record(client, model, record_id, fields=[field])
        current_value = record.get(field)
        parsed_value = _apply_operator(field, operator, parsed_value, current_value)

    return field, parsed_value