消息队列设计一个幂等的消费逻辑golang版

如何实现消息幂等

设计幂等的消费逻辑的关键是确保每条消息只被处理一次,即使在网络故障或消费者重启的情况下。通常使用唯一的消息ID和持久化存储来记录处理过的消息ID。

实现步骤

  1. 连接kafka和redis
  2. 检查消息ID
  3. 处理消息
  4. 标记消息已处理
package main

import (
	"context"
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/go-redis/redis/v8"
	"log"
	"time"
)

// 初始化Redis客户端
var ctx = context.Background()
var rdb = redis.NewClient(&redis.Options{
	Addr:     "localhost:6379",
	Password: "", // no password set
	DB:       0,  // use default DB
})

// 计算消息的唯一ID(可以使用消息的内容或其他标识)
func calculateMessageID(message []byte) string {
	hash := md5.Sum(message)
	return hex.EncodeToString(hash[:])
}

// 检查消息ID是否已处理
func isMessageProcessed(messageID string) bool {
	result, err := rdb.Get(ctx, messageID).Result()
	if err == redis.Nil {
		return false
	} else if err != nil {
		log.Fatalf("Failed to get message ID from Redis: %v", err)
	}
	return result == "processed"
}

// 标记消息ID为已处理
func markMessageAsProcessed(messageID string) {
	err := rdb.Set(ctx, messageID, "processed", 0).Err()
	if err != nil {
		log.Fatalf("Failed to set message ID in Redis: %v", err)
	}
}

// 处理消息的逻辑
func processMessage(message []byte) {
	// 在这里添加具体的消息处理逻辑
	fmt.Printf("Processing message: %s\n", string(message))
}
// 初始化Kafka消费者,读取消息,检查消息ID,处理未处理的消息,并将消息ID标记为已处理。
func main() {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		log.Fatalf("Failed to create consumer: %v", err)
	}
	defer consumer.Close()

	consumer.Subscribe("myTopic", nil)

	for {
		msg, err := consumer.ReadMessage(-1)
		if err == nil {
			messageID := calculateMessageID(msg.Value)
			if !isMessageProcessed(messageID) {
				processMessage(msg.Value)
				markMessageAsProcessed(messageID)
			} else {
				fmt.Printf("Message %s already processed\n", messageID)
			}
		} else {
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
		time.Sleep(1 * time.Second) // 可选:添加延迟以防止消息消费过快
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/758896.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

grpc教程——proto文件转go

【1】编写一个proto文件 syntax "proto3"; package myproto;service NC{rpc SayStatus (NCRequest) returns (NCResponse){} }message NCRequest{ string name 1; } message NCResponse{string status 1; } 【2】转换:protoc --go_out. myservice.pro…

重生奇迹MU 正确获取金币的方式

在游戏中,需要消耗大量的金币来购买红药等物品。因此,如何快速赚取金币也成为玩家关注的问题。您知道有哪些方法可以快速地获得金币吗? 一、哪个地图上是最适合打金币的很关键 在选择打钱的地方时,不能盲目行动,需要…

安装maven与nexus

安装maven与nexus Maven官网下载地址:http://maven.apache.org cd /data/software/wget https://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.8.1/binaries/apache-maven-3.8.8-bin.tar.gz# 解压 tar xf apache-maven-3.8.1-bin.tar.gz -C /opt/[rooth…

木各力“GERRI”被“GREE”格力无效宣告成功

近日“GERRI”被“GREE”格力无效宣告成功,“GERRI”和“GREE”近似不,如果很近似当初就不会通过初审和下商标注册证,但是如果涉及知名商标和驰名商标,人家就可以异议和无效。 “GERRI”在被无效宣告时,引用了6个相关的…

【启明智显分享】乐鑫ESP32-S3R8方案2.8寸串口屏:高性能低功耗,WIFI/蓝牙无线通信

近年来HMI已经成为大量应用聚焦的主题,在消费类产品通过创新的HMI设计带来增强的连接性和更加身临其境的用户体验之际,工业产品却仍旧在采用物理接口。这些物理接口通常依赖小型显示器或是简单的LED,通过简单的机电开关或按钮来实现HMI交互。…

竞赛 深度学习 大数据 股票预测系统 - python lstm

文章目录 0 前言1 课题意义1.1 股票预测主流方法 2 什么是LSTM2.1 循环神经网络2.1 LSTM诞生 2 如何用LSTM做股票预测2.1 算法构建流程2.2 部分代码 3 实现效果3.1 数据3.2 预测结果项目运行展示开发环境数据获取 最后 0 前言 🔥 优质竞赛项目系列,今天…

2.2 Python数据类型详解

第二节:Python数据类型详解 Python作为一种动态类型语言,支持多种数据类型,每种数据类型都有其特定的特点和用途。本章将详细介绍Python中常见的数据类型及其特性,以及如何使用这些数据类型进行编程。 2.2.1 整数 (int) 整数是…

黑马点评-Redis的缓存击穿,缓存雪崩,缓存穿透,互斥锁

文章目录 1.缓存穿透2.缓存雪崩3.缓存击穿3.1 互斥锁 1.缓存穿透 解决办法 写入NULL值到Redis缓存,以后就会命中Redis的控制缓存而不会出现请求直接打到数据库的问题! 代码 2.缓存雪崩 这个概念很好理解,雪崩就是无数的小雪花结构突然因…

pandas数据分析(1)

pandas,即Python数据分析库(Python data analysis library) DataFrame和Series DataFrame(数据帧)和Series(序列)是pandas的核心数据结构。DataFrame的主要组件包含索引、列、数据。DataFrame和…

扫描全能王的AI驱动创新与智能高清滤镜技术解析

目录 引言1、扫描全能王2、智能高清滤镜黑科技2.1、图像视觉矫正2.2、去干扰技术 3、实际应用案例3.1、打印文稿褶皱检测3.2、试卷擦除手写3.3、老旧文件处理3.4、收银小票3.5、从不同角度扫描文档 4、用户体验结论与未来展望 引言 在数字化时代背景下,文档扫描功能…

云计算【第一阶段(21)】Linux引导过程与服务控制

目录 一、linux操作系统引导过程 1.1、开机自检 1.2、MBR引导 1.3、GRUB菜单 1.4、加载 Linux 内核 1.5、init进程初始化 1.6、简述总结 1.7、初始化进程centos 6和7的区别 二、排除启动类故障 2.1、修复MBR扇区故障 2.1.1、 实验 2.2、修复grub引导故障 2.2.1、实…

从AICore到TensorCore:华为910B与NVIDIA A100全面分析

华为NPU 910B与NVIDIA GPU A100性能对比,从AICore到TensorCore,展现各自计算核心优势。 AI 2.0浪潮汹涌而来,若仍将其与区块链等量齐观,视作炒作泡沫,则将错失新时代的巨大机遇。现在,就是把握AI时代的关键…

深入解析高斯过程:数学理论、重要概念和直观可视化全解

与其他算法相比,高斯过程不那么流行,但是如果你只有少量的数据,那么可以首先高斯过程。在这篇文章中,我将详细介绍高斯过程。并可视化和Python实现来解释高斯过程的数学理论。 多元高斯分布 多元高斯分布是理解高斯过程所必须的概…

图书管理系统(附源码)

前言:前面一起和小伙伴们学习了较为完整的Java语法体系,那么本篇将运用这些知识连串在一起实现图书管理系统。 目录 一、总体设计 二、书籍与书架 书籍(Book) 书架(Booklist) 三、对图书的相关操作 I…

java将html转成图片

java 将html转成图片 1.导入jar2.代码3.展示结果4.注意事项 最近有一个需求需要根据指定的样式生成图片&#xff0c;使用java原生技术有些麻烦&#xff0c;所以上网搜了下案例&#xff0c;最后发现最好用的还是html2image&#xff0c;这里进行简单总结下。 1.导入jar <!-- 用…

metasfresh开源ERP系统Windows开发环境配置参考

目录 概述 开发环境 配置过程 后端启动 前端启动 登陆系统 其他 概述 Compiere闭源之后衍生出了Admpiere等若干开源的产品&#xff0c;metasfresh就是其中之一&#xff0c;metasfresh截至发稿时在GitHub上已有64000多次的修改提交&#xff0c;而且仍在维护中&#xff0…

Python应用开发——30天学习Streamlit Python包进行APP的构建(12)

st.checkbox 显示复选框部件。 Function signature[source] st.checkbox(label, valueFalse, keyNone, helpNone, on_changeNone, argsNone, kwargsNone, *, disabledFalse, label_visibility"visible") Returns (bool) Whether or not the checkbox is checked. …

Sentinel解决雪崩问题

我们或多或少都对雪崩问题有点了解&#xff0c;在微服务系统中&#xff0c;各个微服务互相调用&#xff0c;关系错综复杂&#xff0c;如果其中一个微服务挂了或者处理消息的速度大幅下降&#xff0c;需要被处理的消息越积越多&#xff0c;那么影响的不仅仅是本微服务的功能&…

算法入门(上)

什么是算法&#xff1f; 算法&#xff08;Algorithm&#xff09;是解决特定问题求解步骤的描述&#xff0c;在计算机中表现为指令的有限序列&#xff0c;并且每条指令表示一个或多个操作。 给定一个问题&#xff0c;能够解决这个问题的算法是有很多种的。算式中的问题是千奇百怪…

C语言单链表的算法之插入节点

一&#xff1a;访问各个节点中的数据 &#xff08;1&#xff09;访问链表中的各个节点的有效数据&#xff0c;这个访问必须注意不能使用p、p1、p2&#xff0c;而只能使用phead &#xff08;2&#xff09;只能用头指针不能用各个节点自己的指针。因为在实际当中我们保存链表的时…