Commit 1e10d543 authored by Iván Sánchez Ortega's avatar Iván Sánchez Ortega

Resolve twitter video URLs, lock main stream until tw endpoints are connected

parent 9f5d3cf2
import Account from '../abstracts/account';
import TimeLock from '../timelock';
import inquirer from 'inquirer';
import { OAuth } from 'oauth';
......@@ -19,6 +20,8 @@ class TwitterAccount extends Account {
constructor(callback, vorpal, accountNumber, opts) {
super.constructor(callback, opts);
// /// While we are debugging, it's a good idea to not ask too much to the twitter servers.
// return; /// DEBUG!!!
......@@ -36,6 +39,12 @@ class TwitterAccount extends Account {
this._vorpal = vorpal;
this._accountNumber = accountNumber;
let initialTimeLock = new TimeLock();
// Lock the stream until we have some data from all the channels
this.emit(-Infinity, initialTimeLock.promise);
// console.log('TW platform locked the main stream on user init');
this._tw.get('account/verify_credentials', {}, (err, json)=>{
vorpal.log(
chalk.purple('You are logged in the Twitter platform as '),
......@@ -62,6 +71,7 @@ class TwitterAccount extends Account {
stream.on('error', function(error) {
vorpal.log(chalk.red(error));
});
});
// // Hackishly listen to the "samples" public timeline
......@@ -81,6 +91,10 @@ class TwitterAccount extends Account {
// });
let homeTimelineLock = new TimeLock();
this.emit(-Infinity, homeTimelineLock.promise);
// console.log('TW locked the main stream on the last home timeline');
// Get a few of the last statuses in the main timeline
this._tw.get('statuses/home_timeline', {
count: 100
......@@ -95,11 +109,13 @@ class TwitterAccount extends Account {
this._normalizeAndEmitUpdate(msgs[i], 'home');
// vorpal.log('timelines/home', msgs);
}
// console.log('TW removed home timeline lock');
homeTimelineLock.unlock();
});
initialTimeLock.unlock();
// console.log('TW platform unlocked user init');
});
return super.constructor(callback, opts);
}
listen(channel) {
......@@ -147,6 +163,28 @@ class TwitterAccount extends Account {
replace(/&lt;/g, "<");
}
// Expand the media URLs. This is not trivial because of video URLs.
_cleanMedia(msg) {
let rawEntities = {};
if (!msg.entities) { return }
if (msg.entities && msg.entities.media) {
msg.entities.media.forEach(el=>rawEntities[el.id_str]=el);
}
if (msg.extended_entities && msg.extended_entities.media) {
msg.extended_entities.media.forEach(el=>rawEntities[el.id_str]=el);
}
return Object.keys(rawEntities).map(k=>{
let ent = rawEntities[k];
if ('video_info' in ent) {
return ent.video_info.variants[0].url;
}
return ent.media_url.https || ent.media_url;
});
}
// // Given a stream descriptor, return a function that handles messages from the stream
// _getOnStreamMessage(stream) {
// return (msg)=>{
......@@ -178,6 +216,36 @@ class TwitterAccount extends Account {
//
// }
if (msg.quoted_status &&
msg.retweeted_status &&
msg.retweeted_status.quoted_status &&
msg.retweeted_status.quoted_status.id_str === msg.quoted_status.id_str
) {
// Delete the quoted one, as the quoted from the retweet is the same
// and has the extended attributes
delete msg.quoted_status;
}
if (msg.extended_tweet && msg.extended_tweet.full_text) {
msg.text = msg.extended_tweet.full_text;
msg.truncated = false;
}
if (msg.truncated) {
// console.log('Truncated ', msg.text, ' - fetching full from id ', msg.id_str);
return {
timestamp: Date.parse(msg.created_at),
itemPromise: this._tw.get('statuses/show/', { id: msg.id_str }).then((full)=>{
if (full.truncated) {
// After requesting individual tweet, got it truncated also
// console.log('Got full but still truncated. From: «', msg.text , '» into: ', full);
console.log('Got full but still truncated. ', msg.text);
full.truncated = false;
}
return this._normalize(full, channel).itemPromise;
})
}
}
if (msg.extended_tweet) {
// return this._normalize(Object.assign({}, msg.extended_tweet, msg);
......@@ -187,29 +255,33 @@ class TwitterAccount extends Account {
}
if (msg.retweeted_status) {
return ({
accountNumber: this._accountNumber,
accountId: this._id,
raw: channel && msg,
sender: msg.user.screen_name,
str: '',
channel: channel,
media: [],
// Wait for normalization of retweeted, return a promise
return {
timestamp: Date.parse(msg.created_at),
echoed: this._normalize(msg.retweeted_status, null)
});
itemPromise: this._normalize(msg.retweeted_status, null).itemPromise.then(echoed=>
Promise.resolve({
accountNumber: this._accountNumber,
accountId: this._id,
raw: channel && msg,
sender: msg.user.screen_name,
str: '',
channel: channel,
media: [],
timestamp: Date.parse(msg.created_at),
echoed: echoed
})
)
};
// msg.text = msg.retweeted_status.text;
}
let allEntities = Object.assign(
{},
msg.entities || {},
msg.extended_entities || {},
(msg.retweeted_status && msg.retweeted_status.entities) || {},
(msg.retweeted_status && msg.retweeted_status.extended_entities) || {}
msg.extended_entities || {}
);
let allMedia = (allEntities.media && allEntities.media.map(i=>i.media_url_https || i.media_url )) || [];
// let allMedia = (allEntities.media && allEntities.media.map(i=>i.media_url_https || i.media_url )) || [];
// let allMedia = (allEntities.media && allEntities.media.map((i)=>{
// return
......@@ -221,21 +293,51 @@ class TwitterAccount extends Account {
let str = this._cleanString(msg.text, allEntities);
return({
accountNumber: this._accountNumber,
accountId: this._id,
sender: msg.user.screen_name,
str: str,
channel: channel,
raw: channel && msg,
// Wait for normalization of quoted, return a promise
if (msg.quoted_status) {
return {
timestamp: Date.parse(msg.created_at),
itemPromise: this._normalize(msg.quoted_status, null).itemPromise.then(quoted=>
Promise.resolve({
accountNumber: this._accountNumber,
accountId: this._id,
sender: msg.user.screen_name,
str: str,
channel: channel,
raw: channel && msg,
// realtime: true,
// media: msg.media_attachments.map(i=>i.text_url || i.remote_url),
media: this._cleanMedia(msg),
timestamp: Date.parse(msg.created_at),
quoted: quoted
})
)
};
}
// Neither quoted nor retweeted here
return {
timestamp: Date.parse(msg.created_at),
itemPromise: Promise.resolve({
accountNumber: this._accountNumber,
accountId: this._id,
sender: msg.user.screen_name,
str: str,
channel: channel,
raw: channel && msg,
// realtime: true,
// media: msg.media_attachments.map(i=>i.text_url || i.remote_url),
media: allMedia,
timestamp: Date.parse(msg.created_at),
quoted: msg.quoted_status && this._normalize(msg.quoted_status, null)
});
media: this._cleanMedia(msg),
timestamp: Date.parse(msg.created_at),
})
};
} else {
this._vorpal.log(chalk.red(''));
this._vorpal.log(chalk.red('Received something which is not a tweet'));
return {
timestamp: Date.parse(msg.created_at),
itemPromise: Promise.resolve(msg)
}
}
}
......@@ -247,7 +349,7 @@ class TwitterAccount extends Account {
let normalized = this._normalize(msg, channel);
if (normalized) {
this.emit(normalized.timestamp, Promise.resolve(normalized));
this.emit(normalized.timestamp, normalized.itemPromise);
}
// /// FIXME!!!
......
......@@ -60,13 +60,17 @@ function handle(timestamp, itemPromise) {
// console.log('handling: ', timestamp);
// Pass the item through all known filters
itemPromise = itemPromise.then(deshortifyFilter);
itemPromise = itemPromise.then((item)=>item && deshortifyFilter(item));
itemQueue.push({ timestamp: timestamp, itemPromise: itemPromise });
// Is this now the only item in the queue? Let's trigger a queue fetch in a short
// while.
if (itemQueue.length === 1) { setTimeout(popOldest, 1000); }
if (itemQueue.length === 1) {
// // vorpal.log(chalk.purple('>>> Nothing in the queue, let\'s wait a bit to pop the oldest item.'));
setTimeout(popOldest, 1000);
}
}
......@@ -79,6 +83,8 @@ function popOldest() {
/// delay it by a couple a seconds if so. At least delay things by the
/// duration of an API roundtrip, to prevent out-of-order messages
/// when streaming stuff.
// // vorpal.log(chalk.purple('>>> Popping item at ', itemQueue.peek().timestamp));
itemQueue.pop().itemPromise.then(onItemReady);
}
}
......
// A simple wrapper over a Promise with a timestamp, to save a bit of code in the
// platforms code.
// The idea is to provide a way to resolve a promise asynchronously, without
// adding one more level of indentation and preventing a bit of callback hell.
export default class TimeLock {
constructor() {
this._promise = new Promise((resolve)=>{
this._onUnlock = resolve;
});
}
get promise() {
return this._promise;
}
unlock() {
this._onUnlock(false);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment