New interface/worker to accelerate CSV parsing: define the struct, write a mapping function, and baby -- you got a stew goin'.
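
In short (a rough sketch; the Expense type and path below are illustrative and not part of this commit -- the real usage is the CCStatement test() at the bottom of this diff): implement Set to map header/cell pairs onto your struct, hand NewCsvWorker a factory for it, and read the parsed rows back out of Artifacts.

package main

import (
	"fmt"
	"strconv"

	"mercury/src/mercuryUtil"
)

// Expense is an illustrative row type; any struct works as long as it
// satisfies mercuryUtil.CsvConvertable by mapping header/cell pairs in Set.
type Expense struct {
	Vendor string
	Amount float64
}

func (e *Expense) Set(header string, content string) error {
	switch header {
	case "Vendor":
		e.Vendor = content
	case "Amount":
		f, err := strconv.ParseFloat(content, 64)
		if err != nil {
			return err
		}
		e.Amount = f
	}
	return nil
}

func main() {
	rows := make([]*Expense, 0, 100)
	worker, err := mercuryUtil.NewCsvWorker[*Expense](
		"/tmp/expenses.csv",                   // illustrative input path
		rows,                                  // backing slice; parsed rows end up in worker.Artifacts
		func() *Expense { return &Expense{} }, // factory for fresh rows
		true,                                  // first row holds the headers
	)
	if err != nil {
		panic(err)
	}
	for _, e := range *worker.Artifacts {
		fmt.Printf("%#v\n", e)
	}
}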

master
dtookey 3 years ago
parent b6d9f3e9fd
commit 11d3ca3e6d

@@ -33,12 +33,12 @@ type ConnectorGeneric struct {
cachedConnection *sql.DB
}
type sqlScriptRunner struct {
type SqlScriptRunner struct {
ScriptName string
DatabaseName string
}
func (c *ConnectorGeneric) ExecuteSqlScript(runner *sqlScriptRunner) {
func (c *ConnectorGeneric) ExecuteSqlScript(runner *SqlScriptRunner) {
c.startConnection(runner.DatabaseName)
defer c.returnConnection()
queryWhole := *loadSqlFile(runner.ScriptName)
@@ -102,7 +102,7 @@ func (c *ConnectorGeneric) QueryFromScript(scriptName string) *sql.Rows {
}
func (c *ConnectorGeneric) ProcessClarityScripts() { //@dream standardize these script names
tableCreationRunners := []*sqlScriptRunner{
tableCreationRunners := []*SqlScriptRunner{
NewRunner("0-run-first/1-sanitize_init.sql", ClarityDatabaseName),
NewRunner("0-run-first/all_projects.sql", ClarityDatabaseName),
NewRunner("0-run-first/billing.sql", ClarityDatabaseName),
@@ -117,7 +117,7 @@ func (c *ConnectorGeneric) ProcessClarityScripts() { //@dream standardize these
}
func (c *ConnectorGeneric) CreateTables() {
tableCreationRunners := []*sqlScriptRunner{
tableCreationRunners := []*SqlScriptRunner{
NewRunner("create-any-database.sql", ""),
NewRunner("create-insight-user-table.sql", InsightDatabaseName),
NewRunner("create-insight-timeEntry-table.sql", InsightDatabaseName),
@@ -144,7 +144,7 @@ func createDbConnection(database string) *sql.DB {
host := os.Getenv(dbCredsHostName)
dbString := dsnTemplate
connectString := fmt.Sprintf(dbString, cred, host, database)
fmt.Printf("Beginning connection to database: %s\n", connectString)
fmt.Printf("Beginning connection to database\n")
db, err := sql.Open("mysql", connectString)
if err != nil {
@@ -208,8 +208,8 @@ func BlockUpdate[K Blocker](connector *ConnectorGeneric, dbName string, updateSc
connector.ExecuteString(dbName, query)
}
func NewRunner(scriptName string, databaseName string) *sqlScriptRunner {
return &sqlScriptRunner{scriptName, databaseName}
func NewRunner(scriptName string, databaseName string) *SqlScriptRunner {
return &SqlScriptRunner{scriptName, databaseName}
}
//</editor-fold>

@@ -1,5 +0,0 @@
package drive
func Test() {
}

@@ -289,9 +289,9 @@ func (t *TrialBalanceLine) toRow() []string {
return []string{t.AccountName, strconv.FormatFloat(t.Amount, 'f', 2, 64), t.Period, t.AccountType}
}
//<editor-fold name="util">
//<editor-fold name="mercuryUtil">
/*======================================================================================
util
mercuryUtil
======================================================================================*/
func convertDateToSqlDate(datelike string) string {

@@ -5,6 +5,7 @@ import (
"fmt"
"log"
"mercury/src/db"
"mercury/src/mercuryUtil"
"os"
"path"
"regexp"
@@ -21,6 +22,13 @@ type (
ReportLines *[]*HourReportLineLegacy
}
EmployeeDirectory struct {
FilePath string
Records *[][]string
SkipFirstRow bool
DirectoryLines *[]*DirectoryReportLine
}
HourReportLoadTask struct {
Records *[][]string
Err error
@@ -73,8 +81,32 @@ type (
var namePattern = regexp.MustCompile("^\\d{1,2}.\\d{1,2}.\\d{2}-(\\d{1,2}.\\d{1,2}.\\d{2}).*csv$")
func NewDirectoryReport(pathlike string, skipFirstRow bool) *EmployeeDirectory {
report := EmployeeDirectory{
FilePath: pathlike,
Records: nil,
SkipFirstRow: skipFirstRow,
}
asyncChan := make(chan *HourReportLoadTask)
go loadTimeSheet(report.FilePath, asyncChan)
recordStatus := <-asyncChan
if recordStatus.Err != nil {
fmt.Printf("Error in the following file: %s\n", report.FilePath)
panic(recordStatus.Err)
}
report.Records = recordStatus.Records
report.DirectoryLines = processDirectoryToLines(report)
return &report
}
func loadReports(pathlikeBase string) *[]HourReport {
files, err := getAllFilesInDir(pathlikeBase)
files, err := mercuryUtil.GetAllFilesInDir(pathlikeBase)
reports := make([]HourReport, 0, 300)
if err != nil {
@@ -105,6 +137,18 @@ func UpdateTimesheetReport(pathlike string) {
connector.ExecuteSqlScript(tablePrune)
}
func UpdateEmployeeDirectory(pathlike string) {
directory := NewDirectoryReport(pathlike, false)
connector := &db.ConnectorGeneric{}
tableWipe := db.NewRunner("create-mercury-hrDirectory-table.sql", db.MercuryDatabaseName)
connector.ExecuteSqlScript(tableWipe)
log.Printf("Updating database\n")
db.BlockUpdate[DirectoryReportLine](connector, db.MercuryDatabaseName, "update-mercury-hrDirectory.sql", directory.DirectoryLines)
log.Printf("Updates finished.\n")
}
func NewHourReport(pathlike string, skipFirstRow bool) *HourReport {
report := HourReport{
FilePath: pathlike,
@@ -149,6 +193,25 @@ func processReportToLines(report HourReport) *[]*HourReportLineLegacy {
return &lines
}
func processDirectoryToLines(report EmployeeDirectory) *[]*DirectoryReportLine {
lines := make([]*DirectoryReportLine, 0, 250)
localTable := *report.Records
headersRaw := (localTable)[0]
headers := make([]string, len(headersRaw), len(headersRaw))
for i, v := range headersRaw {
key := strings.Trim(v, " \t\uFEFF")
headers[i] = key
}
for i := 1; i < len(localTable); i++ {
row := localTable[i]
line := newDirectoryReportLine(headers, row)
lines = append(lines, &line)
}
return &lines
}
func fileNameToSQLDate(fileName string) string {
name := path.Base(fileName)
parts := strings.Split(name, "_")
@@ -176,14 +239,17 @@ func newDirectoryReportLine(headers []string, row []string) DirectoryReportLine
case "EEId":
v, err := strconv.Atoi(strVal)
if err != nil {
v = 0
v = -1
}
line.EEId = v
case "Department Name":
line.DepartmentName = strVal
case "Manager":
line.Manager = strVal
default:
panic("could not find matching struct analogue for " + header)
}
}
return line
@@ -361,8 +427,21 @@ func (line HourReportLineLegacy) ToQueryBlock() string {
)
}
func (line DirectoryReportLine) ToQueryBlock() string {
return fmt.Sprintf(
"('%s','%s','%s','%d','%s','%s')",
line.Paygroup,
line.LName,
line.FName,
line.EEId,
line.DepartmentName,
line.Manager,
)
}
//deprecated
func rename(report HourReport) {
outPathBase := "/home/dtookey/work/clarity-reporting/pcorrect"
fileName := path.Base(report.FilePath)
if namePattern.MatchString(fileName) {
idx := namePattern.FindAllStringSubmatch(fileName, -1)
@@ -383,9 +462,10 @@ func rename(report HourReport) {
full := fmt.Sprintf("20%02d%02d%02d", year, month, date)
tStamp := time.Date(year, getMonth(month), date, 10, 0, 0, 0, time.UTC)
_, week := tStamp.ISOWeek()
fileName := fmt.Sprintf("%s_Paycor_W%d.csv", full, week)
err = copyFile(report.FilePath, fileName)
fileName := fmt.Sprintf("%s_Paycor_W%d.csv", full, week)
outPath := path.Join(outPathBase, fileName)
err = mercuryUtil.CopyFile(report.FilePath, outPath)
if err != nil {
panic(err)
}
@@ -424,34 +504,3 @@ func getMonth(month int) time.Month {
return time.January
}
}
func getAllFilesInDir(pathlikeBase string) (*[]string, error) {
listing, err := os.ReadDir(pathlikeBase)
res := make([]string, 0, 300)
if err != nil {
return nil, err
}
for _, list := range listing {
if list.IsDir() || path.Ext(list.Name()) != ".csv" {
fmt.Printf("Skipping: %s\n", list.Name())
continue
} else {
res = append(res, path.Join(pathlikeBase, list.Name()))
}
}
return &res, nil
}
func copyFile(inPath string, outpath string) error {
outPathBase := "/home/dtookey/work/clarity-reporting/pcorrect"
outFinal := path.Join(outPathBase, outpath)
b, err := os.ReadFile(inPath)
if err != nil {
return err
}
err = os.WriteFile(outFinal, b, 0755)
if err != nil {
return err
}
return nil
}

@@ -7,6 +7,7 @@ import (
"mercury/src/hr"
"mercury/src/mercury"
"os"
"path"
"time"
)
@@ -19,7 +20,8 @@ func main() {
// regular run
//updateInsightData()
updateHR()
updateTimesheets()
f := time.Now()
log.Println(f.Sub(s).Milliseconds())
@@ -33,7 +35,8 @@ func updateTelecom() {
icx.UpdateVerizonReports()
}
func updateHR() {
func updateTimesheets() {
hr.UpdateEmployeeDirectory(path.Join("/home/dtookey/work/clarity-reporting/paycor_dir", "EmployeeRoster.csv"))
hr.UpdateTimesheetReport("/home/dtookey/work/clarity-reporting/paycor")
}

@@ -0,0 +1,62 @@
package mercuryUtil
import "strconv"
type (
CsvConvertable interface {
Set(header string, content string) error
}
CsvWorker[K CsvConvertable] struct {
Filepath string
Artifacts *[]K
headerInFirstRow bool
MakeNew func() K
}
)
func NewCsvWorker[K CsvConvertable](pathlike string, container []K, factory func() K, headerInFirstRow bool) (*CsvWorker[K], error) {
worker := CsvWorker[K]{Filepath: pathlike, headerInFirstRow: headerInFirstRow}
worker.MakeNew = factory
worker.Artifacts = &container
err := worker.process()
if err != nil {
return nil, err
}
return &worker, nil
}
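//process loads the CSV at Filepath, resolves the header names (taken from the first row,
//or generated as "0".."n-1" when headerInFirstRow is false), then maps every data row onto a fresh K via Set and appends it to Artifacts.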
func (w *CsvWorker[K]) process() error {
data := LoadCsv(w.Filepath)
var headers []string
var startingIdx int
//initialize headers. if the first row is not a header row, generate numeric placeholder names "0".."len(data[0])-1"
if w.headerInFirstRow {
headers = (*data)[0]
startingIdx = 1
} else {
l := len((*data)[0])
headers = make([]string, l, l)
for i := 0; i < l; i++ {
headers[i] = strconv.Itoa(i)
}
startingIdx = 0
}
for rowIdx, row := range *data {
if rowIdx < startingIdx {
continue
}
obj := w.MakeNew()
for i, cell := range row {
header := headers[i]
err := obj.Set(header, cell)
if err != nil {
return err
}
}
*w.Artifacts = append(*w.Artifacts, obj)
}
return nil
}

@@ -0,0 +1,117 @@
package mercuryUtil
import (
"bytes"
"fmt"
)
type (
MigrationRule struct {
ColumnName string
MappingFunction func(string) string
}
MigrationWorker struct {
FirstRowIsHeader bool
ubiRules []MigrationRule //ubiquitous rules
headerRules []MigrationRule
rules []MigrationRule
}
)
func NewMigrationWorker() *MigrationWorker {
rules := make([]MigrationRule, 0, 100)
headerRules := make([]MigrationRule, 0, 100)
worker := MigrationWorker{FirstRowIsHeader: false, headerRules: headerRules, rules: rules}
//this default rule strips the zero-width no-break space (U+FEFF, i.e. the BOM) from every value
ufeffRule := MigrationRule{ColumnName: "*", MappingFunction: func(s string) string {
var blank []byte
bbuff := []byte(s)
bbuff = bytes.ReplaceAll(bbuff, []byte("\ufeff"), blank)
//0xef 0xbb 0xbf is the UTF-8 encoding of the byte order mark (the same bytes "\ufeff" encodes to); stripped explicitly here as well
bbuff = bytes.ReplaceAll(bbuff, []byte{0xef, 0xbb, 0xbf}, blank)
return string(bbuff)
}}
worker.ubiRules = append(worker.ubiRules, ufeffRule)
worker.AddHeaderRule(&ufeffRule)
return &worker
}
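//MigrateCSVs is currently a dry run of the header rules: it loads every CSV under sourceDir,
//passes each header through the ubiquitous and header rules, and prints the result; nothing is written to targetDir yet.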
func MigrateCSVs(sourceDir string, targetDir string) error {
worker := NewMigrationWorker()
worker.AddHeaderRule(
&MigrationRule{
"Badge #",
func(s string) string { return "EEID" },
},
)
files, err := GetAllFilesInDir(sourceDir)
if err != nil {
panic(err)
}
for _, file := range *files {
table := LoadCsv(file)
headerRow := (*table)[0]
headers := make([]string, len(headerRow), len(headerRow))
for i, header := range headerRow {
prepassRules := worker.ubiRules
v := header
for _, rule := range prepassRules {
v = rule.MappingFunction(v)
}
formattingRules := worker.getHeaderRulesByHeader(v)
for _, rule := range *formattingRules {
v = rule.MappingFunction(v)
}
headers[i] = v
}
fmt.Printf("%s\t%#v\n", file, headers)
}
return nil
}
func (w *MigrationWorker) AddRule(m *MigrationRule) {
w.rules = append(w.rules, *m)
}
func (w *MigrationWorker) AddHeaderRule(m *MigrationRule) {
w.headerRules = append(w.headerRules, *m)
}
func (w *MigrationWorker) getRulesByHeader(header string) *[]*MigrationRule {
ret := make([]*MigrationRule, 0, 10)
for _, rule := range w.rules {
if rule.ColumnName == "*" || rule.ColumnName == header {
lRule := rule //copy before taking the address: the range variable is reused on every iteration
ret = append(ret, &lRule)
}
}
return &ret
}
func (w *MigrationWorker) getHeaderRulesByHeader(header string) *[]*MigrationRule {
ret := make([]*MigrationRule, 0, 10)
for _, rule := range w.headerRules {
if rule.ColumnName == "*" || rule.ColumnName == header {
lRule := rule
ret = append(ret, &lRule)
}
}
return &ret
}
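//ProcessRecords is a stub; applying the row-level rules to records is not implemented yet.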
func (w *MigrationWorker) ProcessRecords(records *[][]string) {
}

@@ -0,0 +1,54 @@
package mercuryUtil
import (
"encoding/csv"
"fmt"
"os"
"path"
)
func GetAllFilesInDir(pathlikeBase string) (*[]string, error) {
listing, err := os.ReadDir(pathlikeBase)
res := make([]string, 0, 300)
if err != nil {
return nil, err
}
for _, list := range listing {
if list.IsDir() || path.Ext(list.Name()) != ".csv" {
fmt.Printf("Skipping: %s\n", list.Name())
continue
} else {
res = append(res, path.Join(pathlikeBase, list.Name()))
}
}
return &res, nil
}
func CopyFile(inPath string, outpath string) error {
b, err := os.ReadFile(inPath)
if err != nil {
return err
}
err = os.WriteFile(outpath, b, 0755)
if err != nil {
return err
}
return nil
}
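//LoadCsv reads an entire CSV file into memory and panics if the file cannot be opened or parsed.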
func LoadCsv(pathlike string) *[][]string {
f, err := os.OpenFile(pathlike, os.O_RDONLY, 0755)
if err != nil {
panic(err)
}
defer f.Close()
reader := csv.NewReader(f)
records, err := reader.ReadAll()
if err != nil {
panic(err)
}
return &records
}

@@ -0,0 +1,13 @@
DROP TABLE IF EXISTS mercury.hr_timesheet_directory;
CREATE TABLE mercury.hr_timesheet_directory
(
PayGroup VARCHAR(150),
LName VARCHAR(150),
FName VARCHAR(150),
EEId INT,
HomeDept VARCHAR(150),
ManagerName VARCHAR(150)
);

@@ -0,0 +1,2 @@
INSERT INTO mercury.hr_timesheet_directory (PayGroup, LName, FName, EEId, HomeDept, ManagerName)
VALUES %s;

@@ -1,9 +1,76 @@
package main
import "mercury/src/hr"
import (
"errors"
"fmt"
"mercury/src/mercuryUtil"
"strconv"
)
func main() {
reportBase := "/home/dtookey/work/clarity-reporting/paycor/"
hr.LoadReports(reportBase)
test()
}
type CCStatement struct {
TxnDate string
PostedDate string
CardNo int
Description string
Category string
Debit float64
Account string
Department string
Notes string
}
func (c *CCStatement) Set(header string, content string) error {
switch header {
case "Transaction Date":
c.TxnDate = content
case "Posted Date":
c.PostedDate = content
case "Card No.":
i, err := strconv.Atoi(content)
if err != nil {
c.CardNo = -1
return err
}
c.CardNo = i
case "Description":
c.Description = content
case "Category":
c.Category = content
case "Debit":
f, err := strconv.ParseFloat(content, 64)
if err != nil {
return err
}
c.Debit = f
case "Account":
c.Account = content
case "Department":
c.Department = content
case "Notes":
c.Notes = content
default:
return errors.New("could not find header: '" + header + "'")
}
return nil
}
func test() {
path := "/home/dtookey/work/dde-expense/Credit Card coding 0122 Tookey.csv"
artifacts := make([]*CCStatement, 0, 5000)
worker, err := mercuryUtil.NewCsvWorker[*CCStatement](
path,
artifacts,
func() *CCStatement { return &CCStatement{} },
true,
)
if err != nil {
panic(err)
}
for _, val := range *worker.Artifacts {
fmt.Printf("%#v\n", val)
}
}
