airbyte_cdk.sources.declarative.auth.jwt
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import base64
import json
from dataclasses import InitVar, dataclass
from datetime import datetime
from typing import Any, Mapping, MutableMapping, Optional, Union, cast

import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey

from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.request_option import (
    RequestOption,
    RequestOptionType,
)

# Type alias for keys that JWT library accepts
JwtKeyTypes = Union[
    RSAPrivateKey, EllipticCurvePrivateKey, Ed25519PrivateKey, Ed448PrivateKey, str, bytes
]


class JwtAlgorithm(str):
    """
    String constants naming the supported JWT signing algorithms.

    This is a plain `str` subclass, not an `enum.Enum`: `JwtAlgorithm(value)` wraps any string without
    validating it against the constants below.
    """

    HS256 = "HS256"
    HS384 = "HS384"
    HS512 = "HS512"
    ES256 = "ES256"
    ES256K = "ES256K"
    ES384 = "ES384"
    ES512 = "ES512"
    RS256 = "RS256"
    RS384 = "RS384"
    RS512 = "RS512"
    PS256 = "PS256"
    PS384 = "PS384"
    PS512 = "PS512"
    EdDSA = "EdDSA"


@dataclass
class JwtAuthenticator(DeclarativeAuthenticator):
    """
    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. By default the generated
    token is attached to each request via the Authorization header; `request_option` can inject it elsewhere.

    Attributes:
        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
        token_duration (Optional[int]): The duration in seconds for which the token is valid
        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
        passphrase (Optional[Union[InterpolatedString, str]]): Passphrase used to load the secret key as a PEM private key.
        request_option (Optional[RequestOption]): Where to inject the token; defaults to the Authorization header.
    """

    config: Mapping[str, Any]
    parameters: InitVar[Mapping[str, Any]]
    secret_key: Union[InterpolatedString, str]
    algorithm: Union[str, JwtAlgorithm]
    token_duration: Optional[int]
    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
    header_prefix: Optional[Union[InterpolatedString, str]] = None
    kid: Optional[Union[InterpolatedString, str]] = None
    typ: Optional[Union[InterpolatedString, str]] = None
    cty: Optional[Union[InterpolatedString, str]] = None
    iss: Optional[Union[InterpolatedString, str]] = None
    sub: Optional[Union[InterpolatedString, str]] = None
    aud: Optional[Union[InterpolatedString, str]] = None
    additional_jwt_headers: Optional[Mapping[str, Any]] = None
    additional_jwt_payload: Optional[Mapping[str, Any]] = None
    passphrase: Optional[Union[InterpolatedString, str]] = None
    request_option: Optional[RequestOption] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Wraps the configured fields in their interpolated counterparts and sets the default request option.
        """
        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
        self._algorithm = (
            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
        )
        # A string flag becomes an InterpolatedBoolean; it is evaluated against the config when the key is read.
        self._base64_encode_secret_key = (
            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
            if isinstance(self.base64_encode_secret_key, str)
            else self.base64_encode_secret_key
        )
        self._token_duration = self.token_duration
        self._header_prefix = (
            InterpolatedString.create(self.header_prefix, parameters=parameters)
            if self.header_prefix
            else None
        )
        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
        self._additional_jwt_headers = InterpolatedMapping(
            self.additional_jwt_headers or {}, parameters=parameters
        )
        self._additional_jwt_payload = InterpolatedMapping(
            self.additional_jwt_payload or {}, parameters=parameters
        )
        self._passphrase = (
            InterpolatedString.create(self.passphrase, parameters=parameters)
            if self.passphrase
            else None
        )

        # When we first implemented the JWT authenticator, we assumed that the signed token was always supposed
        # to be loaded into the request headers under the `Authorization` key. This is not always the case, but
        # this default option allows for backwards compatibility to be retained for existing connectors
        self._request_option = self.request_option or RequestOption(
            inject_into=RequestOptionType.header, field_name="Authorization", parameters=parameters
        )

    def _get_jwt_headers(self) -> dict[str, Any]:
        """
        Builds and returns the headers used when signing the JWT.

        Raises:
            ValueError: If 'additional_jwt_headers' sets one of the reserved headers ('kid', 'alg', 'typ', 'cty').
        """
        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
            raise ValueError(
                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
            )

        if self._kid:
            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
        if self._typ:
            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
        if self._cty:
            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
        headers["alg"] = self._algorithm
        return headers

    def _get_jwt_payload(self) -> dict[str, Any]:
        """
        Builds and returns the payload used when signing the JWT.

        'iat' and 'nbf' are set to the current timestamp and 'exp' to that timestamp plus the token duration.

        Raises:
            ValueError: If 'additional_jwt_payload' sets one of the reserved properties
                ('iss', 'sub', 'aud', 'iat', 'exp', 'nbf').
        """
        now = int(datetime.now().timestamp())
        # NOTE(review): without an integer token_duration, 'exp' equals 'iat' - confirm this is intended.
        exp = (now + self._token_duration) if isinstance(self._token_duration, int) else now
        nbf = now

        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
            raise ValueError(
                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
            )

        if self._iss:
            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
        if self._sub:
            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
        if self._aud:
            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)

        payload["iat"] = now
        payload["exp"] = exp
        payload["nbf"] = nbf
        return payload

    def _should_base64_encode_secret_key(self) -> bool:
        """
        Resolves the `base64_encode_secret_key` option to a plain boolean.

        An InterpolatedBoolean is an object and therefore always truthy, so it has to be evaluated against
        the config rather than tested directly.
        """
        flag = self._base64_encode_secret_key
        if isinstance(flag, InterpolatedBoolean):
            return bool(flag.eval(self.config))
        return bool(flag)

    def _get_secret_key(self) -> JwtKeyTypes:
        """
        Returns the secret key used to sign the JWT.

        When a non-empty passphrase is configured, the secret key is loaded as a PEM private key using that
        passphrase and the resulting key object is returned; base64 encoding is not applied in that case.
        Otherwise the key string is returned, base64-encoded when `base64_encode_secret_key` resolves to true.
        """
        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)

        if self._passphrase:
            passphrase_value = self._passphrase.eval(self.config, json_loads=json.loads)
            if passphrase_value:
                private_key = serialization.load_pem_private_key(
                    secret_key.encode(),
                    password=passphrase_value.encode(),
                )
                return cast(JwtKeyTypes, private_key)

        if self._should_base64_encode_secret_key():
            return base64.b64encode(secret_key.encode()).decode()
        return secret_key

    def _get_signed_token(self) -> Union[str, Any]:
        """
        Signs the JWT using the provided secret key and algorithm and the generated headers and payload.
        For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/

        Raises:
            ValueError: If building the payload, key or headers, or signing the token, fails for any reason.
        """
        try:
            return jwt.encode(
                payload=self._get_jwt_payload(),
                key=self._get_secret_key(),
                algorithm=self._algorithm,
                headers=self._get_jwt_headers(),
            )
        except Exception as e:
            # Chain explicitly so the original failure stays attached as the cause.
            raise ValueError(f"Failed to sign token: {e}") from e

    def _get_header_prefix(self) -> Union[str, None]:
        """
        Returns the header prefix to be used when attaching the token to the request.
        """
        return (
            self._header_prefix.eval(self.config, json_loads=json.loads)
            if self._header_prefix
            else None
        )

    @property
    def auth_header(self) -> str:
        """
        Returns the name of the header the token is injected into, or an empty string if there is none.
        """
        options = self._get_request_options(RequestOptionType.header)
        return next(iter(options.keys()), "")

    @property
    def token(self) -> str:
        """
        Returns the signed token, preceded by the header prefix and a space when a prefix is configured.
        """
        # Evaluate the prefix once instead of interpolating it twice.
        prefix = self._get_header_prefix()
        signed_token = self._get_signed_token()
        return f"{prefix} {signed_token}" if prefix else signed_token

    def get_request_params(self) -> Mapping[str, Any]:
        """Returns the token as request parameters when the request option targets them, else an empty mapping."""
        return self._get_request_options(RequestOptionType.request_parameter)

    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
        """Returns the token as body data when the request option targets it, else an empty mapping."""
        return self._get_request_options(RequestOptionType.body_data)

    def get_request_body_json(self) -> Mapping[str, Any]:
        """Returns the token as JSON body content when the request option targets it, else an empty mapping."""
        return self._get_request_options(RequestOptionType.body_json)

    def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
        """
        Returns a mapping holding the token if the configured request option targets `option_type`,
        otherwise an empty mapping.
        """
        options: MutableMapping[str, Any] = {}
        if self._request_option.inject_into == option_type:
            self._request_option.inject_into_request(options, self.token, self.config)
        return options
JwtKeyTypes =
typing.Union[cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey, str, bytes]
class
JwtAlgorithm(builtins.str):
class JwtAlgorithm(str):
    """
    String constants naming the supported JWT signing algorithms.

    This is a plain `str` subclass, not an `enum.Enum`: `JwtAlgorithm(value)` wraps any string without
    validating it against the constants below.
    """

    # HMAC with SHA-2
    HS256 = "HS256"
    HS384 = "HS384"
    HS512 = "HS512"

    # ECDSA
    ES256 = "ES256"
    ES256K = "ES256K"
    ES384 = "ES384"
    ES512 = "ES512"

    # RSA PKCS#1 v1.5
    RS256 = "RS256"
    RS384 = "RS384"
    RS512 = "RS512"

    # RSA-PSS
    PS256 = "PS256"
    PS384 = "PS384"
    PS512 = "PS512"

    # Edwards-curve signatures
    EdDSA = "EdDSA"
String constants naming the supported JWT signing algorithms (a plain `str` subclass, not an `enum.Enum`).
@dataclass
class
@dataclass
class JwtAuthenticator(DeclarativeAuthenticator):
    """
    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. By default the generated
    token is attached to each request via the Authorization header; `request_option` can inject it elsewhere.

    Attributes:
        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
        token_duration (Optional[int]): The duration in seconds for which the token is valid
        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
        passphrase (Optional[Union[InterpolatedString, str]]): Passphrase used to load the secret key as a PEM private key.
        request_option (Optional[RequestOption]): Where to inject the token; defaults to the Authorization header.
    """

    config: Mapping[str, Any]
    parameters: InitVar[Mapping[str, Any]]
    secret_key: Union[InterpolatedString, str]
    algorithm: Union[str, JwtAlgorithm]
    token_duration: Optional[int]
    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
    header_prefix: Optional[Union[InterpolatedString, str]] = None
    kid: Optional[Union[InterpolatedString, str]] = None
    typ: Optional[Union[InterpolatedString, str]] = None
    cty: Optional[Union[InterpolatedString, str]] = None
    iss: Optional[Union[InterpolatedString, str]] = None
    sub: Optional[Union[InterpolatedString, str]] = None
    aud: Optional[Union[InterpolatedString, str]] = None
    additional_jwt_headers: Optional[Mapping[str, Any]] = None
    additional_jwt_payload: Optional[Mapping[str, Any]] = None
    passphrase: Optional[Union[InterpolatedString, str]] = None
    request_option: Optional[RequestOption] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Wraps the configured fields in their interpolated counterparts and sets the default request option.
        """
        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
        self._algorithm = (
            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
        )
        # A string flag becomes an InterpolatedBoolean; it is evaluated against the config when the key is read.
        self._base64_encode_secret_key = (
            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
            if isinstance(self.base64_encode_secret_key, str)
            else self.base64_encode_secret_key
        )
        self._token_duration = self.token_duration
        self._header_prefix = (
            InterpolatedString.create(self.header_prefix, parameters=parameters)
            if self.header_prefix
            else None
        )
        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
        self._additional_jwt_headers = InterpolatedMapping(
            self.additional_jwt_headers or {}, parameters=parameters
        )
        self._additional_jwt_payload = InterpolatedMapping(
            self.additional_jwt_payload or {}, parameters=parameters
        )
        self._passphrase = (
            InterpolatedString.create(self.passphrase, parameters=parameters)
            if self.passphrase
            else None
        )

        # When we first implemented the JWT authenticator, we assumed that the signed token was always supposed
        # to be loaded into the request headers under the `Authorization` key. This is not always the case, but
        # this default option allows for backwards compatibility to be retained for existing connectors
        self._request_option = self.request_option or RequestOption(
            inject_into=RequestOptionType.header, field_name="Authorization", parameters=parameters
        )

    def _get_jwt_headers(self) -> dict[str, Any]:
        """
        Builds and returns the headers used when signing the JWT.

        Raises:
            ValueError: If 'additional_jwt_headers' sets one of the reserved headers ('kid', 'alg', 'typ', 'cty').
        """
        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
            raise ValueError(
                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
            )

        if self._kid:
            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
        if self._typ:
            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
        if self._cty:
            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
        headers["alg"] = self._algorithm
        return headers

    def _get_jwt_payload(self) -> dict[str, Any]:
        """
        Builds and returns the payload used when signing the JWT.

        'iat' and 'nbf' are set to the current timestamp and 'exp' to that timestamp plus the token duration.

        Raises:
            ValueError: If 'additional_jwt_payload' sets one of the reserved properties
                ('iss', 'sub', 'aud', 'iat', 'exp', 'nbf').
        """
        now = int(datetime.now().timestamp())
        # NOTE(review): without an integer token_duration, 'exp' equals 'iat' - confirm this is intended.
        exp = (now + self._token_duration) if isinstance(self._token_duration, int) else now
        nbf = now

        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
            raise ValueError(
                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
            )

        if self._iss:
            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
        if self._sub:
            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
        if self._aud:
            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)

        payload["iat"] = now
        payload["exp"] = exp
        payload["nbf"] = nbf
        return payload

    def _should_base64_encode_secret_key(self) -> bool:
        """
        Resolves the `base64_encode_secret_key` option to a plain boolean.

        An InterpolatedBoolean is an object and therefore always truthy, so it has to be evaluated against
        the config rather than tested directly.
        """
        flag = self._base64_encode_secret_key
        if isinstance(flag, InterpolatedBoolean):
            return bool(flag.eval(self.config))
        return bool(flag)

    def _get_secret_key(self) -> JwtKeyTypes:
        """
        Returns the secret key used to sign the JWT.

        When a non-empty passphrase is configured, the secret key is loaded as a PEM private key using that
        passphrase and the resulting key object is returned; base64 encoding is not applied in that case.
        Otherwise the key string is returned, base64-encoded when `base64_encode_secret_key` resolves to true.
        """
        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)

        if self._passphrase:
            passphrase_value = self._passphrase.eval(self.config, json_loads=json.loads)
            if passphrase_value:
                private_key = serialization.load_pem_private_key(
                    secret_key.encode(),
                    password=passphrase_value.encode(),
                )
                return cast(JwtKeyTypes, private_key)

        if self._should_base64_encode_secret_key():
            return base64.b64encode(secret_key.encode()).decode()
        return secret_key

    def _get_signed_token(self) -> Union[str, Any]:
        """
        Signs the JWT using the provided secret key and algorithm and the generated headers and payload.
        For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/

        Raises:
            ValueError: If building the payload, key or headers, or signing the token, fails for any reason.
        """
        try:
            return jwt.encode(
                payload=self._get_jwt_payload(),
                key=self._get_secret_key(),
                algorithm=self._algorithm,
                headers=self._get_jwt_headers(),
            )
        except Exception as e:
            # Chain explicitly so the original failure stays attached as the cause.
            raise ValueError(f"Failed to sign token: {e}") from e

    def _get_header_prefix(self) -> Union[str, None]:
        """
        Returns the header prefix to be used when attaching the token to the request.
        """
        return (
            self._header_prefix.eval(self.config, json_loads=json.loads)
            if self._header_prefix
            else None
        )

    @property
    def auth_header(self) -> str:
        """
        Returns the name of the header the token is injected into, or an empty string if there is none.
        """
        options = self._get_request_options(RequestOptionType.header)
        return next(iter(options.keys()), "")

    @property
    def token(self) -> str:
        """
        Returns the signed token, preceded by the header prefix and a space when a prefix is configured.
        """
        # Evaluate the prefix once instead of interpolating it twice.
        prefix = self._get_header_prefix()
        signed_token = self._get_signed_token()
        return f"{prefix} {signed_token}" if prefix else signed_token

    def get_request_params(self) -> Mapping[str, Any]:
        """Returns the token as request parameters when the request option targets them, else an empty mapping."""
        return self._get_request_options(RequestOptionType.request_parameter)

    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
        """Returns the token as body data when the request option targets it, else an empty mapping."""
        return self._get_request_options(RequestOptionType.body_data)

    def get_request_body_json(self) -> Mapping[str, Any]:
        """Returns the token as JSON body content when the request option targets it, else an empty mapping."""
        return self._get_request_options(RequestOptionType.body_json)

    def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
        """
        Returns a mapping holding the token if the configured request option targets `option_type`,
        otherwise an empty mapping.
        """
        options: MutableMapping[str, Any] = {}
        if self._request_option.inject_into == option_type:
            self._request_option.inject_into_request(options, self.token, self.config)
        return options
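Generates a JSON Web Token (JWT) based on a declarative connector configuration file. By default the generated token is attached to each request via the Authorization header; a custom `request_option` can inject it into a different header, a request parameter, or the request body instead.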
Attributes:
- config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
- secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
- algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
- token_duration (Optional[int]): The duration in seconds for which the token is valid
- base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
- header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
- kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
- typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
- cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
- iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
- sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
- aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
- additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
- additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
JwtAuthenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], secret_key: Union[airbyte_cdk.InterpolatedString, str], algorithm: Union[str, JwtAlgorithm], token_duration: Optional[int], base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False, header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, additional_jwt_headers: Optional[Mapping[str, Any]] = None, additional_jwt_payload: Optional[Mapping[str, Any]] = None, passphrase: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, request_option: Optional[airbyte_cdk.RequestOption] = None)
secret_key: Union[airbyte_cdk.InterpolatedString, str]
algorithm: Union[str, JwtAlgorithm]
auth_header: str
@property
def auth_header(self) -> str:
    """Return the first key of the header request options, or an empty string when there are none."""
    header_options = self._get_request_options(RequestOptionType.header)
    # Iterating a mapping yields its keys; the first one (if any) is the header name.
    for header_name in header_options:
        return header_name
    return ""
HTTP header to set on the requests
token: str
@property
def token(self) -> str:
    """
    Return the signed token, preceded by the header prefix and a space when a prefix is configured.

    With no prefix (None or an empty string) the signed token is returned on its own.
    """
    # Evaluate the prefix once instead of interpolating it twice.
    prefix = self._get_header_prefix()
    signed_token = self._get_signed_token()
    return f"{prefix} {signed_token}" if prefix else signed_token
The header value to set on outgoing HTTP requests
def
get_request_params(self) -> Mapping[str, Any]:
def get_request_params(self) -> Mapping[str, Any]:
    """Return the request options computed for the request-parameter injection target."""
    target = RequestOptionType.request_parameter
    return self._get_request_options(target)
HTTP request parameter to add to the requests
def
get_request_body_data(self) -> Union[Mapping[str, Any], str]:
def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
    """Return the request options computed for the body-data injection target."""
    target = RequestOptionType.body_data
    return self._get_request_options(target)
Form-encoded body data to set on the requests