47 lines
1.3 KiB
Python
47 lines
1.3 KiB
Python
from __future__ import annotations
|
|
|
|
from cvttpy_tools.secrets import Secrets
|
|
from cvttpy_tools.app import App
|
|
|
|
from cvttpy_tools.logger import Log
|
|
from cvttpy_trading.exchanges.coinbase.auth import create_rest_jwt
|
|
|
|
|
|
async def get_token(infscl_path: str) -> str:
    """Build a REST JWT for a GET on the Coinbase brokerage products endpoint.

    Args:
        infscl_path: Key under which the credential is looked up in the
            ``Secrets`` store.

    Returns:
        Whatever ``create_rest_jwt`` produces for the looked-up secret
        (presumably the signed token string -- confirm against
        ``cvttpy_trading.exchanges.coinbase.auth``).
    """
    store = Secrets.instance()
    # The store is loaded before the lookup, in the same order as before.
    await store.load()
    return create_rest_jwt(
        secret=store.get(key=infscl_path),
        method="GET",
        url="https://api.coinbase.com/api/v3/brokerage/products",
    )
|
|
|
|
|
|
async def main() -> None:
    """Query Coinbase for unexpired, expiring FUTURE products and print the reply.

    A token is obtained via ``get_token`` for the ``COINBASE_ADV_RO`` secret,
    then ``curl`` is invoked against the brokerage products endpoint and its
    raw stdout is printed unchanged.

    Raises:
        subprocess.CalledProcessError: If ``curl`` exits with a non-zero status.
        FileNotFoundError: If the ``curl`` executable is not on ``PATH``.
    """
    import subprocess

    token = await get_token(infscl_path="COINBASE_ADV_RO")

    url = (
        "https://api.coinbase.com/api/v3/brokerage/products?"
        "product_type=FUTURE"
        "&expiring_contract_status=STATUS_UNEXPIRED"
        "&contract_expiry_type=EXPIRING"
    )
    # Pass an argument vector instead of a shell string: the bearer token is
    # never parsed by a shell, so no quoting or injection concerns arise from
    # its contents. curl receives exactly the same arguments as before.
    argv = [
        "curl",
        "--request", "GET",
        "--url", url,
        "--header", f"Authorization: Bearer {token}",
    ]
    completed = subprocess.run(
        argv,
        check=True,
        capture_output=True,
        text=True,
    )
    print(completed.stdout)
|
|
|
|
if __name__ == "__main__":
    # Script entry point: set up logging, create the App, register main()
    # for the Run stage, then start the App.
    Log(log_level="WARNING")
    App()
    # NOTE(review): assumes App.instance() returns the same object on every
    # call, so a single lookup is equivalent to the repeated ones -- confirm.
    app = App.instance()
    # main() is called here, so add_call receives the coroutine object it
    # returns rather than the function itself.
    app.add_call(stage=App.Stage.Run, func=main())
    app.run()
|