improved some of the tasks to use fewer goroutines
This commit is contained in:
@@ -12,6 +12,8 @@ package database
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/labstack/gommon/log"
|
||||
)
|
||||
|
||||
// AuditRecord holds an audit record instance.
|
||||
@@ -46,6 +48,9 @@ const (
|
||||
AuditAdminLockUnlockAccount = 113
|
||||
)
|
||||
|
||||
// auditWriteQueue is a channel to store audit records in the background.
|
||||
var auditWriteQueue chan *AuditRecord
|
||||
|
||||
/* AmNewAudit creates a new audit record.
|
||||
* Parameters:
|
||||
* rectype - Audit record type.
|
||||
@@ -94,3 +99,32 @@ func (ar *AuditRecord) Store() error {
|
||||
ar.OnDate = moment
|
||||
return nil
|
||||
}
|
||||
|
||||
// auditWriter is the routine that stores audit records in trhe background.
|
||||
func auditWriter(workChan chan *AuditRecord, doneChan chan bool) {
|
||||
for ar := range workChan {
|
||||
err := ar.Store()
|
||||
if err != nil {
|
||||
log.Errorf("dropped audit record on the floor: %v", err)
|
||||
}
|
||||
}
|
||||
doneChan <- true
|
||||
}
|
||||
|
||||
// AmStoreAudit stores the audit record in the background.
|
||||
func AmStoreAudit(rec *AuditRecord) {
|
||||
if rec != nil {
|
||||
auditWriteQueue <- rec
|
||||
}
|
||||
}
|
||||
|
||||
// setupAuditWriter sets up the background audit writer.
|
||||
func setupAuditWriter() func() {
|
||||
auditWriteQueue = make(chan *AuditRecord, 16)
|
||||
doneChan := make(chan bool)
|
||||
go auditWriter(auditWriteQueue, doneChan)
|
||||
return func() {
|
||||
close(auditWriteQueue)
|
||||
<-doneChan
|
||||
}
|
||||
}
|
||||
|
||||
+5
-1
@@ -20,12 +20,16 @@ var amdb *sqlx.DB
|
||||
|
||||
// SetupDb sets up the database and associated items.
|
||||
func SetupDb() (func(), error) {
|
||||
var fn1 func() = nil
|
||||
db, err := sqlx.Open(config.GlobalConfig.Database.Driver, config.GlobalConfig.Database.Dsn)
|
||||
if err == nil {
|
||||
amdb = db
|
||||
// TODO: additional initialization
|
||||
fn1 = setupAuditWriter()
|
||||
}
|
||||
return func() {
|
||||
if fn1 != nil {
|
||||
fn1()
|
||||
}
|
||||
amdb.Close()
|
||||
}, err
|
||||
}
|
||||
|
||||
+2
-6
@@ -226,9 +226,7 @@ func AmAuthenticateUser(name string, password string, remoteIP string) (*User, e
|
||||
log.Debugf("AmAuthenicate() authenticating user %s...", name)
|
||||
var ar *AuditRecord = nil
|
||||
defer func() {
|
||||
if ar != nil {
|
||||
go ar.Store()
|
||||
}
|
||||
AmStoreAudit(ar)
|
||||
}()
|
||||
|
||||
user, err := AmGetUserByName(name)
|
||||
@@ -294,9 +292,7 @@ func crackAuthString(authString string) (int32, string, error) {
|
||||
func AmAuthenticateUserByToken(authString string, remoteIP string) (*User, error) {
|
||||
var ar *AuditRecord = nil
|
||||
defer func() {
|
||||
if ar != nil {
|
||||
go ar.Store()
|
||||
}
|
||||
AmStoreAudit(ar)
|
||||
}()
|
||||
|
||||
uid, token, err := crackAuthString(authString)
|
||||
|
||||
+23
-17
@@ -131,22 +131,28 @@ func AmNewEmailMessage(sender int32, ip string) Message {
|
||||
return rc
|
||||
}
|
||||
|
||||
// recycleMessage cleans out a message and puts it back on the free list.
|
||||
func recycleMessage(m *amMessage) {
|
||||
m.from = ""
|
||||
m.fromAddr = ""
|
||||
m.to = make([]string, 0)
|
||||
m.toAddrs = make([]string, 0)
|
||||
m.cc = make([]string, 0)
|
||||
m.bcc = make([]string, 0)
|
||||
m.subject = ""
|
||||
m.text = ""
|
||||
for k := range m.headers {
|
||||
delete(m.headers, k)
|
||||
// The "recycle bin" for messages.
|
||||
var messageRecycleBin chan *amMessage
|
||||
|
||||
// recycleMessages is a goroutine that recycles the messages on its queue.
|
||||
func recycleMessages(messages chan *amMessage, done chan bool) {
|
||||
for m := range messages {
|
||||
m.from = ""
|
||||
m.fromAddr = ""
|
||||
m.to = make([]string, 0)
|
||||
m.toAddrs = make([]string, 0)
|
||||
m.cc = make([]string, 0)
|
||||
m.bcc = make([]string, 0)
|
||||
m.subject = ""
|
||||
m.text = ""
|
||||
for k := range m.headers {
|
||||
delete(m.headers, k)
|
||||
}
|
||||
m.template = ""
|
||||
for k := range m.vars {
|
||||
delete(m.vars, k)
|
||||
}
|
||||
freeMessages.Put(m)
|
||||
}
|
||||
m.template = ""
|
||||
for k := range m.vars {
|
||||
delete(m.vars, k)
|
||||
}
|
||||
freeMessages.Put(m)
|
||||
done <- true
|
||||
}
|
||||
|
||||
+13
-8
@@ -174,7 +174,7 @@ func senderLoop(sent chan *amMessage, done chan bool) {
|
||||
} else {
|
||||
log.Errorf("unable to format message: %v", err)
|
||||
}
|
||||
go recycleMessage(m)
|
||||
messageRecycleBin <- m
|
||||
}
|
||||
done <- true // signal done for synchronization
|
||||
}
|
||||
@@ -182,9 +182,6 @@ func senderLoop(sent chan *amMessage, done chan bool) {
|
||||
// sendChan is the channel we put E-mail messages on to be sent.
|
||||
var sendChan chan *amMessage
|
||||
|
||||
// doneChan is the channel that gets signaled when the senderLoop breaks.
|
||||
var doneChan chan bool
|
||||
|
||||
// SetupMailSender starts the mail-sending goroutine.
|
||||
func SetupMailSender() func() {
|
||||
// Initialize mail host and authentication.
|
||||
@@ -207,12 +204,20 @@ func SetupMailSender() func() {
|
||||
jet.DevelopmentMode(true),
|
||||
)
|
||||
|
||||
// Start the recycler.
|
||||
messageRecycleBin = make(chan *amMessage, 16)
|
||||
doneChan1 := make(chan bool)
|
||||
go recycleMessages(messageRecycleBin, doneChan1)
|
||||
|
||||
// Start the sender loop.
|
||||
sendChan = make(chan *amMessage, 16)
|
||||
doneChan = make(chan bool)
|
||||
go senderLoop(sendChan, doneChan)
|
||||
doneChan2 := make(chan bool)
|
||||
go senderLoop(sendChan, doneChan2)
|
||||
|
||||
return func() {
|
||||
close(sendChan) // will break the loop in senderLoop
|
||||
<-doneChan // wait for routine to complete
|
||||
close(sendChan)
|
||||
<-doneChan2
|
||||
close(messageRecycleBin)
|
||||
<-doneChan1
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user