Skip to content

使用 go 查询数据

连接 mongo 数据库

mongo
db.grantRolesToUser("video",[{role:"read", db:"video"},{role:"readWrite",db:"video"},{role:"dbAdmin", db:"video"},{role:"userAdmin",db:"video"},{role:"userAdminAnyDatabase",db:"video"}]);

db.createUser({user:"video",pwd:"video12#$",roles:[{role:"readWrite",db:"video"},{role:"read",db:"video"}]});
go
go get github.com/qiniu/qmgo
go
import (
	"context"
	"errors"
	"github.com/qiniu/qmgo"
	"github.com/qiniu/qmgo/options"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/event"
	mgoptions "go.mongodb.org/mongo-driver/mongo/options"
	"log"
	"sync"
)

const (
	Ip         = "localhost"
	Port       = "27017"
	UserName   = ""
	Password   = ""
	DBName     = "video"
	AuthSource = ""
)

var (
	ConnectTimeoutMS = int64(1000)
	MaxPoolSize      = uint64(100)
	MinPoolSize      = uint64(0)
)

// GetMongo creates a new qmgo client for the MongoDB instance described by
// the package constants and returns a handle to the DBName database.
//
// When Password is non-empty the URI carries the credentials and the database
// name (plus authSource when AuthSource is set); otherwise a plain
// "mongodb://Ip:Port" URI is used.
//
// A command monitor is attached so that every finished command is logged
// together with its duration and its reply or failure.
//
// It returns nil when the client cannot be created; the cause is logged.
// Every call builds a new client, and the client is not closed here because
// the returned database handle depends on it.
func GetMongo() *qmgo.Database {
	ctx := context.Background()

	// Assemble the MongoDB URI.
	// NOTE: UserName and Password are inserted verbatim; reserved characters
	// such as '#', '@', ':' or '/' must already be percent-encoded.
	var mongoUrl string
	if Password != "" {
		mongoUrl = "mongodb://" + UserName + ":" + Password + "@" +
			Ip + ":" + Port + "/" + DBName
		// Only emit authSource when one is configured, instead of sending an
		// empty "?authSource=" option.
		if AuthSource != "" {
			mongoUrl += "?authSource=" + AuthSource
		}
	} else {
		mongoUrl = "mongodb://" + Ip + ":" + Port
	}

	// Commands are remembered from their Started event until the matching
	// Succeeded/Failed event so both can be printed on one log line.
	var startedCommands sync.Map // RequestID (int64) -> bson.Raw

	// takeCommand returns the command recorded for requestID and removes it,
	// so the map does not grow with every command that is executed.
	takeCommand := func(requestID int64) bson.Raw {
		v, ok := startedCommands.LoadAndDelete(requestID)
		if !ok {
			return nil
		}
		cmd, _ := v.(bson.Raw)
		return cmd
	}

	cmdMonitor := &event.CommandMonitor{
		Started: func(_ context.Context, evt *event.CommandStartedEvent) {
			startedCommands.Store(evt.RequestID, evt.Command)
		},
		Succeeded: func(_ context.Context, evt *event.CommandSucceededEvent) {
			commands := takeCommand(evt.RequestID)
			log.Printf("\n【MongoDB】[%.3fms] [%v] %v \n", float64(evt.DurationNanos)/1e6, commands, evt.Reply)
		},
		Failed: func(_ context.Context, evt *event.CommandFailedEvent) {
			commands := takeCommand(evt.RequestID)
			log.Printf("\n【MongoDB】[%.3fms] [%v] \n %v \n", float64(evt.DurationNanos)/1e6, commands, evt.Failure)
		},
	}

	// Client options carrying the command monitor.
	ops := options.ClientOptions{ClientOptions: &mgoptions.ClientOptions{}}
	ops.SetMonitor(cmdMonitor)

	// Create the client (connection pool).
	client, err := qmgo.NewClient(ctx, &qmgo.Config{
		Uri:              mongoUrl,
		ConnectTimeoutMS: &ConnectTimeoutMS,
		MaxPoolSize:      &MaxPoolSize,
		MinPoolSize:      &MinPoolSize,
	}, ops)
	if err != nil {
		// The signature has no error result, so report the failure here
		// rather than dropping it silently.
		log.Println(errors.New("MongoDB连接异常:" + err.Error()))
		return nil
	}

	// Select the database.
	return client.Database(DBName)
}

聚合查询

go
// Count DeviceAlarm documents per workshop by joining them with DeviceWorkShop.
db := utils.GetMongo()
	col := db.Collection("DeviceAlarm")

	// Join each alarm with the DeviceWorkShop documents whose deviceId equals
	// the alarm's devicesn; matches are stored in the "info" array.
	lookup := bson.D{{
		"$lookup", bson.D{
			{"from", "DeviceWorkShop"},
			{"localField", "devicesn"},
			{"foreignField", "deviceId"},
			{"as", "info"},
		},
	}}
	// Emit one document per element of "info".
	unwind := bson.D{{"$unwind", "$info"}}
	// Group by workshop and count the documents in each group.
	group := bson.D{
		{
			"$group", bson.D{
				{
					"_id", "$info.workShop",
				},
				{
					"value", bson.M{"$sum": 1},
				},
			},
		},
	}
	//project := bson.D{
	//	{
	//		"$project", bson.D{{"gg", "$name"}, {"name", "$info.workShop"}},
	//	},
	//}
	// Rename _id to name and keep the count. "value" must be the number 1
	// (include the field); the string "1" would be projected as a literal and
	// overwrite the count.
	project := bson.D{
		{
			"$project", bson.D{{"name", "$_id"}, {"_id", 0}, {"value", 1}},
		},
	}
	//limit := bson.D{
	//	{
	//		"$limit", 1,
	//	},
	//}
	//groupStage := bson.D{{"$project", bson.M{"sn": 1, "product_info": 1}}}
	var results []map[string]interface{}
	// NOTE(review): the error returned by All is discarded here; check it in
	// real code before using results.
	col.Aggregate(context.Background(), mongo.Pipeline{lookup, unwind, group, project}).All(&results)
[
  {
    $lookup:
      /**
       * Join each input document with the DeviceWorkShop documents whose
       * deviceId equals the input document's sn; matches go into "work".
       *
       * from: The target collection.
       * localField: The local join field.
       * foreignField: The target join field.
       * as: The name for the results.
       */
      {
        from: "DeviceWorkShop",
        localField: "sn",
        foreignField: "deviceId",
        as: "work"
      }
  },
  {
    $unwind:
      /**
       * path: the array field to deconstruct; one output document is
       * produced per element of "work".
       */
      "$work"
  },
  {
    $project:
      /**
       * specifications: The fields to
       *   include or exclude.
       * Both time fields are built as epoch + stored value + 28800000 and
       * then formatted as a string. 28800000 ms is 8 hours.
       * NOTE(review): presumably the stored values are epoch milliseconds and
       * the offset shifts UTC to UTC+8 — confirm.
       */
      {
        车间名称: "$work.workShop",
        检查点: "$work.monitorPoint",
        _id: 0,
        正向电量: "$content.importactiveenergy",
        反向电量: "$content.exportactiveenergy",
        上报时间: {
          $dateToString: {
            format: "%Y-%m-%d %H:%M:%S",
            date: {
              $add: [
                new Date(0),
                "$timestamp",
                28800000
              ]
            }
          }
        },
        冻结时间: {
          $dateToString: {
            format: "%Y-%m-%d %H:%M:%S",
            date: {
              $add: [
                new Date(0),
                "$content.frozenrecordtime",
                28800000
              ]
            }
          }
        }
      }
  },
  {
    $sort:
      /**
       * Provide any number of field/order pairs.
       * 1 sorts ascending.
       */
      {
        车间名称: 1,
        检查点: 1
      }
  },
  {
    $match:
      /**
       * query: The query in MQL.
       * Keeps only the rows whose formatted freeze time equals this string.
       */
      {
        冻结时间: "2024-09-16 00:00:00"
      }
  }
]
mongo
[
  {
    $lookup:
      /**
       * Join each input document with the DeviceWorkShop documents whose
       * deviceId equals the input document's sn; matches go into "data".
       *
       * from: The target collection.
       * localField: The local join field.
       * foreignField: The target join field.
       * as: The name for the results.
       */
      {
        from: "DeviceWorkShop",
        localField: "sn",
        foreignField: "deviceId",
        as: "data"
      }
  },
  {
    $unwind:
      /**
       * path: Path to the array field.
       * One output document is produced per element of "data".
       */
      "$data"
  },
  {
    $group:
      // Group by sn.
      // The remaining fields take the value from the first document of each group.
      {
        _id: "$sn",
        deviceid: {
          $first: "$deviceid"
        },
        workShop: {
          $first: "$data.workShop"
        },
        monitorPoint: {
          $first: "$data.monitorPoint"
        }
      }
  },
  {
    $project:
      /**
       * specifications: The fields to
       *   include or exclude.
       * sn is copied from the group key; _id is not excluded here, so it
       * stays in the output as well.
       */
      {
        deviceid: 1,
        sn: "$_id",
        workShop: 1,
        monitorPoint: 1
      }
  },
  {
    $sort:
      /**
       * Provide any number of field/order pairs.
       * 1 sorts ascending.
       */
      {
        workShop: 1,
        monitorPoint: 1
      }
  }
]
go
	mongo := utils.GetMongo()
	col := mongo.Collection("EnergyFrozen")

	lookup := bson.D{{
		"$lookup", bson.D{
			{"from", "DeviceWorkShop"},
			{"localField", "sn"},
			{"foreignField", "deviceId"},
			{"as", "info"},
		},
	}}
	unwind := bson.D{{"$unwind", "$info"}}

	fmt.Println(lookup, unwind)

	group := bson.D{
		{"$group", bson.D{
			{
				"_id", "$content.frozenrecordtime",
			},
			{
				"total", bson.M{"$sum": bson.M{"$add": bson.A{"$content.importactiveenergy", "$content.exportactiveenergy"}}},
			}, {
				"time", bson.M{"$first": "$content.frozenrecordtime"},
			},
		}},
	}

	project := bson.D{
		{"$project", bson.D{
			{"total", 1},
			{"time", bson.M{
				"$dateToString": bson.M{
					"format": "%Y-%m-%d",
					"date":   bson.M{"$add": bson.A{time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC), "$time", 28800000}},
				},
			}},
			{"_id", 0},
		}},
	}
	sort := bson.D{{"$sort", bson.D{{"time", -1}}}}
	type TimeTotal struct {
		Total float64 `json:"total"`
		Time  string  `json:"time"`
	}
	var tt []TimeTotal

	err := col.Aggregate(context.Background(), bson.A{group, project, sort}).All(&tt)
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	ttt := make([]TimeTotal, 0)
	for i := 1; i < len(tt); i++ {
		diff := tt[i-1].Total - tt[i].Total
		num, _ := strconv.ParseFloat(fmt.Sprintf("%.2f", diff), 64)
		ttt = append(ttt, TimeTotal{
			Total: num,
			Time:  tt[i].Time,
		})
	}
	b, _ := json.Marshal(ttt)
	fmt.Println(string(b))

注意:

复杂的聚合分组查询,用 Aggregate 函数与 mongo.Pipeline 完成,各阶段的顺序不能改变:

  • lookup 关联表
  • unwind 用于把信息展开;如果信息是数组,展开后会变成多条数据
  • group 分组
  • project 用于显示字段

https://www.cnblogs.com/sandea/p/10479192.html