airbyte.sources.registry
Connectivity to the connector catalog registry.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Connectivity to the connector catalog registry."""

from __future__ import annotations

import json
import logging
import os
import warnings
from copy import copy
from dataclasses import dataclass
from enum import Enum
from pathlib import Path

import requests

from airbyte import exceptions as exc
from airbyte._util.meta import is_docker_installed
from airbyte.constants import AIRBYTE_OFFLINE_MODE
from airbyte.logs import warn_once
from airbyte.version import get_version


logger = logging.getLogger("airbyte")


# Module-level registry cache, populated lazily by `_get_registry_cache()`.
__cache: dict[str, ConnectorMetadata] | None = None


# When set, this env var overrides the registry location (URL or local file path). A value of
# "0", "F" or "FALSE" (case-insensitive) disables the registry entirely.
_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"
# Timeout (seconds) passed to `requests.get` for the remote registry, so a stalled network call
# raises instead of hanging indefinitely.
_REGISTRY_REQUEST_TIMEOUT_SECONDS = 30

_PYTHON_LANGUAGE = "python"
_MANIFEST_ONLY_LANGUAGE = "manifest-only"

_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}"
_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}"

# The `_LOWCODE_*` lists below are merged into `_LOWCODE_CONNECTORS_EXCLUDED`, which
# `get_available_connectors()` filters out of the YAML install-type listing.
_LOWCODE_CONNECTORS_NEEDING_PYTHON: list[str] = [
    "source-alpha-vantage",
    "source-amplitude",
    "source-apify-dataset",
    "source-asana",
    "source-avni",
    "source-aws-cloudtrail",
    "source-bamboo-hr",
    "source-braintree",
    "source-braze",
    "source-chargebee",
    "source-commercetools",
    "source-eventbrite",
    "source-facebook-pages",
    "source-fastbill",
    "source-freshdesk",
    "source-gitlab",
    "source-gnews",
    "source-gong",
    "source-greenhouse",
    "source-instagram",
    "source-instatus",
    "source-intercom",
    "source-iterable",
    "source-jina-ai-reader",
    "source-jira",
    "source-klaviyo",
    "source-looker",
    "source-mailchimp",
    "source-mixpanel",
    "source-monday",
    "source-my-hours",
    "source-notion",
    "source-okta",
    "source-orb",
    "source-outreach",
    "source-paypal-transaction",
    "source-pinterest",
    "source-pipedrive",
    "source-pocket",
    "source-posthog",
    "source-prestashop",
    "source-public-apis",
    "source-qualaroo",
    "source-railz",
    "source-recharge",
    "source-recurly",
    "source-retently",
    "source-rss",
    "source-salesloft",
    "source-service-now",
    "source-slack",
    "source-surveymonkey",
    "source-the-guardian-api",
    "source-tiktok-marketing",
    "source-trello",
    "source-typeform",
    "source-us-census",
    "source-xero",
    "source-younium",
    "source-zendesk-chat",
    "source-zendesk-sunshine",
    "source-zendesk-support",
    "source-zendesk-talk",
    "source-zenloop",
    "source-zoom",
]
_LOWCODE_CONNECTORS_FAILING_VALIDATION: list[str] = []
# Connectors that return 404 or some other misc exception.
_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS: list[str] = [
    "source-adjust",
    "source-amazon-ads",
    "source-marketo",
]
# (CDK) FileNotFoundError: Unable to find spec.yaml or spec.json in the package.
_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS: list[str] = []
_LOWCODE_CONNECTORS_EXCLUDED: list[str] = [
    *_LOWCODE_CONNECTORS_FAILING_VALIDATION,
    *_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS,
    *_LOWCODE_CONNECTORS_NEEDING_PYTHON,
    *_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS,
]


class InstallType(str, Enum):
    """The type of installation for a connector."""

    YAML = "yaml"
    PYTHON = "python"
    DOCKER = "docker"
    JAVA = "java"


class Language(str, Enum):
    """The language of a connector."""

    # PYTHON and JAVA reuse the InstallType values so both enums use identical strings.
    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE


@dataclass
class ConnectorMetadata:
    """Metadata for a connector."""

    name: str
    """Connector name. For example, "source-google-sheets"."""

    latest_available_version: str | None
    """The latest available version of the connector."""

    pypi_package_name: str | None
    """The name of the PyPI package for the connector, if it exists."""

    language: Language | None
    """The language of the connector."""

    install_types: set[InstallType]
    """The supported install types for the connector."""

    @property
    def default_install_type(self) -> InstallType:
        """Return the default install type for the connector.

        Manifest-only connectors that support YAML default to YAML. Otherwise Python is used
        when supported, and Docker is the fallback (e.g. for Java connectors).
        """
        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
            return InstallType.YAML

        if InstallType.PYTHON in self.install_types:
            return InstallType.PYTHON

        # Else: Java or Docker
        return InstallType.DOCKER


def _get_registry_url() -> str:
    """Return the registry location: the env-var override if set, else the default URL."""
    if _REGISTRY_ENV_VAR in os.environ:
        return str(os.environ.get(_REGISTRY_ENV_VAR))

    return _REGISTRY_URL


def _is_registry_disabled(url: str) -> bool:
    """Return True if the registry is turned off.

    The registry is off when `url` is "0", "F" or "FALSE" (case-insensitive), or when
    offline mode is enabled.
    """
    return url.upper() in {"0", "F", "FALSE"} or AIRBYTE_OFFLINE_MODE


def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
    """Convert one raw registry entry (a source or destination dict) into `ConnectorMetadata`.

    The language comes from the entry's "language" field. When that field is missing or
    invalid, it falls back to the "language:<lang>" tags. Docker is always offered as an
    install type. The other install types are derived from the language and from whether
    PyPI publishing is enabled.
    """
    name = entry["dockerRepository"].replace("airbyte/", "")
    latest_version: str | None = entry.get("dockerImageTag")
    # `or` also guards against explicit nulls in the registry JSON.
    tags = entry.get("tags") or []
    language: Language | None = None

    if "language" in entry and entry["language"] is not None:
        try:
            language = Language(entry["language"])
        except ValueError:
            # Unknown language value: warn, then fall back to the tag-based detection below.
            warnings.warn(
                message=f"Invalid language for connector {name}: {entry['language']}",
                stacklevel=2,
            )
    if not language and _PYTHON_LANGUAGE_TAG in tags:
        language = Language.PYTHON
    if not language and _MANIFEST_ONLY_TAG in tags:
        language = Language.MANIFEST_ONLY

    remote_registries: dict = entry.get("remoteRegistries") or {}
    pypi_registry: dict = remote_registries.get("pypi") or {}
    pypi_package_name: str | None = pypi_registry.get("packageName", None)
    pypi_enabled: bool = pypi_registry.get("enabled", False)
    install_types: set[InstallType] = {
        x
        for x in [
            InstallType.DOCKER,  # Always True
            InstallType.PYTHON if language == Language.PYTHON and pypi_enabled else None,
            InstallType.JAVA if language == Language.JAVA else None,
            InstallType.YAML if language == Language.MANIFEST_ONLY else None,
        ]
        if x
    }

    return ConnectorMetadata(
        name=name,
        latest_available_version=latest_version,
        # Only expose the package name when PyPI publishing is enabled for this connector.
        pypi_package_name=pypi_package_name if pypi_enabled else None,
        language=language,
        install_types=install_types,
    )


def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]:
    """Return the registry cache, loading it on first use.

    The registry location comes from `_get_registry_url()`. A value starting with "http" is
    fetched over the network; any other value is read as a local JSON file.

    Args:
        force_refresh: Reload the registry even if a non-empty cache already exists.

    Returns:
        A mapping of connector name to metadata. An empty dict is returned when the registry
        is disabled and nothing has been cached yet.

    Raises:
        requests.HTTPError: If the remote registry responds with an error status.
        requests.Timeout: If the remote registry does not respond within the timeout.
    """
    global __cache
    if __cache and not force_refresh:
        return __cache

    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return {}

    if registry_url.startswith("http"):
        response = requests.get(
            registry_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            timeout=_REGISTRY_REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        data = response.json()
    else:
        # Assume local file
        with Path(registry_url).open(encoding="utf-8") as f:
            data = json.load(f)

    new_cache: dict[str, ConnectorMetadata] = {}

    # Sources first, then destinations, keyed by name. A missing section is treated as empty,
    # so a local registry file that lists only one kind still loads.
    for section in ("sources", "destinations"):
        for connector in data.get(section, []):
            connector_metadata = _registry_entry_to_connector_metadata(connector)
            new_cache[connector_metadata.name] = connector_metadata

    if len(new_cache) == 0:
        # This isn't necessarily fatal, since users can bring their own
        # connector definitions.
        warn_once(
            message=f"Connector registry is empty: {registry_url}",
            with_stack=False,
        )

    __cache = new_cache
    return __cache


def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Look up registry metadata for a single connector.

    The registry is loaded through `_get_registry_cache()` on first use and then reused.

    Args:
        name: The connector name, e.g. "source-google-sheets".

    Returns:
        The connector's metadata. Returns None when the registry is disabled (offline mode,
        or a disabling value in the registry env var).

    Raises:
        exc.PyAirbyteInternalError: If the loaded registry contains no connectors.
        exc.AirbyteConnectorNotRegisteredError: If `name` is not in the registry.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    cache = copy(_get_registry_cache())

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]


def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """Return the names of available connectors, in alphabetical order.

    Names include their standard prefix (for example "source-"). Both sources and
    destinations from the registry are listed.

    Args:
        install_type: Restrict the result to connectors supporting this install type. Strings
            are converted with `InstallType(...)`. With None (the default), return whatever
            is runnable here: all connectors if Docker is installed, otherwise only Python
            and manifest-only connectors.

    Returns:
        A sorted list of connector names.

    Raises:
        ValueError: If `install_type` is a string that is not a valid `InstallType` value.
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            # If Docker is available, return all connectors.
            return sorted(conn.name for conn in _get_registry_cache().values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")

        # If Docker is not available, return only Python and Manifest-based connectors.
        # NOTE(review): this filters on language, not install_types, so Python connectors whose
        # PyPI package is disabled are still listed. Confirm this is intended.
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
        )

    if not isinstance(install_type, InstallType):
        install_type = InstallType(install_type)

    if install_type == InstallType.PYTHON:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
        )

    if install_type == InstallType.DOCKER:
        return sorted(conn.name for conn in _get_registry_cache().values())

    if install_type == InstallType.YAML:
        # Build the exclusion set once instead of scanning the list for every connector.
        excluded = set(_LOWCODE_CONNECTORS_EXCLUDED)
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if InstallType.YAML in conn.install_types and conn.name not in excluded
        )

    # Should never be reached: every InstallType member is handled above.
    raise exc.PyAirbyteInputError(  # pragma: no cover
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
logger =
<Logger airbyte (INFO)>
class
InstallType(builtins.str, enum.Enum):
class InstallType(str, Enum):
    """Ways a connector can be installed.

    Each member is also a ``str``, so it compares equal to its raw value
    (for example ``InstallType.DOCKER == "docker"``).
    """

    # Installed from a YAML manifest (offered to manifest-only connectors).
    YAML = "yaml"
    # Installed from PyPI (offered when the connector's PyPI publishing is enabled).
    PYTHON = "python"
    # Run as a Docker image (offered for every registry entry).
    DOCKER = "docker"
    # Java connectors; listing them emits a "not yet supported" warning.
    JAVA = "java"
The type of installation for a connector.
YAML =
<InstallType.YAML: 'yaml'>
PYTHON =
<InstallType.PYTHON: 'python'>
DOCKER =
<InstallType.DOCKER: 'docker'>
JAVA =
<InstallType.JAVA: 'java'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class
Language(builtins.str, enum.Enum):
class Language(str, Enum):
    """Implementation language of a connector, as declared in the registry.

    Members are ``str`` subclasses and compare equal to their raw values.
    """

    # Reuse the InstallType strings so both enums spell these languages identically.
    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    # Declarative connectors that ship only a manifest.
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
The language of a connector.
PYTHON =
<Language.PYTHON: 'python'>
JAVA =
<Language.JAVA: 'java'>
MANIFEST_ONLY =
<Language.MANIFEST_ONLY: 'manifest-only'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
@dataclass
class
ConnectorMetadata:
@dataclass
class ConnectorMetadata:
    """Registry metadata describing a single connector."""

    name: str
    """Connector name. For example, "source-google-sheets"."""

    latest_available_version: str | None
    """Newest version published in the registry, or None if not listed."""

    pypi_package_name: str | None
    """PyPI package name for the connector, or None if there is none."""

    language: Language | None
    """Implementation language of the connector, or None if unknown."""

    install_types: set[InstallType]
    """Every install type the connector supports."""

    @property
    def default_install_type(self) -> InstallType:
        """Choose the install type to use when none is requested.

        Manifest-only connectors that support YAML use YAML. Otherwise Python is chosen when
        supported, and Docker is the fallback for everything else (such as Java).
        """
        is_manifest_only = self.language == Language.MANIFEST_ONLY
        if is_manifest_only and InstallType.YAML in self.install_types:
            return InstallType.YAML
        return (
            InstallType.PYTHON
            if InstallType.PYTHON in self.install_types
            else InstallType.DOCKER
        )
Metadata for a connector.
ConnectorMetadata( name: str, latest_available_version: str | None, pypi_package_name: str | None, language: Language | None, install_types: set[InstallType])
default_install_type: InstallType
@property
def default_install_type(self) -> InstallType:
    """Choose the install type to use when none is requested.

    Manifest-only connectors that support YAML use YAML. Otherwise Python is chosen when
    supported, and Docker is the fallback for everything else (such as Java).
    """
    is_manifest_only = self.language == Language.MANIFEST_ONLY
    if is_manifest_only and InstallType.YAML in self.install_types:
        return InstallType.YAML
    return (
        InstallType.PYTHON
        if InstallType.PYTHON in self.install_types
        else InstallType.DOCKER
    )
Return the default install type for the connector.
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Look up registry metadata for a single connector.

    The registry is loaded through `_get_registry_cache()` on first use and then reused.

    Args:
        name: The connector name, e.g. "source-google-sheets".

    Returns:
        The connector's metadata. Returns None when the registry is disabled (offline mode,
        or a disabling value in the registry env var).

    Raises:
        exc.PyAirbyteInternalError: If the loaded registry contains no connectors.
        exc.AirbyteConnectorNotRegisteredError: If `name` is not in the registry.
    """
    # Resolve the registry location once and reuse it for the checks and error contexts.
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    cache = copy(_get_registry_cache())

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]
Check the cache for the connector.
If the registry cache is empty, it is populated by calling `_get_registry_cache()`.
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """Return the names of available connectors, in alphabetical order.

    Names include their standard prefix (for example "source-"). Both sources and
    destinations from the registry are listed.

    Args:
        install_type: Restrict the result to connectors supporting this install type. Strings
            are converted with `InstallType(...)`. With None (the default), return whatever
            is runnable here: all connectors if Docker is installed, otherwise only Python
            and manifest-only connectors.

    Returns:
        A sorted list of connector names.

    Raises:
        ValueError: If `install_type` is a string that is not a valid `InstallType` value.
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            # If Docker is available, return all connectors.
            return sorted(conn.name for conn in _get_registry_cache().values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")

        # If Docker is not available, return only Python and Manifest-based connectors.
        # NOTE(review): this filters on language, not install_types, so Python connectors whose
        # PyPI package is disabled are still listed. Confirm this is intended.
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
        )

    if not isinstance(install_type, InstallType):
        install_type = InstallType(install_type)

    if install_type == InstallType.PYTHON:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
        )

    if install_type == InstallType.DOCKER:
        return sorted(conn.name for conn in _get_registry_cache().values())

    if install_type == InstallType.YAML:
        # Build the exclusion set once instead of scanning the list for every connector.
        excluded = set(_LOWCODE_CONNECTORS_EXCLUDED)
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if InstallType.YAML in conn.install_types and conn.name not in excluded
        )

    # Should never be reached: every InstallType member is handled above.
    raise exc.PyAirbyteInputError(  # pragma: no cover
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
Return a list of all available connectors.
Connector names are returned in alphabetical order and include their standard prefix (for example, "source-"); both sources and destinations from the registry are listed.