@@ -19,7 +19,7 @@ test:
script:
- docker-compose -f docker-compose.test.yml up -d
- docker exec collector pip install pytest-cov
- docker exec collector pytest --cov-report term --cov=./ ../test/unit_tests.py
- docker exec collector pytest -v --cov-report term --cov=./ ../test/unit_tests.py
- docker exec collector rm -r __pycache__
- docker exec collector rm .coverage
- docker exec collector rm -r ../test/__pycache__
......
@@ -9,7 +9,10 @@
Current API connections:
* Google analytics V4 (you'll need to generate a **JSON API key file** [documentation](https://developers.google.com/analytics/devguides/reporting/core/v4/authorization))
* Google Analytics V4
* Google Search Console V3

For Google Analytics V4 and Google Search Console V3, you'll need to generate a **service account key file** ([documentation](https://developers.google.com/analytics/devguides/reporting/core/v4/authorization)) and grant it access to both applications.
## Usage
@@ -19,12 +22,15 @@ The docker image available at [registry.gitlab.com/data-major/online_visibility_
|--------------------------:|:-------------------------------------------------------------------------|:----------------:|
| `DB_HOST` | Hostname of the Influx database | somewhere.com |
| `DB_PORT`                 | Port of the Influx database                                                | 8086             |
| `GA_VIEW_ID` | The ID of the Google Analytics view you want to ingest the data from | 608124608 |
| `INFLUXDB_DB` | Name of the database | some_name |
| `INFLUXDB_USER` | Username on InfluxDB | some_user |
| `INFLUXDB_USER_PASSWORD`  | Password for the user                                                      | some_password    |
| `GA_VIEW_ID`*             | The ID of the **Google Analytics** view you want to ingest the data from  | 608124608        |
| `GS_URL`*                 | The URL of a domain you own on **Google Search Console**                  | https://some.url |

\* `GA_VIEW_ID` and `GS_URL` are optional, depending on whether you want to ingest data from these services.
Furthermore, you need to mount google **json api key file** on `/ga_key_file.json`
For both services, you'll need to mount your Google **service account key file** at `/g_service_account.json`
### Deployment with docker-compose
@@ -54,7 +60,7 @@ services:
container_name: collector
image: registry.gitlab.com/data-major/online_visibility_collector
volumes:
- ./path/to/the/google_analytics_key.json:/ga_key_file.json
- ./path/to/the/google_analytics_key.json:/g_service_account.json
networks:
- monitoring
ports:
@@ -64,10 +70,11 @@ services:
environment:
- DB_HOST=db
- DB_PORT=8086
- GA_VIEW_ID=608124608
- INFLUXDB_DB=some_name
- INFLUXDB_USER=some_user
- INFLUXDB_USER_PASSWORD=db_user_password
- GA_VIEW_ID=608124608
- GS_URL=https://some.verified.domain
networks:
monitoring:
......
@@ -24,7 +24,7 @@ services:
volumes:
- ./src/:/usr/src/collector/
- ./test/:/usr/src/test
- ./test/ga_fake_key.json:/ga_key_file.json
- ./test/ga_fake_key.json:/g_service_account.json
networks:
- monitoring
ports:
@@ -35,6 +35,7 @@ services:
- DB_HOST=db
- DB_PORT=8086
- GA_VIEW_ID=983581608
- GS_URL=https://some.verified.domain
- INFLUXDB_DB=some_name
- INFLUXDB_USER=some_user
- INFLUXDB_USER_PASSWORD=db_user_password
......
@@ -13,12 +13,13 @@ services:
collector:
container_name: collector
restart: always
build:
context: ./src
dockerfile: Dockerfile
volumes:
- ./src/:/usr/src/collector/
- .secrets/ga_key_file.json:/ga_key_file.json
- .secrets/g_service_account.json:/g_service_account.json
networks:
- monitoring
ports:
......
import requests, ast, time, argparse
import schedule
from collections import defaultdict
import ast, time, argparse, schedule, logging, os
from apiclient.discovery import build
from datetime import datetime, timedelta
from oauth2client.service_account import ServiceAccountCredentials
from abc import ABC, abstractmethod
from influxdb import InfluxDBClient
from utils import ConfigFactory
from utils import ConfigFactory, log
from datetime import date, timedelta, datetime
class GA_Batch(object):
class Batch(object):
'''Structure data obtained through an API and check its integrity against the config file.'''
def __init__(self, **kwargs):
self.date = kwargs.get('date')
self.config = kwargs.get('config')
self.raw_data = kwargs.get('raw_data',{})
self.origin = kwargs.get('origin', "unknown")
try:
self.dimensions_names = self._get_dimensions_names()
self.metrics_names = self._get_metrics_names()
self.rows = self._get_rows()
except (KeyError, TypeError):
    logging.warning("Wrong data format from {}".format(self.origin))
def _get_metrics_names(self):
if self.origin == "google_analytics":
return [x["name"] for x in self.raw_data["columnHeader"]["metricHeader"]["metricHeaderEntries"]]
elif self.origin == "google_search":
return sorted(self.config.gsc_metrics)
def _get_rows(self):
if self.origin == "google_analytics":
return ({"dimensions": row["dimensions"],
"values":[ast.literal_eval(x) for x in row["metrics"][0]["values"]]} for row in self.raw_data["data"]["rows"])
elif self.origin == "google_search":
return ({"dimensions": row["keys"] , "values": [x[1] for x in sorted(row.items()) if x[0] != "keys"]} for row in self.raw_data["rows"])
def _get_dimensions_names(self):
if self.origin == "google_analytics":
return [x for x in self.raw_data["columnHeader"]["dimensions"]]
elif self.origin == "google_search":
return self.config.gsc_dimensions
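# Illustrative sketch (not part of the module): how Batch normalizes a trimmed,
# hypothetical Google Analytics v4 payload into the common row shape.
#
#   ga_payload = {
#       "columnHeader": {"dimensions": ["ga:deviceCategory", "ga:country"],
#                        "metricHeader": {"metricHeaderEntries": [
#                            {"name": "ga:sessions"}, {"name": "ga:pageLoadTime"}]}},
#       "data": {"rows": [{"dimensions": ["desktop", "France"],
#                          "metrics": [{"values": ["8", "0"]}]}]}}
#   batch = Batch(raw_data=ga_payload, origin="google_analytics", config=None, date=date.today())
#   next(batch.rows)  # -> {'dimensions': ['desktop', 'France'], 'values': [8, 0]}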
class GoogleExtractor(object):
"""Google analytics data batch"""
SCOPES = ['https://www.googleapis.com/auth/analytics.readonly']
SCOPES = ['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/webmasters.readonly']
def __init__(self, config):
super(GA_Batch, self).__init__()
super(GoogleExtractor, self).__init__()
self.credentials = ServiceAccountCredentials.from_json_keyfile_dict(config.ga_key_conf, type(self).SCOPES)
self.ga_view_id = config.ga_view_id
self._extract(config.ga_metrics, config.ga_dimensions)
self.config = config
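# Daily window: start = two days ago, end = yesterday (a one-day span).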
self.end_date = (date.today()-timedelta(days=1))
self.start_date = (date.today()-timedelta(days=2))
def _get_daily_report(self, metrics_list, dimensions_list):
"""fetch data from google analytics API v4 and EXTRACT relevant data
ATTENTION: only returns one report """
@log
def _get_google_search_daily(self):
analytics = build('webmasters', 'v3', credentials=self.credentials)
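# The request window is shifted back one extra day: Search Console publishes its data with a delay.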
r = {
'startDate' : (self.start_date-timedelta(days=1)).strftime("%Y-%m-%d"),
'endDate' : (self.end_date-timedelta(days=1)).strftime("%Y-%m-%d") ,
'dimensions': self.config.gsc_dimensions ,
'searchType': 'web',
'rowLimit' : 10
}
return analytics.searchanalytics().query(siteUrl=self.config.gs_url, body=r).execute()
@log
def _get_google_analytics_daily(self):
"""fetch data from google analytics API v4 and EXTRACT relevant data ATTENTION: only returns one report """
analytics = build('analyticsreporting', 'v4', credentials=self.credentials)
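# batchGet accepts a list of report requests; only one is sent here, so keep the first report.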
return analytics.reports().batchGet(
body={
'reportRequests': [
{
'viewId': self.ga_view_id,
'dateRanges': [{'startDate': 'yesterday', 'endDate': 'today'}],
'metrics': [{'expression': x} for x in metrics_list],
'dimensions': [{'name': x} for x in dimensions_list]
'viewId': self.config.ga_view_id,
'dateRanges': [{'startDate': self.start_date.strftime("%Y-%m-%d"), 'endDate': self.end_date.strftime("%Y-%m-%d")}],
'metrics': [{'expression': x} for x in self.config.ga_metrics ],
'dimensions': [{'name': x} for x in self.config.ga_dimensions]
}]
}
).execute()["reports"][0]
def _extract(self, metrics_list, dimensions_list):
response = self._get_daily_report(metrics_list, dimensions_list)
self.metrics = [x["name"] for x in response["columnHeader"]["metricHeader"]["metricHeaderEntries"]]
self.dimensions = [x for x in response["columnHeader"]["dimensions"]]
self.rows = response["data"]["rows"]
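# Dispatch on origin: "google_analytics" -> _get_google_analytics_daily, "google_search" -> _get_google_search_daily.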
def extract(self, origin):
return Batch(raw_data = getattr(self, '_get_{}_daily'.format(origin))(), origin=origin, config=self.config, date=self.end_date)
class DataFormater(ABC):
@classmethod
def ga2influx(self, batch:GA_Batch):
"""Generate mesurement points from a google analytics batch object """
for row in batch.rows:
yield {"measurement": "google_analytics",
"time" : datetime.utcnow() - timedelta(days=1),
"tags": dict(zip(batch.dimensions, row["dimensions"])),
"fields": dict(zip(batch.metrics, [ast.literal_eval(x) for x in row["metrics"][0]["values"]]))}
class DbClient(InfluxDBClient):
def __init__(self,config):
super(DbClient, self).__init__(config.db_host, config.db_port, config.db_password, config.db_user, config.db_name)
self._check_connection()
def _check_connection(self):
try:
self.query("SHOW RETENTION POLICIES")
except Exception as e:
print("failed to init connection {} with user: {}".format(self._host, self._username))
raise e
class DataFormater(object):
def __init__(self, batch: Batch):
self.batch = batch
def __rshift__(self, db_method):
if isinstance(db_method.__self__, InfluxDBClient):
point_time = datetime.combine(self.batch.date, datetime.min.time())
db_method( ({"measurement": self.batch.origin,
"time" : point_time,
"tags": dict(zip(self.batch.dimensions_names, row["dimensions"])),
"fields": dict(zip(self.batch.metrics_names, row["values"]))} for row in self.batch.rows ))
def main():
config = ConfigFactory("config.yml")
db_client = DbClient(config)
logging.basicConfig(level=logging.INFO, format="%(asctime)s;%(levelname)s;%(message)s")
logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR)
batch = GA_Batch(config)
db_client.write_points(DataFormater.ga2influx(batch))
config = ConfigFactory("config.yml")
# InfluxDBClient signature: (host, port, username, password, database)
db_client = InfluxDBClient(config.db_host, config.db_port, config.db_user, config.db_password, config.db_name)
if 'GA_VIEW_ID' in os.environ:
logging.info("Starting Google Analytics ingestion")
batch = GoogleExtractor(config).extract("google_analytics")
DataFormater(batch) >> db_client.write_points
if 'GS_URL' in os.environ:
logging.info("Starting Google Search ingestion")
batch = GoogleExtractor(config).extract("google_search")
DataFormater(batch) >> db_client.write_points
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("mode")
args = parser.parse_args()
if args.mode == "deamon":
schedule.every().day.at("08:30").do(main)
schedule.every().day.at("06:30").do(main)
# run_pending() executes any scheduled job whose time has come.
while True:
schedule.run_pending()
time.sleep(1)
elif args.mode == "now":
main()
@@ -17,4 +17,13 @@ google_analytics_dimensions:
- 'ga:pagePathLevel1'
- 'ga:landingPagePath'
- 'ga:pageDepth'
\ No newline at end of file
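# Dimensions and metrics requested from the Google Search Console API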
google_search_dimensions:
- 'query'
- 'page'
google_search_metrics:
- 'clicks'
- 'impressions'
- 'ctr'
- 'position'
\ No newline at end of file
import yaml, json, os
import yaml, json, os, logging
def log(func):
    def wrapper(*args, **kwargs):
        # Log the call before running it, so failures still leave a trace.
        logging.info('Running {} - args: {} kwargs: {}'.format(func.__name__, args, kwargs))
        return func(*args, **kwargs)
    return wrapper
class ConfigFactory(object):
def __init__(self, file):
with open(file, 'r') as stream:
try:
d = yaml.safe_load(stream)
self.ga_metrics = d["google_analytics_metrics"]
self.ga_dimensions = d["google_analytics_dimensions"]
self.ga_metrics = d["google_analytics_metrics"]
self.ga_dimensions = d["google_analytics_dimensions"]
self.gsc_metrics = d["google_search_metrics"]
self.gsc_dimensions = d["google_search_dimensions"]
except yaml.YAMLError as exc:
raise exc
# TODO: find a way to do this without volumes (env variables would be great)
KEY_FILE_LOCATION = '/ga_key_file.json'
KEY_FILE_LOCATION = '/g_service_account.json'
self.ga_key_conf = json.loads(open(KEY_FILE_LOCATION).read())
self.ga_view_id = os.environ.get('GA_VIEW_ID', None)
self.gs_url = os.environ.get('GS_URL', None)
try:
self.ga_view_id = os.environ['GA_VIEW_ID']
self.db_host = os.environ['DB_HOST']
self.db_port = os.environ['DB_PORT']
self.db_user = os.environ['INFLUXDB_USER']
......
{
"rows": [
{
"keys": [
"data-major",
"https://www.data-major.com/"
],
"ctr": 1.0,
"clicks": 2.0,
"position": 6.0,
"impressions": 2.0
},
{
"keys": [
"data major",
"https://www.data-major.com/"
],
"clicks": 1.0,
"impressions": 1.0,
"ctr": 1.0,
"position": 5.0
},
{
"keys": [
"data major lyon",
"https://www.data-major.com/"
],
"clicks": 0.0,
"impressions": 1.0,
"ctr": 0.0,
"position": 6.0
}
],
"responseAggregationType": "byPage"
}
\ No newline at end of file
{
"rows": [
{
"keys": [
"data-major",
"https://www.data-major.com/"
],
"clicks": 2.0,
"impressions": 2.0,
"ctr": 1.0,
"position": 6.0
},
{
"keys": [
"data major",
"https://www.data-major.com/"
],
"clicks": 1.0,
"impressions": 1.0,
"ctr": 1.0,
"position": 5.0
},
{
"keys": [
"data major lyon",
"https://www.data-major.com/"
],
"clicks": 0.0,
"impressions": 1.0,
"ctr": 0.0,
"position": 6.0
}
],
"responseAggregationType": "byPage"
}
@@ -5,4 +5,13 @@ google_analytics_metrics:
google_analytics_dimensions:
- 'ga:deviceCategory'
- 'ga:country'
\ No newline at end of file
google_search_dimensions:
- 'query'
- 'page'
google_search_metrics:
- 'clicks'
- 'impressions'
- 'ctr'
- 'position'
\ No newline at end of file
import sys
sys.path.insert(1, '/usr/src/collector')
import pytest, unittest, os
import pytest, unittest, os, logging
from apiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
from unittest.mock import patch, Mock, MagicMock
from collector import *
from utils import *
# These mocks replace the private API-call methods so the tests run without network access.
ga_api_mock = MagicMock(return_value=json.loads(open('../test/api_responses/ga_normal_answer.json').read()))
gsc_api_mock = MagicMock(return_value=json.loads(open('../test/api_responses/gsc_normal_answer.json').read()))
malformed_api_mock = MagicMock(return_value=json.dumps({"some": {"random": "payload", "time": 2}}))  # deliberately wrong shape
@pytest.fixture(scope="module")
def config():
return ConfigFactory("../test/config/config.yml")
'''UNIT TESTING'''
class TestConfigFactory(unittest.TestCase):
def test_instantiation(self):
@@ -21,6 +29,8 @@ class TestConfigFactory(unittest.TestCase):
config = ConfigFactory("config.yml")
assert type(config.ga_dimensions) == list
assert type(config.ga_metrics) == list
assert type(config.gsc_dimensions) == list
assert type(config.gsc_metrics) == list
assert type(config.ga_view_id) == str
assert type(config.db_host) == str
assert type(config.db_port) == str
@@ -31,51 +41,127 @@ class TestConfigFactory(unittest.TestCase):
with self.assertRaises(Exception) as context:
config = ConfigFactory("config.yml")
@patch.dict(os.environ, {'DB_HOST':'bla','DB_PORT':'bla','INFLUXDB_USER':'bla','INFLUXDB_USER_PASSWORD':'bla','INFLUXDB_DB':'bla'}, clear=True)
def test_missing_google_analytics_id_does_not_raise_exception(self):
config = ConfigFactory("config.yml")
class Test_GA_Batch:
class Test_GoogleExtractor:
"""test EXTRACT"""
@patch('collector.GA_Batch._get_daily_report', MagicMock(return_value=json.loads(open('../test/api_responses/ga_normal_answer.json').read())))
def test_batch_extract_headers(self):
config = ConfigFactory("../test/config/config.yml")
batch = GA_Batch(config)
assert hasattr(batch, "metrics")
assert hasattr(batch, "dimensions")
assert batch.metrics == ["ga:sessions", "ga:pageLoadTime"]
assert batch.dimensions == ["ga:deviceCategory", "ga:country"]
@patch('collector.GA_Batch._get_daily_report', MagicMock(return_value=json.loads(open('../test/api_responses/ga_normal_answer.json').read())))
def test_batch_get_rows_properly(self):
config = ConfigFactory("../test/config/config.yml")
batch = GA_Batch(config)
assert len(batch.rows) == 5
def test_data_interval_is_one_day(self, config):
extractor = GoogleExtractor(config)
assert hasattr(extractor, "end_date")
assert hasattr(extractor, "start_date")
assert (extractor.end_date - extractor.start_date) == timedelta(days=1)
def test_extractor_has_config_and_credentials(self, config):
extractor = GoogleExtractor(config)
assert type(extractor.config) == ConfigFactory
assert type(extractor.credentials) == ServiceAccountCredentials
class Test_Batch_Google_Analytics:
@patch('collector.GoogleExtractor._get_google_analytics_daily', ga_api_mock)
def test_batch_has_proper_date(self, config):
batch = GoogleExtractor(config).extract("google_analytics")
assert batch.date == date.today()-timedelta(days=1)
@patch('collector.GoogleExtractor._get_google_analytics_daily', ga_api_mock)
def test_batch_extract_headers(self, config):
batch = GoogleExtractor(config).extract("google_analytics")
assert hasattr(batch, "metrics_names")
assert hasattr(batch, "dimensions_names")
assert batch.metrics_names == ["ga:sessions", "ga:pageLoadTime"]
assert batch.dimensions_names == ["ga:deviceCategory", "ga:country"]
@patch('collector.GoogleExtractor._get_google_analytics_daily', ga_api_mock)
def test_batch_get_rows_properly(self, config):
batch = GoogleExtractor(config).extract("google_analytics")
assert hasattr(batch, "rows")
assert next(batch.rows) == {'dimensions': ['desktop', 'France'], 'values': [8, 0]}
@patch('collector.GoogleExtractor._get_google_analytics_daily', malformed_api_mock)
def test_wrong_data_format_is_logged(self, config, caplog):
batch = GoogleExtractor(config).extract("google_analytics")
assert "Wrong data format from google_analytics" in caplog.text
class Test_Batch_Google_Search:
@patch('collector.GoogleExtractor._get_google_search_daily', gsc_api_mock)
def test_batch_right_source(self, config):
batch = GoogleExtractor(config).extract("google_search")
assert batch.origin == "google_search"
@patch('collector.GoogleExtractor._get_google_search_daily', gsc_api_mock)
def test_batch_get_metrics_names_and_dimensions(self, config):
batch = GoogleExtractor(config).extract("google_search")
assert batch.metrics_names == sorted(['clicks', 'impressions', 'ctr', 'position'])
@patch('collector.GoogleExtractor._get_google_search_daily', gsc_api_mock)
def test_batch_get_rows(self, config):
batch = GoogleExtractor(config).extract("google_search")
assert hasattr(batch, "rows")
assert next(batch.rows) == {'dimensions': ['data-major','https://www.data-major.com/'], 'values': [2.0,1.0,2.0,6.0]}
@patch('collector.GoogleExtractor._get_google_search_daily', malformed_api_mock)
def test_wrong_data_format_is_logged(self, config, caplog):
batch = GoogleExtractor(config).extract("google_search")
assert "Wrong data format from google_search" in caplog.text
@patch('collector.GoogleExtractor._get_google_search_daily', MagicMock(return_value=json.loads(open('../test/api_responses/gsc_different_metric_order.json').read())))
def test_different_metric_order_should_not_alter_rows(self, config):
batch = GoogleExtractor(config).extract("google_search")
assert hasattr(batch, "rows")
assert next(batch.rows) == {'dimensions': ['data-major','https://www.data-major.com/'], 'values': [2.0,1.0,2.0,6.0]}
class TestDataFormater:
"""test TRANSFORM"""
@patch('collector.GA_Batch._get_daily_report', MagicMock(return_value=json.loads(open('../test/api_responses/ga_normal_answer.json').read())))
def test_ga2influx_create_a_generator(self):
config = ConfigFactory("../test/config/config.yml")
batch = GA_Batch(config)
points = DataFormater.ga2influx(batch)
case = next(points)
assert case["measurement"] == "google_analytics"
assert case["tags"] == {"ga:deviceCategory": "desktop", "ga:country": "France"}
assert case["fields"] == {"ga:sessions": 8, "ga:pageLoadTime": 0}
@patch('collector.GA_Batch._get_daily_report', MagicMock(return_value=json.loads(open('../test/api_responses/ga_normal_answer.json').read())))
def test_ga2influx_dont_miss(self):
config = ConfigFactory("../test/config/config.yml")
batch = GA_Batch(config)
total_session = GA_Batch._get_daily_report()["data"]["totals"][0]["values"][0]
points = DataFormater.ga2influx(batch)
assert sum([view["fields"]["ga:sessions"] for view in points]) == int(total_session)
class TestDbConnector:
def test_data_base_connectivity(self):
config = ConfigFactory("../test/config/config.yml")
db_client = DbClient(config)
@patch('collector.GoogleExtractor._get_google_analytics_daily', ga_api_mock)
def test_formater_for_google_analytics_and_influx(self, config):
batch = GoogleExtractor(config).extract("google_analytics")
db_client = InfluxDBClient(config.db_host, config.db_port, config.db_user, config.db_password, config.db_name)
DataFormater(batch) >> db_client.write_points
data = db_client.query('SELECT * FROM google_analytics')
assert len(list(data.get_points(measurement='google_analytics'))) == 5
assert list(data.get_points(measurement='google_analytics'))[0]["time"] == "{}T00:00:00Z".format(date.today()-timedelta(days=1))
assert sum([x["ga:sessions"] for x in list(data.get_points(measurement='google_analytics', tags={'ga:deviceCategory': 'desktop'}))]) == 9
@patch('collector.GoogleExtractor._get_google_search_daily', gsc_api_mock)
def test_formater_for_google_search_and_influx(self, config):
batch = GoogleExtractor(config).extract("google_search")
db_client = InfluxDBClient(config.db_host, config.db_port, config.db_user, config.db_password, config.db_name)
DataFormater(batch) >> db_client.write_points
data = db_client.query('SELECT * FROM google_search')
assert len(list(data.get_points(measurement='google_search'))) == 3
assert sum([x["position"] for x in list(data.get_points(measurement='google_search', tags={'query': 'data-major'}))]) == 6.0
'''INTEGRATION TESTS'''
class TestMain:
@patch('collector.GoogleExtractor._get_google_search_daily', gsc_api_mock)
@patch('collector.GoogleExtractor._get_google_analytics_daily', ga_api_mock)
def test_ingestion_of_both(self, config, caplog):
caplog.set_level(logging.INFO)
main()
assert "Starting Google Analytics ingestion" in caplog.text
assert "Starting Google Search ingestion" in caplog.text
@patch('collector.GoogleExtractor._get_google_analytics_daily', ga_api_mock)
def test_missing_gs_url_prevents_google_search_ingestion(self, config, caplog):
caplog.set_level(logging.INFO)
del os.environ['GS_URL']
main()
assert "Starting Google Analytics ingestion" in caplog.text
assert "Starting Google Search ingestion" not in caplog.text
@patch('collector.GoogleExtractor._get_google_search_daily', gsc_api_mock)
def test_missing_ga_view_id_prevents_google_analytics_ingestion(self, config, caplog):
caplog.set_level(logging.INFO)
del os.environ['GA_VIEW_ID']
main()
assert "Starting Google Analytics ingestion" not in caplog.text
assert "Starting Google Search ingestion" in caplog.text
\ No newline at end of file