Skip to content
Snippets Groups Projects
Commit bdb30130 authored by Patrick Auernig's avatar Patrick Auernig :coffee:
Browse files

Make HLS optional with cargo feature flag

parent 0529ef8c
No related branches found
No related tags found
No related merge requests found
......@@ -17,9 +17,6 @@ parking_lot = "^0.7"
futures = "^0.1"
tokio = "^0.1"
rml_rtmp = "^0.2"
mpeg2ts = "0.1"
m3u8-rs = "1.0"
tempfile = "3.0"
clap = "~2.32"
[dependencies.log]
......@@ -34,6 +31,19 @@ version = "0.2"
optional = true
version = "0.2"
[dependencies.mpeg2ts]
optional = true
version = "0.1"
[dependencies.m3u8-rs]
optional = true
version = "1.0"
[dependencies.tempfile]
optional = true
version = "3.0"
[features]
default = ["tls"]
default = ["tls", "hls"]
tls = ["native-tls", "tokio-tls"]
hls = ["mpeg2ts", "m3u8-rs", "tempfile"]
......@@ -13,9 +13,13 @@ pub enum Error {
HandshakeFailed,
RequestError,
SessionError(String),
#[cfg(feature = "hls")]
NotEnoughData,
#[cfg(feature = "hls")]
DecoderConfigurationRecordMissing,
#[cfg(feature = "hls")]
AudioSpecificConfigurationMissing,
#[cfg(feature = "hls")]
UnsupportedConfigurationRecordVersion(u8),
}
......
......@@ -6,12 +6,16 @@ mod shared;
mod config;
mod media;
mod rtmp;
mod hls;
mod args;
#[cfg(feature = "hls")]
mod hls;
use futures::future::lazy;
use simplelog::{Config, SimpleLogger, TermLogger, LevelFilter};
#[allow(unused_imports)]
use self::{
shared::Shared,
error::Error,
......@@ -29,14 +33,21 @@ fn main() {
eprintln!("Failed to initialize logger: {}", err)));
tokio::run(lazy(|| {
let hls_server = hls::Server::new();
let hls_sender = hls_server.sender();
tokio::spawn(hls_server.coordinator());
let shared = Shared::new();
let shared = Shared::new(hls_sender);
#[cfg(feature = "hls")]
spawn_hls_server(shared.clone());
tokio::spawn(rtmp::Server::new(shared.clone()).start());
Ok(())
}));
}
#[cfg(feature = "hls")]
fn spawn_hls_server(mut shared: Shared) {
let hls_server = hls::Server::new();
let hls_sender = hls_server.sender();
shared.set_hls_sender(hls_sender);
tokio::spawn(hls_server.coordinator());
}
#[cfg(feature = "hls")]
pub mod codec;
use std::collections::HashSet;
use futures::sync::mpsc;
use bytes::Bytes;
use rml_rtmp::{
sessions::StreamMetadata,
time::RtmpTimestamp,
};
#[cfg(feature = "hls")]
use futures::sync::mpsc;
#[cfg(feature = "hls")]
pub use self::codec::{avc, aac};
#[cfg(feature = "hls")]
pub type Receiver = mpsc::UnboundedReceiver<Media>;
#[cfg(feature = "hls")]
pub type Sender = mpsc::UnboundedSender<Media>;
......
......@@ -3,6 +3,7 @@ use std::{
rc::Rc,
};
use::log::{debug, error, info};
#[cfg(feature = "hls")]
use futures::{sync::oneshot, Future};
use rml_rtmp::{
sessions::{
......@@ -16,8 +17,10 @@ use rml_rtmp::{
use crate::{
error::{Error, Result},
shared::Shared,
media::{self, Media, Channel},
media::{Media, Channel},
};
#[cfg(feature = "hls")]
use crate::media;
use super::Client;
......@@ -32,6 +35,7 @@ pub struct Handler {
peer_id: u64,
results: VecDeque<EventResult>,
shared: Shared,
#[cfg(feature = "hls")]
media_sender: Option<media::Sender>,
}
......@@ -49,6 +53,7 @@ impl Handler {
peer_id,
results: VecDeque::new(),
shared,
#[cfg(feature = "hls")]
media_sender: None,
};
......@@ -157,16 +162,8 @@ impl Handler {
}
}
let (request, response) = oneshot::channel();
self.shared.hls_sender.unbounded_send(request)
.map_err(|err| error!("{:?}", err))
.map(|_| {
response.map(|hls_writer_handle| {
self.media_sender = Some(hls_writer_handle);
})
.wait().unwrap()
})
.unwrap();
#[cfg(feature = "hls")]
self.register_on_hls_server();
let result = {
let mut clients = self.shared.clients.lock();
......@@ -284,9 +281,8 @@ impl Handler {
fn multimedia_data_received(&mut self, stream_key: &str, media: &Media) -> Result<()> {
// debug!("Received video data for stream with key {}", stream_key);
if let Some(media_sender) = &self.media_sender {
media_sender.unbounded_send(media.clone()).unwrap();
}
#[cfg(feature = "hls")]
self.send_to_hls_writer(media.clone());
let app_name = self.shared
.app_name_from_stream_key(&stream_key)
......@@ -337,6 +333,30 @@ impl Handler {
Ok(())
}
#[cfg(feature = "hls")]
fn register_on_hls_server(&mut self) {
if let Some(sender) = self.shared.hls_sender() {
let (request, response) = oneshot::channel();
sender.unbounded_send(request)
.map_err(|err| error!("{:?}", err))
.map(|_| {
response.map(|hls_writer_handle| {
self.media_sender = Some(hls_writer_handle);
})
.wait().unwrap()
})
.unwrap();
}
}
#[cfg(feature = "hls")]
fn send_to_hls_writer(&self, media: Media) {
if let Some(media_sender) = &self.media_sender {
media_sender.unbounded_send(media).unwrap();
}
}
}
impl Drop for Handler {
......@@ -345,3 +365,4 @@ impl Drop for Handler {
clients.remove(&self.peer_id);
}
}
......@@ -9,9 +9,10 @@ use crate::{
Client,
Sender,
},
hls,
config::Config,
};
#[cfg(feature = "hls")]
use crate::hls;
#[derive(Clone)]
......@@ -21,18 +22,34 @@ pub struct Shared {
pub clients: Arc<Mutex<HashMap<u64, Client>>>,
pub streams: Arc<RwLock<HashMap<String, Channel>>>,
pub app_names: Arc<RwLock<HashMap<String, String>>>,
pub hls_sender: hls::server::Sender,
#[cfg(feature = "hls")]
hls_sender: Arc<RwLock<Option<hls::server::Sender>>>,
}
impl Shared {
pub fn new(hls_sender: hls::server::Sender) -> Self {
pub fn new() -> Self {
Self {
config: Arc::new(RwLock::new(Config::new())),
peers: Arc::new(RwLock::new(HashMap::new())),
clients: Arc::new(Mutex::new(HashMap::new())),
streams: Arc::new(RwLock::new(HashMap::new())),
app_names: Arc::new(RwLock::new(HashMap::new())),
hls_sender,
#[cfg(feature = "hls")]
hls_sender: Arc::new(RwLock::new(None)),
}
}
#[cfg(feature = "hls")]
pub fn set_hls_sender(&mut self, sender: hls::server::Sender) {
let mut hls_sender = self.hls_sender.write();
*hls_sender = Some(sender);
}
#[cfg(feature = "hls")]
pub fn hls_sender(&self) -> Option<hls::server::Sender> {
match self.hls_sender.read().clone() {
Some(sender) => Some(sender),
None => None,
}
}
......
#![cfg(feature = "hls")]
use bytes::Buf;
use byteorder::{ReadBytesExt, BigEndian};
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment