悠家娱乐 | Linux 中国,构建一个即时消息应用(四):消息( 二 )


varrxSpaces=regexp.MustCompile(''s+'')funcremoveSpaces(sstring)string{ifs==''''{returns}lines:=make([]string,0)for_,line:=rangestrings.Split(s,''n''){line=rxSpaces.ReplaceAllLiteralString(line,'''')line=strings.TrimSpace(line)ifline!=''''{lines=append(lines,line)}}returnstrings.Join(lines,''n'')}
这是删除空格的函数 。 它遍历每一行 , 删除两个以上的连续空格 , 然后回非空行 。
验证之后 , 它将启动一个SQL事务 。 首先 , 它查询对话中的参与者是否存在 。
funcqueryParticipantExistance(ctxcontext.Context,tx*sql.Tx,userID,conversationIDstring)(bool,error){ifctx==nil{ctx=context.Background()}varexistsbooliferr:=tx.QueryRowContext(ctx,`SELECTEXISTS(SELECT1FROMparticipantsWHEREuser_id=$1ANDconversation_id=$2)`,userID,conversationID).Scan(&exists);err!=nil{returnfalse,err}returnexists,nil}
我将其提取到一个函数中 , 因为稍后可以重用 。
如果用户不是对话参与者 , 我们将返回一个404NOTFound错误 。
然后 , 它插入消息并更新对话last_message_id 。 从这时起 , 由于我们不允许删除消息 , 因此last_message_id不能为NULL 。
接下来提交事务 , 并在goroutine中更新参与者messages_read_at 。
funcupdateMessagesReadAt(ctxcontext.Context,userID,conversationIDstring)error{ifctx==nil{ctx=context.Background()}if_,err:=db.ExecContext(ctx,`UPDATEparticipantsSETmessages_read_at=now()WHEREuser_id=$1ANDconversation_id=$2`,userID,conversationID);err!=nil{returnerr}returnnil}
在回复这条新消息之前 , 我们必须通知一下 。 这是我们将要在下一篇文章中编写的实时部分 , 因此我在那里留一了个注释 。
获取消息
这个端点处理对/api/conversations/{conversationID}/messages的GET请求 。 它用一个包含会话中所有消息的JSON数组进行响应 。 它还具有更新参与者messages_read_at的副作用 。
funcgetMessages(whttp.ResponseWriter,r*http.Request){ctx:=r.Context()authUserID:=ctx.Value(keyAuthUserID).(string)conversationID:=way.Param(ctx,''conversationID'')tx,err:=db.BeginTx(ctx,&sql.TxOptions{ReadOnly:true})iferr!=nil{respondError(w,fmt.Errorf(''couldnotbegintx:%v'',err))return}defertx.Rollback()isParticipant,err:=queryParticipantExistance(ctx,tx,authUserID,conversationID)iferr!=nil{respondError(w,fmt.Errorf(''couldnotqueryparticipantexistance:%v'',err))return}if!isParticipant{http.Error(w,''Conversationnotfound'',http.StatusNotFound)return}rows,err:=tx.QueryContext(ctx,`SELECTid,content,created_at,user_id=$1ASmineFROMmessagesWHEREmessages.conversation_id=$2ORDERBYmessages.created_atDESC`,authUserID,conversationID)iferr!=nil{respondError(w,fmt.Errorf(''couldnotquerymessages:%v'',err))return}deferrows.Close()messages:=make([]Message,0)forrows.Next(){varmessageMessageiferr=rows.Scan(&message.ID,&message.Content,&message.CreatedAt,&message.Mine,);err!=nil{respondError(w,fmt.Errorf(''couldnotscanmessage:%v'',err))return}messages=append(messages,message)}iferr=rows.Err();err!=nil{respondError(w,fmt.Errorf(''couldnotiterateovermessages:%v'',err))return}iferr=tx.Commit();err!=nil{respondError(w,fmt.Errorf(''couldnotcommittxtogetmessages:%v'',err))return}gofunc(){iferr=updateMessagesReadAt(nil,authUserID,conversationID);err!=nil{log.Printf(''couldnotupdatemessagesreadat:%vn'',err)}}()respond(w,messages,http.StatusOK)}