使用 go 查询数据
连接 mongo 数据库
mongo
// Create the user first: grantRolesToUser fails when the user does not exist yet.
db.createUser({user:"video",pwd:"video12#$",roles:[{role:"readWrite",db:"video"},{role:"read",db:"video"}]});
// Then grant the extra roles. userAdminAnyDatabase is only defined on the admin database,
// so it has to be referenced there rather than on "video".
db.grantRolesToUser("video",[{role:"read", db:"video"},{role:"readWrite",db:"video"},{role:"dbAdmin", db:"video"},{role:"userAdmin",db:"video"},{role:"userAdminAnyDatabase",db:"admin"}]);
go
go get github.com/qiniu/qmgo
go
import (
"context"
"errors"
"github.com/qiniu/qmgo"
"github.com/qiniu/qmgo/options"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/event"
mgoptions "go.mongodb.org/mongo-driver/mongo/options"
"log"
"sync"
)
// Connection settings used by GetMongo to build the MongoDB URI.
const (
// Ip is the MongoDB host placed in the URI.
Ip = "localhost"
// Port is the MongoDB port placed in the URI.
Port = "27017"
// UserName and Password are only put into the URI when Password is non-empty.
UserName = ""
Password = ""
// DBName is the database appended to the authenticated URI and the one GetMongo selects.
DBName = "video"
// AuthSource is sent as the authSource URI option in the authenticated URI.
AuthSource = ""
)
// Pool and timeout settings. They are variables rather than constants because
// GetMongo passes their addresses to qmgo.Config.
var (
// ConnectTimeoutMS is handed to qmgo.Config.ConnectTimeoutMS.
ConnectTimeoutMS = int64(1000)
// MaxPoolSize is handed to qmgo.Config.MaxPoolSize.
MaxPoolSize = uint64(100)
// MinPoolSize is handed to qmgo.Config.MinPoolSize.
MinPoolSize = uint64(0)
)
// Shared connection state. A qmgo client owns a connection pool, so it is
// created once and reused instead of being rebuilt on every GetMongo call.
var (
	mongoMu sync.Mutex     // guards mongoDB
	mongoDB *qmgo.Database // nil until the first successful connection
)

// GetMongo returns a handle to the DBName database.
//
// The first successful call creates a qmgo client (with a command monitor that
// logs every command and its reply or failure) and caches the database handle;
// later calls return the cached handle. The client is kept for the lifetime of
// the process and is not closed here.
//
// It returns nil when the client cannot be created; the failure is logged and
// the next call tries again. Callers must check for nil before use.
func GetMongo() *qmgo.Database {
	mongoMu.Lock()
	defer mongoMu.Unlock()

	if mongoDB != nil {
		return mongoDB
	}

	// Client options carrying the command monitor.
	ops := options.ClientOptions{ClientOptions: &mgoptions.ClientOptions{}}
	ops.SetMonitor(newCommandMonitor())

	client, err := qmgo.NewClient(context.Background(), &qmgo.Config{
		Uri:              mongoURI(UserName, Password, Ip, Port, DBName, AuthSource),
		ConnectTimeoutMS: &ConnectTimeoutMS,
		MaxPoolSize:      &MaxPoolSize,
		MinPoolSize:      &MinPoolSize,
	}, ops)
	if err != nil {
		// The signature has no error result, so report the failure in the log
		// instead of dropping it.
		err = errors.New("MongoDB连接异常:" + err.Error())
		log.Println(err)
		return nil
	}

	mongoDB = client.Database(DBName)
	return mongoDB
}

// mongoURI builds the connection string. Without a password it is just
// mongodb://host:port; with one it also carries the credentials, the database
// and, when set, the authSource option.
//
// NOTE(review): user and password are inserted verbatim. Credentials containing
// reserved URI characters (for example '#', '@', ':' or '/') must already be
// percent-encoded by the caller.
func mongoURI(user, password, host, port, db, authSource string) string {
	addr := host + ":" + port
	if password == "" {
		return "mongodb://" + addr
	}
	uri := "mongodb://" + user + ":" + password + "@" + addr + "/" + db
	if authSource != "" {
		// Skip the option entirely rather than sending an empty "?authSource=".
		uri += "?authSource=" + authSource
	}
	return uri
}

// newCommandMonitor returns a monitor that logs each finished command together
// with its duration and its reply or failure.
func newCommandMonitor() *event.CommandMonitor {
	// RequestID (int64) -> command document (bson.Raw) of commands in flight.
	var started sync.Map

	// take fetches and removes the command stored for requestID, so the map
	// only holds commands that have not finished yet.
	take := func(requestID int64) bson.Raw {
		v, ok := started.LoadAndDelete(requestID)
		if !ok {
			return nil
		}
		raw, _ := v.(bson.Raw)
		return raw
	}

	return &event.CommandMonitor{
		Started: func(_ context.Context, evt *event.CommandStartedEvent) {
			started.Store(evt.RequestID, evt.Command)
		},
		Succeeded: func(_ context.Context, evt *event.CommandSucceededEvent) {
			log.Printf("\n【MongoDB】[%.3fms] [%v] %v \n", float64(evt.DurationNanos)/1e6, take(evt.RequestID), evt.Reply)
		},
		Failed: func(_ context.Context, evt *event.CommandFailedEvent) {
			log.Printf("\n【MongoDB】[%.3fms] [%v] \n %v \n", float64(evt.DurationNanos)/1e6, take(evt.RequestID), evt.Failure)
		},
	}
}
聚合查询
go
// Count DeviceAlarm documents per workshop by joining each alarm with its
// DeviceWorkShop entry.
db := utils.GetMongo()
if db == nil {
	// GetMongo returns nil when the connection could not be established.
	return
}
col := db.Collection("DeviceAlarm")

// Join DeviceAlarm.devicesn with DeviceWorkShop.deviceId; matches land in "info".
lookup := bson.D{{
	"$lookup", bson.D{
		{"from", "DeviceWorkShop"},
		{"localField", "devicesn"},
		{"foreignField", "deviceId"},
		{"as", "info"},
	},
}}

// Emit one document per element of the joined "info" array.
unwind := bson.D{{"$unwind", "$info"}}

// Group by workshop and count the documents in each group.
group := bson.D{
	{
		"$group", bson.D{
			{"_id", "$info.workShop"},
			{"value", bson.M{"$sum": 1}},
		},
	},
}

// Alternative projection kept for reference:
//project := bson.D{
//	{
//		"$project", bson.D{{"gg", "$name"}, {"name", "$info.workShop"}},
//	},
//}

// Expose the group key as "name", drop _id and keep the count. The inclusion
// flag must be the number 1: the string "1" is not an inclusion flag and would
// not pass the computed count through.
project := bson.D{
	{
		"$project", bson.D{{"name", "$_id"}, {"_id", 0}, {"value", 1}},
	},
}

// Optional stages kept for reference:
//limit := bson.D{
//	{
//		"$limit", 1,
//	},
//}
//groupStage := bson.D{{"$project", bson.M{"sn": 1, "product_info": 1}}}

var results []map[string]interface{}
err := col.Aggregate(context.Background(), mongo.Pipeline{lookup, unwind, group, project}).All(&results)
if err != nil {
	fmt.Println(err.Error())
	return
}
// Joins each document with its DeviceWorkShop entry, renders the readings with
// formatted timestamps, sorts by workshop / check point and keeps one frozen time.
[
{
$lookup:
/**
* from: The target collection.
* localField: The local join field.
* foreignField: The target join field.
* as: The name for the results.
* pipeline: Optional pipeline to run on the foreign collection.
* let: Optional variables to use in the pipeline field stages.
*/
{
from: "DeviceWorkShop",
localField: "sn",
foreignField: "deviceId",
as: "work"
}
},
{
$unwind:
/**
* path: Path to the array field.
* includeArrayIndex: Optional name for index.
* preserveNullAndEmptyArrays: Optional
* toggle to unwind null and empty values.
*/
"$work"
},
{
$project:
/**
* specifications: The fields to
* include or exclude.
*/
{
车间名称: "$work.workShop",
检查点: "$work.monitorPoint",
_id: 0,
正向电量: "$content.importactiveenergy",
反向电量: "$content.exportactiveenergy",
// Date(0) + $timestamp + 28800000 builds a date from the numeric field;
// 28800000 ms is 8 hours (presumably a UTC+8 shift — confirm).
上报时间: {
$dateToString: {
format: "%Y-%m-%d %H:%M:%S",
date: {
$add: [
new Date(0),
"$timestamp",
28800000
]
}
}
},
// Same conversion applied to $content.frozenrecordtime.
冻结时间: {
$dateToString: {
format: "%Y-%m-%d %H:%M:%S",
date: {
$add: [
new Date(0),
"$content.frozenrecordtime",
28800000
]
}
}
}
}
},
{
$sort:
/**
* Provide any number of field/order pairs.
*/
{
车间名称: 1,
检查点: 1
}
},
{
$match:
/**
* query: The query in MQL.
*/
// Compares against the formatted string produced by the $project stage.
{
冻结时间: "2024-09-16 00:00:00"
}
}
]
mongo
// Joins each document with its DeviceWorkShop entry and returns one row per sn
// with its device id, workshop and monitor point, sorted by workshop / monitor point.
[
{
$lookup:
/**
* from: The target collection.
* localField: The local join field.
* foreignField: The target join field.
* as: The name for the results.
* pipeline: Optional pipeline to run on the foreign collection.
* let: Optional variables to use in the pipeline field stages.
*/
{
from: "DeviceWorkShop",
localField: "sn",
foreignField: "deviceId",
as: "data"
}
},
{
$unwind:
/**
* path: Path to the array field.
* includeArrayIndex: Optional name for index.
* preserveNullAndEmptyArrays: Optional
* toggle to unwind null and empty values.
*/
"$data"
},
{
$group:
// Group by $sn.
// $first picks the value of each displayed field from the first document of the group.
{
_id: "$sn",
deviceid: {
$first: "$deviceid"
},
workShop: {
$first: "$data.workShop"
},
monitorPoint: {
$first: "$data.monitorPoint"
}
}
},
{
$project:
/**
* specifications: The fields to
* include or exclude.
*/
{
deviceid: 1,
sn: "$_id",
workShop: 1,
monitorPoint: 1
}
},
{
$sort:
/**
* Provide any number of field/order pairs.
*/
{
workShop: 1,
monitorPoint: 1
}
}
]
go
mongo := utils.GetMongo()
col := mongo.Collection("EnergyFrozen")
lookup := bson.D{{
"$lookup", bson.D{
{"from", "DeviceWorkShop"},
{"localField", "sn"},
{"foreignField", "deviceId"},
{"as", "info"},
},
}}
unwind := bson.D{{"$unwind", "$info"}}
fmt.Println(lookup, unwind)
group := bson.D{
{"$group", bson.D{
{
"_id", "$content.frozenrecordtime",
},
{
"total", bson.M{"$sum": bson.M{"$add": bson.A{"$content.importactiveenergy", "$content.exportactiveenergy"}}},
}, {
"time", bson.M{"$first": "$content.frozenrecordtime"},
},
}},
}
project := bson.D{
{"$project", bson.D{
{"total", 1},
{"time", bson.M{
"$dateToString": bson.M{
"format": "%Y-%m-%d",
"date": bson.M{"$add": bson.A{time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC), "$time", 28800000}},
},
}},
{"_id", 0},
}},
}
sort := bson.D{{"$sort", bson.D{{"time", -1}}}}
type TimeTotal struct {
Total float64 `json:"total"`
Time string `json:"time"`
}
var tt []TimeTotal
err := col.Aggregate(context.Background(), bson.A{group, project, sort}).All(&tt)
if err != nil {
fmt.Println(err.Error())
return
}
ttt := make([]TimeTotal, 0)
for i := 1; i < len(tt); i++ {
diff := tt[i-1].Total - tt[i].Total
num, _ := strconv.ParseFloat(fmt.Sprintf("%.2f", diff), 64)
ttt = append(ttt, TimeTotal{
Total: num,
Time: tt[i].Time,
})
}
b, _ := json.Marshal(ttt)
fmt.Println(string(b))
注意:
复杂的聚合分组查询,用 Aggregate 函数与 mongo.Pipeline 完成,各阶段的顺序不能改变:
- lookup 关联表
- unwind 用于把信息展开,如果信息是数组,展开后会变成多条数据
- group 分组
- project 用于显示字段