airbyte.sources.registry
Connectivity to the connector catalog registry.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Connectivity to the connector catalog registry."""

from __future__ import annotations

import json
import logging
import os
import warnings
from copy import copy
from dataclasses import dataclass
from enum import Enum
from pathlib import Path

import requests

from airbyte import exceptions as exc
from airbyte._util.meta import is_docker_installed
from airbyte.constants import AIRBYTE_OFFLINE_MODE
from airbyte.logs import warn_once
from airbyte.version import get_version


logger = logging.getLogger("airbyte")


# Module-level registry cache, keyed by connector name. `None` until first load.
__cache: dict[str, ConnectorMetadata] | None = None


_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"
# Seconds passed to `requests.get(timeout=...)` when fetching the registry over HTTP.
# Without a timeout, an unresponsive server would block the caller indefinitely.
_REGISTRY_REQUEST_TIMEOUT_SECONDS = 30

_LOWCODE_CDK_TAG = "cdk:low-code"

_PYTHON_LANGUAGE = "python"
_MANIFEST_ONLY_LANGUAGE = "manifest-only"

_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}"
_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}"

_LOWCODE_CONNECTORS_NEEDING_PYTHON: list[str] = [
    "source-adjust",
    "source-alpha-vantage",
    "source-amplitude",
    "source-apify-dataset",
    "source-asana",
    "source-avni",
    "source-aws-cloudtrail",
    "source-bamboo-hr",
    "source-braintree",
    "source-braze",
    "source-chargebee",
    "source-close-com",
    "source-commercetools",
    "source-facebook-pages",
    "source-fastbill",
    "source-freshdesk",
    "source-gitlab",
    "source-gnews",
    "source-greenhouse",
    "source-instagram",
    "source-instatus",
    "source-intercom",
    "source-iterable",
    "source-jina-ai-reader",
    "source-jira",
    "source-klaviyo",
    "source-looker",
    "source-mailchimp",
    "source-mixpanel",
    "source-monday",
    "source-my-hours",
    "source-notion",
    "source-okta",
    "source-orb",
    "source-outreach",
    "source-partnerstack",
    "source-paypal-transaction",
    "source-pinterest",
    "source-pipedrive",
    "source-pocket",
    "source-posthog",
    "source-prestashop",
    "source-public-apis",
    "source-qualaroo",
    "source-quickbooks",
    "source-railz",
    "source-recharge",
    "source-recurly",
    "source-retently",
    "source-rss",
    "source-salesloft",
    "source-slack",
    "source-surveymonkey",
    "source-tiktok-marketing",
    "source-the-guardian-api",
    "source-trello",
    "source-typeform",
    "source-us-census",
    "source-xero",
    "source-younium",
    "source-zendesk-chat",
    "source-zendesk-sunshine",
    "source-zendesk-support",
    "source-zendesk-talk",
    "source-zenloop",
    "source-zoom",
]
_LOWCODE_CONNECTORS_FAILING_VALIDATION = [
    "source-amazon-ads",
]
# Connectors that return 404 or some other misc exception.
_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS: list[str] = []
# (CDK) FileNotFoundError: Unable to find spec.yaml or spec.json in the package.
_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS: list[str] = [
    "source-apple-search-ads",
    "source-marketo",
    "source-n8n",
    "source-onesignal",
    "source-postmarkapp",
    "source-sentry",
    "source-unleash",
]
# Union of the lists above: connectors excluded from the YAML install-type listing.
_LOWCODE_CONNECTORS_EXCLUDED: list[str] = [
    *_LOWCODE_CONNECTORS_FAILING_VALIDATION,
    *_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS,
    *_LOWCODE_CONNECTORS_NEEDING_PYTHON,
    *_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS,
]


class InstallType(str, Enum):
    """The type of installation for a connector."""

    YAML = "yaml"
    PYTHON = "python"
    DOCKER = "docker"
    JAVA = "java"


class Language(str, Enum):
    """The language of a connector."""

    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE


@dataclass
class ConnectorMetadata:
    """Metadata for a connector."""

    name: str
    """Connector name. For example, "source-google-sheets"."""

    latest_available_version: str | None
    """The latest available version of the connector."""

    pypi_package_name: str | None
    """The name of the PyPI package for the connector, if it exists."""

    language: Language | None
    """The language of the connector."""

    install_types: set[InstallType]
    """The supported install types for the connector."""

    @property
    def default_install_type(self) -> InstallType:
        """Return the default install type for the connector.

        Prefers YAML for manifest-only connectors that support it, then Python,
        and falls back to Docker otherwise.
        """
        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
            return InstallType.YAML

        if InstallType.PYTHON in self.install_types:
            return InstallType.PYTHON

        # Else: Java or Docker
        return InstallType.DOCKER


def _get_registry_url() -> str:
    """Return the registry location.

    Uses the `AIRBYTE_LOCAL_REGISTRY` environment variable when it is set, otherwise
    the public OSS registry URL. The value may be an HTTP(S) URL, a local file path,
    or a "disabled" flag (see `_is_registry_disabled`).
    """
    if _REGISTRY_ENV_VAR in os.environ:
        return str(os.environ.get(_REGISTRY_ENV_VAR))

    return _REGISTRY_URL


def _is_registry_disabled(url: str) -> bool:
    """Return True if registry lookups are disabled.

    Lookups are disabled when `url` is "0", "F" or "FALSE" (case-insensitive), or
    when `AIRBYTE_OFFLINE_MODE` is truthy.
    """
    return url.upper() in {"0", "F", "FALSE"} or AIRBYTE_OFFLINE_MODE


def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
    """Convert one raw registry entry (a source or destination definition) to metadata.

    The language is taken from the entry's "language" field when it holds a known
    value; otherwise it is inferred from the "language:*" tags. An unknown language
    value emits a warning and falls back to tag inference.
    """
    name = entry["dockerRepository"].replace("airbyte/", "")
    latest_version: str | None = entry.get("dockerImageTag")
    # `or []` / `or {}` also guard against explicit nulls in the registry JSON.
    tags = entry.get("tags") or []
    language: Language | None = None

    raw_language = entry.get("language")
    if raw_language is not None:
        try:
            language = Language(raw_language)
        except ValueError:
            # Enum lookup raises ValueError for unknown values; warn and fall back to tags.
            warnings.warn(
                message=f"Invalid language for connector {name}: {raw_language}",
                stacklevel=2,
            )
    if not language and _PYTHON_LANGUAGE_TAG in tags:
        language = Language.PYTHON
    if not language and _MANIFEST_ONLY_TAG in tags:
        language = Language.MANIFEST_ONLY

    remote_registries: dict = entry.get("remoteRegistries") or {}
    pypi_registry: dict = remote_registries.get("pypi") or {}
    pypi_package_name: str | None = pypi_registry.get("packageName")
    pypi_enabled: bool = pypi_registry.get("enabled", False)
    install_types: set[InstallType] = {
        x
        for x in [
            InstallType.DOCKER,  # Always True
            InstallType.PYTHON if language == Language.PYTHON and pypi_enabled else None,
            InstallType.JAVA if language == Language.JAVA else None,
            InstallType.YAML if language == Language.MANIFEST_ONLY else None,
            # TODO: Remove 'cdk:low-code' check once all connectors are migrated to manifest-only.
            # https://github.com/airbytehq/PyAirbyte/issues/348
            InstallType.YAML if _LOWCODE_CDK_TAG in tags else None,
        ]
        if x
    }

    return ConnectorMetadata(
        name=name,
        latest_available_version=latest_version,
        pypi_package_name=pypi_package_name if pypi_enabled else None,
        language=language,
        install_types=install_types,
    )


def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]:
    """Return the registry cache, loading it from the registry on first use.

    Args:
        force_refresh: Re-read the registry even if the cache is already populated.

    Returns:
        A mapping of connector name to `ConnectorMetadata` covering both sources and
        destinations. An empty dict is returned (and not cached) when the registry is
        disabled.

    Raises:
        requests.HTTPError: If the HTTP registry responds with an error status.
        requests.Timeout: If the HTTP registry does not respond within
            `_REGISTRY_REQUEST_TIMEOUT_SECONDS`.
    """
    global __cache
    # An empty cache counts as "not loaded", so an empty registry is re-read on each call.
    if __cache and not force_refresh:
        return __cache

    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return {}

    if registry_url.startswith("http"):
        response = requests.get(
            registry_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            timeout=_REGISTRY_REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        data = response.json()
    else:
        # Assume local file
        with Path(registry_url).open(encoding="utf-8") as f:
            data = json.load(f)

    new_cache: dict[str, ConnectorMetadata] = {}

    # Sources first, then destinations (a later entry with the same name wins).
    # Either section may be missing, e.g. in a hand-written local registry.
    for connector in [*(data.get("sources") or []), *(data.get("destinations") or [])]:
        connector_metadata = _registry_entry_to_connector_metadata(connector)
        new_cache[connector_metadata.name] = connector_metadata

    if len(new_cache) == 0:
        # This isn't necessarily fatal, since users can bring their own
        # connector definitions.
        warn_once(
            message=f"Connector registry is empty: {registry_url}",
            with_stack=False,
        )

    __cache = new_cache
    return __cache


def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Return the registry metadata for the connector called `name`.

    The registry cache is loaded on first use.

    Returns:
        The connector's metadata, or `None` if the registry is disabled (see
        `_is_registry_disabled`).

    Raises:
        PyAirbyteInternalError: If the registry yields no connectors at all.
        AirbyteConnectorNotRegisteredError: If `name` is not in the registry.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    cache = copy(_get_registry_cache())

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]


def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """Return a list of all available connectors.

    Connector names are returned in alphabetical order and include their standard
    prefix (e.g. "source-" or "destination-").

    Args:
        install_type: Which connectors to list.
            - `None`: connectors runnable here. That is all connectors if Docker is
              installed, otherwise only Python and manifest-only connectors.
            - "python": connectors with a published PyPI package.
            - "java": Java connectors (emits a "not yet supported" warning).
            - "docker": all connectors.
            - "yaml": YAML-installable connectors, minus known-broken low-code ones.

    Raises:
        ValueError: If `install_type` is a string that is not a valid `InstallType`.
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            # If Docker is available, return all connectors.
            return sorted(conn.name for conn in _get_registry_cache().values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")

        # If Docker is not available, return only Python and Manifest-based connectors.
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
        )

    if not isinstance(install_type, InstallType):
        install_type = InstallType(install_type)

    if install_type == InstallType.PYTHON:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
        )

    if install_type == InstallType.DOCKER:
        return sorted(conn.name for conn in _get_registry_cache().values())

    if install_type == InstallType.YAML:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if InstallType.YAML in conn.install_types
            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
        )

    # Should never be reached: every InstallType member is handled above.
    raise exc.PyAirbyteInputError(  # pragma: no cover
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
logger =
<Logger airbyte (INFO)>
class
InstallType(builtins.str, enum.Enum):
class InstallType(str, Enum):
    """Supported ways of installing a connector.

    Being a `str` enum, each member compares equal to its plain string value,
    e.g. `InstallType.DOCKER == "docker"`.
    """

    # Declarative (YAML manifest) connectors.
    YAML = "yaml"
    # Connectors installed from a PyPI package.
    PYTHON = "python"
    # Connectors run from their Docker image.
    DOCKER = "docker"
    # Java connectors.
    JAVA = "java"
The type of installation for a connector.
YAML =
<InstallType.YAML: 'yaml'>
PYTHON =
<InstallType.PYTHON: 'python'>
DOCKER =
<InstallType.DOCKER: 'docker'>
JAVA =
<InstallType.JAVA: 'java'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class
Language(builtins.str, enum.Enum):
class Language(str, Enum):
    """Implementation language of a connector, as derived from registry entries."""

    # These two reuse the `InstallType` string values so both enums compare
    # equal to the same plain strings.
    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    # Connectors defined solely by a manifest ("manifest-only").
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
The language of a connector.
PYTHON =
<Language.PYTHON: 'python'>
JAVA =
<Language.JAVA: 'java'>
MANIFEST_ONLY =
<Language.MANIFEST_ONLY: 'manifest-only'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
@dataclass
class
ConnectorMetadata:
@dataclass
class ConnectorMetadata:
    """Registry metadata describing a single connector."""

    name: str
    """Connector name. For example, "source-google-sheets"."""

    latest_available_version: str | None
    """The latest available version of the connector."""

    pypi_package_name: str | None
    """The name of the PyPI package for the connector, if it exists."""

    language: Language | None
    """The language of the connector."""

    install_types: set[InstallType]
    """The supported install types for the connector."""

    @property
    def default_install_type(self) -> InstallType:
        """Return the install type to use for this connector by default.

        Preference order: YAML (only for manifest-only connectors that support it),
        then Python, then Docker.
        """
        supported = self.install_types
        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in supported:
            return InstallType.YAML
        # Anything not installable from YAML or PyPI (e.g. Java) falls back to Docker.
        return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Metadata for a connector.
ConnectorMetadata( name: str, latest_available_version: str | None, pypi_package_name: str | None, language: Language | None, install_types: set[InstallType])
default_install_type: InstallType
@property
def default_install_type(self) -> InstallType:
    """Return the install type to use for this connector by default.

    Preference order: YAML (only for manifest-only connectors that support it),
    then Python, then Docker.
    """
    supported = self.install_types
    if self.language == Language.MANIFEST_ONLY and InstallType.YAML in supported:
        return InstallType.YAML
    # Anything not installable from YAML or PyPI (e.g. Java) falls back to Docker.
    return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Return the default install type for the connector.
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Return the registry metadata for the connector called `name`.

    The registry cache is loaded on first use.

    Returns:
        The connector's metadata, or `None` if the registry is disabled (see
        `_is_registry_disabled`).

    Raises:
        PyAirbyteInternalError: If the registry yields no connectors at all.
        AirbyteConnectorNotRegisteredError: If `name` is not in the registry.
    """
    # Resolve the URL once and reuse it for the disabled check and error context.
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    cache = copy(_get_registry_cache())

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]
Check the cache for the connector.
If the cache has not been loaded yet, it is first populated from the connector registry.
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """Return a list of all available connectors.

    Connector names are returned in alphabetical order and include their standard
    prefix (e.g. "source-" or "destination-").

    Args:
        install_type: Which connectors to list.
            - `None`: connectors runnable here. That is all connectors if Docker is
              installed, otherwise only Python and manifest-only connectors.
            - "python": connectors with a published PyPI package.
            - "java": Java connectors (emits a "not yet supported" warning).
            - "docker": all connectors.
            - "yaml": YAML-installable connectors, minus known-broken low-code ones.

    Raises:
        ValueError: If `install_type` is a string that is not a valid `InstallType`.
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            # If Docker is available, return all connectors.
            return sorted(conn.name for conn in _get_registry_cache().values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")

        # If Docker is not available, return only Python and Manifest-based connectors.
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
        )

    if not isinstance(install_type, InstallType):
        install_type = InstallType(install_type)

    if install_type == InstallType.PYTHON:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
        )

    if install_type == InstallType.DOCKER:
        return sorted(conn.name for conn in _get_registry_cache().values())

    if install_type == InstallType.YAML:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if InstallType.YAML in conn.install_types
            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
        )

    # Should never be reached: every InstallType member is handled above.
    raise exc.PyAirbyteInputError(  # pragma: no cover
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
Return a list of all available connectors.
Connector names are returned in alphabetical order and include their standard prefix (e.g. "source-" or "destination-").