API Reference
Program
Program provides the IDL deserialized client representation of an Anchor program.
This API is the one-stop shop for all things related to communicating with on-chain programs. Among other things, one can send transactions, fetch deserialized accounts, decode instruction data, subscribe to account changes, and listen to events.
In addition to field accessors and methods, the object provides a set of dynamically generated properties, also known as namespaces, that map one-to-one to program methods and accounts.
Source code in anchorpy/program/core.py
class Program(object):
    """Program provides the IDL deserialized client representation of an Anchor program.

    This API is the one-stop shop for all things related to communicating with
    on-chain programs. Among other things, one can send transactions, fetch
    deserialized accounts, decode instruction data, subscribe to account
    changes, and listen to events.

    In addition to field accessors and methods, the object provides a set of
    dynamically generated properties, also known as namespaces, that
    map one-to-one to program methods and accounts.
    """

    def __init__(
        self, idl: Idl, program_id: Pubkey, provider: Optional[Provider] = None
    ):
        """Initialize the Program object.

        Args:
            idl: The parsed IDL object.
            program_id: The program ID.
            provider: The Provider object for the Program. Defaults to Provider.local().
        """
        self.idl = idl
        self.program_id = program_id
        self.provider = provider if provider is not None else Provider.local()
        self.coder = Coder(idl)
        (
            rpc,
            instruction,
            transaction,
            account,
            simulate,
            types,
            methods,
        ) = _build_namespace(
            idl,
            self.coder,
            program_id,
            self.provider,
        )
        self.rpc = rpc
        self.instruction = instruction
        self.transaction = transaction
        self.account = account
        self.simulate = simulate
        # The `types` namespace is exposed under the singular attribute `type`.
        self.type = types
        self.methods = methods

    async def __aenter__(self) -> Program:
        """Enter the async context manager by entering the provider's context."""
        await self.provider.__aenter__()
        return self

    async def __aexit__(self, _exc_type, _exc, _tb):
        """Exit the context manager by closing the client."""
        await self.close()

    async def close(self) -> None:
        """Use this when you are done with the client."""
        await self.provider.close()

    @staticmethod
    async def fetch_raw_idl(
        address: AddressType,
        provider: Provider,
    ) -> str:
        """Fetch an idl from the blockchain as a raw JSON string.

        Args:
            address: The program ID.
            provider: The network and wallet context. If None is passed,
                Provider.local() is used instead.

        Raises:
            IdlNotFoundError: If the requested IDL account does not exist.

        Returns:
            str: The raw IDL.
        """
        program_id = translate_address(address)
        actual_provider = provider if provider is not None else Provider.local()
        idl_addr = _idl_address(program_id)
        account_info = await actual_provider.connection.get_account_info(idl_addr)
        account_info_val = account_info.value
        if account_info_val is None:
            raise IdlNotFoundError(f"IDL not found for program: {address}")
        # Skip the leading account discriminator before decoding the IDL account.
        idl_account = _decode_idl_account(
            account_info_val.data[ACCOUNT_DISCRIMINATOR_SIZE:]
        )
        return _pako_inflate(bytes(idl_account["data"])).decode()

    @classmethod
    async def fetch_idl(
        cls,
        address: AddressType,
        provider: Provider,
    ) -> Idl:
        """Fetch and parse an idl from the blockchain.

        Args:
            address: The program ID.
            provider: The network and wallet context.

        Returns:
            Idl: The fetched IDL.
        """
        raw = await cls.fetch_raw_idl(address, provider)
        return Idl.from_json(raw)

    @classmethod
    async def at(
        cls,
        address: AddressType,
        provider: Optional[Provider] = None,
    ) -> Program:
        """Generate a Program client by fetching the IDL from the network.

        In order to use this method, an IDL must have been previously initialized
        via the anchor CLI's `anchor idl init` command.

        Args:
            address: The program ID.
            provider: The network and wallet context. Defaults to Provider.local().

        Returns:
            The Program instantiated using the fetched IDL.
        """
        provider_to_use = Provider.local() if provider is None else provider
        program_id = translate_address(address)
        idl = await cls.fetch_idl(program_id, provider_to_use)
        # Pass the resolved provider on. Forwarding the raw `provider` argument
        # (possibly None) would make __init__ create a second Provider.local()
        # and leave the one used for the fetch unreferenced and never closed.
        return cls(idl, program_id, provider_to_use)
__init__(self, idl, program_id, provider=None)
special
Initialize the Program object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
idl |
Idl |
The parsed IDL object. |
required |
program_id |
Pubkey |
The program ID. |
required |
provider |
Optional[Provider] |
The Provider object for the Program. Defaults to Provider.local(). |
None |
Source code in anchorpy/program/core.py
def __init__(
    self, idl: Idl, program_id: Pubkey, provider: Optional[Provider] = None
):
    """Set up the client state and the generated namespaces.

    Args:
        idl: The parsed IDL object.
        program_id: The program ID.
        provider: The Provider object for the Program. When omitted,
            Provider.local() is used.
    """
    self.idl = idl
    self.program_id = program_id
    if provider is None:
        provider = Provider.local()
    self.provider = provider
    self.coder = Coder(idl)
    namespaces = _build_namespace(idl, self.coder, program_id, self.provider)
    # _build_namespace yields seven namespaces in this fixed order; the
    # sixth ("types") is stored under the singular attribute name `type`.
    (
        self.rpc,
        self.instruction,
        self.transaction,
        self.account,
        self.simulate,
        self.type,
        self.methods,
    ) = namespaces
at(address, provider=None)
async
classmethod
Generate a Program client by fetching the IDL from the network.
In order to use this method, an IDL must have been previously initialized via the anchor CLI's `anchor idl init` command.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
address |
AddressType |
The program ID. |
required |
provider |
Optional[Provider] |
The network and wallet context. |
None |
Returns:
Type | Description |
---|---|
Program |
The Program instantiated using the fetched IDL. |
Source code in anchorpy/program/core.py
@classmethod
async def at(
    cls,
    address: AddressType,
    provider: Optional[Provider] = None,
) -> Program:
    """Generate a Program client by fetching the IDL from the network.

    In order to use this method, an IDL must have been previously initialized
    via the anchor CLI's `anchor idl init` command.

    Args:
        address: The program ID.
        provider: The network and wallet context. Defaults to Provider.local().

    Returns:
        The Program instantiated using the fetched IDL.
    """
    provider_to_use = Provider.local() if provider is None else provider
    program_id = translate_address(address)
    idl = await cls.fetch_idl(program_id, provider_to_use)
    # Pass the resolved provider on. Forwarding the raw `provider` argument
    # (possibly None) would make the constructor create a second
    # Provider.local() and leave the one used for the fetch never closed.
    return cls(idl, program_id, provider_to_use)
close(self)
async
Use this when you are done with the client.
Source code in anchorpy/program/core.py
async def close(self) -> None:
    """Close the program's provider once the client is no longer needed."""
    provider = self.provider
    await provider.close()
fetch_idl(address, provider)
async
classmethod
Fetch and parse an idl from the blockchain.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
address |
AddressType |
The program ID. |
required |
provider |
Provider |
The network and wallet context. |
required |
Returns:
Type | Description |
---|---|
Idl |
The fetched IDL. |
Source code in anchorpy/program/core.py
@classmethod
async def fetch_idl(
    cls,
    address: AddressType,
    provider: Provider,
) -> Idl:
    """Download a program's IDL and parse it into an `Idl` object.

    Args:
        address: The program ID.
        provider: The network and wallet context.

    Returns:
        Idl: The fetched IDL.
    """
    # fetch_raw_idl returns the JSON text; Idl.from_json does the parsing.
    return Idl.from_json(await cls.fetch_raw_idl(address, provider))
fetch_raw_idl(address, provider)
async
staticmethod
Fetch an idl from the blockchain as a raw JSON string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
address |
AddressType |
The program ID. |
required |
provider |
Provider |
The network and wallet context. |
required |
Exceptions:
Type | Description |
---|---|
IdlNotFoundError |
If the requested IDL account does not exist. |
Returns:
Type | Description |
---|---|
str |
The raw IDL. |
Source code in anchorpy/program/core.py
@staticmethod
async def fetch_raw_idl(
    address: AddressType,
    provider: Provider,
) -> str:
    """Fetch an idl from the blockchain as a raw JSON string.

    Args:
        address: The program ID.
        provider: The network and wallet context. If None is passed,
            Provider.local() is used instead.

    Raises:
        IdlNotFoundError: If the requested IDL account does not exist.

    Returns:
        str: The raw IDL.
    """
    program_id = translate_address(address)
    if provider is None:
        provider = Provider.local()
    idl_addr = _idl_address(program_id)
    response = await provider.connection.get_account_info(idl_addr)
    info = response.value
    if info is None:
        raise IdlNotFoundError(f"IDL not found for program: {address}")
    # Drop the leading account discriminator, then decode the IDL account.
    payload = info.data[ACCOUNT_DISCRIMINATOR_SIZE:]
    idl_account = _decode_idl_account(payload)
    compressed = bytes(idl_account["data"])
    return _pako_inflate(compressed).decode()
Provider
The network and wallet context used to send transactions paid for and signed by the provider.
Source code in anchorpy/provider.py
class Provider:
    """The network and wallet context used to send transactions paid for and signed by the provider."""  # noqa: E501

    def __init__(
        self,
        connection: AsyncClient,
        wallet: Wallet,
        opts: types.TxOpts = DEFAULT_OPTIONS,
    ) -> None:
        """Store the connection, wallet and default options.

        Args:
            connection: The cluster connection where the program is deployed.
            wallet: The wallet used to pay for and sign all transactions.
            opts: Transaction confirmation options to use by default.
        """
        self.connection, self.wallet, self.opts = connection, wallet, opts

    @classmethod
    def local(
        cls, url: Optional[str] = None, opts: types.TxOpts = DEFAULT_OPTIONS
    ) -> Provider:
        """Create a `Provider` with a wallet read from the local filesystem.

        Args:
            url: The network cluster url.
            opts: The default transaction confirmation options.
        """
        # Arguments are evaluated left to right: client first, then wallet.
        return cls(AsyncClient(url, opts.preflight_commitment), Wallet.local(), opts)

    @classmethod
    def readonly(
        cls, url: Optional[str] = None, opts: types.TxOpts = DEFAULT_OPTIONS
    ) -> Provider:
        """Create a `Provider` that can only fetch data, not send transactions.

        Args:
            url: The network cluster url.
            opts: The default transaction confirmation options.
        """
        # The dummy wallet is what makes this provider read-only.
        return cls(AsyncClient(url, opts.preflight_commitment), Wallet.dummy(), opts)

    @classmethod
    def env(cls) -> Provider:
        """Create a `Provider` using the `ANCHOR_PROVIDER_URL` environment variable."""
        cluster_url = environ["ANCHOR_PROVIDER_URL"]
        defaults = DEFAULT_OPTIONS
        client = AsyncClient(cluster_url, defaults.preflight_commitment)
        return cls(client, Wallet.local(), defaults)

    async def simulate(
        self,
        tx: Union[Transaction, VersionedTransaction],
        opts: Optional[types.TxOpts] = None,
    ) -> SimulateTransactionResp:
        """Simulate the given transaction, returning emitted logs from execution.

        Args:
            tx: The transaction to simulate.
            opts: Transaction confirmation options. Falls back to the
                provider's default options when omitted.

        Returns:
            The transaction simulation result.
        """
        effective = self.opts if opts is None else opts
        commitment = effective.preflight_commitment
        return await self.connection.simulate_transaction(
            tx, sig_verify=True, commitment=commitment
        )

    async def send(
        self,
        tx: Union[Transaction, VersionedTransaction],
        opts: Optional[types.TxOpts] = None,
    ) -> Signature:
        """Serialize the given transaction and submit it to the cluster.

        This method does not sign `tx`; it is sent as given.

        Args:
            tx: The transaction to send.
            opts: Transaction confirmation options. Falls back to the
                provider's default options when omitted.

        Returns:
            The transaction signature from the RPC server.
        """
        send_opts = self.opts if opts is None else opts
        if isinstance(tx, Transaction):
            wire_bytes = tx.serialize()
        else:
            wire_bytes = bytes(tx)
        response = await self.connection.send_raw_transaction(wire_bytes, opts=send_opts)
        return response.value

    async def send_all(
        self,
        txs: Sequence[Union[Transaction, VersionedTransaction]],
        opts: Optional[types.TxOpts] = None,
    ) -> list[Signature]:
        """Similar to `send`, but for a sequence of transactions.

        Transactions are submitted one at a time, in order.

        Args:
            txs: a list of transaction objects.
            opts: Transaction confirmation options. Falls back to the
                provider's default options when omitted.

        Returns:
            The transaction signatures from the RPC server.
        """
        send_opts = self.opts if opts is None else opts
        signatures = []
        for item in txs:
            if isinstance(item, Transaction):
                wire_bytes = item.serialize()
            else:
                wire_bytes = bytes(item)
            response = await self.connection.send_raw_transaction(
                wire_bytes, opts=send_opts
            )
            signatures.append(response.value)
        return signatures

    async def __aenter__(self) -> Provider:
        """Enter the async context manager by entering the connection's context."""
        await self.connection.__aenter__()
        return self

    async def __aexit__(self, _exc_type, _exc, _tb):
        """Exit the context manager by closing the provider."""
        await self.close()

    async def close(self) -> None:
        """Use this when you are done with the connection."""
        connection = self.connection
        await connection.close()
__init__(self, connection, wallet, opts=TxOpts(skip_confirmation=False, skip_preflight=False, preflight_commitment='processed', max_retries=None, last_valid_block_height=None))
special
Initialize the Provider.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connection |
AsyncClient |
The cluster connection where the program is deployed. |
required |
wallet |
Wallet |
The wallet used to pay for and sign all transactions. |
required |
opts |
types.TxOpts |
Transaction confirmation options to use by default. |
TxOpts(skip_confirmation=False, skip_preflight=False, preflight_commitment='processed', max_retries=None, last_valid_block_height=None) |
Source code in anchorpy/provider.py
def __init__(
    self,
    connection: AsyncClient,
    wallet: Wallet,
    opts: types.TxOpts = DEFAULT_OPTIONS,
) -> None:
    """Store the connection, wallet and default options on the provider.

    Args:
        connection: The cluster connection where the program is deployed.
        wallet: The wallet used to pay for and sign all transactions.
        opts: Transaction confirmation options to use by default.
    """
    self.connection, self.wallet, self.opts = connection, wallet, opts
close(self)
async
Use this when you are done with the connection.
Source code in anchorpy/provider.py
async def close(self) -> None:
    """Close the provider's connection once it is no longer needed."""
    connection = self.connection
    await connection.close()
env()
classmethod
Create a `Provider` using the `ANCHOR_PROVIDER_URL` environment variable.
Source code in anchorpy/provider.py
@classmethod
def env(cls) -> Provider:
    """Build a `Provider` whose cluster URL comes from `ANCHOR_PROVIDER_URL`.

    The wallet comes from `Wallet.local()` and the options are
    `DEFAULT_OPTIONS`. A missing environment variable raises `KeyError`.
    """
    cluster_url = environ["ANCHOR_PROVIDER_URL"]
    defaults = DEFAULT_OPTIONS
    client = AsyncClient(cluster_url, defaults.preflight_commitment)
    return cls(client, Wallet.local(), defaults)
local(url=None, opts=TxOpts(skip_confirmation=False, skip_preflight=False, preflight_commitment='processed', max_retries=None, last_valid_block_height=None))
classmethod
Create a `Provider` with a wallet read from the local filesystem.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
Optional[str] |
The network cluster url. |
None |
opts |
types.TxOpts |
The default transaction confirmation options. |
TxOpts(skip_confirmation=False, skip_preflight=False, preflight_commitment='processed', max_retries=None, last_valid_block_height=None) |
Source code in anchorpy/provider.py
@classmethod
def local(
    cls, url: Optional[str] = None, opts: types.TxOpts = DEFAULT_OPTIONS
) -> Provider:
    """Build a `Provider` whose wallet is loaded via `Wallet.local()`.

    Args:
        url: The network cluster url.
        opts: The default transaction confirmation options.
    """
    # Arguments are evaluated left to right: client first, then wallet.
    return cls(AsyncClient(url, opts.preflight_commitment), Wallet.local(), opts)
readonly(url=None, opts=TxOpts(skip_confirmation=False, skip_preflight=False, preflight_commitment='processed', max_retries=None, last_valid_block_height=None))
classmethod
Create a `Provider` that can only fetch data, not send transactions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
Optional[str] |
The network cluster url. |
None |
opts |
types.TxOpts |
The default transaction confirmation options. |
TxOpts(skip_confirmation=False, skip_preflight=False, preflight_commitment='processed', max_retries=None, last_valid_block_height=None) |
Source code in anchorpy/provider.py
@classmethod
def readonly(
    cls, url: Optional[str] = None, opts: types.TxOpts = DEFAULT_OPTIONS
) -> Provider:
    """Build a `Provider` meant for fetching data rather than sending transactions.

    Args:
        url: The network cluster url.
        opts: The default transaction confirmation options.
    """
    # Wallet.dummy() supplies a placeholder wallet in place of a local keypair.
    return cls(AsyncClient(url, opts.preflight_commitment), Wallet.dummy(), opts)
send(self, tx, opts=None)
async
Send the given transaction, paid for and signed by the provider's wallet.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tx |
Union[Transaction, VersionedTransaction] |
The transaction to send. |
required |
signers |
The set of signers in addition to the provider wallet that will sign the transaction. |
required | |
opts |
Optional[types.TxOpts] |
Transaction confirmation options. |
None |
Returns:
Type | Description |
---|---|
Signature |
The transaction signature from the RPC server. |
Source code in anchorpy/provider.py
async def send(
    self,
    tx: Union[Transaction, VersionedTransaction],
    opts: Optional[types.TxOpts] = None,
) -> Signature:
    """Serialize the given transaction and submit it to the cluster.

    This method does not sign `tx`; it is sent as given.

    Args:
        tx: The transaction to send.
        opts: Transaction confirmation options. Falls back to the
            provider's default options when omitted.

    Returns:
        The transaction signature from the RPC server.
    """
    send_opts = self.opts if opts is None else opts
    # A `Transaction` is serialized via serialize(); anything else via bytes().
    if isinstance(tx, Transaction):
        wire_bytes = tx.serialize()
    else:
        wire_bytes = bytes(tx)
    response = await self.connection.send_raw_transaction(wire_bytes, opts=send_opts)
    return response.value
send_all(self, txs, opts=None)
async
Similar to `send`, but for an array of transactions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
txs |
Sequence[Union[Transaction, VersionedTransaction]] |
a list of transaction objects. |
required |
opts |
Optional[types.TxOpts] |
Transaction confirmation options. |
None |
Returns:
Type | Description |
---|---|
list[Signature] |
The transaction signatures from the RPC server. |
Source code in anchorpy/provider.py
async def send_all(
    self,
    txs: Sequence[Union[Transaction, VersionedTransaction]],
    opts: Optional[types.TxOpts] = None,
) -> list[Signature]:
    """Similar to `send`, but for a sequence of transactions.

    Transactions are submitted one at a time, in order.

    Args:
        txs: a list of transaction objects.
        opts: Transaction confirmation options. Falls back to the
            provider's default options when omitted.

    Returns:
        The transaction signatures from the RPC server.
    """
    send_opts = self.opts if opts is None else opts
    signatures = []
    for item in txs:
        # A `Transaction` is serialized via serialize(); anything else via bytes().
        if isinstance(item, Transaction):
            wire_bytes = item.serialize()
        else:
            wire_bytes = bytes(item)
        response = await self.connection.send_raw_transaction(
            wire_bytes, opts=send_opts
        )
        signatures.append(response.value)
    return signatures
simulate(self, tx, opts=None)
async
Simulate the given transaction, returning emitted logs from execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tx |
Union[Transaction, VersionedTransaction] |
The transaction to simulate. |
required |
signers |
The set of signers in addition to the provider wallet that will sign the transaction. |
required | |
opts |
Optional[types.TxOpts] |
Transaction confirmation options. |
None |
Returns:
Type | Description |
---|---|
SimulateTransactionResp |
The transaction simulation result. |
Source code in anchorpy/provider.py
async def simulate(
    self,
    tx: Union[Transaction, VersionedTransaction],
    opts: Optional[types.TxOpts] = None,
) -> SimulateTransactionResp:
    """Simulate the given transaction, returning emitted logs from execution.

    Args:
        tx: The transaction to simulate.
        opts: Transaction confirmation options. Falls back to the
            provider's default options when omitted.

    Returns:
        The transaction simulation result.
    """
    effective = self.opts if opts is None else opts
    # Only the preflight commitment is taken from the options.
    commitment = effective.preflight_commitment
    return await self.connection.simulate_transaction(
        tx, sig_verify=True, commitment=commitment
    )
Context
dataclass
Context provides all non-argument inputs for generating Anchor transactions.
Attributes:
Name | Type | Description |
---|---|---|
accounts |
Dict[str, Any] |
The accounts used in the instruction context. |
remaining_accounts |
List[solders.instruction.AccountMeta] |
All accounts to pass into an instruction after the main `accounts`. This can be used for optional or otherwise unknown accounts. |
signers |
List[solders.keypair.Keypair] |
Accounts that must sign a given transaction. |
pre_instructions |
List[solders.instruction.Instruction] |
Instructions to run before a given method. Often this is used, for example to create accounts prior to executing a method. |
post_instructions |
List[solders.instruction.Instruction] |
Instructions to run after a given method. Often this is used, for example to close accounts after executing a method. |
options |
Optional[solana.rpc.types.TxOpts] |
Commitment parameters to use for a transaction. |
Source code in anchorpy/program/context.py
@dataclass
class Context:
    """Context provides all non-argument inputs for generating Anchor transactions.

    Attributes:
        accounts: The accounts used in the instruction context.
        remaining_accounts: All accounts to pass into an instruction *after* the main
            `accounts`. This can be used for optional or otherwise unknown accounts.
        signers: Accounts that must sign a given transaction.
        pre_instructions: Instructions to run *before* a given method. Often this is
            used, for example to create accounts prior to executing a method.
        post_instructions: Instructions to run *after* a given method. Often this is
            used, for example to close accounts after executing a method.
        options: Commitment parameters to use for a transaction.
    """

    # For some reason mkdocstrings doesn't understand the full type hint
    # here if we use list[Instruction] instead of typing.List.
    # Weirdly there are other places where it understands list[whatever].
    # Every container field uses default_factory so instances never share state.
    accounts: Accounts = field(default_factory=dict)
    remaining_accounts: List[AccountMeta] = field(default_factory=list)
    signers: List[Keypair] = field(default_factory=list)
    pre_instructions: List[Instruction] = field(default_factory=list)
    post_instructions: List[Instruction] = field(default_factory=list)
    options: Optional[TxOpts] = None
create_workspace(path=None, url=None)
Get a workspace from the provided path to the project root.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[pathlib.Path, str] |
The path to the project root. Defaults to the current working directory if omitted. |
None |
url |
Optional[str] |
The URL of the JSON RPC. Defaults to http://localhost:8899. |
None |
Returns:
Type | Description |
---|---|
Dict[str, anchorpy.program.core.Program] |
Mapping of program name to Program object. |
Source code in anchorpy/workspace.py
def create_workspace(
    path: Optional[Union[Path, str]] = None, url: Optional[str] = None
) -> WorkspaceType:
    """Build a workspace from the Anchor project rooted at `path`.

    Program IDs are read from the `[programs.localnet]` table of `Anchor.toml`
    and one `Program` is created for every file found in `target/idl`.

    Args:
        path: The path to the project root. Defaults to the current working
            directory if omitted.
        url: The URL of the JSON RPC. Defaults to http://localhost:8899.

    Returns:
        Mapping of program name to Program object.
    """
    root = Path(path) if path is not None else Path.cwd()
    anchor_config = toml.load(root / "Anchor.toml")
    program_ids: dict[str, str] = anchor_config["programs"]["localnet"]
    workspace = {}
    for idl_file in (root / "target/idl").iterdir():
        idl = Idl.from_json(idl_file.read_text())
        address = Pubkey.from_string(program_ids[idl.name])
        # Each program gets its own local provider for the given RPC url.
        workspace[idl.name] = Program(idl, address, Provider.local(url))
    return workspace
close_workspace(workspace)
async
Close the HTTP clients of all the programs in the workspace.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace |
Dict[str, anchorpy.program.core.Program] |
The workspace to close. |
required |
Source code in anchorpy/workspace.py
async def close_workspace(workspace: WorkspaceType) -> None:
    """Close every program held by the workspace, one after another.

    Args:
        workspace: The workspace to close.
    """
    programs = workspace.values()
    # Sequential on purpose; closing concurrently would gain little here.
    for prog in programs:
        await prog.close()
bankrun_fixture(path, scope='module', build_cmd=None, accounts=None, compute_max_units=None, transaction_account_lock_limit=None)
Create a fixture that builds the project and starts a bankrun with all the programs in the workspace deployed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[pathlib.Path, str] |
Path to root of the Anchor project. |
required |
scope |
Literal['session', 'package', 'module', 'class', 'function'] |
Pytest fixture scope. |
'module' |
build_cmd |
Optional[str] |
Command to build the project. Defaults to `anchor build`. |
None |
accounts |
Optional[Sequence[Tuple[solders.pubkey.Pubkey, solders.account.Account]]] |
A sequence of (address, account_object) tuples, indicating what data to write to the given addresses. |
None |
compute_max_units |
Optional[int] |
Override the default compute unit limit for a transaction. |
None |
transaction_account_lock_limit |
Optional[int] |
Override the default transaction account lock limit. |
None |
Returns:
Type | Description |
---|---|
bankrun.ProgramTestContext |
A bankrun fixture for use with pytest. |
Source code in anchorpy/pytest_plugin.py
def bankrun_fixture(
    path: Union[Path, str],
    scope: _Scope = "module",
    build_cmd: Optional[str] = None,
    accounts: Optional[Sequence[Tuple[Pubkey, Account]]] = None,
    compute_max_units: Optional[int] = None,
    transaction_account_lock_limit: Optional[int] = None,
) -> "bankrun.ProgramTestContext":
    """Create a fixture that builds the project and starts a bankrun with all the programs in the workspace deployed.

    Args:
        path: Path to root of the Anchor project.
        scope: Pytest fixture scope.
        build_cmd: Command to build the project. Defaults to `anchor build`.
        accounts: A sequence of (address, account_object) tuples, indicating
            what data to write to the given addresses.
        compute_max_units: Override the default compute unit limit for a transaction.
        transaction_account_lock_limit: Override the default transaction account lock limit.

    Returns:
        A bankrun fixture for use with pytest.
    """  # noqa: E501,D202

    # Build the decorator first, exactly as the `@` syntax would.
    as_fixture = async_fixture(scope=scope)
    helper_args = (
        path,
        build_cmd,
        accounts,
        compute_max_units,
        transaction_account_lock_limit,
    )

    async def _bankrun_fixture() -> bankrun.ProgramTestContext:
        return await _bankrun_helper(*helper_args)

    return as_fixture(_bankrun_fixture)
workspace_fixture(path, scope='module', timeout_seconds=60, build_cmd=None)
Create a fixture that sets up and tears down a localnet instance and returns a workspace dict.
Equivalent to combining `localnet_fixture`, `create_workspace` and `close_workspace`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[pathlib.Path, str] |
Path to root of the Anchor project. |
required |
scope |
Literal['session', 'package', 'module', 'class', 'function'] |
Pytest fixture scope. |
'module' |
timeout_seconds |
int |
Time to wait for Anchor localnet to start. |
60 |
build_cmd |
Optional[str] |
Command to run before `anchor localnet`. Defaults to `anchor build`. |
None |
Returns:
Type | Description |
---|---|
Callable |
A workspace fixture for use with pytest. |
Source code in anchorpy/pytest_plugin.py
def workspace_fixture(
    path: Union[Path, str],
    scope: _Scope = "module",
    timeout_seconds: int = 60,
    build_cmd: Optional[str] = None,
) -> Callable:
    """Create a fixture that sets up and tears down a localnet instance and returns a workspace dict.

    Equivalent to combining `localnet_fixture`, `create_workspace` and `close_workspace`.

    Args:
        path: Path to root of the Anchor project.
        scope: Pytest fixture scope.
        timeout_seconds: Time to wait for Anchor localnet to start.
        build_cmd: Command to run before `anchor localnet`. Defaults to `anchor build`.

    Returns:
        A workspace fixture for use with pytest.
    """  # noqa: E501,D202

    @async_fixture(scope=scope)
    async def _workspace_fixture(
        _fixed_xprocess,
    ) -> AsyncGenerator[dict[str, Program], None]:
        class Starter(ProcessStarter):
            # startup pattern
            pattern = "JSON RPC URL"
            terminate_on_interrupt = True
            # command to start process
            args = ["anchor", "localnet", "--skip-build"]
            timeout = timeout_seconds
            popen_kwargs = {
                "cwd": path,
                "start_new_session": True,
            }
            max_read_lines = 1_000

        # build the project first; localnet is started with --skip-build
        actual_build_cmd = "anchor build" if build_cmd is None else build_cmd
        subprocess.run(actual_build_cmd, cwd=path, check=True, shell=True)
        # ensure process is running
        _ = _fixed_xprocess.ensure("localnet", Starter)
        try:
            ws = create_workspace(path)
            try:
                yield ws
            finally:
                await close_workspace(ws)
        finally:
            # clean up whole process tree afterwards, even when building or
            # closing the workspace raised, so localnet is not left running
            _fixed_xprocess.getinfo("localnet").terminate()

    return _workspace_fixture
localnet_fixture(path, scope='module', timeout_seconds=60, build_cmd=None)
Create a fixture that sets up and tears down a localnet instance with workspace programs deployed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Path |
Path to root of the Anchor project. |
required |
scope |
Literal['session', 'package', 'module', 'class', 'function'] |
Pytest fixture scope. |
'module' |
timeout_seconds |
int |
Time to wait for Anchor localnet to start. |
60 |
build_cmd |
Optional[str] |
Command to run before `anchor localnet`. Defaults to `anchor build`. |
None |
Returns:
Type | Description |
---|---|
Callable |
A localnet fixture for use with pytest. |
Source code in anchorpy/pytest_plugin.py
def localnet_fixture(
    path: Path,
    scope: _Scope = "module",
    timeout_seconds: int = 60,
    build_cmd: Optional[str] = None,
) -> Callable:
    """Create a fixture that sets up and tears down a localnet instance with workspace programs deployed.

    Args:
        path: Path to root of the Anchor project.
        scope: Pytest fixture scope.
        timeout_seconds: Time to wait for Anchor localnet to start.
        build_cmd: Command to run before `anchor localnet`. Defaults to `anchor build`.

    Returns:
        A localnet fixture for use with pytest.
    """  # noqa: E501,D202

    # Build the decorator first, exactly as the `@` syntax would.
    as_fixture = fixture(scope=scope)

    def _localnet_fixture(_fixed_xprocess):
        class Starter(ProcessStarter):
            # line in the process output that signals a completed startup
            pattern = "JSON RPC URL"
            terminate_on_interrupt = True
            # the project is built separately below, hence --skip-build
            args = ["anchor", "localnet", "--skip-build"]
            timeout = timeout_seconds
            popen_kwargs = {
                "cwd": path,
                "start_new_session": True,
            }
            max_read_lines = 1_000

        # build the project before launching localnet
        if build_cmd is None:
            actual_build_cmd = "anchor build"
        else:
            actual_build_cmd = build_cmd
        subprocess.run(actual_build_cmd, cwd=path, check=True, shell=True)
        # ensure process is running and hand its logfile to the test
        yield _fixed_xprocess.ensure("localnet", Starter)
        # clean up whole process tree afterwards
        _fixed_xprocess.getinfo("localnet").terminate()

    return as_fixture(_localnet_fixture)
Wallet
Python wallet object.
Source code in anchorpy/provider.py
class Wallet:
    """Python wallet object."""

    def __init__(self, payer: Keypair):
        """Store the keypair that backs this wallet.

        Args:
            payer: the Keypair used to sign transactions.
        """
        self.payer = payer

    @property
    def public_key(self) -> Pubkey:
        """Return the public key of the wallet's keypair."""
        payer = self.payer
        return payer.pubkey()

    def sign_transaction(self, tx: Transaction) -> Transaction:
        """Sign `tx` in place with the wallet's keypair via `tx.sign`.

        Args:
            tx: The transaction to sign.

        Returns:
            The same transaction object, now signed.
        """
        payer = self.payer
        tx.sign(payer)
        return tx

    def sign_all_transactions(self, txs: list[Transaction]) -> list[Transaction]:
        """Sign each transaction in place via `tx.sign_partial`.

        Args:
            txs: The transactions to sign.

        Returns:
            The same list that was passed in.
        """
        payer = self.payer
        for pending in txs:
            pending.sign_partial(payer)
        return txs

    @classmethod
    def local(cls) -> Wallet:
        """Create a wallet instance from the filesystem.

        Uses the path at the ANCHOR_WALLET env var if set,
        otherwise uses ~/.config/solana/id.json.
        """
        fallback = Path.home() / ".config/solana/id.json"
        keyfile = Path(getenv("ANCHOR_WALLET", fallback))
        with keyfile.open() as handle:
            secret: List[int] = json.load(handle)
        return cls(Keypair.from_bytes(secret))

    @classmethod
    def dummy(cls) -> Wallet:
        """Create a dummy wallet instance that won't be used to sign transactions."""
        return cls(Keypair())
public_key: Pubkey
property
readonly
Get the public key of the wallet.
__init__(self, payer)
special
Initialize the wallet.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
payer |
Keypair |
the Keypair used to sign transactions. |
required |
Source code in anchorpy/provider.py
def __init__(self, payer: Keypair):
    """Store the keypair that backs this wallet.

    Args:
        payer: the Keypair used to sign transactions.
    """
    # The keypair is kept as-is under the public `payer` attribute.
    self.payer = payer
dummy()
classmethod
Create a dummy wallet instance that won't be used to sign transactions.
Source code in anchorpy/provider.py
@classmethod
def dummy(cls) -> Wallet:
    """Create a wallet around a fresh keypair that won't be used to sign transactions."""
    return cls(Keypair())
local()
classmethod
Create a wallet instance from the filesystem.
Uses the path at the ANCHOR_WALLET env var if set, otherwise uses ~/.config/solana/id.json.
Source code in anchorpy/provider.py
@classmethod
def local(cls) -> Wallet:
    """Build a wallet from a keypair file on disk.

    The file path comes from the ANCHOR_WALLET env var if set,
    otherwise ~/.config/solana/id.json is used. The file is parsed as JSON.
    """
    default_path = Path.home() / ".config/solana/id.json"
    wallet_path = Path(getenv("ANCHOR_WALLET", default_path))
    with wallet_path.open() as handle:
        secret: List[int] = json.load(handle)
    return cls(Keypair.from_bytes(secret))
sign_all_transactions(self, txs)
Sign a list of transactions using the wallet's keypair.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
txs |
list[Transaction] |
The transactions to sign. |
required |
Returns:
Type | Description |
---|---|
list[Transaction] |
The signed transactions. |
Source code in anchorpy/provider.py
def sign_all_transactions(self, txs: list[Transaction]) -> list[Transaction]:
    """Partially sign every transaction in ``txs`` with the wallet's keypair.

    Args:
        txs: The transactions to sign.

    Returns:
        The same list that was passed in; its transactions are signed in place.
    """
    payer = self.payer
    for transaction in txs:
        transaction.sign_partial(payer)
    return txs
sign_transaction(self, tx)
Sign a transaction using the wallet's keypair.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tx |
Transaction |
The transaction to sign. |
required |
Returns:
Type | Description |
---|---|
Transaction |
The signed transaction. |
Source code in anchorpy/provider.py
def sign_transaction(self, tx: Transaction) -> Transaction:
    """Sign one transaction in place with the wallet's keypair.

    Args:
        tx: The transaction to sign.

    Returns:
        The same transaction object, now signed.
    """
    signer = self.payer
    tx.sign(signer)
    return tx
Coder
Coder provides a facade for encoding and decoding all IDL related objects.
Source code in anchorpy/coder/coder.py
class Coder:
    """Coder provides a facade for encoding and decoding all IDL related objects.

    It bundles one sub-coder per kind of IDL object, each built from the
    same parsed IDL.
    """

    def __init__(self, idl: Idl):
        """Initialize the coder.

        Args:
            idl: a parsed Idl instance.
        """
        # Coder for program instructions.
        self.instruction: InstructionCoder = InstructionCoder(idl)
        # Coder for account data.
        self.accounts: AccountsCoder = AccountsCoder(idl)
        # Coder for Anchor events.
        self.events: EventCoder = EventCoder(idl)
__init__(self, idl)
special
Initialize the coder.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
idl |
Idl |
a parsed Idl instance. |
required |
Source code in anchorpy/coder/coder.py
def __init__(self, idl: Idl):
    """Initialize the coder.

    Args:
        idl: a parsed Idl instance.
    """
    # Coder for program instructions.
    self.instruction: InstructionCoder = InstructionCoder(idl)
    # Coder for account data.
    self.accounts: AccountsCoder = AccountsCoder(idl)
    # Coder for Anchor events.
    self.events: EventCoder = EventCoder(idl)
InstructionCoder (Adapter)
Encodes and decodes program instructions.
Source code in anchorpy/coder/instruction.py
class InstructionCoder(Adapter):
    """Encodes and decodes program instructions.

    The wire format is an 8-byte sighash followed by the layout registered
    for that sighash.
    """

    def __init__(self, idl: Idl) -> None:
        """Build the sighash lookup tables and the underlying construct.

        Args:
            idl: The parsed IDL object.
        """
        self.ix_layout = _parse_ix_layout(idl)
        hasher = _Sighash()
        layout_by_sighash: Dict[bytes, Construct] = {}
        sighash_by_name: Dict[str, bytes] = {}
        name_by_sighash: Dict[bytes, str] = {}
        for instruction in idl.instructions:
            # Instructions are keyed by their snake_case name.
            name = snake(instruction.name)
            digest = hasher.build(name)
            sighash_by_name[name] = digest
            layout_by_sighash[digest] = self.ix_layout[name]
            name_by_sighash[digest] = name
        self.sighash_layouts = layout_by_sighash
        self.sighashes = sighash_by_name
        self.sighash_to_name = name_by_sighash
        # The leading sighash selects which instruction layout parses the rest.
        wire_format = Sequence(
            "sighash" / Bytes(8),
            Switch(lambda this: this.sighash, layout_by_sighash),
        )
        super().__init__(wire_format)  # type: ignore

    def encode(self, ix_name: str, ix: Dict[str, Any]) -> bytes:
        """Serialize the arguments of the instruction called ``ix_name``.

        Args:
            ix_name: The name of the instruction
            ix: The data to encode.

        Returns:
            The encoded instruction.
        """
        named = NamedInstruction(name=ix_name, data=ix)
        return self.build(named)

    def _decode(self, obj: Tuple[bytes, Any], context, path) -> NamedInstruction:
        # obj is the parsed (sighash, data) pair; map the sighash back to a name.
        return NamedInstruction(data=obj[1], name=self.sighash_to_name[obj[0]])

    def _encode(
        self, obj: NamedInstruction, context: Container, path
    ) -> Tuple[bytes, Any]:
        # Produce the (sighash, data) pair expected by the underlying Sequence.
        sighash = self.sighashes[obj.name]
        return (sighash, obj.data)
__init__(self, idl)
special
Init.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
idl |
Idl |
The parsed IDL object. |
required |
Source code in anchorpy/coder/instruction.py
def __init__(self, idl: Idl) -> None:
    """Build the sighash lookup tables and the underlying construct.

    Args:
        idl: The parsed IDL object.
    """
    self.ix_layout = _parse_ix_layout(idl)
    hasher = _Sighash()
    layout_by_sighash: Dict[bytes, Construct] = {}
    sighash_by_name: Dict[str, bytes] = {}
    name_by_sighash: Dict[bytes, str] = {}
    for instruction in idl.instructions:
        # Instructions are keyed by their snake_case name.
        name = snake(instruction.name)
        digest = hasher.build(name)
        sighash_by_name[name] = digest
        layout_by_sighash[digest] = self.ix_layout[name]
        name_by_sighash[digest] = name
    self.sighash_layouts = layout_by_sighash
    self.sighashes = sighash_by_name
    self.sighash_to_name = name_by_sighash
    # The leading sighash selects which instruction layout parses the rest.
    wire_format = Sequence(
        "sighash" / Bytes(8),
        Switch(lambda this: this.sighash, layout_by_sighash),
    )
    super().__init__(wire_format)  # type: ignore
encode(self, ix_name, ix)
Encode a program instruction.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ix_name |
str |
The name of the instruction |
required |
ix |
Dict[str, Any] |
The data to encode. |
required |
Returns:
Type | Description |
---|---|
bytes |
The encoded instruction. |
Source code in anchorpy/coder/instruction.py
def encode(self, ix_name: str, ix: Dict[str, Any]) -> bytes:
    """Serialize the arguments of the instruction called ``ix_name``.

    Args:
        ix_name: The name of the instruction
        ix: The data to encode.

    Returns:
        The encoded instruction.
    """
    named = NamedInstruction(name=ix_name, data=ix)
    return self.build(named)
EventCoder (Adapter)
Encodes and decodes Anchor events.
Source code in anchorpy/coder/event.py
class EventCoder(Adapter):
    """Encodes and decodes Anchor events.

    The wire format is an 8-byte discriminator followed by the layout of the
    event registered for that discriminator.
    """

    def __init__(self, idl: Idl):
        """Build the per-event layouts and discriminator lookup tables.

        Args:
            idl: The parsed Idl object.
        """
        self.idl = idl
        idl_events = idl.events
        layouts: Dict[str, Construct] = {}
        if idl_events:
            for event in idl_events:
                layouts[event.name] = _event_layout(event, idl)
        self.layouts = layouts
        discriminators: Dict[bytes, str] = {}
        if idl_events is not None:
            for event in idl_events:
                discriminators[_event_discriminator(event.name)] = event.name
        self.discriminators: Dict[bytes, str] = discriminators
        self.discriminator_to_layout = {
            disc: layouts[name] for disc, name in discriminators.items()
        }
        wire_format = Sequence(
            "discriminator" / Bytes(8),  # not base64-encoded here
            Switch(lambda this: this.discriminator, self.discriminator_to_layout),
        )
        super().__init__(wire_format)  # type: ignore

    def _decode(self, obj: Tuple[bytes, Any], context, path) -> Optional[Event]:
        # obj is the parsed (discriminator, data) pair. A discriminator that
        # is not in the IDL yields None rather than an error.
        try:
            event_name = self.discriminators[obj[0]]
        except KeyError:
            return None
        else:
            return Event(data=obj[1], name=event_name)
__init__(self, idl)
special
Initialize the EventCoder.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
idl |
Idl |
The parsed Idl object. |
required |
Source code in anchorpy/coder/event.py
def __init__(self, idl: Idl):
    """Build the per-event layouts and discriminator lookup tables.

    Args:
        idl: The parsed Idl object.
    """
    self.idl = idl
    idl_events = idl.events
    layouts: Dict[str, Construct] = {}
    if idl_events:
        for event in idl_events:
            layouts[event.name] = _event_layout(event, idl)
    self.layouts = layouts
    discriminators: Dict[bytes, str] = {}
    if idl_events is not None:
        for event in idl_events:
            discriminators[_event_discriminator(event.name)] = event.name
    self.discriminators: Dict[bytes, str] = discriminators
    self.discriminator_to_layout = {
        disc: layouts[name] for disc, name in discriminators.items()
    }
    wire_format = Sequence(
        "discriminator" / Bytes(8),  # not base64-encoded here
        Switch(lambda this: this.discriminator, self.discriminator_to_layout),
    )
    super().__init__(wire_format)  # type: ignore
AccountsCoder (Adapter)
Encodes and decodes account data.
Source code in anchorpy/coder/accounts.py
class AccountsCoder(Adapter):
    """Encodes and decodes account data.

    The wire format is the account discriminator followed by the layout of
    the account type registered for that discriminator.
    """

    def __init__(self, idl: Idl) -> None:
        """Build the per-account layouts and discriminator lookup tables.

        Args:
            idl: The parsed IDL object.
        """
        idl_accounts = idl.accounts
        self._accounts_layout = {
            account.name: _typedef_layout(account, idl.types, account.name)
            for account in idl_accounts
        }
        self.acc_name_to_discriminator = {
            account.name: _account_discriminator(account.name)
            for account in idl_accounts
        }
        self.discriminator_to_acc_name = {}
        layout_by_discriminator = {}
        for name, disc in self.acc_name_to_discriminator.items():
            self.discriminator_to_acc_name[disc] = name
            layout_by_discriminator[disc] = self._accounts_layout[name]
        wire_format = Sequence(
            "discriminator" / Bytes(ACCOUNT_DISCRIMINATOR_SIZE),
            Switch(lambda this: this.discriminator, layout_by_discriminator),
        )
        super().__init__(wire_format)  # type: ignore

    def decode(self, obj: bytes) -> Container[Any]:
        """Parse raw account bytes and return only the decoded payload.

        Args:
            obj: Data to decode.

        Returns:
            Decoded data.
        """
        parsed = self.parse(obj)
        return parsed.data

    def _decode(self, obj: Tuple[bytes, Any], context, path) -> AccountToSerialize:
        # obj is the parsed (discriminator, data) pair.
        account_name = self.discriminator_to_acc_name[obj[0]]
        return AccountToSerialize(data=obj[1], name=account_name)

    def _encode(self, obj: AccountToSerialize, context, path) -> Tuple[bytes, Any]:
        # Produce the (discriminator, data) pair expected by the Sequence.
        return self.acc_name_to_discriminator[obj.name], obj.data
__init__(self, idl)
special
Init.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
idl |
Idl |
The parsed IDL object. |
required |
Source code in anchorpy/coder/accounts.py
def __init__(self, idl: Idl) -> None:
    """Build the per-account layouts and discriminator lookup tables.

    Args:
        idl: The parsed IDL object.
    """
    idl_accounts = idl.accounts
    self._accounts_layout = {
        account.name: _typedef_layout(account, idl.types, account.name)
        for account in idl_accounts
    }
    self.acc_name_to_discriminator = {
        account.name: _account_discriminator(account.name)
        for account in idl_accounts
    }
    self.discriminator_to_acc_name = {}
    layout_by_discriminator = {}
    for name, disc in self.acc_name_to_discriminator.items():
        self.discriminator_to_acc_name[disc] = name
        layout_by_discriminator[disc] = self._accounts_layout[name]
    wire_format = Sequence(
        "discriminator" / Bytes(ACCOUNT_DISCRIMINATOR_SIZE),
        Switch(lambda this: this.discriminator, layout_by_discriminator),
    )
    super().__init__(wire_format)  # type: ignore
decode(self, obj)
Decode account data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
bytes |
Data to decode. |
required |
Returns:
Type | Description |
---|---|
Container |
Decoded data. |
Source code in anchorpy/coder/accounts.py
def decode(self, obj: bytes) -> Container[Any]:
    """Parse raw account bytes and return only the decoded payload.

    Args:
        obj: Data to decode.

    Returns:
        Decoded data.
    """
    parsed = self.parse(obj)
    return parsed.data
NamedInstruction
dataclass
Container for a named instruction.
Attributes:
Name | Type | Description |
---|---|---|
data |
Union[Dict[str, Any], construct.lib.containers.Container[Any]] |
The actual instruction data. |
name |
str |
The name of the instruction. |
Source code in anchorpy/program/common.py
@dataclass
class NamedInstruction:
    """Container pairing instruction data with the instruction's name.

    Attributes:
        data: The actual instruction data.
        name: The name of the instruction.
    """

    data: Union[Dict[str, Any], Container[Any]]
    name: str
IdlProgramAccount (dict)
The on-chain account of the IDL.
Source code in anchorpy/idl.py
class IdlProgramAccount(TypedDict):
    """The on-chain account of the IDL."""

    # NOTE(review): presumably the key allowed to update the IDL account — confirm.
    authority: solders.pubkey.Pubkey
    # NOTE(review): presumably the serialized IDL payload — confirm its encoding.
    data: bytes
Event (tuple)
A parsed event object.
Source code in anchorpy/program/common.py
class Event(NamedTuple):
    """A parsed event object."""

    # Name of the event.
    name: str
    # Decoded event payload.
    data: Any
translate_address(address)
Convert `str | Pubkey` into `Pubkey`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
address |
Union[solders.pubkey.Pubkey, str] |
Public key as string or `Pubkey`. |
required |
Returns:
Type | Description |
---|---|
Pubkey |
Public key as `Pubkey`. |
Source code in anchorpy/program/common.py
def translate_address(address: AddressType) -> Pubkey:
    """Normalize an address given as `str | Pubkey` to a `Pubkey`.

    Args:
        address: Public key as string or `Pubkey`.

    Returns:
        Public key as `Pubkey`. Non-string input is returned unchanged.
    """
    return Pubkey.from_string(address) if isinstance(address, str) else address
validate_accounts(ix_accounts, accounts)
Check that accounts passed in `ctx` match the IDL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ix_accounts |
list |
Accounts from the IDL. |
required |
accounts |
Dict[str, Any] |
Accounts from the `ctx` arg. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If `ctx` accounts don't match the IDL. |
Source code in anchorpy/program/common.py
def validate_accounts(ix_accounts: list[IdlAccountItem], accounts: Accounts):
    """Check that accounts passed in `ctx` match the IDL.

    Every IDL account (looked up by its snake_case name) must be present in
    ``accounts``. Nested account groups are validated recursively.

    Args:
        ix_accounts: Accounts from the IDL.
        accounts: Accounts from the `ctx` arg.

    Raises:
        ValueError: If `ctx` accounts don't match the IDL, including when a
            nested account group is missing altogether.
    """
    for acc in ix_accounts:
        acc_name = snake(acc.name)
        # Check presence first so a missing nested group raises the
        # documented ValueError instead of a bare KeyError.
        if acc_name not in accounts:
            raise ValueError(f"Invalid arguments: {acc_name} not provided")
        if isinstance(acc, IdlAccounts):
            validate_accounts(acc.accounts, cast(Accounts, accounts[acc_name]))
AccountClient
Provides methods for fetching and creating accounts.
Source code in anchorpy/program/namespace/account.py
class AccountClient(object):
    """Provides methods for fetching and creating accounts of one IDL account type."""

    def __init__(
        self,
        idl: Idl,
        idl_account: IdlTypeDefinition,
        coder: Coder,
        program_id: Pubkey,
        provider: Provider,
    ):
        """Init.

        Args:
            idl: the parsed IDL object.
            idl_account: the account definition from the IDL.
            coder: The program's Coder object.
            program_id: the program ID.
            provider: The Provider object for the Program.
        """
        self._idl_account = idl_account
        self._program_id = program_id
        self._provider = provider
        self._coder = coder
        # Total size is the discriminator prefix plus the account body.
        self._size = ACCOUNT_DISCRIMINATOR_SIZE + _account_size(idl, idl_account)

    async def fetch(
        self, address: Pubkey, commitment: Optional[Commitment] = None
    ) -> Container[Any]:
        """Return a deserialized account.

        Args:
            address: The address of the account to fetch.
            commitment: Bank state to query.

        Raises:
            AccountDoesNotExistError: If the account doesn't exist.
            AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
        """
        account_info = await self._provider.connection.get_account_info(
            address,
            encoding="base64",
            commitment=commitment,
        )
        if not account_info.value:
            raise AccountDoesNotExistError(f"Account {address} does not exist")
        data = account_info.value.data
        discriminator = _account_discriminator(self._idl_account.name)
        if discriminator != data[:ACCOUNT_DISCRIMINATOR_SIZE]:
            msg = f"Account {address} has an invalid discriminator"
            raise AccountInvalidDiscriminator(msg)
        return self._coder.accounts.decode(data)

    async def fetch_multiple(
        self,
        addresses: List[Pubkey],
        batch_size: int = 300,
        commitment: Optional[Commitment] = None,
    ) -> list[Optional[Container[Any]]]:
        """Return multiple deserialized accounts.

        Accounts not found or with wrong discriminator are returned as None.

        Args:
            addresses: The addresses of the accounts to fetch.
            batch_size: The number of `getMultipleAccounts` objects to send
                in each HTTP request.
            commitment: Bank state to query.
        """
        accounts = await get_multiple_accounts(
            self._provider.connection,
            addresses,
            batch_size=batch_size,
            commitment=commitment,
        )
        discriminator = _account_discriminator(self._idl_account.name)
        result: list[Optional[Container[Any]]] = []
        for account in accounts:
            if account is None:
                result.append(None)
                continue
            data = account.account.data
            # Same prefix length as `fetch`, instead of a hard-coded 8.
            if discriminator == data[:ACCOUNT_DISCRIMINATOR_SIZE]:
                result.append(self._coder.accounts.decode(data))
            else:
                result.append(None)
        return result

    async def create_instruction(
        self,
        signer: Keypair,
        size_override: int = 0,
    ) -> Instruction:
        """Return an instruction for creating this account.

        The provider's wallet is the funding account (`from_pubkey`) and the
        program is set as the owner. The lamports amount is the rent-exempt
        minimum the RPC node reports for the account size.

        Args:
            signer: Keypair of the account to create; its public key is used
                as the new account's address (`to_pubkey`).
            size_override: Optional override for the account size. Defaults to 0,
                which means "use the size computed from the IDL".

        Returns:
            The instruction to create the account.
        """
        space = size_override if size_override else self._size
        mbre_resp = (
            await self._provider.connection.get_minimum_balance_for_rent_exemption(
                space
            )
        )
        return create_account(
            CreateAccountParams(
                from_pubkey=self._provider.wallet.public_key,
                to_pubkey=signer.pubkey(),
                space=space,
                lamports=mbre_resp.value,
                owner=self._program_id,
            )
        )

    async def all(  # noqa: A003
        self,
        buffer: Optional[bytes] = None,
        filters: Optional[Sequence[Union[int, MemcmpOpts]]] = None,
    ) -> list[ProgramAccount]:
        """Return all instances of this account type for the program.

        Args:
            buffer: bytes filter to append to the discriminator.
            filters: (optional) Options to compare a provided series of bytes with
                program account data at a particular offset. Any sequence
                (list, tuple, ...) is accepted.
                Note: an int entry is converted to a `dataSize` filter.
        """
        discriminator = _account_discriminator(self._idl_account.name)
        to_encode = discriminator if buffer is None else discriminator + buffer
        base_memcmp_opt = MemcmpOpts(
            offset=0,
            bytes=b58encode(to_encode).decode("ascii"),
        )
        # Unpack rather than `list + filters`: `filters` is typed as a
        # Sequence, and concatenating a list with e.g. a tuple raises TypeError.
        filters_to_use = [base_memcmp_opt, *(filters or ())]
        resp = await self._provider.connection.get_program_accounts(
            self._program_id,
            encoding="base64",
            commitment=self._provider.connection._commitment,
            filters=filters_to_use,
        )
        return [
            ProgramAccount(
                public_key=r.pubkey,
                account=self._coder.accounts.decode(r.account.data),
            )
            for r in resp.value
        ]

    @property
    def size(self) -> int:
        """Return the number of bytes in this account."""
        return self._size

    @property
    def program_id(self) -> Pubkey:
        """Return the program ID owning all accounts."""
        return self._program_id

    @property
    def provider(self) -> Provider:
        """Return the client's wallet and network provider."""
        return self._provider

    @property
    def coder(self) -> Coder:
        """Return the coder."""
        return self._coder
coder: Coder
property
readonly
Return the coder.
program_id: Pubkey
property
readonly
Return the program ID owning all accounts.
provider: Provider
property
readonly
Return the client's wallet and network provider.
size: int
property
readonly
Return the number of bytes in this account.
__init__(self, idl, idl_account, coder, program_id, provider)
special
Init.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
idl |
Idl |
the parsed IDL object. |
required |
idl_account |
IdlTypeDefinition |
the account definition from the IDL. |
required |
coder |
Coder |
The program's Coder object. |
required |
program_id |
Pubkey |
the program ID. |
required |
provider |
Provider |
The Provider object for the Program. |
required |
Source code in anchorpy/program/namespace/account.py
def __init__(
    self,
    idl: Idl,
    idl_account: IdlTypeDefinition,
    coder: Coder,
    program_id: Pubkey,
    provider: Provider,
):
    """Init.

    Args:
        idl: the parsed IDL object.
        idl_account: the account definition from the IDL.
        coder: The program's Coder object.
        program_id: the program ID.
        provider: The Provider object for the Program.
    """
    self._idl_account = idl_account
    self._program_id = program_id
    self._provider = provider
    self._coder = coder
    # Total size is the discriminator prefix plus the account body.
    self._size = ACCOUNT_DISCRIMINATOR_SIZE + _account_size(idl, idl_account)
all(self, buffer=None, filters=None)
async
Return all instances of this account type for the program.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
buffer |
Optional[bytes] |
bytes filter to append to the discriminator. |
None |
filters |
Optional[Sequence[Union[int, solana.rpc.types.MemcmpOpts]]] |
(optional) Options to compare a provided series of bytes with
program account data at a particular offset.
Note: an int entry is converted to a `dataSize` filter. |
None |
Source code in anchorpy/program/namespace/account.py
async def all(  # noqa: A003
    self,
    buffer: Optional[bytes] = None,
    filters: Optional[Sequence[Union[int, MemcmpOpts]]] = None,
) -> list[ProgramAccount]:
    """Return all instances of this account type for the program.

    Args:
        buffer: bytes filter to append to the discriminator.
        filters: (optional) Options to compare a provided series of bytes with
            program account data at a particular offset. Any sequence
            (list, tuple, ...) is accepted.
            Note: an int entry is converted to a `dataSize` filter.
    """
    discriminator = _account_discriminator(self._idl_account.name)
    to_encode = discriminator if buffer is None else discriminator + buffer
    base_memcmp_opt = MemcmpOpts(
        offset=0,
        bytes=b58encode(to_encode).decode("ascii"),
    )
    # Unpack rather than `list + filters`: `filters` is typed as a
    # Sequence, and concatenating a list with e.g. a tuple raises TypeError.
    filters_to_use = [base_memcmp_opt, *(filters or ())]
    resp = await self._provider.connection.get_program_accounts(
        self._program_id,
        encoding="base64",
        commitment=self._provider.connection._commitment,
        filters=filters_to_use,
    )
    return [
        ProgramAccount(
            public_key=r.pubkey,
            account=self._coder.accounts.decode(r.account.data),
        )
        for r in resp.value
    ]
create_instruction(self, signer, size_override=0)
async
Return an instruction for creating this account.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
signer |
Keypair |
Keypair of the account to create; its public key is used as the new account's address. |
required |
size_override |
int |
Optional override for the account size. Defaults to 0. |
0 |
Returns:
Type | Description |
---|---|
Instruction |
The instruction to create the account. |
Source code in anchorpy/program/namespace/account.py
async def create_instruction(
    self,
    signer: Keypair,
    size_override: int = 0,
) -> Instruction:
    """Return an instruction for creating this account.

    The provider's wallet is the funding account (`from_pubkey`) and the
    program is set as the owner. The lamports amount is the rent-exempt
    minimum the RPC node reports for the account size.

    Args:
        signer: Keypair of the account to create; its public key is used
            as the new account's address (`to_pubkey`).
        size_override: Optional override for the account size. Defaults to 0,
            which means "use the size computed from the IDL".

    Returns:
        The instruction to create the account.
    """
    space = size_override or self._size
    connection = self._provider.connection
    rent_resp = await connection.get_minimum_balance_for_rent_exemption(space)
    params = CreateAccountParams(
        from_pubkey=self._provider.wallet.public_key,
        to_pubkey=signer.pubkey(),
        space=space,
        lamports=rent_resp.value,
        owner=self._program_id,
    )
    return create_account(params)
fetch(self, address, commitment=None)
async
Return a deserialized account.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
address |
Pubkey |
The address of the account to fetch. |
required |
commitment |
Optional[Commitment] |
Bank state to query. |
None |
Exceptions:
Type | Description |
---|---|
AccountDoesNotExistError |
If the account doesn't exist. |
AccountInvalidDiscriminator |
If the discriminator doesn't match the IDL. |
Source code in anchorpy/program/namespace/account.py
async def fetch(
    self, address: Pubkey, commitment: Optional[Commitment] = None
) -> Container[Any]:
    """Fetch one account and return its decoded data.

    Args:
        address: The address of the account to fetch.
        commitment: Bank state to query.

    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    response = await self._provider.connection.get_account_info(
        address,
        encoding="base64",
        commitment=commitment,
    )
    if not response.value:
        raise AccountDoesNotExistError(f"Account {address} does not exist")
    raw = response.value.data
    expected = _account_discriminator(self._idl_account.name)
    # The leading bytes must identify this account type before decoding.
    if raw[:ACCOUNT_DISCRIMINATOR_SIZE] != expected:
        raise AccountInvalidDiscriminator(
            f"Account {address} has an invalid discriminator"
        )
    return self._coder.accounts.decode(raw)
fetch_multiple(self, addresses, batch_size=300, commitment=None)
async
Return multiple deserialized accounts.
Accounts not found or with wrong discriminator are returned as None.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
addresses |
List[solders.pubkey.Pubkey] |
The addresses of the accounts to fetch. |
required |
batch_size |
int |
The number of `getMultipleAccounts` objects to send in each HTTP request. |
300 |
commitment |
Optional[Commitment] |
Bank state to query. |
None |
Source code in anchorpy/program/namespace/account.py
async def fetch_multiple(
    self,
    addresses: List[Pubkey],
    batch_size: int = 300,
    commitment: Optional[Commitment] = None,
) -> list[Optional[Container[Any]]]:
    """Return multiple deserialized accounts.

    Accounts not found or with wrong discriminator are returned as None.

    Args:
        addresses: The addresses of the accounts to fetch.
        batch_size: The number of `getMultipleAccounts` objects to send
            in each HTTP request.
        commitment: Bank state to query.
    """
    accounts = await get_multiple_accounts(
        self._provider.connection,
        addresses,
        batch_size=batch_size,
        commitment=commitment,
    )
    discriminator = _account_discriminator(self._idl_account.name)
    result: list[Optional[Container[Any]]] = []
    for account in accounts:
        if account is None:
            result.append(None)
            continue
        data = account.account.data
        # Same prefix length as `fetch`, instead of a hard-coded 8.
        if discriminator == data[:ACCOUNT_DISCRIMINATOR_SIZE]:
            result.append(self._coder.accounts.decode(data))
        else:
            result.append(None)
    return result
ProgramAccount
dataclass
Deserialized account owned by a program.
Source code in anchorpy/program/namespace/account.py
@dataclass
class ProgramAccount:
    """Deserialized account owned by a program."""

    # Address of the account.
    public_key: Pubkey
    # Decoded account data.
    account: Container
EventParser
dataclass
Parser to handle on_logs callbacks.
Source code in anchorpy/program/event.py
@dataclass
class EventParser:
    """Parser to handle on_logs callbacks."""

    program_id: Pubkey
    coder: Coder

    def parse_logs(self, logs: List[str], callback: Callable[[Event], None]) -> None:
        """Walk the logs and invoke ``callback`` for every event found.

        Args:
            logs: The logs to parse.
            callback: The function to handle the parsed log.
        """
        scanner = _LogScanner(logs)
        # The first log seeds the execution stack.
        execution = _ExecutionContext(cast(str, scanner.to_next()))
        while True:
            line = scanner.to_next()
            if line is None:
                break
            event, new_program, did_pop = self.handle_log(execution, line)
            if event is not None:
                callback(event)
            if new_program is not None:
                execution.push(new_program)
            if did_pop:
                execution.pop()

    def handle_log(
        self,
        execution: _ExecutionContext,
        log: str,
    ) -> tuple[Optional[Event], Optional[str], bool]:
        """Main log handler.

        Args:
            execution: The execution stack.
            log: log string from the RPC node.

        Returns:
            A three element array of the event, the next program
            that was invoked for CPI, and a boolean indicating if
            a program has completed execution (and thus should be popped off the
            execution stack).
        """
        # Anything not currently executed by this program is a system log.
        if not execution.stack or execution.program() != str(self.program_id):
            return (None, *self.handle_system_log(log))
        return self.handle_program_log(log)

    def handle_program_log(
        self, log: str
    ) -> tuple[Optional[Event], Optional[str], bool]:
        """Handle logs from *this* program.

        Args:
            log: log string from the RPC node.
        """
        # This is a `msg!` log or a `sol_log_data!` log.
        if log.startswith(PROGRAM_LOG):
            payload = log[PROGRAM_LOG_START_INDEX:]
        elif log.startswith(PROGRAM_DATA):
            payload = log[PROGRAM_DATA_START_INDEX:]
        else:
            return (None, *self.handle_system_log(log))
        try:
            raw = b64decode(payload)
        except binascii.Error:
            # Payload that is not valid base64 carries no event.
            return None, None, False
        return self.coder.events.parse(raw), None, False

    def handle_system_log(self, log: str) -> tuple[Optional[str], bool]:
        """Handle logs when the program currently executing is *not* this one.

        Args:
            log: log string from the RPC node.
        """
        prefix = log.partition(":")[0]
        words = prefix.split(" ")
        program = str(self.program_id)
        # "Program <id> success": the running program finished.
        if len(words) == 3 and words[0] == "Program" and words[2] == "success":
            return None, True
        if prefix.startswith(f"Program {program} invoke"):
            return program, False
        # Some other program was invoked.
        if "invoke" in prefix:
            return "cpi", False
        return None, False
handle_log(self, execution, log)
Main log handler.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
execution |
_ExecutionContext |
The execution stack. |
required |
log |
str |
log string from the RPC node. |
required |
Returns:
Type | Description |
---|---|
tuple |
A three element array of the event, the next program that was invoked for CPI, and a boolean indicating if a program has completed execution (and thus should be popped off the execution stack). |
Source code in anchorpy/program/event.py
def handle_log(
    self,
    execution: _ExecutionContext,
    log: str,
) -> tuple[Optional[Event], Optional[str], bool]:
    """Main log handler.

    Args:
        execution: The execution stack.
        log: log string from the RPC node.

    Returns:
        A three element array of the event, the next program
        that was invoked for CPI, and a boolean indicating if
        a program has completed execution (and thus should be popped off the
        execution stack).
    """
    # Anything not currently executed by this program is a system log.
    if not execution.stack or execution.program() != str(self.program_id):
        return (None, *self.handle_system_log(log))
    return self.handle_program_log(log)
handle_program_log(self, log)
Handle logs from this program.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log |
str |
log string from the RPC node. |
required |
Source code in anchorpy/program/event.py
def handle_program_log(
    self, log: str
) -> tuple[Optional[Event], Optional[str], bool]:
    """Handle logs from *this* program.

    Args:
        log: log string from the RPC node.
    """
    # This is a `msg!` log or a `sol_log_data!` log.
    if log.startswith(PROGRAM_LOG):
        payload = log[PROGRAM_LOG_START_INDEX:]
    elif log.startswith(PROGRAM_DATA):
        payload = log[PROGRAM_DATA_START_INDEX:]
    else:
        return (None, *self.handle_system_log(log))
    try:
        raw = b64decode(payload)
    except binascii.Error:
        # Payload that is not valid base64 carries no event.
        return None, None, False
    return self.coder.events.parse(raw), None, False
handle_system_log(self, log)
Handle logs when the program currently executing is not this one.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log |
str |
log string from the RPC node. |
required |
Source code in anchorpy/program/event.py
def handle_system_log(self, log: str) -> tuple[Optional[str], bool]:
    """Handle logs when the program currently executing is *not* this one.

    Args:
        log: log string from the RPC node.
    """
    prefix = log.partition(":")[0]
    words = prefix.split(" ")
    program = str(self.program_id)
    # "Program <id> success": the running program finished.
    if len(words) == 3 and words[0] == "Program" and words[2] == "success":
        return None, True
    if prefix.startswith(f"Program {program} invoke"):
        return program, False
    # Some other program was invoked.
    if "invoke" in prefix:
        return "cpi", False
    return None, False
parse_logs(self, logs, callback)
Parse a list of logs using a provided callback.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
logs |
List[str] |
The logs to parse. |
required |
callback |
Callable[[anchorpy.program.common.Event], NoneType] |
The function to handle the parsed log. |
required |
Source code in anchorpy/program/event.py
def parse_logs(self, logs: List[str], callback: Callable[[Event], None]) -> None:
    """Walk the logs and invoke ``callback`` for every event found.

    Args:
        logs: The logs to parse.
        callback: The function to handle the parsed log.
    """
    scanner = _LogScanner(logs)
    # The first log seeds the execution stack.
    execution = _ExecutionContext(cast(str, scanner.to_next()))
    while True:
        line = scanner.to_next()
        if line is None:
            break
        event, new_program, did_pop = self.handle_log(execution, line)
        if event is not None:
            callback(event)
        if new_program is not None:
            execution.push(new_program)
        if did_pop:
            execution.pop()
SimulateResponse (tuple)
The result of a simulate function call.
Source code in anchorpy/program/namespace/simulate.py
class SimulateResponse(NamedTuple):
    """The result of a simulate function call."""

    # NOTE(review): presumably the events parsed from the simulation logs — confirm.
    events: list[Event]
    # NOTE(review): presumably the unparsed log lines of the simulation — confirm.
    raw: list[str]
error
This module handles AnchorPy errors.
AccountDoesNotExistError (Exception)
Raise if account doesn't exist.
Source code in anchorpy/error.py
class AccountDoesNotExistError(Exception):
    """Raised when a requested account does not exist (e.g. by `AccountClient.fetch`)."""
AccountInvalidDiscriminator (Exception)
Raise if account discriminator doesn't match the IDL.
Source code in anchorpy/error.py
class AccountInvalidDiscriminator(Exception):
    """Raised when an account's leading discriminator bytes don't match the IDL."""
ArgsError (Exception)
Raise when the incorrect number of args is passed to the RPC function.
Source code in anchorpy/error.py
class ArgsError(Exception):
    """Raised when the incorrect number of args is passed to the RPC function."""
IdlNotFoundError (Exception)
Raise when requested IDL account does not exist.
Source code in anchorpy/error.py
class IdlNotFoundError(Exception):
    """Raised when the requested IDL account does not exist."""
ProgramError (Exception)
An error from a user defined program.
Source code in anchorpy/error.py
class ProgramError(Exception):
    """An error from a user defined program.

    Attributes:
        code: The error code.
        msg: The error message.
        logs: The transaction simulation logs.
    """

    def __init__(
        self, code: int, msg: Optional[str], logs: Optional[list[str]] = None
    ) -> None:
        """Init.

        Args:
            code: The error code.
            msg: The error message.
            logs: The transaction simulation logs.
        """
        self.code = code
        self.msg = msg
        self.logs = logs
        super().__init__(f"{code}: {msg}")

    @classmethod
    def _from_code(
        cls,
        code: int,
        idl_errors: dict[int, str],
        logs: Optional[list[str]],
    ) -> Optional[ProgramError]:
        """Resolve an error code to a message and build the error.

        Shared by `parse` and `parse_tx_error` so both resolve messages
        the same way.

        Args:
            code: The extracted error code.
            idl_errors: Errors from the IDL file.
            logs: The logs to attach to the error.

        Returns:
            A ProgramError, or None if the code is unknown.
        """
        # Errors declared in the IDL take precedence.
        msg = idl_errors.get(code)
        if msg is None:
            # parse framework internal error
            msg = LangErrorMessage.get(code)
        if msg is None:
            # Unable to parse the error.
            return None
        return cls(code, msg, logs)

    @classmethod
    def parse(
        cls,
        err_info: RPCError,
        idl_errors: dict[int, str],
        program_id: Pubkey,
    ) -> Optional[ProgramError]:
        """Convert an RPC error into a ProgramError, if possible.

        Args:
            err_info: The RPC error.
            idl_errors: Errors from the IDL file.
            program_id: The ID of the program we expect the error to come from.

        Returns:
            A ProgramError or None.
        """
        extracted = extract_code_and_logs(err_info, program_id)
        if extracted is None:
            return None
        code, logs = extracted
        return cls._from_code(code, idl_errors, logs)

    @classmethod
    def parse_tx_error(
        cls,
        err_info: TransactionErrorType,
        idl_errors: dict[int, str],
        program_id: Pubkey,
        logs: List[str],
    ) -> Optional[ProgramError]:
        """Convert a transaction error into a ProgramError, if possible.

        Args:
            err_info: The transaction error.
            idl_errors: Errors from the IDL file.
            program_id: The ID of the program we expect the error to come from.
            logs: The transaction logs.

        Returns:
            A ProgramError or None.
        """
        code = extract_code_tx_error(err_info, program_id, logs)
        if code is None:
            return None
        return cls._from_code(code, idl_errors, logs)
__init__(self, code, msg, logs=None)
special
Init.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
code |
int |
The error code. |
required |
msg |
Optional[str] |
The error message. |
required |
logs |
Optional[list[str]] |
The transaction simulation logs. |
None |
Source code in anchorpy/error.py
def __init__(
self, code: int, msg: Optional[str], logs: Optional[list[str]] = None
) -> None:
"""Init.
Args:
code: The error code.
msg: The error message.
logs: The transaction simulation logs.
"""
self.code = code
self.msg = msg
self.logs = logs
super().__init__(f"{code}: {msg}")
parse(err_info, idl_errors, program_id)
classmethod
Convert an RPC error into a ProgramError, if possible.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
err_info |
RPCError |
The RPC error. |
required |
idl_errors |
dict[int, str] |
Errors from the IDL file. |
required |
program_id |
Pubkey |
The ID of the program we expect the error to come from. |
required |
Returns:
Type | Description |
---|---|
Optional[ProgramError] |
A ProgramError or None. |
Source code in anchorpy/error.py
@classmethod
def parse(
    cls,
    err_info: RPCError,
    idl_errors: dict[int, str],
    program_id: Pubkey,
) -> Optional[ProgramError]:
    """Build a ProgramError from an RPC error when its code can be resolved.

    Args:
        err_info: The RPC error.
        idl_errors: Errors from the IDL file.
        program_id: The ID of the program we expect the error to come from.

    Returns:
        A ProgramError or None.
    """
    code_and_logs = extract_code_and_logs(err_info, program_id)
    if code_and_logs is None:
        return None
    error_code, tx_logs = code_and_logs
    # IDL-declared errors are tried first, then framework internal errors.
    message = idl_errors.get(error_code)
    if message is None:
        message = LangErrorMessage.get(error_code)
    if message is None:
        # Unable to parse the error.
        return None
    return cls(error_code, message, tx_logs)
parse_tx_error(err_info, idl_errors, program_id, logs)
classmethod
Convert an RPC error into a ProgramError, if possible.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
err_info |
TransactionErrorType |
The RPC error. |
required |
idl_errors |
dict[int, str] |
Errors from the IDL file. |
required |
program_id |
Pubkey |
The ID of the program we expect the error to come from. |
required |
logs |
List[str] |
The transaction logs. |
required |
Returns:
Type | Description |
---|---|
Optional[ProgramError] |
A ProgramError or None. |
Source code in anchorpy/error.py
@classmethod
def parse_tx_error(
    cls,
    err_info: TransactionErrorType,
    idl_errors: dict[int, str],
    program_id: Pubkey,
    logs: List[str],
) -> Optional[ProgramError]:
    """Build a ProgramError from a transaction error when its code can be resolved.

    Args:
        err_info: The transaction error.
        idl_errors: Errors from the IDL file.
        program_id: The ID of the program we expect the error to come from.
        logs: The transaction logs.

    Returns:
        A ProgramError or None.
    """
    error_code = extract_code_tx_error(err_info, program_id, logs)
    if error_code is None:
        return None
    # IDL-declared errors are tried first, then framework internal errors.
    message = idl_errors.get(error_code)
    if message is None:
        message = LangErrorMessage.get(error_code)
    if message is None:
        # Unable to parse the error.
        return None
    return cls(error_code, message, logs)
extract_code_and_logs(err_info, program_id)
Extract the custom instruction error code from an RPC response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
err_info |
RPCError |
The RPC error. |
required |
program_id |
Pubkey |
The ID of the program we expect the error to come from. |
required |
Source code in anchorpy/error.py
def extract_code_and_logs(
    err_info: RPCError, program_id: Pubkey
) -> Optional[Tuple[int, List[str]]]:
    """Pull the custom instruction error code and logs out of an RPC response.

    Args:
        err_info: The RPC error.
        program_id: The ID of the program we expect the error to come from.

    Returns:
        The error code together with the logs, or None when `err_info` is not
        a preflight failure, carries no error or logs, or no code is found.
    """
    # Only preflight failures are inspected for an error and logs.
    if not isinstance(err_info, SendTransactionPreflightFailureMessage):
        return None
    failure = err_info.data
    tx_err = failure.err
    tx_logs = failure.logs
    if tx_logs is None or tx_err is None:
        return None
    code = _handle_ix_err(tx_err, tx_logs, program_id)
    if code is None:
        return None
    return code, tx_logs
extract_code_tx_error(err_info, program_id, logs)
Extract the custom instruction error code from a transaction error.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
err_info |
TransactionErrorType |
The tx error. |
required |
program_id |
Pubkey |
The ID of the program we expect the error to come from. |
required |
logs |
List[str] |
The tx logs. |
required |
Source code in anchorpy/error.py
def extract_code_tx_error(
    err_info: TransactionErrorType, program_id: Pubkey, logs: List[str]
) -> Optional[int]:
    """Pull the custom instruction error code out of a transaction error.

    Args:
        err_info: The tx error.
        program_id: The ID of the program we expect the error to come from.
        logs: The tx logs.

    Returns:
        Whatever `_handle_ix_err` yields for the given error: a code or None.
    """
    maybe_code = _handle_ix_err(err_info, logs, program_id)
    return maybe_code
utils
special
Various utility functions.
rpc
This module contains the invoke function.
AccountInfo (tuple)
Information describing an account.
Attributes:
Name | Type | Description |
---|---|---|
executable |
bool |
`True` if this account's data contains a loaded program. |
owner |
Pubkey |
Identifier of the program that owns the account. |
lamports |
int |
Number of lamports assigned to the account. |
data |
bytes |
Optional data assigned to the account. |
rent_epoch |
Optional[int] |
Optional rent epoch info for the account. |
Source code in anchorpy/utils/rpc.py
class AccountInfo(NamedTuple):
    """Information describing an account.

    Attributes:
        executable: `True` if this account's data contains a loaded program.
        owner: Identifier of the program that owns the account.
        lamports: Number of lamports assigned to the account.
        data: Optional data assigned to the account.
        rent_epoch: Optional rent epoch info for the account.
    """

    executable: bool
    owner: Pubkey
    lamports: int
    data: bytes
    rent_epoch: Optional[int]
get_multiple_accounts(connection, pubkeys, batch_size=3, commitment=None)
async
Fetch multiple account infos through batched getMultipleAccount
RPC requests.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connection |
AsyncClient |
The `solana-py` client object. |
required |
pubkeys |
list |
Pubkeys to fetch. |
required |
batch_size |
int |
The number of `getMultipleAccount` objects to include in each HTTP request. |
3 |
commitment |
Optional[Commitment] |
Bank state to query. |
None |
Returns:
Type | Description |
---|---|
list |
Account infos and pubkeys. |
Source code in anchorpy/utils/rpc.py
async def get_multiple_accounts(
    connection: AsyncClient,
    pubkeys: list[Pubkey],
    batch_size: int = 3,
    commitment: Optional[Commitment] = None,
) -> list[Optional[_MultipleAccountsItem]]:
    """Fetch multiple account infos through batched `getMultipleAccount` RPC requests.

    Args:
        connection: The `solana-py` client object.
        pubkeys: Pubkeys to fetch.
        batch_size: The number of `getMultipleAccount` objects to include in each
            HTTP request.
        commitment: Bank state to query.

    Returns:
        Account infos and pubkeys.
    """
    # Each network request carries `batch_size` RPC calls, each of which is
    # limited to `_GET_MULTIPLE_ACCOUNTS_LIMIT` pubkeys.
    chunk_size = _GET_MULTIPLE_ACCOUNTS_LIMIT * batch_size
    pending = []
    for chunk in partition_all(chunk_size, pubkeys):
        pending.append(_get_multiple_accounts_core(connection, chunk, commitment))
    fetched = await gather(*pending, return_exceptions=False)
    flattened = concat(fetched)
    return list(flattened)
invoke(program_id, provider, accounts=None, data=None)
async
Send a transaction to a program with the given accounts and instruction data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
program_id |
Union[solders.pubkey.Pubkey, str] |
The program ID |
required |
provider |
Provider |
the `Provider` instance. |
required |
accounts |
Optional[list[solders.instruction.AccountMeta]] |
`AccountMeta` objects. |
None |
data |
Optional[bytes] |
The transaction data. |
None |
Returns:
Type | Description |
---|---|
Signature |
The transaction signature. |
Source code in anchorpy/utils/rpc.py
async def invoke(
    program_id: AddressType,
    provider: Provider,
    accounts: Optional[list[AccountMeta]] = None,
    data: Optional[bytes] = None,
) -> Signature:
    """Send a single-instruction transaction to a program.

    Args:
        program_id: The program ID
        provider: the `Provider` instance.
        accounts: `AccountMeta` objects.
        data: The transaction data.

    Returns:
        The transaction signature.
    """
    # Missing accounts/data fall back to an empty list / empty bytes.
    ix = Instruction(
        program_id=translate_address(program_id),
        accounts=accounts if accounts is not None else [],
        data=data if data is not None else bytes(0),
    )
    tx = Transaction()
    tx.add(ix)
    return await provider.send(tx)
token
This module contains utilities for the SPL Token Program.
create_mint_and_vault(provider, amount, owner=None, decimals=None)
async
Create a mint and a vault, then mint tokens to the vault.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
provider |
Provider |
An anchorpy Provider instance. |
required |
amount |
int |
The amount of tokens to mint to the vault. |
required |
owner |
Optional[solders.pubkey.Pubkey] |
User account that will own the new account. |
None |
decimals |
Optional[int] |
The number of decimal places for the token to support. |
None |
Returns:
Type | Description |
---|---|
tuple |
The mint and vault pubkeys. |
Source code in anchorpy/utils/token.py
async def create_mint_and_vault(
    provider: Provider,
    amount: int,
    owner: Optional[Pubkey] = None,
    decimals: Optional[int] = None,
) -> tuple[Pubkey, Pubkey]:
    """Create a mint and a vault, then mint tokens to the vault.

    Args:
        provider: An anchorpy Provider instance.
        amount: The amount of tokens to mint to the vault.
        owner: User account that will own the new account.
        decimals: The number of decimal places for the token to support.

    Returns:
        The mint and vault pubkeys.
    """
    # The vault is owned by the provider wallet unless an owner is given.
    if owner is None:
        vault_owner = provider.wallet.public_key
    else:
        vault_owner = owner
    mint_keypair = Keypair()
    vault_keypair = Keypair()

    # Mint account: fund it for rent exemption, then initialize it.
    mint_space = 82
    mint_rent_resp = (
        await provider.connection.get_minimum_balance_for_rent_exemption(mint_space)
    )
    mint_rent = mint_rent_resp.value
    create_mint_ix = create_account(
        CreateAccountParams(
            from_pubkey=provider.wallet.public_key,
            to_pubkey=mint_keypair.pubkey(),
            space=mint_space,
            lamports=mint_rent,
            owner=TOKEN_PROGRAM_ID,
        ),
    )
    init_mint_ix = initialize_mint(
        InitializeMintParams(
            mint=mint_keypair.pubkey(),
            decimals=decimals if decimals is not None else 0,
            mint_authority=provider.wallet.public_key,
            program_id=TOKEN_PROGRAM_ID,
        ),
    )

    # Vault account: fund it for rent exemption, then initialize it.
    vault_space = 165
    vault_rent_resp = (
        await provider.connection.get_minimum_balance_for_rent_exemption(vault_space)
    )
    vault_rent = vault_rent_resp.value
    create_vault_ix = create_account(
        CreateAccountParams(
            from_pubkey=provider.wallet.public_key,
            to_pubkey=vault_keypair.pubkey(),
            space=vault_space,
            lamports=vault_rent,
            owner=TOKEN_PROGRAM_ID,
        ),
    )
    init_vault_ix = initialize_account(
        InitializeAccountParams(
            program_id=TOKEN_PROGRAM_ID,
            account=vault_keypair.pubkey(),
            mint=mint_keypair.pubkey(),
            owner=vault_owner,
        ),
    )

    # Mint the requested amount into the vault.
    mint_to_ix = mint_to(
        MintToParams(
            program_id=TOKEN_PROGRAM_ID,
            mint=mint_keypair.pubkey(),
            dest=vault_keypair.pubkey(),
            amount=amount,
            mint_authority=provider.wallet.public_key,
        ),
    )

    instructions = [
        create_mint_ix,
        init_mint_ix,
        create_vault_ix,
        init_vault_ix,
        mint_to_ix,
    ]
    blockhash_resp = await provider.connection.get_latest_blockhash(Confirmed)
    message = Message.new_with_blockhash(
        instructions,
        provider.wallet.public_key,
        blockhash_resp.value.blockhash,
    )
    # The transaction is signed by the wallet payer and both new keypairs.
    signers = [provider.wallet.payer, mint_keypair, vault_keypair]
    await provider.send(VersionedTransaction(message, signers))
    return mint_keypair.pubkey(), vault_keypair.pubkey()
create_token_account(prov, mint, owner)
async
Create a token account.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
prov |
Provider |
An anchorpy Provider instance. |
required |
mint |
Pubkey |
The pubkey of the token's mint. |
required |
owner |
Pubkey |
User account that will own the new account. |
required |
Returns:
Type | Description |
---|---|
Pubkey |
The pubkey of the new account. |
Source code in anchorpy/utils/token.py
async def create_token_account(
    prov: Provider,
    mint: Pubkey,
    owner: Pubkey,
) -> Pubkey:
    """Create a token account for `mint` owned by `owner`.

    Args:
        prov: An anchorpy Provider instance.
        mint: The pubkey of the token's mint.
        owner: User account that will own the new account.

    Returns:
        The pubkey of the new account.
    """
    # The provider's wallet payer is passed to the token client.
    token_client = AsyncToken(
        prov.connection, mint, TOKEN_PROGRAM_ID, prov.wallet.payer
    )
    new_account = await token_client.create_account(owner)
    return new_account
create_token_account_instrs(provider, new_account_pubkey, mint, owner)
async
Generate instructions for creating a token account.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
provider |
Provider |
An anchorpy Provider instance. |
required |
new_account_pubkey |
Pubkey |
The pubkey of the new account. |
required |
mint |
Pubkey |
The pubkey of the token's mint. |
required |
owner |
Pubkey |
User account that will own the new account. |
required |
Returns:
Type | Description |
---|---|
tuple |
Transaction instructions to create the new account. |
Source code in anchorpy/utils/token.py
async def create_token_account_instrs(
    provider: Provider,
    new_account_pubkey: Pubkey,
    mint: Pubkey,
    owner: Pubkey,
) -> tuple[Instruction, Instruction]:
    """Generate instructions for creating a token account.

    Args:
        provider: An anchorpy Provider instance.
        new_account_pubkey: The pubkey of the new account.
        mint: The pubkey of the token's mint.
        owner: User account that will own the new account.

    Returns:
        Transaction instructions to create the new account.
    """
    # Space requested for the new account; also used for the rent query.
    account_space = 165
    rent_resp = await provider.connection.get_minimum_balance_for_rent_exemption(
        account_space
    )
    create_ix = create_account(
        CreateAccountParams(
            from_pubkey=provider.wallet.public_key,
            to_pubkey=new_account_pubkey,
            space=account_space,
            lamports=rent_resp.value,
            owner=TOKEN_PROGRAM_ID,
        )
    )
    init_ix = initialize_account(
        InitializeAccountParams(
            account=new_account_pubkey,
            mint=mint,
            owner=owner,
            program_id=TOKEN_PROGRAM_ID,
        )
    )
    return create_ix, init_ix
get_mint_info(provider, addr)
async
Retrieve mint information.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
provider |
Provider |
The anchorpy Provider instance. |
required |
addr |
Pubkey |
The pubkey of the mint. |
required |
Returns:
Type | Description |
---|---|
MintInfo |
The parsed `MintInfo`. |
Source code in anchorpy/utils/token.py
async def get_mint_info(
    provider: Provider,
    addr: Pubkey,
) -> MintInfo:
    """Fetch a mint account and parse it.

    Args:
        provider: The anchorpy Provider instance.
        addr: The pubkey of the mint.

    Returns:
        The parsed `MintInfo`.
    """
    mint_account_resp = await provider.connection.get_account_info(addr)
    mint_info = parse_mint_account(mint_account_resp)
    return mint_info
get_token_account(provider, addr)
async
Retrieve token account information.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
provider |
Provider |
The anchorpy Provider instance. |
required |
addr |
Pubkey |
The pubkey of the token account. |
required |
Returns:
Type | Description |
---|---|
AccountInfo |
The parsed `AccountInfo` of the token account. |
Source code in anchorpy/utils/token.py
async def get_token_account(provider: Provider, addr: Pubkey) -> AccountInfo:
    """Fetch a token account and parse it.

    Args:
        provider: The anchorpy Provider instance.
        addr: The pubkey of the token account.

    Returns:
        The parsed `AccountInfo` of the token account.
    """
    token_account_resp = await provider.connection.get_account_info(addr)
    token_account = parse_token_account(token_account_resp)
    return token_account
parse_mint_account(info)
Parse raw RPC response into `MintInfo`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info |
GetAccountInfoResp |
The RPC response from calling `.get_account_info` for the mint pubkey. |
required |
Exceptions:
Type | Description |
---|---|
AttributeError |
If the account is not owned by the Token Program. |
ValueError |
If the fetched data is the wrong size. |
Returns:
Type | Description |
---|---|
MintInfo |
The parsed `MintInfo`. |
Source code in anchorpy/utils/token.py
def parse_mint_account(info: GetAccountInfoResp) -> MintInfo:
    """Decode a `get_account_info` response into `MintInfo`.

    Args:
        info: The RPC response from calling `.get_account_info` for the mint pubkey.

    Raises:
        AttributeError: If the account is not owned by the Token Program.
        ValueError: If the account does not exist or the fetched data is the
            wrong size.

    Returns:
        The parsed `MintInfo`.
    """
    account = info.value
    if account is None:
        raise ValueError("Account does not exist.")
    account_owner = account.owner
    if account_owner != TOKEN_PROGRAM_ID:
        raise AttributeError(f"Invalid mint owner: {account_owner}")
    raw = account.data
    if len(raw) != MINT_LAYOUT.sizeof():
        raise ValueError("Invalid mint size")

    decoded = MINT_LAYOUT.parse(raw)
    # An option flag of 0 means the corresponding authority is unset.
    if decoded.mint_authority_option == 0:
        mint_authority = None
    else:
        mint_authority = Pubkey(decoded.mint_authority)
    freeze_authority = (
        None
        if decoded.freeze_authority_option == 0
        else Pubkey(decoded.freeze_authority)
    )
    return MintInfo(
        mint_authority,
        decoded.supply,
        decoded.decimals,
        decoded.is_initialized != 0,
        freeze_authority,
    )
parse_token_account(info)
Parse `AccountInfo` from RPC response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info |
GetAccountInfoResp |
the `get_account_info` RPC response. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If the fetched data is the wrong size. |
AttributeError |
If the account is not owned by the token program. |
Returns:
Type | Description |
---|---|
AccountInfo |
The parsed `AccountInfo`. |
Source code in anchorpy/utils/token.py
def parse_token_account(info: GetAccountInfoResp) -> AccountInfo:
    """Parse `AccountInfo` from RPC response.

    Args:
        info: the `get_account_info` RPC response.

    Raises:
        ValueError: If the account does not exist or the fetched data is the
            wrong size.
        AttributeError: If the account is not owned by the token program.

    Returns:
        The parsed `AccountInfo`.
    """
    val = info.value
    if val is None:
        # A missing account is not an ownership problem; report it as such.
        raise ValueError("Account does not exist.")
    if val.owner != TOKEN_PROGRAM_ID:
        raise AttributeError("Invalid account owner")
    bytes_data = val.data
    if len(bytes_data) != ACCOUNT_LAYOUT.sizeof():
        raise ValueError("Invalid account size")

    decoded_data = ACCOUNT_LAYOUT.parse(bytes_data)
    mint = Pubkey(decoded_data.mint)
    owner = Pubkey(decoded_data.owner)
    amount = decoded_data.amount

    # An option flag of 0 means the optional field is unset.
    if decoded_data.delegate_option == 0:
        delegate = None
        delegated_amount = 0
    else:
        delegate = Pubkey(decoded_data.delegate)
        delegated_amount = decoded_data.delegated_amount

    is_initialized = decoded_data.state != 0
    is_frozen = decoded_data.state == 2

    # For native accounts the `is_native` field carries the rent-exempt reserve.
    if decoded_data.is_native_option == 1:
        rent_exempt_reserve = decoded_data.is_native
        is_native = True
    else:
        rent_exempt_reserve = None
        is_native = False

    if decoded_data.close_authority_option == 0:
        close_authority = None
    else:
        # Use the close authority field itself, not the account owner.
        close_authority = Pubkey(decoded_data.close_authority)

    return AccountInfo(
        mint,
        owner,
        amount,
        delegate,
        delegated_amount,
        is_initialized,
        is_frozen,
        is_native,
        rent_exempt_reserve,
        close_authority,
    )