Commit 58c2f521 authored by Jose V. Trigueros's avatar Jose V. Trigueros

Async Transaction

parent 428c9162
Pipeline #99880706 passed with stages
in 14 minutes and 9 seconds
......@@ -10,12 +10,12 @@
</scm>
<properties>
<jda.version>4.0.0_53</jda.version>
<jda.version>4.0.0_68</jda.version>
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>
<kotlin.compiler.jvmTarget>11</kotlin.compiler.jvmTarget>
<kotlin.version>1.3.50</kotlin.version>
<kotlin.version>1.3.61</kotlin.version>
<main.class>tech.gdragon.App</main.class>
<metrics.version>4.1.0</metrics.version>
<metrics.version>4.1.1</metrics.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<slf4j.version>2.0.0-alpha1</slf4j.version>
</properties>
......@@ -49,7 +49,7 @@
<dependency>
<groupId>io.github.microutils</groupId>
<artifactId>kotlin-logging</artifactId>
<version>1.7.6</version>
<version>1.7.8</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
......@@ -64,7 +64,7 @@
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.13</version>
<version>2.2.15</version>
</dependency>
<dependency>
<groupId>net.dv8tion</groupId>
......@@ -84,7 +84,7 @@
<dependency>
<groupId>org.jetbrains.exposed</groupId>
<artifactId>exposed</artifactId>
<version>0.17.6</version>
<version>0.17.7</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
......
......@@ -66,13 +66,13 @@ public class DiscordWebhookAppender extends AbstractAppender {
Response response = null;
try {
response = client.newCall(request).execute();
if(!response.isSuccessful()) {
if (!response.isSuccessful()) {
LOGGER.error("Could not send log message to Discord, error code: " + response.code());
}
} catch (IOException e) {
LOGGER.error("Could not send log message to Discord", e);
} finally {
if(response != null && response.body() != null) {
if (response != null && response.body() != null) {
response.body().close();
}
}
......
......@@ -9,10 +9,11 @@ import org.jetbrains.exposed.dao.EntityID
import org.jetbrains.exposed.sql.Op
import org.jetbrains.exposed.sql.SqlExpressionBuilder
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.transactions.transaction
import org.joda.time.DateTime
import tech.gdragon.db.asyncTransaction
import tech.gdragon.db.dao.Guild
import tech.gdragon.db.table.Tables.Guilds
import tech.gdragon.db.transaction
import tech.gdragon.listener.CombinedAudioRecorderHandler
import tech.gdragon.listener.SilenceAudioSendHandler
import java.io.File
......@@ -170,7 +171,9 @@ object BotUtils {
audioManager.sendingHandler = audioSendHandler
recordingStatus(channel.guild.selfMember, true)
sendMessage(defaultChannel, ":red_circle: **Audio is being recorded on <#${channel.id}>**")
sendMessage(defaultChannel, """:red_circle: **Recording audio on <#${channel.id}>**
|_Session ID: `${recorder.session}`_
""".trimMargin())
}
}
......@@ -288,7 +291,7 @@ object BotUtils {
}
guilds
?.forEach {
.forEach {
val guild = jda.shardManager?.getGuildById(it.id)
guild
?.leave()
......@@ -298,7 +301,7 @@ object BotUtils {
logger.error(e) { "Could not leave server '$guild'!" }
})
?: logger.warn {
transaction {
asyncTransaction {
Guild[it.id].active = false
}
"No longer in this guild ${it.name}, but marking as inactive"
......
......@@ -5,10 +5,11 @@ import com.codahale.metrics.MetricRegistry
import com.github.rollingmetrics.counter.ResetPeriodicallyCounter
import mu.KotlinLogging
import net.dv8tion.jda.api.events.message.guild.GuildMessageReceivedEvent
import org.jetbrains.exposed.sql.transactions.transaction
import org.koin.core.KoinComponent
import org.koin.core.inject
import tech.gdragon.db.dao.Guild
import tech.gdragon.db.transaction
import tech.gdragon.discord.Command
import tech.gdragon.metrics.Metrics
import java.time.Duration
......
......@@ -2,11 +2,11 @@ package tech.gdragon.commands.misc
import net.dv8tion.jda.api.EmbedBuilder
import net.dv8tion.jda.api.events.message.guild.GuildMessageReceivedEvent
import org.jetbrains.exposed.sql.transactions.transaction
import tech.gdragon.BotUtils
import tech.gdragon.commands.CommandHandler
import tech.gdragon.commands.InvalidCommand
import tech.gdragon.db.dao.Guild
import tech.gdragon.db.transaction
import tech.gdragon.discord.Command
import java.awt.Color
......
package tech.gdragon.commands.settings
import net.dv8tion.jda.api.events.message.guild.GuildMessageReceivedEvent
import org.jetbrains.exposed.sql.transactions.transaction
import tech.gdragon.BotUtils
import tech.gdragon.commands.CommandHandler
import tech.gdragon.commands.InvalidCommand
import tech.gdragon.db.asyncTransaction
import tech.gdragon.db.dao.Alias
import tech.gdragon.db.dao.Guild
import tech.gdragon.db.transaction
import tech.gdragon.discord.Command
class Alias : CommandHandler() {
......@@ -34,17 +35,18 @@ class Alias : CommandHandler() {
aliases?.any { it.alias == alias } == true -> ":no_entry_sign: _Alias **`$alias`** already exists._"
// Checks that alias isn't a command
command == alias.toUpperCase() -> ":no_entry_sign: _Alias cannot be a command: **`$alias`**_"
else -> transaction {
Guild.findById(event.guild.idLong)?.settings?.let {
Alias.new {
name = command
this.alias = alias
settings = it
else -> {
asyncTransaction {
Guild.findById(event.guild.idLong)?.settings?.let {
Alias.new {
name = command
this.alias = alias
settings = it
}
}
}
":dancers: _New alias: **`$alias -> ${command.toLowerCase()}`**_"
} ?: ":no_entry_sign: _Couldn't create alias, try again._"
":dancers: _New alias: **`$alias -> ${command.toLowerCase()}`**_"
}
}
BotUtils.sendMessage(defaultChannel, message)
......
......@@ -3,10 +3,11 @@ package tech.gdragon.commands.settings
import mu.withLoggingContext
import net.dv8tion.jda.api.entities.GuildChannel
import net.dv8tion.jda.api.events.message.guild.GuildMessageReceivedEvent
import tech.gdragon.db.transaction
import org.jetbrains.exposed.sql.transactions.transaction
import tech.gdragon.BotUtils
import tech.gdragon.commands.CommandHandler
import tech.gdragon.commands.InvalidCommand
import tech.gdragon.db.asyncTransaction
import tech.gdragon.db.dao.Channel
import tech.gdragon.db.dao.Guild
......@@ -17,13 +18,13 @@ class AutoRecord : CommandHandler() {
transaction {
Guild.findOrCreate(idLong, name, region.name)
}
}?.let { guild ->
transaction {
}.let { guild ->
asyncTransaction {
Channel
.findOrCreate(channel.idLong, channel.name, guild)
.also { it.autoRecord = autoRecord }
}
} ?: logger.warn("Couldn't set autorecord.")
}
}
}
......
package tech.gdragon.commands.settings
import net.dv8tion.jda.api.events.message.guild.GuildMessageReceivedEvent
import tech.gdragon.db.transaction
import org.jetbrains.exposed.sql.transactions.transaction
import tech.gdragon.BotUtils
import tech.gdragon.commands.CommandHandler
import tech.gdragon.commands.InvalidCommand
......
......@@ -3,12 +3,13 @@ package tech.gdragon.commands.settings
import mu.withLoggingContext
import net.dv8tion.jda.api.entities.GuildChannel
import net.dv8tion.jda.api.events.message.guild.GuildMessageReceivedEvent
import org.jetbrains.exposed.sql.transactions.transaction
import tech.gdragon.BotUtils
import tech.gdragon.commands.CommandHandler
import tech.gdragon.commands.InvalidCommand
import tech.gdragon.db.asyncTransaction
import tech.gdragon.db.dao.Channel
import tech.gdragon.db.dao.Guild
import tech.gdragon.db.transaction
class AutoStop : CommandHandler() {
......@@ -18,14 +19,14 @@ class AutoStop : CommandHandler() {
transaction {
Guild.findOrCreate(idLong, name, region.name)
}
}?.let { guild ->
transaction {
}.let { guild ->
asyncTransaction {
Channel
.findOrCreate(channel.idLong, channel.name, guild)
.also { it.autoStop = autoStop }
}
}
} ?: logger.warn("Couldn't set autostop.")
}
}
override fun action(args: Array<String>, event: GuildMessageReceivedEvent) {
......
package tech.gdragon.commands.settings
import net.dv8tion.jda.api.events.message.guild.GuildMessageReceivedEvent
import org.jetbrains.exposed.sql.transactions.transaction
import tech.gdragon.BotUtils
import tech.gdragon.commands.CommandHandler
import tech.gdragon.commands.InvalidCommand
import tech.gdragon.db.dao.Guild
import tech.gdragon.db.transaction
class Prefix : CommandHandler() {
override fun action(args: Array<String>, event: GuildMessageReceivedEvent) {
......
package tech.gdragon.commands.settings
import net.dv8tion.jda.api.events.message.guild.GuildMessageReceivedEvent
import org.jetbrains.exposed.sql.transactions.transaction
import tech.gdragon.BotUtils
import tech.gdragon.commands.CommandHandler
import tech.gdragon.commands.InvalidCommand
import tech.gdragon.db.dao.Guild
import tech.gdragon.db.transaction
class RemoveAlias : CommandHandler() {
override fun action(args: Array<String>, event: GuildMessageReceivedEvent) {
......
......@@ -2,22 +2,23 @@ package tech.gdragon.commands.settings
import net.dv8tion.jda.api.entities.TextChannel
import net.dv8tion.jda.api.events.message.guild.GuildMessageReceivedEvent
import org.jetbrains.exposed.sql.transactions.transaction
import tech.gdragon.BotUtils
import tech.gdragon.commands.CommandHandler
import tech.gdragon.commands.InvalidCommand
import tech.gdragon.db.asyncTransaction
import tech.gdragon.db.dao.Guild
import tech.gdragon.db.dao.Settings
import tech.gdragon.db.transaction
class SaveLocation : CommandHandler() {
private fun setSaveLocation(settings: Settings, channel: TextChannel?): String {
return when {
channel == null -> {
transaction { settings.defaultTextChannel = null }
asyncTransaction { settings.defaultTextChannel = null }
":file_folder: _All messages will default to current channel._"
}
channel.canTalk() -> {
transaction { settings.defaultTextChannel = channel.idLong }
asyncTransaction { settings.defaultTextChannel = channel.idLong }
":file_folder: _All messages will default to channel **${channel.asMention}**._"
}
else -> ":no_entry_sign: _Cannot send messages in **${channel.asMention}**, please configure permissions and try again._"
......
......@@ -5,7 +5,7 @@ import tech.gdragon.BotUtils
import tech.gdragon.commands.CommandHandler
import tech.gdragon.commands.InvalidCommand
import tech.gdragon.db.dao.Guild
import tech.gdragon.db.transaction
import org.jetbrains.exposed.sql.transactions.transaction
import java.math.BigDecimal
class Volume : CommandHandler() {
......
......@@ -7,7 +7,6 @@ import org.apache.commons.io.FileUtils
import org.joda.time.DateTime
import org.koin.core.KoinComponent
import java.io.File
import java.io.InputStream
class DataStore : KoinComponent {
val logger = KotlinLogging.logger { }
......@@ -31,7 +30,7 @@ class DataStore : KoinComponent {
"Ready to upload recording to - $baseUrl/$key"
}
client.putObject(bucketName, key, file.path)
client.putObject(bucketName, key, file.path, null, null, null, null)
val stat = UploadResult.from(baseUrl, client.statObject(bucketName, key))
logger.info {
......@@ -40,10 +39,6 @@ class DataStore : KoinComponent {
return stat
}
fun upload(key: String, stream: InputStream, contentType: String = "audio/mpeg") {
client.putObject(bucketName, key, stream, contentType)
}
}
data class UploadResult(val key: String, val timestamp: DateTime, val size: Long, val url: String) {
......
@file:JvmName("ThreadLocalTransactionManager")
package tech.gdragon.db
import mu.KotlinLogging
import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.statements.expandArgs
import org.jetbrains.exposed.sql.transactions.transactionManager
import java.sql.SQLException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future
import org.jetbrains.exposed.sql.transactions.transaction as txn
val logger = KotlinLogging.logger { }
fun <T> transaction(db: Database? = null, statement: Transaction.() -> T): T? {
return try {
txn(db.transactionManager.defaultIsolationLevel, db.transactionManager.defaultRepetitionAttempts, db, statement)
} catch (e: SQLException) {
val exposedSQLException = e as? ExposedSQLException
val transaction = db.transactionManager.newTransaction(db.transactionManager.defaultIsolationLevel, null)
val lock = Object()
val queriesToLog = exposedSQLException?.contexts?.joinToString("\n") {
it.expandArgs(transaction)
fun <T> asyncTransaction(db: Database? = null, statement: Transaction.() -> T): Future<T> {
return CompletableFuture.supplyAsync {
synchronized(lock) {
txn(db.transactionManager.defaultIsolationLevel, db.transactionManager.defaultRepetitionAttempts, db, statement)
}
logger.error(e) {
"Failed to run transaction: $queriesToLog"
}.exceptionally { t ->
logger.error(t) {
"Failed to run asyncTransaction"
}
null
......
......@@ -2,8 +2,8 @@ package tech.gdragon.discord.logging
fun parseGuildInfo(body: String): Map<String, Any> {
val (guild, channel) = body.substringBefore(':', "#").split('#').let {
if(it.count() != 2)
listOf("","")
if (it.count() != 2)
listOf("", "")
else
it
}
......@@ -11,7 +11,7 @@ fun parseGuildInfo(body: String): Map<String, Any> {
}
fun parseAudioFile(body: String): Map<String, Any> {
val (f,s) = body.split(" - ")
val (f, s) = body.split(" - ")
val filename = f.split(' ').last()
val size = s.slice(0 until 10).toDouble()
......
......@@ -19,10 +19,10 @@ import org.koin.core.KoinComponent
import org.koin.core.inject
import tech.gdragon.BotUtils
import tech.gdragon.data.DataStore
import tech.gdragon.db.asyncTransaction
import tech.gdragon.db.dao.Channel
import tech.gdragon.db.dao.Guild
import tech.gdragon.db.dao.Recording
import tech.gdragon.db.transaction
import java.io.ByteArrayOutputStream
import java.io.File
import java.io.FileOutputStream
......@@ -31,6 +31,8 @@ import java.nio.file.Files
import java.nio.file.Paths
import java.nio.file.StandardCopyOption
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
class CombinedAudioRecorderHandler(var volume: Double, val voiceChannel: VoiceChannel, val defaultChannel: TextChannel) : AudioReceiveHandler, KoinComponent {
......@@ -56,7 +58,7 @@ class CombinedAudioRecorderHandler(var volume: Double, val voiceChannel: VoiceCh
private var subscription: Disposable? = null
private var uuid: UUID? = null
private var queueFile: QueueFile? = null
private var recordingRecord: Recording? = null
private var recordingRecord: Future<Recording?> = CompletableFuture.completedFuture(null)
private var canReceive = true
private var afkCounter = 0
......@@ -65,6 +67,9 @@ class CombinedAudioRecorderHandler(var volume: Double, val voiceChannel: VoiceCh
private var queueFilename: String? = null
private var recordingSize: Int = 0
val session: String
get() = uuid.toString()
init {
subscription = createRecording()
}
......@@ -89,7 +94,7 @@ class CombinedAudioRecorderHandler(var volume: Double, val voiceChannel: VoiceCh
}
private fun createRecording(): Disposable? {
recordingRecord = transaction {
recordingRecord = asyncTransaction {
Guild.findById(voiceChannel.guild.idLong)?.let {
Recording.new {
channel = Channel.findOrCreate(voiceChannel.idLong, voiceChannel.name, it)
......@@ -232,8 +237,8 @@ class CombinedAudioRecorderHandler(var volume: Double, val voiceChannel: VoiceCh
val message = ":no_entry_sign: _Recording is empty, not uploading._"
BotUtils.sendMessage(channel, message)
transaction {
recordingRecord?.apply {
asyncTransaction {
recordingRecord.get()?.apply {
size = 0
modifiedOn = DateTime.now()
url = "N/A"
......@@ -248,22 +253,34 @@ class CombinedAudioRecorderHandler(var volume: Double, val voiceChannel: VoiceCh
// Upload to Minio
if (recording.length() < MAX_RECORDING_SIZE) {
val recordingKey = "${channel.guild.id}/${recording.name}"
val result = datastore.upload(recordingKey, recording)
try {
val result = datastore.upload(recordingKey, recording)
val message = """|:microphone2: **Recording for <#${voiceChannel?.id}> has been uploaded!**
|${result.url}
|
|_Recording will only be available for 24hrs_
|""".trimMargin()
val message = """|:microphone2: **Recording for <#${voiceChannel?.id}> has been uploaded!**
|${result.url}
|
|_Recording will only be available for 24hrs_
|""".trimMargin()
BotUtils.sendMessage(channel, message)
BotUtils.sendMessage(channel, message)
transaction {
recordingRecord?.apply {
size = result.size
modifiedOn = result.timestamp
url = result.url
asyncTransaction {
recordingRecord.get()?.apply {
size = result.size
modifiedOn = result.timestamp
url = result.url
}
}
} catch (e: Exception) {
logger.error(e) {
"Error uploading recording."
}
val errorMessage = """|:no_entry_sign: _Error uploading recording, please visit support server and provide Session ID._
|_Session ID: `$session`_
|""".trimMargin()
BotUtils.sendMessage(channel, errorMessage)
}
} else {
val recordingSize = FileUtils.byteCountToDisplaySize(recording.length())
......@@ -299,8 +316,8 @@ class CombinedAudioRecorderHandler(var volume: Double, val voiceChannel: VoiceCh
}
}
transaction {
recordingRecord?.apply {
asyncTransaction {
recordingRecord.get()?.apply {
if (url.isNullOrEmpty())
delete()
}
......
......@@ -15,13 +15,14 @@ import net.dv8tion.jda.api.events.guild.voice.GuildVoiceMoveEvent
import net.dv8tion.jda.api.events.message.guild.GuildMessageReceivedEvent
import net.dv8tion.jda.api.events.message.priv.PrivateMessageReceivedEvent
import net.dv8tion.jda.api.hooks.ListenerAdapter
import org.jetbrains.exposed.sql.transactions.transaction
import org.koin.core.KoinComponent
import tech.gdragon.BotUtils
import tech.gdragon.commands.InvalidCommand
import tech.gdragon.commands.handleCommand
import tech.gdragon.db.asyncTransaction
import tech.gdragon.db.dao.Guild
import tech.gdragon.db.nowUTC
import tech.gdragon.db.transaction
class EventListener : ListenerAdapter(), KoinComponent {
......@@ -31,7 +32,7 @@ class EventListener : ListenerAdapter(), KoinComponent {
override fun onGuildJoin(event: GuildJoinEvent) {
withLoggingContext("guild" to event.guild.name) {
val guild = event.guild
transaction {
asyncTransaction {
Guild
.findOrCreate(guild.idLong, guild.name, guild.region.name)
.also {
......@@ -46,7 +47,7 @@ class EventListener : ListenerAdapter(), KoinComponent {
override fun onGuildLeave(event: GuildLeaveEvent) {
withLoggingContext("guild" to event.guild.name) {
transaction {
asyncTransaction {
Guild
.findById(event.guild.idLong)
?.let {
......@@ -61,7 +62,7 @@ class EventListener : ListenerAdapter(), KoinComponent {
override fun onGuildUpdateName(event: GuildUpdateNameEvent) {
withLoggingContext("guild" to event.guild.name) {
event.run {
transaction {
asyncTransaction {
Guild.findById(guild.idLong)
.also {
it?.name = newName
......@@ -77,7 +78,7 @@ class EventListener : ListenerAdapter(), KoinComponent {
override fun onGuildUpdateRegion(event: GuildUpdateRegionEvent) {
withLoggingContext("guild" to event.guild.name) {
event.run {
transaction {
asyncTransaction {
Guild.findOrCreate(guild.idLong, guild.name, event.oldRegion.name)
.also {
it.region = newRegion.name
......@@ -163,32 +164,27 @@ class EventListener : ListenerAdapter(), KoinComponent {
}
val prefix = transaction {
guild?.settings?.prefix
guild.settings.prefix
}
withLoggingContext("guild" to event.guild.name, "text-channel" to event.channel.name) {
if (prefix != null) {
val rawContent = event.message.contentDisplay.toLowerCase()
val inMaintenance = getKoin().getProperty("MAINTENANCE", "false").toBoolean()
val defaultChannel = BotUtils.defaultTextChannel(event.guild) ?: event.channel
val hasPrefix = rawContent.startsWith(prefix)
when {
hasPrefix && inMaintenance -> {
BotUtils.sendMessage(defaultChannel, ":poop: _Currently running an update...\n\nIf you have any questions please visit the support server: ${website}_")
logger.warn("Trying to use while running an update")
}
hasPrefix ->
try {
handleCommand(event, prefix, rawContent)
// Update activity
transaction { guild?.lastActiveOn = nowUTC() }
} catch (e: InvalidCommand) {
BotUtils.sendMessage(defaultChannel, ":no_entry_sign: _Usage: `${e.usage(prefix)}`_")
logger.warn { "[$rawContent] ${e.reason}" }
}
val rawContent = event.message.contentDisplay.toLowerCase()
val inMaintenance = getKoin().getProperty("MAINTENANCE", "false").toBoolean()
val defaultChannel = BotUtils.defaultTextChannel(event.guild) ?: event.channel
val hasPrefix = rawContent.startsWith(prefix)
if (hasPrefix && inMaintenance) {
BotUtils.sendMessage(defaultChannel, ":poop: _Currently running an update...\n\nIf you have any questions please visit the support server: ${website}_")
logger.warn("Trying to use while running an update")
} else if (hasPrefix)
try {
handleCommand(event, prefix, rawContent)
// Update activity
asyncTransaction { guild.lastActiveOn = nowUTC() }
} catch (e: InvalidCommand) {
BotUtils.sendMessage(defaultChannel, ":no_entry_sign: _Usage: `${e.usage(prefix)}`_")
logger.warn { "[$rawContent] ${e.reason}" }
}
}
}
}
......
......@@ -4,20 +4,17 @@ DISCORD_WEBHOOK=
PORT=8080
VERSION=dev
WEBSITE=https://www.pawa.im
# Bot
BOT_NAME=throw_voice
BOT_INVITE_URL=
BOT_TOKEN=
BOT_VERSION=latest
# DataStore
DS_ACCESS_KEY=
DS_BASEURL=
DS_BUCKET=
DS_HOST=http://localhost:9000
DS_SECRET_KEY=
# Rollbar
ROLLBAR_ENV=test
ROLLBAR_TOKEN=
......@@ -17,11 +17,27 @@
<Discord name="Discord">
<webhookUrl>${env:DISCORD_WEBHOOK}</webhookUrl>
</Discord>
<RollingRandomAccessFile
name="RollingRandomAccessFile"
fileName="logs/database.log"
filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
<PatternLayout pattern="${globalPattern}" charset="UTF-8"/>
<Policies>
<TimeBasedTriggeringPolicy interval="1"/>
</Policies>
<DefaultRolloverStrategy max="5"/>
</RollingRandomAccessFile>
</Appenders>