Skip to content

API Reference

Program

Program provides the IDL deserialized client representation of an Anchor program.

This API is the one stop shop for all things related to communicating with on-chain programs. Among other things, one can send transactions, fetch deserialized accounts, decode instruction data, subscribe to account changes, and listen to events.

In addition to field accessors and methods, the object provides a set of dynamically generated properties, also known as namespaces, that map one-to-one to program methods and accounts.

Source code in anchorpy/program/core.py
class Program(object):
    """Program provides the IDL deserialized client representation of an Anchor program.

    This API is the one stop shop for all things related to communicating with
    on-chain programs. Among other things, one can send transactions, fetch
    deserialized accounts, decode instruction data, subscribe to account
    changes, and listen to events.

    In addition to field accessors and methods, the object provides a set of
    dynamically generated properties, also known as namespaces, that
    map one-to-one to program methods and accounts.

    """

    def __init__(
        self, idl: Idl, program_id: Pubkey, provider: Optional[Provider] = None
    ):
        """Initialize the Program object.

        Args:
            idl: The parsed IDL object.
            program_id: The program ID.
            provider: The Provider object for the Program. Defaults to Provider.local().
        """
        self.idl = idl
        self.program_id = program_id
        self.provider = provider if provider is not None else Provider.local()
        self.coder = Coder(idl)

        (
            rpc,
            instruction,
            transaction,
            account,
            simulate,
            types,
            methods,
        ) = _build_namespace(
            idl,
            self.coder,
            program_id,
            self.provider,
        )

        self.rpc = rpc
        self.instruction = instruction
        self.transaction = transaction
        self.account = account
        self.simulate = simulate
        self.type = types
        self.methods = methods

    async def __aenter__(self) -> Program:
        """Use as a context manager."""
        await self.provider.__aenter__()
        return self

    async def __aexit__(self, _exc_type, _exc, _tb):
        """Exit the context manager."""
        await self.close()

    async def close(self) -> None:
        """Use this when you are done with the client."""
        await self.provider.close()

    @staticmethod
    async def fetch_raw_idl(
        address: AddressType,
        provider: Provider,
    ) -> str:
        """Fetch an idl from the blockchain as a raw JSON dictionary.

        Args:
            address: The program ID.
            provider: The network and wallet context.

        Raises:
            IdlNotFoundError: If the requested IDL account does not exist.

        Returns:
            str: The raw IDL.
        """
        program_id = translate_address(address)
        actual_provider = provider if provider is not None else Provider.local()
        idl_addr = _idl_address(program_id)
        account_info = await actual_provider.connection.get_account_info(idl_addr)
        account_info_val = account_info.value
        if account_info_val is None:
            raise IdlNotFoundError(f"IDL not found for program: {address}")
        idl_account = _decode_idl_account(
            account_info_val.data[ACCOUNT_DISCRIMINATOR_SIZE:]
        )
        return _pako_inflate(bytes(idl_account["data"])).decode()

    @classmethod
    async def fetch_idl(
        cls,
        address: AddressType,
        provider: Provider,
    ) -> Idl:
        """Fetch and parse an idl from the blockchain.

        Args:
            address: The program ID.
            provider: The network and wallet context.

        Returns:
            Idl: The fetched IDL.
        """
        raw = await cls.fetch_raw_idl(address, provider)
        return Idl.from_json(raw)

    @classmethod
    async def at(
        cls,
        address: AddressType,
        provider: Optional[Provider] = None,
    ) -> Program:
        """Generate a Program client by fetching the IDL from the network.

        In order to use this method, an IDL must have been previously initialized
        via the anchor CLI's `anchor idl init` command.

        Args:
            address: The program ID.
            provider: The network and wallet context.

        Returns:
            The Program instantiated using the fetched IDL.
        """
        provider_to_use = Provider.local() if provider is None else provider
        program_id = translate_address(address)
        idl = await cls.fetch_idl(program_id, provider_to_use)
        return cls(idl, program_id, provider)

__init__(self, idl, program_id, provider=None) special

Initialize the Program object.

Parameters:

Name Type Description Default
idl Idl

The parsed IDL object.

required
program_id Pubkey

The program ID.

required
provider Optional[Provider]

The Provider object for the Program. Defaults to Provider.local().

None
Source code in anchorpy/program/core.py
def __init__(
    self, idl: Idl, program_id: Pubkey, provider: Optional[Provider] = None
):
    """Initialize the Program object.

    Args:
        idl: The parsed IDL object.
        program_id: The program ID.
        provider: The Provider object for the Program. Defaults to Provider.local().
    """
    self.idl = idl
    self.program_id = program_id
    self.provider = provider if provider is not None else Provider.local()
    self.coder = Coder(idl)

    (
        rpc,
        instruction,
        transaction,
        account,
        simulate,
        types,
        methods,
    ) = _build_namespace(
        idl,
        self.coder,
        program_id,
        self.provider,
    )

    self.rpc = rpc
    self.instruction = instruction
    self.transaction = transaction
    self.account = account
    self.simulate = simulate
    self.type = types
    self.methods = methods

at(address, provider=None) async classmethod

Generate a Program client by fetching the IDL from the network.

In order to use this method, an IDL must have been previously initialized via the anchor CLI's anchor idl init command.

Parameters:

Name Type Description Default
address AddressType

The program ID.

required
provider Optional[Provider]

The network and wallet context.

None

Returns:

Type Description
Program

The Program instantiated using the fetched IDL.

Source code in anchorpy/program/core.py
@classmethod
async def at(
    cls,
    address: AddressType,
    provider: Optional[Provider] = None,
) -> Program:
    """Generate a Program client by fetching the IDL from the network.

    In order to use this method, an IDL must have been previously initialized
    via the anchor CLI's `anchor idl init` command.

    Args:
        address: The program ID.
        provider: The network and wallet context.

    Returns:
        The Program instantiated using the fetched IDL.
    """
    provider_to_use = Provider.local() if provider is None else provider
    program_id = translate_address(address)
    idl = await cls.fetch_idl(program_id, provider_to_use)
    return cls(idl, program_id, provider)

close(self) async

Use this when you are done with the client.

Source code in anchorpy/program/core.py
async def close(self) -> None:
    """Use this when you are done with the client."""
    await self.provider.close()

fetch_idl(address, provider) async classmethod

Fetch and parse an idl from the blockchain.

Parameters:

Name Type Description Default
address AddressType

The program ID.

required
provider Provider

The network and wallet context.

required

Returns:

Type Description
Idl

The fetched IDL.

Source code in anchorpy/program/core.py
@classmethod
async def fetch_idl(
    cls,
    address: AddressType,
    provider: Provider,
) -> Idl:
    """Fetch and parse an idl from the blockchain.

    Args:
        address: The program ID.
        provider: The network and wallet context.

    Returns:
        Idl: The fetched IDL.
    """
    raw = await cls.fetch_raw_idl(address, provider)
    return Idl.from_json(raw)

fetch_raw_idl(address, provider) async staticmethod

Fetch an idl from the blockchain as a raw JSON dictionary.

Parameters:

Name Type Description Default
address AddressType

The program ID.

required
provider Provider

The network and wallet context.

required

Exceptions:

Type Description
IdlNotFoundError

If the requested IDL account does not exist.

Returns:

Type Description
str

The raw IDL.

Source code in anchorpy/program/core.py
@staticmethod
async def fetch_raw_idl(
    address: AddressType,
    provider: Provider,
) -> str:
    """Fetch an idl from the blockchain as a raw JSON dictionary.

    Args:
        address: The program ID.
        provider: The network and wallet context.

    Raises:
        IdlNotFoundError: If the requested IDL account does not exist.

    Returns:
        str: The raw IDL.
    """
    program_id = translate_address(address)
    actual_provider = provider if provider is not None else Provider.local()
    idl_addr = _idl_address(program_id)
    account_info = await actual_provider.connection.get_account_info(idl_addr)
    account_info_val = account_info.value
    if account_info_val is None:
        raise IdlNotFoundError(f"IDL not found for program: {address}")
    idl_account = _decode_idl_account(
        account_info_val.data[ACCOUNT_DISCRIMINATOR_SIZE:]
    )
    return _pako_inflate(bytes(idl_account["data"])).decode()

Provider

The network and wallet context used to send transactions paid for and signed by the provider.

Source code in anchorpy/provider.py
class Provider:
    """The network and wallet context used to send transactions paid for and signed by the provider."""  # noqa: E501

    def __init__(
        self,
        connection: AsyncClient,
        wallet: Wallet,
        opts: types.TxOpts = DEFAULT_OPTIONS,
    ) -> None:
        """Initialize the Provider.

        Args:
            connection: The cluster connection where the program is deployed.
            wallet: The wallet used to pay for and sign all transactions.
            opts: Transaction confirmation options to use by default.
        """
        self.connection = connection
        self.wallet = wallet
        self.opts = opts

    @classmethod
    def local(
        cls, url: Optional[str] = None, opts: types.TxOpts = DEFAULT_OPTIONS
    ) -> Provider:
        """Create a `Provider` with a wallet read from the local filesystem.

        Args:
            url: The network cluster url.
            opts: The default transaction confirmation options.
        """
        connection = AsyncClient(url, opts.preflight_commitment)
        wallet = Wallet.local()
        return cls(connection, wallet, opts)

    @classmethod
    def readonly(
        cls, url: Optional[str] = None, opts: types.TxOpts = DEFAULT_OPTIONS
    ) -> Provider:
        """Create a `Provider` that can only fetch data, not send transactions.

        Args:
            url: The network cluster url.
            opts: The default transaction confirmation options.
        """
        connection = AsyncClient(url, opts.preflight_commitment)
        wallet = Wallet.dummy()
        return cls(connection, wallet, opts)

    @classmethod
    def env(cls) -> Provider:
        """Create a `Provider` using the `ANCHOR_PROVIDER_URL` environment variable."""
        url = environ["ANCHOR_PROVIDER_URL"]
        options = DEFAULT_OPTIONS
        connection = AsyncClient(url, options.preflight_commitment)
        wallet = Wallet.local()
        return cls(connection, wallet, options)

    async def simulate(
        self,
        tx: Union[Transaction, VersionedTransaction],
        opts: Optional[types.TxOpts] = None,
    ) -> SimulateTransactionResp:
        """Simulate the given transaction, returning emitted logs from execution.

        Args:
            tx: The transaction to send.
            signers: The set of signers in addition to the provider wallet that will
                sign the transaction.
            opts: Transaction confirmation options.

        Returns:
            The transaction simulation result.
        """
        if opts is None:
            opts = self.opts
        return await self.connection.simulate_transaction(
            tx, sig_verify=True, commitment=opts.preflight_commitment
        )

    async def send(
        self,
        tx: Union[Transaction, VersionedTransaction],
        opts: Optional[types.TxOpts] = None,
    ) -> Signature:
        """Send the given transaction, paid for and signed by the provider's wallet.

        Args:
            tx: The transaction to send.
            signers: The set of signers in addition to the provider wallet that will
                sign the transaction.
            opts: Transaction confirmation options.

        Returns:
            The transaction signature from the RPC server.
        """
        if opts is None:
            opts = self.opts
        raw = tx.serialize() if isinstance(tx, Transaction) else bytes(tx)
        resp = await self.connection.send_raw_transaction(raw, opts=opts)
        return resp.value

    async def send_all(
        self,
        txs: Sequence[Union[Transaction, VersionedTransaction]],
        opts: Optional[types.TxOpts] = None,
    ) -> list[Signature]:
        """Similar to `send`, but for an array of transactions and signers.

        Args:
            txs: a list of transaction objects.
            opts: Transaction confirmation options.

        Returns:
            The transaction signatures from the RPC server.
        """
        if opts is None:
            opts = self.opts
        sigs = []
        for tx in txs:
            raw = tx.serialize() if isinstance(tx, Transaction) else bytes(tx)
            resp = await self.connection.send_raw_transaction(raw, opts=opts)
            sigs.append(resp.value)
        return sigs

    async def __aenter__(self) -> Provider:
        """Use as a context manager."""
        await self.connection.__aenter__()
        return self

    async def __aexit__(self, _exc_type, _exc, _tb):
        """Exit the context manager."""
        await self.close()

    async def close(self) -> None:
        """Use this when you are done with the connection."""
        await self.connection.close()

__init__(self, connection, wallet, opts=TxOpts(skip_confirmation=False, skip_preflight=False, preflight_commitment='processed', max_retries=None, last_valid_block_height=None)) special

Initialize the Provider.

Parameters:

Name Type Description Default
connection AsyncClient

The cluster connection where the program is deployed.

required
wallet Wallet

The wallet used to pay for and sign all transactions.

required
opts types.TxOpts

Transaction confirmation options to use by default.

TxOpts(skip_confirmation=False, skip_preflight=False, preflight_commitment='processed', max_retries=None, last_valid_block_height=None)
Source code in anchorpy/provider.py
def __init__(
    self,
    connection: AsyncClient,
    wallet: Wallet,
    opts: types.TxOpts = DEFAULT_OPTIONS,
) -> None:
    """Initialize the Provider.

    Args:
        connection: The cluster connection where the program is deployed.
        wallet: The wallet used to pay for and sign all transactions.
        opts: Transaction confirmation options to use by default.
    """
    self.connection = connection
    self.wallet = wallet
    self.opts = opts

close(self) async

Use this when you are done with the connection.

Source code in anchorpy/provider.py
async def close(self) -> None:
    """Use this when you are done with the connection."""
    await self.connection.close()

env() classmethod

Create a Provider using the ANCHOR_PROVIDER_URL environment variable.

Source code in anchorpy/provider.py
@classmethod
def env(cls) -> Provider:
    """Create a `Provider` using the `ANCHOR_PROVIDER_URL` environment variable."""
    url = environ["ANCHOR_PROVIDER_URL"]
    options = DEFAULT_OPTIONS
    connection = AsyncClient(url, options.preflight_commitment)
    wallet = Wallet.local()
    return cls(connection, wallet, options)

local(url=None, opts=TxOpts(skip_confirmation=False, skip_preflight=False, preflight_commitment='processed', max_retries=None, last_valid_block_height=None)) classmethod

Create a Provider with a wallet read from the local filesystem.

Parameters:

Name Type Description Default
url Optional[str]

The network cluster url.

None
opts types.TxOpts

The default transaction confirmation options.

TxOpts(skip_confirmation=False, skip_preflight=False, preflight_commitment='processed', max_retries=None, last_valid_block_height=None)
Source code in anchorpy/provider.py
@classmethod
def local(
    cls, url: Optional[str] = None, opts: types.TxOpts = DEFAULT_OPTIONS
) -> Provider:
    """Create a `Provider` with a wallet read from the local filesystem.

    Args:
        url: The network cluster url.
        opts: The default transaction confirmation options.
    """
    connection = AsyncClient(url, opts.preflight_commitment)
    wallet = Wallet.local()
    return cls(connection, wallet, opts)

readonly(url=None, opts=TxOpts(skip_confirmation=False, skip_preflight=False, preflight_commitment='processed', max_retries=None, last_valid_block_height=None)) classmethod

Create a Provider that can only fetch data, not send transactions.

Parameters:

Name Type Description Default
url Optional[str]

The network cluster url.

None
opts types.TxOpts

The default transaction confirmation options.

TxOpts(skip_confirmation=False, skip_preflight=False, preflight_commitment='processed', max_retries=None, last_valid_block_height=None)
Source code in anchorpy/provider.py
@classmethod
def readonly(
    cls, url: Optional[str] = None, opts: types.TxOpts = DEFAULT_OPTIONS
) -> Provider:
    """Create a `Provider` that can only fetch data, not send transactions.

    Args:
        url: The network cluster url.
        opts: The default transaction confirmation options.
    """
    connection = AsyncClient(url, opts.preflight_commitment)
    wallet = Wallet.dummy()
    return cls(connection, wallet, opts)

send(self, tx, opts=None) async

Send the given transaction, paid for and signed by the provider's wallet.

Parameters:

Name Type Description Default
tx Union[Transaction, VersionedTransaction]

The transaction to send.

required
signers

The set of signers in addition to the provider wallet that will sign the transaction.

required
opts Optional[types.TxOpts]

Transaction confirmation options.

None

Returns:

Type Description
Signature

The transaction signature from the RPC server.

Source code in anchorpy/provider.py
async def send(
    self,
    tx: Union[Transaction, VersionedTransaction],
    opts: Optional[types.TxOpts] = None,
) -> Signature:
    """Send the given transaction, paid for and signed by the provider's wallet.

    Args:
        tx: The transaction to send.
        signers: The set of signers in addition to the provider wallet that will
            sign the transaction.
        opts: Transaction confirmation options.

    Returns:
        The transaction signature from the RPC server.
    """
    if opts is None:
        opts = self.opts
    raw = tx.serialize() if isinstance(tx, Transaction) else bytes(tx)
    resp = await self.connection.send_raw_transaction(raw, opts=opts)
    return resp.value

send_all(self, txs, opts=None) async

Similar to send, but for an array of transactions and signers.

Parameters:

Name Type Description Default
txs Sequence[Union[Transaction, VersionedTransaction]]

a list of transaction objects.

required
opts Optional[types.TxOpts]

Transaction confirmation options.

None

Returns:

Type Description
list[Signature]

The transaction signatures from the RPC server.

Source code in anchorpy/provider.py
async def send_all(
    self,
    txs: Sequence[Union[Transaction, VersionedTransaction]],
    opts: Optional[types.TxOpts] = None,
) -> list[Signature]:
    """Similar to `send`, but for an array of transactions and signers.

    Args:
        txs: a list of transaction objects.
        opts: Transaction confirmation options.

    Returns:
        The transaction signatures from the RPC server.
    """
    if opts is None:
        opts = self.opts
    sigs = []
    for tx in txs:
        raw = tx.serialize() if isinstance(tx, Transaction) else bytes(tx)
        resp = await self.connection.send_raw_transaction(raw, opts=opts)
        sigs.append(resp.value)
    return sigs

simulate(self, tx, opts=None) async

Simulate the given transaction, returning emitted logs from execution.

Parameters:

Name Type Description Default
tx Union[Transaction, VersionedTransaction]

The transaction to send.

required
signers

The set of signers in addition to the provider wallet that will sign the transaction.

required
opts Optional[types.TxOpts]

Transaction confirmation options.

None

Returns:

Type Description
SimulateTransactionResp

The transaction simulation result.

Source code in anchorpy/provider.py
async def simulate(
    self,
    tx: Union[Transaction, VersionedTransaction],
    opts: Optional[types.TxOpts] = None,
) -> SimulateTransactionResp:
    """Simulate the given transaction, returning emitted logs from execution.

    Args:
        tx: The transaction to send.
        signers: The set of signers in addition to the provider wallet that will
            sign the transaction.
        opts: Transaction confirmation options.

    Returns:
        The transaction simulation result.
    """
    if opts is None:
        opts = self.opts
    return await self.connection.simulate_transaction(
        tx, sig_verify=True, commitment=opts.preflight_commitment
    )

Context dataclass

Context provides all non-argument inputs for generating Anchor transactions.

Attributes:

Name Type Description
accounts Dict[str, Any]

The accounts used in the instruction context.

remaining_accounts List[solders.instruction.AccountMeta]

All accounts to pass into an instruction after the main

signers List[solders.keypair.Keypair]

Accounts that must sign a given transaction.

pre_instructions List[solders.instruction.Instruction]

Instructions to run before a given method. Often this is used, for example to create accounts prior to executing a method.

post_instructions List[solders.instruction.Instruction]

Instructions to run after a given method. Often this is used, for example to close accounts prior to executing a method.

options Optional[solana.rpc.types.TxOpts]

Commitment parameters to use for a transaction.

Source code in anchorpy/program/context.py
@dataclass
class Context:
    """Context provides all non-argument inputs for generating Anchor transactions.

    Attributes:
        accounts: The accounts used in the instruction context.
        remaining_accounts: All accounts to pass into an instruction *after* the main
        `accounts`. This can be used for optional or otherwise unknown accounts.
        signers: Accounts that must sign a given transaction.
        pre_instructions: Instructions to run *before* a given method. Often this is
            used, for example to create accounts prior to executing a method.
        post_instructions: Instructions to run *after* a given method. Often this is
            used, for example to close accounts prior to executing a method.
        options: Commitment parameters to use for a transaction.

    """

    # For some reason mkdocstrings doesn't understand the full type hint
    # here if we use list[Instruction] instead of typing.List.
    # Weirdly there are other places where it understands list[whatever].

    accounts: Accounts = field(default_factory=dict)
    remaining_accounts: List[AccountMeta] = field(default_factory=list)
    signers: List[Keypair] = field(default_factory=list)
    pre_instructions: List[Instruction] = field(default_factory=list)
    post_instructions: List[Instruction] = field(default_factory=list)
    options: Optional[TxOpts] = None

create_workspace(path=None, url=None)

Get a workspace from the provided path to the project root.

Parameters:

Name Type Description Default
path Union[pathlib.Path, str]

The path to the project root. Defaults to the current working directory if omitted.

None
url Optional[str]

The URL of the JSON RPC. Defaults to http://localhost:8899.

None

Returns:

Type Description
Dict[str, anchorpy.program.core.Program]

Mapping of program name to Program object.

Source code in anchorpy/workspace.py
def create_workspace(
    path: Optional[Union[Path, str]] = None, url: Optional[str] = None
) -> WorkspaceType:
    """Get a workspace from the provided path to the project root.

    Args:
        path: The path to the project root. Defaults to the current working
            directory if omitted.
        url: The URL of the JSON RPC. Defaults to http://localhost:8899.

    Returns:
        Mapping of program name to Program object.
    """
    result = {}
    project_root = Path.cwd() if path is None else Path(path)
    idl_folder = project_root / "target/idl"
    localnet_programs: dict[str, str] = toml.load(project_root / "Anchor.toml")[
        "programs"
    ]["localnet"]
    for file in idl_folder.iterdir():
        raw = file.read_text()
        idl = Idl.from_json(raw)
        name = idl.name
        program_id = Pubkey.from_string(localnet_programs[name])
        program = Program(idl, program_id, Provider.local(url))
        result[idl.name] = program
    return result

close_workspace(workspace) async

Close the HTTP clients of all the programs in the workspace.

Parameters:

Name Type Description Default
workspace Dict[str, anchorpy.program.core.Program]

The workspace to close.

required
Source code in anchorpy/workspace.py
async def close_workspace(workspace: WorkspaceType) -> None:
    """Close the HTTP clients of all the programs in the workspace.

    Args:
        workspace: The workspace to close.
    """
    for program in workspace.values():
        # could do this in a faster way but there's probably no point.
        await program.close()

bankrun_fixture(path, scope='module', build_cmd=None, accounts=None, compute_max_units=None, transaction_account_lock_limit=None)

Create a fixture that builds the project and starts a bankrun with all the programs in the workspace deployed.

Parameters:

Name Type Description Default
path Union[pathlib.Path, str]

Path to root of the Anchor project.

required
scope Literal['session', 'package', 'module', 'class', 'function']

Pytest fixture scope.

'module'
build_cmd Optional[str]

Command to build the project. Defaults to anchor build.

None
accounts Optional[Sequence[Tuple[solders.pubkey.Pubkey, solders.account.Account]]]

A sequence of (address, account_object) tuples, indicating what data to write to the given addresses.

None
compute_max_units Optional[int]

Override the default compute unit limit for a transaction.

None
transaction_account_lock_limit Optional[int]

Override the default transaction account lock limit.

None

Returns:

Type Description
bankrun.ProgramTestContext

A bankrun fixture for use with pytest.

Source code in anchorpy/pytest_plugin.py
def bankrun_fixture(
    path: Union[Path, str],
    scope: _Scope = "module",
    build_cmd: Optional[str] = None,
    accounts: Optional[Sequence[Tuple[Pubkey, Account]]] = None,
    compute_max_units: Optional[int] = None,
    transaction_account_lock_limit: Optional[int] = None,
) -> "bankrun.ProgramTestContext":
    """Create a fixture that builds the project and starts a bankrun with all the programs in the workspace deployed.

    Args:
        path: Path to root of the Anchor project.
        scope: Pytest fixture scope.
        build_cmd: Command to build the project. Defaults to `anchor build`.
        accounts: A sequence of (address, account_object) tuples, indicating
            what data to write to the given addresses.
        compute_max_units: Override the default compute unit limit for a transaction.
        transaction_account_lock_limit: Override the default transaction account lock limit.

    Returns:
        A bankrun fixture for use with pytest.
    """  # noqa: E501,D202

    @async_fixture(scope=scope)
    async def _bankrun_fixture() -> bankrun.ProgramTestContext:
        return await _bankrun_helper(
            path,
            build_cmd,
            accounts,
            compute_max_units,
            transaction_account_lock_limit,
        )

    return _bankrun_fixture

workspace_fixture(path, scope='module', timeout_seconds=60, build_cmd=None)

Create a fixture that sets up and tears down a localnet instance and returns a workspace dict.

Equivalent to combining localnet_fixture, create_workspace and close_workspace.

Parameters:

Name Type Description Default
path Union[pathlib.Path, str]

Path to root of the Anchor project.

required
scope Literal['session', 'package', 'module', 'class', 'function']

Pytest fixture scope.

'module'
timeout_seconds int

Time to wait for Anchor localnet to start.

60
build_cmd Optional[str]

Command to run before anchor localnet. Defaults to anchor build.

None

Returns:

Type Description
Callable

A workspace fixture for use with pytest.

Source code in anchorpy/pytest_plugin.py
def workspace_fixture(
    path: Union[Path, str],
    scope: _Scope = "module",
    timeout_seconds: int = 60,
    build_cmd: Optional[str] = None,
) -> Callable:
    """Create a fixture that sets up and tears down a localnet instance and returns a workspace dict.

    Equivalent to combining `localnet_fixture`, `create_workspace` and `close_workspace`.

    Args:
        path: Path to root of the Anchor project.
        scope: Pytest fixture scope.
        timeout_seconds: Time to wait for Anchor localnet to start.
        build_cmd: Command to run before `anchor localnet`. Defaults to `anchor build`.

    Returns:
        A workspace fixture for use with pytest.
    """  # noqa: E501,D202

    @async_fixture(scope=scope)
    async def _workspace_fixture(
        _fixed_xprocess,
    ) -> AsyncGenerator[dict[str, Program], None]:
        class Starter(ProcessStarter):
            # startup pattern
            pattern = "JSON RPC URL"
            terminate_on_interrupt = True
            # command to start process
            args = ["anchor", "localnet", "--skip-build"]
            timeout = timeout_seconds
            popen_kwargs = {
                "cwd": path,
                "start_new_session": True,
            }
            max_read_lines = 1_000
            # command to start process

        actual_build_cmd = "anchor build" if build_cmd is None else build_cmd
        subprocess.run(actual_build_cmd, cwd=path, check=True, shell=True)
        # ensure process is running
        _ = _fixed_xprocess.ensure("localnet", Starter)
        ws = create_workspace(path)
        yield ws
        await close_workspace(ws)

        # clean up whole process tree afterwards
        _fixed_xprocess.getinfo("localnet").terminate()

    return _workspace_fixture

localnet_fixture(path, scope='module', timeout_seconds=60, build_cmd=None)

Create a fixture that sets up and tears down a localnet instance with workspace programs deployed.

Parameters:

Name Type Description Default
path Path

Path to root of the Anchor project.

required
scope Literal['session', 'package', 'module', 'class', 'function']

Pytest fixture scope.

'module'
timeout_seconds int

Time to wait for Anchor localnet to start.

60
build_cmd Optional[str]

Command to run before anchor localnet. Defaults to anchor build.

None

Returns:

Type Description
Callable

A localnet fixture for use with pytest.

Source code in anchorpy/pytest_plugin.py
def localnet_fixture(
    path: Path,
    scope: _Scope = "module",
    timeout_seconds: int = 60,
    build_cmd: Optional[str] = None,
) -> Callable:
    """Create a fixture that sets up and tears down a localnet instance with workspace programs deployed.

    Args:
        path: Path to root of the Anchor project.
        scope: Pytest fixture scope.
        timeout_seconds: Time to wait for Anchor localnet to start.
        build_cmd: Command to run before `anchor localnet`. Defaults to `anchor build`.

    Returns:
        A localnet fixture for use with pytest.
    """  # noqa: E501,D202

    @fixture(scope=scope)
    def _localnet_fixture(_fixed_xprocess):
        class Starter(ProcessStarter):
            # startup pattern
            pattern = "JSON RPC URL"
            terminate_on_interrupt = True
            # command to start process
            args = ["anchor", "localnet", "--skip-build"]
            timeout = timeout_seconds
            popen_kwargs = {
                "cwd": path,
                "start_new_session": True,
            }
            max_read_lines = 1_000
            # command to start process

        actual_build_cmd = "anchor build" if build_cmd is None else build_cmd
        subprocess.run(actual_build_cmd, cwd=path, check=True, shell=True)
        # ensure process is running and return its logfile
        logfile = _fixed_xprocess.ensure("localnet", Starter)

        yield logfile

        # clean up whole process tree afterwards
        _fixed_xprocess.getinfo("localnet").terminate()

    return _localnet_fixture

Wallet

Python wallet object.

Source code in anchorpy/provider.py
class Wallet:
    """Python wallet object."""

    def __init__(self, payer: Keypair):
        """Initialize the wallet.

        Args:
            payer: the Keypair used to sign transactions.
        """
        self.payer = payer

    @property
    def public_key(self) -> Pubkey:
        """Get the public key of the wallet."""
        return self.payer.pubkey()

    def sign_transaction(self, tx: Transaction) -> Transaction:
        """Sign a transaction using the wallet's keypair.

        Args:
            tx: The transaction to sign.

        Returns:
            The signed transaction.
        """
        tx.sign(self.payer)
        return tx

    def sign_all_transactions(self, txs: list[Transaction]) -> list[Transaction]:
        """Sign a list of transactions using the wallet's keypair.

        Args:
            txs: The transactions to sign.

        Returns:
            The signed transactions.
        """
        for tx in txs:
            tx.sign_partial(self.payer)
        return txs

    @classmethod
    def local(cls) -> Wallet:
        """Create a wallet instance from the filesystem.

        Uses the path at the ANCHOR_WALLET env var if set,
        otherwise uses ~/.config/solana/id.json.
        """
        path = Path(getenv("ANCHOR_WALLET", Path.home() / ".config/solana/id.json"))
        with path.open() as f:
            keypair: List[int] = json.load(f)
        return cls(Keypair.from_bytes(keypair))

    @classmethod
    def dummy(cls) -> Wallet:
        """Create a dummy wallet instance that won't be used to sign transactions."""
        keypair = Keypair()
        return cls(keypair)

public_key: Pubkey property readonly

Get the public key of the wallet.

__init__(self, payer) special

Initialize the wallet.

Parameters:

Name Type Description Default
payer Keypair

the Keypair used to sign transactions.

required
Source code in anchorpy/provider.py
def __init__(self, payer: Keypair):
    """Initialize the wallet.

    Args:
        payer: the Keypair used to sign transactions.
    """
    self.payer = payer

dummy() classmethod

Create a dummy wallet instance that won't be used to sign transactions.

Source code in anchorpy/provider.py
@classmethod
def dummy(cls) -> Wallet:
    """Create a dummy wallet instance that won't be used to sign transactions."""
    keypair = Keypair()
    return cls(keypair)

local() classmethod

Create a wallet instance from the filesystem.

Uses the path at the ANCHOR_WALLET env var if set, otherwise uses ~/.config/solana/id.json.

Source code in anchorpy/provider.py
@classmethod
def local(cls) -> Wallet:
    """Create a wallet instance from the filesystem.

    Uses the path at the ANCHOR_WALLET env var if set,
    otherwise uses ~/.config/solana/id.json.
    """
    path = Path(getenv("ANCHOR_WALLET", Path.home() / ".config/solana/id.json"))
    with path.open() as f:
        keypair: List[int] = json.load(f)
    return cls(Keypair.from_bytes(keypair))

sign_all_transactions(self, txs)

Sign a list of transactions using the wallet's keypair.

Parameters:

Name Type Description Default
txs list[Transaction]

The transactions to sign.

required

Returns:

Type Description
list[Transaction]

The signed transactions.

Source code in anchorpy/provider.py
def sign_all_transactions(self, txs: list[Transaction]) -> list[Transaction]:
    """Sign a list of transactions using the wallet's keypair.

    Args:
        txs: The transactions to sign.

    Returns:
        The signed transactions.
    """
    for tx in txs:
        tx.sign_partial(self.payer)
    return txs

sign_transaction(self, tx)

Sign a transaction using the wallet's keypair.

Parameters:

Name Type Description Default
tx Transaction

The transaction to sign.

required

Returns:

Type Description
Transaction

The signed transaction.

Source code in anchorpy/provider.py
def sign_transaction(self, tx: Transaction) -> Transaction:
    """Sign a transaction using the wallet's keypair.

    Args:
        tx: The transaction to sign.

    Returns:
        The signed transaction.
    """
    tx.sign(self.payer)
    return tx

Coder

Coder provides a facade for encoding and decoding all IDL related objects.

Source code in anchorpy/coder/coder.py
class Coder:
    """Coder provides a facade for encoding and decoding all IDL related objects."""

    def __init__(self, idl: Idl):
        """Initialize the coder.

        Args:
            idl: a parsed Idl instance.
        """
        self.instruction: InstructionCoder = InstructionCoder(idl)
        self.accounts: AccountsCoder = AccountsCoder(idl)
        self.events: EventCoder = EventCoder(idl)

__init__(self, idl) special

Initialize the coder.

Parameters:

Name Type Description Default
idl Idl

a parsed Idl instance.

required
Source code in anchorpy/coder/coder.py
def __init__(self, idl: Idl):
    """Initialize the coder.

    Args:
        idl: a parsed Idl instance.
    """
    self.instruction: InstructionCoder = InstructionCoder(idl)
    self.accounts: AccountsCoder = AccountsCoder(idl)
    self.events: EventCoder = EventCoder(idl)

InstructionCoder (Adapter)

Encodes and decodes program instructions.

Source code in anchorpy/coder/instruction.py
class InstructionCoder(Adapter):
    """Encodes and decodes program instructions."""

    def __init__(self, idl: Idl) -> None:
        """Init.

        Args:
            idl: The parsed IDL object.
        """
        self.ix_layout = _parse_ix_layout(idl)
        sighasher = _Sighash()
        sighash_layouts: Dict[bytes, Construct] = {}
        sighashes: Dict[str, bytes] = {}
        sighash_to_name: Dict[bytes, str] = {}
        for ix in idl.instructions:
            ix_name = snake(ix.name)
            sh = sighasher.build(ix_name)
            sighashes[ix_name] = sh
            sighash_layouts[sh] = self.ix_layout[ix_name]
            sighash_to_name[sh] = ix_name
        self.sighash_layouts = sighash_layouts
        self.sighashes = sighashes
        self.sighash_to_name = sighash_to_name
        subcon = Sequence(
            "sighash" / Bytes(8),
            Switch(lambda this: this.sighash, sighash_layouts),
        )
        super().__init__(subcon)  # type: ignore

    def encode(self, ix_name: str, ix: Dict[str, Any]) -> bytes:
        """Encode a program instruction.

        Args:
            ix_name: The name of the instruction
            ix: The data to encode.

        Returns:
            The encoded instruction.
        """
        return self.build(NamedInstruction(name=ix_name, data=ix))

    def _decode(self, obj: Tuple[bytes, Any], context, path) -> NamedInstruction:
        return NamedInstruction(data=obj[1], name=self.sighash_to_name[obj[0]])

    def _encode(
        self, obj: NamedInstruction, context: Container, path
    ) -> Tuple[bytes, Any]:
        return (self.sighashes[obj.name], obj.data)

__init__(self, idl) special

Init.

Parameters:

Name Type Description Default
idl Idl

The parsed IDL object.

required
Source code in anchorpy/coder/instruction.py
def __init__(self, idl: Idl) -> None:
    """Init.

    Args:
        idl: The parsed IDL object.
    """
    self.ix_layout = _parse_ix_layout(idl)
    sighasher = _Sighash()
    sighash_layouts: Dict[bytes, Construct] = {}
    sighashes: Dict[str, bytes] = {}
    sighash_to_name: Dict[bytes, str] = {}
    for ix in idl.instructions:
        ix_name = snake(ix.name)
        sh = sighasher.build(ix_name)
        sighashes[ix_name] = sh
        sighash_layouts[sh] = self.ix_layout[ix_name]
        sighash_to_name[sh] = ix_name
    self.sighash_layouts = sighash_layouts
    self.sighashes = sighashes
    self.sighash_to_name = sighash_to_name
    subcon = Sequence(
        "sighash" / Bytes(8),
        Switch(lambda this: this.sighash, sighash_layouts),
    )
    super().__init__(subcon)  # type: ignore

encode(self, ix_name, ix)

Encode a program instruction.

Parameters:

Name Type Description Default
ix_name str

The name of the instruction

required
ix Dict[str, Any]

The data to encode.

required

Returns:

Type Description
bytes

The encoded instruction.

Source code in anchorpy/coder/instruction.py
def encode(self, ix_name: str, ix: Dict[str, Any]) -> bytes:
    """Encode a program instruction.

    Args:
        ix_name: The name of the instruction
        ix: The data to encode.

    Returns:
        The encoded instruction.
    """
    return self.build(NamedInstruction(name=ix_name, data=ix))

EventCoder (Adapter)

Encodes and decodes Anchor events.

Source code in anchorpy/coder/event.py
class EventCoder(Adapter):
    """Encodes and decodes Anchor events."""

    def __init__(self, idl: Idl):
        """Initialize the EventCoder.

        Args:
            idl: The parsed Idl object.
        """
        self.idl = idl
        idl_events = idl.events
        layouts: Dict[str, Construct]
        if idl_events:
            layouts = {event.name: _event_layout(event, idl) for event in idl_events}
        else:
            layouts = {}
        self.layouts = layouts
        self.discriminators: Dict[bytes, str] = (
            {}
            if idl_events is None
            else {_event_discriminator(event.name): event.name for event in idl_events}
        )
        self.discriminator_to_layout = {
            disc: self.layouts[event_name]
            for disc, event_name in self.discriminators.items()
        }
        subcon = Sequence(
            "discriminator" / Bytes(8),  # not base64-encoded here
            Switch(lambda this: this.discriminator, self.discriminator_to_layout),
        )
        super().__init__(subcon)  # type: ignore

    def _decode(self, obj: Tuple[bytes, Any], context, path) -> Optional[Event]:
        disc = obj[0]
        try:
            event_name = self.discriminators[disc]
        except KeyError:
            return None
        return Event(data=obj[1], name=event_name)

__init__(self, idl) special

Initialize the EventCoder.

Parameters:

Name Type Description Default
idl Idl

The parsed Idl object.

required
Source code in anchorpy/coder/event.py
def __init__(self, idl: Idl):
    """Initialize the EventCoder.

    Args:
        idl: The parsed Idl object.
    """
    self.idl = idl
    idl_events = idl.events
    layouts: Dict[str, Construct]
    if idl_events:
        layouts = {event.name: _event_layout(event, idl) for event in idl_events}
    else:
        layouts = {}
    self.layouts = layouts
    self.discriminators: Dict[bytes, str] = (
        {}
        if idl_events is None
        else {_event_discriminator(event.name): event.name for event in idl_events}
    )
    self.discriminator_to_layout = {
        disc: self.layouts[event_name]
        for disc, event_name in self.discriminators.items()
    }
    subcon = Sequence(
        "discriminator" / Bytes(8),  # not base64-encoded here
        Switch(lambda this: this.discriminator, self.discriminator_to_layout),
    )
    super().__init__(subcon)  # type: ignore

AccountsCoder (Adapter)

Encodes and decodes account data.

Source code in anchorpy/coder/accounts.py
class AccountsCoder(Adapter):
    """Encodes and decodes account data."""

    def __init__(self, idl: Idl) -> None:
        """Init.

        Args:
            idl: The parsed IDL object.
        """
        self._accounts_layout = {
            acc.name: _typedef_layout(acc, idl.types, acc.name) for acc in idl.accounts
        }
        self.acc_name_to_discriminator = {
            acc.name: _account_discriminator(acc.name) for acc in idl.accounts
        }
        self.discriminator_to_acc_name = {
            disc: acc_name for acc_name, disc in self.acc_name_to_discriminator.items()
        }
        discriminator_to_typedef_layout = {
            disc: self._accounts_layout[acc_name]
            for acc_name, disc in self.acc_name_to_discriminator.items()
        }
        subcon = Sequence(
            "discriminator" / Bytes(ACCOUNT_DISCRIMINATOR_SIZE),
            Switch(lambda this: this.discriminator, discriminator_to_typedef_layout),
        )
        super().__init__(subcon)  # type: ignore

    def decode(self, obj: bytes) -> Container[Any]:
        """Decode account data.

        Args:
            obj: Data to decode.

        Returns:
            Decoded data.
        """
        return self.parse(obj).data

    def _decode(self, obj: Tuple[bytes, Any], context, path) -> AccountToSerialize:
        return AccountToSerialize(
            data=obj[1],
            name=self.discriminator_to_acc_name[obj[0]],
        )

    def _encode(self, obj: AccountToSerialize, context, path) -> Tuple[bytes, Any]:
        discriminator = self.acc_name_to_discriminator[obj.name]
        return discriminator, obj.data

__init__(self, idl) special

Init.

Parameters:

Name Type Description Default
idl Idl

The parsed IDL object.

required
Source code in anchorpy/coder/accounts.py
def __init__(self, idl: Idl) -> None:
    """Init.

    Args:
        idl: The parsed IDL object.
    """
    self._accounts_layout = {
        acc.name: _typedef_layout(acc, idl.types, acc.name) for acc in idl.accounts
    }
    self.acc_name_to_discriminator = {
        acc.name: _account_discriminator(acc.name) for acc in idl.accounts
    }
    self.discriminator_to_acc_name = {
        disc: acc_name for acc_name, disc in self.acc_name_to_discriminator.items()
    }
    discriminator_to_typedef_layout = {
        disc: self._accounts_layout[acc_name]
        for acc_name, disc in self.acc_name_to_discriminator.items()
    }
    subcon = Sequence(
        "discriminator" / Bytes(ACCOUNT_DISCRIMINATOR_SIZE),
        Switch(lambda this: this.discriminator, discriminator_to_typedef_layout),
    )
    super().__init__(subcon)  # type: ignore

decode(self, obj)

Decode account data.

Parameters:

Name Type Description Default
obj bytes

Data to decode.

required

Returns:

Type Description
Container

Decoded data.

Source code in anchorpy/coder/accounts.py
def decode(self, obj: bytes) -> Container[Any]:
    """Decode account data.

    Args:
        obj: Data to decode.

    Returns:
        Decoded data.
    """
    return self.parse(obj).data

NamedInstruction dataclass

Container for a named instruction.

Attributes:

Name Type Description
data Union[Dict[str, Any], construct.lib.containers.Container[Any]]

The actual instruction data.

name str

The name of the instruction.

Source code in anchorpy/program/common.py
@dataclass
class NamedInstruction:
    """Container for a named instruction.

    Attributes:
        data: The actual instruction data.
        name: The name of the instruction.
    """

    data: Union[Dict[str, Any], Container[Any]]
    name: str

IdlProgramAccount (dict)

The on-chain account of the IDL.

Source code in anchorpy/idl.py
class IdlProgramAccount(TypedDict):
    """The on-chain account of the IDL."""

    authority: solders.pubkey.Pubkey
    data: bytes

Event (tuple)

A parsed event object.

Source code in anchorpy/program/common.py
class Event(NamedTuple):
    """A parsed event object."""

    name: str
    data: Any

translate_address(address)

Convert str | Pubkey into Pubkey.

Parameters:

Name Type Description Default
address Union[solders.pubkey.Pubkey, str]

Public key as string or Pubkey.

required

Returns:

Type Description
Pubkey

Public key as Pubkey.

Source code in anchorpy/program/common.py
def translate_address(address: AddressType) -> Pubkey:
    """Convert `str | Pubkey` into `Pubkey`.

    Args:
        address: Public key as string or `Pubkey`.

    Returns:
        Public key as `Pubkey`.
    """
    if isinstance(address, str):
        return Pubkey.from_string(address)
    return address

validate_accounts(ix_accounts, accounts)

Check that accounts passed in ctx match the IDL.

Parameters:

Name Type Description Default
ix_accounts list

Accounts from the IDL.

required
accounts Dict[str, Any]

Accounts from the ctx arg.

required

Exceptions:

Type Description
ValueError

If ctx accounts don't match the IDL.

Source code in anchorpy/program/common.py
def validate_accounts(ix_accounts: list[IdlAccountItem], accounts: Accounts):
    """Check that accounts passed in `ctx` match the IDL.

    Args:
        ix_accounts: Accounts from the IDL.
        accounts: Accounts from the `ctx` arg.

    Raises:
        ValueError: If `ctx` accounts don't match the IDL.
    """
    for acc in ix_accounts:
        acc_name = snake(acc.name)
        if isinstance(acc, IdlAccounts):
            nested = cast(Accounts, accounts[acc_name])
            validate_accounts(acc.accounts, nested)
        elif acc_name not in accounts:
            raise ValueError(f"Invalid arguments: {acc_name} not provided")

AccountClient

Provides methods for fetching and creating accounts.

Source code in anchorpy/program/namespace/account.py
class AccountClient(object):
    """Provides methods for fetching and creating accounts."""

    def __init__(
        self,
        idl: Idl,
        idl_account: IdlTypeDefinition,
        coder: Coder,
        program_id: Pubkey,
        provider: Provider,
    ):
        """Init.

        Args:
            idl: the parsed IDL object.
            idl_account: the account definition from the IDL.
            coder: The program's Coder object.
            program_id: the program ID.
            provider: The Provider object for the Program.
        """
        self._idl_account = idl_account
        self._program_id = program_id
        self._provider = provider
        self._coder = coder
        self._size = ACCOUNT_DISCRIMINATOR_SIZE + _account_size(idl, idl_account)

    async def fetch(
        self, address: Pubkey, commitment: Optional[Commitment] = None
    ) -> Container[Any]:
        """Return a deserialized account.

        Args:
            address: The address of the account to fetch.
            commitment: Bank state to query.


        Raises:
            AccountDoesNotExistError: If the account doesn't exist.
            AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
        """
        account_info = await self._provider.connection.get_account_info(
            address,
            encoding="base64",
            commitment=commitment,
        )
        if not account_info.value:
            raise AccountDoesNotExistError(f"Account {address} does not exist")
        data = account_info.value.data
        discriminator = _account_discriminator(self._idl_account.name)
        if discriminator != data[:ACCOUNT_DISCRIMINATOR_SIZE]:
            msg = f"Account {address} has an invalid discriminator"
            raise AccountInvalidDiscriminator(msg)
        return self._coder.accounts.decode(data)

    async def fetch_multiple(
        self,
        addresses: List[Pubkey],
        batch_size: int = 300,
        commitment: Optional[Commitment] = None,
    ) -> list[Optional[Container[Any]]]:
        """Return multiple deserialized accounts.

        Accounts not found or with wrong discriminator are returned as None.

        Args:
            addresses: The addresses of the accounts to fetch.
            batch_size: The number of `getMultipleAccounts` objects to send
                in each HTTP request.
            commitment: Bank state to query.
        """
        accounts = await get_multiple_accounts(
            self._provider.connection,
            addresses,
            batch_size=batch_size,
            commitment=commitment,
        )
        discriminator = _account_discriminator(self._idl_account.name)
        result: list[Optional[Container[Any]]] = []
        for account in accounts:
            if account is None:
                result.append(None)
            elif discriminator == account.account.data[:8]:
                result.append(self._coder.accounts.decode(account.account.data))
            else:
                result.append(None)
        return result

    async def create_instruction(
        self,
        signer: Keypair,
        size_override: int = 0,
    ) -> Instruction:
        """Return an instruction for creating this account.

        Args:
            signer: [description]
            size_override: Optional override for the account size. Defaults to 0.

        Returns:
            The instruction to create the account.
        """
        space = size_override if size_override else self._size
        mbre_resp = (
            await self._provider.connection.get_minimum_balance_for_rent_exemption(
                space
            )
        )
        return create_account(
            CreateAccountParams(
                from_pubkey=self._provider.wallet.public_key,
                to_pubkey=signer.pubkey(),
                space=space,
                lamports=mbre_resp.value,
                owner=self._program_id,
            )
        )

    async def all(  # noqa: A003
        self,
        buffer: Optional[bytes] = None,
        filters: Optional[Sequence[Union[int, MemcmpOpts]]] = None,
    ) -> list[ProgramAccount]:
        """Return all instances of this account type for the program.

        Args:
            buffer: bytes filter to append to the discriminator.
            filters: (optional) Options to compare a provided series of bytes with
                program account data at a particular offset.
                Note: an int entry is converted to a `dataSize` filter.
        """
        all_accounts = []
        discriminator = _account_discriminator(self._idl_account.name)
        to_encode = discriminator if buffer is None else discriminator + buffer
        bytes_arg = b58encode(to_encode).decode("ascii")
        base_memcmp_opt = MemcmpOpts(
            offset=0,
            bytes=bytes_arg,
        )
        filters_to_use = [base_memcmp_opt] + [] if filters is None else filters
        resp = await self._provider.connection.get_program_accounts(
            self._program_id,
            encoding="base64",
            commitment=self.provider.connection._commitment,
            filters=filters_to_use,
        )
        for r in resp.value:
            account_data = r.account.data
            all_accounts.append(
                ProgramAccount(
                    public_key=r.pubkey,
                    account=self._coder.accounts.decode(account_data),
                ),
            )
        return all_accounts

    @property
    def size(self) -> int:
        """Return the number of bytes in this account."""
        return self._size

    @property
    def program_id(self) -> Pubkey:
        """Return the program ID owning all accounts."""
        return self._program_id

    @property
    def provider(self) -> Provider:
        """Return the client's wallet and network provider."""
        return self._provider

    @property
    def coder(self) -> Coder:
        """Return the coder."""
        return self._coder

coder: Coder property readonly

Return the coder.

program_id: Pubkey property readonly

Return the program ID owning all accounts.

provider: Provider property readonly

Return the client's wallet and network provider.

size: int property readonly

Return the number of bytes in this account.

__init__(self, idl, idl_account, coder, program_id, provider) special

Init.

Parameters:

Name Type Description Default
idl Idl

the parsed IDL object.

required
idl_account IdlTypeDefinition

the account definition from the IDL.

required
coder Coder

The program's Coder object.

required
program_id Pubkey

the program ID.

required
provider Provider

The Provider object for the Program.

required
Source code in anchorpy/program/namespace/account.py
def __init__(
    self,
    idl: Idl,
    idl_account: IdlTypeDefinition,
    coder: Coder,
    program_id: Pubkey,
    provider: Provider,
):
    """Init.

    Args:
        idl: the parsed IDL object.
        idl_account: the account definition from the IDL.
        coder: The program's Coder object.
        program_id: the program ID.
        provider: The Provider object for the Program.
    """
    self._idl_account = idl_account
    self._program_id = program_id
    self._provider = provider
    self._coder = coder
    self._size = ACCOUNT_DISCRIMINATOR_SIZE + _account_size(idl, idl_account)

all(self, buffer=None, filters=None) async

Return all instances of this account type for the program.

Parameters:

Name Type Description Default
buffer Optional[bytes]

bytes filter to append to the discriminator.

None
filters Optional[Sequence[Union[int, solana.rpc.types.MemcmpOpts]]]

(optional) Options to compare a provided series of bytes with program account data at a particular offset. Note: an int entry is converted to a dataSize filter.

None
Source code in anchorpy/program/namespace/account.py
async def all(  # noqa: A003
    self,
    buffer: Optional[bytes] = None,
    filters: Optional[Sequence[Union[int, MemcmpOpts]]] = None,
) -> list[ProgramAccount]:
    """Return all instances of this account type for the program.

    Args:
        buffer: bytes filter to append to the discriminator.
        filters: (optional) Options to compare a provided series of bytes with
            program account data at a particular offset.
            Note: an int entry is converted to a `dataSize` filter.
    """
    all_accounts = []
    discriminator = _account_discriminator(self._idl_account.name)
    to_encode = discriminator if buffer is None else discriminator + buffer
    bytes_arg = b58encode(to_encode).decode("ascii")
    base_memcmp_opt = MemcmpOpts(
        offset=0,
        bytes=bytes_arg,
    )
    filters_to_use = [base_memcmp_opt] + [] if filters is None else filters
    resp = await self._provider.connection.get_program_accounts(
        self._program_id,
        encoding="base64",
        commitment=self.provider.connection._commitment,
        filters=filters_to_use,
    )
    for r in resp.value:
        account_data = r.account.data
        all_accounts.append(
            ProgramAccount(
                public_key=r.pubkey,
                account=self._coder.accounts.decode(account_data),
            ),
        )
    return all_accounts

create_instruction(self, signer, size_override=0) async

Return an instruction for creating this account.

Parameters:

Name Type Description Default
signer Keypair

[description]

required
size_override int

Optional override for the account size. Defaults to 0.

0

Returns:

Type Description
Instruction

The instruction to create the account.

Source code in anchorpy/program/namespace/account.py
async def create_instruction(
    self,
    signer: Keypair,
    size_override: int = 0,
) -> Instruction:
    """Return an instruction for creating this account.

    Args:
        signer: [description]
        size_override: Optional override for the account size. Defaults to 0.

    Returns:
        The instruction to create the account.
    """
    space = size_override if size_override else self._size
    mbre_resp = (
        await self._provider.connection.get_minimum_balance_for_rent_exemption(
            space
        )
    )
    return create_account(
        CreateAccountParams(
            from_pubkey=self._provider.wallet.public_key,
            to_pubkey=signer.pubkey(),
            space=space,
            lamports=mbre_resp.value,
            owner=self._program_id,
        )
    )

fetch(self, address, commitment=None) async

Return a deserialized account.

Parameters:

Name Type Description Default
address Pubkey

The address of the account to fetch.

required
commitment Optional[Commitment]

Bank state to query.

None

Exceptions:

Type Description
AccountDoesNotExistError

If the account doesn't exist.

AccountInvalidDiscriminator

If the discriminator doesn't match the IDL.

Source code in anchorpy/program/namespace/account.py
async def fetch(
    self, address: Pubkey, commitment: Optional[Commitment] = None
) -> Container[Any]:
    """Return a deserialized account.

    Args:
        address: The address of the account to fetch.
        commitment: Bank state to query.


    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    account_info = await self._provider.connection.get_account_info(
        address,
        encoding="base64",
        commitment=commitment,
    )
    if not account_info.value:
        raise AccountDoesNotExistError(f"Account {address} does not exist")
    data = account_info.value.data
    discriminator = _account_discriminator(self._idl_account.name)
    if discriminator != data[:ACCOUNT_DISCRIMINATOR_SIZE]:
        msg = f"Account {address} has an invalid discriminator"
        raise AccountInvalidDiscriminator(msg)
    return self._coder.accounts.decode(data)

fetch_multiple(self, addresses, batch_size=300, commitment=None) async

Return multiple deserialized accounts.

Accounts not found or with wrong discriminator are returned as None.

Parameters:

Name Type Description Default
addresses List[solders.pubkey.Pubkey]

The addresses of the accounts to fetch.

required
batch_size int

The number of getMultipleAccounts objects to send in each HTTP request.

300
commitment Optional[Commitment]

Bank state to query.

None
Source code in anchorpy/program/namespace/account.py
async def fetch_multiple(
    self,
    addresses: List[Pubkey],
    batch_size: int = 300,
    commitment: Optional[Commitment] = None,
) -> list[Optional[Container[Any]]]:
    """Return multiple deserialized accounts.

    Accounts not found or with wrong discriminator are returned as None.

    Args:
        addresses: The addresses of the accounts to fetch.
        batch_size: The number of `getMultipleAccounts` objects to send
            in each HTTP request.
        commitment: Bank state to query.
    """
    accounts = await get_multiple_accounts(
        self._provider.connection,
        addresses,
        batch_size=batch_size,
        commitment=commitment,
    )
    discriminator = _account_discriminator(self._idl_account.name)
    result: list[Optional[Container[Any]]] = []
    for account in accounts:
        if account is None:
            result.append(None)
        elif discriminator == account.account.data[:8]:
            result.append(self._coder.accounts.decode(account.account.data))
        else:
            result.append(None)
    return result

ProgramAccount dataclass

Deserialized account owned by a program.

Source code in anchorpy/program/namespace/account.py
@dataclass
class ProgramAccount:
    """Deserialized account owned by a program."""

    public_key: Pubkey
    account: Container

EventParser dataclass

Parser to handle on_logs callbacks.

Source code in anchorpy/program/event.py
@dataclass
class EventParser:
    """Parser to handle on_logs callbacks."""

    program_id: Pubkey
    coder: Coder

    def parse_logs(self, logs: List[str], callback: Callable[[Event], None]) -> None:
        """Parse a list of logs using a provided callback.

        Args:
            logs: The logs to parse.
            callback: The function to handle the parsed log.
        """
        log_scanner = _LogScanner(logs)
        execution = _ExecutionContext(cast(str, log_scanner.to_next()))
        log = log_scanner.to_next()
        while log is not None:
            event, new_program, did_pop = self.handle_log(execution, log)
            if event is not None:
                callback(event)
            if new_program is not None:
                execution.push(new_program)
            if did_pop:
                execution.pop()
            log = log_scanner.to_next()

    def handle_log(
        self,
        execution: _ExecutionContext,
        log: str,
    ) -> tuple[Optional[Event], Optional[str], bool]:
        """Main log handler.

        Args:
            execution: The execution stack.
            log: log string from the RPC node.

        Returns:
            A three element array of the event, the next program
            that was invoked for CPI, and a boolean indicating if
            a program has completed execution (and thus should be popped off the
            execution stack).
        """
        # Executing program is this program.
        if execution.stack and execution.program() == str(self.program_id):
            return self.handle_program_log(log)
        # Executing program is not this program.
        return (None, *self.handle_system_log(log))

    def handle_program_log(
        self, log: str
    ) -> tuple[Optional[Event], Optional[str], bool]:
        """Handle logs from *this* program.

        Args:
            log: log string from the RPC node.

        """
        # This is a `msg!` log or a `sol_log_data!` log.
        if log.startswith(PROGRAM_LOG) or log.startswith(PROGRAM_DATA):
            log_str = (
                log[PROGRAM_LOG_START_INDEX:]
                if log.startswith(PROGRAM_LOG)
                else log[PROGRAM_DATA_START_INDEX:]
            )
            try:
                decoded = b64decode(log_str)
            except binascii.Error:
                return None, None, False
            event = self.coder.events.parse(decoded)
            return event, None, False
        return (None, *self.handle_system_log(log))

    def handle_system_log(self, log: str) -> tuple[Optional[str], bool]:
        """Handle logs when the current program being executing is *not* this.

        Args:
            log: log string from the RPC node.

        """
        log_start = log.split(":")[0]
        splitted = log_start.split(" ")
        invoke_msg = f"Program {str(self.program_id)} invoke"
        if len(splitted) == 3 and splitted[0] == "Program" and splitted[2] == "success":
            return None, True
        if log_start.startswith(invoke_msg):
            return str(self.program_id), False
        if "invoke" in log_start:
            return "cpi", False
        return None, False

handle_log(self, execution, log)

Main log handler.

Parameters:

Name Type Description Default
execution _ExecutionContext

The execution stack.

required
log str

log string from the RPC node.

required

Returns:

Type Description
tuple

A three element array of the event, the next program that was invoked for CPI, and a boolean indicating if a program has completed execution (and thus should be popped off the execution stack).

Source code in anchorpy/program/event.py
def handle_log(
    self,
    execution: _ExecutionContext,
    log: str,
) -> tuple[Optional[Event], Optional[str], bool]:
    """Main log handler.

    Args:
        execution: The execution stack.
        log: log string from the RPC node.

    Returns:
        A three element array of the event, the next program
        that was invoked for CPI, and a boolean indicating if
        a program has completed execution (and thus should be popped off the
        execution stack).
    """
    # Executing program is this program.
    if execution.stack and execution.program() == str(self.program_id):
        return self.handle_program_log(log)
    # Executing program is not this program.
    return (None, *self.handle_system_log(log))

handle_program_log(self, log)

Handle logs from this program.

Parameters:

Name Type Description Default
log str

log string from the RPC node.

required
Source code in anchorpy/program/event.py
def handle_program_log(
    self, log: str
) -> tuple[Optional[Event], Optional[str], bool]:
    """Handle logs from *this* program.

    Args:
        log: log string from the RPC node.

    """
    # This is a `msg!` log or a `sol_log_data!` log.
    if log.startswith(PROGRAM_LOG) or log.startswith(PROGRAM_DATA):
        log_str = (
            log[PROGRAM_LOG_START_INDEX:]
            if log.startswith(PROGRAM_LOG)
            else log[PROGRAM_DATA_START_INDEX:]
        )
        try:
            decoded = b64decode(log_str)
        except binascii.Error:
            return None, None, False
        event = self.coder.events.parse(decoded)
        return event, None, False
    return (None, *self.handle_system_log(log))

handle_system_log(self, log)

Handle logs when the current program being executing is not this.

Parameters:

Name Type Description Default
log str

log string from the RPC node.

required
Source code in anchorpy/program/event.py
def handle_system_log(self, log: str) -> tuple[Optional[str], bool]:
    """Handle logs when the current program being executing is *not* this.

    Args:
        log: log string from the RPC node.

    """
    log_start = log.split(":")[0]
    splitted = log_start.split(" ")
    invoke_msg = f"Program {str(self.program_id)} invoke"
    if len(splitted) == 3 and splitted[0] == "Program" and splitted[2] == "success":
        return None, True
    if log_start.startswith(invoke_msg):
        return str(self.program_id), False
    if "invoke" in log_start:
        return "cpi", False
    return None, False

parse_logs(self, logs, callback)

Parse a list of logs using a provided callback.

Parameters:

Name Type Description Default
logs List[str]

The logs to parse.

required
callback Callable[[anchorpy.program.common.Event], NoneType]

The function to handle the parsed log.

required
Source code in anchorpy/program/event.py
def parse_logs(self, logs: List[str], callback: Callable[[Event], None]) -> None:
    """Parse a list of logs using a provided callback.

    Args:
        logs: The logs to parse.
        callback: The function to handle the parsed log.
    """
    log_scanner = _LogScanner(logs)
    execution = _ExecutionContext(cast(str, log_scanner.to_next()))
    log = log_scanner.to_next()
    while log is not None:
        event, new_program, did_pop = self.handle_log(execution, log)
        if event is not None:
            callback(event)
        if new_program is not None:
            execution.push(new_program)
        if did_pop:
            execution.pop()
        log = log_scanner.to_next()

SimulateResponse (tuple)

The result of a simulate function call.

Source code in anchorpy/program/namespace/simulate.py
class SimulateResponse(NamedTuple):
    """The result of a simulate function call."""

    events: list[Event]
    raw: list[str]

error

This module handles AnchorPy errors.

AccountDoesNotExistError (Exception)

Raise if account doesn't exist.

Source code in anchorpy/error.py
class AccountDoesNotExistError(Exception):
    """Raise if account doesn't exist."""

AccountInvalidDiscriminator (Exception)

Raise if account discriminator doesn't match the IDL.

Source code in anchorpy/error.py
class AccountInvalidDiscriminator(Exception):
    """Raise if account discriminator doesn't match the IDL."""

ArgsError (Exception)

Raise when the incorrect number of args is passed to the RPC function.

Source code in anchorpy/error.py
class ArgsError(Exception):
    """Raise when the incorrect number of args is passed to the RPC function."""

IdlNotFoundError (Exception)

Raise when requested IDL account does not exist.

Source code in anchorpy/error.py
class IdlNotFoundError(Exception):
    """Raise when requested IDL account does not exist."""

ProgramError (Exception)

An error from a user defined program.

Source code in anchorpy/error.py
class ProgramError(Exception):
    """An error from a user defined program."""

    def __init__(
        self, code: int, msg: Optional[str], logs: Optional[list[str]] = None
    ) -> None:
        """Init.

        Args:
            code: The error code.
            msg: The error message.
            logs: The transaction simulation logs.
        """
        self.code = code
        self.msg = msg
        self.logs = logs
        super().__init__(f"{code}: {msg}")

    @classmethod
    def parse(
        cls,
        err_info: RPCError,
        idl_errors: dict[int, str],
        program_id: Pubkey,
    ) -> Optional[ProgramError]:
        """Convert an RPC error into a ProgramError, if possible.

        Args:
            err_info: The RPC error.
            idl_errors: Errors from the IDL file.
            program_id: The ID of the program we expect the error to come from.

        Returns:
            A ProgramError or None.
        """
        extracted = extract_code_and_logs(err_info, program_id)
        if extracted is None:
            return None
        code, logs = extracted
        msg = idl_errors.get(code)
        if msg is not None:
            return cls(code, msg, logs)
        # parse framework internal error
        msg = LangErrorMessage.get(code)
        if msg is not None:
            return cls(code, msg, logs)
        # Unable to parse the error.
        return None

    @classmethod
    def parse_tx_error(
        cls,
        err_info: TransactionErrorType,
        idl_errors: dict[int, str],
        program_id: Pubkey,
        logs: List[str],
    ) -> Optional[ProgramError]:
        """Convert an RPC error into a ProgramError, if possible.

        Args:
            err_info: The RPC error.
            idl_errors: Errors from the IDL file.
            program_id: The ID of the program we expect the error to come from.
            logs: The transaction logs.

        Returns:
            A ProgramError or None.
        """
        code = extract_code_tx_error(err_info, program_id, logs)
        if code is None:
            return None
        msg = idl_errors.get(code)
        if msg is not None:
            return cls(code, msg, logs)
        # parse framework internal error
        msg = LangErrorMessage.get(code)
        if msg is not None:
            return cls(code, msg, logs)
        # Unable to parse the error.
        return None

__init__(self, code, msg, logs=None) special

Init.

Parameters:

Name Type Description Default
code int

The error code.

required
msg Optional[str]

The error message.

required
logs Optional[list[str]]

The transaction simulation logs.

None
Source code in anchorpy/error.py
def __init__(
    self, code: int, msg: Optional[str], logs: Optional[list[str]] = None
) -> None:
    """Init.

    Args:
        code: The error code.
        msg: The error message.
        logs: The transaction simulation logs.
    """
    self.code = code
    self.msg = msg
    self.logs = logs
    super().__init__(f"{code}: {msg}")

parse(err_info, idl_errors, program_id) classmethod

Convert an RPC error into a ProgramError, if possible.

Parameters:

Name Type Description Default
err_info RPCError

The RPC error.

required
idl_errors dict[int, str]

Errors from the IDL file.

required
program_id Pubkey

The ID of the program we expect the error to come from.

required

Returns:

Type Description
Optional[ProgramError]

A ProgramError or None.

Source code in anchorpy/error.py
@classmethod
def parse(
    cls,
    err_info: RPCError,
    idl_errors: dict[int, str],
    program_id: Pubkey,
) -> Optional[ProgramError]:
    """Convert an RPC error into a ProgramError, if possible.

    Args:
        err_info: The RPC error.
        idl_errors: Errors from the IDL file.
        program_id: The ID of the program we expect the error to come from.

    Returns:
        A ProgramError or None.
    """
    extracted = extract_code_and_logs(err_info, program_id)
    if extracted is None:
        return None
    code, logs = extracted
    msg = idl_errors.get(code)
    if msg is not None:
        return cls(code, msg, logs)
    # parse framework internal error
    msg = LangErrorMessage.get(code)
    if msg is not None:
        return cls(code, msg, logs)
    # Unable to parse the error.
    return None

parse_tx_error(err_info, idl_errors, program_id, logs) classmethod

Convert an RPC error into a ProgramError, if possible.

Parameters:

Name Type Description Default
err_info TransactionErrorType

The RPC error.

required
idl_errors dict[int, str]

Errors from the IDL file.

required
program_id Pubkey

The ID of the program we expect the error to come from.

required
logs List[str]

The transaction logs.

required

Returns:

Type Description
Optional[ProgramError]

A ProgramError or None.

Source code in anchorpy/error.py
@classmethod
def parse_tx_error(
    cls,
    err_info: TransactionErrorType,
    idl_errors: dict[int, str],
    program_id: Pubkey,
    logs: List[str],
) -> Optional[ProgramError]:
    """Convert an RPC error into a ProgramError, if possible.

    Args:
        err_info: The RPC error.
        idl_errors: Errors from the IDL file.
        program_id: The ID of the program we expect the error to come from.
        logs: The transaction logs.

    Returns:
        A ProgramError or None.
    """
    code = extract_code_tx_error(err_info, program_id, logs)
    if code is None:
        return None
    msg = idl_errors.get(code)
    if msg is not None:
        return cls(code, msg, logs)
    # parse framework internal error
    msg = LangErrorMessage.get(code)
    if msg is not None:
        return cls(code, msg, logs)
    # Unable to parse the error.
    return None

extract_code_and_logs(err_info, program_id)

Extract the custom instruction error code from an RPC response.

Parameters:

Name Type Description Default
err_info RPCError

The RPC error.

required
program_id Pubkey

The ID of the program we expect the error to come from.

required
Source code in anchorpy/error.py
def extract_code_and_logs(
    err_info: RPCError, program_id: Pubkey
) -> Optional[Tuple[int, List[str]]]:
    """Extract the custom instruction error code from an RPC response.

    Args:
        err_info: The RPC error.
        program_id: The ID of the program we expect the error to come from.
    """
    if isinstance(err_info, SendTransactionPreflightFailureMessage):
        err_data = err_info.data
        err_data_err = err_data.err
        logs = err_data.logs
        if logs is None:
            return None
        if err_data_err is None:
            return None
        maybe_code = _handle_ix_err(err_data_err, logs, program_id)
        return None if maybe_code is None else (maybe_code, logs)
    return None

extract_code_tx_error(err_info, program_id, logs)

Extract the custom instruction error code from a transaction error.

Parameters:

Name Type Description Default
err_info TransactionErrorType

The tx error.

required
program_id Pubkey

The ID of the program we expect the error to come from.

required
logs List[str]

The tx logs.

required
Source code in anchorpy/error.py
def extract_code_tx_error(
    err_info: TransactionErrorType, program_id: Pubkey, logs: List[str]
) -> Optional[int]:
    """Extract the custom instruction error code from a transaction error.

    Args:
        err_info: The tx error.
        program_id: The ID of the program we expect the error to come from.
        logs: The tx logs.
    """
    return _handle_ix_err(err_info, logs, program_id)

utils special

Various utility functions.

rpc

This module contains the invoke function.

AccountInfo (tuple)

Information describing an account.

Attributes:

Name Type Description
executable bool

True if this account's data contains a loaded program.

owner Pubkey

Identifier of the program that owns the account.

lamports int

Number of lamports assigned to the account.

data bytes

Optional data assigned to the account.

rent_epoch Optional[int]

Optional rent epoch info for for account.

Source code in anchorpy/utils/rpc.py
class AccountInfo(NamedTuple):
    """Information describing an account.

    Attributes:
        executable: `True` if this account's data contains a loaded program.
        owner: Identifier of the program that owns the account.
        lamports: Number of lamports assigned to the account.
        data: Optional data assigned to the account.
        rent_epoch: Optional rent epoch info for for account.
    """

    executable: bool
    owner: Pubkey
    lamports: int
    data: bytes
    rent_epoch: Optional[int]

get_multiple_accounts(connection, pubkeys, batch_size=3, commitment=None) async

Fetch multiple account infos through batched getMultipleAccount RPC requests.

Parameters:

Name Type Description Default
connection AsyncClient

The solana-py client object.

required
pubkeys list

Pubkeys to fetch.

required
batch_size int

The number of getMultipleAccount objects to include in each HTTP request.

3
commitment Optional[Commitment]

Bank state to query.

None

Returns:

Type Description
list

Account infos and pubkeys.

Source code in anchorpy/utils/rpc.py
async def get_multiple_accounts(
    connection: AsyncClient,
    pubkeys: list[Pubkey],
    batch_size: int = 3,
    commitment: Optional[Commitment] = None,
) -> list[Optional[_MultipleAccountsItem]]:
    """Fetch multiple account infos through batched `getMultipleAccount` RPC requests.

    Args:
        connection: The `solana-py` client object.
        pubkeys: Pubkeys to fetch.
        batch_size: The number of `getMultipleAccount` objects to include in each
            HTTP request.
        commitment: Bank state to query.

    Returns:
        Account infos and pubkeys.
    """
    pubkeys_per_network_request = _GET_MULTIPLE_ACCOUNTS_LIMIT * batch_size
    chunks = partition_all(pubkeys_per_network_request, pubkeys)
    awaitables = [
        _get_multiple_accounts_core(connection, pubkeys_chunk, commitment)
        for pubkeys_chunk in chunks
    ]
    results = await gather(*awaitables, return_exceptions=False)
    return list(concat(results))

invoke(program_id, provider, accounts=None, data=None) async

Send a transaction to a program with the given accounts and instruction data.

Parameters:

Name Type Description Default
program_id Union[solders.pubkey.Pubkey, str]

The program ID

required
provider Provider

the Provider instance.

required
accounts Optional[list[solders.instruction.AccountMeta]]

AccountMeta objects.

None
data Optional[bytes]

The transaction data.

None

Returns:

Type Description
Signature

The transaction signature.

Source code in anchorpy/utils/rpc.py
async def invoke(
    program_id: AddressType,
    provider: Provider,
    accounts: Optional[list[AccountMeta]] = None,
    data: Optional[bytes] = None,
) -> Signature:
    """Send a transaction to a program with the given accounts and instruction data.

    Args:
        program_id: The program ID
        provider: the `Provider` instance.
        accounts: `AccountMeta` objects.
        data: The transaction data.

    Returns:
        The transaction signature.
    """
    translated_program_id = translate_address(program_id)
    tx = Transaction()
    tx.add(
        Instruction(
            program_id=translated_program_id,
            accounts=[] if accounts is None else accounts,
            data=bytes(0) if data is None else data,
        ),
    )
    return await provider.send(tx)

token

This module contains utilities for the SPL Token Program.

create_mint_and_vault(provider, amount, owner=None, decimals=None) async

Create a mint and a vault, then mint tokens to the vault.

Parameters:

Name Type Description Default
provider Provider

An anchorpy Provider instance.

required
amount int

The amount of tokens to mint to the vault.

required
owner Optional[solders.pubkey.Pubkey]

User account that will own the new account.

None
decimals Optional[int]

The number of decimal places for the token to support.

None

Returns:

Type Description
tuple

The mint and vault pubkeys.

Source code in anchorpy/utils/token.py
async def create_mint_and_vault(
    provider: Provider,
    amount: int,
    owner: Optional[Pubkey] = None,
    decimals: Optional[int] = None,
) -> tuple[Pubkey, Pubkey]:
    """Create a mint and a vault, then mint tokens to the vault.

    Args:
        provider: An anchorpy Provider instance.
        amount: The amount of tokens to mint to the vault.
        owner: User account that will own the new account.
        decimals: The number of decimal places for the token to support.

    Returns:
        The mint and vault pubkeys.
    """
    actual_owner = provider.wallet.public_key if owner is None else owner
    mint = Keypair()
    vault = Keypair()
    mint_space = 82
    create_mint_mbre_resp = (
        await provider.connection.get_minimum_balance_for_rent_exemption(mint_space)
    )
    create_mint_mbre = create_mint_mbre_resp.value
    create_mint_account_params = CreateAccountParams(
        from_pubkey=provider.wallet.public_key,
        to_pubkey=mint.pubkey(),
        space=mint_space,
        lamports=create_mint_mbre,
        owner=TOKEN_PROGRAM_ID,
    )
    create_mint_account_instruction = create_account(
        create_mint_account_params,
    )
    init_mint_instruction = initialize_mint(
        InitializeMintParams(
            mint=mint.pubkey(),
            decimals=0 if decimals is None else decimals,
            mint_authority=provider.wallet.public_key,
            program_id=TOKEN_PROGRAM_ID,
        ),
    )
    vault_space = 165
    create_vault_mbre_resp = (
        await provider.connection.get_minimum_balance_for_rent_exemption(vault_space)
    )
    create_vault_mbre = create_vault_mbre_resp.value
    create_vault_account_instruction = create_account(
        CreateAccountParams(
            from_pubkey=provider.wallet.public_key,
            to_pubkey=vault.pubkey(),
            space=vault_space,
            lamports=create_vault_mbre,
            owner=TOKEN_PROGRAM_ID,
        ),
    )
    init_vault_instruction = initialize_account(
        InitializeAccountParams(
            program_id=TOKEN_PROGRAM_ID,
            account=vault.pubkey(),
            mint=mint.pubkey(),
            owner=actual_owner,
        ),
    )
    mint_to_instruction = mint_to(
        MintToParams(
            program_id=TOKEN_PROGRAM_ID,
            mint=mint.pubkey(),
            dest=vault.pubkey(),
            amount=amount,
            mint_authority=provider.wallet.public_key,
        ),
    )
    blockhash = (
        await provider.connection.get_latest_blockhash(Confirmed)
    ).value.blockhash
    msg = Message.new_with_blockhash(
        [
            create_mint_account_instruction,
            init_mint_instruction,
            create_vault_account_instruction,
            init_vault_instruction,
            mint_to_instruction,
        ],
        provider.wallet.public_key,
        blockhash,
    )
    tx = VersionedTransaction(msg, [provider.wallet.payer, mint, vault])
    await provider.send(tx)
    return mint.pubkey(), vault.pubkey()

create_token_account(prov, mint, owner) async

Create a token account.

Parameters:

Name Type Description Default
prov Provider

An anchorpy Provider instance.

required
mint Pubkey

The pubkey of the token's mint.

required
owner Pubkey

User account that will own the new account.

required

Returns:

Type Description
Pubkey

The pubkey of the new account.

Source code in anchorpy/utils/token.py
async def create_token_account(
    prov: Provider,
    mint: Pubkey,
    owner: Pubkey,
) -> Pubkey:
    """Create a token account.

    Args:
        prov: An anchorpy Provider instance.
        mint: The pubkey of the token's mint.
        owner: User account that will own the new account.

    Returns:
        The pubkey of the new account.
    """
    token = AsyncToken(prov.connection, mint, TOKEN_PROGRAM_ID, prov.wallet.payer)
    return await token.create_account(owner)

create_token_account_instrs(provider, new_account_pubkey, mint, owner) async

Generate instructions for creating a token account.

Parameters:

Name Type Description Default
provider Provider

An anchorpy Provider instance.

required
new_account_pubkey Pubkey

The pubkey of the new account.

required
mint Pubkey

The pubkey of the token's mint.

required
owner Pubkey

User account that will own the new account.

required

Returns:

Type Description
tuple

Transaction instructions to create the new account.

Source code in anchorpy/utils/token.py
async def create_token_account_instrs(
    provider: Provider,
    new_account_pubkey: Pubkey,
    mint: Pubkey,
    owner: Pubkey,
) -> tuple[Instruction, Instruction]:
    """Generate instructions for creating a token account.

    Args:
        provider: An anchorpy Provider instance.
        new_account_pubkey: The pubkey of the new account.
        mint: The pubkey of the token's mint.
        owner: User account that will own the new account.

    Returns:
        Transaction instructions to create the new account.
    """
    mbre_resp = await provider.connection.get_minimum_balance_for_rent_exemption(165)
    lamports = mbre_resp.value
    return (
        create_account(
            CreateAccountParams(
                from_pubkey=provider.wallet.public_key,
                to_pubkey=new_account_pubkey,
                space=165,
                lamports=lamports,
                owner=TOKEN_PROGRAM_ID,
            )
        ),
        initialize_account(
            InitializeAccountParams(
                account=new_account_pubkey,
                mint=mint,
                owner=owner,
                program_id=TOKEN_PROGRAM_ID,
            )
        ),
    )

get_mint_info(provider, addr) async

Retrieve mint information.

Parameters:

Name Type Description Default
provider Provider

The anchorpy Provider instance.

required
addr Pubkey

The pubkey of the mint.

required

Returns:

Type Description
MintInfo

The parsed MintInfo.

Source code in anchorpy/utils/token.py
async def get_mint_info(
    provider: Provider,
    addr: Pubkey,
) -> MintInfo:
    """Retrieve mint information.

    Args:
        provider: The anchorpy Provider instance.
        addr: The pubkey of the mint.

    Returns:
        The parsed `MintInfo`.
    """
    depositor_acc_info_raw = await provider.connection.get_account_info(addr)
    return parse_mint_account(depositor_acc_info_raw)

get_token_account(provider, addr) async

Retrieve token account information.

Parameters:

Name Type Description Default
provider Provider

The anchorpy Provider instance.

required
addr Pubkey

The pubkey of the token account.

required

Returns:

Type Description
AccountInfo

The parsed AccountInfo of the token account.

Source code in anchorpy/utils/token.py
async def get_token_account(provider: Provider, addr: Pubkey) -> AccountInfo:
    """Retrieve token account information.

    Args:
        provider: The anchorpy Provider instance.
        addr: The pubkey of the token account.

    Returns:
        The parsed `AccountInfo` of the token account.
    """
    depositor_acc_info_raw = await provider.connection.get_account_info(addr)
    return parse_token_account(depositor_acc_info_raw)

parse_mint_account(info)

Parse raw RPC response into MintInfo.

Parameters:

Name Type Description Default
info GetAccountInfoResp

The RPC response from calling .get_account_info for the mint pubkey.

required

Exceptions:

Type Description
AttributeError

If the account is not owned by the Token Program.

ValueError

If the fetched data is the wrong size.

Returns:

Type Description
MintInfo

The parsed MintInfo.

Source code in anchorpy/utils/token.py
def parse_mint_account(info: GetAccountInfoResp) -> MintInfo:
    """Parse raw RPC response into `MintInfo`.

    Args:
        info: The RPC response from calling `.get_account_info` for the mint pubkey.

    Raises:
        AttributeError: If the account is not owned by the Token Program.
        ValueError: If the fetched data is the wrong size.

    Returns:
        The parsed `MintInfo`.
    """
    val = info.value
    if val is None:
        raise ValueError("Account does not exist.")
    owner = val.owner
    if owner != TOKEN_PROGRAM_ID:
        raise AttributeError(f"Invalid mint owner: {owner}")

    bytes_data = val.data
    if len(bytes_data) != MINT_LAYOUT.sizeof():
        raise ValueError("Invalid mint size")

    decoded_data = MINT_LAYOUT.parse(bytes_data)
    decimals = decoded_data.decimals

    mint_authority = (
        None
        if decoded_data.mint_authority_option == 0
        else Pubkey(decoded_data.mint_authority)
    )

    supply = decoded_data.supply
    is_initialized = decoded_data.is_initialized != 0

    if decoded_data.freeze_authority_option == 0:
        freeze_authority = None
    else:
        freeze_authority = Pubkey(decoded_data.freeze_authority)

    return MintInfo(mint_authority, supply, decimals, is_initialized, freeze_authority)

parse_token_account(info)

Parse AccountInfo from RPC response.

Parameters:

Name Type Description Default
info GetAccountInfoResp

the get_account_info RPC response.

required

Exceptions:

Type Description
ValueError

If the fetched data is the wrong size.

AttributeError

If the account is not owned by the token program.

Returns:

Type Description
AccountInfo

The parsed AccountInfo.

Source code in anchorpy/utils/token.py
def parse_token_account(info: GetAccountInfoResp) -> AccountInfo:
    """Parse `AccountInfo` from RPC response.

    Args:
        info: the `get_account_info` RPC response.

    Raises:
        ValueError: If the fetched data is the wrong size.
        AttributeError: If the account is not owned by the token program.

    Returns:
        The parsed `AccountInfo`.
    """
    val = info.value
    if not val:
        raise ValueError("Invalid account owner")

    if val.owner != TOKEN_PROGRAM_ID:
        raise AttributeError("Invalid account owner")

    bytes_data = val.data
    if len(bytes_data) != ACCOUNT_LAYOUT.sizeof():
        raise ValueError("Invalid account size")

    decoded_data = ACCOUNT_LAYOUT.parse(bytes_data)

    mint = Pubkey(decoded_data.mint)
    owner = Pubkey(decoded_data.owner)
    amount = decoded_data.amount

    if decoded_data.delegate_option == 0:
        delegate = None
        delegated_amount = 0
    else:
        delegate = Pubkey(decoded_data.delegate)
        delegated_amount = decoded_data.delegated_amount

    is_initialized = decoded_data.state != 0
    is_frozen = decoded_data.state == 2

    if decoded_data.is_native_option == 1:
        rent_exempt_reserve = decoded_data.is_native
        is_native = True
    else:
        rent_exempt_reserve = None
        is_native = False

    if decoded_data.close_authority_option == 0:
        close_authority = None
    else:
        close_authority = Pubkey(decoded_data.owner)

    return AccountInfo(
        mint,
        owner,
        amount,
        delegate,
        delegated_amount,
        is_initialized,
        is_frozen,
        is_native,
        rent_exempt_reserve,
        close_authority,
    )