Skip to content
Snippets Groups Projects
Commit 7cbd0b8f authored by Martin Blanchard's avatar Martin Blanchard
Browse files

tests/auth: Add client and sever interceptor tests

parent c4021884
No related branches found
No related tags found
1 merge request!128Add JWT authentication support
Showing
with 275 additions and 0 deletions
not-your-256-bit-secret
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJCdWlsZEdyaWQgVGVzdCIsImlhdCI6MTM1NTMxNDMzMiwiZXhwIjoxMzU1MzE0MzMyfQ.J-YxkvVbeZFwZ2_mK1d91Wb5vm481LkuehfkUoNpLb8
your-256-bit-secret
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJCdWlsZEdyaWQgVGVzdCIsImlhdCI6MTM1NTMxNDMzMn0.tEBYb8GM_4lmJCxrh6iMhdoxupgGTCaJR3atLxodZV8
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJCdWlsZEdyaWQgVGVzdCIsImlhdCI6MTM1NTMxNDMzMiwiZXhwIjoyMzAxOTk5MTMyfQ.U__BJgksk65S0YuDZqU85tRWF3F03ITF9HXW1cd_Ci8
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDNGWI+5SgYIiGcS5UNQzZOs5As
QJRsKR7bBiMjJWnqrNExn0pYrecQsBy6zgwmN6MqEPFV5A1GOEeP3FAqlwD5y+rL
iO2tqq0naiFiJb27qsHzgjakw7pVqgJuGuLVIWWE1RlDnhfN+auNGWyl2YjF6N2+
Lsl0bBX8Q8zaOxbU3wIDAQAB
-----END PUBLIC KEY-----
eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJCdWlsZEdyaWQgVGVzdCIsImlhdCI6MTM1NTMxNDMzMiwiZXhwIjoxMzU1MzE0MzMyfQ.L9Wg6YDEV57_a_j_J2s6ug5eEGS80SvlwztdVFzU1ajqlfYmbF5oYq6IEMaiZq95aKkJ71xnwpGfpcuufH-xONiNoZhTg7r-lb99yvPZ8VEHwOIyt1EUEziffim9XRiuwM570fg7_HUC-ZhNJG536k1IM-6rPRnv-1Tu-MxLgvQ
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQDdlatRjRjogo3WojgGHFHYLugdUWAY9iR3fy4arWNA1KoS8kVw
33cJibXr8bvwUAUparCwlvdbH6dvEOfou0/gCFQsHUfQrSDv+MuSUMAe8jzKE4qW
+jK+xQU9a03GUnKHkkle+Q0pX/g6jXZ7r1/xAK5Do2kQ+X5xK9cipRgEKwIDAQAB
AoGAD+onAtVye4ic7VR7V50DF9bOnwRwNXrARcDhq9LWNRrRGElESYYTQ6EbatXS
3MCyjjX2eMhu/aF5YhXBwkppwxg+EOmXeh+MzL7Zh284OuPbkglAaGhV9bb6/5Cp
uGb1esyPbYW+Ty2PC0GSZfIXkXs76jXAu9TOBvD0ybc2YlkCQQDywg2R/7t3Q2OE
2+yo382CLJdrlSLVROWKwb4tb2PjhY4XAwV8d1vy0RenxTB+K5Mu57uVSTHtrMK0
GAtFr833AkEA6avx20OHo61Yela/4k5kQDtjEf1N0LfI+BcWZtxsS3jDM3i1Hp0K
Su5rsCPb8acJo5RO26gGVrfAsDcIXKC+bQJAZZ2XIpsitLyPpuiMOvBbzPavd4gY
6Z8KWrfYzJoI/Q9FuBo6rKwl4BFoToD7WIUS+hpkagwWiz+6zLoX1dbOZwJACmH5
fSSjAkLRi54PKJ8TFUeOP15h9sQzydI8zJU+upvDEKZsZc/UhT/SySDOxQ4G/523
Y0sz/OZtSWcol/UMgQJALesy++GdvoIDLfJX5GBQpuFgFenRiRDabxrE9MNUZ2aP
FaFp+DyAe+b4nDwuJaW2LURbr8AEZga7oQj0uYxcYw==
-----END RSA PRIVATE KEY-----
\ No newline at end of file
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDdlatRjRjogo3WojgGHFHYLugd
UWAY9iR3fy4arWNA1KoS8kVw33cJibXr8bvwUAUparCwlvdbH6dvEOfou0/gCFQs
HUfQrSDv+MuSUMAe8jzKE4qW+jK+xQU9a03GUnKHkkle+Q0pX/g6jXZ7r1/xAK5D
o2kQ+X5xK9cipRgEKwIDAQAB
-----END PUBLIC KEY-----
eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJCdWlsZEdyaWQgVGVzdCIsImlhdCI6MTM1NTMxNDMzMn0.1_yoF5Vg1fXs2SmWhYm7LbLMGbZVgBjxZkzRlw87blSG6lRgEi_R-WXKcS4n_pGynkcahJ_AqLseyHduXZveI1nVQFATXVNQQPcvmkM6pYSHPm155iqZFdYAWWVKB9ND1F3oDrXLBzqF2a4HtLNXYTu5gDStdUPkrH_FiatX79g
eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJCdWlsZEdyaWQgVGVzdCIsImlhdCI6MTM1NTMxNDMzMiwiZXhwIjoyMzAxOTk5MTMyfQ.RCbQNqPaF0mfFsUGwdb47Ga3DITAL7OjYlcWkJ2xWL61Fo9zURx_mSIVDYTgEY3nFW1cmf2r6Y2Z0rEUOY-qBl8B9Ww9jijGz7LMR4w_j8f967MjTxkyOCWZSUWazObg5jxNjLtfxPD28UbbrS_2R8BLgMQFf3ymSDXTX5lAdBY
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=redefined-outer-name
import os
import grpc
import pytest
from buildgrid.client.authentication import setup_channel
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm
from ..utils.dummy import serve_dummy
from ..utils.utils import run_in_subprocess
try:
import jwt # pylint: disable=unused-import
except ImportError:
HAVE_JWT = False
else:
HAVE_JWT = True
DATA_DIR = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'data')
METHOD = AuthMetadataMethod.JWT
TOKEN = os.path.join(DATA_DIR, 'jwt-hs256-valid.token')
SECRET = 'your-256-bit-secret'
ALGORITHM = AuthMetadataAlgorithm.JWT_HS256
@pytest.mark.skipif(not HAVE_JWT, reason="No pyjwt")
def test_channel_token_authorization():
# Actual test function, to be run in a subprocess:
def __test_channel_token_authorization(queue, remote, token):
channel, _ = setup_channel(remote, auth_token=token)
stub = bytestream_pb2_grpc.ByteStreamStub(channel)
request = bytestream_pb2.QueryWriteStatusRequest()
status_code = grpc.StatusCode.OK
try:
next(stub.Read(request))
except grpc.RpcError as e:
status_code = e.code()
queue.put(status_code)
with serve_dummy(auth_method=METHOD, auth_secret=SECRET,
auth_algorithm=ALGORITHM) as server:
status = run_in_subprocess(__test_channel_token_authorization,
server.remote, TOKEN)
assert status != grpc.StatusCode.UNAUTHENTICATED
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=redefined-outer-name
from collections import namedtuple
from unittest import mock
import os
import grpc
from grpc._server import _Context
import pytest
from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm
from buildgrid.server._authentication import AuthMetadataServerInterceptor
from ..utils.utils import read_file
try:
import jwt # pylint: disable=unused-import
except ImportError:
HAVE_JWT = False
else:
HAVE_JWT = True
DATA_DIR = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'data')
TOKENS = [None, 'not-a-token']
SECRETS = [None, None]
ALGORITHMS = [
AuthMetadataAlgorithm.UNSPECIFIED,
AuthMetadataAlgorithm.UNSPECIFIED]
VALIDITIES = [False, False]
# Generic test data: token, secret, algorithm, validity:
DATA = zip(TOKENS, SECRETS, ALGORITHMS, VALIDITIES)
JWT_TOKENS = [
'jwt-hs256-expired.token',
'jwt-hs256-unbounded.token',
'jwt-hs256-valid.token',
'jwt-hs256-valid.token',
'jwt-rs256-expired.token',
'jwt-rs256-unbounded.token',
'jwt-rs256-valid.token',
'jwt-rs256-valid.token']
JWT_SECRETS = [
'jwt-hs256-matching.secret',
'jwt-hs256-matching.secret',
'jwt-hs256-matching.secret',
'jwt-hs256-conflicting.secret',
'jwt-rs256-matching.pub.key',
'jwt-rs256-matching.pub.key',
'jwt-rs256-matching.pub.key',
'jwt-rs256-conflicting.pub.key']
JWT_ALGORITHMS = [
AuthMetadataAlgorithm.JWT_HS256,
AuthMetadataAlgorithm.JWT_HS256,
AuthMetadataAlgorithm.UNSPECIFIED,
AuthMetadataAlgorithm.JWT_HS256,
AuthMetadataAlgorithm.JWT_RS256,
AuthMetadataAlgorithm.JWT_RS256,
AuthMetadataAlgorithm.UNSPECIFIED,
AuthMetadataAlgorithm.JWT_RS256]
JWT_VALIDITIES = [
False, False, True, False,
False, False, True, False]
# JWT test data: token, secret, algorithm, validity:
JWT_DATA = zip(JWT_TOKENS, JWT_SECRETS, JWT_ALGORITHMS, JWT_VALIDITIES)
_MockHandlerCallDetails = namedtuple(
'_MockHandlerCallDetails', ('method', 'invocation_metadata',))
_MockMetadatum = namedtuple(
'_MockMetadatum', ('key', 'value',))
def _mock_call_details(token, method='TestMethod'):
invocation_metadata = [
_MockMetadatum(
key='user-agent',
value='grpc-c/6.0.0 (manylinux; chttp2; gao)')]
if token and token.count('.') == 2:
invocation_metadata.append(_MockMetadatum(
key='authorization', value='Bearer {}'.format(token)))
elif token:
invocation_metadata.append(_MockMetadatum(
key='authorization', value=token))
return _MockHandlerCallDetails(
method=method, invocation_metadata=invocation_metadata)
def _unary_unary_rpc_terminator(details):
def terminate(ignored_request, context):
context.set_code(grpc.StatusCode.OK)
return grpc.unary_unary_rpc_method_handler(terminate)
@pytest.mark.parametrize('token,secret,algorithm,validity', DATA)
def test_authorization(token, secret, algorithm, validity):
interceptor = AuthMetadataServerInterceptor(
method=AuthMetadataMethod.NONE, secret=secret, algorithm=algorithm)
call_details = _mock_call_details(token)
context = mock.create_autospec(_Context, spec_set=True)
try:
handler = interceptor.intercept_service(None, call_details)
except AssertionError:
context.set_code(grpc.StatusCode.OK)
else:
handler.unary_unary(None, context)
if validity:
context.set_code.assert_called_once_with(grpc.StatusCode.OK)
context.abort.assert_not_called()
else:
context.abort.assert_called_once_with(grpc.StatusCode.UNAUTHENTICATED, mock.ANY)
context.set_code.assert_not_called()
@pytest.mark.skipif(not HAVE_JWT, reason="No pyjwt")
@pytest.mark.parametrize('token,secret,algorithm,validity', JWT_DATA)
def test_jwt_authorization(token, secret, algorithm, validity):
token = read_file(os.path.join(DATA_DIR, token), text_mode=True).strip()
secret = read_file(os.path.join(DATA_DIR, secret), text_mode=True).strip()
interceptor = AuthMetadataServerInterceptor(
method=AuthMetadataMethod.JWT, secret=secret, algorithm=algorithm)
continuator = _unary_unary_rpc_terminator
call_details = _mock_call_details(token)
context = mock.create_autospec(_Context, spec_set=True)
handler = interceptor.intercept_service(continuator, call_details)
handler.unary_unary(None, context)
if validity:
context.set_code.assert_called_once_with(grpc.StatusCode.OK)
context.abort.assert_not_called()
else:
context.abort.assert_called_once_with(grpc.StatusCode.UNAUTHENTICATED, mock.ANY)
context.set_code.assert_not_called()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment