API Reference
        
BaseSolanaSystem            (ABC, BaseMixin)
        
    Source code in solsim/system.py
          class BaseSolanaSystem(ABC, BaseMixin):
    SOLANA_CLUSTER_URI = "http://127.0.0.1:8899"
    def __init__(
        self, workspace_dir: str, client: Optional[Client] = None, localnet_process: Optional[Process] = None
    ) -> None:
        self._workspace_dir = workspace_dir
        self._localnet_process = localnet_process
        self._localnet_initialized = False
        self.setup()
        self.client = client or Client(self.SOLANA_CLUSTER_URI)
        self.workspace = create_workspace(self._workspace_dir)
    def setup(self) -> None:
        if not self._localnet_initialized:
            if not self._localnet_process:
                self._logfile = tempfile.NamedTemporaryFile()
                self._localnet = self._start_localnet()
                print("👆 Starting Solana localnet cluster (~5s) ...")
                while not self._localnet_ready:
                    time.sleep(1)
            else:
                self._localnet = self._localnet_process
            self._localnet_initialized = True
    def teardown(self) -> None:
        if self._localnet_initialized:
            print("👇 Terminating Solana localnet cluster ...")
            self._terminate_localnet()
        self._localnet_initialized = False
    async def cleanup(self) -> None:
        await close_workspace(self.workspace)
    @abstractmethod
    async def initial_step(self) -> Awaitable[StateType]:
        """Return initial system state.
        This method is `async` because it (presumably) will make RPC calls
        to the Solana blockchain, which are `async`.
        Returns:
            The initial state of the system.
        """
        raise NotImplementedError
    @abstractmethod
    async def step(self, state: StateType, history: List[StateType]) -> Awaitable[StateType]:
        """Return an arbitrary state of the system.
        This method is `async` because it (presumably) will make RPC calls
        to the Solana blockchain, which are `async`.
        Args:
            state: The previous system state.
            history: The full history of system states.
        Returns:
            The initial state of the system.
        """
        raise NotImplementedError
    def _start_localnet(self) -> subprocess.Popen[Any]:
        for proc in psutil.process_iter():
            if proc.name() == "solana-test-validator":
                self._terminate_processes([proc])
        return subprocess.Popen(["anchor", "localnet"], cwd=self._workspace_dir, stdout=self._logfile, stderr=DEVNULL)
    @property
    def _localnet_ready(self) -> bool:
        lastline = subprocess.check_output(["tail", "-n", "1", self._logfile.name]).decode("utf-8").strip()
        return "| Processed Slot: " in lastline
    def get_token_account_balance(
        self, pubkey: PublicKey, commitment: Optional[commitment.Commitment] = commitment.Confirmed
    ) -> float:
        """Get account token balance.
        Args:
            pubkey: The public key of the account in question.
        Returns:
            The token balance of the account.
        """
        return float(self.client.get_token_account_balance(pubkey, commitment)["result"]["value"]["uiAmount"])
    def _terminate_processes(self, kill_list: list[Process], timeout: int = 10) -> None:
        # Attempt graceful termination first.
        for p in reversed(kill_list):
            self._signal_process(p, signal.SIGTERM)
        _, alive = psutil.wait_procs(kill_list, timeout=timeout)
        # Forcefully terminate procs still running.
        for p in alive:
            self._signal_process(p, signal.SIGKILL)
        _, alive = psutil.wait_procs(kill_list, timeout=timeout)
        if alive:
            raise Exception(f"could not terminate process {alive}")
    def _terminate_localnet(self) -> None:
        """
        Borrowed from https://github.com/pytest-dev/pytest-xprocess/blob/6dac644e7b6b17d9b970f6e9e2bf2ade539841dc/xprocess/xprocess.py#L35.  # noqa E501
        """
        parent = psutil.Process(self._localnet.pid)
        try:
            kill_list = [parent]
            kill_list += parent.children(recursive=True)
            self._terminate_processes(kill_list)
        except (psutil.Error, ValueError) as err:
            raise Exception(f"Error while terminating process {err}")
        else:
            return
    def _signal_process(self, p: Process, sig: signal.Signals) -> None:
        """
        Borrowed from: https://github.com/pytest-dev/pytest-xprocess/blob/6dac644e7b6b17d9b970f6e9e2bf2ade539841dc/xprocess/xprocess.py#L29.  # noqa E501
        """
        try:
            p.send_signal(sig)
        except psutil.NoSuchProcess:
            pass
get_token_account_balance(self, pubkey, commitment='confirmed')
    Get account token balance.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
pubkey | 
        PublicKey | 
        The public key of the account in question.  | 
        required | 
Returns:
| Type | Description | 
|---|---|
float | 
      The token balance of the account.  | 
    
Source code in solsim/system.py
          def get_token_account_balance(
    self, pubkey: PublicKey, commitment: Optional[commitment.Commitment] = commitment.Confirmed
) -> float:
    """Get account token balance.
    Args:
        pubkey: The public key of the account in question.
    Returns:
        The token balance of the account.
    """
    return float(self.client.get_token_account_balance(pubkey, commitment)["result"]["value"]["uiAmount"])
initial_step(self)
  
      async
  
    Return initial system state.
This method is async because it (presumably) will make RPC calls
to the Solana blockchain, which are async.
Returns:
| Type | Description | 
|---|---|
Awaitable[Dict[Any, Any]] | 
      The initial state of the system.  | 
    
Source code in solsim/system.py
          @abstractmethod
async def initial_step(self) -> Awaitable[StateType]:
    """Return initial system state.
    This method is `async` because it (presumably) will make RPC calls
    to the Solana blockchain, which are `async`.
    Returns:
        The initial state of the system.
    """
    raise NotImplementedError
step(self, state, history)
  
      async
  
    Return an arbitrary state of the system.
This method is async because it (presumably) will make RPC calls
to the Solana blockchain, which are async.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
state | 
        Dict[Any, Any] | 
        The previous system state.  | 
        required | 
history | 
        List[Dict[Any, Any]] | 
        The full history of system states.  | 
        required | 
Returns:
| Type | Description | 
|---|---|
Awaitable[Dict[Any, Any]] | 
      The initial state of the system.  | 
    
Source code in solsim/system.py
          @abstractmethod
async def step(self, state: StateType, history: List[StateType]) -> Awaitable[StateType]:
    """Return an arbitrary state of the system.
    This method is `async` because it (presumably) will make RPC calls
    to the Solana blockchain, which are `async`.
    Args:
        state: The previous system state.
        history: The full history of system states.
    Returns:
        The initial state of the system.
    """
    raise NotImplementedError
        
BaseSystem            (ABC, BaseMixin)
        
    Source code in solsim/system.py
          class BaseSystem(ABC, BaseMixin):
    @abstractmethod
    def initial_step(self) -> StateType:
        """Return initial system state.
        Returns:
            The initial state of the system.
        """
        raise NotImplementedError
    @abstractmethod
    def step(self, state: StateType, history: List[StateType]) -> StateType:
        """Return an arbitrary state of the system.
        This method is `async` because it (presumably) will make RPC calls
        to the Solana blockchain, which are `async`.
        Args:
            state: The previous system state.
            history: The full history of system states.
        Returns:
            The initial state of the system.
        """
        raise NotImplementedError
initial_step(self)
    Return initial system state.
Returns:
| Type | Description | 
|---|---|
Dict[Any, Any] | 
      The initial state of the system.  | 
    
Source code in solsim/system.py
          @abstractmethod
def initial_step(self) -> StateType:
    """Return initial system state.
    Returns:
        The initial state of the system.
    """
    raise NotImplementedError
step(self, state, history)
    Return an arbitrary state of the system.
This method is async because it (presumably) will make RPC calls
to the Solana blockchain, which are async.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
state | 
        Dict[Any, Any] | 
        The previous system state.  | 
        required | 
history | 
        List[Dict[Any, Any]] | 
        The full history of system states.  | 
        required | 
Returns:
| Type | Description | 
|---|---|
Dict[Any, Any] | 
      The initial state of the system.  | 
    
Source code in solsim/system.py
          @abstractmethod
def step(self, state: StateType, history: List[StateType]) -> StateType:
    """Return an arbitrary state of the system.
    This method is `async` because it (presumably) will make RPC calls
    to the Solana blockchain, which are `async`.
    Args:
        state: The previous system state.
        history: The full history of system states.
    Returns:
        The initial state of the system.
    """
    raise NotImplementedError
        
Simulation        
    Source code in solsim/simulation.py
          class Simulation:
    INDEX_COLS = ["step", "run"]
    def __init__(self, system: Union[BaseSystem, BaseSolanaSystem], watchlist: Iterable[str]) -> None:
        self._system = system
        self._watchlist = set(watchlist)
    @property
    def cli(self) -> typer.Typer:
        app = typer.Typer()
        @app.command()  # type: ignore
        def run(runs: int = 1, steps_per_run: int = 1, viz_results: bool = False) -> pd.DataFrame:
            return self.run(runs, steps_per_run, viz_results)
        @app.callback()  # type: ignore
        def callback() -> None:
            pass
        return app
    def run(self, runs: int = 1, steps_per_run: int = 3, visualize_results: bool = False) -> pd.DataFrame:
        """Run your simulation.
        Args:
            runs: The number of times to run your simulation.
            visualize_results: Optionally build and start a Streamlit app to explore simulation results.
        Returns:
            results: A pandas DataFrame containing your simulation results.
        """
        results = asyncio.run(self._run(runs, steps_per_run))
        if visualize_results:
            try:
                with tempfile.TemporaryDirectory() as tmpdir:
                    app = self._start_results_app(results, tmpdir)
                    app.wait()
            except KeyboardInterrupt:
                pass
        return results
    @staticmethod
    def _start_results_app(results: pd.DataFrame, tmpdir: str) -> subprocess.Popen[Any]:
        results_path = os.path.join(tmpdir, "results.feather")
        feather.write_dataframe(results, results_path)
        env = {**os.environ, "SOLSIM_RESULTS_PATH": results_path}
        return subprocess.Popen(["streamlit", "run", "visualize.py"], cwd=os.path.dirname(__file__), env=env)
    async def _run(self, runs: int, steps_per_run: int) -> pd.DataFrame:
        results: list[StateType] = []
        try:
            for run in range(runs):
                try:
                    state: StateType = {}
                    history: list[StateType] = []
                    self._system.setup()
                    for step in tqdm(range(steps_per_run), desc=f"🟢 run: {run} | step"):
                        if self._system.uses_solana:
                            updates = await self._system.initial_step() if step == 0 else await self._system.step(state, history)  # type: ignore  # noqa: E501
                        else:
                            updates = self._system.initial_step() if step == 0 else self._system.step(state, history)
                        state = {**state, **updates, "run": run, "step": step}
                        history.append(state)
                        results.append(self._filter_state(state))
                finally:
                    self._system.teardown()
        finally:
            await self._system.cleanup() if self._system.uses_solana else self._system.cleanup()
        results = pd.DataFrame(results)
        return self._reorder_results_columns(results)
    def _filter_state(self, state: StateType) -> StateType:
        for qty in self._watchlist:
            if qty not in state:
                raise Exception(f"{qty} not found in state: {state}")
        return {qty: state[qty] for qty in self._watchlist | set(self.INDEX_COLS)}
    def _reorder_results_columns(self, results: pd.DataFrame) -> pd.DataFrame:
        cols = self.INDEX_COLS + sorted([col for col in results.columns if col not in self.INDEX_COLS])
        return results[cols]
run(self, runs=1, steps_per_run=3, visualize_results=False)
    Run your simulation.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
runs | 
        int | 
        The number of times to run your simulation.  | 
        1 | 
      
visualize_results | 
        bool | 
        Optionally build and start a Streamlit app to explore simulation results.  | 
        False | 
      
Returns:
| Type | Description | 
|---|---|
results | 
      A pandas DataFrame containing your simulation results.  | 
    
Source code in solsim/simulation.py
          def run(self, runs: int = 1, steps_per_run: int = 3, visualize_results: bool = False) -> pd.DataFrame:
    """Run your simulation.
    Args:
        runs: The number of times to run your simulation.
        visualize_results: Optionally build and start a Streamlit app to explore simulation results.
    Returns:
        results: A pandas DataFrame containing your simulation results.
    """
    results = asyncio.run(self._run(runs, steps_per_run))
    if visualize_results:
        try:
            with tempfile.TemporaryDirectory() as tmpdir:
                app = self._start_results_app(results, tmpdir)
                app.wait()
        except KeyboardInterrupt:
            pass
    return results