Page 1 of 1

Threaded MySQL

Posted: Wed Feb 08, 2017 12:13 am
by velocity
Threaded MySQL

Latest version: 2.0.4 (from 04-10-2018)

** UPDATE 04-10-2018 **
  • Fixed issue with prioritized queue not working
  • Fixed issue with commit freezing the server and added so commit is now happening on a different thread.

** UPDATE 15-02-2017 **
  • Now uses GameThread instead of Repeat and Delay
  • Delaying the queue is still possible tho, through

    Syntax: Select all

    TSQL.wait(5) # Seconds

If your server requires a remote connection to a database, stacked up queries can cause noticeable lag in your game server (freezing, players twitching) since MySQL doesn't 'really' queue up the queries.

I've made a library to fix this problem, it basically queues up the queries and executes them with a dispatched GameThread. It's inspired from the Sourcemod threaded mysql https://wiki.alliedmods.net/SQL_(SourceMod_Scripting)
Remember that all queries now requires a callback, since they are truely dispatched.


The library:
See bottom for download instructions..
threaded_mysql/__init__.py

Syntax: Select all

"""
Created By Velocity-plus
Github: https://github.com/Velocity-plus/
"""
from listeners.tick import GameThread
from core import echo_console
from traceback import format_exc as traceback_format_exc
from logging import error as logging_error
from sys import exc_info
from queue import Queue
from time import time as timestamp, sleep
import pymysql.cursors


class ThreadedMySQL:

def __init__(self):
# Is the thread running?
self.thread_status = False
# Regular Queue
self._r_queue = Queue()
# Prioitized Queue
self._p_queue = Queue()

self.connection_method = 0

# Issue with print for WINDOWS user set the variable to 'echo_console'
self.error_handling = 'print'
# Show print messages?
self._debug = True

self.wait = 0

def wait(self, delay):
"""
If you for some reason want to delay the queue
:param delay: The delay in seconds
:return:
"""
self.wait = delay

def execute(self, query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
"""
This function cannot pass fetched data to the callback!
:param query: The SQL query that you want to execute
:param args: If the SQL query have any args
:param callback: The callback for the query
:param data_pack: If you want to pass special information to the callback
:param prioritize: If you have large queues, prioritizing the query can make it skip the queue
before the rest of the queue is finished
:param get_info: If you want information about the query passed to the callback
(such as timestamp, query and prioritized)
:return:
"""
# We need this later
query_type = 0

# If callback = None assuming no data returned needed
if get_info:
get_info = {'query': query, 'time': timestamp(), 'prioritized': prioritize}

if not prioritize:
self._r_queue.put([query, args, callback, data_pack, get_info, query_type])
else:
self._p_queue.put([query, args, callback, data_pack, get_info, query_type])

def fetchone(self, query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
"""
This function both execute and fetch data, no need to execute before using this!
:param query: The SQL query that you want to execute
:param args: If the SQL query have any args
:param callback: The callback for the query
:param data_pack: If you want to pass special information to the callback
:param prioritize: If you have large queues, prioritizing the query can make it skip the queue
before the rest of the queue is finished
:param get_info: If you want information about the query passed to the callback
(such as timestamp, query and prioritized)
:return:
"""
query_type = 1
if get_info:
get_info = {'query': query, 'time': timestamp(), 'prioritized': prioritize}

# If callback = None assuming no data returned needed
if not prioritize:
self._r_queue.put([query, args, callback, data_pack, get_info, query_type])
else:
self._p_queue.put([query, args, callback, data_pack, get_info, query_type])

def fetchall(self, query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
"""
This function both execute and fetch data, no need to execute before using this!
:param query: The SQL query that you want to execute
:param args: If the SQL query have any args
:param callback: The callback for the query
:param data_pack: If you want to pass special information to the callback
:param prioritize: If you have large queues, prioritizing the query can make it skip the queue
before the rest of the queue is finished
:param get_info: If you want information about the query passed to the callback
(such as timestamp, query and prioritized)
:return:
"""
query_type = 2

if get_info:
get_info = {'query': query, 'time': timestamp(), 'prioritized': prioritize}

# If callback = None assuming no data returned needed
if not prioritize:
self._r_queue.put([query, args, callback, data_pack, get_info, query_type])
else:
self._p_queue.put([query, args, callback, data_pack, get_info, query_type])

def complete_task(self, worker, prio=None):
query = worker[0]
args = worker[1]
callback = worker[2]
data_pack = worker[3]
get_info = worker[4]
query_type = worker[5]
try:
if get_info:
get_info['time'] = timestamp() - get_info['time']

if args:
self.cursor.execute(query, args)
else:
self.cursor.execute(query)

if query_type == 0:
if get_info:
if callback:
if data_pack:
callback(data_pack, get_info)
else:
callback(get_info)
else:
if callback:
if data_pack:
callback(data_pack)
else:
callback()
if query_type == 1:
data = self.cursor.fetchone()
if get_info:
if callback:
if data_pack:
callback(data, data_pack, get_info)
else:
callback(data, get_info)
else:
if callback:
if data_pack:
callback(data, data_pack)
else:
callback(data)

if query_type == 2:
data = self.cursor.fetchall()
if get_info:
if callback:
if data_pack:
callback(data, data_pack, get_info)
else:
callback(data, get_info)
else:
if callback:
if data_pack:
callback(data, data_pack)
else:
callback(data)
if prio:
self._p_queue.task_done()
else:
self._r_queue.task_done()

except Exception as SQL_ERROR:

# Possible errors
class_error, actual_error, traceback = exc_info()

format_error = '-' * 64 + '\nExceptions probable cause (SQL Query: {0})\n{1}\nActual Error:\n{2}'.format(query,

class_error,

SQL_ERROR)

if self.error_handling == 'print':

print(format_error)

print('-' * 64)

else:

echo_console(format_error)

echo_console('-' * 64)

logging_error(traceback_format_exc())

def _threader(self):
while self.thread_status:
if self.wait:
sleep(self.wait)

if self._p_queue.empty():
worker = self._r_queue.get()
self.complete_task(worker, prio=False)

else:
worker = self._p_queue.get()
self.complete_task(worker, prio=True)

def _start_thread(self):
# Creates the thread
self.t = GameThread(target=self._threader)
self.t.daemon = True
self.t.start()

def handlequeue_start(self):
"""
This handles the queue, should be stopped on unload
:return:
"""
# Starts the queue
self.thread_status = True # This must be true before the thread can loop
self._start_thread()

def handlequeue_stop(self):
"""
This stops the queue for being processed, while a connection still might be open
no queries can be executed.
:return:
"""
self.thread_status = False

def queue_size(self):
"""
:return: Returns the size of the queue
"""
return self._r_queue.qsize() + self._p_queue.qsize()

def connect(self, host, user, password, db, charset, cursorclass=pymysql.cursors.DictCursor):
"""
Checkout PyMYSQL documentation for complete walkthrough
"""
try:
self.connection = pymysql.connect(host=host,
user=user,
password=password,
db=db,
charset=charset,
cursorclass=cursorclass)
self.cursor = self.connection.cursor()
if self._debug:
if self.error_handling == 'print':
print('threaded_mysql: [SUCCES] connection was succesfully established.')
else:
echo_console('threaded_mysql: [SUCCES] connection was succesfully established.')


self.connection_method = 1
except:
if self._debug:
if self.error_handling == 'print':
print('threaded_mysql: [ERROR] Not possible to make a connection.')
else:
echo_console('threaded_mysql: [ERROR] Not possible to make a connection.')

def connect_use(self, connection):
"""
If you created your connection elsewhere in your code, you can pass it to Threaded MySQL
:param connection: Your connection socket
:return:
"""
try:
self.connection = connection
self.cursor = self.connection.cursor()
if self._debug:
if self.error_handling == 'print':
print('threaded_mysql: [SUCCES] Cursor created succesfully for your connection.')
else:
echo_console('threaded_mysql: [SUCCES] Cursor created succesfully for your connection.')
self.connection_method = 2
except:
if self._debug:
if self.error_handling == 'print':
print('threaded_mysql: [ERROR] Not possible to create cursor.')
else:
echo_console('threaded_mysql: [ERROR] Not possible to create cursor.')


def commit(self):
"""
Regular pymysql commit
:return:
"""
self.connection.commit()

def close(self, finish_queue_before_close=False):
"""
Closes the mysql connection
:param finish_queue_before_close: Finishes the queue before it terminates the connection
:return:
"""
if finish_queue_before_close:
while self.queue_size() > 0:
pass
else:
self.connection.close()
else: self.connection.close()


Documentation
The library works as an extension of PyMYSQL, so as any other MySQL script, a connection must be established to a database, but before we can do that, let us initialize the class for threaded MySQL.

Syntax: Select all

from threaded_mysql import ThreadedMySQL

TSQL = ThreadedMySQL()


After we have initialized our class, we can connect to our MySQL database, you can use Threaded MySQL to connect to your database.

Syntax: Select all

# Available paramenters (host, user, password, db ,charset, cursorclass)
TSQL.connect(host='localhost', user='root', password='123', db='utf8')


If you don't want to connect with Threaded MySQL you can make your connection elsewhere and pass it to Threaded MySQL as shown below with PyMYSQL:

Syntax: Select all

import pymysql.cursors

connection = pymysql.connect(host="localhost",
user="root",
password="123",
db="test",
charset="utf8",
cursorclass=pymysql.cursors.DictCursor)

TSQL.connect_use(connection)


Now that our connection has been made, we need to start the gamethread that handles the queue of queries, as seen below.

Syntax: Select all

TSQL.handlequeue_start()


Finally, now we can make use of it. The functions available are listed below

Syntax: Select all

# Different types of queries
TSQL.execute(query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
TSQL.fetchone(query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
TSQL.fetchall(query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):

# Returns the size of the queue
TSQL.queue_size()

#If you want to delay the queue for a specific amount time, 1 being 1 seconed
TSQL.wait(delay)

# Refreshes the tables
TSQL.commit()

# Closes the mysql connection
TSQL.close()


It's important to note that when using the fetchone or fetchall it will execute the query BEFORE fetching it, so no need to use TSQL.execute when you want to fetch something.

If you want to grab the data from fetchone or fetchall a callback is necessary. To demonstrate this look at the code examples below:

Code examples (Updated)

Syntax: Select all

from messages import SayText2
from events import Event
from threaded_mysql import ThreadedMySQL

# Initializes the class
TSQL = ThreadedMySQL()

# Connects to a mysql database
TSQL.connect('localhost', 'root', '123', 'my_database', 'utf8')

# Starts the queuehandler (should only be called once)
TSQL.handlequeue_start()


# The callback from !fetchone
def sql_callback(data):
name = data['name']
SayText2(name).send()


# The callback from !fetchall
def sql_callback_2(data, data_pack):
text = data_pack['text']
SayText2("You wrote {}".format(text)).send()
for x in data:
name = x['name']
SayText2('Name: {}'.format(name)).send()

# The callback from !info
def sql_callback_3(get_info):
"""
get_info includes 'query', 'time', 'prioritized'
"""
query = get_info['query']
time = get_info['time']
prio = get_info['prioritized']
SayText2('Query: {0}\nTime: {1} seconds\nPrioritized: {2}'.format(query, time, prio)).send()



@Event('player_say')
def on_player_say(game_event):
# What did the player write
text = game_event['text']

if text == '!fetchone':
# Fetches all the names
TSQL.fetchone('SELECT name FROM my_database', callback=sql_callback)

if text == '!fetchall':
# Let's pass some extra things...
data_pack = {'text': text}
# Fetches one name
TSQL.fetchall('SELECT name FROM my_database', callback=sql_callback_2, data_pack=data_pack)

if text == '!info':
# Fetches one name
TSQL.execute("INSERT INTO my_database (name) VALUES('John')", callback=sql_callback_3, get_info=True)

Output !fetchall
=> You wrote: !fetchall
=> Name: <name >
=> Name: <name >
=> (...)

Output !fetchone
=> Name: John

Output !info
=> Query: INSERT INTO stats (name) VALUES('John')
=> Time: 0.014952421188354492 seconds
=> Prioritized: False


Stress test
Set the variable use_threaded_mysql = 1 or 0 to see difference.

Syntax: Select all

from messages import SayText2
from threaded_mysql import ThreadedMySQL
import pymysql.cursors

# ON = No lag | OFF = Server freeze
use_threaded_mysql = 1

connection = pymysql.connect(host="localhost",
user="root",
password="123",
db="my_database",
charset="utf8",
cursorclass=pymysql.cursors.DictCursor)
cursor = connection.cursor()


def load():
# Put use_threaded_mysql = 1 to test the difference
if not use_threaded_mysql:

# Executes the query 1000 times
for x in range(1000):

cursor.execute('SELECT name FROM my_database')
data = cursor.fetchone()
# Prints it out (not necessary tho)
SayText2('Name: {}'.format(data['name'])).send()
else:
# Class
TSQL = ThreadedMySQL()
# Use the connection already created
TSQL.connect_use(connection)
# Starts the queuehandler
TSQL.handlequeue_start()

for x in range(1000):
TSQL.fetchone('SELECT name FROM my_database', callback=test)


def test(data):
SayText2('Name: {}'.format(data['name'])).send()




Download
Latest release and notes are on my github
https://github.com/Velocity-plus/threaded_mysql

You can even create tick listener and spam queries without any lag at all

Enjoy :)

Re: Threaded MySQL

Posted: Wed Feb 08, 2017 12:27 am
by decompile
Thank you so much!!!!!

Re: Threaded MySQL

Posted: Wed Feb 08, 2017 1:01 pm
by Ayuto
Actually, this isn't really threaded as Delay and Repeat are used, which are executed in the same thread. So, this will probably just delay the lag. You need to utilize GameThread to create real threads.

Re: Threaded MySQL

Posted: Wed Feb 08, 2017 2:21 pm
by velocity
Ayuto wrote:Actually, this isn't really threaded as Delay and Repeat are used, which are executed in the same thread. So, this will probably just delay the lag. You need to utilize GameThread to create real threads.


First of all, it will prevent lag 100%. Delaying every single query is a good solution because every query for itself does not cause lag, the lag occurs when you have multiple queries right after each other. It's also the same way sourcemod is doing it.
I'm posting this because it helped me with my timer plugin since I have a lot of queries going on all the time and this definitely made everything run smoothly. Anyways, I will look into it.

Re: Threaded MySQL

Posted: Tue Feb 14, 2017 11:59 pm
by Doldol
velocity wrote:
Ayuto wrote:Actually, this isn't really threaded as Delay and Repeat are used, which are executed in the same thread. So, this will probably just delay the lag. You need to utilize GameThread to create real threads.


First of all, it will prevent lag 100%. Delaying every single query is a good solution because every query for itself does not cause lag, the lag occurs when you have multiple queries right after each other. It's also the same way sourcemod is doing it.
I'm posting this because it helped me with my timer plugin since I have a lot of queries going on all the time and this definitely made everything run smoothly. Anyways, I will look into it.


It's a horrible implementation, your server will lag for the x amount of time it takes for your query to be send and the results received, which can be seconds! Like Ayuto said the proper way is to use actual Python threads, which GameThread implements.

Re: Threaded MySQL

Posted: Wed Feb 15, 2017 11:20 am
by velocity
Alright!!!!!!!! I will be re-coding it now, but in my defense IT REALLY doesn't lag.

Re: Threaded MySQL

Posted: Wed Feb 15, 2017 1:52 pm
by satoon101
Since you posted this in the Module/Package Submissions forum, which is designed for posting modules/packages to include with SP, you should definitely take Ayuto's advice if it's going to be considered.

If you did not intend for this, and instead just wanted to post a custom package that people can download and use, that is what the Custom Packages forum is designed for. Do not start another topic there, though, just let us know if that is your intention and we can move this thread.

Re: Threaded MySQL

Posted: Wed Feb 15, 2017 1:54 pm
by velocity
Yes, sir, I am soon finished with the rework. I'm totally listening to all of you guys. I will post an update later here today, where I have implemented GameThread.

Re: Threaded MySQL 2.0.0 (Updated)

Posted: Wed Feb 15, 2017 4:25 pm
by velocity
It should be better now.

Re: Threaded MySQL 2.0.0 (Updated)

Posted: Thu Feb 16, 2017 12:20 am
by decompile
Hey, I decided to give youre package a try. Currently building it up right now, but Im wondering what does 'commit_before_save' on close() mean.
Will it execute all "queries" in the queue before closing the connection or something else?

Re: Threaded MySQL 2.0.0 (Updated)

Posted: Thu Feb 16, 2017 12:48 am
by velocity
decompile wrote:Hey, I decided to give youre package a try. Currently building it up right now, but Im wondering what does 'commit_before_save' on close() mean.
Will it execute all "queries" in the queue before closing the connection or something else?


Oh yes.. You are exactly right. What you are seeing is actually a mistake on my end. That has been corrected now.

Re: Threaded MySQL 2.0.0 (Updated)

Posted: Thu Feb 16, 2017 1:07 am
by decompile
Thanks for clarifying and keep up the good work :)

Re: Threaded MySQL 2.0.0 (Updated)

Posted: Thu Feb 16, 2017 3:04 am
by decompile
But it looks like its currently not possible that it returns the dict back instead of calling a callback?

Thats what I realy need.

Greetings

Re: Threaded MySQL 2.0.0 (Updated)

Posted: Thu Feb 16, 2017 9:56 am
by velocity
decompile wrote:But it looks like its currently not possible that it returns the dict back instead of calling a callback?

Thats what I realy need.

Greetings


I'm not sure what you mean, but you must have a callback, since it's threaded, it cannot return directly back from where you are calling the function in your code. Can you maybe show an example of what you are trying to do?

Re: Threaded MySQL 2.0.0 (Updated)

Posted: Thu Feb 16, 2017 4:00 pm
by decompile
pretty much like:

playerName = TSQL.fetchone("SELECT name from users WHERE steamid=%s", (playerSteamid,))
playerTime = TSQL.fetchone("SELECT time from times WHERE steamid=%s", (playerSteamid,))

(just completly random ones)

So it either returns the dict back or None if it got no results.

Re: Threaded MySQL 2.0.0 (Updated)

Posted: Thu Feb 16, 2017 6:14 pm
by velocity

Re: Threaded MySQL

Posted: Thu Dec 28, 2017 8:55 pm
by Hymns For Disco
Your TSQL class is quite a bit overcomplicated if you simply want to execute queries on a different thread with callbacks. This short code snip will let you execute queries on a gamethread without lagging the server.

Syntax: Select all

import pymysql as MySQL
from listeners.tick import GameThread

connection = MySQL.connect(host='localhost',
port=3306,
user='root',
passwd='root',
db='timer')

cursor = connection.cursor()


def ThreadedDatabaseQuery(query, callback=None):
print('starting thread')
thread = GameThread(target=ExecQuery, args=(query, callback))
thread.start()


def ExecQuery(query, callback):
print('ExecQuery')
cursor.execute(query)
connection.commit()
callback(cursor.fetchall())


Called like this:

Syntax: Select all

def print_response(rows):
print(rows)

# 2 args for callback
ThreadedDatabaseQuery('SELECT * FROM test_table', print_response)

# 1 arg for no callback
ThreadedDatabaseQuery('INSERT INTO test_table (test_col) VALUES (123)')

Re: Threaded MySQL

Posted: Sun May 27, 2018 10:27 am
by Cheaterman
This is sort of off-topic, but I'm using SQLAlchemy on my server and it works well! :-)

Re: Threaded MySQL

Posted: Sun May 27, 2018 11:10 am
by Ayuto
SQLAlchemy doesn't prevent lags. It's just a wrapper/abstraction for multiple database systems. It might run well, because your queries are executed quite fast.