Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

Python library for authorizing access and fetching personal data from portability APIs and services

## Using `pardner`

### With classes

In `services/` you'll find classes defined for different services that are supported by the library (e.g., Tumblr). You can make an instance of that class and use that same object for getting initial authorization from a user and for making data transfer requests.

### Stateless mode

In `stateless/` there are modules that expose functions grouped by service that allow you to complete the same tasks as in the classes described above. Unlike using pardner with the classes we provide, however, you supply the necessary data each time you make a request for each request.

## Developer set-up

> **tl;dr**:
Expand Down
1 change: 1 addition & 0 deletions src/pardner/stateless/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from pardner.stateless.base import Scope as Scope
71 changes: 71 additions & 0 deletions src/pardner/stateless/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from typing import Any, Optional, TypeAlias

from requests_oauthlib import OAuth2Session

Scope: TypeAlias = str | set[object] | tuple[object] | list[object]


def generic_construct_authorization_url(
authorization_url_endpoint: str, client_id: str, redirect_uri: str, scope: Scope
) -> tuple[str, str]:
"""
Builds the authorization URL and state. Once the end-user (i.e., resource owner)
navigates to the authorization URL they can begin the authorization flow.

:param authorization_url_endpoint: The service's endpoint that must be hit to begin
the OAuth 2 flow.
:param client_id: Client identifier given by the OAuth provider upon registration.
:param redirect_uri: The registered callback URI.
:param scope: The scope of the access request. These may be any string but are
commonly URIs or various categories such as ``videos`` or ``documents``.

:returns: the authorization URL and state, respectively.
"""
oAuth2Session = OAuth2Session(
client_id=client_id, redirect_uri=redirect_uri, scope=scope
)
return oAuth2Session.authorization_url(authorization_url_endpoint)


def generic_fetch_token(
client_id: str,
redirect_uri: str,
scope: Scope,
token_url: str,
authorization_response: Optional[str] = None,
client_secret: Optional[str] = None,
code: Optional[str] = None,
include_client_id: Optional[bool] = None,
) -> dict[str, Any]:
"""
Once the end-user authorizes the application to access their data, the
resource server sends a request to `redirect_uri` with the authorization code as
a parameter. Using this authorization code, this method makes a request to the
resource server to obtain the access token.

One of either `code` or `authorization_response` must not be None.

:param client_id: Client identifier given by the OAuth provider upon registration.
:param redirect_uri: The registered callback URI.
:param scope: The scope of the access request. These may be any string but are
commonly URIs or various categories such as ``videos`` or ``documents``.
:param token_url: Token endpoint HTTPS URL.
:param authorization_response: the URL (with parameters) the end-user's browser
redirected to after authorization.
:param client_secret: The `client_secret` paired to the `client_id`.
:param code: Authorization code (used by WebApplicationClients).
:param include_client_id: Should the request body include the
`client_id` parameter.

:returns: the authorization URL and state, respectively.
"""
oAuth2Session = OAuth2Session(
client_id=client_id, redirect_uri=redirect_uri, scope=scope
)
return oAuth2Session.fetch_token(
token_url=token_url,
code=code,
authorization_response=authorization_response,
include_client_id=include_client_id,
client_secret=client_secret,
)
72 changes: 72 additions & 0 deletions src/pardner/stateless/tumblr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from enum import StrEnum
from typing import Any, Iterable, Optional

from pardner.stateless import Scope
from pardner.stateless.base import (
generic_construct_authorization_url,
generic_fetch_token,
)
from pardner.verticals import Vertical


class URLs(StrEnum):
AuthorizationURL = 'https://www.tumblr.com/oauth2/authorize'
TokenURL = 'https://api.tumblr.com/v2/oauth2/token'


def scope_for_verticals(verticals: Iterable[Vertical]) -> set[str]:
# Tumblr only needs 'base' for read access requests
return {'base'}


def construct_authorization_url(
client_id: str, redirect_uri: str, scope: Scope = {'base'}
) -> tuple[str, str]:
"""
Builds the authorization URL and state for Tumblr.

:param client_id: Client identifier given by the OAuth provider upon registration.
:param redirect_uri: The registered callback URI.
:param scope: The scope of the access request. These may be any string but are
commonly URIs or various categories such as ``videos`` or ``documents``.

:returns: the authorization URL and state, respectively.
"""
return generic_construct_authorization_url(
URLs.AuthorizationURL, client_id, redirect_uri, scope
)


def fetch_token(
client_id: str,
redirect_uri: str,
authorization_response: Optional[str] = None,
client_secret: Optional[str] = None,
code: Optional[str] = None,
) -> dict[str, Any]:
"""
Makes a request to Tumblr's resource server to obtain the access token.

One of either `code` or `authorization_response` must not be None.

:param client_id: Client identifier given by the OAuth provider upon registration.
:param redirect_uri: The registered callback URI.
:param scope: The scope of the access request. These may be any string but are
commonly URIs or various categories such as ``videos`` or ``documents``.
:param authorization_response: the URL (with parameters) the end-user's browser
redirected to after authorization.
:param client_secret: The `client_secret` paired to the `client_id`.
:param code: Authorization code (used by WebApplicationClients).

:returns: the authorization URL and state, respectively.
"""
return generic_fetch_token(
client_id,
redirect_uri,
{'base'},
URLs.TokenURL,
authorization_response,
client_secret,
code,
include_client_id=True,
)
35 changes: 35 additions & 0 deletions src/pardner/stateless/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from typing import Any, Optional

from oauthlib.oauth2.rfc6749.utils import scope_to_list

from pardner.stateless import Scope


def scope_to_set(scope: Any) -> set[str]:
"""
Splits `scope` into each individual scope and puts the values in a set of strings.
Leverages OAuthlib library helpers.

:param scope: the string or sequence/iterable of objects that will be converted to
a set of strings.

:returns: a set of strings where each string is an individual scope.
"""
return set(scope_to_list(scope)) if scope else set()


def has_sufficient_scope(
old_scope: Optional[Scope], new_scope: Optional[Scope]
) -> bool:
"""
Given an old scope and a new scope, determines if the new scope has scopes that are
not in the original `old_scope`.

:param old_scope: zero or more scopes.
:param new_scope: zero or more scopes.

:returns: True if `new_scope` has no new scopes and False otherwise.
"""
old_scope_set = scope_to_set(old_scope)
new_scope_set = scope_to_set(new_scope)
return new_scope_set.issubset(old_scope_set)
Empty file added tests/__init__.py
Empty file.
18 changes: 18 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import Any
from urllib import parse

import pytest


@pytest.fixture
def mock_outbound_requests(mocker):
mock_oauth2session_request = mocker.patch('requests_oauthlib.OAuth2Session.request')
mock_client_parse_request_body_response = mocker.patch(
'oauthlib.oauth2.rfc6749.clients.WebApplicationClient.parse_request_body_response'
)
return (mock_oauth2session_request, mock_client_parse_request_body_response)


def get_url_params(url: str) -> dict[str, Any]:
url_query = parse.urlsplit(url).query
return dict(parse.parse_qsl(url_query))
Empty file.
74 changes: 74 additions & 0 deletions tests/test_stateless/test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import pytest

from pardner.stateless.base import (
generic_construct_authorization_url,
generic_fetch_token,
)
from tests.conftest import get_url_params


def test_generic_construct_authorization_url():
auth_url, state = generic_construct_authorization_url(
'https://authorize.com',
'fake_client_id',
'https://redirect_uri',
{'fake', 'scope'},
)
assert auth_url.startswith('https://authorize.com')

auth_url_params = get_url_params(auth_url)

assert 'client_id' in auth_url_params
assert auth_url_params['client_id'] == 'fake_client_id'
assert 'redirect_uri' in auth_url_params
assert auth_url_params['redirect_uri'] == 'https://redirect_uri'
assert 'state' in auth_url_params
assert auth_url_params['state'] == state
assert 'scope' in auth_url_params
assert 'fake' in auth_url_params['scope']
assert 'scope' in auth_url_params['scope']


def test_generic_fetch_token_raises_error():
with pytest.raises(ValueError):
generic_fetch_token(
'fake_client_id',
'https://redirect_uri',
{'fake', 'scope'},
'https://token_url.com',
authorization_response=None,
client_secret='fake client secret',
code=None,
)


def test_generic_fetch_token_with_code(mock_outbound_requests):
mock_oauth2session_request, mock_client_parse_request_body_response = (
mock_outbound_requests
)
generic_fetch_token(
'fake_client_id',
'https://redirect_uri',
{'fake', 'scope'},
'https://token_url.com',
client_secret='fake client secret',
code='the_best_code',
)
mock_oauth2session_request.assert_called_once()
mock_client_parse_request_body_response.assert_called_once()


def test_generic_fetch_token_with_authorization_response(mock_outbound_requests):
mock_oauth2session_request, mock_client_parse_request_body_response = (
mock_outbound_requests
)
generic_fetch_token(
'fake_client_id',
'https://redirect_uri',
{'fake', 'scope'},
'https://token_url.com',
authorization_response='https://redirect/?code=the_best_code',
client_secret='fake client secret',
)
mock_oauth2session_request.assert_called_once()
mock_client_parse_request_body_response.assert_called_once()
44 changes: 44 additions & 0 deletions tests/test_stateless/test_tumblr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from pardner.stateless.tumblr import (
URLs,
construct_authorization_url,
fetch_token,
scope_for_verticals,
)
from pardner.verticals import Vertical
from tests.conftest import get_url_params


def test_scope_for_verticals():
assert scope_for_verticals({Vertical.FeedPost}) == {'base'}


def test_generic_construct_authorization_url():
auth_url, state = construct_authorization_url(
'fake_client_id', 'https://redirect_uri'
)
assert auth_url.startswith(URLs.AuthorizationURL)

auth_url_params = get_url_params(auth_url)

assert 'client_id' in auth_url_params
assert auth_url_params['client_id'] == 'fake_client_id'
assert 'redirect_uri' in auth_url_params
assert auth_url_params['redirect_uri'] == 'https://redirect_uri'
assert 'state' in auth_url_params
assert auth_url_params['state'] == state
assert 'scope' in auth_url_params
assert 'base' in auth_url_params['scope']


def test_generic_fetch_token_with_code(mock_outbound_requests):
mock_oauth2session_request, mock_client_parse_request_body_response = (
mock_outbound_requests
)
fetch_token(
'fake_client_id',
'https://redirect_uri',
client_secret='fake client secret',
code='the_best_code',
)
mock_oauth2session_request.assert_called_once()
mock_client_parse_request_body_response.assert_called_once()
33 changes: 33 additions & 0 deletions tests/test_stateless/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import pytest

from pardner.stateless.utils import has_sufficient_scope, scope_to_set

expected_set = {'scope1', 'scope2', 'scope3'}


@pytest.mark.parametrize(
['scopes', 'expected'],
[
('scope1 scope2 scope3', expected_set),
(['scope1', 'scope2', 'scope3'], expected_set),
(['scope2', 'scope1', 'scope3'], expected_set),
(('scope1', 'scope2', 'scope3'), expected_set),
(['scope1', 'scope2', 'scope3'], expected_set),
({'scope1', 'scope2', 'scope3'}, expected_set),
],
)
def test_scope_to_set(scopes, expected):
assert scope_to_set(scopes) == expected


@pytest.mark.parametrize(
['old_scope', 'new_scope', 'expected'],
[
('scope1 scope2 scope3', 'scope1 scope2 scope3', True),
('scope1 scope2 scope3', ['scope1', 'scope2'], True),
({'scope1', 'scope2'}, 'scope1 scope2 scope3', False),
('', 'scope1 scope2 scope3', False),
],
)
def test_has_sufficient_scope(old_scope, new_scope, expected):
assert has_sufficient_scope(old_scope, new_scope) == expected
Empty file.
Loading