Skip to content

API Reference

BaseSolanaSystem (ABC, BaseMixin)

Source code in solsim/system.py
class BaseSolanaSystem(ABC, BaseMixin):

    SOLANA_CLUSTER_URI = "http://127.0.0.1:8899"

    def __init__(
        self, workspace_dir: str, client: Optional[Client] = None, localnet_process: Optional[Process] = None
    ) -> None:
        self._workspace_dir = workspace_dir
        self._localnet_process = localnet_process
        self._localnet_initialized = False
        self.setup()
        self.client = client or Client(self.SOLANA_CLUSTER_URI)
        self.workspace = create_workspace(self._workspace_dir)

    def setup(self) -> None:
        if not self._localnet_initialized:
            if not self._localnet_process:
                self._logfile = tempfile.NamedTemporaryFile()
                self._localnet = self._start_localnet()
                print("👆 Starting Solana localnet cluster (~5s) ...")
                while not self._localnet_ready:
                    time.sleep(1)
            else:
                self._localnet = self._localnet_process
            self._localnet_initialized = True

    def teardown(self) -> None:
        if self._localnet_initialized:
            print("👇 Terminating Solana localnet cluster ...")
            self._terminate_localnet()
        self._localnet_initialized = False

    async def cleanup(self) -> None:
        await close_workspace(self.workspace)

    @abstractmethod
    async def initial_step(self) -> Awaitable[StateType]:
        """Return initial system state.

        This method is `async` because it (presumably) will make RPC calls
        to the Solana blockchain, which are `async`.

        Returns:
            The initial state of the system.
        """
        raise NotImplementedError

    @abstractmethod
    async def step(self, state: StateType, history: List[StateType]) -> Awaitable[StateType]:
        """Return an arbitrary state of the system.

        This method is `async` because it (presumably) will make RPC calls
        to the Solana blockchain, which are `async`.

        Args:
            state: The previous system state.
            history: The full history of system states.

        Returns:
            The initial state of the system.
        """
        raise NotImplementedError

    def _start_localnet(self) -> subprocess.Popen[Any]:
        for proc in psutil.process_iter():
            if proc.name() == "solana-test-validator":
                self._terminate_processes([proc])
        return subprocess.Popen(["anchor", "localnet"], cwd=self._workspace_dir, stdout=self._logfile, stderr=DEVNULL)

    @property
    def _localnet_ready(self) -> bool:
        lastline = subprocess.check_output(["tail", "-n", "1", self._logfile.name]).decode("utf-8").strip()
        return "| Processed Slot: " in lastline

    def get_token_account_balance(
        self, pubkey: PublicKey, commitment: Optional[commitment.Commitment] = commitment.Confirmed
    ) -> float:
        """Get account token balance.

        Args:
            pubkey: The public key of the account in question.

        Returns:
            The token balance of the account.
        """
        return float(self.client.get_token_account_balance(pubkey, commitment)["result"]["value"]["uiAmount"])

    def _terminate_processes(self, kill_list: list[Process], timeout: int = 10) -> None:
        # Attempt graceful termination first.
        for p in reversed(kill_list):
            self._signal_process(p, signal.SIGTERM)
        _, alive = psutil.wait_procs(kill_list, timeout=timeout)

        # Forcefully terminate procs still running.
        for p in alive:
            self._signal_process(p, signal.SIGKILL)
        _, alive = psutil.wait_procs(kill_list, timeout=timeout)

        if alive:
            raise Exception(f"could not terminate process {alive}")

    def _terminate_localnet(self) -> None:
        """
        Borrowed from https://github.com/pytest-dev/pytest-xprocess/blob/6dac644e7b6b17d9b970f6e9e2bf2ade539841dc/xprocess/xprocess.py#L35.  # noqa E501
        """
        parent = psutil.Process(self._localnet.pid)
        try:
            kill_list = [parent]
            kill_list += parent.children(recursive=True)
            self._terminate_processes(kill_list)
        except (psutil.Error, ValueError) as err:
            raise Exception(f"Error while terminating process {err}")
        else:
            return

    def _signal_process(self, p: Process, sig: signal.Signals) -> None:
        """
        Borrowed from: https://github.com/pytest-dev/pytest-xprocess/blob/6dac644e7b6b17d9b970f6e9e2bf2ade539841dc/xprocess/xprocess.py#L29.  # noqa E501
        """
        try:
            p.send_signal(sig)
        except psutil.NoSuchProcess:
            pass

get_token_account_balance(self, pubkey, commitment='confirmed')

Get account token balance.

Parameters:

Name Type Description Default
pubkey PublicKey

The public key of the account in question.

required

Returns:

Type Description
float

The token balance of the account.

Source code in solsim/system.py
def get_token_account_balance(
    self, pubkey: PublicKey, commitment: Optional[commitment.Commitment] = commitment.Confirmed
) -> float:
    """Get account token balance.

    Args:
        pubkey: The public key of the account in question.

    Returns:
        The token balance of the account.
    """
    return float(self.client.get_token_account_balance(pubkey, commitment)["result"]["value"]["uiAmount"])

initial_step(self) async

Return initial system state.

This method is async because it (presumably) will make RPC calls to the Solana blockchain, which are async.

Returns:

Type Description
Awaitable[Dict[Any, Any]]

The initial state of the system.

Source code in solsim/system.py
@abstractmethod
async def initial_step(self) -> Awaitable[StateType]:
    """Return initial system state.

    This method is `async` because it (presumably) will make RPC calls
    to the Solana blockchain, which are `async`.

    Returns:
        The initial state of the system.
    """
    raise NotImplementedError

step(self, state, history) async

Return an arbitrary state of the system.

This method is async because it (presumably) will make RPC calls to the Solana blockchain, which are async.

Parameters:

Name Type Description Default
state Dict[Any, Any]

The previous system state.

required
history List[Dict[Any, Any]]

The full history of system states.

required

Returns:

Type Description
Awaitable[Dict[Any, Any]]

The initial state of the system.

Source code in solsim/system.py
@abstractmethod
async def step(self, state: StateType, history: List[StateType]) -> Awaitable[StateType]:
    """Return an arbitrary state of the system.

    This method is `async` because it (presumably) will make RPC calls
    to the Solana blockchain, which are `async`.

    Args:
        state: The previous system state.
        history: The full history of system states.

    Returns:
        The initial state of the system.
    """
    raise NotImplementedError

BaseSystem (ABC, BaseMixin)

Source code in solsim/system.py
class BaseSystem(ABC, BaseMixin):
    @abstractmethod
    def initial_step(self) -> StateType:
        """Return initial system state.

        Returns:
            The initial state of the system.
        """
        raise NotImplementedError

    @abstractmethod
    def step(self, state: StateType, history: List[StateType]) -> StateType:
        """Return an arbitrary state of the system.

        This method is `async` because it (presumably) will make RPC calls
        to the Solana blockchain, which are `async`.

        Args:
            state: The previous system state.
            history: The full history of system states.

        Returns:
            The initial state of the system.
        """
        raise NotImplementedError

initial_step(self)

Return initial system state.

Returns:

Type Description
Dict[Any, Any]

The initial state of the system.

Source code in solsim/system.py
@abstractmethod
def initial_step(self) -> StateType:
    """Return initial system state.

    Returns:
        The initial state of the system.
    """
    raise NotImplementedError

step(self, state, history)

Return an arbitrary state of the system.

This method is async because it (presumably) will make RPC calls to the Solana blockchain, which are async.

Parameters:

Name Type Description Default
state Dict[Any, Any]

The previous system state.

required
history List[Dict[Any, Any]]

The full history of system states.

required

Returns:

Type Description
Dict[Any, Any]

The initial state of the system.

Source code in solsim/system.py
@abstractmethod
def step(self, state: StateType, history: List[StateType]) -> StateType:
    """Return an arbitrary state of the system.

    This method is `async` because it (presumably) will make RPC calls
    to the Solana blockchain, which are `async`.

    Args:
        state: The previous system state.
        history: The full history of system states.

    Returns:
        The initial state of the system.
    """
    raise NotImplementedError

Simulation

Source code in solsim/simulation.py
class Simulation:

    INDEX_COLS = ["step", "run"]

    def __init__(self, system: Union[BaseSystem, BaseSolanaSystem], watchlist: Iterable[str]) -> None:
        self._system = system
        self._watchlist = set(watchlist)

    @property
    def cli(self) -> typer.Typer:
        app = typer.Typer()

        @app.command()  # type: ignore
        def run(runs: int = 1, steps_per_run: int = 1, viz_results: bool = False) -> pd.DataFrame:
            return self.run(runs, steps_per_run, viz_results)

        @app.callback()  # type: ignore
        def callback() -> None:
            pass

        return app

    def run(self, runs: int = 1, steps_per_run: int = 3, visualize_results: bool = False) -> pd.DataFrame:
        """Run your simulation.

        Args:
            runs: The number of times to run your simulation.
            visualize_results: Optionally build and start a Streamlit app to explore simulation results.

        Returns:
            results: A pandas DataFrame containing your simulation results.
        """
        results = asyncio.run(self._run(runs, steps_per_run))
        if visualize_results:
            try:
                with tempfile.TemporaryDirectory() as tmpdir:
                    app = self._start_results_app(results, tmpdir)
                    app.wait()
            except KeyboardInterrupt:
                pass
        return results

    @staticmethod
    def _start_results_app(results: pd.DataFrame, tmpdir: str) -> subprocess.Popen[Any]:
        results_path = os.path.join(tmpdir, "results.feather")
        feather.write_dataframe(results, results_path)
        env = {**os.environ, "SOLSIM_RESULTS_PATH": results_path}
        return subprocess.Popen(["streamlit", "run", "visualize.py"], cwd=os.path.dirname(__file__), env=env)

    async def _run(self, runs: int, steps_per_run: int) -> pd.DataFrame:
        results: list[StateType] = []
        try:
            for run in range(runs):
                try:
                    state: StateType = {}
                    history: list[StateType] = []
                    self._system.setup()
                    for step in tqdm(range(steps_per_run), desc=f"🟢 run: {run} | step"):
                        if self._system.uses_solana:
                            updates = await self._system.initial_step() if step == 0 else await self._system.step(state, history)  # type: ignore  # noqa: E501
                        else:
                            updates = self._system.initial_step() if step == 0 else self._system.step(state, history)
                        state = {**state, **updates, "run": run, "step": step}
                        history.append(state)
                        results.append(self._filter_state(state))
                finally:
                    self._system.teardown()
        finally:
            await self._system.cleanup() if self._system.uses_solana else self._system.cleanup()

        results = pd.DataFrame(results)
        return self._reorder_results_columns(results)

    def _filter_state(self, state: StateType) -> StateType:
        for qty in self._watchlist:
            if qty not in state:
                raise Exception(f"{qty} not found in state: {state}")
        return {qty: state[qty] for qty in self._watchlist | set(self.INDEX_COLS)}

    def _reorder_results_columns(self, results: pd.DataFrame) -> pd.DataFrame:
        cols = self.INDEX_COLS + sorted([col for col in results.columns if col not in self.INDEX_COLS])
        return results[cols]

run(self, runs=1, steps_per_run=3, visualize_results=False)

Run your simulation.

Parameters:

Name Type Description Default
runs int

The number of times to run your simulation.

1
visualize_results bool

Optionally build and start a Streamlit app to explore simulation results.

False

Returns:

Type Description
results

A pandas DataFrame containing your simulation results.

Source code in solsim/simulation.py
def run(self, runs: int = 1, steps_per_run: int = 3, visualize_results: bool = False) -> pd.DataFrame:
    """Run your simulation.

    Args:
        runs: The number of times to run your simulation.
        visualize_results: Optionally build and start a Streamlit app to explore simulation results.

    Returns:
        results: A pandas DataFrame containing your simulation results.
    """
    results = asyncio.run(self._run(runs, steps_per_run))
    if visualize_results:
        try:
            with tempfile.TemporaryDirectory() as tmpdir:
                app = self._start_results_app(results, tmpdir)
                app.wait()
        except KeyboardInterrupt:
            pass
    return results