Commit 2869000d authored by Iván Sánchez Ortega's avatar Iván Sánchez Ortega

Implemented a priority queue to get all updates in order. This was easier than expected.

parent 074ae4b0
......@@ -16,6 +16,7 @@
"mastodon-api": "^1.1.1",
"oauth": "^0.9.15",
"striptags": "^3.0.1",
"tinyqueue": "^1.2.2",
"twitter": "^1.7.0",
"vorpal": "^1.11.4"
},
......
......@@ -63,8 +63,8 @@ export default class Account {
// filtering/transforming by plugins). Needs some normalized data about the
// update, but can include optional metadata as given from the platform.
// This is meant to be called only from subclasses.
emit(data, metadata) {
this._callback(this, data, metadata);
emit(timestamp, data) {
this._callback(timestamp, data);
return this;
}
......
......@@ -15,7 +15,7 @@ import he from 'he';
class MastodonAccount extends Account {
constructor(callback, vorpal, opts) {
constructor(callback, vorpal, accountNumber, opts) {
// this._timers = {};
this._opts = Object.assign({
......@@ -28,6 +28,7 @@ class MastodonAccount extends Account {
});
this._vorpal = vorpal;
this._accountNumber = accountNumber;
this._masto.get('accounts/verify_credentials')
/*.then(response=>response.json()).*/
......@@ -42,6 +43,7 @@ class MastodonAccount extends Account {
chalk.cyan(opts.badge)
);
this._id = json.data.username + '@' + opts.api_url.replace(/.*:\/\//,'').replace(/\/.*/, '');
// Hackishly listen to the public timeline
let userListener = this._masto.stream('streaming/user');
......@@ -67,8 +69,6 @@ class MastodonAccount extends Account {
}
});
// let publicListener = this._masto.stream('streaming/public');
// publicListener.on('message', this._getOnStreamMessage('public'));
// publicListener.on('error', err => {
......@@ -76,46 +76,6 @@ class MastodonAccount extends Account {
// });
/* A update msg looks like:
{ event: 'update',
data:
{ id: 2133504,
created_at: '2017-04-10T08:36:36.000Z',
in_reply_to_id: null,
in_reply_to_account_id: null,
sensitive: false,
spoiler_text: '',
visibility: 'public',
application: null,
account:
{ id: 52132,
username: 'Milady_Oscar',
acct: 'Milady_Oscar@oc.todon.fr',
display_name: 'Lady Oscar ',
locked: false,
created_at: '2017-04-05T15:18:04.424Z',
note: 'Un peu de vous et beaucoup de n\'importe moi, des trucs de filles, des blagues de mecs, et des fraises tagada.\nSur Twitter avec le même @ ',
url: 'https://oc.todon.fr/@Milady_Oscar',
avatar: 'https://files.mastodon.social/accounts/avatars/000/052/132/original/5d55de1a7e19b927.PNG?1491405483',
header: 'https://files.mastodon.social/accounts/headers/000/052/132/original/6e2afd6a5009629b.PNG?1491405483',
followers_count: 22,
following_count: 15,
statuses_count: 686
},
media_attachments: [],
mentions: [],
tags: [],
uri: 'tag:oc.todon.fr,2017-04-10:objectId=160722:objectType=Status',
content: '<p>Je sens bien le barbecue aujourd\'hui. Mais j\'ai pas acheté de charbon de bois. J\'ai envie de prendre Marine Le Pen à la place.</p>',
url: 'https://oc.todon.fr/users/Milady_Oscar/updates/11367',
reblogs_count: 0,
favourites_count: 0,
reblog: null
}
}
*/
});
......@@ -178,13 +138,16 @@ class MastodonAccount extends Account {
}
return({
accountNumber: this._accountNumber,
accountId: this._id,
sender: msg.account.acct,
str: msg.reblog ? '' : this._cleanString(msg.content),
channel: channel,
// realtime: true,
media: msg.media_attachments.map(i=>i.text_url || i.remote_url),
timestamp: Date.parse(msg.created_at),
echoed: msg.reblog && this._normalize(channel, msg.reblog)
echoed: msg.reblog && this._normalize(channel, msg.reblog),
raw: msg
});
// if (msg.mentions) {
......@@ -207,41 +170,50 @@ class MastodonAccount extends Account {
let normalized = this._normalize(channel, msg);
if (normalized) {
this.emit(normalized, msg);
this.emit(normalized.timestamp, Promise.resolve(normalized));
}
}
_normalizeAndEmitNotification(channel, msg) {
let timestamp = Date.parse(msg.created_at);
if (msg.type === 'follow') {
this.emit({
this.emit(timestamp, Promise.resolve({
accountNumber: this._accountNumber,
accountId: this._id,
str: chalk.bold(msg.account.acct) + ' started following you.',
channel: channel,
timestamp: Date.parse(msg.created_at)
});
timestamp: timestamp
}));
} else if (msg.type === 'reblog') {
this.emit({
this.emit(timestamp, Promise.resolve({
accountNumber: this._accountNumber,
accountId: this._id,
str: chalk.bold(msg.account.acct) + ' echoed your update: ' + this._cleanString(msg.status.content),
channel: channel,
timestamp: Date.parse(msg.created_at)
});
timestamp: timestamp
}));
} else if (msg.type === 'favourite') {
this.emit({
this.emit(timestamp, Promise.resolve({
accountNumber: this._accountNumber,
accountId: this._id,
str: chalk.bold(msg.account.acct) + ' favourited your update: ' + this._cleanString(msg.status.content),
channel: channel,
timestamp: Date.parse(msg.created_at)
});
timestamp: timestamp
}));
} else if (msg.type === 'mention') {
this.emit({
this.emit(timestamp, Promise.resolve({
accountNumber: this._accountNumber,
accountId: this._id,
str: chalk.bold(msg.account.acct) + ' mentioned you ' + this._cleanString(msg.status.content),
channel: channel,
timestamp: Date.parse(msg.created_at)
});
timestamp: timestamp
}));
} else {
this._vorpal.log('channel', msg);
}
}
}
......
......@@ -17,8 +17,10 @@ const soclialConsumerSecret = 'qsFwbc6gXtcHAplAXjfoKXu96hVXz82W7sEbzuLR2U';
class TwitterAccount extends Account {
constructor(callback, vorpal, opts) {
// this._timers = {};
constructor(callback, vorpal, accountNumber, opts) {
// /// While we are debugging, it's a good idea to not ask too much to the twitter servers.
// return; /// DEBUG!!!
this._opts = Object.assign({
// colour: 'grey'
......@@ -32,6 +34,7 @@ class TwitterAccount extends Account {
});
this._vorpal = vorpal;
this._accountNumber = accountNumber;
this._tw.get('account/verify_credentials', {}, (err, json)=>{
vorpal.log(
......@@ -43,6 +46,7 @@ class TwitterAccount extends Account {
chalk.cyan(opts.badge)
);
this._id = json.screen_name + '@twitter';
// Hackishly listen to the user's main stream
this._stream = this._tw.stream('user', {
......@@ -151,11 +155,14 @@ class TwitterAccount extends Account {
if (msg.extended_tweet) {
// return this._normalize(Object.assign({}, msg.extended_tweet, msg);
msg.text = msg.extended_tweet.full_text;
msg.entities = msg.extended_tweet.entities;
/// TODO: Maybe something has to be done about the entities?
}
if (msg.retweeted_status) {
return ({
accountNumber: this._accountNumber,
accountId: this._id,
sender: msg.user.screen_name,
str: '',
channel: 'home',
......@@ -187,6 +194,8 @@ class TwitterAccount extends Account {
let str = this._cleanString(msg.text, allEntities);
return({
accountNumber: this._accountNumber,
accountId: this._id,
sender: msg.user.screen_name,
str: str,
channel: 'home',
......@@ -209,7 +218,7 @@ class TwitterAccount extends Account {
let normalized = this._normalize(msg);
if (normalized) {
this.emit(normalized, msg);
this.emit(normalized.timestamp, Promise.resolve(normalized));
}
// /// FIXME!!!
......
......@@ -10,6 +10,7 @@ import inquirer from 'inquirer';
import Configstore from 'configstore';
import chalk from 'chalk256';
import Flatten from 'flatten-obj';
import TinyQueue from 'tinyqueue';
const flatten = Flatten();
const vorpal = Vorpal();
......@@ -37,46 +38,98 @@ vorpal.log(chalk.purple('>>> User configuration file read.'));
const startTime = new Date();
const timezoneOffset = startTime.getTimezoneOffset();
/// TODO: split handling (and storing), stringifying, and displaying.
function handle(account, data, metadata) {
// Define a tiniqueue for the items. This will allow fetching the Promise for
// the oldest update. Also the accounts can block the queue with synthetic
// promises that resolve to a falsy value.
// The queue will store entries in a { timestamp: 1234, itemPromise: Promise(...) } format.
function compareQueueItems(a, b) {
return a.timestamp - b.timestamp;
}
let itemQueue = new TinyQueue([], compareQueueItems);
// This handles new item Promises from the accounts. Put them in the queue,
// wait for the oldest to resolve.
function handle(timestamp, itemPromise) {
// let { timestamp, channel } = item;
// console.log('handling: ', timestamp, itemPromise);
// console.log('handling: ', timestamp);
data.metadata = metadata;
data.account = account;
itemQueue.push({ timestamp: timestamp, itemPromise: itemPromise });
let { timestamp, channel } = data;
// Is this now the only item in the queue? Let's trigger a queue fetch in a short
// while.
if (itemQueue.length === 1) { setTimeout(popOldest, 1000); }
if (timestamp === undefined) {
timestamp = Date.now();
}
// Pops the oldest in the queue, to stringify and display it.
// If there are no items left, do nothing. A new handle() is supposed to trigger
// this again.
function popOldest() {
if (itemQueue.length) {
/// TODO: Check if the oldest item is too close to the present, and
/// delay it by a couple a seconds if so. At least delay things by the
/// duration of an API roundtrip, to prevent out-of-order messages
/// when streaming stuff.
itemQueue.pop().itemPromise.then(onItemReady);
}
}
// stringifies and displays the item.
function onItemReady(item) {
timestamp -= timezoneOffset * 60000;
if (!item) {
// If there is no item, this means that a synthetic entry was added
// to the queue - most probably to block the queue temporarily.
popOldest(); // Go to the next one.
return;
}
if (item.timestamp === undefined) {
item.timestamp = Date.now();
}
item.timestamp -= timezoneOffset * 60000;
let id = circularBuffer.push(item);
item.soclialId = id;
// console.log('onItemReady, item accountNumber is ', item.accountNumber);
// console.log('onItemReady, all accounts config is ', conf.all.accounts);
// console.log('onItemReady, account config is ', conf.all.accounts[item.accountNumber]);
let badge = conf.all.accounts[Number(item.accountNumber)].badge; // What's the badge for this platform/account??
// https://stackoverflow.com/questions/10645994/node-js-how-to-format-a-date-string-in-utc
let timestampString =
new Date(timestamp)
.toISOString()
.replace(/T/, ' ') // replace T with a space
.replace(/\..+/, ''); // delete the dot and everything after
let id = circularBuffer.push(data);
let badge = account._opts.badge; // What's the badge for this platform/account??
new Date(item.timestamp)
.toISOString()
.replace(/T/, ' ') // replace T with a space
.replace(/\..+/, ''); // delete the dot and everything after
data.soclialId = id;
const timestampColour = conf.get('global.colours.timestamp');
const badgeColour = conf.get('global.colours.badge');
let header = '[' + chalk.foreground(timestampColour)(timestampString) + '] ' +
id + '> ' + chalk.foreground(badgeColour)(badge + '/' + channel);
id + '> ' + chalk.foreground(badgeColour)(badge + '/' + item.channel);
let formatted = stringify(conf, data);
let formatted = stringify(conf, item);
// Printing out should NOT be synchronous with the handling.
vorpal.log(header + formatted);
popOldest(); // Go to the next one.
}
// Add a mock 'clock' account to the config
// conf.set('accounts.clock', {
// test: { // alias for the account
......@@ -123,7 +176,7 @@ function activateAccount(accId) {
throw new Error('Unknown platform in a configured account. Please edit your configuration manually to remove the offending account.');
}
activeAccounts[accId] = new platforms[pl].Account(handle, vorpal, accConf);
activeAccounts[accId] = new platforms[pl].Account(handle, vorpal, accId, accConf);
for (let ch in accConf.channels) {
vorpal.log(
......
......@@ -1531,6 +1531,10 @@ time-stamp@^1.0.0:
version "1.0.1"
resolved "https://registry.yarnpkg.com/time-stamp/-/time-stamp-1.0.1.tgz#9f4bd23559c9365966f3302dbba2b07c6b99b151"
tinyqueue@^1.2.2:
version "1.2.2"
resolved "https://registry.yarnpkg.com/tinyqueue/-/tinyqueue-1.2.2.tgz#947229e5e4197aba988acd27751dcc582e6728ff"
tmp@^0.0.31:
version "0.0.31"
resolved "https://registry.yarnpkg.com/tmp/-/tmp-0.0.31.tgz#8f38ab9438e17315e5dbd8b3657e8bfb277ae4a7"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment