Commit 856bf4ca authored by valtron's avatar valtron

stat collection

parent 653088a8
__pycache__
.cache
msn.sqlite
*.sqlite
settings_local.py
etc/Caddyfile
dev/cert
......
from db import Base, engine
Base.metadata.create_all(engine)
import db
from core import stats
# Create the tables declared on the main models' Base and on the stats
# models' Base, each against its own engine.
db.Base.metadata.create_all(db.engine)
stats.Base.metadata.create_all(stats.engine)
......@@ -6,6 +6,7 @@ from util.misc import gen_uuid, EMPTY_SET, run_loop
from .user import UserService
from .auth import AuthService
from .stats import Stats
from .models import User, Group, Lst, Contact, UserStatus
from . import error, event
......@@ -20,6 +21,7 @@ class Backend:
self._loop = loop
self._user_service = user_service or UserService()
self._auth_service = auth_service or AuthService()
self._stats = Stats()
self._sc = _SessionCollection()
# Dict[User.uuid, User]
......@@ -34,6 +36,7 @@ class Backend:
loop.create_task(self._sync_db())
loop.create_task(self._clean_sessions())
loop.create_task(self._sync_stats())
def add_runner(self, runner):
self._runners.append(runner)
......@@ -43,6 +46,7 @@ class Backend:
def on_leave(self, sess):
user = sess.user
self._stats.on_logout()
if user is None: return
self._sc.remove_session(sess)
if self._sc.get_sessions_by_user(user):
......@@ -75,6 +79,8 @@ class Backend:
self._user_service.update_date_login(uuid)
user = self._load_user_record(uuid)
sess.user = user
self._stats.on_login()
self._stats.on_user_active(user, sess.client)
self._sc.add_session(sess)
user.detail = self._load_detail(user)
return user
......@@ -120,6 +126,9 @@ class Backend:
self._unsynced_db[user] = ud
def sb_token_create(self, sess, *, extra_data = None):
    """Create an 'sb/xfr' auth token for the user logged in on `sess`.

    The token data holds the user's uuid and an `extra_data` dict that is
    extended with a 'client' entry taken from `sess.client`.

    `extra_data` is optional; when given it is copied, so the caller's
    dict is never modified.

    Returns whatever `self._auth_service.create_token` returns.
    """
    # Work on a copy: adding 'client' must not leak into a dict the caller
    # may still be holding on to (e.g. per-session state).
    extra_data = dict(extra_data) if extra_data is not None else {}
    extra_data['client'] = sess.client
    return self._auth_service.create_token('sb/xfr', { 'uuid': sess.user.uuid, 'extra_data': extra_data })
def me_update(self, sess, fields):
......@@ -280,19 +289,18 @@ class Backend:
if user is None: return None
if user.email != email: return None
sess.user = user
chat = Chat()
sess.client = extra_data['client']
chat = Chat(self._stats)
self._chats[chat.id] = chat
chat.add_session(sess)
return chat, extra_data
def auth_cal(self, uuid):
    """Create and return an 'sb/cal' auth token whose data is `uuid`.

    NOTE(review): elsewhere 'sb/cal' tokens are created with a dict of
    'uuid' and 'extra_data'; here the bare uuid is used — confirm that
    the token consumer accepts both shapes.
    """
    token = self._auth_service.create_token('sb/cal', uuid)
    return token
def login_cal(self, sess, email, token, chatid):
(user, extra_data) = self._load_user('sb/cal', token)
if user is None: return None
if user.email != email: return None
sess.user = user
sess.client = extra_data['client']
chat = self._chats.get(chatid)
if chat is None: return None
for sc, _ in chat.get_roster(self):
......@@ -330,7 +338,9 @@ class Backend:
if not ctc_sessions: raise error.ContactNotOnline()
for ctc_sess in ctc_sessions:
token = self._auth_service.create_token('sb/cal', { 'uuid': ctc.head.uuid, 'extra_data': ctc_sess.state.get_sb_extra_data() })
extra_data = ctc_sess.state.get_sb_extra_data() or {}
extra_data['client'] = ctc_sess.client
token = self._auth_service.create_token('sb/cal', { 'uuid': ctc.head.uuid, 'extra_data': extra_data })
ctc_sess.send_event(event.InvitedToChatEvent(chatid, token, caller))
async def _sync_db(self):
......@@ -374,6 +384,15 @@ class Backend:
for sess in closed:
self._sc.remove_session(sess)
async def _sync_stats(self):
    """Background task: flush the collected stats once every 60 seconds.

    Runs forever. A failing flush is reported with a printed traceback
    and does not stop the loop, so the next interval retries.
    """
    import traceback

    while True:
        await asyncio.sleep(60)
        try:
            self._stats.flush()
        except Exception:
            # Keep the periodic task alive; just report the failure.
            traceback.print_exc()
class _SessionCollection:
def __init__(self):
......@@ -417,19 +436,22 @@ class _SessionCollection:
self._sessions_by_user[sess.user].discard(sess)
class Chat:
def __init__(self):
def __init__(self, stats):
    """Create an empty chat with a fresh id.

    `stats` is the stats collector that message traffic in this chat is
    reported to.
    """
    self._stats = stats
    # Dict[Session, User]
    self._users_by_sess = {}
    self.id = gen_uuid()
def add_session(self, sess):
    """Register `sess` in this chat, remembering the user it currently carries."""
    participant = sess.user
    self._users_by_sess[sess] = participant
def send_message_to_everyone(self, sess_sender, data):
    """Deliver `data` from `sess_sender` to every other session in the chat.

    Records one "message sent" stat for the sender and one "message
    received" stat per session the message is delivered to.

    Raises KeyError if `sess_sender` was never added to this chat.
    """
    stats = self._stats
    # The sent stat is recorded before the sender lookup, as in the
    # original statement order.
    stats.on_message_sent(sess_sender.user, sess_sender.client)
    sender_user = self._users_by_sess[sess_sender]
    for recipient in self._users_by_sess:
        if not (recipient == sess_sender):
            recipient.send_event(event.ChatMessage(sender_user, data))
            stats.on_message_received(recipient.user, recipient.client)
def get_roster(self, sess):
roster = []
......
class Client:
    """Immutable, hashable description of a client program.

    A client is identified by the triple (program, version, via). Two
    clients are equal, and hash equally, exactly when their triples are
    equal, so instances can be used as dict keys.
    """

    __slots__ = ('program', 'version', 'via', '_tuple', '_hash')

    @classmethod
    def FromJSON(cls, json):
        """Build a Client from a dict with 'program', 'version' and optional 'via'."""
        return cls(json['program'], json['version'], json.get('via'))

    @classmethod
    def ToJSON(cls, client):
        """Return the dict form of `client`; the inverse of `FromJSON`."""
        return {
            'program': client.program,
            'version': client.version,
            'via': client.via,
        }

    def __init__(self, program, version, via = None):
        self.program = program
        self.version = version
        self.via = via
        self._tuple = (program, version, via)
        # `_hash` must be assigned last: once it is set, `__setattr__`
        # rejects every further write.
        self._hash = hash(self._tuple)

    def __setattr__(self, attr, value):
        # Writes are only allowed while the instance is still being
        # initialised, i.e. before `_hash` exists.
        if getattr(self, '_hash', None) is None:
            super().__setattr__(attr, value)
            return
        raise AttributeError("Immutable")

    def __delattr__(self, attr):
        # Deleting `_hash` would silently re-enable mutation, so deletion
        # is rejected as well.
        raise AttributeError("Immutable")

    def __eq__(self, other):
        if not isinstance(other, Client):
            # Let Python try the reflected comparison; `==` still ends up
            # False for unrelated types.
            return NotImplemented
        return self._tuple == other._tuple

    def __hash__(self):
        return self._hash

    def __repr__(self):
        return 'Client({!r}, {!r}, {!r})'.format(*self._tuple)
......@@ -5,6 +5,7 @@ class Session:
def __init__(self, state):
self.closed = False
self.user = None
self.client = None
self.state = state
def data_received(self, data: bytes) -> None:
......
from datetime import datetime
from contextlib import contextmanager
import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from HLL import HyperLogLog
from core.client import Client
from util.json_type import JSONType
import settings
class Stats:
    """Collects usage statistics in memory and persists them on `flush()`.

    Tracks the number of logged-in sessions and, per client id, the
    message counters plus a HyperLogLog sketch of active users for a
    single hour bucket.
    """

    def __init__(self):
        """Start with empty counters, restoring the stored ones for the current hour.

        If the stored 'current_hour' snapshot belongs to the hour we are
        in right now, its per-client stats are loaded so that counting
        continues where it stopped.
        """
        self.logged_in = 0
        # Dict[DBClient.id, Dict[stat, stat value]]?
        self.by_client = {}
        # Dict[Client, DBClient.id]?
        self._client_id_cache = None
        # Hour bucket that the counters in `by_client` belong to.
        self._hour = _current_hour()

        with Session() as sess:
            current = sess.query(CurrentStats).filter(CurrentStats.key == 'current_hour').one_or_none()
            if not current:
                return
            if current.value['hour'] != self._hour:
                # Snapshot is from an earlier hour: start from scratch.
                return
            self.by_client = {
                # JSON object keys are strings; client ids are ints.
                int(client_id): _stats_from_json(stats)
                for client_id, stats in current.value['by_client'].items()
            }

    def on_login(self):
        """Count one more logged-in session."""
        self.logged_in += 1

    def on_logout(self):
        """Count one fewer logged-in session."""
        self.logged_in -= 1

    def on_user_active(self, user, client):
        """Record `user` as active on `client` in the current hour."""
        self._collect('users_active', user, client)

    def on_message_sent(self, user, client):
        """Count one message sent from `client`."""
        self._collect('messages_sent', user, client)

    def on_message_received(self, user, client):
        """Count one message received on `client`."""
        self._collect('messages_received', user, client)

    def _collect(self, stat, user, client):
        """Update `stat` for `client`.

        'users_active' adds `user.email` to the client's HyperLogLog
        sketch; every other stat is a plain counter incremented by one.
        """
        if client is None:
            # The session never identified its client, so there is nothing
            # to attribute the event to. Skipping keeps stat collection
            # from breaking the login/message path.
            return
        if self.by_client is None:
            self.by_client = {}
        client_id = self._get_client_id(client)
        bhc = self.by_client.setdefault(client_id, {})

        if stat == 'users_active':
            if stat not in bhc:
                bhc[stat] = HyperLogLog(12)
            bhc[stat].add(user.email)
        else:
            bhc[stat] = bhc.get(stat, 0) + 1

    def flush(self):
        """Persist the logged-in count and the per-client hourly stats.

        When the hour has changed since the counters were started, they
        are written to the hour they were collected in, the stored
        snapshot is reset for the new hour, and the in-memory counters
        are cleared once the transaction has succeeded.
        """
        hour = _current_hour()
        data_hour = self._hour
        rolled_over = data_hour != hour

        with Session() as sess:
            current = sess.query(CurrentStats).filter(CurrentStats.key == 'logged_in').one_or_none()
            if not current:
                current = CurrentStats(key = 'logged_in')
            current.value = self.logged_in
            sess.add(current)
            sess.flush()

            current = sess.query(CurrentStats).filter(CurrentStats.key == 'current_hour').one_or_none()
            if not current:
                current = CurrentStats(key = 'current_hour')
            if rolled_over:
                # The counters still belong to the previous bucket: write
                # them there, and start the new hour's snapshot empty so
                # old totals are not carried into the new hour.
                self._flush_to_hourly(sess, data_hour)
                current.value = { 'hour': hour, 'by_client': {} }
            else:
                current.value = self._flush_to_hourly(sess, hour)
            sess.add(current)

        # Only reset after the `with` block finished without raising, so a
        # failed flush does not lose the collected counters.
        if rolled_over:
            self.by_client = {}
            self._hour = hour

    def _flush_to_hourly(self, sess, hour):
        """Write the in-memory stats into the HourlyClientStats rows for `hour`.

        Returns the JSON-ready snapshot ({'hour': ..., 'by_client': ...})
        of the same data.
        """
        for client_id, stats in self.by_client.items():
            hcs = sess.query(HourlyClientStats).filter(HourlyClientStats.hour == hour, HourlyClientStats.client_id == client_id).one_or_none()
            if hcs is None:
                hcs = HourlyClientStats(hour = hour, client_id = client_id)
            hcs.messages_sent = stats.get('messages_sent') or 0
            hcs.messages_received = stats.get('messages_received') or 0
            if 'users_active' in stats:
                hcs.users_active = stats['users_active'].cardinality()
            else:
                hcs.users_active = 0
            sess.add(hcs)
        return {
            'hour': hour,
            'by_client': {
                client_id: _stats_to_json(stats)
                for client_id, stats in self.by_client.items()
            }
        }

    def _get_client_id(self, client):
        """Return the DBClient id for `client`, creating the row if needed.

        The Client -> id mapping is loaded from the DB on first use and
        cached in memory afterwards.
        """
        if self._client_id_cache is None:
            with Session() as sess:
                self._client_id_cache = {
                    Client.FromJSON(row.data): row.id
                    for row in sess.query(DBClient).all()
                }
        if client not in self._client_id_cache:
            with Session() as sess:
                dbobj = DBClient(data = Client.ToJSON(client))
                sess.add(dbobj)
                # Flush so that the new row's id is available for caching.
                sess.flush()
                self._client_id_cache[client] = dbobj.id
        return self._client_id_cache[client]
def _stats_to_json(stats):
json = {}
if 'messages_sent' in stats:
json['messages_sent'] = stats['messages_sent']
if 'messages_received' in stats:
json['messages_received'] = stats['messages_received']
if 'users_active' in stats:
json['users_active'] = list(stats['users_active'].registers())
return json
def _stats_from_json(json):
stats = {}
if 'messages_sent' in json:
stats['messages_sent'] = json['messages_sent']
if 'messages_received' in json:
stats['messages_received'] = json['messages_received']
if 'users_active' in json:
hll = HyperLogLog(12)
hll.set_registers(bytearray(json['users_active']))
stats['users_active'] = hll
return stats
def _current_hour():
now = datetime.utcnow()
ts = now.timestamp()
return ts // 3600
class Base(declarative_base()):
    # Abstract declarative base for the stats models in this module; it is
    # separate from the `Base` of the `db` module.
    __abstract__ = True
class DBClient(Base):
    """A distinct client, stored so that stats rows can refer to it by id."""

    __tablename__ = 't_client'

    id = sa.Column(sa.Integer, primary_key = True, nullable = False)
    # JSON form of a Client, as produced by Client.ToJSON.
    data = sa.Column(JSONType, nullable = False)
class HourlyClientStats(Base):
    """Per-client statistics for one hour bucket; keyed by (hour, client_id)."""

    __tablename__ = 't_stats_hour_client'

    # Composite primary key.
    hour = sa.Column(sa.Integer, primary_key = True, nullable = False)
    client_id = sa.Column(sa.Integer, primary_key = True, nullable = False)

    # Estimated number of distinct active users (HyperLogLog cardinality).
    users_active = sa.Column(sa.Integer, nullable = False)
    messages_sent = sa.Column(sa.Integer, nullable = False)
    messages_received = sa.Column(sa.Integer, nullable = False)
class CurrentStats(Base):
    """Key/value store for live stats.

    The keys used in this module are 'logged_in' and 'current_hour'.
    """

    __tablename__ = 't_stats_current'

    key = sa.Column(sa.String, primary_key = True, nullable = False)
    value = sa.Column(JSONType, nullable = False)
# Engine for the stats database, configured by settings.STATS_DB, and the
# factory that `Session()` below uses to open sessions on it.
engine = sa.create_engine(settings.STATS_DB)
session_factory = sessionmaker(bind = engine)
@contextmanager
def Session():
    """Context manager yielding a DB session; re-entrant within one outer use.

    The outermost `with Session()` opens a session, commits it when the
    body finishes normally, rolls it back and re-raises when the body
    raises, and always closes it. A `with Session()` nested inside it
    yields that same session and leaves commit/rollback/close to the
    outermost one.
    """
    if Session._depth > 0:
        # Nested use: share the session opened by the outermost caller.
        yield Session._global
        return

    session = Session._global = session_factory()
    Session._depth += 1
    try:
        yield session
        session.commit()
    except BaseException:
        session.rollback()
        raise
    finally:
        session.close()
        Session._global = None
        Session._depth -= 1

# State shared by all uses of Session(): the open outer session (if any)
# and whether one is currently open.
Session._global = None
Session._depth = 0
from datetime import datetime
from lxml.objectify import fromstring as parse_xml
from core import session
from core.models import Substatus, Lst
from core.client import Client
from .misc import build_msnp_presence_notif, MSNPHandlers, encode_msnobj, Err
......@@ -33,6 +35,7 @@ def _m_ver(sess, trid, *args):
@_handlers
def _m_cvr(sess, trid, *args):
    """Handle CVR: record the session's client and send the CVR reply.

    The client version is taken from the sixth argument. The client is
    recorded as program 'msn' reached via 'gw' when the session is a
    PollingSession, and via 'direct' otherwise.
    """
    v = args[5]
    if isinstance(sess, session.PollingSession):
        via = 'gw'
    else:
        via = 'direct'
    sess.client = Client('msn', v, via)
    url = 'https://escargot.log1p.xyz'
    sess.send_reply('CVR', trid, v, v, v, url, url)
@_handlers
......
# Database URL for the main data — presumably consumed by the `db` module; confirm.
DB = 'sqlite:///msn.sqlite'
# Database URL for collected statistics; passed to sa.create_engine by the stats module.
STATS_DB = 'sqlite:///stats.sqlite'
LOGIN_HOST = 'm1.escargot.log1p.xyz'
# Storage is served from the same host as login unless overridden.
STORAGE_HOST = LOGIN_HOST
DEBUG = False
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment