API Reference
BaseSolanaSystem (ABC, BaseMixin)
Source code in solsim/system.py
class BaseSolanaSystem(ABC, BaseMixin):
SOLANA_CLUSTER_URI = "http://127.0.0.1:8899"
def __init__(
self, workspace_dir: str, client: Optional[Client] = None, localnet_process: Optional[Process] = None
) -> None:
self._workspace_dir = workspace_dir
self._localnet_process = localnet_process
self._localnet_initialized = False
self.setup()
self.client = client or Client(self.SOLANA_CLUSTER_URI)
self.workspace = create_workspace(self._workspace_dir)
def setup(self) -> None:
if not self._localnet_initialized:
if not self._localnet_process:
self._logfile = tempfile.NamedTemporaryFile()
self._localnet = self._start_localnet()
print("👆 Starting Solana localnet cluster (~5s) ...")
while not self._localnet_ready:
time.sleep(1)
else:
self._localnet = self._localnet_process
self._localnet_initialized = True
def teardown(self) -> None:
if self._localnet_initialized:
print("👇 Terminating Solana localnet cluster ...")
self._terminate_localnet()
self._localnet_initialized = False
async def cleanup(self) -> None:
await close_workspace(self.workspace)
@abstractmethod
async def initial_step(self) -> Awaitable[StateType]:
"""Return initial system state.
This method is `async` because it (presumably) will make RPC calls
to the Solana blockchain, which are `async`.
Returns:
The initial state of the system.
"""
raise NotImplementedError
@abstractmethod
async def step(self, state: StateType, history: List[StateType]) -> Awaitable[StateType]:
"""Return an arbitrary state of the system.
This method is `async` because it (presumably) will make RPC calls
to the Solana blockchain, which are `async`.
Args:
state: The previous system state.
history: The full history of system states.
Returns:
The initial state of the system.
"""
raise NotImplementedError
def _start_localnet(self) -> subprocess.Popen[Any]:
for proc in psutil.process_iter():
if proc.name() == "solana-test-validator":
self._terminate_processes([proc])
return subprocess.Popen(["anchor", "localnet"], cwd=self._workspace_dir, stdout=self._logfile, stderr=DEVNULL)
@property
def _localnet_ready(self) -> bool:
lastline = subprocess.check_output(["tail", "-n", "1", self._logfile.name]).decode("utf-8").strip()
return "| Processed Slot: " in lastline
def get_token_account_balance(
self, pubkey: PublicKey, commitment: Optional[commitment.Commitment] = commitment.Confirmed
) -> float:
"""Get account token balance.
Args:
pubkey: The public key of the account in question.
Returns:
The token balance of the account.
"""
return float(self.client.get_token_account_balance(pubkey, commitment)["result"]["value"]["uiAmount"])
def _terminate_processes(self, kill_list: list[Process], timeout: int = 10) -> None:
# Attempt graceful termination first.
for p in reversed(kill_list):
self._signal_process(p, signal.SIGTERM)
_, alive = psutil.wait_procs(kill_list, timeout=timeout)
# Forcefully terminate procs still running.
for p in alive:
self._signal_process(p, signal.SIGKILL)
_, alive = psutil.wait_procs(kill_list, timeout=timeout)
if alive:
raise Exception(f"could not terminate process {alive}")
def _terminate_localnet(self) -> None:
"""
Borrowed from https://github.com/pytest-dev/pytest-xprocess/blob/6dac644e7b6b17d9b970f6e9e2bf2ade539841dc/xprocess/xprocess.py#L35. # noqa E501
"""
parent = psutil.Process(self._localnet.pid)
try:
kill_list = [parent]
kill_list += parent.children(recursive=True)
self._terminate_processes(kill_list)
except (psutil.Error, ValueError) as err:
raise Exception(f"Error while terminating process {err}")
else:
return
def _signal_process(self, p: Process, sig: signal.Signals) -> None:
"""
Borrowed from: https://github.com/pytest-dev/pytest-xprocess/blob/6dac644e7b6b17d9b970f6e9e2bf2ade539841dc/xprocess/xprocess.py#L29. # noqa E501
"""
try:
p.send_signal(sig)
except psutil.NoSuchProcess:
pass
get_token_account_balance(self, pubkey, commitment='confirmed')
Get account token balance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pubkey |
PublicKey |
The public key of the account in question. |
required |
Returns:
Type | Description |
---|---|
float |
The token balance of the account. |
Source code in solsim/system.py
def get_token_account_balance(
self, pubkey: PublicKey, commitment: Optional[commitment.Commitment] = commitment.Confirmed
) -> float:
"""Get account token balance.
Args:
pubkey: The public key of the account in question.
Returns:
The token balance of the account.
"""
return float(self.client.get_token_account_balance(pubkey, commitment)["result"]["value"]["uiAmount"])
initial_step(self)
async
Return initial system state.
This method is async
because it (presumably) will make RPC calls
to the Solana blockchain, which are async
.
Returns:
Type | Description |
---|---|
Awaitable[Dict[Any, Any]] |
The initial state of the system. |
Source code in solsim/system.py
@abstractmethod
async def initial_step(self) -> Awaitable[StateType]:
"""Return initial system state.
This method is `async` because it (presumably) will make RPC calls
to the Solana blockchain, which are `async`.
Returns:
The initial state of the system.
"""
raise NotImplementedError
step(self, state, history)
async
Return an arbitrary state of the system.
This method is async
because it (presumably) will make RPC calls
to the Solana blockchain, which are async
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state |
Dict[Any, Any] |
The previous system state. |
required |
history |
List[Dict[Any, Any]] |
The full history of system states. |
required |
Returns:
Type | Description |
---|---|
Awaitable[Dict[Any, Any]] |
The initial state of the system. |
Source code in solsim/system.py
@abstractmethod
async def step(self, state: StateType, history: List[StateType]) -> Awaitable[StateType]:
"""Return an arbitrary state of the system.
This method is `async` because it (presumably) will make RPC calls
to the Solana blockchain, which are `async`.
Args:
state: The previous system state.
history: The full history of system states.
Returns:
The initial state of the system.
"""
raise NotImplementedError
BaseSystem (ABC, BaseMixin)
Source code in solsim/system.py
class BaseSystem(ABC, BaseMixin):
@abstractmethod
def initial_step(self) -> StateType:
"""Return initial system state.
Returns:
The initial state of the system.
"""
raise NotImplementedError
@abstractmethod
def step(self, state: StateType, history: List[StateType]) -> StateType:
"""Return an arbitrary state of the system.
This method is `async` because it (presumably) will make RPC calls
to the Solana blockchain, which are `async`.
Args:
state: The previous system state.
history: The full history of system states.
Returns:
The initial state of the system.
"""
raise NotImplementedError
initial_step(self)
Return initial system state.
Returns:
Type | Description |
---|---|
Dict[Any, Any] |
The initial state of the system. |
Source code in solsim/system.py
@abstractmethod
def initial_step(self) -> StateType:
"""Return initial system state.
Returns:
The initial state of the system.
"""
raise NotImplementedError
step(self, state, history)
Return an arbitrary state of the system.
This method is async
because it (presumably) will make RPC calls
to the Solana blockchain, which are async
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state |
Dict[Any, Any] |
The previous system state. |
required |
history |
List[Dict[Any, Any]] |
The full history of system states. |
required |
Returns:
Type | Description |
---|---|
Dict[Any, Any] |
The initial state of the system. |
Source code in solsim/system.py
@abstractmethod
def step(self, state: StateType, history: List[StateType]) -> StateType:
"""Return an arbitrary state of the system.
This method is `async` because it (presumably) will make RPC calls
to the Solana blockchain, which are `async`.
Args:
state: The previous system state.
history: The full history of system states.
Returns:
The initial state of the system.
"""
raise NotImplementedError
Simulation
Source code in solsim/simulation.py
class Simulation:
INDEX_COLS = ["step", "run"]
def __init__(self, system: Union[BaseSystem, BaseSolanaSystem], watchlist: Iterable[str]) -> None:
self._system = system
self._watchlist = set(watchlist)
@property
def cli(self) -> typer.Typer:
app = typer.Typer()
@app.command() # type: ignore
def run(runs: int = 1, steps_per_run: int = 1, viz_results: bool = False) -> pd.DataFrame:
return self.run(runs, steps_per_run, viz_results)
@app.callback() # type: ignore
def callback() -> None:
pass
return app
def run(self, runs: int = 1, steps_per_run: int = 3, visualize_results: bool = False) -> pd.DataFrame:
"""Run your simulation.
Args:
runs: The number of times to run your simulation.
visualize_results: Optionally build and start a Streamlit app to explore simulation results.
Returns:
results: A pandas DataFrame containing your simulation results.
"""
results = asyncio.run(self._run(runs, steps_per_run))
if visualize_results:
try:
with tempfile.TemporaryDirectory() as tmpdir:
app = self._start_results_app(results, tmpdir)
app.wait()
except KeyboardInterrupt:
pass
return results
@staticmethod
def _start_results_app(results: pd.DataFrame, tmpdir: str) -> subprocess.Popen[Any]:
results_path = os.path.join(tmpdir, "results.feather")
feather.write_dataframe(results, results_path)
env = {**os.environ, "SOLSIM_RESULTS_PATH": results_path}
return subprocess.Popen(["streamlit", "run", "visualize.py"], cwd=os.path.dirname(__file__), env=env)
async def _run(self, runs: int, steps_per_run: int) -> pd.DataFrame:
results: list[StateType] = []
try:
for run in range(runs):
try:
state: StateType = {}
history: list[StateType] = []
self._system.setup()
for step in tqdm(range(steps_per_run), desc=f"🟢 run: {run} | step"):
if self._system.uses_solana:
updates = await self._system.initial_step() if step == 0 else await self._system.step(state, history) # type: ignore # noqa: E501
else:
updates = self._system.initial_step() if step == 0 else self._system.step(state, history)
state = {**state, **updates, "run": run, "step": step}
history.append(state)
results.append(self._filter_state(state))
finally:
self._system.teardown()
finally:
await self._system.cleanup() if self._system.uses_solana else self._system.cleanup()
results = pd.DataFrame(results)
return self._reorder_results_columns(results)
def _filter_state(self, state: StateType) -> StateType:
for qty in self._watchlist:
if qty not in state:
raise Exception(f"{qty} not found in state: {state}")
return {qty: state[qty] for qty in self._watchlist | set(self.INDEX_COLS)}
def _reorder_results_columns(self, results: pd.DataFrame) -> pd.DataFrame:
cols = self.INDEX_COLS + sorted([col for col in results.columns if col not in self.INDEX_COLS])
return results[cols]
run(self, runs=1, steps_per_run=3, visualize_results=False)
Run your simulation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
runs |
int |
The number of times to run your simulation. |
1 |
visualize_results |
bool |
Optionally build and start a Streamlit app to explore simulation results. |
False |
Returns:
Type | Description |
---|---|
results |
A pandas DataFrame containing your simulation results. |
Source code in solsim/simulation.py
def run(self, runs: int = 1, steps_per_run: int = 3, visualize_results: bool = False) -> pd.DataFrame:
"""Run your simulation.
Args:
runs: The number of times to run your simulation.
visualize_results: Optionally build and start a Streamlit app to explore simulation results.
Returns:
results: A pandas DataFrame containing your simulation results.
"""
results = asyncio.run(self._run(runs, steps_per_run))
if visualize_results:
try:
with tempfile.TemporaryDirectory() as tmpdir:
app = self._start_results_app(results, tmpdir)
app.wait()
except KeyboardInterrupt:
pass
return results