coding tweaks in preparation for crafting "delete conference" code
This commit is contained in:
@@ -98,6 +98,17 @@ type AmConfig struct {
|
||||
NoCompressTypes []string `yaml:"noCompressTypes"`
|
||||
} `yaml:"uploads"`
|
||||
} `yaml:"posting"`
|
||||
Tuning struct {
|
||||
WorkerTasks int `yaml:"workerTasks"`
|
||||
Queues struct {
|
||||
AuditWrites int `yaml:"auditWrites"`
|
||||
ContextRecycle int `yaml:"contextRecycle"`
|
||||
EmailRecycle int `yaml:"emailRecycle"`
|
||||
EmailSend int `yaml:"emailSend"`
|
||||
IPBans int `yaml:"ipBans"`
|
||||
WorkerTasks int `yaml:"workerTasks"`
|
||||
} `yaml:"queues"`
|
||||
} `yaml:"tuning"`
|
||||
}
|
||||
|
||||
type AmConfigComputed struct {
|
||||
@@ -210,6 +221,13 @@ func overlayConfig(dest *AmConfig, loaded *AmConfig, defaults *AmConfig) {
|
||||
dest.Posting.ExternalDictionary = overlayString(loaded.Posting.ExternalDictionary, defaults.Posting.ExternalDictionary)
|
||||
dest.Posting.Uploads.MaxSize = overlayString(loaded.Posting.Uploads.MaxSize, defaults.Posting.Uploads.MaxSize)
|
||||
dest.Posting.Uploads.NoCompressTypes = overlayStringArray(loaded.Posting.Uploads.NoCompressTypes, defaults.Posting.Uploads.NoCompressTypes)
|
||||
dest.Tuning.WorkerTasks = overlayInt(loaded.Tuning.WorkerTasks, defaults.Tuning.WorkerTasks)
|
||||
dest.Tuning.Queues.AuditWrites = overlayInt(loaded.Tuning.Queues.AuditWrites, defaults.Tuning.Queues.AuditWrites)
|
||||
dest.Tuning.Queues.ContextRecycle = overlayInt(loaded.Tuning.Queues.ContextRecycle, defaults.Tuning.Queues.ContextRecycle)
|
||||
dest.Tuning.Queues.EmailRecycle = overlayInt(loaded.Tuning.Queues.EmailRecycle, defaults.Tuning.Queues.EmailRecycle)
|
||||
dest.Tuning.Queues.EmailSend = overlayInt(loaded.Tuning.Queues.EmailSend, defaults.Tuning.Queues.EmailSend)
|
||||
dest.Tuning.Queues.IPBans = overlayInt(loaded.Tuning.Queues.IPBans, defaults.Tuning.Queues.IPBans)
|
||||
dest.Tuning.Queues.WorkerTasks = overlayInt(loaded.Tuning.Queues.WorkerTasks, defaults.Tuning.Queues.WorkerTasks)
|
||||
}
|
||||
|
||||
// parseDataSize converts the data size in bytes, kilobytes, megabytes, or gigabytes to a number value.
|
||||
|
||||
@@ -52,3 +52,12 @@ posting:
|
||||
- "image/jpg"
|
||||
- "image/jpeg"
|
||||
- "image/png"
|
||||
tuning:
|
||||
workerTasks: 4
|
||||
queues:
|
||||
auditWrites: 16
|
||||
contextRecycle: 16
|
||||
emailRecycle: 16
|
||||
emailSend: 16
|
||||
ipBans: 32
|
||||
workerTasks: 128
|
||||
|
||||
+2
-1
@@ -14,6 +14,7 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.erbosoft.com/amy/amsterdam/config"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -150,7 +151,7 @@ func AmStoreAudit(rec *AuditRecord) {
|
||||
|
||||
// setupAuditWriter sets up the background audit writer.
|
||||
func setupAuditWriter() func() {
|
||||
auditWriteQueue = make(chan *AuditRecord, 16)
|
||||
auditWriteQueue = make(chan *AuditRecord, config.GlobalConfig.Tuning.Queues.AuditWrites)
|
||||
doneChan := make(chan bool)
|
||||
go auditWriter(auditWriteQueue, doneChan)
|
||||
return func() {
|
||||
|
||||
+2
-1
@@ -20,6 +20,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.erbosoft.com/amy/amsterdam/config"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -140,7 +141,7 @@ var banSweeperInput chan *sweepentry
|
||||
// setupIPBanSweep sets up the IP ban sweeper routine, and returns a function that tears it down.
|
||||
func setupIPBanSweep() func() {
|
||||
banSweeperReset = make(chan bool)
|
||||
banSweeperInput = make(chan *sweepentry, 32)
|
||||
banSweeperInput = make(chan *sweepentry, config.GlobalConfig.Tuning.Queues.IPBans)
|
||||
done := make(chan bool)
|
||||
ended := make(chan bool)
|
||||
go banSweeper(done, ended, banSweeperReset, banSweeperInput)
|
||||
|
||||
+13
-7
@@ -427,6 +427,18 @@ func backgroundPurgeTopic(ctx context.Context, topicid int32) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// eraseTopicRecords erases the high-level records for a topic from the database.
|
||||
func eraseTopicRecords(ctx context.Context, tx *sqlx.Tx, topicid int32) error {
|
||||
_, err := tx.ExecContext(ctx, "DELETE FROM topics WHERE topicid = ?", topicid)
|
||||
if err == nil {
|
||||
_, err = tx.ExecContext(ctx, "DELETE FROM topicsettings WHERE topicid = ?", topicid)
|
||||
if err == nil {
|
||||
_, err = tx.ExecContext(ctx, "DELETE FROM topicbozo WHERE topicid = ?", topicid)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete deletes this topic.
|
||||
func (t *Topic) Delete(ctx context.Context, u *User, ipaddr string, background *util.WorkerPool) error {
|
||||
var ar *AuditRecord = nil
|
||||
@@ -446,13 +458,7 @@ func (t *Topic) Delete(ctx context.Context, u *User, ipaddr string, background *
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "DELETE FROM topics WHERE topicid = ?", t.TopicId)
|
||||
if err == nil {
|
||||
_, err = tx.ExecContext(ctx, "DELETE FROM topicsettings WHERE topicid = ?", t.TopicId)
|
||||
if err == nil {
|
||||
_, err = tx.ExecContext(ctx, "DELETE FROM topicbozo WHERE topicid = ?", t.TopicId)
|
||||
}
|
||||
}
|
||||
err = eraseTopicRecords(ctx, tx, t.TopicId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
+2
-2
@@ -220,12 +220,12 @@ func SetupMailSender() func() {
|
||||
emailRenderer.AddGlobal("GlobalConfig", config.GlobalConfig)
|
||||
|
||||
// Start the recycler.
|
||||
messageRecycleBin = make(chan *amMessage, 16)
|
||||
messageRecycleBin = make(chan *amMessage, config.GlobalConfig.Tuning.Queues.EmailRecycle)
|
||||
doneChan1 := make(chan bool)
|
||||
go recycleMessages(messageRecycleBin, doneChan1)
|
||||
|
||||
// Start the sender loop.
|
||||
sendChan = make(chan *amMessage, 16)
|
||||
sendChan = make(chan *amMessage, config.GlobalConfig.Tuning.Queues.EmailSend)
|
||||
doneChan2 := make(chan bool)
|
||||
go senderLoop(sendChan, doneChan2)
|
||||
|
||||
|
||||
@@ -192,7 +192,7 @@ func main() {
|
||||
defer stop()
|
||||
|
||||
// Set up ampool.
|
||||
ampool = util.AmNewPool(ctx, 4, 128)
|
||||
ampool = util.AmNewPool(ctx, config.GlobalConfig.Tuning.WorkerTasks, config.GlobalConfig.Tuning.Queues.WorkerTasks)
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
ampool.Shutdown()
|
||||
|
||||
+1
-1
@@ -621,7 +621,7 @@ func contextRecycler(incoming chan *amContext, done chan bool) {
|
||||
|
||||
// SetupAmContext starts the recycler for contexts.
|
||||
func SetupAmContext() func() {
|
||||
amContextRecycleBin = make(chan *amContext, 16)
|
||||
amContextRecycleBin = make(chan *amContext, config.GlobalConfig.Tuning.Queues.ContextRecycle)
|
||||
done := make(chan bool)
|
||||
go contextRecycler(amContextRecycleBin, done)
|
||||
return func() {
|
||||
|
||||
Reference in New Issue
Block a user