cleanups to startup code and goroutine code

This commit is contained in:
2026-05-06 22:19:08 -06:00
parent 08a10a55dd
commit a2c2a1f750
9 changed files with 92 additions and 88 deletions
+10 -7
View File
@@ -29,9 +29,9 @@ import (
// Error classifications // Error classifications
const ( const (
classUnspecified = 0 classUnspecified = iota // unspecified, barf
classNeedInstall = 1 classNeedInstall // need to install the database
classNeedConvert = 2 classNeedConvert // need to convert a Venice database
) )
// MySQL Errors // MySQL Errors
@@ -210,11 +210,11 @@ func prepareDB() (string, error) {
} }
// SetupDb sets up the database and associated items. // SetupDb sets up the database and associated items.
func SetupDb() (func(), error) { func SetupDb() (string, func(), error) {
exitfns := make([]func(), 0, 2) exitfns := make([]func(), 0, 2)
version, err := prepareDB() version, err := prepareDB()
if err != nil { if err != nil {
return nil, err return "X", nil, err
} }
db, err := sqlx.Connect(config.GlobalComputedConfig.DatabaseDriver, buildMysqlDSN(false)) db, err := sqlx.Connect(config.GlobalComputedConfig.DatabaseDriver, buildMysqlDSN(false))
if err == nil { if err == nil {
@@ -223,6 +223,7 @@ func SetupDb() (func(), error) {
if err == nil { if err == nil {
if g.Version != version { if g.Version != version {
log.Warnf("!! database version %s does not match prepared version %s", g.Version, version) log.Warnf("!! database version %s does not match prepared version %s", g.Version, version)
version = g.Version
} }
setupAdCache() setupAdCache()
setupUserCache() setupUserCache()
@@ -232,11 +233,11 @@ func SetupDb() (func(), error) {
setupConferenceCache() setupConferenceCache()
exitfns = append(exitfns, setupAuditWriter()) exitfns = append(exitfns, setupAuditWriter())
exitfns = append(exitfns, setupIPBanSweep()) exitfns = append(exitfns, setupIPBanSweep())
log.Infof("SetupDb(): database version %s", g.Version) log.Infof("SetupDb(): database version %s", version)
} }
} }
slices.Reverse(exitfns) slices.Reverse(exitfns)
return func() { return version, func() {
for _, f := range exitfns { for _, f := range exitfns {
f() f()
} }
@@ -262,6 +263,8 @@ func transaction(ctx context.Context) (*sqlx.Tx, func() error, func()) {
err = tx.Commit() err = tx.Commit()
if err == nil { if err == nil {
live = false live = false
} else {
log.Errorf("***COMMIT ERROR*** %v", err)
} }
} }
return err return err
+3
View File
@@ -234,6 +234,9 @@ func SetupMailSender() func() {
emailRenderer.AddGlobal("AmsterdamVersion", config.AMSTERDAM_VERSION) emailRenderer.AddGlobal("AmsterdamVersion", config.AMSTERDAM_VERSION)
emailRenderer.AddGlobal("AmsterdamCopyright", config.AMSTERDAM_COPYRIGHT) emailRenderer.AddGlobal("AmsterdamCopyright", config.AMSTERDAM_COPYRIGHT)
emailRenderer.AddGlobal("GlobalConfig", config.GlobalConfig) emailRenderer.AddGlobal("GlobalConfig", config.GlobalConfig)
emailRenderer.AddGlobal("PLSCOPE_COMMUNITY", database.PLSCOPE_COMMUNITY)
emailRenderer.AddGlobal("PLSCOPE_CONFERENCE", database.PLSCOPE_CONFERENCE)
emailRenderer.AddGlobal("PLSCOPE_TOPIC", database.PLSCOPE_TOPIC)
// Start the recycler. // Start the recycler.
messageRecycleBin = make(chan *amMessage, config.GlobalConfig.Tuning.Queues.EmailRecycle) messageRecycleBin = make(chan *amMessage, config.GlobalConfig.Tuning.Queues.EmailRecycle)
+6 -10
View File
@@ -1,6 +1,6 @@
/* /*
* Amsterdam Web Communities System * Amsterdam Web Communities System
* Copyright (c) 2025 Erbosoft Metaverse Design Solutions, All Rights Reserved * Copyright (c) 2025-2026 Erbosoft Metaverse Design Solutions, All Rights Reserved
* *
* This Source Code Form is subject to the terms of the Mozilla Public * This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this * License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -44,17 +44,17 @@ func (d *TrieDictionary) Size() int {
// CheckWord returns true if a word is in the dictionary, false if not. // CheckWord returns true if a word is in the dictionary, false if not.
func (d *TrieDictionary) CheckWord(word string) bool { func (d *TrieDictionary) CheckWord(word string) bool {
d.mutex.Lock() d.mutex.Lock()
defer d.mutex.Unlock()
_, rc := d.trie.Find(strings.ToLower(word)) _, rc := d.trie.Find(strings.ToLower(word))
d.mutex.Unlock()
return rc return rc
} }
// AddWord adds a new word to the dictionary. // AddWord adds a new word to the dictionary.
func (d *TrieDictionary) AddWord(word string) { func (d *TrieDictionary) AddWord(word string) {
d.mutex.Lock() d.mutex.Lock()
defer d.mutex.Unlock()
d.trie.Add(strings.ToLower(word), true) d.trie.Add(strings.ToLower(word), true)
d.count++ d.count++
d.mutex.Unlock()
} }
// DelWord deletes a word from the dictionary. // DelWord deletes a word from the dictionary.
@@ -89,12 +89,8 @@ func loadDict(d *TrieDictionary, words []byte) {
// LoadTrieDict creates a TrieDictionary from a byte array that represents a word list (one word per line). // LoadTrieDict creates a TrieDictionary from a byte array that represents a word list (one word per line).
func LoadTrieDict(words []byte) *TrieDictionary { func LoadTrieDict(words []byte) *TrieDictionary {
rc := TrieDictionary{ rc := new(TrieDictionary{loaded: atomic.Bool{}, trie: trie.New(), count: 0})
loaded: atomic.Bool{},
trie: trie.New(),
count: 0,
}
rc.loaded.Store(false) rc.loaded.Store(false)
go loadDict(&rc, words) go loadDict(rc, words)
return &rc return rc
} }
+4 -6
View File
@@ -55,10 +55,8 @@ func SetupDicts() {
log.Errorf("failed to load external dictionary %s: %v", config.GlobalConfig.Posting.ExternalDictionary, err) log.Errorf("failed to load external dictionary %s: %v", config.GlobalConfig.Posting.ExternalDictionary, err)
} }
} }
rw := spellingRewriter{ rw := new(spellingRewriter{dict: NewCompositeDict(dicts)})
dict: NewCompositeDict(dicts), rewriterRegistry[rw.Name()] = rw
}
rewriterRegistry[rw.Name()] = &rw
} }
// spellingRewriter is a rewriter that flags spelling errors. // spellingRewriter is a rewriter that flags spelling errors.
@@ -89,10 +87,10 @@ func (rw *spellingRewriter) Rewrite(ctx context.Context, data string, svc rewrit
if rw.dict.CheckWord(data) { if rw.dict.CheckWord(data) {
return nil return nil
} }
return &markupData{ return new(markupData{
beginMarkup: defaultBeginError, beginMarkup: defaultBeginError,
text: data, text: data,
endMarkup: defaultEndError, endMarkup: defaultEndError,
rescan: false, rescan: false,
} })
} }
+21 -15
View File
@@ -31,6 +31,12 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
// DEFAULT_MAXLOG is the default maximum log file size (16 megabytes).
const DEFAULT_MAXLOG = 16 * 1024 * 1024
// LOG_ROTATE_INTERVAL is the interval, in seconds, at which we try to rotate the logfile.
const LOG_ROTATE_INTERVAL = 10
/*---------------------------------------------------------------------------- /*----------------------------------------------------------------------------
* slog handler that outputs to Logrus * slog handler that outputs to Logrus
*---------------------------------------------------------------------------- *----------------------------------------------------------------------------
@@ -52,11 +58,7 @@ type SlogLogrusHandler struct {
// NewSlogLogrusHandler creates a SlogLogrusHandler with base information. // NewSlogLogrusHandler creates a SlogLogrusHandler with base information.
func NewSlogLogrusHandler() *SlogLogrusHandler { func NewSlogLogrusHandler() *SlogLogrusHandler {
rc := new(SlogLogrusHandler{ return new(SlogLogrusHandler{fields: make(log.Fields), groupPrefix: ""})
fields: make(log.Fields),
groupPrefix: "",
})
return rc
} }
// Enabled returns true if the specified log level is handled. // Enabled returns true if the specified log level is handled.
@@ -81,20 +83,18 @@ func (h *SlogLogrusHandler) Handle(ctx context.Context, r slog.Record) error {
// WithAttrs creates a new Handler from this one, with extra attributes. // WithAttrs creates a new Handler from this one, with extra attributes.
func (h *SlogLogrusHandler) WithAttrs(attrs []slog.Attr) slog.Handler { func (h *SlogLogrusHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
newh := new(SlogLogrusHandler{fields: make(log.Fields)}) newh := new(SlogLogrusHandler{fields: make(log.Fields), groupPrefix: h.groupPrefix})
maps.Copy(newh.fields, h.fields) maps.Copy(newh.fields, h.fields)
for _, a := range attrs { for _, a := range attrs {
newh.fields[a.Key] = a.Value.Any() newh.fields[a.Key] = a.Value.Any()
} }
newh.groupPrefix = h.groupPrefix
return newh return newh
} }
// WithGroup creates a new Handler from this one, with an extra group prefix. // WithGroup creates a new Handler from this one, with an extra group prefix.
func (h *SlogLogrusHandler) WithGroup(name string) slog.Handler { func (h *SlogLogrusHandler) WithGroup(name string) slog.Handler {
newh := new(SlogLogrusHandler{fields: make(log.Fields)}) newh := new(SlogLogrusHandler{fields: make(log.Fields), groupPrefix: h.groupPrefix + name + "."})
maps.Copy(newh.fields, h.fields) maps.Copy(newh.fields, h.fields)
newh.groupPrefix = h.groupPrefix + name + "."
return newh return newh
} }
@@ -160,6 +160,7 @@ func (lf *amLogFile) Close() error {
} }
// rotate closes the log file and moves it to a new name, shuffling the previously stored log files by the same amount. // rotate closes the log file and moves it to a new name, shuffling the previously stored log files by the same amount.
// N.B.: We must be holding lf.mutex.
func (lf *amLogFile) rotate() error { func (lf *amLogFile) rotate() error {
if lf.keep == 0 && lf.keepCompressed == 0 { if lf.keep == 0 && lf.keepCompressed == 0 {
return nil // degenerate case, keep the log file the same return nil // degenerate case, keep the log file the same
@@ -262,7 +263,9 @@ func (lf *amLogFile) tryRotate() {
if lf.curSize >= lf.maxSize { if lf.curSize >= lf.maxSize {
err := lf.rotate() err := lf.rotate()
if err != nil { if err != nil {
//log.Error("log rotation failed") log.SetOutput(os.Stderr)
log.Errorf("log rotation failed: %v", err)
log.SetOutput(lf)
} }
} }
lf.mutex.Unlock() lf.mutex.Unlock()
@@ -302,8 +305,7 @@ func (lf *amLogFile) open(path string) error {
// logScanner is a goroutine that monitors the log file to see when it needs rotating. // logScanner is a goroutine that monitors the log file to see when it needs rotating.
func logScanner(ctx context.Context, lf *amLogFile, done chan bool) { func logScanner(ctx context.Context, lf *amLogFile, done chan bool) {
d, _ := time.ParseDuration("10s") t := time.NewTicker(LOG_ROTATE_INTERVAL * time.Second)
t := time.NewTicker(d)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -319,8 +321,10 @@ func logScanner(ctx context.Context, lf *amLogFile, done chan bool) {
// SetupLogging sets up the log file based on the configuration data. // SetupLogging sets up the log file based on the configuration data.
func SetupLogging() func() { func SetupLogging() func() {
loglevel, err := log.ParseLevel(config.GlobalComputedConfig.LogLevel) loglevel, err := log.ParseLevel(config.GlobalComputedConfig.LogLevel)
if err != nil { if err == nil {
loglevel = log.ErrorLevel loglevel = log.ErrorLevel
} else {
log.Errorf("default log level not valid: %s (%v)", config.GlobalComputedConfig.LogLevel, err)
} }
if config.GlobalComputedConfig.DebugMode && loglevel != log.TraceLevel { if config.GlobalComputedConfig.DebugMode && loglevel != log.TraceLevel {
loglevel = log.DebugLevel loglevel = log.DebugLevel
@@ -333,7 +337,8 @@ func SetupLogging() func() {
amlog := new(amLogFile) amlog := new(amLogFile)
maxlog, err := humanize.ParseBytes(config.GlobalConfig.Logging.MaxLogSize) maxlog, err := humanize.ParseBytes(config.GlobalConfig.Logging.MaxLogSize)
if err != nil { if err != nil {
maxlog = 16 * 1024 * 1024 // default to 16 megabytes log.Errorf("invalid value for max log size: %s (%v)", config.GlobalConfig.Logging.MaxLogSize, err)
maxlog = DEFAULT_MAXLOG
} }
amlog.maxSize = int64(maxlog) amlog.maxSize = int64(maxlog)
amlog.keep = config.GlobalConfig.Logging.KeepLogFiles amlog.keep = config.GlobalConfig.Logging.KeepLogFiles
@@ -344,13 +349,14 @@ func SetupLogging() func() {
ctx, cancelfunc = context.WithCancel(context.Background()) ctx, cancelfunc = context.WithCancel(context.Background())
done = make(chan bool) done = make(chan bool)
go logScanner(ctx, amlog, done) go logScanner(ctx, amlog, done)
} else {
log.Errorf("**** failed to open amlog: %v - logs will go to stdout", err)
} }
} }
if logfile == nil { if logfile == nil {
log.SetOutput(os.Stdout) log.SetOutput(os.Stdout)
} else { } else {
log.SetOutput(logfile) log.SetOutput(logfile)
} }
log.SetLevel(loglevel) log.SetLevel(loglevel)
+22 -9
View File
@@ -20,6 +20,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"log/slog" "log/slog"
"net"
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
@@ -40,9 +41,23 @@ import (
// READ_HEADER_TIMEOUT is the timeout value for reading headers in seconds. (Deliberately NOT configurable because this is a security issue) // READ_HEADER_TIMEOUT is the timeout value for reading headers in seconds. (Deliberately NOT configurable because this is a security issue)
const READ_HEADER_TIMEOUT = 2 const READ_HEADER_TIMEOUT = 2
// GRACEFUL_SHUTDOWN_TIMEOUT is the timeout value for a graceful shutdown.
const GRACEFUL_SHUTDOWN_TIMEOUT = 10 * time.Second
// GetAndPost is used to have functions that respond to both GET and POST on a URI. // GetAndPost is used to have functions that respond to both GET and POST on a URI.
var GetAndPost = []string{http.MethodGet, http.MethodPost} var GetAndPost = []string{http.MethodGet, http.MethodPost}
// myIPAddress returns the IP address of this computer.
func myIPAddress() net.IP {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
panic(err)
}
defer conn.Close()
localAddr := conn.LocalAddr().(*net.UDPAddr)
return localAddr.IP
}
// setupEcho creates, configures, and returns a new Echo instance. // setupEcho creates, configures, and returns a new Echo instance.
func setupEcho() *echo.Echo { func setupEcho() *echo.Echo {
e := echo.New() e := echo.New()
@@ -225,11 +240,15 @@ var SystemStartTime time.Time
// main is Ye Olde Main Function. // main is Ye Olde Main Function.
func main() { func main() {
SystemStartTime = time.Now() SystemStartTime = time.Now()
// Determine my IP address.
myIP := myIPAddress()
// Configure the system. // Configure the system.
config.SetupConfig() config.SetupConfig()
closer := SetupLogging() closer := SetupLogging()
defer closer() defer closer()
closer, err := database.SetupDb() dbVersion, closer, err := database.SetupDb()
if err != nil { if err != nil {
panic(fmt.Sprintf("Database open failure: %v", err)) panic(fmt.Sprintf("Database open failure: %v", err))
} }
@@ -240,12 +259,6 @@ func main() {
closer = ui.SetupUILayer() closer = ui.SetupUILayer()
defer closer() defer closer()
// Determine my IP address and the admin user.
myIP, err := util.MyIPAddress()
if err != nil {
panic(err)
}
// Set up to trap SIGINT/SIGTERM and shut down gracefully // Set up to trap SIGINT/SIGTERM and shut down gracefully
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop() defer stop()
@@ -262,7 +275,7 @@ func main() {
// Audit the startup // Audit the startup
database.AmStoreAudit(database.AmNewAudit(database.AuditStartup, 0, myIP.String(), database.AmStoreAudit(database.AmNewAudit(database.AuditStartup, 0, myIP.String(),
fmt.Sprintf("version=%s", config.AMSTERDAM_VERSION))) fmt.Sprintf("version=%s", config.AMSTERDAM_VERSION), fmt.Sprintf("database=%s", dbVersion)))
defer func() { defer func() {
// Audit the shutdown // Audit the shutdown
database.AmStoreAudit(database.AmNewAudit(database.AuditShutdown, 0, myIP.String())) database.AmStoreAudit(database.AmNewAudit(database.AuditShutdown, 0, myIP.String()))
@@ -273,7 +286,7 @@ func main() {
Address: config.GlobalComputedConfig.Listen, Address: config.GlobalComputedConfig.Listen,
HideBanner: true, HideBanner: true,
HidePort: true, HidePort: true,
GracefulTimeout: 10 * time.Second, GracefulTimeout: GRACEFUL_SHUTDOWN_TIMEOUT,
OnShutdownError: func(err error) { OnShutdownError: func(err error) {
log.Fatalf("error in shutting down the server: %v", err) log.Fatalf("error in shutting down the server: %v", err)
}, },
+20 -23
View File
@@ -18,7 +18,6 @@ import (
"net/http" "net/http"
"slices" "slices"
"sync" "sync"
"sync/atomic"
"time" "time"
"git.erbosoft.com/amy/amsterdam/config" "git.erbosoft.com/amy/amsterdam/config"
@@ -33,6 +32,12 @@ import (
be timed out as well as used to show the logged-in users. This is similar to the session support provided in J2EE servlets. be timed out as well as used to show the logged-in users. This is similar to the session support provided in J2EE servlets.
*/ */
// DEFAULT_SESSION_EXPIRE is the default time in which sessions will expire.
const DEFAULT_SESSION_EXPIRE = 1 * time.Hour
// The interval at which all sessions will be swept.
const SESSION_STORE_SWEEP_INTERVAL = 2 * time.Minute
// AmSessionOptions gives the options for the session. // AmSessionOptions gives the options for the session.
type AmSessionOptions struct { type AmSessionOptions struct {
Path string Path string
@@ -248,7 +253,6 @@ type amSessionStore struct {
sessions map[string]*amSession sessions map[string]*amSession
maxEntries int maxEntries int
expiry time.Duration expiry time.Duration
sweepRunning atomic.Bool
} }
// createAmSessionStore creates the session store. // createAmSessionStore creates the session store.
@@ -258,7 +262,6 @@ func createAmSessionStore(exp time.Duration) *amSessionStore {
maxEntries: 0, maxEntries: 0,
expiry: exp, expiry: exp,
} }
rc.sweepRunning.Store(true)
return rc return rc
} }
@@ -339,9 +342,15 @@ func (st *amSessionStore) SessionInfo() (int, []string, int) {
* tick - Channel that "pulses" periodically to run the task. * tick - Channel that "pulses" periodically to run the task.
* done - Channel we write to when we're done. * done - Channel we write to when we're done.
*/ */
func (st *amSessionStore) sweep(tick <-chan time.Time, done chan bool) { func (st *amSessionStore) sweep(ctx context.Context, done chan bool) {
for range tick { tkr := time.NewTicker(SESSION_STORE_SWEEP_INTERVAL)
if st.sweepRunning.Load() { for {
select {
case <-ctx.Done():
tkr.Stop()
done <- true
return
case <-tkr.C:
// phase 1 - identify expired sessions // phase 1 - identify expired sessions
st.mutex.RLock() st.mutex.RLock()
zap := make([]string, 0, len(st.sessions)) zap := make([]string, 0, len(st.sessions))
@@ -366,11 +375,8 @@ func (st *amSessionStore) sweep(tick <-chan time.Time, done chan bool) {
} }
st.mutex.Unlock() st.mutex.Unlock()
} }
} else {
break
} }
} }
done <- true
} }
// sessionStore is the global session store. // sessionStore is the global session store.
@@ -381,30 +387,21 @@ func setupSessionManager() func() {
// get the time for the session to expire // get the time for the session to expire
d, err := time.ParseDuration(config.GlobalConfig.Site.SessionExpire) d, err := time.ParseDuration(config.GlobalConfig.Site.SessionExpire)
if err != nil { if err != nil {
d, err = time.ParseDuration("1h") log.Errorf("invalid session timeout value: %s", config.GlobalConfig.Site.SessionExpire)
if err != nil { d = DEFAULT_SESSION_EXPIRE
panic(err.Error())
}
} }
// create session store // create session store
sessionStore = createAmSessionStore(d) sessionStore = createAmSessionStore(d)
// get the clock value to run sweeps
d, err = time.ParseDuration("1s")
if err != nil {
panic(err.Error())
}
// set up the sweep runner // set up the sweep runner
tkr := time.NewTicker(d) ctx, cancel := context.WithCancel(context.Background())
done := make(chan bool) done := make(chan bool)
go sessionStore.sweep(tkr.C, done) go sessionStore.sweep(ctx, done)
return func() { return func() {
// stop the sweep runner // stop the sweep runner
sessionStore.sweepRunning.Store(false) cancel()
<-done <-done
tkr.Stop()
} }
} }
+2 -2
View File
@@ -14,6 +14,7 @@ package ui
import ( import (
"embed" "embed"
"errors"
"fmt" "fmt"
"io" "io"
"io/fs" "io/fs"
@@ -120,8 +121,7 @@ func AmLoadDialog(name string) (*Dialog, error) {
f, err = extDialogs.Open(fmt.Sprintf("%s.yaml", name)) f, err = extDialogs.Open(fmt.Sprintf("%s.yaml", name))
if err != nil { if err != nil {
f = nil f = nil
pe := err.(*fs.PathError) if errors.Is(err, os.ErrInvalid) || errors.Is(err, os.ErrNotExist) {
if pe.Err == os.ErrInvalid || pe.Err == os.ErrNotExist {
err = nil err = nil
} }
} }
-12
View File
@@ -13,7 +13,6 @@
package util package util
import ( import (
"net"
"regexp" "regexp"
"strings" "strings"
"time" "time"
@@ -172,17 +171,6 @@ func Map[A, B any](in []A, fn func(A) B) []B {
return rc return rc
} }
// MyIPAddress returns the local IP address of this machine.
func MyIPAddress() (net.IP, error) {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return nil, err
}
defer conn.Close()
localAddr := conn.LocalAddr().(*net.UDPAddr)
return localAddr.IP, nil
}
// IIF is an "immediate-if" function returning its second argument if the first one is true, the third one if not. // IIF is an "immediate-if" function returning its second argument if the first one is true, the third one if not.
func IIF[A any](expr bool, v1, v2 A) A { func IIF[A any](expr bool, v1, v2 A) A {
if expr { if expr {