Commit ad0fdd32 authored by zar3bski

WIP

parent 8d3ea838
Pipeline #91330604 passed with stage
in 4 minutes and 10 seconds
......@@ -19,7 +19,7 @@ test:
script:
- docker-compose -f docker-compose.test.yml up -d
- docker exec collector pip install pytest-cov
- docker exec collector pytest --cov-report term --cov=./ ../test/unit_tests.py
- docker exec collector pytest -v --cov-report term --cov=./ ../test/unit_tests.py
- docker exec collector rm -r __pycache__
- docker exec collector rm .coverage
- docker exec collector rm -r ../test/__pycache__
......
......@@ -20,10 +20,11 @@ The docker image available at [registry.gitlab.com/data-major/online_visibility_
|--------------------------:|:-------------------------------------------------------------------------|:----------------:|
| `DB_HOST` | Hostname of the Influx database | somewhere.com |
| `DB_PORT`                 | Port of the Influx database                                                | 8086             |
| `GA_VIEW_ID` | The ID of the Google Analytics view you want to ingest the data from | 608124608 |
| `INFLUXDB_DB` | Name of the database | some_name |
| `INFLUXDB_USER` | Username on InfluxDB | some_user |
| `INFLUXDB_USER_PASSWORD`  | Password of the InfluxDB user                                              | some_password    |
| `GA_VIEW_ID` %            | The ID of the **Google Analytics** view you want to ingest data from      | 608124608        |
| `GS_URL` %                | The URL of a domain you own on **Google Search Console**                  | https://some.url |
Furthermore, you need to mount the Google **JSON API key file** at `/ga_key_file.json`
......
......@@ -35,6 +35,7 @@ services:
- DB_HOST=db
- DB_PORT=8086
- GA_VIEW_ID=983581608
- GS_URL=https://some.verified.domain
- INFLUXDB_DB=some_name
- INFLUXDB_USER=some_user
- INFLUXDB_USER_PASSWORD=db_user_password
......
......@@ -13,6 +13,7 @@ services:
collector:
container_name: collector
restart: always
build:
context: ./src
dockerfile: Dockerfile
......
import requests, ast, time, argparse, httplib2
import schedule
from collections import defaultdict
import requests, ast, time, argparse, httplib2, inspect, schedule, logging, os
from apiclient.discovery import build
from datetime import datetime, timedelta
from oauth2client.service_account import ServiceAccountCredentials
from oauth2client.client import OAuth2WebServerFlow
from abc import ABC, abstractmethod
from abc import abstractmethod
from influxdb import InfluxDBClient
from utils import ConfigFactory
from datetime import date, timedelta
import pprint
from datetime import date, timedelta, datetime
import oauth2client
class Batch(object):
'''Structure data obtained through the API and ensure its integrity against the config file'''
def __init__(self, **kwargs):
self.date = (date.today()-timedelta(days=1))
self.config = kwargs.get('config')
self.raw_data = kwargs.get('raw_data',{})
self.origin = kwargs.get('origin', "unknown")
self.dimensions_names = self._get_dimensions_names()
self.metrics_names = self._get_metrics_names()
self.rows = self._get_rows()
class GoogleBatch(object):
def _get_metrics_names(self):
if self.origin == "google_analytics":
return [x["name"] for x in self.raw_data["columnHeader"]["metricHeader"]["metricHeaderEntries"]]
elif self.origin == "google_search":
return sorted(self.config.gsc_metrics)
def _get_rows(self):
if self.origin == "google_analytics":
return ({"dimensions": row["dimensions"],
"values":[ast.literal_eval(x) for x in row["metrics"][0]["values"]]} for row in self.raw_data["data"]["rows"])
elif self.origin == "google_search":
return ({"dimensions": row["keys"] , "values": [x[1] for x in sorted(row.items()) if x[0] != "keys"]} for row in self.raw_data["rows"])
def _get_dimensions_names(self):
if self.origin == "google_analytics":
return [x for x in self.raw_data["columnHeader"]["dimensions"]]
elif self.origin == "google_search":
return self.config.gsc_dimensions
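# A minimal sketch of the google_analytics branch above, assuming a trimmed
# Reporting API v4 row: metric values arrive as strings, so ast.literal_eval
# coerces them back to numbers before they are zipped with the header names.
ga_row = {"dimensions": ["desktop", "France"], "metrics": [{"values": ["8", "0"]}]}
assert [ast.literal_eval(x) for x in ga_row["metrics"][0]["values"]] == [8, 0]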
class GoogleExtractor(object):
"""Google analytics data batch"""
SCOPES = ['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/webmasters.readonly']
def __init__(self, config, origin):
super(GoogleBatch, self).__init__()
def __init__(self, config):
super(GoogleExtractor, self).__init__()
self.credentials = ServiceAccountCredentials.from_json_keyfile_dict(config.ga_key_conf, type(self).SCOPES)
self.config = config
self.end_date = (date.today()-timedelta(days=1)).strftime("%Y-%m-%d")
self.start_date = (date.today()-timedelta(days=2)).strftime("%Y-%m-%d")
self.origin = origin
self.end_date = (date.today()-timedelta(days=1))
self.start_date = (date.today()-timedelta(days=2))
def _get_gsc_daily(self):
analytics = build('webmasters', 'v3', credentials=self.credentials)
def _get_google_search_daily(self):
analytics = build('webmasters', 'v3', credentials=self.credentials)
r = {
'startDate' : self.start_date,
'endDate' : self.end_date ,
'startDate' : self.start_date.strftime("%Y-%m-%d"),
'endDate' : self.end_date.strftime("%Y-%m-%d") ,
'dimensions': self.config.gsc_dimensions ,
'searchType': 'web',
'rowLimit' : 10
}
return analytics.searchanalytics().query(siteUrl="https://www.data-major.com/", body=r).execute()
return analytics.searchanalytics().query(siteUrl=self.config.gs_url, body=r).execute()
def _get_ga_daily(self):
def _get_google_analytics_daily(self):
"""fetch data from google analytics API v4 and EXTRACT relevant data ATTENTION: only returns one report """
analytics = build('analyticsreporting', 'v4', credentials=self.credentials)
return analytics.reports().batchGet(
......@@ -44,70 +70,64 @@ class GoogleBatch(object):
'reportRequests': [
{
'viewId': self.config.ga_view_id,
'dateRanges': [{'startDate': 'yesterday', 'endDate': 'today'}],
'dateRanges': [{'startDate': self.start_date.strftime("%Y-%m-%d"), 'endDate': self.end_date.strftime("%Y-%m-%d")}],
'metrics': [{'expression': x} for x in self.config.ga_metrics ],
'dimensions': [{'name': x} for x in self.config.ga_dimensions]
}]
}
).execute()["reports"][0]
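# A minimal sketch of the single-report shape consumed downstream, assuming
# an answer like the ga_normal_answer.json fixture (metric values are strings):
report = {"columnHeader": {"dimensions": ["ga:deviceCategory", "ga:country"],
                           "metricHeader": {"metricHeaderEntries": [
                               {"name": "ga:sessions"}, {"name": "ga:pageLoadTime"}]}},
          "data": {"rows": [{"dimensions": ["desktop", "France"],
                             "metrics": [{"values": ["8", "0"]}]}]}}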
def extract(self):
if self.origin == "ga":
response = self._get_ga_daily()
self.metrics = [x["name"] for x in response["columnHeader"]["metricHeader"]["metricHeaderEntries"]]
self.dimensions = [x for x in response["columnHeader"]["dimensions"]]
self.rows = response["data"]["rows"]
elif self.origin == "gsc":
response = self._get_gsc_daily()
try:
self.rows = response["rows"]
except:
self.rows = []
class DataFormater(ABC):
@classmethod
def ga2influx(self, batch:GoogleBatch):
"""Generate mesurement points from a google analytics batch object """
for row in batch.rows:
yield {"measurement": "google_analytics",
"time" : datetime.utcnow() - timedelta(days=1),
"tags": dict(zip(batch.dimensions, row["dimensions"])),
"fields": dict(zip(batch.metrics, [ast.literal_eval(x) for x in row["metrics"][0]["values"]]))}
class DbClient(InfluxDBClient):
def __init__(self,config):
super(DbClient, self).__init__(config.db_host, config.db_port, config.db_password, config.db_user, config.db_name)
self._check_connection()
def _check_connection(self):
try:
self.query("SHOW RETENTION POLICIES")
except Exception as e:
print("failed to init connection {} with user: {}".format(self._host, self._username))
raise e
@abstractmethod
def extract(self, origin):
return Batch(raw_data = getattr(self, '_get_{}_daily'.format(origin))(), origin=origin, config=self.config)
class DataFormater(object):
def __init__(self, batch: Batch):
self.batch = batch
@abstractmethod
def __rshift__(self, db_method):
if type(db_method.__self__) == InfluxDBClient:
point_time = datetime.combine(self.batch.date, datetime.min.time())
db_method( ({"measurement": self.batch.origin,
"time" : point_time,
"tags": dict(zip(self.batch.dimensions_names, row["dimensions"])),
"fields": dict(zip(self.batch.metrics_names, row["values"]))} for row in self.batch.rows ))
def main():
config = ConfigFactory("config.yml")
db_client = DbClient(config)
db_client = InfluxDBClient(config.db_host, config.db_port, config.db_user, config.db_password, config.db_name)  # username before password, matching InfluxDBClient's signature
# WIP
batch = GoogleBatch(config, "gsc")
batch._get_gsc_daily()
try:
os.environ['GA_VIEW_ID']
batch = GoogleExtractor(config).extract("google_analytics")
DataFormater(batch) >> db_client.write_points
except KeyError:
logging.info("GA_VIEW_ID not found in ENV var => google_analytics data won't be fetched")
#batch = GoogleBatch(config)
#db_client.write_points(DataFormater.ga2influx(batch.extract_ga()))
try:
os.environ['GS_URL']
batch = GoogleExtractor(config).extract("google_search")
DataFormater(batch) >> db_client.write_points
except KeyError:
logging.info("GS_URL not found in ENV var => google_search data won't be fetched")
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format="%(asctime)s;%(levelname)s;%(message)s")
parser = argparse.ArgumentParser()
parser.add_argument("mode")
args = parser.parse_args()
if args.mode == "daemon":
schedule.every().day.at("08:30").do(main)
logging.info('Starting online_visibility_collector in daemon mode')
schedule.every().day.at("06:30").do(main)
while 1:
schedule.run_pending()
time.sleep(1)
schedule.run_pending()
time.sleep(1)
elif args.mode == "now":
main()
......@@ -20,4 +20,10 @@ google_analytics_dimensions:
google_search_dimensions:
- 'query'
- 'page'
\ No newline at end of file
- 'page'
google_search_metrics:
- 'clicks'
- 'impressions'
- 'ctr'
- 'position'
\ No newline at end of file
......@@ -7,15 +7,18 @@ class ConfigFactory(object):
d = yaml.safe_load(stream)
self.ga_metrics = d["google_analytics_metrics"]
self.ga_dimensions = d["google_analytics_dimensions"]
self.gsc_dimensions = d["google_analytics_dimensions"]
self.gsc_metrics = d["google_search_metrics"]
self.gsc_dimensions = d["google_search_dimensions"]
except yaml.YAMLError as exc:
raise exc
# TODO: find a way to do this without volumes (env variables would be great)
KEY_FILE_LOCATION = '/g_service_account.json'
self.ga_key_conf = json.loads(open(KEY_FILE_LOCATION).read())
self.ga_view_id = os.environ.get('GA_VIEW_ID', None)
self.gs_url = os.environ.get('GS_URL', None)
try:
self.ga_view_id = os.environ['GA_VIEW_ID']
self.db_host = os.environ['DB_HOST']
self.db_port = os.environ['DB_PORT']
self.db_user = os.environ['INFLUXDB_USER']
......
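`GA_VIEW_ID` and `GS_URL` are now read with `os.environ.get(..., None)`, while the database settings remain mandatory. A minimal sketch of that contract, assuming only the database variables are set:

import os
for key in ('GA_VIEW_ID', 'GS_URL'):
    os.environ.pop(key, None)                      # neither optional variable is set
assert os.environ.get('GA_VIEW_ID', None) is None  # optional: stored as None, GA fetch skipped
try:
    os.environ['DB_HOST']                          # required: a missing variable
except KeyError:                                   # surfaces immediately as KeyError
    raise SystemExit('DB_HOST must be set')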
......@@ -5,10 +5,10 @@
"data-major",
"https://www.data-major.com/"
],
"clicks": 2.0,
"impressions": 2.0,
"ctr": 1.0,
"position": 6.0
"clicks": 2.0,
"position": 6.0,
"impressions": 2.0
},
{
"keys": [
......@@ -31,5 +31,5 @@
"position": 6.0
}
],
"responseAggregationType": "bySomeWrongDimension"
"responseAggregationType": "byPage"
}
\ No newline at end of file
......@@ -7,4 +7,11 @@ google_analytics_dimensions:
- 'ga:country'
google_search_dimensions:
- 'query'
\ No newline at end of file
- 'query'
- 'page'
google_search_metrics:
- 'clicks'
- 'impressions'
- 'ctr'
- 'position'
\ No newline at end of file
......@@ -7,6 +7,8 @@ from unittest.mock import patch, Mock, MagicMock
from collector import *
from utils import *
ga_api_mock = MagicMock(return_value=json.loads(open('../test/api_responses/ga_normal_answer.json').read()))
gsc_api_mock = MagicMock(return_value=json.loads(open('../test/api_responses/gsc_normal_answer.json').read()))
@pytest.fixture(scope="module")
def config():
......@@ -26,6 +28,8 @@ class TestConfigFactory(unittest.TestCase):
config = ConfigFactory("config.yml")
assert type(config.ga_dimensions) == list
assert type(config.ga_metrics) == list
assert type(config.gsc_dimensions) == list
assert type(config.gsc_metrics) == list
assert type(config.ga_view_id) == str
assert type(config.db_host) == str
assert type(config.db_port) == str
......@@ -36,67 +40,94 @@ class TestConfigFactory(unittest.TestCase):
with self.assertRaises(Exception) as context:
config = ConfigFactory("config.yml")
@patch.dict(os.environ, {'DB_HOST':'bla','DB_PORT':'bla','INFLUXDB_USER':'bla','INFLUXDB_USER_PASSWORD':'bla','INFLUXDB_DB':'bla'}, clear=True)
def test_missing_google_analytics_id_does_not_raise_excep(self):
config = ConfigFactory("config.yml")
class Test_GoogleBatch_Google_Analytics:
class Test_GoogleExtractor:
"""test EXTRACT"""
def test_data_interval_is_one_day(self, config):
extractor = GoogleExtractor(config)
assert hasattr(extractor, "end_date")
assert hasattr(extractor, "start_date")
assert (extractor.end_date - extractor.start_date) == timedelta(days=1)
def test_extractor_has_config(self, config):
extractor = GoogleExtractor(config)
assert type(extractor.config) == ConfigFactory
class Test_Batch_Google_Analytics:
@patch('collector.GoogleExtractor._get_google_analytics_daily', ga_api_mock)
def test_batch_has_proper_date(self, config):
batch = GoogleExtractor(config).extract("google_analytics")
assert batch.date == date.today()-timedelta(days=1)
@patch('collector.GoogleBatch._get_ga_daily', MagicMock(return_value=json.loads(open('../test/api_responses/ga_normal_answer.json').read())))
@patch('collector.GoogleExtractor._get_google_analytics_daily', ga_api_mock)
def test_batch_extract_headers(self, config):
batch = GoogleBatch(config, "ga")
batch.extract()
assert hasattr(batch, "metrics")
assert hasattr(batch, "dimensions")
assert batch.metrics == ["ga:sessions", "ga:pageLoadTime"]
assert batch.dimensions == ["ga:deviceCategory", "ga:country"]
@patch('collector.GoogleBatch._get_ga_daily', MagicMock(return_value=json.loads(open('../test/api_responses/ga_normal_answer.json').read())))
batch = GoogleExtractor(config).extract("google_analytics")
assert hasattr(batch, "metrics_names")
assert hasattr(batch, "dimensions_names")
assert batch.metrics_names == ["ga:sessions", "ga:pageLoadTime"]
assert batch.dimensions_names == ["ga:deviceCategory", "ga:country"]
@patch('collector.GoogleExtractor._get_google_analytics_daily', ga_api_mock)
def test_batch_get_rows_properly(self, config):
batch = GoogleBatch(config, "ga")
batch.extract()
assert len(batch.rows) == 5
batch = GoogleExtractor(config).extract("google_analytics")
assert hasattr(batch, "rows")
assert next(batch.rows) == {'dimensions': ['desktop', 'France'], 'values': [8, 0]}
class Test_Batch_Google_Search:
class Test_GoogleBatch_Google_Search:
@patch('collector.GoogleExtractor._get_google_search_daily', gsc_api_mock)
def test_batch_right_source(self, config):
batch = GoogleExtractor(config).extract("google_search")
assert batch.origin == "google_search"
@patch('collector.GoogleBatch._get_gsc_daily', MagicMock(return_value=json.loads(open('../test/api_responses/gsc_normal_answer.json').read())))
def test_batch_get_row(self, config):
batch = GoogleBatch(config, "gsc")
batch.extract()
@patch('collector.GoogleExtractor._get_google_search_daily', gsc_api_mock)
def test_batch_get_metrics_names_and_dimensions(self, config):
batch = GoogleExtractor(config).extract("google_search")
assert batch.metrics_names == sorted(['clicks', 'impressions', 'ctr', 'position'])
@patch('collector.GoogleExtractor._get_google_search_daily', gsc_api_mock)
def test_batch_get_rows(self, config):
batch = GoogleExtractor(config).extract("google_search")
assert hasattr(batch, "rows")
assert len(batch.rows) == 3
assert next(batch.rows) == {'dimensions': ['data-major','https://www.data-major.com/'], 'values': [2.0,1.0,2.0,6.0]}
@patch('collector.GoogleBatch._get_gsc_daily', MagicMock(return_value=json.loads(open('../test/api_responses/gsc_abnormal_wrong_dimension_aggregation.json').read())))
def test_ill_formed_answer_should_not_produce_rows(self, config):
batch = GoogleBatch(config, "gsc")
batch.extract()
#assert hasattr(batch, "rows")
#assert len(batch.rows) == 0
@patch('collector.GoogleExtractor._get_google_search_daily', MagicMock(return_value=json.loads(open('../test/api_responses/gsc_different_metric_order.json').read())))
def test_different_metric_order_should_not_alter_rows(self, config):
batch = GoogleExtractor(config).extract("google_search")
assert hasattr(batch, "rows")
assert next(batch.rows) == {'dimensions': ['data-major','https://www.data-major.com/'], 'values': [2.0,1.0,2.0,6.0]}
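# A minimal sketch of the invariant exercised above, assuming two rows that
# differ only in metric key order: both metrics_names and the row values are
# sorted on the metric key, so names and values stay aligned.
row_a = {"keys": ["q"], "clicks": 2.0, "impressions": 2.0, "ctr": 1.0, "position": 6.0}
row_b = {"keys": ["q"], "position": 6.0, "ctr": 1.0, "clicks": 2.0, "impressions": 2.0}
extract = lambda r: [v for k, v in sorted(r.items()) if k != "keys"]
assert extract(row_a) == extract(row_b) == [2.0, 1.0, 2.0, 6.0]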
class TestDataFormater:
"""test TRANSFORM"""
@patch('collector.GoogleBatch._get_ga_daily', MagicMock(return_value=json.loads(open('../test/api_responses/ga_normal_answer.json').read())))
def test_ga2influx_create_a_generator(self, config):
batch = GoogleBatch(config, "ga")
batch.extract()
points = DataFormater.ga2influx(batch)
case = next(points)
assert case["measurement"] == "google_analytics"
assert case["tags"] == {"ga:deviceCategory": "desktop", "ga:country": "France"}
assert case["fields"] == {"ga:sessions": 8, "ga:pageLoadTime": 0}
@patch('collector.GoogleBatch._get_ga_daily', MagicMock(return_value=json.loads(open('../test/api_responses/ga_normal_answer.json').read())))
def test_ga2influx_dont_miss(self, config):
batch = GoogleBatch(config, "ga")
batch.extract()
total_session = GoogleBatch._get_ga_daily()["data"]["totals"][0]["values"][0]
points = DataFormater.ga2influx(batch)
assert sum([view["fields"]["ga:sessions"] for view in points]) == int(total_session)
class TestDbConnector:
def test_data_base_connectivity(self):
config = ConfigFactory("../test/config/config.yml")
db_client = DbClient(config)
@patch('collector.GoogleExtractor._get_google_analytics_daily', ga_api_mock)
def test_formater_for_google_analytics_and_influx(self, config):
batch = GoogleExtractor(config).extract("google_analytics")
db_client = InfluxDBClient(config.db_host, config.db_port, config.db_user, config.db_password, config.db_name)
DataFormater(batch) >> db_client.write_points
data = db_client.query('SELECT * FROM google_analytics')
assert len(list(data.get_points(measurement='google_analytics'))) == 5
assert list(data.get_points(measurement='google_analytics'))[0]["time"] == "{}T00:00:00Z".format(date.today()-timedelta(days=1))
assert sum([x["ga:sessions"] for x in list(data.get_points(measurement='google_analytics', tags={'ga:deviceCategory': 'desktop'}))]) == 9
@patch('collector.GoogleExtractor._get_google_search_daily', gsc_api_mock)
def test_formater_for_google_search_and_influx(self, config):
batch = GoogleExtractor(config).extract("google_search")
db_client = InfluxDBClient(config.db_host, config.db_port, config.db_user, config.db_password, config.db_name)
DataFormater(batch) >> db_client.write_points
data = db_client.query('SELECT * FROM google_search')
assert len(list(data.get_points(measurement='google_search'))) == 3
assert sum([x["position"] for x in list(data.get_points(measurement='google_search', tags={'query': 'data-major'}))]) == 6.0
class TestMain:
@patch('collector.GoogleExtractor._get_google_search_daily', gsc_api_mock)
@patch('collector.GoogleExtractor._get_google_analytics_daily', ga_api_mock)
def test_ingestion_of_google_analytics(self):
main()