加入收藏 | 设为首页 | 会员中心 | 我要投稿 辽源站长网 (https://www.0437zz.com/)- 云专线、云连接、智能数据、边缘计算、数据安全!
当前位置: 首页 > 站长学院 > MySql教程 > 正文

使用Kafka和MongoDB进行Go异步处理

发布时间:2018-08-28 06:45:43 所属栏目:MySql教程 来源:Melvin Vivas
导读:技术沙龙 | 邀您于8月25日与国美/AWS/转转三位专家共同探讨小程序电商实战 在我前面的博客文章我的第一个 Go 微服务:使用 MongoDB 和 Docker 多阶段构建 中,我创建了一个 Go 微服务示例,它发布一个 REST 式的 http 端点,并将从 HTTP POST 中接收到的数

kafka-to-mongo/kafka-mongo-sample.go

  1. func main() {
  2.  
  3. //Create MongoDB session
  4. session := initialiseMongo()
  5. mongoStore.session = session
  6.  
  7. receiveFromKafka()
  8.  
  9. }
  10.  
  11. func receiveFromKafka() {
  12.  
  13. fmt.Println("Start receiving from Kafka")
  14. c, err := kafka.NewConsumer(&kafka.ConfigMap{
  15. "bootstrap.servers": "localhost:9092",
  16. "group.id": "group-id-1",
  17. "auto.offset.reset": "earliest",
  18. })
  19.  
  20. if err != nil {
  21. panic(err)
  22. }
  23.  
  24. c.SubscribeTopics([]string{"jobs-topic1"}, nil)
  25.  
  26. for {
  27. msg, err := c.ReadMessage(-1)
  28.  
  29. if err == nil {
  30. fmt.Printf("Received from Kafka %s: %sn", msg.TopicPartition, string(msg.Value))
  31. job := string(msg.Value)
  32. saveJobToMongo(job)
  33. } else {
  34. fmt.Printf("Consumer error: %v (%v)n", err, msg)
  35. break
  36. }
  37. }
  38.  
  39. c.Close()
  40.  
  41. }
  42.  
  43. func saveJobToMongo(jobString string) {
  44.  
  45. fmt.Println("Save to MongoDB")
  46. col := mongoStore.session.DB(database).C(collection)
  47.  
  48. //Save data into Job struct
  49. var _job Job
  50. b := []byte(jobString)
  51. err := json.Unmarshal(b, &_job)
  52. if err != nil {
  53. panic(err)
  54. }
  55.  
  56. //Insert job into MongoDB
  57. errMongo := col.Insert(_job)
  58. if errMongo != nil {
  59. panic(errMongo)
  60. }
  61.  
  62. fmt.Printf("Saved to MongoDB : %s", jobString)
  63.  
  64. }

(编辑:辽源站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

推荐文章
    热点阅读